In [None]:
!pip install dateparser

In [None]:
!pip install pyts

In [None]:
import os
import boto3
import pandas as pd
import sys
from datetime import datetime
from datetime import timedelta 
import dateparser as dp
from matplotlib import pyplot
import numpy as np
import matplotlib.pyplot as plt
from pyts.image import GramianAngularField
from mpl_toolkits.axes_grid1 import ImageGrid
from pyts.image import MarkovTransitionField
from PIL import Image
from pyts.image import RecurrencePlot
import io
import matplotlib.image as mpimg
import time
from sklearn.preprocessing import MinMaxScaler
import gc
import psutil
from tqdm import tqdm

%matplotlib inline 

In [None]:
class AnomalyLabelGenerator:
    """Window predicates used to label windows of a numeric series.

    Every method receives ``x``, a window of values that supports positional
    ``.iloc`` access (for example a pandas Series), and returns a truth value.
    """

    def isConsistentIncrease(self, x):
        """Tell whether no value in the window is greater than its successor.

        Windows holding fewer than two values are reported as increasing.
        """
        return all(
            current <= following
            for current, following in zip(x.iloc[:-1], x.iloc[1:])
        )

    def isOpenToClosePercentageBeyondThreshold(self, x, percentage):
        """Tell whether the first-to-last change is at least ``percentage`` percent.

        The change is measured relative to the first value of the window.
        """
        opening = x.iloc[0]
        closing = x.iloc[-1]
        changePercent = (closing - opening)/opening*100
        return changePercent >= percentage

    def isWindowClosingTurnoverSignificant(self, x, percentage):
        """Tell whether the last value reaches ``percentage`` percent of the sum of all earlier values."""
        earlierTotal = x.iloc[:-1].sum()
        return bool((earlierTotal*percentage/100) <= x.iloc[-1])

    def isWidowsMaxSignificant(self, x, percentage):
        """Tell whether the window maximum reaches ``percentage`` percent of the sum of the remaining values.

        The spelling of the method name is kept as-is so existing callers keep working.
        """
        peak = x.max()
        restTotal = x.sum() - peak
        return bool(restTotal*percentage/100 <= peak)

    def isNextValBelowThreshold(self, x, percentage):
        """Tell whether the second value is below ``percentage`` percent of the first value."""
        first, second = x.iloc[0], x.iloc[1]
        return second < (first*percentage/100)

    def isLeftWindowPumping(self, x):
        """Tell whether first < middle < last, where middle sits at position ``len(x) // 2``."""
        middle = x.iloc[len(x)//2]
        return x.iloc[0] < middle and middle < x.iloc[-1]

    def isRightWindowDumping(self, x):
        """Tell whether the first value is greater than both the middle (``len(x) // 2``) and the last value."""
        start = x.iloc[0]
        if not (start > x.iloc[len(x)//2]):
            return False
        return start > x.iloc[-1]

labelGen = AnomalyLabelGenerator();
TURNOVER_ROLLING_WINDOW_SIZE = '5Min'
TURNOVER_MIN_POINTS=5
PRICE_ROLLING_WINDOW_SIZE = '10Min'
PRICE_MIN_POINTS=10
OPEN_TO_CLOSE_TURNOVER_INCREASE_PERCENTAGE = 75
PRICE_DROP_PERCENTAGE = 100

BUCKET_NAME = <s3_bucket_name>
dataLocationCommonPath = <path_to_download_dir_on_s3>
destS3Url = <s3_url>

In [None]:
def insertNewFeaturesToDf(filename, df, featureStats=None):
    """Add price/turnover features and rolling-window labels to ``df`` in place.

    Columns are written in this order: ``Price``, ``Turnover``,
    ``RollingPrice``, ``isOpenToClosePriceAbove10P``/``15P``/``20P``,
    ``isWindowClosingTurnoverSignificant``, ``isLeftWindowPumping`` and
    ``isRightWindowDumping``. Each label column holds the result of the
    matching ``labelGen`` predicate per rolling window; NaN results (such as
    windows with fewer than the minimum number of points) are replaced by 0.

    Args:
        filename: Name of the source file; only used in the stats header line.
        df: DataFrame with ``Open``, ``High``, ``Low``, ``Close`` and
            ``Volume`` columns. The rolling windows are offset strings, which
            pandas only accepts on a datetime-like index. Modified in place.
        featureStats: Optional object with a ``write(str)`` method that
            receives the per-label value counts and timing. When ``None``
            nothing is logged.

    Returns:
        None.
    """
    t1 = time.time()

    # featureStats defaults to None; use a no-op writer in that case so the
    # call does not fail on ``None.write``.
    writeStats = featureStats.write if featureStats is not None else (lambda _text: None)

    def addLabelColumn(column, series, window, minPoints, labelFunc, **labelKwargs):
        # Apply one predicate over a rolling window, store it as `column` and log its value counts.
        rolled = series.rolling(window, center=False, min_periods=minPoints)
        df[column] = rolled.apply(labelFunc, kwargs=labelKwargs or None).fillna(0)
        writeStats(df.groupby(column)[column].agg(['count']).to_string())
        writeStats('\n')

    writeStats('*' *100)
    writeStats('\nAdding features to df for file [%s]\n' %filename)

    df['Price'] = (df['Open'] + df['High'] + df['Low'] + df['Close'])/4
    df['Turnover'] = df['Price'] * df['Volume']
    df['RollingPrice'] = df['Price'].rolling(PRICE_ROLLING_WINDOW_SIZE, center=False).mean()

    # One label column per open-to-close price-increase threshold.
    for percentage in (10, 15, 20):
        addLabelColumn(
            'isOpenToClosePriceAbove%dP' % percentage,
            df['Price'], PRICE_ROLLING_WINDOW_SIZE, PRICE_MIN_POINTS,
            labelGen.isOpenToClosePercentageBeyondThreshold,
            percentage=percentage,
        )

    addLabelColumn(
        'isWindowClosingTurnoverSignificant',
        df['Turnover'], TURNOVER_ROLLING_WINDOW_SIZE, TURNOVER_MIN_POINTS,
        labelGen.isWindowClosingTurnoverSignificant,
        percentage=OPEN_TO_CLOSE_TURNOVER_INCREASE_PERCENTAGE,
    )

    addLabelColumn(
        'isLeftWindowPumping',
        df['Price'], PRICE_ROLLING_WINDOW_SIZE, PRICE_MIN_POINTS,
        labelGen.isLeftWindowPumping,
    )

    # Shifting by a negative row count pulls later prices onto earlier rows, so
    # the trailing window at a row is evaluated on prices that follow that row.
    # NOTE(review): 10 rows presumably equals the 10-minute price window
    # (one row per minute) - confirm against the data.
    lookaheadRows = 10
    addLabelColumn(
        'isRightWindowDumping',
        df['Price'].shift(-lookaheadRows), PRICE_ROLLING_WINDOW_SIZE, PRICE_MIN_POINTS,
        labelGen.isRightWindowDumping,
    )

    writeStats('\nElapsed time to insert Features={}\n'.format(time.time()-t1))

In [None]:
# Collect the keys of all CSV objects stored under
# <dataLocationCommonPath>/OriginalData/ in the bucket.
s3 = boto3.resource('s3')
my_bucket = s3.Bucket(BUCKET_NAME)

csvFileList = []

for my_bucket_object in my_bucket.objects.filter(Prefix='{}/OriginalData/'.format(dataLocationCommonPath)):
    # Match on the extension rather than on a substring, so keys that merely
    # contain '.csv' (e.g. 'x.csv.bak') are not handed to pd.read_csv later.
    if my_bucket_object.key.endswith('.csv'):
        print(my_bucket_object.key)
        csvFileList.append(my_bucket_object.key)

print("Total number of loaded csv files=[%d]" %len(csvFileList))

# Feature Enrichment main loop

In [None]:
# Enrich every CSV listed in csvFileList and write each result as
# "Enriched_<original file name>" under destS3Url.
fileIndex = 0

startTime = time.time()
s3 = boto3.client('s3')

# Columns removed from each raw file before the features are computed.
columnsToDrop = ['Close Time', 'Quote Asset Volume', 'Taker buy base asset volume', 'Taker buy quote asset volume', 'Ignore']

# The context manager closes (and flushes) the stats file even when a file
# fails to download, parse or upload part-way through the loop.
with open('TwoYearsDataEnrichment_10P_15P_20P_Pump.txt','w') as featureStats:
    for file_name in tqdm(csvFileList):
        print('Processing file [%s]' %file_name)
        # Prefix of the file name up to the first underscore; not used further in this cell.
        coin_name = file_name.split('/')[-1].split('_')[0]

        # Running count of processed files; not used further in this cell.
        fileIndex +=1

        obj = s3.get_object(Bucket = BUCKET_NAME, Key = file_name)
        # The CSV column named '0' becomes the index and is parsed as dates.
        df = pd.read_csv(obj['Body'], index_col='0', parse_dates=True)

        df.drop(columnsToDrop, axis=1, inplace=True)

        insertNewFeaturesToDf(file_name, df, featureStats)

        # NOTE(review): destS3Url is concatenated without a separator, so it
        # presumably ends with '/' - confirm.
        df.to_csv(destS3Url + "Enriched_" + file_name.split('/')[-1] , index=True)

    featureStats.write('\nTotal elapsed time to insert all features={}\n'.format(time.time()-startTime))