Skip to content

Commit

Permalink
Merge pull request #4 from dockhardman/feature_wiki_query_logic
Browse files Browse the repository at this point in the history
Feature wiki query logic
  • Loading branch information
dockhardman committed May 30, 2023
2 parents 0f4ef4c + ace4941 commit d78c7bb
Show file tree
Hide file tree
Showing 13 changed files with 561 additions and 214 deletions.
19 changes: 19 additions & 0 deletions app/app/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,25 @@ def __init__(self, **kwargs):
"LOG_SERVICE_FILENAME", "service.log"
)

# Service Config
self.max_top_k: int = 20

# Language Config
self.detect_languages = [
"ENGLISH",
"SPANISH",
"GERMAN",
"JAPANESE",
"FRENCH",
"RUSSIAN",
"ITALIAN",
"CHINESE",
"POLISH",
"DUTCH",
"PORTUGUESE",
"SWEDISH",
]

# OpenAI Config
self.OPENAI_API_KEY = environ.get("OPENAI_API_KEY")

Expand Down
3 changes: 2 additions & 1 deletion app/app/deps/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from .document_store import get_document_store
from .language import language_detector
from .timer import click_timer


__all__ = ["click_timer", "get_document_store"]
__all__ = ["click_timer", "get_document_store", "language_detector"]
7 changes: 7 additions & 0 deletions app/app/deps/language.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from sanic.request import Request

from lingua import LanguageDetector


def language_detector(request: Request) -> "LanguageDetector":
return request.app.ctx.language_detector
2 changes: 1 addition & 1 deletion app/app/document_store/qdrant.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ async def upsert(self, documents: List[DocumentWithEmbedding]) -> List[Text]:
vector=doc.embedding,
payload=asdict(doc),
)
_point.payload["created_at"] = created_at
_point.payload["metadata"]["created_at"] = created_at
points.append(_point)

self.client.upsert(
Expand Down
2 changes: 2 additions & 0 deletions app/app/exceptions/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
class NotFound(Exception):
pass
94 changes: 90 additions & 4 deletions app/app/main.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import asyncio
from dataclasses import asdict
from typing import List, Text
from typing import List, Optional, Text

import aiohttp
import openai
import sanic
from dacite import from_dict
from lingua import Language, LanguageDetector, LanguageDetectorBuilder
from openai.openai_object import OpenAIObject
from pyassorted.datetime import Timer
from sanic_ext import openapi
Expand All @@ -13,8 +15,9 @@
from sanic.response import text as PlainTextResponse, json as JsonResponse

from app.config import logger, settings
from app.deps import click_timer, get_document_store
from app.deps import click_timer, get_document_store, language_detector
from app.document_store import QdrantDocumentStore
from app.resource.wiki import WikiClient
from app.schema import api as api_model
from app.schema.openai import OpenaiEmbeddingResult

Expand All @@ -29,6 +32,7 @@ async def before_server_start(*_):
# Set OpenAI credential
openai.api_key = settings.OPENAI_API_KEY
logger.debug("Have set OpenAI credential.")

# Create document store client
doc_store = QdrantDocumentStore(collection_name=settings.QDRANT_COLLECTION)
app.ctx.document_store = doc_store
Expand All @@ -40,6 +44,26 @@ async def before_server_start(*_):
f'Failed to touch document store: "{doc_store.host}:{doc_store.port}"'
)

# Language detection
detect_languages: List["Language"] = []
for _lang in settings.detect_languages:
try:
detect_languages.append(Language[_lang.upper()])
except KeyError:
logger.warning(f"Language {_lang} not supported.")
detect_languages = detect_languages or [Language.ENGLISH]
app.ctx.language_detector = LanguageDetectorBuilder.from_languages(
*detect_languages
).build()
logger.debug(
"Have set language detector with languages: "
+ f"{', '.join([l.name for l in detect_languages])}."
)

# Wiki client
app.ctx.wiki_client = WikiClient()
logger.debug("Wiki client has been initialized.")

@app.signal("openai.embedding.text")
async def openai_embedding_text(texts: List[Text], **context) -> List[List[float]]:
texts = [texts] if isinstance(texts, Text) else texts
Expand All @@ -50,6 +74,38 @@ async def openai_embedding_text(texts: List[Text], **context) -> List[List[float
emb_res: OpenaiEmbeddingResult = emb_res_obj.to_dict_recursive()
return [emb["embedding"] for emb in emb_res["data"]]

@app.signal("wiki.documents.fetch_and_upsert")
async def wiki_documents_fetch_and_upsert(
query: Text, top_k: int, exclude_names: Optional[List[Text]] = None, **kwargs
) -> None:
wiki_client: "WikiClient" = app.ctx.wiki_client
lang_detector: "LanguageDetector" = app.ctx.language_detector

query = query.strip()
exclude_names = exclude_names or []
language = lang_detector.detect_language_of(query)

docs = await wiki_client.async_query(
query=query,
lang=language.iso_code_639_1.name.lower(),
top_k=top_k,
exclude_titles=exclude_names,
)
docs = [doc for doc in docs if doc.metadata["name"] not in exclude_names]
if not docs:
return

async with aiohttp.ClientSession() as session:
async with session.post(
"http://localhost/upsert",
json=asdict(api_model.UpsertCall(documents=docs)),
) as resp:
resp.raise_for_status()
logger.info(
f"Upserted {len(docs)} documents from Wiki: "
+ f"{', '.join([doc.metadata['name'] for doc in docs])}."
)

@app.get("/")
async def root(request: "Request"):
return PlainTextResponse("OK")
Expand Down Expand Up @@ -96,7 +152,10 @@ async def upsert(request: "Request", doc_store: "QdrantDocumentStore"):
body=api_model.QueryCall,
response=api_model.QueryResponse,
)
async def query(request: "Request", doc_store: "QdrantDocumentStore"):
async def query(
request: "Request",
doc_store: "QdrantDocumentStore",
):
try:
query_call = from_dict(data_class=api_model.QueryCall, data=request.json)
except Exception:
Expand All @@ -113,6 +172,32 @@ async def query(request: "Request", doc_store: "QdrantDocumentStore"):
]

query_results = await doc_store.query(queries=emb_queries)

for _query_call, query_result in zip(query_call.queries, query_results):
if any(map(lambda doc: doc.score >= 0.9, query_result.results)):
logger.debug(
"Skip wiki fetch. We have enough score with query "
+ f"'{query_result.query}'."
)
continue # Skip if we have enough score

fetch_and_upsert_wiki_docs_task = request.app.dispatch(
"wiki.documents.fetch_and_upsert",
context=dict(
query=query_result.query,
top_k=_query_call.top_k,
exclude_names=[
doc.metadata["name"]
for doc in query_result.results
if doc.metadata.get("name")
],
),
)
app.add_task(
fetch_and_upsert_wiki_docs_task,
name=f"Task-wiki.documents.fetch_and_upsert-({query_result.query},)",
)

return JsonResponse(asdict(api_model.QueryResponse(results=query_results)))

except Exception as e:
Expand Down Expand Up @@ -161,8 +246,9 @@ async def dispatch_embeddings(
return embeddings

# Dependencies injection
app.ext.add_dependency(Timer, click_timer)
app.ext.add_dependency(LanguageDetector, language_detector)
app.ext.add_dependency(QdrantDocumentStore, get_document_store)
app.ext.add_dependency(Timer, click_timer)

# Blueprint

Expand Down
Empty file added app/app/resource/__init__.py
Empty file.
159 changes: 159 additions & 0 deletions app/app/resource/wiki.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Dict, List, Optional, Set, Text, Union

from mediawiki import MediaWiki
from mediawiki.exceptions import PageError
from pyassorted.asyncio import run_func

from app.config import logger, settings
from app.exceptions import NotFound
from app.schema.models import Document


class WikiClient:
max_timeout: int = 120
default_timeout: int = 15
max_top_k: int = settings.max_top_k
max_sentences: int = 8
max_chars: int = 4000

def __init__(
self,
default_lang: Text = "en",
top_k: int = 5,
sentences: int = 4,
chars: int = 0,
timeout: float = default_timeout,
concurrent: int = 2,
):
self.default_lang = default_lang
self.default_client = MediaWiki(lang=self.default_lang, timeout=timeout)
self.lang_client_map: Dict[Text, MediaWiki] = {
self.default_lang: self.default_client
}
self.top_k = min(top_k, self.max_top_k)
self.sentences = min(sentences, self.max_sentences)
self.chars = min(chars, self.max_chars)
self.timeout = (
self.default_timeout if timeout < 0 else min(timeout, self.max_timeout)
)
self.concurrent = int(concurrent) or None

@property
def supported_languages(self) -> Set[Text]:
return set(self.default_client.supported_languages.keys())

def get_client(self, lang: Optional[Text] = None) -> "MediaWiki":
lang = lang.lower().strip() if lang else self.default_lang

if lang not in self.supported_languages:
raise ValueError(f"Language {lang} not supported.")

if lang not in self.lang_client_map:
self.lang_client_map[lang] = MediaWiki(lang=lang, timeout=self.timeout)

return self.lang_client_map[lang]

def query(
self,
query: Text,
lang: Optional[Text] = None,
top_k: Optional[int] = None,
sentences: Optional[int] = None,
chars: Optional[int] = None,
timeout: Optional[float] = None,
exclude_titles: Optional[List[Text]] = None,
) -> List[Document]:
query = query.strip()
lang = lang.lower().strip() if lang else self.default_lang
top_k = min(top_k, self.max_top_k) if top_k else self.top_k
sentences = min(sentences, self.max_sentences) if sentences else self.sentences
chars = min(chars, self.max_chars) if chars else self.chars
timeout = timeout or self.timeout

wiki_client = self.get_client(lang=lang)

(titles, suggestion) = wiki_client.search(
query=query, results=top_k, suggestion=True
)
if suggestion:
titles.append(suggestion)
logger.debug(f"Query '{query}' to wiki({lang}) returned titles: {titles}")

titles = [title for title in titles if title not in exclude_titles]

title_to_content: Dict[Text, Optional[Union[Text, Exception]]] = {
title: None for title in titles
}
with ThreadPoolExecutor(max_workers=self.concurrent) as executor:
future_to_title = {
executor.submit(
self._request_by_title,
wiki_client=wiki_client,
title=title,
sentences=sentences,
chars=chars,
): title
for title in titles
}
for future in as_completed(future_to_title):
title = future_to_title[future]
try:
page_content = future.result()
title_to_content[title] = page_content
except Exception as e:
title_to_content[title] = e

docs: List[Document] = []
for title, _content in title_to_content.items():
if isinstance(_content, Exception):
logger.exception(_content)
continue

if not _content:
logger.error(f"Wiki page {title} not found.")
continue

content = _content or ""
doc = Document(
text=content,
metadata=dict(name=title, title=title, source="wiki", lang=lang),
)
docs.append(doc)
return docs

async def async_query(
self,
query: Text,
lang: Optional[Text] = None,
top_k: Optional[int] = None,
sentences: Optional[int] = None,
chars: Optional[int] = None,
timeout: Optional[float] = None,
exclude_titles: Optional[List[Text]] = None,
) -> List[Document]:
docs = await run_func(
self.query,
query=query,
lang=lang,
top_k=top_k,
sentences=sentences,
chars=chars,
timeout=timeout,
exclude_titles=exclude_titles,
)
return docs

def _request_by_title(
self, wiki_client: "MediaWiki", title: Text, sentences: int, chars: int
) -> Text:
try:
wiki_page = wiki_client.page(title)
page_text = wiki_page.summarize(sentences=sentences, chars=chars)
return page_text

except PageError as e:
raise NotFound(f"Wiki page {title} not found: {e}")

except Exception as e:
raise e
5 changes: 4 additions & 1 deletion app/app/schema/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Text

from app.config import settings


@dataclass
class _WithScore:
Expand Down Expand Up @@ -51,11 +53,12 @@ class DocumentWithScore(DocumentWithEmbedding, _WithScore):
class Query:
query: Text
filter: Optional[Dict[Text, Any]] = None
top_k: Optional[int] = 3
top_k: Optional[int] = 5

def __post_init__(self):
self.query = self.query.strip()
self.filter = self.filter or {}
self.top_k = min(self.top_k, settings.max_top_k)

def with_embedding(self, embedding: List[float]) -> "QueryWithEmbedding":
return QueryWithEmbedding(
Expand Down
Loading

0 comments on commit d78c7bb

Please sign in to comment.