在第 6 章Goophr Concierge中,我们构建了负责接收新文档并将其分解为令牌以用于索引的端点。但是,当前 Concierge 的api.indexAdder
实现在将令牌打印到控制台后返回。在本章中,我们将实现 Goophr Library,它可以与礼宾部交互以接受令牌,也可以响应令牌搜索查询。
在本章中,我们将研究以下主题:
- 标准索引模型
- 倒排索引模型
- 文档索引器
- 查询解析器 API
考虑一本书中的索引。每本书都有自己的索引,按字母顺序列出所有单词,显示它们在书中的位置。然而,如果我们想跟踪多本书中的单词出现情况,那么检查每本书的索引是非常低效的。让我们看一个例子。
假设我们有三本书:Book 1
、Book 2
和Book 3
,下面是它们各自的索引。每个单词旁边的数字表示单词出现在哪一页:
* Book 1 (Index)
- apple - 4, 10, 20
- cat - 10, 21, 22
- zebra - 15, 25, 63
* Book 2 (Index)
- banana - 14, 19, 66
- cake - 10, 37, 45
- zebra - 67, 100, 129
* Book 3 (Index)
- apple - 36, 55, 74
- cake - 1, 9, 77
- Whale - 11, 59, 79
让我们试着从书的索引中找出三个单词。一种幼稚的方法可能是挑选每本书并扫描它,直到我们找到或错过了单词:
apple
banana
parrot
* Searching for 'apple'
- Scanning Book 1\. Result: Found.
- Scanning Book 2\. Result: Not Found.
- Scanning Book 3\. Result: Found.
* Searching for 'banana'
- Scanning Book 1\. Result: Not Found.
- Scanning Book 2\. Result: Found.
- Scanning Book 3\. Result: Not Found.
* Searching for 'parrot'
- Scanning Book 1\. Result: Not Found.
- Scanning Book 2\. Result: Not Found.
- Scanning Book 3\. Result: Not Found.
简而言之,对于每一个术语,我们都会遍历每一个图书索引并搜索该词。我们对每一个单词都经历了这个过程,包括parrot
,这在任何一本书中都没有出现!起初,这似乎是可以接受的,但明智的做法是,当我们有超过一百万本书要涉猎时,我们认识到,这种做法是不切实际的。
根据上述示例,我们可以说明以下内容:
- 我们需要快速查找以确定索引中是否存在一个单词
- 对于任何给定的单词,我们需要有一种有效的方法来列出该单词可能出现的所有书籍
我们可以通过使用反向索引来实现这两个细节。标准索引的映射顺序为book→ 字→ 出现(页面、行等)如上例所示。如果我们使用倒排索引,映射顺序将变成字→ 书→ 出现(页面、行等)。
这一变化似乎意义不大;但是,它大大改进了查找。让我们用另一个例子来看一下。
让我们以与之前相同的示例中的数据为例,但现在根据倒排索引进行分类:
* apple
- Book 1 - 4, 10, 20
- Book 3 - 36, 55, 74
* banana
- Book 2 - 14, 19, 66
* cake
- Book 2 - 10, 37, 45
- Book 3 - 1, 9, 77
* cat
- Book 1 - 10, 21, 22
* whale
- Book 3 - 11, 59, 79
* zebra
- Book 1 - 15, 25, 63
- Book 2 - 67, 100, 129
通过此设置,我们可以有效地回答以下问题:
- 索引中有单词吗?
- 一个词存在于哪些书中?
- 在给定的书中,单词出现在哪几页上?
让我们再次尝试从倒排索引中查找三个单词:
apple
banana
parrot
* Searching for 'apple'
- Scanning Inverted Index. Result: Found a list of books.
* Searching for 'banana'
- Scanning Inverted Index. Result: Found a list of books.
* Searching for 'parrot'
- Scanning Inverted Index. Result: Not Found.
总而言之,我们不是浏览每本书,而是查找每一个术语,确定该术语是否存在,如果存在,则返回书籍列表,这是我们的最终目标。
排名和搜索结果的相关性是一个有趣而复杂的话题。所有主要的搜索引擎都有一组专门的软件工程师和计算机科学家,他们花费大量的时间和精力来确保他们的算法是最准确的。
对于 Goophr,我们将简化排名并限制搜索词的频率。搜索词频率越高,它在结果中的排名就越高。
让我们回顾一下图书馆员的 API 定义:
openapi: 3.0.0
servers:
- url: /api
info:
title: Goophr Librarian API
version: '1.0'
description: |
API responsible for indexing & communicating with Goophr Concierge.
paths:
/index:
post:
description: |
Add terms to index.
responses:
'200':
description: |
Terms were successfully added to the index.
'400':
description: >
Request was not processed because payload was incomplete or
incorrect.
content:
application/json:
schema:
$ref: '#/components/schemas/error'
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/terms'
description: |
List of terms to be added to the index.
required: true
/query:
post:
description: |
Search for all terms in the payload.
responses:
'200':
description: |
Returns a list of all the terms along with their frequency,
documents the terms appear in and link to the said documents.
content:
application/json:
schema:
$ref: '#/components/schemas/results'
'400':
description: >
Request was not processed because payload was incomplete or
incorrect.
content:
application/json:
schema:
$ref: '#/components/schemas/error'
parameters: []
components:
schemas:
error:
type: object
properties:
msg:
type: string
term:
type: object
required:
- title
- token
- doc_id
- line_index
- token_index
properties:
title:
description: |
Title of the document to which the term belongs.
type: string
token:
description: |
The term to be added to the index.
type: string
doc_id:
description: |
The unique hash for each document.
type: string
line_index:
description: |
Line index at which the term occurs in the document.
type: integer
token_index:
description: |
Position of the term in the document.
type: integer
terms:
type: object
properties:
code:
type: integer
data:
type: array
items:
$ref: '#/components/schemas/term'
results:
type: object
properties:
count:
type: integer
data:
type: array
items:
$ref: '#/components/schemas/result'
result:
type: object
properties:
doc_id:
type: string
score:
type: integer
根据 API 定义,我们可以说明以下内容:
- 所有通信都是通过 JSON 格式进行的
- 图书馆员的两个终点是:
/api/index
和/api/query
/api/index
使用POST
方法向反向索引添加新令牌/api/query
使用POST
方法接收搜索查询词,并返回索引包含的所有文档的列表
/api/index
的主要目的是接受礼宾部的代币,并将其添加到索引中。让我们看看“将它们添加到索引”的含义。
文档索引可以定义为以下一组连续任务:
- 我们依靠有效载荷为我们提供存储令牌所需的所有元信息。
- 我们沿着反向索引树,在尚未创建的路径中创建任何节点,最后添加令牌细节。
/api/query
的主要目的是在倒排索引中找到搜索词集,并按相关性降序返回文档 ID 列表。让我们看看“查询搜索词”和“相关性”是什么意思。
查询解析可以定义为以下一组连续任务:
- 对于每一个搜索词,我们都希望以倒排索引的形式检索所有可用的书籍。
- 接下来,我们想在一个简单的查找表(
map
中存储每本书中所有单词的出现次数。 - 一旦我们有了一个包含书籍及其各自计数的地图,我们就可以将查找表转换为一个有序文档 ID 及其各自分数的数组。
本章中的代码非常简单,遵循与第 6 章、Goophr 礼宾部相同的代码约定。让我们直接进入代码。
现在我们已经详细讨论了 Library 的设计,让我们看看项目结构和源代码:
$ tree . ├── api │ ├── index.go │ └── query.go ├── common │ ├── helpers.go ├── Dockerfile ├── main.go
两个目录和五个文件!
现在让我们看看每个文件的源代码。
源文件负责初始化路由、启动索引系统和启动 web 服务器:
package main
import (
"net/http"
"github.com/last-ent/distributed-go/chapter7/goophr/librarian/api"
"github.com/last-ent/distributed-go/chapter7/goophr/librarian/common"
)
func main() {
common.Log("Adding API handlers...")
http.HandleFunc("/api/index", api.IndexHandler)
http.HandleFunc("/api/query", api.QueryHandler)
common.Log("Starting index...")
api.StartIndexSystem()
common.Log("Starting Goophr Librarian server on port :9090...")
http.ListenAndServe(":9090", nil)
}
源文件由专用于一个处理程序的代码组成。
package common
import (
"fmt"
"log"
)
func Log(msg string) {
log.Println("INFO - ", msg)
}
func Warn(msg string) {
log.Println("---------------------------")
log.Println(fmt.Sprintf("WARN: %s", msg))
log.Println("---------------------------")
}
源文件,包含要处理的代码并向索引中添加新术语。
package api
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
)
// tPayload is used to parse the JSON payload consisting of Token data.
type tPayload struct {
Token string 'json:"token"'
Title string 'json:"title"'
DocID string 'json:"doc_id"'
LIndex int 'json:"line_index"'
Index int 'json:"token_index"'
}
type tIndex struct {
Index int
LIndex int
}
func (ti *tIndex) String() string {
return fmt.Sprintf("i: %d, li: %d", ti.Index, ti.LIndex)
}
type tIndices []tIndex
// document - key in Indices represent Line Index.
type document struct {
Count int
DocID string
Title string
Indices map[int]tIndices
}
func (d *document) String() string {
str := fmt.Sprintf("%s (%s): %d\n", d.Title, d.DocID, d.Count)
var buffer bytes.Buffer
for lin, tis := range d.Indices {
var lBuffer bytes.Buffer
for _, ti := range tis {
lBuffer.WriteString(fmt.Sprintf("%s ", ti.String()))
}
buffer.WriteString(fmt.Sprintf("@%d -> %s\n", lin, lBuffer.String()))
}
return str + buffer.String()
}
// documentCatalog - key represents DocID.
type documentCatalog map[string]*document
func (dc *documentCatalog) String() string {
return fmt.Sprintf("%#v", dc)
}
// tCatalog - key in map represents Token.
type tCatalog map[string]documentCatalog
func (tc *tCatalog) String() string {
return fmt.Sprintf("%#v", tc)
}
type tcCallback struct {
Token string
Ch chan tcMsg
}
type tcMsg struct {
Token string
DC documentCatalog
}
// pProcessCh is used to process /index's payload and start process to add the token to catalog (tCatalog).
var pProcessCh chan tPayload
// tcGet is used to retrieve a token's catalog (documentCatalog).
var tcGet chan tcCallback
func StartIndexSystem() {
pProcessCh = make(chan tPayload, 100)
tcGet = make(chan tcCallback, 20)
go tIndexer(pProcessCh, tcGet)
}
// tIndexer maintains a catalog of all tokens along with where they occur within documents.
func tIndexer(ch chan tPayload, callback chan tcCallback) {
store := tCatalog{}
for {
select {
case msg := <-callback:
dc := store[msg.Token]
msg.Ch <- tcMsg{
DC: dc,
Token: msg.Token,
}
case pd := <-ch:
dc, exists := store[pd.Token]
if !exists {
dc = documentCatalog{}
store[pd.Token] = dc
}
doc, exists := dc[pd.DocID]
if !exists {
doc = &document{
DocID: pd.DocID,
Title: pd.Title,
Indices: map[int]tIndices{},
}
dc[pd.DocID] = doc
}
tin := tIndex{
Index: pd.Index,
LIndex: pd.LIndex,
}
doc.Indices[tin.LIndex] = append(doc.Indices[tin.LIndex], tin)
doc.Count++
}
}
}
func IndexHandler(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
w.WriteHeader(http.StatusMethodNotAllowed)
w.Write([]byte('{"code": 405, "msg": "Method Not Allowed."}'))
return
}
decoder := json.NewDecoder(r.Body)
defer r.Body.Close()
var tp tPayload
decoder.Decode(&tp)
log.Printf("Token received%#v\n", tp)
pProcessCh <- tp
w.Write([]byte('{"code": 200, "msg": "Tokens are being added to index."}'))
}
源文件包含根据搜索词返回排序结果的代码。
package api
import (
"encoding/json"
"net/http"
"sort"
"github.com/last-ent/distributed-go/chapter7/goophr/librarian/common"
)
type docResult struct {
DocID string 'json:"doc_id"'
Score int 'json:"doc_score"'
Indices tIndices 'json:"token_indices"'
}
type result struct {
Count int 'json:"count"'
Data []docResult 'json:"data"'
}
// getResults returns unsorted search results & a map of documents containing tokens.
func getResults(out chan tcMsg, count int) tCatalog {
tc := tCatalog{}
for i := 0; i < count; i++ {
dc := <-out
tc[dc.Token] = dc.DC
}
close(out)
return tc
}
func getFScores(docIDScore map[string]int) (map[int][]string, []int) {
// fScore maps frequency score to set of documents.
fScore := map[int][]string{}
fSorted := []int{}
for dID, score := range docIDScore {
fs := fScore[score]
fScore[score] = []string{}
}
fScore[score] = append(fs, dID)
fSorted = append(fSorted, score)
}
sort.Sort(sort.Reverse(sort.IntSlice(fSorted)))
return fScore, fSorted
}
func getDocMaps(tc tCatalog) (map[string]int, map[string]tIndices) {
// docIDScore maps DocIDs to occurences of all tokens.
// key: DocID.
// val: Sum of all occurences of tokens so far.
docIDScore := map[string]int{}
docIndices := map[string]tIndices{}
// for each token's catalog
for _, dc := range tc {
// for each document registered under the token
for dID, doc := range dc {
// add to docID score
var tokIndices tIndices
for _, tList := range doc.Indices {
tokIndices = append(tokIndices, tList...)
}
docIDScore[dID] += doc.Count
dti := docIndices[dID]
docIndices[dID] = append(dti, tokIndices...)
}
}
return docIDScore, docIndices
}
func sortResults(tc tCatalog) []docResult {
docIDScore, docIndices := getDocMaps(tc)
fScore, fSorted := getFScores(docIDScore)
results := []docResult{}
addedDocs := map[string]bool{}
for _, score := range fSorted {
for _, docID := range fScore[score] {
if _, exists := addedDocs[docID]; exists {
continue
}
results = append(results, docResult{
DocID: docID,
Score: score,
Indices: docIndices[docID],
})
addedDocs[docID] = false
}
}
return results
}
// getSearchResults returns a list of documents.
// They are listed in descending order of occurences.
func getSearchResults(sts []string) []docResult {
callback := make(chan tcMsg)
for _, st := range sts {
go func(term string) {
tcGet <- tcCallback{
Token: term,
Ch: callback,
}
}(st)
}
cts := getResults(callback, len(sts))
results := sortResults(cts)
return results
}
func QueryHandler(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
w.WriteHeader(http.StatusMethodNotAllowed)
w.Write([]byte('{"code": 405, "msg": "Method Not Allowed."}'))
return
}
decoder := json.NewDecoder(r.Body)
defer r.Body.Close()
var searchTerms []string
decoder.Decode(&searchTerms)
results := getSearchResults(searchTerms)
payload := result{
Count: len(results),
Data: results,
}
if serializedPayload, err := json.Marshal(payload); err == nil {
w.Header().Add("Content-Type", "application/json")
w.Write(serializedPayload)
} else {
common.Warn("Unable to serialize all docs: " + err.Error())
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte('{"code": 500, "msg": "Error occurred while trying to retrieve documents."}'))
}
}
为了测试图书管理员是否按预期工作,我们需要测试两件事:
- 检查
/api/index
是否接受索引项。 - 检查
/api/query
是否按预期顺序返回正确的结果。
我们可以使用单独的程序/脚本feeder.go
测试点 1,使用简单的 cURL 命令测试点 2。
下面是检查/api/index
是否接受索引项的feeder.go
脚本:
package main
import (
"bytes"
"encoding/json"
"io/ioutil"
"log"
"net/http"
)
type tPayload struct {
Token string 'json:"token"'
Title string 'json:"title"'
DocID string 'json:"doc_id"'
LIndex int 'json:"line_index"'
Index int 'json:"token_index"'
}
type msgS struct {
Code int 'json:"code"'
Msg string 'json:"msg"'
}
func main() {
// Searching for "apple" should return Book 1 at the top of search results.
// Searching for "cake" should return Book 3 at the top.
for bookX, terms := range map[string][]string{
"Book 1": []string{"apple", "apple", "cat", "zebra"},
"Book 2": []string{"banana", "cake", "zebra"},
"Book 3": []string{"apple", "cake", "cake", "whale"},
} {
for lin, term := range terms {
payload, _ := json.Marshal(tPayload{
Token: term,
Title: bookX + term,
DocID: bookX,
LIndex: lin,
})
resp, err := http.Post(
"http://localhost:9090/api/index",
"application/json",
bytes.NewBuffer(payload),
)
if err != nil {
panic(err)
}
body, _ := ioutil.ReadAll(resp.Body)
defer resp.Body.Close()
var msg msgS
json.Unmarshal(body, &msg)
log.Println(msg)
}
}
}
运行feeder.go
(图书管理员在其他窗口运行)的输出如下:
$ go run feeder.go
2018/01/04 12:53:31 {200 Tokens are being added to index.}
2018/01/04 12:53:31 {200 Tokens are being added to index.}
2018/01/04 12:53:31 {200 Tokens are being added to index.}
2018/01/04 12:53:31 {200 Tokens are being added to index.}
2018/01/04 12:53:31 {200 Tokens are being added to index.}
2018/01/04 12:53:31 {200 Tokens are being added to index.}
2018/01/04 12:53:31 {200 Tokens are being added to index.}
2018/01/04 12:53:31 {200 Tokens are being added to index.}
2018/01/04 12:53:31 {200 Tokens are being added to index.}
2018/01/04 12:53:31 {200 Tokens are being added to index.}
2018/01/04 12:53:31 {200 Tokens are being added to index.}
上述程序的图书管理员输出如下:
$ go run goophr/librarian/main.go
2018/01/04 12:53:25 INFO - Adding API handlers...
2018/01/04 12:53:25 INFO - Starting index...
2018/01/04 12:53:25 INFO - Starting Goophr Librarian server on port :9090...
2018/01/04 12:53:31 Token received api.tPayload{Token:"banana", Title:"Book 2banana", DocID:"Book 2", LIndex:0, Index:0}
2018/01/04 12:53:31 Token received api.tPayload{Token:"cake", Title:"Book 2cake", DocID:"Book 2", LIndex:1, Index:0}
2018/01/04 12:53:31 Token received api.tPayload{Token:"zebra", Title:"Book 2zebra", DocID:"Book 2", LIndex:2, Index:0}
2018/01/04 12:53:31 Token received api.tPayload{Token:"apple", Title:"Book 3apple", DocID:"Book 3", LIndex:0, Index:0}
2018/01/04 12:53:31 Token received api.tPayload{Token:"cake", Title:"Book 3cake", DocID:"Book 3", LIndex:1, Index:0}
2018/01/04 12:53:31 Token received api.tPayload{Token:"cake", Title:"Book 3cake", DocID:"Book 3", LIndex:2, Index:0}
2018/01/04 12:53:31 Token received api.tPayload{Token:"whale", Title:"Book 3whale", DocID:"Book 3", LIndex:3, Index:0}
2018/01/04 12:53:31 Token received api.tPayload{Token:"apple", Title:"Book 1apple", DocID:"Book 1", LIndex:0, Index:0}
2018/01/04 12:53:31 Token received api.tPayload{Token:"apple", Title:"Book 1apple", DocID:"Book 1", LIndex:1, Index:0}
2018/01/04 12:53:31 Token received api.tPayload{Token:"cat", Title:"Book 1cat", DocID:"Book 1", LIndex:2, Index:0}
2018/01/04 12:53:31 Token received api.tPayload{Token:"zebra", Title:"Book 1zebra", DocID:"Book 1", LIndex:3, Index:0}
为了测试/api/query
我们需要维护服务器的前一个状态,以进行有用的查询:
$ # Querying for "apple" $ curl -LX POST -d '["apple"]' localhost:9090/api/query | jq % Total % Received % Xferd Average Speed Time Time Time Current Dload Upload Total Spent Left Speed 100 202 100 193 100 9 193 9 0:00:01 --:--:-- 0:00:01 40400 { "count": 2, "data": [ { "doc_id": "Book 1", "doc_score": 2, "token_indices": [ { "Index": 0, "LIndex": 0 }, { "Index": 0, "LIndex": 1 } ] }, { "doc_id": "Book 3", "doc_score": 1, "token_indices": [ { "Index": 0, "LIndex": 0 } ] } ] } $ # Querying for "cake"
$ curl -LX POST -d '["cake"]' localhost:9090/api/query | jq % Total % Received % Xferd Average Speed Time Time Time Current Dload Upload Total Spent Left Speed 100 201 100 193 100 8 193 8 0:00:01 --:--:-- 0:00:01 33500 { "count": 2, "data": [ { "doc_id": "Book 3", "doc_score": 2, "token_indices": [ { "Index": 0, "LIndex": 1 }, { "Index": 0, "LIndex": 2 } ] }, { "doc_id": "Book 2", "doc_score": 1, "token_indices": [ { "Index": 0, "LIndex": 1 } ] } ] }
在本章中,我们了解了倒排索引,并为图书管理员实现了倒排索引,以便高效地存储和查找搜索词。我们还使用脚本、feeder.go
和 cURL 命令检查了实现。
在下一章第 8 章部署 Goophr中,我们将重写礼宾部的api.indexAdder
,以便礼宾部可以开始将要编制索引的代币发送给图书管理员。我们还将重新访问docker-compose.yaml
,以便能够运行完整的应用程序,并将其作为分布式系统使用/测试。