In [None]:
#input must be options chain day1 - end

In [23]:
import wandb
import os
import numpy as np
import tensorflow as tf
from tensorflow.keras.datasets import mnist
from tensorflow.keras.utils import to_categorical
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
import joblib
import random

In [24]:
CSV_PATH     = './data/OptionsEOD.csv/'
PARQUET_PATH = './data/OptionsEOD.parquet'
PARQUET_STG_PATH = './data/OptionsEOD_STG.parquet'
SCALER_COL  = ['DTE','INTRINSIC_VALUE','TOTAL_C_VOLUME', 'TOTAL_P_VOLUME',	'C_BID',	'C_ASK', 'C_VOLUME',  'P_BID',	'P_ASK',	'P_VOLUME' ]
SCALER_PATH = './data/scaler.gz'
 
UNIQUE_KEYS = ['QUOTE_DATE','SYMBOL','EXPIRE_DATE']

In [None]:
#example
EOD_CSV = pd.read_csv(CSV_PATH+"qqq/qqq_eod_201201.txt", engine='pyarrow')
EOD_CSV.head()

In [None]:
EOD_CSV.columns

In [None]:
#Part I
#TransformData : 
#-each partition from EXPIRE_DATE 
#-csv too parquet
#-col. rename 
def TransformDataI():
    scaler = MinMaxScaler()
    schema = None
    pqwriter = None
    for d in os.listdir(CSV_PATH):
        for f in os.listdir(CSV_PATH+f"{d}/"):
            if f.endswith(".txt"):
                ## load
                print( f"[LOAD] : {CSV_PATH}{d}/{f}        ",end='\r')
                EOD_CSV = pd.read_csv(CSV_PATH+f"{d}/"+f, engine='pyarrow')
                    
                ## rename col.
                for c in EOD_CSV.columns:
                    EOD_CSV = EOD_CSV.rename( columns={ c:c.strip().replace(']','').replace('[','') } )
                
                ## add symbol 
                EOD_CSV['SYMBOL'] = d.upper()
                ## add INTRINSIC_VALUE
                EOD_CSV['INTRINSIC_VALUE'] = EOD_CSV['UNDERLYING_LAST'] - EOD_CSV['STRIKE']
                
                ## fillnafillna
                EOD_CSV['P_VOLUME'] = EOD_CSV['P_VOLUME'].fillna(0)
                EOD_CSV['C_VOLUME'] = EOD_CSV['C_VOLUME'].fillna(0)


                
                # date columns convert to datetime
                for c in ["QUOTE_READTIME","QUOTE_DATE","EXPIRE_DATE"]:
                    EOD_CSV[c] = pd.to_datetime(EOD_CSV[c])
                
                #clean float data
                for c in ['INTRINSIC_VALUE','C_DELTA','C_GAMMA','C_VEGA','C_THETA','C_RHO','C_IV','C_VOLUME','C_LAST','C_BID','C_ASK','STRIKE','P_BID','P_ASK','P_LAST','P_DELTA','P_GAMMA','P_VEGA','P_THETA','P_RHO','P_IV','P_VOLUME','STRIKE_DISTANCE','STRIKE_DISTANCE_PCT']:
                    if EOD_CSV[c].dtype not in ( 'float32','float64'):
                        EOD_CSV[c] = EOD_CSV[c].apply(lambda x: x.strip())
                        EOD_CSV[c] = EOD_CSV[c].replace('', np.nan).fillna(np.nan)
                        EOD_CSV[c] = EOD_CSV[c].astype('float64')
                    if EOD_CSV[c].dtype == 'float32':
                        EOD_CSV[c] = EOD_CSV[c].astype('float64')
                        
                # REMAIN_DAYS(int) =>  use DTE col.
                #partition with QUOTE_DATE
                EOD_CSV['PartitionDate'] = EOD_CSV['QUOTE_DATE'].dt.strftime('%Y-%m')
                EOD_CSV.sort_values(['QUOTE_DATE','EXPIRE_DATE','SYMBOL','STRIKE'],ascending =False ) 

                #scaler(Normalization_
                #scaler.partial_fit(EOD_CSV[SCALER_COL])

                # save
                if os.path.exists(PARQUET_PATH):
                  EOD_CSV.to_parquet(PARQUET_PATH, engine='fastparquet', append=True, partition_cols=['PartitionDate'], index=False )
                else:
                  EOD_CSV.to_parquet(PARQUET_PATH, engine='fastparquet' , partition_cols=['PartitionDate'], index=False  )
                    
    # joblib.dump(scaler, SCALER_PATH )
    # if pqwriter:
    #     pqwriter.close()
    # print( f"[DONE]                                                       ",end='\r')



In [None]:
##-RunCleanData
#TransformDataI()

In [None]:
#Part II 
#TransformData : 
# - read each partitions 
# - Normalization if not have scaler.gz file
def strikeZero(df,v,num_rm):
    # First, filter based on QUOTE_DATE, SYMBOL, and EXPIRE_DATE
    filtered_arr = df[(df['QUOTE_DATE'] == v['QUOTE_DATE']) &
                     (df['SYMBOL'] == v['SYMBOL']) &
                     (df['EXPIRE_DATE'] == v['EXPIRE_DATE'])]['STRIKE'].values
    
    # print(max_intrinsic_value)
    # print(min_intrinsic_value)
    df.loc[ (df['QUOTE_DATE'] == v['QUOTE_DATE']) 
            & (df['SYMBOL'] == v['SYMBOL']) 
            & (df['EXPIRE_DATE'] == v['EXPIRE_DATE'])
            & (  (df['STRIKE'].isin(filtered_arr[:5].values)  )
               | (df['STRIKE'].isin(filtered_arr[-5:].values)   )
              ) 
        , ['DTE', 'INTRINSIC_VALUE', 'C_BID',	'C_ASK', 'C_VOLUME',  'P_BID',	'P_ASK',	'P_VOLUME']       
    ] = 0
    

def TransformDataII():
    
    keys = None#df[unique_keys].sort_values(by=unique_keys).drop_duplicates()
    max_option_len = 20
    scaler = MinMaxScaler()
    PartitionDate = [ d[-7:] for d in  os.listdir(PARQUET_PATH) if 'PartitionDate' in d]
    options_qoute = {}
    for i,partdate in enumerate(PartitionDate) :  
        df = pd.read_parquet(PARQUET_PATH,engine='pyarrow'
                                     , filters=[('PartitionDate', '=', partdate)]
                                    )
        #add col options_id
        df['OPTIONS_ID'] = None
        ####################################################
        keys = df[UNIQUE_KEYS].sort_values(by=UNIQUE_KEYS).drop_duplicates()
        #loop each keys
        for j,v in keys.iterrows():
            
            df_filter=df[ (df['QUOTE_DATE'] == v['QUOTE_DATE']) & (df['SYMBOL'] == v['SYMBOL']) & (df['EXPIRE_DATE'] == v['EXPIRE_DATE']) ]
            qoute = "".join(v[ ['SYMBOL','EXPIRE_DATE'] ].apply(str).values)
            #add qoute
            if qoute not in [*options_qoute.keys()]:
                options_qoute[qoute] = {}
                options_qoute[qoute]['start_price'] = df_filter['UNDERLYING_LAST'].values[0]
                options_qoute[qoute]['strike'] = df_filter[ df_filter['INTRINSIC_VALUE'].abs().isin(df_filter['INTRINSIC_VALUE'].abs().sort_values()[:max_option_len]) ]['STRIKE'].values
                options_qoute[qoute]['exp'] = df_filter['EXPIRE_DATE'].values[0]
                #check diff UNDERLYING_LAST
                if df_filter['UNDERLYING_LAST'].values[0] != round(np.average(df_filter['UNDERLYING_LAST']),4):
                    print('[ERROR] : set UNDERLYING_LAST ',qoute )
            #rm index max : max_option_len
            rm_strike_index = df_filter[ ~df_filter['STRIKE'].isin(options_qoute[qoute]['strike']) ].index
            df = df.drop(rm_strike_index)
            df_filter = df_filter.drop(rm_strike_index)

            # Generate zero strike 
            # Generate a random float between 0.01 and 1
            random_number = random.uniform(0, 1)
            nom_rm_rows = [r  for r in [0.05,0.1,0.15,0.2,0.25,0.3,0.35,0.4] if random_number < r]
            strikeZero(df,v,len(nom_rm_rows) )
            #TOTAL_P_VOLUME,TOTAL_C_VOLUME,OPTIONS_ID
            df.loc[ (df['QUOTE_DATE'] == v['QUOTE_DATE']) 
            & (df['SYMBOL'] == v['SYMBOL']) 
            & (df['EXPIRE_DATE'] == v['EXPIRE_DATE']) 
            , ['TOTAL_P_VOLUME','TOTAL_C_VOLUME','OPTIONS_ID']
            ] = [
                df_filter['P_VOLUME'].sum(),
                df_filter['C_VOLUME'].sum(),
                pd.util.hash_pandas_object(pd.Series([i,j])).sum()
            ]
            
        #clear expire options_qoute
        for qi in list(options_qoute.keys()):
            if options_qoute[qi]['exp'] < df['QUOTE_DATE'].values[0]:
                options_qoute.pop(qi)
    
        ###################################################

        # scaler.partial_fit
        scaler.partial_fit(df[SCALER_COL])
        print(f"[Processing] {round(((i+1)/len(PartitionDate))*100,2)}%   ",end='\r')
        
        # save
        if os.path.exists(PARQUET_STG_PATH):
          df.to_parquet(PARQUET_STG_PATH, engine='fastparquet', append=True, partition_cols=['PartitionDate'], index=False )
        else:
          df.to_parquet(PARQUET_STG_PATH, engine='fastparquet' , partition_cols=['PartitionDate'], index=False  )
            
    joblib.dump(scaler, SCALER_PATH )

In [None]:
TransformDataII()

In [25]:
#Example data
PartitionDate = [ d[-7:] for d in  os.listdir(PARQUET_STG_PATH) if 'PartitionDate' in d]
options_qoute = {}
for i,partdate in enumerate(PartitionDate[5:6]) :  
    df = pd.read_parquet(PARQUET_PATH,engine='pyarrow'
                                 , filters=[('PartitionDate', '=', partdate)]
                                )
    break

In [26]:
df

Unnamed: 0,QUOTE_UNIXTIME,QUOTE_READTIME,QUOTE_DATE,QUOTE_TIME_HOURS,UNDERLYING_LAST,EXPIRE_DATE,EXPIRE_UNIX,DTE,C_DELTA,C_GAMMA,...,P_VEGA,P_THETA,P_RHO,P_IV,P_VOLUME,STRIKE_DISTANCE,STRIKE_DISTANCE_PCT,SYMBOL,INTRINSIC_VALUE,PartitionDate
0,1338580800,2012-06-01 16:00:00,2012-06-01,16.0,128.17,2012-06-01,1338580800,0.00,0.93111,0.01477,...,0.00061,-0.00465,0.00000,0.74919,0.0,13.2,0.103,SPY,13.17,2012-06
1,1338580800,2012-06-01 16:00:00,2012-06-01,16.0,128.17,2012-06-01,1338580800,0.00,0.92158,0.01731,...,0.00086,-0.00519,-0.00022,0.69439,0.0,12.2,0.095,SPY,12.17,2012-06
2,1338580800,2012-06-01 16:00:00,2012-06-01,16.0,128.17,2012-06-01,1338580800,0.00,0.91846,0.01973,...,0.00028,-0.00507,-0.00007,0.63877,0.0,11.2,0.087,SPY,11.17,2012-06
3,1338580800,2012-06-01 16:00:00,2012-06-01,16.0,128.17,2012-06-01,1338580800,0.00,0.95615,0.01983,...,0.00111,-0.00515,-0.00044,0.58294,0.0,10.2,0.079,SPY,10.17,2012-06
4,1338580800,2012-06-01 16:00:00,2012-06-01,16.0,128.17,2012-06-01,1338580800,0.00,0.95448,0.02382,...,0.00103,-0.00516,0.00000,0.52753,0.0,9.2,0.072,SPY,9.17,2012-06
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
68728,1341000000,2012-06-29 16:00:00,2012-06-29,16.0,64.14,2014-12-19,1419022800,903.04,0.40045,0.02129,...,0.37628,-0.00434,-0.70972,0.32386,,10.9,0.169,QQQ,-10.86,2012-06
68729,1341000000,2012-06-29 16:00:00,2012-06-29,16.0,64.14,2014-12-19,1419022800,903.04,0.30755,0.02010,...,0.36639,-0.00465,-0.73891,0.33269,0.0,15.9,0.247,QQQ,-15.86,2012-06
68730,1341000000,2012-06-29 16:00:00,2012-06-29,16.0,64.14,2014-12-19,1419022800,903.04,0.26723,0.01683,...,0.34916,-0.00428,-0.74556,0.33833,,20.9,0.325,QQQ,-20.86,2012-06
68731,1341000000,2012-06-29 16:00:00,2012-06-29,16.0,64.14,2014-12-19,1419022800,903.04,0.17054,0.01484,...,0.33047,-0.00343,-0.74398,0.34950,,25.9,0.403,QQQ,-25.86,2012-06


In [21]:
keys = df[UNIQUE_KEYS].sort_values(by=UNIQUE_KEYS).drop_duplicates()
#loop each keys
for j,v in keys.iterrows():

    df_filter=df[ (df['QUOTE_DATE'] == v['QUOTE_DATE']) 
        & (df['SYMBOL'] == v['SYMBOL']) 
        & (df['EXPIRE_DATE'] == v['EXPIRE_DATE']) 
    ]

    df_filter=df_filter[
        (df_filter['INTRINSIC_VALUE'].max() == df_filter['INTRINSIC_VALUE'])
    |   (df_filter['INTRINSIC_VALUE'].min() == df_filter['INTRINSIC_VALUE'])
    ]
    break

NameError: name 'df' is not defined

In [None]:
3============== sampling show 3===========================

In [None]:
PartitionDate = [ d[-7:] for d in  os.listdir(PARQUET_STG_PATH) if 'PartitionDate' in d]

In [27]:
import random
from IPython.display import clear_output,display, HTML

for i in range(10):
    ran_partiion=random.choice(PartitionDate)

    df = pd.read_parquet(PARQUET_STG_PATH,engine='pyarrow'
                                 , filters=[('PartitionDate', '=', ran_partiion)]
                                )
    key = random.choice( [*df[UNIQUE_KEYS].sort_values(by=UNIQUE_KEYS).drop_duplicates().values] )

    df_filtered=df[ (df['QUOTE_DATE'] == key[0]) 
        & (df['SYMBOL'] == key[1]) 
        & (df['EXPIRE_DATE'] == key[2]) 
    ]
    break
    display(HTML(df_filter[['STRIKE']+SCALER_COL].to_html()))
    input('Next ...')
    clear_output()


In [42]:
df_filtered['STRIKE'].values

array([80.63, 81.63, 82.63, 83.63, 84.63, 85.63, 86.63, 87.63, 88.63,
       89.63, 90.63, 91.63, 92.63, 93.63, 94.63, 95.63, 96.63, 97.63,
       98.63, 99.63])

array([80.63, 81.63, 82.63, 83.63, 84.63])

array([95.63, 96.63, 97.63, 98.63, 99.63])

In [3]:
    max_value = filtered_df[['STRIKE']].sort_values(by=['STRIKE'], ascending=True)
    min_value = filtered_df[['STRIKE']].sort_values(by=['STRIKE'], ascending=False)

Host 10.7.55.66 is reachable.
Port 8080 is open.


In [17]:
pd.util.hash_pandas_object(pd.Series([2,1])).sum()

14543938847800026030

In [19]:
pd.util.hash_pandas_object(pd.Series([2,0])).sum()

14468241752409847283

In [20]:
pd.util.hash_pandas_object(pd.Series([0,2])).sum()

7582650688657438907