In [1]:
import os
import sys
# Put the parent directory on sys.path (once) so modules one level up can be imported.
module_path = os.path.abspath('..')
if sys.path.count(module_path) == 0:
    sys.path.append(module_path)

In [2]:
import yaml

import snowflake.connector as connector
from datetime import datetime, timedelta

import warnings
warnings.filterwarnings('ignore')

%load_ext autoreload
%autoreload 2

In [3]:
# Parse the YAML settings file that sits one directory above this notebook.
with open('../config.yml', 'r') as config_file:
    config = yaml.safe_load(config_file)

# Split out the Snowflake and GitHub sections of the settings.
snow_config = config['snowflake']
github_config = config['github']

In [4]:
# Key of the data-source section inside the Snowflake settings.
ds = 'tile_ds'

# Login details taken from the Snowflake settings.
user = snow_config['user']
password = snow_config['password']
url = snow_config['account']

# Per-data-source settings.
# NOTE(review): these three values are read here, but the connect() call below passes
# hard-coded database/schema/warehouse names instead - confirm the override is intentional.
database = snow_config[ds]['database']
schema = snow_config[ds]['schema']
warehouse = snow_config[ds]['warehouse']

# Open the connection shared by every query helper in this notebook.
conn = connector.connect(
    user=user,
    password=password,
    account=url,
    database="FB_SIMULATE",
    schema="FEATUREBYTE",
    warehouse="FB_TILE_WH",
)

In [5]:
def execute(sql):
    """Run one SQL statement on the notebook-level connection ``conn``.

    Args:
        sql: SQL text to execute.

    Returns:
        Whatever the cursor's ``fetch_pandas_all()`` returns for the statement.
    """
    cursor = conn.cursor()
    try:
        cursor.execute(sql)
        result = cursor.fetch_pandas_all()
    finally:
        # Close the cursor whether or not the statement succeeded.
        cursor.close()
    return result

# Tile-computation query template.
# check_tile() fills in ${BLIND_SPOT} and ${FEATURE_JOB_TS}. The bare FB_END_TS token is
# replaced with a quoted timestamp only for the preview query that check_tile() runs itself;
# the SQL it returns (and that generate_tile() embeds in the procedure call) still contains
# FB_END_TS.
sql_template = """
    select entity_id, F_INDEX_TO_TIMESTAMP(index, 15, ${BLIND_SPOT}, 1) as tile_start_ts, count(*) as value
    from (
        select 
            entity_id,            
            F_TIMESTAMP_TO_INDEX(event_timestamp, 15, ${BLIND_SPOT}, 1) as index,
            value
        from FB_TEST.TEMP
        where event_timestamp < FB_END_TS
        and wh_available_at <= '${FEATURE_JOB_TS}'
    )
    where tile_start_ts < FB_END_TS
    group by entity_id, tile_start_ts
"""

# Stored-procedure call template.
# ${TILE_NAME} and ${BLIND_SPOT} are substituted when a scenario is set up (sql_sp_new);
# ${SQL} and ${TILE_END_TS} are substituted by generate_tile().
# NOTE(review): the literal arguments (15, 1, 1440, 'online', 10) are passed through as-is;
# their meaning is defined by SP_TILE_GENERATE_SCHEDULE - confirm against its definition.
sql_sp = """
    call SP_TILE_GENERATE_SCHEDULE('${TILE_NAME}', 15, ${BLIND_SPOT}, 1, 1440, '${SQL}', 
        'entity_id, tile_start_ts, value', 
        'online', 10, 
        '${TILE_END_TS}')
"""

In [6]:
def check_tile(feature_job_ts_str):
    """Build the tile query for one feature job and preview its result.

    Reads the notebook-level ``blind_spot`` and ``sql_template`` and runs the preview
    through ``execute``. The preview SQL is printed before it is run.

    Args:
        feature_job_ts_str: Feature job time as an ISO-format string,
            e.g. '2022-06-09 01:51:15'.

    Returns:
        A tuple ``(sql, tile_end_ts_str, df)``:
        ``sql`` is the template with the blind spot and job timestamp filled in but the
        ``FB_END_TS`` token left in place; ``tile_end_ts_str`` is the job time minus the
        blind spot, formatted as text; ``df`` is the result of running the query with
        ``FB_END_TS`` replaced by the quoted tile end timestamp.
    """
    ts_format = "%Y-%m-%dT%H:%M:%S.00Z"

    job_time = datetime.fromisoformat(feature_job_ts_str)
    # The tile window ends one blind spot before the job time.
    tile_end_time = job_time - timedelta(seconds=blind_spot)

    job_ts_text = job_time.strftime(ts_format)
    tile_end_ts_str = tile_end_time.strftime(ts_format)

    sql = sql_template.replace("${BLIND_SPOT}", str(blind_spot))
    sql = sql.replace("${FEATURE_JOB_TS}", job_ts_text)

    # Only the preview gets a concrete end timestamp; the returned SQL keeps FB_END_TS.
    sql_check = sql.replace("FB_END_TS", f"'{tile_end_ts_str}'")
    print(sql_check)

    return sql, tile_end_ts_str, execute(sql_check)


def generate_tile(input_sql, tile_end_ts_str):
    """Fill in the stored-procedure call for one feature job, print it, and run it.

    Reads the notebook-level ``sql_sp_new`` and ``blind_spot`` and runs the call
    through ``execute``.

    Args:
        input_sql: Tile query to embed in the call (as returned by ``check_tile``).
        tile_end_ts_str: Tile end timestamp text to pass to the procedure.

    Returns:
        The result of ``execute`` for the procedure call.
    """
    # Prefix every single quote with three backslashes before embedding the query in the
    # quoted ${SQL} argument (presumably so the quotes survive the nested string parsing
    # inside the procedure - TODO confirm).
    escaped_sql = input_sql.replace("'", "\\\\\\'")

    call_sql = sql_sp_new.replace("${SQL}", escaped_sql)
    call_sql = call_sql.replace("${TILE_END_TS}", tile_end_ts_str)
    call_sql = call_sql.replace("${BLIND_SPOT}", str(blind_spot))

    print(call_sql)
    return execute(call_sql)

def check_monitor(tile_name):
    """Return every row of ``<tile_name>_tile_monitor``, ordered by TILE_START_TS descending."""
    query = f"SELECT * FROM {tile_name}_tile_monitor order by TILE_START_TS desc"
    return execute(query)

def check_result_tiles(tile_name):
    """Return every row of ``<tile_name>_tile`` plus a derived ``tile_start_ts`` column.

    The extra column is computed in SQL with ``F_INDEX_TO_TIMESTAMP`` using the
    notebook-level ``blind_spot``; rows are ordered by TILE_START_TS descending.
    """
    return execute(
        f"SELECT F_INDEX_TO_TIMESTAMP(index, 15, {blind_spot}, 1) as tile_start_ts, * FROM {tile_name}_tile order by TILE_START_TS desc"
    )

# 1 FeatureJob with a 100-Second Blind Spot

* The feature job is scheduled every minute, at the **15s** mark
* The blind spot is not set optimally: it is equal to **100s**
* The tile size is **1min**

In [10]:
# Scenario parameters: the blind spot in seconds and the tile name derived from it.
blind_spot = 100
tile_name = "TEMP_" + str(blind_spot)

# Pre-fill the stored-procedure call template with this scenario's tile name and blind spot.
sql_sp_new = sql_sp.replace("${TILE_NAME}", tile_name)
sql_sp_new = sql_sp_new.replace("${BLIND_SPOT}", str(blind_spot))

### 1.1 FeatureJob at 01:51:15

In [11]:
sql, tile_end_ts, df = check_tile("2022-06-09 01:51:15")
df


    select entity_id, F_INDEX_TO_TIMESTAMP(index, 15, 100, 1) as tile_start_ts, count(*) as value
    from (
        select 
            entity_id,            
            F_TIMESTAMP_TO_INDEX(event_timestamp, 15, 100, 1) as index,
            value
        from FB_TEST.TEMP
        where event_timestamp < '2022-06-09T01:49:35.00Z'
        and wh_available_at <= '2022-06-09T01:51:15.00Z'
    )
    where tile_start_ts < '2022-06-09T01:49:35.00Z'
    group by entity_id, tile_start_ts



Unnamed: 0,ENTITY_ID,TILE_START_TS,VALUE


In [12]:
generate_tile(sql, tile_end_ts)


    call SP_TILE_GENERATE_SCHEDULE('TEMP_100', 15, 100, 1, 1440, '
    select entity_id, F_INDEX_TO_TIMESTAMP(index, 15, 100, 1) as tile_start_ts, count(*) as value
    from (
        select 
            entity_id,            
            F_TIMESTAMP_TO_INDEX(event_timestamp, 15, 100, 1) as index,
            value
        from FB_TEST.TEMP
        where event_timestamp < FB_END_TS
        and wh_available_at <= \\\'2022-06-09T01:51:15.00Z\\\'
    )
    where tile_start_ts < FB_END_TS
    group by entity_id, tile_start_ts
', 
        'entity_id, tile_start_ts, value', 
        'online', 10, 
        '2022-06-09T01:49:35.00Z')



Unnamed: 0,SP_TILE_GENERATE_SCHEDULE
0,Debug - cron_residue_seconds: 15 - END_TS: Wed...


In [13]:
check_result_tiles(tile_name)

Unnamed: 0,TILE_START_TS,INDEX,ENTITY_ID,VALUE,CREATED_AT


### 1.2 FeatureJob at 01:52:15

In [14]:
sql, tile_end_ts, df = check_tile("2022-06-09 01:52:15")
df


    select entity_id, F_INDEX_TO_TIMESTAMP(index, 15, 100, 1) as tile_start_ts, count(*) as value
    from (
        select 
            entity_id,            
            F_TIMESTAMP_TO_INDEX(event_timestamp, 15, 100, 1) as index,
            value
        from FB_TEST.TEMP
        where event_timestamp < '2022-06-09T01:50:35.00Z'
        and wh_available_at <= '2022-06-09T01:52:15.00Z'
    )
    where tile_start_ts < '2022-06-09T01:50:35.00Z'
    group by entity_id, tile_start_ts



Unnamed: 0,ENTITY_ID,TILE_START_TS,VALUE
0,1,2022-06-09 01:49:35,3


In [15]:
generate_tile(sql, tile_end_ts)


    call SP_TILE_GENERATE_SCHEDULE('TEMP_100', 15, 100, 1, 1440, '
    select entity_id, F_INDEX_TO_TIMESTAMP(index, 15, 100, 1) as tile_start_ts, count(*) as value
    from (
        select 
            entity_id,            
            F_TIMESTAMP_TO_INDEX(event_timestamp, 15, 100, 1) as index,
            value
        from FB_TEST.TEMP
        where event_timestamp < FB_END_TS
        and wh_available_at <= \\\'2022-06-09T01:52:15.00Z\\\'
    )
    where tile_start_ts < FB_END_TS
    group by entity_id, tile_start_ts
', 
        'entity_id, tile_start_ts, value', 
        'online', 10, 
        '2022-06-09T01:50:35.00Z')



Unnamed: 0,SP_TILE_GENERATE_SCHEDULE
0,Debug - cron_residue_seconds: 15 - END_TS: Wed...


In [16]:
check_result_tiles(tile_name)

Unnamed: 0,TILE_START_TS,INDEX,ENTITY_ID,VALUE,CREATED_AT
0,2022-06-09 01:49:35,27578991,1,3,2022-06-10 05:36:50.898


In [17]:
check_monitor(tile_name)

Unnamed: 0,TILE_START_TS,INDEX,ENTITY_ID,VALUE,OLD_VALUE,TILE_TYPE,EXPECTED_CREATED_AT,CREATED_AT


### 1.3 FeatureJob at 01:53:15

In [18]:
# Replay the 01:53:15 feature job: preview the tile query, then run tile generation with it.
ts = "2022-06-09 01:53:15"
print('Running for ', ts)
sql, tile_end_ts, df = check_tile(ts)
generate_tile(sql, tile_end_ts)

Running for  2022-06-09 01:53:15

    select entity_id, F_INDEX_TO_TIMESTAMP(index, 15, 100, 1) as tile_start_ts, count(*) as value
    from (
        select 
            entity_id,            
            F_TIMESTAMP_TO_INDEX(event_timestamp, 15, 100, 1) as index,
            value
        from FB_TEST.TEMP
        where event_timestamp < '2022-06-09T01:51:35.00Z'
        and wh_available_at <= '2022-06-09T01:53:15.00Z'
    )
    where tile_start_ts < '2022-06-09T01:51:35.00Z'
    group by entity_id, tile_start_ts


    call SP_TILE_GENERATE_SCHEDULE('TEMP_100', 15, 100, 1, 1440, '
    select entity_id, F_INDEX_TO_TIMESTAMP(index, 15, 100, 1) as tile_start_ts, count(*) as value
    from (
        select 
            entity_id,            
            F_TIMESTAMP_TO_INDEX(event_timestamp, 15, 100, 1) as index,
            value
        from FB_TEST.TEMP
        where event_timestamp < FB_END_TS
        and wh_available_at <= \\\'2022-06-09T01:53:15.00Z\\\'
    )
    where tile_start_ts

Unnamed: 0,SP_TILE_GENERATE_SCHEDULE
0,Debug - cron_residue_seconds: 15 - END_TS: Wed...


In [19]:
check_result_tiles(tile_name)

Unnamed: 0,TILE_START_TS,INDEX,ENTITY_ID,VALUE,CREATED_AT
0,2022-06-09 01:50:35,27578992,1,4,2022-06-10 05:37:16.381
1,2022-06-09 01:49:35,27578991,1,3,2022-06-10 05:37:16.381


In [20]:
check_monitor(tile_name)

Unnamed: 0,TILE_START_TS,INDEX,ENTITY_ID,VALUE,OLD_VALUE,TILE_TYPE,EXPECTED_CREATED_AT,CREATED_AT


### 1.4 FeatureJob at 01:54:15

In [21]:
sql, tile_end_ts, df = check_tile("2022-06-09 01:54:15")
df


    select entity_id, F_INDEX_TO_TIMESTAMP(index, 15, 100, 1) as tile_start_ts, count(*) as value
    from (
        select 
            entity_id,            
            F_TIMESTAMP_TO_INDEX(event_timestamp, 15, 100, 1) as index,
            value
        from FB_TEST.TEMP
        where event_timestamp < '2022-06-09T01:52:35.00Z'
        and wh_available_at <= '2022-06-09T01:54:15.00Z'
    )
    where tile_start_ts < '2022-06-09T01:52:35.00Z'
    group by entity_id, tile_start_ts



Unnamed: 0,ENTITY_ID,TILE_START_TS,VALUE
0,1,2022-06-09 01:49:35,3
1,1,2022-06-09 01:50:35,4
2,1,2022-06-09 01:51:35,4


In [22]:
generate_tile(sql, tile_end_ts)


    call SP_TILE_GENERATE_SCHEDULE('TEMP_100', 15, 100, 1, 1440, '
    select entity_id, F_INDEX_TO_TIMESTAMP(index, 15, 100, 1) as tile_start_ts, count(*) as value
    from (
        select 
            entity_id,            
            F_TIMESTAMP_TO_INDEX(event_timestamp, 15, 100, 1) as index,
            value
        from FB_TEST.TEMP
        where event_timestamp < FB_END_TS
        and wh_available_at <= \\\'2022-06-09T01:54:15.00Z\\\'
    )
    where tile_start_ts < FB_END_TS
    group by entity_id, tile_start_ts
', 
        'entity_id, tile_start_ts, value', 
        'online', 10, 
        '2022-06-09T01:52:35.00Z')



Unnamed: 0,SP_TILE_GENERATE_SCHEDULE
0,Debug - cron_residue_seconds: 15 - END_TS: Wed...


In [23]:
check_result_tiles(tile_name)

Unnamed: 0,TILE_START_TS,INDEX,ENTITY_ID,VALUE,CREATED_AT
0,2022-06-09 01:51:35,27578993,1,4,2022-06-10 05:37:42.775
1,2022-06-09 01:50:35,27578992,1,4,2022-06-10 05:37:42.775
2,2022-06-09 01:49:35,27578991,1,3,2022-06-10 05:37:42.775


In [24]:
check_monitor(tile_name)

Unnamed: 0,TILE_START_TS,INDEX,ENTITY_ID,VALUE,OLD_VALUE,TILE_TYPE,EXPECTED_CREATED_AT,CREATED_AT


### 1.5 FeatureJobs from 01:55:15 to 01:58:15

In [25]:
# Replay the feature jobs at 01:55:15, 01:56:15, 01:57:15 and 01:58:15 in order:
# each iteration previews the tile query and then runs tile generation with it.
for i in range(5, 9):
    ts = f"2022-06-09 01:5{i}:15"
    print('Running for ', ts)
    sql, tile_end_ts, df = check_tile(ts)
    generate_tile(sql, tile_end_ts)

Running for  2022-06-09 01:55:15

    select entity_id, F_INDEX_TO_TIMESTAMP(index, 15, 100, 1) as tile_start_ts, count(*) as value
    from (
        select 
            entity_id,            
            F_TIMESTAMP_TO_INDEX(event_timestamp, 15, 100, 1) as index,
            value
        from FB_TEST.TEMP
        where event_timestamp < '2022-06-09T01:53:35.00Z'
        and wh_available_at <= '2022-06-09T01:55:15.00Z'
    )
    where tile_start_ts < '2022-06-09T01:53:35.00Z'
    group by entity_id, tile_start_ts


    call SP_TILE_GENERATE_SCHEDULE('TEMP_100', 15, 100, 1, 1440, '
    select entity_id, F_INDEX_TO_TIMESTAMP(index, 15, 100, 1) as tile_start_ts, count(*) as value
    from (
        select 
            entity_id,            
            F_TIMESTAMP_TO_INDEX(event_timestamp, 15, 100, 1) as index,
            value
        from FB_TEST.TEMP
        where event_timestamp < FB_END_TS
        and wh_available_at <= \\\'2022-06-09T01:55:15.00Z\\\'
    )
    where tile_start_ts

In [26]:
check_result_tiles(tile_name)

Unnamed: 0,TILE_START_TS,INDEX,ENTITY_ID,VALUE,CREATED_AT
0,2022-06-09 01:53:35,27578995,1,1,2022-06-10 05:39:09.602
1,2022-06-09 01:52:35,27578994,1,4,2022-06-10 05:39:09.602
2,2022-06-09 01:51:35,27578993,1,4,2022-06-10 05:39:09.602
3,2022-06-09 01:50:35,27578992,1,4,2022-06-10 05:39:09.602
4,2022-06-09 01:49:35,27578991,1,3,2022-06-10 05:39:09.602


In [27]:
check_monitor(tile_name)

Unnamed: 0,TILE_START_TS,INDEX,ENTITY_ID,VALUE,OLD_VALUE,TILE_TYPE,EXPECTED_CREATED_AT,CREATED_AT


### 1.6 FeatureJob at 01:59:15

In [28]:
sql, tile_end_ts, df = check_tile("2022-06-09 01:59:15")
df


    select entity_id, F_INDEX_TO_TIMESTAMP(index, 15, 100, 1) as tile_start_ts, count(*) as value
    from (
        select 
            entity_id,            
            F_TIMESTAMP_TO_INDEX(event_timestamp, 15, 100, 1) as index,
            value
        from FB_TEST.TEMP
        where event_timestamp < '2022-06-09T01:57:35.00Z'
        and wh_available_at <= '2022-06-09T01:59:15.00Z'
    )
    where tile_start_ts < '2022-06-09T01:57:35.00Z'
    group by entity_id, tile_start_ts



Unnamed: 0,ENTITY_ID,TILE_START_TS,VALUE
0,1,2022-06-09 01:49:35,3
1,1,2022-06-09 01:50:35,4
2,1,2022-06-09 01:51:35,4
3,1,2022-06-09 01:52:35,4
4,1,2022-06-09 01:53:35,4
5,1,2022-06-09 01:54:35,4
6,1,2022-06-09 01:55:35,4
7,1,2022-06-09 01:56:35,4


In [29]:
generate_tile(sql, tile_end_ts)


    call SP_TILE_GENERATE_SCHEDULE('TEMP_100', 15, 100, 1, 1440, '
    select entity_id, F_INDEX_TO_TIMESTAMP(index, 15, 100, 1) as tile_start_ts, count(*) as value
    from (
        select 
            entity_id,            
            F_TIMESTAMP_TO_INDEX(event_timestamp, 15, 100, 1) as index,
            value
        from FB_TEST.TEMP
        where event_timestamp < FB_END_TS
        and wh_available_at <= \\\'2022-06-09T01:59:15.00Z\\\'
    )
    where tile_start_ts < FB_END_TS
    group by entity_id, tile_start_ts
', 
        'entity_id, tile_start_ts, value', 
        'online', 10, 
        '2022-06-09T01:57:35.00Z')



Unnamed: 0,SP_TILE_GENERATE_SCHEDULE
0,Debug - cron_residue_seconds: 15 - END_TS: Wed...


In [30]:
check_result_tiles(tile_name)

Unnamed: 0,TILE_START_TS,INDEX,ENTITY_ID,VALUE,CREATED_AT
0,2022-06-09 01:56:35,27578998,1,4,2022-06-10 05:39:29.370
1,2022-06-09 01:55:35,27578997,1,4,2022-06-10 05:39:29.370
2,2022-06-09 01:54:35,27578996,1,4,2022-06-10 05:39:29.370
3,2022-06-09 01:53:35,27578995,1,4,2022-06-10 05:39:29.370
4,2022-06-09 01:52:35,27578994,1,4,2022-06-10 05:39:29.370
5,2022-06-09 01:51:35,27578993,1,4,2022-06-10 05:39:29.370
6,2022-06-09 01:50:35,27578992,1,4,2022-06-10 05:39:29.370
7,2022-06-09 01:49:35,27578991,1,3,2022-06-10 05:39:29.370


In [31]:
check_monitor(tile_name)

Unnamed: 0,TILE_START_TS,INDEX,ENTITY_ID,VALUE,OLD_VALUE,TILE_TYPE,EXPECTED_CREATED_AT,CREATED_AT
0,2022-06-09 01:55:35,27578997,1,4,,ONLINE,2022-06-09 01:58:15,2022-06-10 05:39:28.413
1,2022-06-09 01:54:35,27578996,1,4,,ONLINE,2022-06-09 01:57:15,2022-06-10 05:39:28.413
2,2022-06-09 01:53:35,27578995,1,4,1.0,ONLINE,2022-06-09 01:56:15,2022-06-10 05:39:28.413
