In [98]:
# Name of the S3 bucket this notebook reads its input from and writes its output to.
BUCKET = 'innovationday-467664929633'
# Not referenced by any cell visible here -- NOTE(review): presumably an optional path override; confirm or remove.
OVERRIDE_PATH = None
# Root URI of the bucket (s3a:// scheme); file names are appended to it below.
DATA_LOCATION = f's3a://{BUCKET}'

In [99]:
import numpy as np
import pandas as pd
from time import time
from io import StringIO
import math

import matplotlib.pyplot as plt

from datetime import timedelta
import sagemaker_pyspark, boto3
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, DoubleType, StringType, TimestampType, StructType, StructField
import json
from IPython.display import display # Allows the use of display() for displaying DataFrames


from sagemaker import get_execution_role
# Execution role returned by the SageMaker SDK for this notebook.
# NOTE(review): `role` is not used by any cell visible here -- presumably consumed later
# (e.g. when launching a training job); confirm.
role = get_execution_role()

In [100]:
# Show up to 500 columns when a DataFrame is displayed.
pd.set_option('display.max_columns', 500)

# Region of the default boto3 session.
region = boto3.Session().region_name

# Put the jars shipped with sagemaker_pyspark on the Spark driver classpath,
# then create the session (or reuse one that already exists).
classpath = ":".join(sagemaker_pyspark.classpath_jars())
spark = (
    SparkSession.builder
    .config("spark.driver.extraClassPath", classpath)
    .getOrCreate()
)

In [101]:
# Disabled alternative: load the same CSV through Spark instead of pandas (kept for reference).
# file = DATA_LOCATION + '/cbbc_daily_summary.csv'
# # print(file)
# df = spark.read.csv(file, inferSchema=True, header=True)
# df.show(5)

In [102]:
# Load cbbc_daily_summary.csv from the bucket into a pandas DataFrame,
# then show the first rows and the per-column dtype / non-null summary.
summary_csv_uri = f'{DATA_LOCATION}/cbbc_daily_summary.csv'
cbbc_daily_summary = pd.read_csv(summary_csv_uri)

display(cbbc_daily_summary.head())
cbbc_daily_summary.info()

Unnamed: 0,date,sym,CBBCCode,CBBCName,TradeDate,CBBCsBought,AveragePriceBought,CBBCsSold,AveragePriceSold,Outstanding,OutstandingPct,TotalIssueSize,TradingCurrency,DayHigh,DayLow,ClosingPrice,Volume,Turnover,Issuer,Underlying,BullBear,CBBCType,CBBCCategory,ListingDate,LastTradingDate,MaturityDate,MCE,Strike_CallCurrency,StrikeLevel,CallLevel,EntRatio,DelistingDate,ldt
0,2016-01-04,60018.hk,60018,CS#HSI RP1601Y,2016-01-04,0.0,0.0,-20000.0,-0.2255,90490000.0,45.25,200000000.0,HKD,0.237,0.223,0.248,120000,28210,CS,HSI,Bear,Standard,R,2015-08-25,2016-01-27,2016-01-28,N,-,23800.0,23600.0,10000,2016-01-29,2019-03-11D19:21:06.181120000
1,2016-01-04,60023.hk,60023,CS#HSI RP1602C,2016-01-04,830000.0,0.183771,-510000.0,-0.197137,130960000.0,65.48,200000000.0,HKD,0.205,0.164,0.199,7750000,1478070,CS,HSI,Bear,Standard,R,2015-08-25,2016-02-25,2016-02-26,N,-,23725.0,23475.0,12000,2016-02-29,2019-03-11D19:21:06.181120000
2,2016-01-04,60045.hk,60045,EA#TENCTRC1610A,2016-01-04,0.0,0.0,0.0,0.0,0.0,0.0,40000000.0,HKD,0.0,0.0,,0,0,EA,00700,Bull,Standard,R,2015-10-02,,2016-10-26,N,HKD,115.0,118.0,100,,2019-03-11D19:21:06.181120000
3,2016-01-04,60046.hk,60046,SG#HSI RP1602Z,2016-01-04,10000.0,0.249,-280000.0,-0.246714,290000.0,0.15,200000000.0,HKD,0.249,0.246,0.26,290000,71570,SG,HSI,Bear,Standard,R,2015-08-25,2016-02-25,2016-02-26,N,-,23788.0,23688.0,10000,2016-02-29,2019-03-11D19:21:06.181120000
4,2016-01-04,60051.hk,60051,SG#HSI RP1602A,2016-01-04,2670000.0,0.227225,-1620000.0,-0.219426,1210000.0,0.61,200000000.0,HKD,0.243,0.2,0.24,4290000,962160,SG,HSI,Bear,Standard,R,2015-08-25,2016-02-25,2016-02-26,N,-,23588.0,23488.0,10000,2016-02-29,2019-03-11D19:21:06.181120000


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1152111 entries, 0 to 1152110
Data columns (total 33 columns):
date                   1152111 non-null object
sym                    1152111 non-null object
CBBCCode               1152111 non-null int64
CBBCName               1152111 non-null object
TradeDate              1152111 non-null object
CBBCsBought            1151871 non-null float64
AveragePriceBought     1151871 non-null float64
CBBCsSold              1151871 non-null float64
AveragePriceSold       1151871 non-null float64
Outstanding            1151871 non-null float64
OutstandingPct         1151871 non-null float64
TotalIssueSize         1151871 non-null float64
TradingCurrency        1152111 non-null object
DayHigh                1152111 non-null float64
DayLow                 1152111 non-null float64
ClosingPrice           1067680 non-null float64
Volume                 1152111 non-null int64
Turnover               1152111 non-null int64
Issuer                 1152111 non

In [103]:
# Add a few derived columns (on a copy, so the raw frame loaded above stays untouched).
trimData = cbbc_daily_summary.copy()

# Net sales column: -(bought + sold) * closing price / entitlement ratio.
# Computed column-wise instead of with a row-by-row apply(); the arithmetic and its
# evaluation order are unchanged. Rows with a missing ClosingPrice or missing
# bought/sold quantities yield NaN.
net_quantity = trimData['CBBCsBought'] + trimData['CBBCsSold']
trimData['target'] = -1 * (net_quantity * trimData['ClosingPrice']) / trimData['EntRatio']

# Parse each date column once and reuse it for the three day-count features.
trade_date = pd.to_datetime(trimData['TradeDate'])
listing_date = pd.to_datetime(trimData['ListingDate'])
maturity_date = pd.to_datetime(trimData['MaturityDate'])

# Days past the listing date, days left until maturity, and total listing-to-maturity span.
trimData['DaysPastListingDate'] = (trade_date - listing_date).dt.days
trimData['DaysFromMaturityDate'] = (maturity_date - trade_date).dt.days
trimData['MaturityDaysFromIssuance'] = (maturity_date - listing_date).dt.days

# Spread: absolute difference between the average sold and average bought prices.
# NOTE(review): in the sample rows AveragePriceSold is negative, which makes this
# |sold| + bought rather than a price gap -- confirm that is the intended "spread".
trimData['Spread'] = (trimData['AveragePriceSold'] - trimData['AveragePriceBought']).abs()

# Call level relative to strike level, as an absolute fraction of the strike (not a percentage).
trimData['callVsStrike'] = (
    (trimData['CallLevel'] - trimData['StrikeLevel']) / trimData['StrikeLevel']
).abs()

# Preview the result.
display(trimData.head())

Unnamed: 0,date,sym,CBBCCode,CBBCName,TradeDate,CBBCsBought,AveragePriceBought,CBBCsSold,AveragePriceSold,Outstanding,OutstandingPct,TotalIssueSize,TradingCurrency,DayHigh,DayLow,ClosingPrice,Volume,Turnover,Issuer,Underlying,BullBear,CBBCType,CBBCCategory,ListingDate,LastTradingDate,MaturityDate,MCE,Strike_CallCurrency,StrikeLevel,CallLevel,EntRatio,DelistingDate,ldt,target,DaysPastListingDate,DaysFromMaturityDate,MaturityDaysFromIssuance,Spread,callVsStrike
0,2016-01-04,60018.hk,60018,CS#HSI RP1601Y,2016-01-04,0.0,0.0,-20000.0,-0.2255,90490000.0,45.25,200000000.0,HKD,0.237,0.223,0.248,120000,28210,CS,HSI,Bear,Standard,R,2015-08-25,2016-01-27,2016-01-28,N,-,23800.0,23600.0,10000,2016-01-29,2019-03-11D19:21:06.181120000,0.496,132,24,156,0.2255,0.008403
1,2016-01-04,60023.hk,60023,CS#HSI RP1602C,2016-01-04,830000.0,0.183771,-510000.0,-0.197137,130960000.0,65.48,200000000.0,HKD,0.205,0.164,0.199,7750000,1478070,CS,HSI,Bear,Standard,R,2015-08-25,2016-02-25,2016-02-26,N,-,23725.0,23475.0,12000,2016-02-29,2019-03-11D19:21:06.181120000,-5.306667,132,53,185,0.380908,0.010537
2,2016-01-04,60045.hk,60045,EA#TENCTRC1610A,2016-01-04,0.0,0.0,0.0,0.0,0.0,0.0,40000000.0,HKD,0.0,0.0,,0,0,EA,00700,Bull,Standard,R,2015-10-02,,2016-10-26,N,HKD,115.0,118.0,100,,2019-03-11D19:21:06.181120000,,94,296,390,0.0,0.026087
3,2016-01-04,60046.hk,60046,SG#HSI RP1602Z,2016-01-04,10000.0,0.249,-280000.0,-0.246714,290000.0,0.15,200000000.0,HKD,0.249,0.246,0.26,290000,71570,SG,HSI,Bear,Standard,R,2015-08-25,2016-02-25,2016-02-26,N,-,23788.0,23688.0,10000,2016-02-29,2019-03-11D19:21:06.181120000,7.02,132,53,185,0.495714,0.004204
4,2016-01-04,60051.hk,60051,SG#HSI RP1602A,2016-01-04,2670000.0,0.227225,-1620000.0,-0.219426,1210000.0,0.61,200000000.0,HKD,0.243,0.2,0.24,4290000,962160,SG,HSI,Bear,Standard,R,2015-08-25,2016-02-25,2016-02-26,N,-,23588.0,23488.0,10000,2016-02-29,2019-03-11D19:21:06.181120000,-25.2,132,53,185,0.446651,0.004239


In [116]:
# Feature aggregation: one output row (one time series) per distinct combination
# of the static features listed below.
group_keys = [
    'TotalIssueSize',
    'Underlying',
    'BullBear',
    'StrikeLevel',
    'callVsStrike',
    'EntRatio',
    'MaturityDaysFromIssuance',
]

# The target list must be chronological because `start` is the earliest trade date.
# A stable sort on TradeDate guarantees that without disturbing the existing order
# of rows that share a date (so an already-sorted file gives the same result as before).
grouped = trimData.sort_values('TradeDate', kind='mergesort').groupby(group_keys)

trainData = pd.DataFrame({
    # first trade date of the group
    'start': grouped['TradeDate'].min(),
    # target as a time series (plain Python list per group)
    'target': grouped['target'].apply(list),
})

# Keep the group keys in the MultiIndex and also expose each one as a regular column,
# read straight from the index instead of re-aggregating the key columns with 'first'.
for key in group_keys:
    trainData[key] = trainData.index.get_level_values(key)

def categoriseTotalIssueSize(row):
    """Bucket row['TotalIssueSize'] into integer steps of half a billion (floor)."""
    half_billions = row['TotalIssueSize'] / math.pow(10, 9) * 2
    return math.floor(half_billions)

def categoriseUnderlying(row):
    """Return 1 when row['Underlying'] is 'HSI', otherwise 0."""
    if row['Underlying'] == 'HSI':
        return 1
    return 0

def categoriseBullBear(row):
    """Return 1 when row['BullBear'] is 'Bull', otherwise 0."""
    if row['BullBear'] == 'Bull':
        return 1
    return 0

def categoriseStrike(row):
    """Map row['StrikeLevel'] to an integer bucket.

    Below 500:     floor(level / 100)
    500 to <10000: the single bucket 5
    10000 and up:  4 + floor(level / 5000)
    """
    level = row['StrikeLevel']
    if level < 500:
        return math.floor(level / 100)
    if level < 10000:
        return 5
    return 4 + math.floor(level / 5000)
    
def categoriseCallVstrike(row):
    """Map row['callVsStrike'] to an integer bucket.

    Below 15:  floor(value / 2.5)
    15 and up: 3 + floor(value / 5)

    NOTE(review): these thresholds look like percentages, while the column is built
    as a fraction (|call - strike| / strike), so realistic inputs all land in
    bucket 0 -- confirm the intended unit.
    """
    ratio = row['callVsStrike']
    if ratio < 15:
        return math.floor(ratio / 2.5)
    return 3 + math.floor(ratio / 5)
    
def categoriseEntRatio(row):
    """Map row['EntRatio'] to an integer bucket.

    Below 1000:      floor(ratio / 500)
    1000 to <10000:  the single bucket 2
    10000 to <17500: floor(ratio / 2500) - 1
    17500 and up:    the single bucket 6
    """
    ratio = row['EntRatio']
    if ratio < 1000:
        return math.floor(ratio / 500)
    if ratio < 10000:
        return 2
    if ratio < 17500:
        return math.floor(ratio / 2500) - 1
    return 6

def categoriseMaturityPeriod(row):
    """Map row['MaturityDaysFromIssuance'] to an integer bucket.

    Below 100 days: the single bucket 0
    100 to <300:    floor(days / 50) - 1
    300 and up:     floor(days / 100) + 2
    """
    days = row['MaturityDaysFromIssuance']
    if days < 100:
        return 0
    if days < 300:
        return math.floor(days / 50) - 1
    return math.floor(days / 100) + 2


# Convert the grouped columns into categories, i.e. integer codes from 0 to x.

# Plain assignment instead of fillna(inplace=True) on a column selection: the in-place
# form is not guaranteed to write back to the frame.
trainData['TotalIssueSize'] = trainData['TotalIssueSize'].fillna(0)

# (column, encoder) pairs; each encoder reads only its own column from the row,
# and the list order is also the order of the values inside `cat`.
category_encoders = [
    ('TotalIssueSize', categoriseTotalIssueSize),
    ('Underlying', categoriseUnderlying),
    ('BullBear', categoriseBullBear),
    ('StrikeLevel', categoriseStrike),
    ('callVsStrike', categoriseCallVstrike),
    ('EntRatio', categoriseEntRatio),
    ('MaturityDaysFromIssuance', categoriseMaturityPeriod),
]
for column, encoder in category_encoders:
    trainData[column] = trainData.apply(encoder, axis=1)

# Collect all the category codes of a row into a single array column.
category_columns = [column for column, _ in category_encoders]
trainData['cat'] = trainData.apply(
    lambda row: [row[column] for column in category_columns], axis=1
)

# Drop the individual category columns now that they live in `cat`.
trainData = trainData.drop(columns=category_columns)

# Date transformation: render `start` as a 'YYYY-MM-DD HH:MM:SS' string.
trainData['start'] = pd.to_datetime(trainData['start']).dt.strftime('%Y-%m-%d %H:%M:%S')

trainData.info()
trainData.head()

<class 'pandas.core.frame.DataFrame'>
MultiIndex: 37858 entries, (2100000.0, 00700, Bull, 372.2, 0.007522837184309542, 10, 204) to (1000000000.0, HSI, Bull, 25678.0, 0.01363034504244879, 10000, 497)
Data columns (total 3 columns):
start     37858 non-null object
target    37858 non-null object
cat       37858 non-null object
dtypes: object(3)
memory usage: 1.4+ MB


Unnamed: 0_level_0,Unnamed: 1_level_0,Unnamed: 2_level_0,Unnamed: 3_level_0,Unnamed: 4_level_0,Unnamed: 5_level_0,Unnamed: 6_level_0,start,target,cat
TotalIssueSize,Underlying,BullBear,StrikeLevel,callVsStrike,EntRatio,MaturityDaysFromIssuance,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1
2100000.0,700,Bull,372.2,0.007523,10,204,2018-05-24 00:00:00,"[-0.0, 9900.0, -8820.0, 6477.0, 7130.0, 1062.0...","[0, 0, 1, 3, 0, 0, 3]"
3200000.0,700,Bear,407.88,0.006865,10,205,2018-05-23 00:00:00,[nan],"[0, 0, 0, 4, 0, 0, 3]"
3500000.0,700,Bull,167.88,0.014892,10,183,2016-12-29 00:00:00,"[-0.0, -0.0, -0.0, -0.0, -0.0, -0.0, -0.0, -0....","[0, 0, 1, 1, 0, 0, 2]"
4400000.0,700,Bear,375.81,0.007451,50,241,2019-03-19 00:00:00,"[-0.0, -0.0, -0.0, -0.0, 47.5, -0.0, -41.5, -0...","[0, 0, 0, 3, 0, 0, 3]"
4700000.0,700,Bear,366.61,0.007638,50,251,2019-03-22 00:00:00,[nan],"[0, 0, 0, 3, 0, 0, 4]"


In [117]:
# Full URI of the output file; not used by the upload below, which addresses the
# object by bucket name and key.
outpath = DATA_LOCATION + '/b4EtlOutput/train.json'

s3_resource = boto3.resource('s3')

# Serialise the training frame as JSON Lines (one record per line) into memory.
json_buffer = StringIO()
trainData.to_json(json_buffer, orient='records', lines=True)

# Upload the serialised text; the put() response is the cell's displayed output.
train_object = s3_resource.Object(BUCKET, 'b4EtlOutput/train.json')
train_object.put(Body=json_buffer.getvalue())

{'ResponseMetadata': {'RequestId': '0AD2060F638F60E5',
  'HostId': 'rPkGDphG842JYdVEHmFoQ4310AYB6rbq4LoD/WZSwML8Hv36VyCJvXhLRKNIi+vJxqNshUgMDvw=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'rPkGDphG842JYdVEHmFoQ4310AYB6rbq4LoD/WZSwML8Hv36VyCJvXhLRKNIi+vJxqNshUgMDvw=',
   'x-amz-request-id': '0AD2060F638F60E5',
   'date': 'Wed, 17 Jul 2019 07:45:54 GMT',
   'etag': '"f815016c15a6740c0ef380d296fe9c17"',
   'content-length': '0',
   'server': 'AmazonS3'},
  'RetryAttempts': 0},
 'ETag': '"f815016c15a6740c0ef380d296fe9c17"'}