In [1]:
import pandas as pd

In [33]:
def dailyPercentChangeMetric(
    df:pd.DataFrame=None,
    column:str=None,
    prefix:str='Daily',
    yesterday:int=1,
) -> pd.DataFrame:

    def name() -> str:
        return f'{prefix}{column}{yesterday}'

    if df is None:
        return name()

    feature = df.loc[:, column].shift(yesterday-1) / df.loc[:, column].shift(yesterday)
    feature.name = name()
    return feature

In [34]:
def rawDataMetric(df:pd.DataFrame=None, column:str=None, prefix='Raw') -> pd.DataFrame:

    def name() -> str:
        return f'{prefix}{column}'

    if df is None:
        return name()

    feature = df.loc[:, column]
    feature.name = name()
    return feature

In [35]:
df = pd.DataFrame({'A':[1,2,3],'B':[4,5,6]})

In [36]:
df

Unnamed: 0,A,B
0,1,4
1,2,5
2,3,6


In [37]:
rawDataMetric(df, column='A')

0    1
1    2
2    3
Name: RawA, dtype: int64

In [38]:
from satori import disk

In [39]:
x = disk.Api(source='streamrSpoof', stream='simpleEURCleaned') 
df = x.gather()
df

Unnamed: 0_level_0,streamrSpoof,streamrSpoof,streamrSpoof
Unnamed: 0_level_1,simpleEURCleaned,simpleEURCleaned,simpleEURCleaned
Unnamed: 0_level_2,High,Low,Close
2022-04-15 03:47:45.405022,0.83724,0.83056,0.83577
2022-04-15 03:47:48.494228,0.8371,0.82583,0.8272
2022-04-15 03:47:51.558024,0.82802,0.8244,0.82488
2022-04-15 03:47:54.643479,0.83029,0.82345,0.82775
2022-04-15 03:47:57.742969,0.82878,0.82028,0.82055
2022-04-15 03:48:00.807706,0.82277,0.81699,0.81833
2022-04-15 03:48:03.890062,0.81981,0.81453,0.81633
2022-04-15 03:48:06.951474,0.82223,0.81526,0.82028
2022-04-15 03:48:10.079903,0.82488,0.81733,0.81806
2022-04-15 03:48:13.161523,0.8188,0.81261,0.8144


In [40]:
def rollingPercentChangeMetric(
    df:pd.DataFrame=None,
    column:str=None,
    prefix:str='Rolling',
    window:int=2,
    transformation:str='max()',
) -> pd.DataFrame:
    def name() -> str:
        return f'{prefix}{column}{window}{transformation[0:3]}'

    if df is None:
        return name()

    transactionOptions = 'sum max min mean median std count var skew kurt quantile cov corr apply'
    if (isinstance(window, int)
        and transformation.startswith(tuple(transactionOptions.split()))):
        feature = df[column] / eval(f'df[column].shift(1).rolling(window={window}).{transformation}')
        feature.name = name()
        return feature

    raise Exception('eval call on transformation failed, unable to create feature')

In [48]:
pd.MultiIndex.from_tuples([x + ('raw',)  for x in df.columns])

MultiIndex([('streamrSpoof', 'simpleEURCleaned',  'High', 'raw'),
            ('streamrSpoof', 'simpleEURCleaned',   'Low', 'raw'),
            ('streamrSpoof', 'simpleEURCleaned', 'Close', 'raw')],
           )

In [49]:
df.columns = pd.MultiIndex.from_tuples([x + ('raw',)  for x in df.columns])

In [51]:
df.head()

Unnamed: 0_level_0,streamrSpoof,streamrSpoof,streamrSpoof
Unnamed: 0_level_1,simpleEURCleaned,simpleEURCleaned,simpleEURCleaned
Unnamed: 0_level_2,High,Low,Close
Unnamed: 0_level_3,raw,raw,raw
2022-04-15 03:47:45.405022,0.83724,0.83056,0.83577
2022-04-15 03:47:48.494228,0.8371,0.82583,0.8272
2022-04-15 03:47:51.558024,0.82802,0.8244,0.82488
2022-04-15 03:47:54.643479,0.83029,0.82345,0.82775
2022-04-15 03:47:57.742969,0.82878,0.82028,0.82055


In [63]:
df['streamrSpoof', 'simpleEURCleaned', 'High', 'RollingMax3'] = rollingPercentChangeMetric(
    df=df,
    column=('streamrSpoof', 'simpleEURCleaned', 'High', 'raw'),
    prefix='Rolling',
    window=2,
    transformation='max()')
df.head()

Unnamed: 0_level_0,streamrSpoof,streamrSpoof,streamrSpoof,streamrSpoof,streamrSpoof
Unnamed: 0_level_1,simpleEURCleaned,simpleEURCleaned,simpleEURCleaned,simpleEURCleaned,simpleEURCleaned
Unnamed: 0_level_2,High,Low,Close,High,High
Unnamed: 0_level_3,raw,raw,raw,RollingMax2,RollingMax3
2022-04-15 03:47:45.405022,0.83724,0.83056,0.83577,,
2022-04-15 03:47:48.494228,0.8371,0.82583,0.8272,,
2022-04-15 03:47:51.558024,0.82802,0.8244,0.82488,0.988988,0.988988
2022-04-15 03:47:54.643479,0.83029,0.82345,0.82775,0.991865,0.991865
2022-04-15 03:47:57.742969,0.82878,0.82028,0.82055,0.998181,0.998181


In [65]:
df = df.sort_index(axis=1)
df.head()

Unnamed: 0_level_0,streamrSpoof,streamrSpoof,streamrSpoof,streamrSpoof,streamrSpoof
Unnamed: 0_level_1,simpleEURCleaned,simpleEURCleaned,simpleEURCleaned,simpleEURCleaned,simpleEURCleaned
Unnamed: 0_level_2,Close,High,High,High,Low
Unnamed: 0_level_3,raw,RollingMax2,RollingMax3,raw,raw
2022-04-15 03:47:45.405022,0.83577,,,0.83724,0.83056
2022-04-15 03:47:48.494228,0.8272,,,0.8371,0.82583
2022-04-15 03:47:51.558024,0.82488,0.988988,0.988988,0.82802,0.8244
2022-04-15 03:47:54.643479,0.82775,0.991865,0.991865,0.83029,0.82345
2022-04-15 03:47:57.742969,0.82055,0.998181,0.998181,0.82878,0.82028


In [68]:
[col for col in df.columns]

[('streamrSpoof', 'simpleEURCleaned', 'Close', 'raw'),
 ('streamrSpoof', 'simpleEURCleaned', 'High', 'RollingMax2'),
 ('streamrSpoof', 'simpleEURCleaned', 'High', 'RollingMax3'),
 ('streamrSpoof', 'simpleEURCleaned', 'High', 'raw'),
 ('streamrSpoof', 'simpleEURCleaned', 'Low', 'raw')]

In [71]:
def produceTarget(df, targetId):
    
    def produceTargetName(target:str, prefix:str='Target_'):
        return 'Target'
    
    series = df.loc[:, targetId].shift(-1)
    series.name = produceTargetName(targetId)
    return pd.DataFrame(series)

In [77]:
produceTarget(df, ('streamrSpoof', 'simpleEURCleaned', 'High', 'raw')).shape

(59, 1)

In [76]:
df.shape

(59, 5)

In [79]:
target = produceTarget(df, ('streamrSpoof', 'simpleEURCleaned', 'High', 'raw'))
df['TARGET'] = target

In [80]:
df

Unnamed: 0_level_0,streamrSpoof,streamrSpoof,streamrSpoof,streamrSpoof,streamrSpoof,TARGET
Unnamed: 0_level_1,simpleEURCleaned,simpleEURCleaned,simpleEURCleaned,simpleEURCleaned,simpleEURCleaned,Unnamed: 6_level_1
Unnamed: 0_level_2,Close,High,High,High,Low,Unnamed: 6_level_2
Unnamed: 0_level_3,raw,RollingMax2,RollingMax3,raw,raw,Unnamed: 6_level_3
2022-04-15 03:47:45.405022,0.83577,,,0.83724,0.83056,0.8371
2022-04-15 03:47:48.494228,0.8272,,,0.8371,0.82583,0.82802
2022-04-15 03:47:51.558024,0.82488,0.988988,0.988988,0.82802,0.8244,0.83029
2022-04-15 03:47:54.643479,0.82775,0.991865,0.991865,0.83029,0.82345,0.82878
2022-04-15 03:47:57.742969,0.82055,0.998181,0.998181,0.82878,0.82028,0.82277
2022-04-15 03:48:00.807706,0.81833,0.990943,0.990943,0.82277,0.81699,0.81981
2022-04-15 03:48:03.890062,0.81633,0.989177,0.989177,0.81981,0.81453,0.82223
2022-04-15 03:48:06.951474,0.82028,0.999344,0.999344,0.82223,0.81526,0.82488
2022-04-15 03:48:10.079903,0.81806,1.003223,1.003223,0.82488,0.81733,0.8188
2022-04-15 03:48:13.161523,0.8144,0.992629,0.992629,0.8188,0.81261,0.82115


### ok I have to make another merge function because my original assumption was wrong about duplicating the time...


In [91]:
import datetime as dt
from functools import reduce

In [92]:
def mergeORIGINAL(dfs:list[pd.DataFrame]):
    ''' Layer 1
    combines multiple mutlicolumned dataframes.
    to support disparate frequencies, 
    outter join fills in missing values with previous value.
    '''
    if len(dfs) == 0:
        return None
    if len(dfs) == 1:
        return dfs[0]
    for df in dfs:
        df.index = pd.to_datetime(df.index)
    return reduce(
        lambda left, right: pd.merge(
            left, 
            right, 
            how='outer',
            left_index=True,
            right_index=True).fillna(method='ffill'),
        #.fillna(method='bfill'),
        # don't bfill here, in many cases its fine to bfill, but not in all.
        # maybe we will bfill in model. always bfill After ffill.
        dfs)

In [114]:
df1 = pd.DataFrame(['a', 'b', 'c'],
    columns=pd.MultiIndex.from_product([['target'], ['key']]),
    index = [
        '2022-04-15 20:20:20.000000', 
        '2022-04-15 20:20:21.000000', 
        '2022-04-15 20:20:22.000000'],)
df1

Unnamed: 0_level_0,target
Unnamed: 0_level_1,key
2022-04-15 20:20:20.000000,a
2022-04-15 20:20:21.000000,b
2022-04-15 20:20:22.000000,c


In [122]:
df2 = pd.DataFrame(['a2', 'b2', 'c2', 'd2', 'e2'],
    columns=pd.MultiIndex.from_product([['feature2'], ['keys']]),
    index = [
        '2022-04-15 20:20:20.100000', 
        '2022-04-15 20:20:20.500000', 
        '2022-04-15 20:20:20.900000', 
        '2022-04-15 20:20:21.000000', 
        '2022-04-15 20:20:21.100000',],)

df2

Unnamed: 0_level_0,feature2
Unnamed: 0_level_1,keys
2022-04-15 20:20:20.100000,a2
2022-04-15 20:20:20.500000,b2
2022-04-15 20:20:20.900000,c2
2022-04-15 20:20:21.000000,d2
2022-04-15 20:20:21.100000,e2


In [123]:
df3 = pd.DataFrame(['a3', 'b3', 'c3', 'd3', 'e3'],
    columns=pd.MultiIndex.from_product([['feature3'], ['keys']]),
    index = [
        '2022-04-15 20:20:19.000000', 
        '2022-04-15 20:20:19.200000', 
        '2022-04-15 20:20:20.000000', 
        '2022-04-15 20:20:20.200000', 
        '2022-04-15 20:20:23.100000',],)
df3

Unnamed: 0_level_0,feature3
Unnamed: 0_level_1,keys
2022-04-15 20:20:19.000000,a3
2022-04-15 20:20:19.200000,b3
2022-04-15 20:20:20.000000,c3
2022-04-15 20:20:20.200000,d3
2022-04-15 20:20:23.100000,e3


In [124]:
mergeORIGINAL([df1, df2, df3])

Unnamed: 0_level_0,target,feature2,feature3
Unnamed: 0_level_1,key,keys,keys
2022-04-15 20:20:19.000,,,a3
2022-04-15 20:20:19.200,,,b3
2022-04-15 20:20:20.000,a,,c3
2022-04-15 20:20:20.100,a,a2,c3
2022-04-15 20:20:20.200,a,a2,d3
2022-04-15 20:20:20.500,a,b2,d3
2022-04-15 20:20:20.900,a,c2,d3
2022-04-15 20:20:21.000,b,d2,d3
2022-04-15 20:20:21.100,b,e2,d3
2022-04-15 20:20:22.000,c,e2,d3


##### no good because stream 1 is our target we don't want it duplicated

In [125]:
def merge(dfs:list[pd.DataFrame], targetDf:pd.DataFrame, targetColumn:'str|tuple[str]'):
    dfs = [targetDf] + dfs
    for df in dfs:
        df.index = pd.to_datetime(df.index)
    return reduce(
        lambda left, right: pd.merge(
            left, 
            right, 
            how='outer',
            left_index=True,
            right_index=True),#.fillna(method='ffill'),
        #.fillna(method='bfill'),
        # don't bfill here, in many cases its fine to bfill, but not in all.
        # maybe we will bfill in model. always bfill After ffill.
        dfs)

In [127]:
merged = merge([df2, df3], targetDf=df1, targetColumn=('target', 'key'))
merged

Unnamed: 0_level_0,target,feature2,feature3
Unnamed: 0_level_1,key,keys,keys
2022-04-15 20:20:19.000,,,a3
2022-04-15 20:20:19.200,,,b3
2022-04-15 20:20:20.000,a,,c3
2022-04-15 20:20:20.100,,a2,
2022-04-15 20:20:20.200,,,d3
2022-04-15 20:20:20.500,,b2,
2022-04-15 20:20:20.900,,c2,
2022-04-15 20:20:21.000,b,d2,
2022-04-15 20:20:21.100,,e2,
2022-04-15 20:20:22.000,c,,


In [129]:
merged.drop_duplicates(subset=('target', 'key'), keep='first')

Unnamed: 0_level_0,target,feature2,feature3
Unnamed: 0_level_1,key,keys,keys
2022-04-15 20:20:19,,,a3
2022-04-15 20:20:20,a,,c3
2022-04-15 20:20:21,b,d2,
2022-04-15 20:20:22,c,,


In [134]:
for col in merged.columns:
    if col != ('target', 'key'):
        merged[col] = merged[col].fillna(method='ffill')
merged

Unnamed: 0_level_0,target,feature2,feature3
Unnamed: 0_level_1,key,keys,keys
2022-04-15 20:20:19.000,,,a3
2022-04-15 20:20:19.200,,,b3
2022-04-15 20:20:20.000,a,,c3
2022-04-15 20:20:20.100,,a2,c3
2022-04-15 20:20:20.200,,a2,d3
2022-04-15 20:20:20.500,,b2,d3
2022-04-15 20:20:20.900,,c2,d3
2022-04-15 20:20:21.000,b,d2,d3
2022-04-15 20:20:21.100,,e2,d3
2022-04-15 20:20:22.000,c,e2,d3


In [138]:
merged = merged[merged[('target', 'key')].notna()]

In [139]:
merged

Unnamed: 0_level_0,target,feature2,feature3
Unnamed: 0_level_1,key,keys,keys
2022-04-15 20:20:20,a,,c3
2022-04-15 20:20:21,b,d2,d3
2022-04-15 20:20:22,c,e2,d3


#### ok now that that's solved... 

In [142]:
# I wish I didn't have to expand it out then filter it down... but this isn't a solution... 
df1.merge(df2, how='left', left_index=True, right_index=True)

Unnamed: 0_level_0,target,feature2
Unnamed: 0_level_1,key,keys
2022-04-15 20:20:20,a,
2022-04-15 20:20:21,b,d2
2022-04-15 20:20:22,c,


##### next handle names because the old way should be superceeded by the mutliindex columns

In [146]:
import pandas as pd
from functools import reduce

# Convert all indexes to datetime
for df in [df1, df2, df3]:
    df.index = pd.to_datetime(df.index)

# Perform as-of merges
res = reduce(
    lambda left, right:
        pd.merge_asof(left, right, left_index=True, right_index=True),
    [df1, df2, df3])
res

Unnamed: 0_level_0,target,feature2,feature3
Unnamed: 0_level_1,key,keys,keys
2022-04-15 20:20:20,a,,c3
2022-04-15 20:20:21,b,d2,d3
2022-04-15 20:20:22,c,e2,d3


In [147]:
mylist = [1,2,3]
mylist.insert(0, mylist.pop(mylist.index(2)))
mylist

[2, 1, 3]

In [149]:
mylist = [df2,df1,df3]

for ix, item in enumerate(mylist):
    if ('target', 'key') in item.columns:
        mylist.insert(0, mylist.pop(ix))
        break
mylist

[                    target
                        key
 2022-04-15 20:20:20      a
 2022-04-15 20:20:21      b
 2022-04-15 20:20:22      c,
                         feature2
                             keys
 2022-04-15 20:20:20.100       a2
 2022-04-15 20:20:20.500       b2
 2022-04-15 20:20:20.900       c2
 2022-04-15 20:20:21.000       d2
 2022-04-15 20:20:21.100       e2,
                         feature3
                             keys
 2022-04-15 20:20:19.000       a3
 2022-04-15 20:20:19.200       b3
 2022-04-15 20:20:20.000       c3
 2022-04-15 20:20:20.200       d3
 2022-04-15 20:20:23.100       e3]

In [154]:
df3.loc[:, [('feature3' ,'keys')]].shift(-1)

Unnamed: 0_level_0,feature3
Unnamed: 0_level_1,keys
2022-04-15 20:20:19.000,b3
2022-04-15 20:20:19.200,c3
2022-04-15 20:20:20.000,d3
2022-04-15 20:20:20.200,e3
2022-04-15 20:20:23.100,


In [155]:
pd.DataFrame(
    {('sourceId', 'streamId', target): values for target, values in list({'a': 1, 'b':2}.items()) + [('StreamObservationId', 'observationId')]},
    #columns=pd.MultiIndex.from_tuples([(self.sourceId, self.streamId, self.)]),
    index=['observedTime'])

Unnamed: 0_level_0,sourceId,sourceId,sourceId
Unnamed: 0_level_1,streamId,streamId,streamId
Unnamed: 0_level_2,a,b,StreamObservationId
observedTime,1,2,observationId
