In [2]:
import pyarrow.parquet as pq

In [3]:
import numpy as np
import pandas as pd
import pyarrow as pa

In [7]:
# Small mixed-dtype sample frame: floats with a NaN, strings, and booleans.
df = pd.DataFrame(dict(
    one=[-1, np.nan, 2.5],
    two=['foo', 'bar', 'baz'],
    three=[True, False, True],
))
df

Unnamed: 0,one,two,three
0,-1.0,foo,True
1,,bar,False
2,2.5,baz,True


In [4]:
# Convert the pandas frame to an Arrow table (the column types are shown below).
table = pa.Table.from_pandas(df)
table

pyarrow.Table
one: double
two: string
three: bool
----
one: [[-1,null,2.5]]
two: [["foo","bar","baz"]]
three: [[true,false,true]]

In [5]:
# Write the Arrow table out as a single Parquet file.
pq.write_table(table, 'example.parquet')

In [6]:
# Read the whole file back as an Arrow table.
table2 = pq.read_table('example.parquet')

In [7]:
# Back to pandas: values (including the NaN) match the original frame.
table2.to_pandas()

Unnamed: 0,one,two,three
0,-1.0,foo,True
1,,bar,False
2,2.5,baz,True


In [8]:
# Column projection: read only the 'one' and 'three' columns.
pq.read_table('example.parquet', columns=['one', 'three']).to_pandas()

Unnamed: 0,one,three
0,-1.0,True
1,,False
2,2.5,True


In [9]:
# Column projection plus a row filter: only rows where one == 2.5 come back.
pq.read_table('example.parquet', columns=['one', 'three'], filters=[('one', '==', 2.5)]).to_pandas()

Unnamed: 0,one,three
0,2.5,True


In [65]:
# The same sample data, nested under a two-level column header: ('x', <field>).
df3 = pd.DataFrame({('x', field): values for field, values in {
    'one': [-1, np.nan, 2.5],
    'two': ['foo', 'bar', 'baz'],
    'three': [True, False, True],
}.items()})
df3

Unnamed: 0_level_0,x,x,x
Unnamed: 0_level_1,one,two,three
0,-1.0,foo,True
1,,bar,False
2,2.5,baz,True


In [66]:
# Arrow table built from the MultiIndex-column frame, with its schema metadata removed.
# NOTE(review): the read further down returns the column tuples as plain string names —
# presumably because this metadata is gone; confirm.
table3 = pa.Table.from_pandas(df3).replace_schema_metadata(None)

In [67]:
# Write into a dataset directory. Each call adds another file, so re-running this cell
# duplicates the rows seen on read.
pq.write_to_dataset(table3, root_path='multiColumnExample.parquet')

In [70]:
# Selecting by tuple column names (columns=[('x', 'one'), ('x', 'three')]) was tried
# and left disabled, so this reads every column. The header tuples come back as
# flat string column names.
pq.read_table('multiColumnExample.parquet').to_pandas()

Unnamed: 0,"('x', 'one')","('x', 'two')","('x', 'three')"
0,-1.0,foo,True
1,,bar,False
2,2.5,baz,True


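So the MultiIndex column header doesn't survive the round trip: the column tuples come back as plain string names. That's OK, because we can flatten the columns before writing and rebuild the MultiIndex after reading.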

In [78]:
# Parquet can't round-trip the MultiIndex column header, so write flat field names
# and rebuild the ('x', field) header after reading.
# Flatten into a new frame instead of overwriting df3.columns in place, so df3 keeps
# the header an earlier cell displayed.
df3_flat = df3.droplevel(0, axis=1) if isinstance(df3.columns, pd.MultiIndex) else df3
table3 = pa.Table.from_pandas(df3_flat).replace_schema_metadata(None)
# NOTE(review): write_to_dataset adds a new file on every call, so each re-run of this
# cell adds another copy of the rows. That is likely why the output below has 6 rows, not 3.
pq.write_to_dataset(table3, root_path='multiColumnExample2.parquet')
df4 = pq.read_table('multiColumnExample2.parquet', columns=['one', 'three']).to_pandas()
df4.columns = pd.MultiIndex.from_product([['x'], df4.columns])
df4

Unnamed: 0_level_0,x,x
Unnamed: 0_level_1,one,three
0,-1.0,True
1,,False
2,2.5,True
3,-1.0,True
4,,False
5,2.5,True


In [86]:
# The sample data again, this time under a 'y' group: ('y', <field>).
df5 = pd.DataFrame({('y', field): values for field, values in {
    'one': [-1, np.nan, 2.5],
    'two': ['foo', 'bar', 'baz'],
    'three': [True, False, True],
}.items()})
df5

Unnamed: 0_level_0,y,y,y
Unnamed: 0_level_1,one,two,three
0,-1.0,foo,True
1,,bar,False
2,2.5,baz,True


In [88]:
# A single new 'y' row at index label 3.
df6 = pd.DataFrame(
    {('y', field): [value] for field, value in (('one', 100), ('two', '111'), ('three', True))},
    index=[3],
)
df6

Unnamed: 0_level_0,y,y,y
Unnamed: 0_level_1,one,two,three
3,100,111,True


In [91]:
# Outer-join the 'x' data read back from disk with the 'y' data on the row index.
df7 = df4.merge(df5, how='outer', left_index=True, right_index=True)
df7

Unnamed: 0_level_0,x,x,y,y,y
Unnamed: 0_level_1,one,three,one,two,three
0,-1.0,True,-1.0,foo,True
1,,False,,bar,False
2,2.5,True,2.5,baz,True
3,-1.0,True,,,
4,,False,,,
5,2.5,True,,,


In [95]:
# The MultiIndex columns as a plain list of (group, field) tuples.
list(df6.columns)

[('y', 'one'), ('y', 'two'), ('y', 'three')]

In [97]:
# "Append insert": overwrite the cells of df7 covered by df6's rows and columns with
# df6's values (pandas aligns the assigned frame on index and columns).
df7.loc[df6.index, list(df6.columns)] = df6
df7

Unnamed: 0_level_0,x,x,y,y,y
Unnamed: 0_level_1,one,three,one,two,three
0,-1.0,True,-1.0,foo,True
1,,False,,bar,False
2,2.5,True,2.5,baz,True
3,-1.0,True,100.0,111,True
4,,False,,,
5,2.5,True,,,


In [110]:
# A single new 'y' row at a fractional index label (2.5), meant to land between rows 2 and 3.
df6 = pd.DataFrame(
    {('y', field): [value] for field, value in (('one', 99), ('two', '99'), ('three', False))},
    index=[2.5],
)
df6

Unnamed: 0_level_0,y,y,y
Unnamed: 0_level_1,one,two,three
2.5,99,99,False


In [114]:
# Inspect df7 again: df6 has been redefined (index 2.5) but not inserted yet.
df7

Unnamed: 0_level_0,x,x,y,y,y
Unnamed: 0_level_1,one,three,one,two,three
0,-1.0,True,-1.0,foo,True
1,,False,,bar,False
2,2.5,True,2.5,baz,True
3,-1.0,True,100.0,111,True
4,,False,,,
5,2.5,True,,,


In [118]:
# Is df5's first row label already in df7's index? (Element-wise comparison, which is
# what `in` does on a NumPy array.)
bool((df7.index.values == df5.index.values[0]).any())

True

In [119]:
# Insert df6's row (index label 2.5) among df7's rows, sort by index, then forward-fill
# the gaps. DataFrame.append was removed in pandas 2.0 and fillna(method=...) is
# deprecated, so use pd.concat and .ffill() instead.
pd.concat([df7, df6]).sort_index().ffill()


Unnamed: 0_level_0,x,x,y,y,y
Unnamed: 0_level_1,one,three,one,three,two
0.0,-1.0,True,-1.0,True,foo
1.0,-1.0,False,-1.0,False,bar
2.0,2.5,True,2.5,True,baz
2.5,2.5,True,99.0,False,99
3.0,-1.0,True,100.0,True,111
4.0,-1.0,False,100.0,True,111
5.0,2.5,True,100.0,True,111


In [8]:
# A single row under a three-level column header: ('z', 'y', <field>), index label 3.
df8 = pd.DataFrame(
    {('z', 'y', field): [value] for field, value in (('one', 100), ('two', '111'), ('three', True))},
    index=[3],
)
df8

Unnamed: 0_level_0,z,z,z
Unnamed: 0_level_1,y,y,y
Unnamed: 0_level_2,one,two,three
3,100,111,True


In [9]:
# Peel the header one level at a time: first the source level, then the stream level.
# Each step is a no-op once the columns are no longer a MultiIndex.
df8.columns = df8.columns.droplevel() if isinstance(df8.columns, pd.MultiIndex) else df8.columns  # source
df8.columns = df8.columns.droplevel() if isinstance(df8.columns, pd.MultiIndex) else df8.columns  # stream
df8

Unnamed: 0,one,two,three
3,100,111,True


In [16]:
# Rebuild a three-level header, putting every field under ('source', 'stream').
df8.columns = pd.MultiIndex.from_product([['source'],['stream'], df8.columns])
df8

Unnamed: 0_level_0,source,source,source
Unnamed: 0_level_1,stream,stream,stream
Unnamed: 0_level_2,one,two,three
3,100,111,True


In [59]:
# Select two of the fields by their full ('source', 'stream', field) column keys.
df8.loc[:, [('source', 'stream', field) for field in ('one', 'two')]]

Unnamed: 0_level_0,source,source
Unnamed: 0_level_1,stream,stream
Unnamed: 0_level_2,one,two
3,100,111


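#### Appending with `write_to_dataset`
If we use `write_to_dataset`, appending happens automatically: each call adds another file to the dataset directory, and `read_table` on that directory reads all of them. Appending seemed difficult with `write_table` and `pq.ParquetWriter`.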

In [10]:
# One new record with the same fields as `df`, to be appended.
dfNew = pd.DataFrame([{'one': 3.0, 'two': 'B', 'three': True}])
dfNew

Unnamed: 0,one,two,three
0,3.0,B,True


In [11]:
# Stack the new row under df. This returns a new frame; df itself is unchanged.
# DataFrame.append was removed in pandas 2.0; pd.concat is its replacement.
pd.concat([df, dfNew])

Unnamed: 0,one,two,three
0,-1.0,foo,True
1,,bar,False
2,2.5,baz,True
0,3.0,B,True


In [12]:
# df is unchanged: the previous cell returned a new frame rather than modifying df.
df

Unnamed: 0,one,two,three
0,-1.0,foo,True
1,,bar,False
2,2.5,baz,True


In [146]:
# Preview the Arrow schema without its metadata; the result is only displayed, not stored.
table.schema.remove_metadata()

one: double
two: string
three: bool

In [13]:
# Arrow table for the new row, with its schema metadata stripped.
newTable = pa.Table.from_pandas(dfNew).replace_schema_metadata(None)

In [14]:
# The stripped schema: only the three column types remain.
newTable.schema

one: double
two: string
three: bool

In [15]:
# Write the new row into the 'sample.parquet' dataset directory. Each call adds a file,
# so re-running this cell duplicates the row on read.
pq.write_to_dataset(newTable, root_path='sample.parquet')

In [16]:
# Add df's rows to the same dataset. Strip the schema metadata here too, as was done
# for newTable, so every file in the dataset has the same plain schema.
pq.write_to_dataset(pa.Table.from_pandas(df).replace_schema_metadata(None), root_path='sample.parquet')

In [17]:
# Reading the dataset directory returns the rows from every file written into it.
pq.read_table('sample.parquet').to_pandas()

Unnamed: 0,one,two,three
0,-1.0,foo,True
1,,bar,False
2,2.5,baz,True
3,3.0,B,True


In [156]:
# Same projection and filter as on example.parquet, now against the multi-file dataset.
pq.read_table('sample.parquet', columns=['one', 'three'], filters=[('one', '==', 2.5)]).to_pandas()

Unnamed: 0,one,three
0,2.5,True


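#### A bad way to append (kept for reference, not run):
A `ParquetWriter` creates a fresh file at the given path, so it can't add rows to an existing `example.parquet`. Every table has to go through one open writer with a matching schema. Note also that `ParquetWriter.write_table` takes only the table; its second positional argument is the row-group size, not a file path.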

```
pqwriter = pq.ParquetWriter(
    'example.parquet', 
    table.schema.remove_metadata(),
    #table.replace_schema_metadata(None),
)
#pqwriter.write_table(df)
```

```
pqwriter.write_table(
    pa.Table.from_pandas(dfNew).replace_schema_metadata(None), 
    'example.parquet',

    #schema=table.schema.remove_metadata()
)
```

```
# close the parquet writer
if pqwriter:
    pqwriter.close()
```

---

### Merge the data into one DataFrame
Once we pull the various datasets from disk, we need to merge them into one DataFrame, filling in values where the timestamps don't line up.

In [18]:
import datetime as dt

In [19]:
# Local wall-clock time (not UTC), shown in ctime() format.
x = dt.datetime.now()
x.ctime()

'Sun Apr  3 10:09:51 2022'

In [20]:
# One hour before "now" in UTC, as a naive datetime (the same form utcnow() returned).
# datetime.utcnow() is deprecated since Python 3.12; use now(timezone.utc) and drop the tzinfo.
one_hour_ago = dt.datetime.now(dt.timezone.utc).replace(tzinfo=None) - dt.timedelta(hours=1)
str(one_hour_ago)

'2022-04-03 15:09:52.231143'

In [36]:
# Three hourly observations ending "now" (UTC), indexed by timestamp strings, under a
# two-level ('streamId', 'key') column header.
# Capture "now" once so the timestamps are exactly one hour apart; calling utcnow()
# per row let them drift. datetime.utcnow() is also deprecated (Python 3.12+), so use
# now(timezone.utc) and drop the tzinfo to keep the same naive-UTC strings.
now = dt.datetime.now(dt.timezone.utc).replace(tzinfo=None)
df1 = pd.DataFrame(
    [1, 2, 3],
    columns=pd.MultiIndex.from_product([['streamId'], ['key']]),
    index=[str(now - dt.timedelta(hours=h)) for h in (2, 1, 0)],
)
df1

Unnamed: 0_level_0,streamId
Unnamed: 0_level_1,key
2022-04-03 18:14:08.092166,1
2022-04-03 19:14:08.092166,2
2022-04-03 20:14:08.092166,3


In [62]:
# Two 'a' fields indexed by timestamp strings relative to one captured "now" (UTC).
# datetime.utcnow() is deprecated (Python 3.12+); now(timezone.utc) without tzinfo is equivalent.
now = dt.datetime.now(dt.timezone.utc).replace(tzinfo=None)
df2 = pd.DataFrame(
    {('a', 'b'): [1, 2, 3], ('a', 'c'): [5, 6, 7]},
    # NOTE(review): the first two rows both sit 2 hours back, so the index has a
    # duplicate timestamp. Confirm this is intentional (hours=1 may have been meant).
    index=[str(now - dt.timedelta(hours=h)) for h in (2, 2, 0)],
)
df2.loc[:, [('a', 'b')]]

Unnamed: 0_level_0,a
Unnamed: 0_level_1,b
2022-04-03 18:33:26.333997,1
2022-04-03 18:33:26.333997,2
2022-04-03 20:33:26.333997,3


In [40]:
from collections.abc import Iterable
from functools import reduce


def merge(dfs: Iterable[pd.DataFrame]) -> pd.DataFrame:
    '''Combine multiple datasets (e.g. one per stream) into one timestamp-indexed frame.

    Each frame's index is parsed with ``pd.to_datetime`` (timestamp strings are fine).
    The frames are then outer-joined on that index one after another, and each join
    is forward-filled so rows whose timestamps don't line up carry the latest known
    value of every column.

    The caller's frames are not modified: the parsed index is applied to new frames.

    Parameters
    ----------
    dfs : iterable of DataFrame
        Frames to combine; any iterable (list, tuple, generator) is accepted.

    Returns
    -------
    DataFrame
        The combined frame, or an empty DataFrame when ``dfs`` is empty.
    '''
    # Materialize once: the original looped over `dfs` and then reduced over it again,
    # which silently yields nothing for a generator. set_axis returns a new frame
    # instead of overwriting the caller's index.
    indexed = [df.set_axis(pd.to_datetime(df.index), axis=0) for df in dfs]
    if not indexed:
        return pd.DataFrame()
    return reduce(
        lambda left, right: pd.merge(
            left,
            right,
            how='outer',
            left_index=True,
            right_index=True,
        ).ffill(),  # fillna(method='ffill') is deprecated in pandas
        indexed,
    )

In [41]:
# Combine the two timestamp-indexed frames defined above into one.
# NOTE(review): the output below shows a ('streamId2', 'key') column and letter values
# that neither df1 nor df2 contains. It looks stale from an earlier run; re-run to refresh.
merge([df1, df2])

Unnamed: 0_level_0,streamId,streamId2
Unnamed: 0_level_1,key,key
2022-04-03 18:14:08.092166,1.0,
2022-04-03 18:14:58.380649,1.0,a
2022-04-03 18:14:58.380649,1.0,b
2022-04-03 19:14:08.092166,2.0,b
2022-04-03 20:14:08.092166,3.0,b
2022-04-03 20:14:58.380649,3.0,c


In [42]:
# NOTE(review): index1 is not used by any visible cell below.
index1 = pd.MultiIndex.from_product([["variables"], ["number", "fruit"]])
# Three hourly observations ending "now" (UTC), with the timestamp kept as a 'dt' column.
# Capture "now" once so the rows are exactly one hour apart. datetime.utcnow() is
# deprecated (Python 3.12+); now(timezone.utc) without tzinfo gives the same strings.
now = dt.datetime.now(dt.timezone.utc).replace(tzinfo=None)
df1 = pd.DataFrame({
    'dt': [str(now - dt.timedelta(hours=h)) for h in (2, 1, 0)],
    'value': [1, 2, 3],
})
df1

Unnamed: 0,dt,value
0,2022-04-03 18:18:30.334950,1
1,2022-04-03 19:18:30.334950,2
2,2022-04-03 20:18:30.334950,3


In [22]:
# Also use the timestamp strings as the row index (named 'dt'); the 'dt' column is kept.
df1 = df1.set_index('dt', drop=False)
df1

Unnamed: 0_level_0,dt,value
dt,Unnamed: 1_level_1,Unnamed: 2_level_1
2022-04-03 14:10:00.419600,2022-04-03 14:10:00.419600,1
2022-04-03 15:10:00.419600,2022-04-03 15:10:00.419600,2
2022-04-03 16:10:00.419600,2022-04-03 16:10:00.419600,3


In [24]:
# Parse the index into datetimes; the 'dt' column is left as it is.
df1 = df1.set_axis(pd.to_datetime(df1.index), axis=0)
df1

Unnamed: 0_level_0,dt,value
dt,Unnamed: 1_level_1,Unnamed: 2_level_1
2022-04-03 14:10:00.419600,2022-04-03 14:10:00.419600,1
2022-04-03 15:10:00.419600,2022-04-03 15:10:00.419600,2
2022-04-03 16:10:00.419600,2022-04-03 16:10:00.419600,3


In [26]:
# Fourteen observations 10 minutes apart, from 130 minutes ago up to "now" (UTC).
# Each value equals its offset in minutes.
# Capture "now" once (the original called utcnow() for every row, so the spacing
# drifted), and use now(timezone.utc) because datetime.utcnow() is deprecated.
now = dt.datetime.now(dt.timezone.utc).replace(tzinfo=None)
offsets = list(range(130, -1, -10))  # 130, 120, ..., 10, 0
df2 = pd.DataFrame({
    'dt': [str(now - dt.timedelta(minutes=m)) for m in offsets],
    'value': offsets,
})
df2

Unnamed: 0,dt,value
0,2022-04-03 14:03:31.414300,130
1,2022-04-03 14:13:31.414300,120
2,2022-04-03 14:23:31.414300,110
3,2022-04-03 14:33:31.414300,100
4,2022-04-03 14:43:31.414300,90
5,2022-04-03 14:53:31.414300,80
6,2022-04-03 15:03:31.414300,70
7,2022-04-03 15:13:31.414300,60
8,2022-04-03 15:23:31.414300,50
9,2022-04-03 15:33:31.414300,40


In [27]:
# Index df2 by its parsed timestamps (index named 'dt'); the 'dt' column itself is kept.
df2 = df2.set_index(pd.to_datetime(df2['dt']))

In [30]:
# Outer-join on the datetime index, then forward-fill so every timestamp carries the
# latest known value from each frame. fillna(method='ffill') is deprecated; use .ffill().
pd.merge(df2, df1, how='outer', left_index=True, right_index=True).ffill()

Unnamed: 0_level_0,dt_x,value_x,dt_y,value_y
dt,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
2022-04-03 14:03:31.414300,2022-04-03 14:03:31.414300,130.0,,
2022-04-03 14:10:00.419600,2022-04-03 14:03:31.414300,130.0,2022-04-03 14:10:00.419600,1.0
2022-04-03 14:13:31.414300,2022-04-03 14:13:31.414300,120.0,2022-04-03 14:10:00.419600,1.0
2022-04-03 14:23:31.414300,2022-04-03 14:23:31.414300,110.0,2022-04-03 14:10:00.419600,1.0
2022-04-03 14:33:31.414300,2022-04-03 14:33:31.414300,100.0,2022-04-03 14:10:00.419600,1.0
2022-04-03 14:43:31.414300,2022-04-03 14:43:31.414300,90.0,2022-04-03 14:10:00.419600,1.0
2022-04-03 14:53:31.414300,2022-04-03 14:53:31.414300,80.0,2022-04-03 14:10:00.419600,1.0
2022-04-03 15:03:31.414300,2022-04-03 15:03:31.414300,70.0,2022-04-03 14:10:00.419600,1.0
2022-04-03 15:10:00.419600,2022-04-03 15:03:31.414300,70.0,2022-04-03 15:10:00.419600,2.0
2022-04-03 15:13:31.414300,2022-04-03 15:13:31.414300,60.0,2022-04-03 15:10:00.419600,2.0


In [55]:
df1['dt'] = pd.to_datetime(df1['dt'])
df2['dt'] = pd.to_datetime(df2['dt'])

# Earlier cells gave both frames an index named 'dt' as well as a 'dt' column. pandas
# rejects merge(on='dt') as ambiguous in that case (ValueError), so drop the indexes
# and merge on the column. The output below predates this (its dates are from another run).
df3 = pd.merge(df2.reset_index(drop=True), df1.reset_index(drop=True), how='outer', on='dt')
df3

Unnamed: 0,dt,value_x,value_y
0,2022-02-16 02:52:45.794120,130.0,
1,2022-02-16 03:02:45.794120,120.0,
2,2022-02-16 03:12:45.794120,110.0,
3,2022-02-16 03:22:45.794120,100.0,
4,2022-02-16 03:32:45.794120,90.0,
5,2022-02-16 03:42:45.794120,80.0,
6,2022-02-16 03:52:45.794120,70.0,
7,2022-02-16 04:02:45.794120,60.0,
8,2022-02-16 04:12:45.794120,50.0,
9,2022-02-16 04:22:45.794120,40.0,


In [63]:
# Put the merged rows in time order and renumber them 0..n-1.
df3 = (
    df3
    .sort_values('dt')
    .reset_index(drop=True)
)
df3

Unnamed: 0,dt,value_x,value_y
0,2022-02-16 02:52:45.794120,130.0,
1,2022-02-16 03:00:05.915053,,1.0
2,2022-02-16 03:02:45.794120,120.0,
3,2022-02-16 03:12:45.794120,110.0,
4,2022-02-16 03:22:45.794120,100.0,
5,2022-02-16 03:32:45.794120,90.0,
6,2022-02-16 03:42:45.794120,80.0,
7,2022-02-16 03:52:45.794120,70.0,
8,2022-02-16 04:00:05.915053,,2.0
9,2022-02-16 04:02:45.794120,60.0,


In [157]:
# Forward-fill: each row takes the most recent earlier value of every column.
# fillna(method='ffill') is deprecated in pandas; .ffill() is the direct replacement.
df3 = df3.ffill()
df3

Unnamed: 0,dt,value_x,value_y
0,2022-02-16 02:52:45.794120,130.0,
1,2022-02-16 03:00:05.915053,130.0,1.0
2,2022-02-16 03:02:45.794120,120.0,1.0
3,2022-02-16 03:12:45.794120,110.0,1.0
4,2022-02-16 03:22:45.794120,100.0,1.0
5,2022-02-16 03:32:45.794120,90.0,1.0
6,2022-02-16 03:42:45.794120,80.0,1.0
7,2022-02-16 03:52:45.794120,70.0,1.0
8,2022-02-16 04:00:05.915053,70.0,2.0
9,2022-02-16 04:02:45.794120,60.0,2.0


In [158]:
# Back-fill what forward-filling couldn't reach: leading NaNs (e.g. value_y in row 0)
# take the first later value. fillna(method='bfill') is deprecated; use .bfill().
df3 = df3.bfill()
df3

Unnamed: 0,dt,value_x,value_y
0,2022-02-16 02:52:45.794120,130.0,1.0
1,2022-02-16 03:00:05.915053,130.0,1.0
2,2022-02-16 03:02:45.794120,120.0,1.0
3,2022-02-16 03:12:45.794120,110.0,1.0
4,2022-02-16 03:22:45.794120,100.0,1.0
5,2022-02-16 03:32:45.794120,90.0,1.0
6,2022-02-16 03:42:45.794120,80.0,1.0
7,2022-02-16 03:52:45.794120,70.0,1.0
8,2022-02-16 04:00:05.915053,70.0,2.0
9,2022-02-16 04:02:45.794120,60.0,2.0


---
# Done!
That's all the pieces we need for data management:
1. save the data in a table-like (RDBMS-style) format
2. save it incrementally (append)
3. pull the data from disk with a query, so we can load only the last x hours or similar
4. once everything is pulled, merge it into one dataset, filling in nulls