In [None]:
# Input: the full options-chain EOD history, from the first day through the last.

In [33]:
import wandb
import os
import numpy as np
import tensorflow as tf
from tensorflow.keras.datasets import mnist
from tensorflow.keras.utils import to_categorical
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
import joblib
import random

2024-06-17 01:46:18.539069: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [42]:
# Raw EOD exports: one sub-directory per symbol (e.g. qqq/) holding the .txt files.
CSV_PATH         = './data/OptionsEOD.csv/'
# Part I output and Part II (staging) output, both partitioned by PartitionDate.
PARQUET_PATH     = './data/OptionsEOD.parquet'
PARQUET_STG_PATH = './data/OptionsEOD_STG.parquet'

# Columns fed to the MinMaxScaler fitted in Part II, and where the fitted scaler is dumped.
SCALER_COL = [
    'DTE', 'INTRINSIC_VALUE', 'TOTAL_C_VOLUME', 'TOTAL_P_VOLUME',
    'C_BID', 'C_ASK', 'C_VOLUME',
    'P_BID', 'P_ASK', 'P_VOLUME',
]
SCALER_PATH = './data/scaler.gz'

# One option chain = one (QUOTE_DATE, SYMBOL, EXPIRE_DATE) combination.
UNIQUE_KEYS = ['QUOTE_DATE', 'SYMBOL', 'EXPIRE_DATE']

# Part II zeroes strikes at random; seed `random` so Restart & Run All is reproducible.
RANDOM_SEED = 42
random.seed(RANDOM_SEED)

In [None]:
# Example: peek at one raw EOD export (QQQ, 2012-01) before running Part I.
EOD_CSV = pd.read_csv(
    os.path.join(CSV_PATH, "qqq", "qqq_eod_201201.txt"),
    engine='pyarrow',
)
EOD_CSV.head()

In [None]:
# Raw column labels of the example export (DataFrame.keys() returns the column Index).
EOD_CSV.keys()

In [None]:
#Part I
#TransformData:
# - read every .txt export under CSV_PATH/<symbol dir>/
# - clean column names and numeric/date dtypes
# - append to the PARQUET_PATH dataset, partitioned by QUOTE_DATE month (PartitionDate = 'YYYY-MM')

# Columns coerced to float64; non-numeric ones may arrive as strings padded with whitespace.
# INTRINSIC_VALUE is derived afterwards from UNDERLYING_LAST - STRIKE.
_EOD_FLOAT_COLS = [
    'C_DELTA', 'C_GAMMA', 'C_VEGA', 'C_THETA', 'C_RHO', 'C_IV', 'C_VOLUME',
    'C_LAST', 'C_BID', 'C_ASK', 'STRIKE',
    'P_BID', 'P_ASK', 'P_LAST', 'P_DELTA', 'P_GAMMA', 'P_VEGA', 'P_THETA', 'P_RHO', 'P_IV', 'P_VOLUME',
    'STRIKE_DISTANCE', 'STRIKE_DISTANCE_PCT',
]


def _clean_eod_column_name(name):
    """Strip surrounding whitespace and drop any '[' / ']' from a raw column header."""
    return name.strip().replace(']', '').replace('[', '')


def _to_float64(series):
    """Return ``series`` as float64; string cells are stripped and empty strings become NaN."""
    if not pd.api.types.is_numeric_dtype(series):
        # Only strip real strings: NaN/None cells in an object column would break str.strip().
        series = series.map(lambda x: x.strip() if isinstance(x, str) else x).replace('', np.nan)
    return series.astype('float64')


def TransformDataI():
    """Convert every raw EOD file under CSV_PATH into the partitioned parquet at PARQUET_PATH.

    Each sub-directory of CSV_PATH is one symbol (SYMBOL = directory name upper-cased). Every
    ``.txt`` file inside is read, its headers and dtypes are cleaned, INTRINSIC_VALUE
    (UNDERLYING_LAST - STRIKE) and PartitionDate ('YYYY-MM' of QUOTE_DATE) are added, and the
    rows are appended to PARQUET_PATH. Directories and files are visited in sorted order so
    the appended row-group order is deterministic.

    Returns None. Writes with ``append=True`` when PARQUET_PATH already exists, so running
    this twice duplicates every row.
    """
    for symbol_dir in sorted(os.listdir(CSV_PATH)):
        dir_path = os.path.join(CSV_PATH, symbol_dir)
        if not os.path.isdir(dir_path):  # stray files next to the symbol directories
            continue
        for file_name in sorted(os.listdir(dir_path)):
            if not file_name.endswith(".txt"):
                continue
            file_path = os.path.join(dir_path, file_name)
            print(f"[LOAD] : {file_path}        ", end='\r')
            eod = pd.read_csv(file_path, engine='pyarrow')

            # One rename pass instead of rebuilding the frame once per column.
            eod = eod.rename(columns=_clean_eod_column_name)

            eod['SYMBOL'] = symbol_dir.upper()

            for col in ["QUOTE_READTIME", "QUOTE_DATE", "EXPIRE_DATE"]:
                eod[col] = pd.to_datetime(eod[col])

            for col in _EOD_FLOAT_COLS:
                eod[col] = _to_float64(eod[col])
            # Computed after STRIKE is numeric.
            eod['INTRINSIC_VALUE'] = _to_float64(eod['UNDERLYING_LAST'] - eod['STRIKE'])

            # Fill after the float coercion so blank-string volumes (now NaN) are filled too.
            eod['P_VOLUME'] = eod['P_VOLUME'].fillna(0)
            eod['C_VOLUME'] = eod['C_VOLUME'].fillna(0)

            eod['PartitionDate'] = eod['QUOTE_DATE'].dt.strftime('%Y-%m')
            # sort_values returns a new frame; it must be assigned for the sort to take effect.
            eod = eod.sort_values(['QUOTE_DATE', 'EXPIRE_DATE', 'SYMBOL', 'STRIKE'], ascending=False)

            if os.path.exists(PARQUET_PATH):
                eod.to_parquet(PARQUET_PATH, engine='fastparquet', append=True,
                               partition_cols=['PartitionDate'], index=False)
            else:
                eod.to_parquet(PARQUET_PATH, engine='fastparquet',
                               partition_cols=['PartitionDate'], index=False)



In [None]:
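## Run Part I. Left commented out: TransformDataI() appends to PARQUET_PATH when it already exists, so re-running duplicates rows.
#TransformDataI()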

In [None]:
#Part II
#TransformData:
# - read each monthly partition of PARQUET_PATH
# - filter strikes, randomly zero extreme strikes, add chain volume totals
# - fit the MinMax scaler (saved to SCALER_PATH) and write PARQUET_STG_PATH
def strikeZero(df, v):
    """Zero selected fields, in place, on one chain's extreme-INTRINSIC_VALUE rows.

    The chain is the set of rows of ``df`` whose QUOTE_DATE, SYMBOL and EXPIRE_DATE equal
    ``v['QUOTE_DATE']``, ``v['SYMBOL']`` and ``v['EXPIRE_DATE']``. Every chain row whose
    INTRINSIC_VALUE equals the chain maximum or minimum gets DTE, INTRINSIC_VALUE, C_BID,
    C_ASK, C_VOLUME, P_BID, P_ASK and P_VOLUME set to 0.

    Because INTRINSIC_VALUE itself is set to 0, calling this again on the same chain can
    select different rows (the new extremes).

    Args:
        df: DataFrame, modified in place.
        v: Series/mapping providing 'QUOTE_DATE', 'SYMBOL' and 'EXPIRE_DATE'.

    Returns:
        None.
    """
    chain_mask = (
        (df['QUOTE_DATE'] == v['QUOTE_DATE'])
        & (df['SYMBOL'] == v['SYMBOL'])
        & (df['EXPIRE_DATE'] == v['EXPIRE_DATE'])
    )
    chain_iv = df.loc[chain_mask, 'INTRINSIC_VALUE']
    # Each comparison is parenthesised: `|` binds tighter than `==` in Python.
    extreme_mask = chain_mask & (
        (df['INTRINSIC_VALUE'] == chain_iv.max())
        | (df['INTRINSIC_VALUE'] == chain_iv.min())
    )
    df.loc[extreme_mask,
           ['DTE', 'INTRINSIC_VALUE', 'C_BID', 'C_ASK', 'C_VOLUME', 'P_BID', 'P_ASK', 'P_VOLUME']] = 0
    
def TransformDataII():
    """Build PARQUET_STG_PATH from PARQUET_PATH and fit/save the MinMax scaler.

    Monthly partitions of PARQUET_PATH are processed in sorted (chronological) order. For each
    chain (one QUOTE_DATE / SYMBOL / EXPIRE_DATE combination):

    * the first time a SYMBOL+EXPIRE_DATE quote is seen, its strike set is fixed to the strikes
      whose |INTRINSIC_VALUE| is among the ``max_option_len`` smallest on that day; on every
      day, the quote's strikes outside that set are dropped;
    * strikeZero() is applied 0-4 times, depending on one uniform draw;
    * TOTAL_P_VOLUME / TOTAL_C_VOLUME are the chain's P_VOLUME / C_VOLUME sums over all of
      its strikes, taken before dropping and zeroing.

    Each partition is partial_fit into the scaler on SCALER_COL and appended to
    PARQUET_STG_PATH; the scaler is dumped to SCALER_PATH at the end. Writes use
    ``append=True`` when PARQUET_STG_PATH exists, so running this twice duplicates the data.
    """
    max_option_len = 20
    # strikeZero runs once per threshold above the draw: with probability 0.2 / 0.15 / 0.1 / 0.05
    # it runs at least 1 / 2 / 3 / 4 times.
    # NOTE(review): the last rung was written 0.02, which breaks the increasing ladder and
    # disagrees with the 0.2 used by the exploratory loop further down; assumed a typo -- confirm.
    zero_strike_thresholds = (0.05, 0.1, 0.15, 0.2)
    scaler = MinMaxScaler()
    # os.listdir order is arbitrary, but the strike sets and expiry pruning below assume months
    # are visited chronologically; 'YYYY-MM' strings sort chronologically.
    partition_dates = sorted(d[-7:] for d in os.listdir(PARQUET_PATH) if 'PartitionDate' in d)
    options_quote = {}  # SYMBOL+EXPIRE_DATE string -> {'start_price', 'strike', 'exp'}

    for i, partdate in enumerate(partition_dates):
        df = pd.read_parquet(PARQUET_PATH, engine='pyarrow',
                             filters=[('PartitionDate', '=', partdate)])
        if df.empty:
            continue

        # Chain volume totals from the unfiltered partition, one groupby instead of a masked
        # sum per chain. Rows with a missing key value get NaN.
        chains = df.groupby(UNIQUE_KEYS)
        total_p_volume = chains['P_VOLUME'].transform('sum')
        total_c_volume = chains['C_VOLUME'].transform('sum')

        keys = df[UNIQUE_KEYS].sort_values(by=UNIQUE_KEYS).drop_duplicates()
        for _, v in keys.iterrows():
            df_filter = df[(df['QUOTE_DATE'] == v['QUOTE_DATE'])
                           & (df['SYMBOL'] == v['SYMBOL'])
                           & (df['EXPIRE_DATE'] == v['EXPIRE_DATE'])]
            if df_filter.empty:  # a key holding NaN/NaT never matches with ==
                continue
            quote = "".join(v[['SYMBOL', 'EXPIRE_DATE']].apply(str).values)

            if quote not in options_quote:
                abs_iv = df_filter['INTRINSIC_VALUE'].abs()
                closest = abs_iv.isin(abs_iv.sort_values()[:max_option_len])
                options_quote[quote] = {
                    'start_price': df_filter['UNDERLYING_LAST'].values[0],
                    'strike': df_filter.loc[closest, 'STRIKE'].values,
                    'exp': df_filter['EXPIRE_DATE'].iloc[0],
                }
                # UNDERLYING_LAST is expected to be identical across one chain.
                if df_filter['UNDERLYING_LAST'].values[0] != round(np.average(df_filter['UNDERLYING_LAST']), 4):
                    print('[ERROR] : set UNDERLYING_LAST ', quote)

            # Keep only the strikes fixed for this quote.
            df = df.drop(df_filter.index[~df_filter['STRIKE'].isin(options_quote[quote]['strike'])])

            # Randomly zero the chain's extreme strikes (see zero_strike_thresholds).
            random_number = random.uniform(0, 1)
            for threshold in zero_strike_thresholds:
                if random_number < threshold:
                    strikeZero(df, v)

        # Index-aligned: rows dropped above are simply not assigned.
        df['TOTAL_P_VOLUME'] = total_p_volume
        df['TOTAL_C_VOLUME'] = total_c_volume

        # Forget quotes that expired before this partition's first quote date.
        first_quote_date = df['QUOTE_DATE'].min()
        for quote in list(options_quote):
            if options_quote[quote]['exp'] < first_quote_date:
                del options_quote[quote]

        df['P_VOLUME'] = df['P_VOLUME'].fillna(0)
        df['C_VOLUME'] = df['C_VOLUME'].fillna(0)
        if df.empty:  # partial_fit rejects an empty frame
            continue

        scaler.partial_fit(df[SCALER_COL])
        print(f"[Processing] {round(((i+1)/len(partition_dates))*100,2)}%   ", end='\r')

        if os.path.exists(PARQUET_STG_PATH):
            df.to_parquet(PARQUET_STG_PATH, engine='fastparquet', append=True,
                          partition_cols=['PartitionDate'], index=False)
        else:
            df.to_parquet(PARQUET_STG_PATH, engine='fastparquet',
                          partition_cols=['PartitionDate'], index=False)

    joblib.dump(scaler, SCALER_PATH)

In [None]:
# Run Part II. TransformDataII() appends to PARQUET_STG_PATH when it already exists,
# so re-running this cell without clearing that directory duplicates the staging data.
TransformDataII()

In [35]:
#Example data: load one raw partition of PARQUET_PATH for exploration.
# The partition list must come from the same dataset that is read (listing
# PARQUET_STG_PATH fails before Part II has run, and may list different months).
# Partition dirs are 'PartitionDate=YYYY-MM'; sorted so the index always picks the same month.
PartitionDate = sorted(d[-7:] for d in os.listdir(PARQUET_PATH) if 'PartitionDate' in d)
partdate = PartitionDate[5]  # sixth month present in the dataset
df = pd.read_parquet(PARQUET_PATH, engine='pyarrow',
                     filters=[('PartitionDate', '=', partdate)])

In [68]:

keys = (
    df[UNIQUE_KEYS]
    .sort_values(by=UNIQUE_KEYS)
    .drop_duplicates()
)
# Only the first chain is inspected: the body runs once, then breaks.
for j, v in keys.iterrows():
    # Rows of this (QUOTE_DATE, SYMBOL, EXPIRE_DATE) chain ...
    df_filter = df.loc[
        df['QUOTE_DATE'].eq(v['QUOTE_DATE'])
        & df['SYMBOL'].eq(v['SYMBOL'])
        & df['EXPIRE_DATE'].eq(v['EXPIRE_DATE'])
    ]
    # ... narrowed to the rows holding its largest and smallest INTRINSIC_VALUE.
    df_filter = df_filter.loc[
        df_filter['INTRINSIC_VALUE'].eq(df_filter['INTRINSIC_VALUE'].max())
        | df_filter['INTRINSIC_VALUE'].eq(df_filter['INTRINSIC_VALUE'].min())
    ]
    break

In [69]:
# Show the largest/smallest-INTRINSIC_VALUE rows of the first chain selected in the previous cell.
df_filter

Unnamed: 0,QUOTE_UNIXTIME,QUOTE_READTIME,QUOTE_DATE,QUOTE_TIME_HOURS,UNDERLYING_LAST,EXPIRE_DATE,EXPIRE_UNIX,DTE,C_DELTA,C_GAMMA,...,P_VEGA,P_THETA,P_RHO,P_IV,P_VOLUME,STRIKE_DISTANCE,STRIKE_DISTANCE_PCT,SYMBOL,INTRINSIC_VALUE,PartitionDate
56588,1338580800,2012-06-01 16:00:00,2012-06-01,16.0,60.41,2012-06-01,1338580800,0.0,0.89779,0.02384,...,0.00072,-0.00464,0.0,1.29713,,8.4,0.139,QQQ,8.41,2012-06
56607,1338580800,2012-06-01 16:00:00,2012-06-01,16.0,60.41,2012-06-01,1338580800,0.0,0.00463,0.00353,...,0.0,0.0,0.0,,,10.6,0.175,QQQ,-10.59,2012-06


In [None]:
# (empty scratch cell)

In [55]:
# Largest INTRINSIC_VALUE in the loaded partition.
df.loc[:, 'INTRINSIC_VALUE'].max()

1262.17

In [None]:
# ===================================================================

In [None]:
# Draft, script-form version of Part II (the packaged version is TransformDataII()).
# It writes to the same PARQUET_STG_PATH (appending when it exists) and SCALER_PATH as
# TransformDataII(), so running it after TransformDataII() would append a second copy of
# every partition and replace the scaler. It only runs when RUN_PART_II_DRAFT is True.
RUN_PART_II_DRAFT = False


def strikeZeroByStrike(df, v):
    """Zero selected fields, in place, on the highest- and lowest-STRIKE rows of one chain.

    The chain is the rows of ``df`` matching ``v['QUOTE_DATE']``, ``v['SYMBOL']`` and
    ``v['EXPIRE_DATE']``. DTE, INTRINSIC_VALUE, C_BID, C_ASK, C_VOLUME, P_BID, P_ASK and
    P_VOLUME are set to 0. STRIKE is not modified, so repeated calls hit the same rows.
    Named differently from strikeZero so this cell does not replace the function that
    TransformDataII() calls.
    """
    chain_mask = (
        (df['QUOTE_DATE'] == v['QUOTE_DATE'])
        & (df['SYMBOL'] == v['SYMBOL'])
        & (df['EXPIRE_DATE'] == v['EXPIRE_DATE'])
    )
    chain_strikes = df.loc[chain_mask, 'STRIKE']
    edge_mask = chain_mask & (
        (df['STRIKE'] == chain_strikes.max()) | (df['STRIKE'] == chain_strikes.min())
    )
    df.loc[edge_mask,
           ['DTE', 'INTRINSIC_VALUE', 'C_BID', 'C_ASK', 'C_VOLUME', 'P_BID', 'P_ASK', 'P_VOLUME']] = 0


if RUN_PART_II_DRAFT:
    unique_keys = ['QUOTE_DATE', 'SYMBOL', 'EXPIRE_DATE']
    max_option_len = 20
    scaler = MinMaxScaler()
    # Chronological order: strike sets are fixed on a quote's first day and expired quotes pruned.
    PartitionDate = sorted(d[-7:] for d in os.listdir(PARQUET_PATH) if 'PartitionDate' in d)
    options_qoute = {}
    for i, partdate in enumerate(PartitionDate):
        df = pd.read_parquet(PARQUET_PATH, engine='pyarrow',
                             filters=[('PartitionDate', '=', partdate)])
        keys = df[unique_keys].sort_values(by=unique_keys).drop_duplicates()
        for j, v in keys.iterrows():
            chain_mask = ((df['QUOTE_DATE'] == v['QUOTE_DATE'])
                          & (df['SYMBOL'] == v['SYMBOL'])
                          & (df['EXPIRE_DATE'] == v['EXPIRE_DATE']))
            df_filter = df[chain_mask]
            qoute = "".join(v[['SYMBOL', 'EXPIRE_DATE']].apply(str).values)
            if qoute not in options_qoute:
                abs_iv = df_filter['INTRINSIC_VALUE'].abs()
                options_qoute[qoute] = {
                    'start_price': df_filter['UNDERLYING_LAST'].values[0],
                    'strike': df_filter.loc[abs_iv.isin(abs_iv.sort_values()[:max_option_len]), 'STRIKE'].values,
                    'exp': df_filter['EXPIRE_DATE'].values[0],
                }
                #check diff UNDERLYING_LAST
                if df_filter['UNDERLYING_LAST'].values[0] != round(np.average(df_filter['UNDERLYING_LAST']), 4):
                    print('[ERROR] : set UNDERLYING_LAST ', qoute)
            # Drop strikes outside this quote's fixed set (at most max_option_len strikes).
            rm_strike_index = df_filter[~df_filter['STRIKE'].isin(options_qoute[qoute]['strike'])].index
            df = df.drop(rm_strike_index)
            chain_mask = chain_mask.loc[df.index]

            # strikeZeroByStrike always hits the same rows (STRIKE is never modified), so a ladder
            # of cumulative thresholds reduces to one call when the draw is below the largest rung.
            # NOTE(review): largest rung assumed 0.2 (written 0.02 here, breaking the increasing ladder) -- confirm.
            random_number = random.uniform(0, 1)
            if random_number < 0.2:
                strikeZeroByStrike(df, v)

            df.loc[chain_mask, 'TOTAL_P_VOLUME'] = df_filter['P_VOLUME'].sum()
            df.loc[chain_mask, 'TOTAL_C_VOLUME'] = df_filter['C_VOLUME'].sum()

        # Iterate over a copy of the keys: popping while iterating a dict raises RuntimeError.
        for qi in list(options_qoute):
            if options_qoute[qi]['exp'] < df['QUOTE_DATE'].values[0]:
                options_qoute.pop(qi)

        df['P_VOLUME'] = df['P_VOLUME'].fillna(0)
        df['C_VOLUME'] = df['C_VOLUME'].fillna(0)

        scaler.partial_fit(df[SCALER_COL])
        print(f"[Processing] {round(((i+1)/len(PartitionDate))*100,2)}%   ", end='\r')

        if os.path.exists(PARQUET_STG_PATH):
            df.to_parquet(PARQUET_STG_PATH, engine='fastparquet', append=True,
                          partition_cols=['PartitionDate'], index=False)
        else:
            df.to_parquet(PARQUET_STG_PATH, engine='fastparquet',
                          partition_cols=['PartitionDate'], index=False)

    joblib.dump(scaler, SCALER_PATH)

In [None]:
# Rows of `df` belonging to the chain identified by `v`.
df.loc[
    df['QUOTE_DATE'].eq(v['QUOTE_DATE'])
    & df['SYMBOL'].eq(v['SYMBOL'])
    & df['EXPIRE_DATE'].eq(v['EXPIRE_DATE'])
]

In [None]:
# Inspect the current working DataFrame `df`.
df

In [None]:
# Exploratory walk-through of the Part II per-chain steps on the current `df` / `keys`.
# Stops after the first two chains and mutates the global `df`.
options_qoute = {}
for n_done, (i, v) in enumerate(keys.iterrows()):
    chain_mask = ((df['QUOTE_DATE'] == v['QUOTE_DATE'])
                  & (df['SYMBOL'] == v['SYMBOL'])
                  & (df['EXPIRE_DATE'] == v['EXPIRE_DATE']))
    df_filter = df[chain_mask]
    qoute = "".join(v[['SYMBOL', 'EXPIRE_DATE']].apply(str).values)

    if qoute not in options_qoute:
        abs_iv = df_filter['INTRINSIC_VALUE'].abs()
        options_qoute[qoute] = {
            'start_price': df_filter['UNDERLYING_LAST'].values[0],
            'strike': df_filter.loc[abs_iv.isin(abs_iv.sort_values()[:20]), 'STRIKE'].values,
            'exp': df_filter['EXPIRE_DATE'].values[0],
        }
        #check diff UNDERLYING_LAST
        if df_filter['UNDERLYING_LAST'].values[0] != round(np.average(df_filter['UNDERLYING_LAST']), 4):
            print('[ERROR] : set UNDERLYING_LAST ', qoute)

    rm_strike_index = df_filter[~df_filter['STRIKE'].isin(options_qoute[qoute]['strike'])].index
    df = df.drop(rm_strike_index)
    chain_mask = chain_mask.loc[df.index]

    # Zero selected fields on the chain's highest- and lowest-STRIKE rows. STRIKE is never
    # changed, so repeating this for the cumulative checks < 0.05 / 0.1 / 0.15 / 0.2 would hit
    # the same rows; one check against the largest threshold is equivalent.
    random_number = random.uniform(0, 1)
    if random_number < 0.2:
        chain_strikes = df.loc[chain_mask, 'STRIKE']
        edge_mask = chain_mask & ((df['STRIKE'] == chain_strikes.max())
                                  | (df['STRIKE'] == chain_strikes.min()))
        df.loc[edge_mask,
               ['DTE', 'INTRINSIC_VALUE', 'C_BID', 'C_ASK', 'C_VOLUME', 'P_BID', 'P_ASK', 'P_VOLUME']] = 0

    df.loc[chain_mask, 'TOTAL_P_VOLUME'] = df_filter['P_VOLUME'].sum()
    df.loc[chain_mask, 'TOTAL_C_VOLUME'] = df_filter['C_VOLUME'].sum()
    # Count iterations, not index labels: `i` is a row label of `keys`.
    if n_done == 1:
        break

# Forget quotes that expired before the partition's first quote date.
# Iterate over a copy of the keys: popping while iterating a dict raises RuntimeError.
for qi in list(options_qoute):
    if options_qoute[qi]['exp'] < df['QUOTE_DATE'].values[0]:
        options_qoute.pop(qi)


In [None]:
# Index labels of the strikes dropped for the last chain processed by the loop above.
rm_strike_index

In [None]:
# The 20 smallest |INTRINSIC_VALUE| of the current chain (np.abs needs an argument;
# sorting first so the slice keeps the smallest values, not the first 20 rows).
np.abs(df_filter['INTRINSIC_VALUE']).sort_values().iloc[:20]

In [None]:
# Index labels of chain rows whose STRIKE is outside the quote's fixed strike set
# (`X` was never defined; the strike set lives in options_qoute[qoute]['strike']).
df_filter[~df_filter['STRIKE'].isin(options_qoute[qoute]['strike'])].index

In [None]:
# Rows whose |INTRINSIC_VALUE| is not among the chain's 20 smallest. isin() is used because
# comparing the full Series with a 20-row slice raises on misaligned labels.
rm_index = df_filter[~np.abs(df_filter['INTRINSIC_VALUE']).isin(
    np.abs(df_filter['INTRINSIC_VALUE']).sort_values().iloc[:20])]

In [None]:
# Column labels of df_filter (DataFrame.keys() returns the column Index).
df_filter.keys()

In [None]:
# Series.max() skips NaN; builtin max() over a Series containing NaN gives an order-dependent result.
df_filter['INTRINSIC_VALUE'].max()

In [None]:
# Chain rows whose |INTRINSIC_VALUE| is among the 20 smallest. isin() is used because
# comparing the full Series with a 20-row slice raises on misaligned labels.
df_filter[np.abs(df_filter['INTRINSIC_VALUE']).isin(
    np.abs(df_filter['INTRINSIC_VALUE']).sort_values().iloc[:20])]

In [None]:

# STRIKE of the chain rows whose |INTRINSIC_VALUE| is among the 20 smallest
# (isin() instead of a misaligned Series == comparison, which raises).
df_filter[np.abs(df_filter['INTRINSIC_VALUE']).isin(
    np.abs(df_filter['INTRINSIC_VALUE']).sort_values().iloc[:20])]['STRIKE']

In [None]:
# |INTRINSIC_VALUE| per row of the chain.
df_filter['INTRINSIC_VALUE'].abs()

In [None]:
# Does the first UNDERLYING_LAST equal the chain average (rounded to 4 dp)?
df_filter['UNDERLYING_LAST'].iloc[0] == round(np.average(df_filter['UNDERLYING_LAST']), 4)

In [None]:
#=======================================================================================

In [None]:
#example transform(norm): apply the saved scaler to one staging partition.
# The scaler is fitted by TransformDataII() on PARQUET_STG_PATH data; only that dataset has the
# TOTAL_C_VOLUME / TOTAL_P_VOLUME columns listed in SCALER_COL (raw PARQUET_PATH does not).
PartitionDate = sorted(d[-7:] for d in os.listdir(PARQUET_STG_PATH) if 'PartitionDate' in d)
partdate = PartitionDate[0]  # earliest month
df = pd.read_parquet(PARQUET_STG_PATH, engine='pyarrow',
                     filters=[('PartitionDate', '=', partdate)])
df['P_VOLUME'] = df['P_VOLUME'].fillna(0)
df['C_VOLUME'] = df['C_VOLUME'].fillna(0)
# joblib.load unpickles the file: only load a scaler this pipeline wrote itself.
scaler = joblib.load(SCALER_PATH)
scaler.transform(df[SCALER_COL])

In [None]:
# First five rows of the partition.
df.iloc[:5]

In [None]:
# The columns the scaler operates on.
df.loc[:, SCALER_COL]

In [None]:
# NOTE(review): nothing visible in this notebook uses Java. This installs a system JDK through
# apt (needs root, e.g. a Colab-style VM) -- confirm it is still needed before Run All.
!apt-get update
!apt-get install -y openjdk-8-jdk

In [30]:


# Draw a uniform float in [0, 1); random.uniform(0, 1) reduces to exactly random.random().
random_number = random.random()

In [31]:
# Inspect the draw made in the previous cell.
random_number

0.9314077016364187