In [13]:
from datapipe.datatable import DataStore, DBConn
from datapipe.compute import Catalog, Pipeline, Table, run_pipeline
from datapipe.store.database import TableStoreDB
from datapipe.store.redis import RedisStore
from datapipe.core_steps import BatchTransform, UpdateExternalTable
from datapipe.types import data_to_index
from sqlalchemy.engine import create_engine
from sqlalchemy import inspect
from sqlalchemy import Column, String, JSON, Integer, Boolean
import pandas as pd
import redis

In [14]:
prim_key_a = ['a', 'b']
prim_key_b = ['e', 'f']

a = pd.DataFrame({'a': [1,2,3], 'b': [4,5,6], 'c': [7,8,9], 'd': ['a', 'b', 'c']})
b = pd.DataFrame({'e': [1,2,3], 'f': [4,5,6], 'g': [7,8,9], 'h': ['a', 'b', 'c']})

c = a[prim_key_a].itertuples(index=False, name=None)
d = b[prim_key_b].itertuples(index=False, name=None)

for keys, values in zip(c, d):
    print(keys, values)

(1, 4) (1, 4)
(2, 5) (2, 5)
(3, 6) (3, 6)


In [15]:
# entities:
# DBConn - database connection (conn url string or conn object itself. question - does dbconn only eat sqlalchemy?)
# DataStore - metatables data storage (entity where hashes and info about changes is stored)
# Table - data table (could be in db, or in file? question - are all sources of data described as Table?)
# Catalog - entity that describes DataTables
# Store - entity with methods for different kinds of storage.  could be TableStoreDB, TableDataSingleFileStore, folder etc...
# TableStoreDB - table data stored in database.
# Table schemas described in sqlaclchemy terms and classes

In [16]:
DBCONN = "postgresql://postgres:testpass@localhost:5432/test"
engine = create_engine(DBCONN)
# two separate connections needed for datastore and main tables
engine.execute('''
DROP SCHEMA public CASCADE;
CREATE SCHEMA public;''')
dbconn = DBConn(DBCONN)
meta_dbconn = DBConn(DBCONN)
ds = DataStore(meta_dbconn)
redis_conn_mid = redis.Redis(decode_responses=False)

Attempting to instrument while already instrumented
Attempting to instrument while already instrumented


# Start Pipe

# MID PIPE

In [17]:
INPUT_SCHEMA = [
    Column("user_id", String, primary_key=True),
    Column("guest_id", String, primary_key=True),
    Column("click_count", Integer)
]

INTERM_SCHEMA = [
    Column('user_id', String, primary_key=True),
    Column('click_count_doubled', Integer)
]

OUTPUT_SCHEMA = [
    Column("user_id", String, primary_key=True),
    Column("click_count_doubled", Integer),
    Column("click_count_doubled_squared", Integer)
]

In [18]:
catalog = Catalog({
    "pipeline_start": Table(store=TableStoreDB(dbconn, "test_input", INPUT_SCHEMA)),
    "pipeline_mid": Table(store=RedisStore(redis_conn_mid,'test_mid', INTERM_SCHEMA)),
    "pipeline_end": Table(store=TableStoreDB(dbconn, "test_output", OUTPUT_SCHEMA))
})

In [19]:
def double(df):
    df['click_count_doubled'] = df['click_count'] * 2
    df.drop(columns=['click_count', 'guest_id'], inplace=True)
    print(df)
    return df

def double_and_square(df):
    print(df)
    df['click_count_doubled_squared'] = df['click_count_doubled']**2
    return df

In [20]:
pipeline = Pipeline([
    UpdateExternalTable('pipeline_start'),
    BatchTransform(
        double,
        inputs=["pipeline_start"],
        outputs=["pipeline_mid"],
    ),
    BatchTransform(
        double_and_square,
        inputs=['pipeline_mid'],
        outputs=['pipeline_end']
    )
])

In [21]:
run_pipeline(ds, catalog, pipeline)
input_dt = catalog.get_datatable(ds, "pipeline_start")
output_dt = catalog.get_datatable(ds, "pipeline_end")
print(input_dt.get_data())
print(output_dt.get_data())

1it [00:00, 27.81it/s]

Empty DataFrame
Columns: [user_id, guest_id, click_count]
Index: []
Empty DataFrame
Columns: [user_id, click_count_doubled, click_count_doubled_squared]
Index: []





In [22]:
engine.execute('''
insert into test_input values ('abc', 'guest_id', 123)
''')
run_pipeline(ds, catalog, pipeline)

1it [00:00, 11.75it/s]
100%|██████████| 1/1 [00:00<00:00, 12.49it/s]


  user_id  click_count_doubled
0     abc                  246


100%|██████████| 1/1 [00:00<00:00, 10.13it/s]

  user_id  click_count_doubled
0     abc                  246





In [23]:
input_dt = catalog.get_datatable(ds, "pipeline_start")
mid_dt = catalog.get_datatable(ds, 'pipeline_mid')
output_dt = catalog.get_datatable(ds, "pipeline_end")
print(input_dt.get_data())
print(mid_dt.get_data())
print(output_dt.get_data())

  user_id  guest_id  click_count
0     abc  guest_id          123
  user_id  click_count_doubled
0     abc                  246
  user_id  click_count_doubled  click_count_doubled_squared
0     abc                  246                        60516


In [24]:
engine.execute('''
delete from test_input where user_id = 'abc'
''')
run_pipeline(ds, catalog, pipeline)
input_dt = catalog.get_datatable(ds, "pipeline_start")
mid_dt = catalog.get_datatable(ds, 'pipeline_mid')
output_dt = catalog.get_datatable(ds, "pipeline_end")
print(input_dt.get_data())
print(mid_dt.get_data())
print(output_dt.get_data())

1it [00:00, 18.33it/s]
100%|██████████| 1/1 [00:00<00:00,  8.07it/s]
100%|██████████| 1/1 [00:00<00:00,  1.02it/s]

Empty DataFrame
Columns: [user_id, guest_id, click_count]
Index: []
Empty DataFrame
Columns: [user_id, click_count_doubled]
Index: []
Empty DataFrame
Columns: [user_id, click_count_doubled, click_count_doubled_squared]
Index: []



