In [22]:
# ---
# jupyter:
#   jupytext:
#     formats: ipynb,py:light
#     text_representation:
#       extension: .py
#       format_name: light
#       format_version: '1.4'
#       jupytext_version: 1.2.3
#   kernelspec:
#     display_name: Python 3
#     language: python
#     name: python3
# ---

# Parameters - when this notebook is executed via airflow, these will be dynamically updated
# execution_date_str is a timestamp string carrying an explicit UTC offset; the
# literal below is the value used when the notebook is run without an override.
execution_date_str = '2019-11-16T18:00:00+00:00'

# +
import pandas as pd
import numpy as np
#from ipy_table import make_table, apply_theme, set_global_style
from dateutil.parser import parse
import datetime
import pytz

from drgn.pandas import load_dataframe, save_dataframe, preprocess_kafka
from drgn.time import apply_offset

# -

# Work out the date range for this job. Note that execution dates are still starting from 2am, for data from the previous day.

# +
# Use the canonical IANA zone key. "HongKong" is not a tz database name and only
# resolves when pytz falls back to a case-insensitive match of the legacy
# "Hongkong" alias.
HKT = pytz.timezone("Asia/Hong_Kong")

execution_date = parse(execution_date_str)

# The job window spans 24 hours: from 2 hours before the execution date to
# 22 hours after it. For an execution date of 02:00 HKT that is midnight to
# midnight in Hong Kong time.
start_ts = execution_date - datetime.timedelta(hours=2)
end_ts = execution_date + datetime.timedelta(hours=22)

# The snapshot is labelled with the HKT calendar date on which the window ends.
snapshot_date = end_ts.astimezone(HKT).date()

print("Start Time:    %s" % start_ts.astimezone(HKT))
print("End Time:      %s" % end_ts.astimezone(HKT))

print("Snapshot Date: %s" % snapshot_date)

# +
# Fetch the main-account snapshot events that fall inside the job window.
# NOTE(review): 'processed' and the kafka-connect path are presumably a storage
# location and key prefix understood by drgn.pandas.load_dataframe -- confirm.
events_window = dict(start_ts=start_ts, end_ts=end_ts)

df_events = load_dataframe(
    'processed',
    'kafka-connect/deposit-main-account-snapshot-events',
    **events_window
)

# Preview the raw events.
df_events.head()

# +
# When the window contained no data the loaded frame may lack the columns the
# rest of the notebook relies on. Any required column that is absent is added
# here, filled with None, so the script can still run to completion.
# This workaround should become unnecessary once full schema registry support
# is available.

REQUIRED_COLUMNS = [
    'msg_payload.accountId',
    'msg_payload.accountNumber',
    'msg_payload.bankCode',
    'msg_payload.branchCode',
    'msg_payload.checksum',
    'msg_payload.createdAt',
    'msg_payload.customerId',
    'msg_payload.denomination',
    'msg_payload.productVersionId',
    'msg_payload.status',
    'msg_timestamp'
]

# Work out which required columns are absent, then add each one as an all-None column.
missing_columns = [name for name in REQUIRED_COLUMNS if name not in df_events.columns]
for col in missing_columns:
    df_events[col] = None

# +
# preprocess the raw kafka messages, drop all the kafka headers
# NOTE(review): later cells de-duplicate on 'accountId' and sort on
# 'msg_timestamp', so preprocess_kafka presumably strips the 'msg_payload.'
# prefix while keeping msg_timestamp -- confirm against drgn.pandas.
df_events = preprocess_kafka(df_events)

# Preview the preprocessed events.
df_events.head()

# +
# Fetch the snapshot dated one day before snapshot_date; it is the baseline
# that this window's events are merged onto further down.
one_day = datetime.timedelta(days=1)
prev_snapshot_date = snapshot_date - one_day

# Snapshots are partitioned by year/month/day path segments.
prev_snapshot_path = 'snapshot/main-account/year={year}/month={month}/day={day}'.format(
    year=prev_snapshot_date.year,
    month=prev_snapshot_date.month,
    day=prev_snapshot_date.day
)

df_prev_snapshot = load_dataframe('deposits', prev_snapshot_path)
df_prev_snapshot.head()
# -

# Merge the dataframes, and then take the latest available record by accountId

# +
# Stack the previous snapshot and the new events. sort=False leaves the column
# order as-is instead of sorting the column names.
df_merged = pd.concat([df_prev_snapshot, df_events], sort=False)

# Order rows oldest-to-newest. mergesort is stable, so rows sharing a
# msg_timestamp keep their concat order (snapshot rows before event rows). The
# default quicksort gives no such guarantee, which made the surviving row
# arbitrary whenever timestamps tied.
df_merged = df_merged.sort_values(['msg_timestamp'], ascending=True, kind='mergesort')

# Keep only the newest row for each account.
df_merged = df_merged.drop_duplicates(subset=['accountId'], keep='last')
# -

df_merged.head()

# Write the merged snapshot to its year/month/day partition as parquet.
# Nothing is written when the merged frame has no rows.
has_rows = df_merged.shape[0] > 0
if has_rows:
    snapshot_path = 'snapshot/main-account/year={year}/month={month}/day={day}'.format(
        year=snapshot_date.year,
        month=snapshot_date.month,
        day=snapshot_date.day
    )
    save_dataframe('deposits', snapshot_path, df_merged, output_type='parquet')


Start Time:    2019-11-17 00:00:00+08:00
End Time:      2019-11-18 00:00:00+08:00
Snapshot Date: 2019-11-18


S3UploadFailedError: Failed to upload /tmp/tmpin1c2p_i/day=18_2019-11-18.parquet to dev-deposits-datalake-446152970647-ap-southeast-1/snapshot/main-account/year=2019/month=11/day=18/day=18_2019-11-18.parquet: An error occurred (AccessDenied) when calling the PutObject operation: Access Denied

In [26]:
# Preview only the first rows: rendering the whole frame would embed every
# account record in the saved notebook output.
df_merged.head(10)

Unnamed: 0,accountId,accountNumber,bankCode,branchCode,checksum,createdAt,customerId,denomination,productVersionId,status,msg_timestamp
0,881cbce9-b945-2104-cadc-5d30a212d3ae,4438914,389,749,2,2019-10-23T09:06:51.717421Z,8160635109194952136,HKD,133,OPEN,1573920056960
70,40d2503c-3869-4af0-b366-54179a767802,8405077,389,749,8,2019-10-09T07:09:58.467088Z,3613976792116039700,HKD,128,OPEN,1573992005733
71,50c7b99f-a237-402f-98d6-a3da634c629d,9605297,389,749,8,2019-10-09T08:05:58.268977Z,2983111236260421481,HKD,128,OPEN,1573992007424
72,eaacf08a-5ba4-e58e-3315-2140d8b06831,4770222,389,749,6,2019-11-13T04:44:50.559266Z,8879042521807956384,HKD,138,OPEN,1573992341578
73,37432aea-c2bf-a64a-88d4-3416dbbd3ba8,5366736,389,749,2,2019-11-14T12:26:06.564245Z,5337692565061109795,HKD,139,OPEN,1573992435999
74,a18d0eb9-8472-4c47-a63f-39bc726a939e,3436820,389,749,5,2019-10-18T02:36:15.471826Z,9160510001745264989,HKD,133,OPEN,1573992491947
75,8bbd081f-1740-4627-93a2-e31a82b45d9b,34948,389,749,6,2019-10-15T15:42:29.769327Z,235604347760561562,HKD,128,OPEN,1573992551660
76,890dfa3d-b70b-a48a-2c24-17719b00cd44,9639151,389,749,7,2019-10-23T09:58:25.708755Z,6588825185493474316,HKD,133,OPEN,1573992625132
77,f8239ea1-9a08-b388-3e63-2f3f10ae604d,2425281,389,749,5,2019-11-07T14:02:21.756172Z,2606421835017449496,HKD,138,OPEN,1573992664420
78,8904c918-ef66-7698-2f96-2158456db40a,7149921,389,749,0,2019-10-24T09:50:14.922908Z,6029527329206976558,HKD,133,OPEN,1573992686585
