# Datapipe - зачем и как
Datapipe это инструмент для инкрементальной обработки данных. Интуитивно идея проста - при наличии некоторого количества табличных (или представимых в виде таблицы, например папка с фалйами) данных, мы хотим отслеживать и обрабатывать изменения в них по частям.  
Мы рассмотрим простой пример: пользователи сайта генерирую события, из которых нужно получать и хранить информацию о пользователях. 
Обычно задача решается записью событий в таблицу, после чего происходит парсинг всех данных по расписанию. С datapipe нам  
  
а) не нужно каждый раз обрабатывать весь массив данных и  
б) мы можем получать изменения во всех производных от основной таблицах в реальном времени. 

Пример будет очень простой: пользователь с \<user_id\> и \<ip\> сделал клик на сайте. В таблице events мы будем записывать каждое событие на сайте, агрегировать клики для каждого пользователя в таблице click_count, хранить и обновлять айпи адреса всех пользователей в таблице ip.
  
Сначала мы разберемся из чего состоит datapipe (он же труба), потом как туда попадают, и как редактируются данные. 
  
*
*Если вы не пользовались блокнотами - каждая ячейка выполняет блок кода внутри, переменные, полученные из разных ячеек, хранятся в общей памяти. Просто нажимайте shift + enter в каждой ячейке по порядку, и все будет работать.*

Каждый пайплайн в трубе состоит из трех главных сущностей:
- Catalog
- Datastore
- Pipeline  
  
Разберем по порядку:

# Catalog
В Сatalog описаны все таблицы, входящие в наш пайплайн. Каждую таблицу мы будем называть Store. Store бывают разные. Каждый умеет читать и записывать данные в свой тип хранилища. Оцените красоту идеи - в пайплайне могут одновременно находиться папки с файлами, таблицы в базе данных, файлы excel, и все это будет работать как одна система.  


In [1]:
# from IPython.display import clear_output
# ! pip install git+https://github.com/epoch8/datapipe.git
# clear_output()

Для каждой таблицы в каталоге нам нужно описать схему:

In [2]:
from sqlalchemy import Column, String, JSON, Integer
import json

In [3]:
events_schema = [
    Column("user_id", String(), primary_key=True),
    Column("event_id", String(), primary_key=True),
    Column("event", JSON())
]
click_count_schema = [
    Column('user_id', String(), primary_key=True),
    Column('click_count', Integer())
]
ip_schema = [
    Column('user_id', String(), primary_key=True),
    Column('ip', String())
]

Из соображений лаконичности все таблицы будут все лежать в одной БД, но как мы уже упоминали, это необязательно.

In [4]:
! rm store.sqlite

In [5]:
from datapipe.store.database import DBConn
dbconn = DBConn("sqlite:///store.sqlite")
from sqlalchemy.engine import create_engine
from sqlalchemy.orm import Session
engine = create_engine("sqlite:///store.sqlite")

Теперь можно обьявлять каталог

In [6]:
from datapipe.compute import Catalog, Table
from datapipe.store.database import TableStoreDB

In [7]:
events_store = TableStoreDB(
    name='events',                  # имя, с которым таблица будет храниться в БД
    dbconn=dbconn,                  # соединение с бд, в которой будет храниться таблица
    data_sql_schema=events_schema   # схема таблицы
)

click_store = TableStoreDB(
    name='click_count',
    dbconn=dbconn,
    data_sql_schema=click_count_schema
)

ip_store = TableStoreDB(
    name='ip',
    dbconn=dbconn,
    data_sql_schema=ip_schema
)

In [8]:
catalog = Catalog({
    'events': Table(store=events_store),  # по этому имени мы будем обращаться к таблице в каталоге
    'click_count': Table(store=click_store),
    'ip': Table(store=ip_store)
})

- Catalog    - готово
- Datastore
- Pipeline 

# Datastore  
Datastore нужен для хранения и отслеживания изменений в каталоге. Если в исходной таблице пайплайна поменяется какая-то строка, datastore будет знать, какая именно строка изменилась. Он позволяет не обрабатывать каждый раз весь массив данных во всех таблицах. 

In [9]:
from datapipe.compute import DataStore
import pandas as pd

In [10]:
ds = DataStore(dbconn)

- Catalog    - готово
- Datastore  - готово
- Pipeline 

# Pipeline
Мы подошли к главному - pipeline описывает связи между таблицами в каталоге.  
Переход от одной таблицы к другой, сопровождающийся каким-то преобразованием данных, мы называем Step.  
Таким образом пайплайн состоит из Шагов, у каждого из которых есть входящие и исходящие таблицы.  
Тут важно обратить внимание на то, что у каждого шага может быть больше одной входящей и исходящей таблицы. В datapipe можно строить сложные графы обработки данных.

In [11]:
def parse_clicks_ip_step(df: pd.DataFrame) -> pd.DataFrame: 
    res = []

    res_ips = []

    for user_id, grp in df.groupby("user_id"):
        events = grp['event'].apply(json.loads)
        click_count = sum([1 for x in events if x['event_type'] == 'click'])
        res.append({'user_id': user_id, 'click_count': click_count})
        res_ips.append({'user_id': user_id, 'ip': events.iloc[-1]['ip']})

    return (
        pd.DataFrame.from_records(res),
        pd.DataFrame.from_records(res_ips),
    )

In [12]:
from datapipe.compute import Pipeline, build_compute
from datapipe.core_steps import BatchTransform

In [13]:
pipeline = Pipeline(steps=[
    BatchTransform(                                   # Шаг между таблицами описывается сущностью BatchTransform
        parse_clicks_ip_step,
        inputs=["events"],
        outputs=["click_count", "ip"],
    )
])

In [14]:
# build_compute собирает граф вычислений для всех описанных нами выше сущностей.
steps = build_compute(ds, catalog, pipeline)

- Catalog    - готово
- Datastore  - готово
- Pipeline   - готово

# API и обновление данных

In [15]:
from fastapi import FastAPI
from fastapi.testclient import TestClient

In [16]:
app = FastAPI()

В нашем АПИ всего один метод. Он получает на вход событие вида:  
 
"user_id": ...,  
    "event_id": ...,  
    "event": {"event_type": "click", "ip": "..."}       
    
и при его получении запускает пересчет пайплайна.

In [17]:
from datapipe.types import ChangeList
from datapipe.compute import run_steps_changelist
import json
from pydantic import BaseModel

class UpdateDataRequest(BaseModel):
    user_id: str
    event_id: str
    event: dict


In [18]:
@app.post("/update-data")
def update_data(req: UpdateDataRequest):
    # Мы можем обращаться к таблицам в каталоге напрямую. 
    dt = catalog.get_datatable(ds, 'events')
    
    # в ChangeList мы будем хранить индексы строк,
    # измененных или добавленных в исходную таблицу пайплайна  
    cl = ChangeList()

    # с помощью метода .store_chunk мы запишем входящие данные
    # в исходную таблицу пайплайна. Если 
    chunk = pd.DataFrame({
        'user_id': req.user_id,
        'event_id': req.event_id,
        'event': json.dumps(req.event)
    }, index=[0])
    idx = dt.store_chunk(chunk)

    # Сделанные нами изменения запишем в ChangeList
    cl.append('events', idx)

    # Запустим пропагацию изменений по всем таблицам в пайплайне
    run_steps_changelist(ds, steps, cl)

    return {
        "result": "ok"
    }


Попробуем добавить события в пайплайн.

In [19]:
user_event_1 = {
            "user_id": 1,
            "event_id": 1,
            "event": {
                "event_type": "click",
                "ip": "111.111.111.111",
            }
        }


In [20]:
client = TestClient(app)
item = client.post("/update-data", json=user_event_1).json()
item

100%|██████████| 1/1 [00:00<00:00,  1.77it/s]


{'result': 'ok'}

Убедимся, что в таблицы добавились данные

In [21]:
events_dt = catalog.get_datatable(ds, 'events')
print(events_dt.get_data())
click_count_dt = catalog.get_datatable(ds, 'click_count')
print(click_count_dt.get_data())
ip_dt = catalog.get_datatable(ds, 'ip')
print(ip_dt.get_data())

  user_id event_id                                             event
0       1        1  {"event_type": "click", "ip": "111.111.111.111"}
  user_id  click_count
0       1            1
  user_id               ip
0       1  111.111.111.111


Теперь добавим еще одно событие для этого же пользователя

In [22]:
user_event_2 = {
            "user_id": 1,
            "event_id": 2,
            "event": {
                "event_type": "click",
                "ip": "222.222.222.222",
            }
        }
item = client.post("/update-data", json=user_event_2).json()

100%|██████████| 1/1 [00:00<00:00,  1.18it/s]


In [23]:
events_dt = catalog.get_datatable(ds, 'events')
print(events_dt.get_data())
click_count_dt = catalog.get_datatable(ds, 'click_count')
print(click_count_dt.get_data())
ip_dt = catalog.get_datatable(ds, 'ip')
print(ip_dt.get_data())

  user_id event_id                                             event
0       1        1  {"event_type": "click", "ip": "111.111.111.111"}
1       1        2  {"event_type": "click", "ip": "222.222.222.222"}
  user_id  click_count
0       1            2
  user_id               ip
0       1  222.222.222.222


Работает! В таблице events сохранились оба уникальных события, таблица click_count увеличила счетчик, в таблице ip обновился ip адрес пользователя, который изменился в последнем запросе. 

Давайте посмотрим, как пайплайн обработает событие, в котором оба ключа уже имеются в наших таблицах.  
Предположим, что мы нам не гарантированы уникальные события, например мы хотим переписать данные в имеющихся ключах. 

In [26]:
user_event_3 = {
            "user_id": 1,
            "event_id": 1,
            "event": {
                "event_type": "clack",
                "ip": "333.333.333.333",
            }
        }
item = client.post("/update-data", json=user_event_3).json()

100%|██████████| 1/1 [00:00<00:00,  1.29it/s]


In [27]:
events_dt = catalog.get_datatable(ds, 'events')
print(events_dt.get_data())
click_count_dt = catalog.get_datatable(ds, 'click_count')
print(click_count_dt.get_data())
ip_dt = catalog.get_datatable(ds, 'ip')
print(ip_dt.get_data())

  user_id event_id                                             event
0       1        2  {"event_type": "click", "ip": "222.222.222.222"}
1       1        1  {"event_type": "clack", "ip": "333.333.333.333"}
  user_id  click_count
0       1            1
  user_id               ip
0       1  333.333.333.333


Событие, которое хранилось в таблице по полученным ключам, обновилось вместе со всеми производными от него таблицами!
Важно заметить, что последнее записанное нами в пайплайн событие оказалось в конце таблицы.  
