In [1]:
import json
import logging
import math
import os
import re
import time
from datetime import datetime
from typing import List

import pandas as pd
import requests
from datapipe.compute import Catalog, Table, Pipeline, run_pipeline
from datapipe.datatable import DataStore
from datapipe.step.batch_transform import BatchTransform
from datapipe.store.database import DBConn, TableStoreDB
from goose3 import Goose
from langchain.embeddings import OpenAIEmbeddings
from sqlalchemy import Column, String, Text, DateTime

from pino_inferior.core import DATA_DIR
from pino_inferior.core import OPENAI_API_KEY, VECTOR_DB, VECTOR_DB_PARAMS, MEMORY_PARAMS
from pino_inferior.memory import Memory, SequentialSplitter, ParagraphSplitter, MarkdownHeaderTextSplitter
from pino_inferior.models import aengine

In [2]:
# Working directory of this experiment: the JSON link dumps are read from it
# and the SQLite database of the pipeline is created inside it.
EXPERIMENT_DATA_DIRECTORY = os.path.join(DATA_DIR, "05_dump_data")
EXPERIMENT_DATA_DIRECTORY

'F:\\Projects\\pino-inferior\\pino_inferior\\data\\05_dump_data'

In [3]:
def _get_links():
    link_files = [item for item in os.listdir(EXPERIMENT_DATA_DIRECTORY) if item.lower().endswith(".json")]
    links = []
    for item in link_files:
        with open(os.path.join(EXPERIMENT_DATA_DIRECTORY, item), 'r') as src:
            links += json.load(src)
    return pd.DataFrame({"link": links})


# Load all dumped links once; this frame is what gets written to the "links"
# table further down. Only the first rows are displayed.
df_links = _get_links()
df_links.head()


Unnamed: 0,link
0,https://zona.media/news/2023/10/20/msk-spb
1,https://zona.media/news/2023/10/20/prekratili-...
2,https://zona.media/news/2023/10/20/prigovor
3,https://zona.media/news/2023/10/20/eu
4,https://zona.media/news/2023/10/20/openart


In [4]:
# SQLite file, kept next to the input dumps, that backs the pipeline tables.
DB_FNAME = os.path.join(EXPERIMENT_DATA_DIRECTORY, 'experiment.sqlite3')

# Single connection object handed to every table store and to the DataStore.
dbconn = DBConn(f"sqlite:///{DB_FNAME}")
dbconn

<datapipe.store.database.DBConn at 0x1b80a3999d0>

## Pipeline data structure

In [5]:
def _sqlite_table(name, columns):
    """Wrap ``columns`` into a datapipe ``Table`` stored as ``name`` on ``dbconn``."""
    return Table(
        TableStoreDB(
            dbconn=dbconn,
            name=name,
            create_table=True,
            data_sql_schema=columns,
        )
    )


# One table per pipeline stage; every table has the article URL (`link`) in
# its primary key.
# NOTE(review): in the "html" table the `html` column is declared as part of
# the primary key as well, so the whole page body acts as a key column.
# Confirm this is intended before relying on `link` alone to identify a row.
catalog = Catalog({
    "links": _sqlite_table("links", [
        Column("link", String(2048), primary_key=True),
    ]),
    "html": _sqlite_table("html", [
        Column("link", String(2048), primary_key=True),
        Column("html", Text, primary_key=True),
    ]),
    "parsed": _sqlite_table("parsed", [
        Column("link", String(2048), primary_key=True),
        Column("title", String(2048)),
        Column("meta_description", String(2048)),
        Column("datetime_string", String(128)),
        Column("text", Text),
    ]),
    "parsed_datetime": _sqlite_table("parsed_datetime", [
        Column("link", String(2048), primary_key=True),
        Column("datetime", DateTime),
    ]),
    "saved_documents": _sqlite_table("saved_documents", [
        Column("link", String(2048), primary_key=True),
    ]),
})

In [6]:
# Datapipe data store on the same SQLite connection; it is asked to create
# its meta table (create_meta_table=True).
ds = DataStore(dbconn, create_meta_table=True)

In [7]:
# Initialise every table declared in the catalog against the data store.
catalog.init_all_tables(ds)

## Saving input data

In [8]:
# Write the collected links into the "links" table in consecutive slices of
# `input_batch_size` rows rather than in one big chunk.
input_batch_size = 1000
input_batch_count = int(math.ceil(df_links.shape[0] / input_batch_size))
batch_offsets = range(0, input_batch_count * input_batch_size, input_batch_size)
for offset in batch_offsets:
    links_chunk = df_links.iloc[offset : offset + input_batch_size]
    catalog.get_datatable(ds, "links").store_chunk(links_chunk)

## Conversions

### Reading publications

In [9]:
# Pause, in seconds, that read_links sleeps after every download attempt.
# Despite the name this is a delay between requests; it is not an HTTP timeout.
TIMEOUT = 1.0


def _read_link(url: str, timeout: float = 30.0) -> str:
    """Download a page and return its body decoded as UTF-8.

    Args:
        url: Address of the page to fetch.
        timeout: Seconds to wait for the server before giving up. Without it
            ``requests.get`` may block forever on an unresponsive host.

    Returns:
        The response body decoded as UTF-8 text.

    Raises:
        requests.RequestException: On connection problems or timeouts;
            ``requests.HTTPError`` specifically when the status is not 200.
        UnicodeDecodeError: If the body is not valid UTF-8.
    """
    response = requests.get(url, timeout=timeout)
    # An explicit check instead of `assert`: assertions are stripped when
    # Python runs with -O, which would let error pages through.
    if response.status_code != 200:
        raise requests.HTTPError(
            f"Unexpected status {response.status_code} for {url}",
            response=response,
        )
    return response.content.decode("utf-8")


def read_links(df: pd.DataFrame) -> pd.DataFrame:
    """Download the HTML of every link in ``df``.

    After each download attempt, successful or not, the function sleeps for
    ``TIMEOUT`` seconds. A failed download is not swallowed: the exception
    propagates to the caller once the pause is over.

    Args:
        df: Frame with a ``link`` column.

    Returns:
        A frame with ``link`` and ``html`` columns, one row per downloaded
        page. The two columns are present even when ``df`` has no rows.
    """
    records = []
    for link in df["link"]:
        try:
            records.append({"link": link, "html": _read_link(link)})
        finally:
            # Pause regardless of the outcome so failures do not speed up
            # the request rate.
            time.sleep(TIMEOUT)
    # Name the columns explicitly: an empty record list would otherwise give
    # a frame without any columns.
    return pd.DataFrame.from_records(records, columns=["link", "html"])

### Parsing publications

In [10]:
def parse_page(df: pd.DataFrame) -> pd.DataFrame:
    """Extract title, description, publish date and text from raw HTML.

    Pages are parsed with goose3. A page whose extraction raises is logged
    and skipped, so one broken document does not abort the batch.

    Args:
        df: Frame with ``link`` and ``html`` columns.

    Returns:
        A frame with ``link``, ``title``, ``meta_description``,
        ``datetime_string`` and ``text`` columns, one row per page that was
        parsed. The columns are present even when no page could be parsed.
        A missing publish date is replaced by the current local time
        rendered as a string.
    """
    columns = ["link", "title", "meta_description", "datetime_string", "text"]
    if df.empty:
        return pd.DataFrame(columns=columns)

    records = []
    # The context manager closes the extractor when the batch is done.
    with Goose() as goose:
        for link, html in zip(df["link"], df["html"]):
            try:
                article = goose.extract(raw_html=html)
                records.append({
                    "link": link,
                    "title": article.title,
                    "meta_description": article.meta_description,
                    "datetime_string": article.publish_date,
                    "text": article.cleaned_text,
                })
            except Exception:
                # Best effort: keep going, but leave a trace of what was
                # dropped (and let KeyboardInterrupt/SystemExit through).
                logging.getLogger(__name__).warning(
                    "Skipping %s: the page could not be parsed", link, exc_info=True
                )

    # Explicit columns keep the `datetime_string` lookup below valid when
    # every page of the batch failed to parse.
    parsed = pd.DataFrame.from_records(records, columns=columns)
    parsed["datetime_string"] = parsed["datetime_string"].fillna(
        str(datetime.now()),
    )
    return parsed
            

### Parsing date / time

In [11]:
# Regex -> English month name. Each pattern matches the stem of a Russian
# month name, any word characters that follow it (case endings) and one
# trailing whitespace character. `\w*` is used instead of `.*` so the match
# stops at the end of the month word and does not swallow the year or time.
DATETIME_MONTHS_CONVERSION = {
    r"январ\w*\s": "january ",
    r"феврал\w*\s": "february ",
    r"март\w*\s": "march ",
    r"апрел\w*\s": "april ",
    r"ма[йя]\s": "may ",  # nominative "май" and genitive "мая" ("5 мая 2023")
    r"июн\w*\s": "june ",
    r"июл\w*\s": "july ",
    r"август\w*\s": "august ",
    r"сентябр\w*\s": "september ",
    r"октябр\w*\s": "october ",
    r"ноябр\w*\s": "november ",
    r"декабр\w*\s": "december ",
}


def convert_datetime(df):
    """Turn the textual publish date of every row into a datetime.

    Russian month names are first replaced by their English counterparts
    (case-insensitively), then each value is parsed with ``pd.to_datetime``.
    A value that cannot be parsed, or that parses to a missing value, falls
    back to the current local time.

    Args:
        df: Frame with ``link`` and ``datetime_string`` columns. It is not
            modified; the function works on a copy.

    Returns:
        A frame with ``link`` and ``datetime`` columns, one row per input row.
    """
    df = df.copy()
    for pattern, month in DATETIME_MONTHS_CONVERSION.items():
        df["datetime_string"] = df["datetime_string"].str.replace(
            pattern, month, regex=True, flags=re.IGNORECASE
        )
    datetimes = []
    for raw_value in df["datetime_string"]:
        try:
            parsed = pd.to_datetime(raw_value)
        except (ValueError, TypeError, OverflowError):
            parsed = None
        # None/NaN inputs do not raise but yield a missing value; treat them
        # like any other unparseable date.
        if pd.isna(parsed):
            parsed = datetime.now()
        datetimes.append(parsed)
    df["datetime"] = datetimes
    return df[["link", "datetime"]]

### Saving document to retriever database

In [12]:
# One embeddings client for the whole notebook. It is handed to the vector
# store below, so the embedding model is configured in a single place instead
# of building a second, identical client.
sentence_encoder = OpenAIEmbeddings(
    openai_api_key=OPENAI_API_KEY,
    model="text-embedding-ada-002",
)
# Retriever memory the parsed documents are stored into.
memory = Memory(
    engine=aengine,
    vector_db=VECTOR_DB(
        embedding_function=sentence_encoder,
        **VECTOR_DB_PARAMS
    ),
    **MEMORY_PARAMS
)

In [13]:
def _add_documents(titles: List[str],
                   links: List[str],
                   publication_datetimes: List[datetime],
                   meta_descriptions: List[str],
                   texts: List[str],
                   memory: Memory):
    """Split articles into paragraphs and store them in ``memory``.

    Each article is rendered as a small markdown document (title as a level-1
    header, then the meta description, then the body), split first on
    markdown headers and then into paragraphs. The paragraphs of all articles
    are passed to ``memory.store`` in one call.

    The argument lists are walked in parallel and iteration stops at the
    shortest one.

    NOTE(review): ``links`` and ``publication_datetimes`` are accepted but
    never reach ``memory``; only title, description and text are stored.
    """
    splitter = SequentialSplitter(
        [
            MarkdownHeaderTextSplitter(
                headers_to_split_on=[
                    ("#", "Header1"),
                    ("##", "Header2"),
                    ("###", "Header3"),
                ],
            ),
            ParagraphSplitter(),
        ]
    )
    articles = zip(titles, links, publication_datetimes, meta_descriptions, texts)
    chunks = []
    for title, _link, _published_at, description, body in articles:
        chunks.extend(splitter.split_text(f"# {title}\n\n{description}\n\n{body}"))
    memory.store(chunks)


def build_add_document_conversion(memory: Memory):
    """Create the pipeline step that stores parsed articles in ``memory``.

    The returned function joins the parsed articles with their parsed
    datetimes on ``link``, forwards the joined columns to ``_add_documents``
    and returns the ``link`` column of the joined rows.
    """
    def _func(df_source: pd.DataFrame, df_parsed_datetime: pd.DataFrame) -> pd.DataFrame:
        joined = pd.merge(
            df_source,
            df_parsed_datetime,
            left_on=["link"],
            right_on=["link"],
        )
        # Keyword name expected by _add_documents -> column of the joined frame.
        argument_columns = {
            "titles": "title",
            "links": "link",
            "publication_datetimes": "datetime",
            "meta_descriptions": "meta_description",
            "texts": "text",
        }
        document_fields = {
            argument: joined[column].tolist()
            for argument, column in argument_columns.items()
        }
        _add_documents(memory=memory, **document_fields)
        return joined[["link"]]

    return _func

In [14]:
# The four stages of the pipeline, in execution order. Each stage reads the
# table(s) listed in `inputs` and writes the table listed in `outputs`.
pipeline = Pipeline([
    # 1. Download the HTML of every link (chunks of 1).
    BatchTransform(
        read_links,
        inputs=["links"],
        outputs=["html"],
        chunk_size=1,
    ),
    # 2. Extract title / description / publish date / text from the HTML.
    BatchTransform(
        parse_page,
        inputs=["html"],
        outputs=["parsed"],
        chunk_size=1,
    ),
    # 3. Convert the textual publish date into a datetime.
    BatchTransform(
        convert_datetime,
        inputs=["parsed"],
        outputs=["parsed_datetime"],
        chunk_size=1,
    ),
    # 4. Split the articles into paragraphs and store them in the retriever
    #    memory (chunks of 10).
    BatchTransform(
        build_add_document_conversion(memory),
        inputs=["parsed", "parsed_datetime"],
        outputs=["saved_documents"],
        chunk_size=10,
    )
])

In [15]:
# Execute all pipeline stages against the data store and the catalog.
run_pipeline(ds, catalog, pipeline)