In [None]:
import gcsfs
import google.auth
import modin.config as cfg
import modin.pandas as pd
from distributed import Client
from modin.config import ProgressBar

# configure modin before any dataframe is created:
# use dask as the execution engine and switch modin's progress bar on.
cfg.Engine.put("dask")
ProgressBar.enable()

import os

import wandb
from tqdm.auto import tqdm


In [None]:
# Run configuration. EXCHANGE and STRATEGY end up in the name of the w & b
# artifact and in the names of the parquet files written below
# (STRATEGY == "supervised" -> "matched", anything else -> "unmatched").
EXCHANGE = "cboe"  # alternative: "ise"
STRATEGY = "supervised"  # alternative: "unsupervised"

# NOTE: the input path is NOT derived from EXCHANGE / STRATEGY; it has to be
# switched by hand so that it matches the two constants above.
# alternative input (ISE):
# "gs://thesis-bucket-option-trade-classification/data/raw/livevol_ise_full_sample.csv"
FILE_PATH_INPUT = (
    "gs://thesis-bucket-option-trade-classification/data/raw/matched_cboe_quotes.csv"
)
# prefix the parquet parts are written to; must end with "/" because the file
# name is appended by plain string concatenation.
FILE_PATH_OUTPUT = "gs://thesis-bucket-option-trade-classification/data/preprocessed/"


In [None]:
# set the google cloud project via environment variable and pick up the
# default credentials of the environment (second return value is discarded).
os.environ["GCLOUD_PROJECT"] = "flowing-mantis-239216"
credentials, _ = google.auth.default()
# NOTE(review): `project="thesis"` differs from GCLOUD_PROJECT above -- confirm
# which project id is intended. `fs` is not referenced by the other cells
# shown in this notebook.
fs = gcsfs.GCSFileSystem(project="thesis", token=credentials)


In [None]:
# connect to weights and biases
# `run` is used at the end of the notebook to log the artifact.
run = wandb.init(project="thesis", job_type="dataset-creation", entity="fbv")
# artifact that collects references to the raw csv and to all parquet parts;
# it is filled inside `import_data` and `df_to_parquet`.
dataset = wandb.Artifact(name=f"{EXCHANGE}_{STRATEGY}_csv", type="raw_data")


In [None]:
def import_data(input_file: str) -> pd.DataFrame:
    """
    Read the raw csv in chunks and return it as one dataframe with fixed dtypes.

    Memory usage is reduced by loading only the columns listed in `usecols`,
    by passing explicit dtypes instead of relying on inference and by reading
    the file in chunks that are concatenated afterwards. Categorical columns
    are re-cast after the concatenation and the date columns are parsed to
    datetimes.

    Side effect: the input file is added as reference "raw_csv" to the
    module-level w & b artifact `dataset`.

    Adapted from here:
    https://pandas.pydata.org/pandas-docs/stable/user_guide/scale.html

    Args:
        input_file (str): path or url of the csv file.

    Returns:
        pd.DataFrame: dataframe with the columns listed in `usecols`.
    """
    chunksize = 10**6

    # ISE / CBOE:  QUOTE_DATETIME,ROOT,EXPIRATION,STRK_PRC,OPTION_TYPE,TRADE_SIZE,TRADE_PRICE,BEST_BID,BEST_ASK,order_id,ask_ex,bid_ex,bid_size_ex,ask_size_ex,date,price_all_lead,price_all_lag,optionid,day_vol,price_ex_lead,price_ex_lag,buy_sell,secid_OM,issue_type,myn
    # ISE unmatched: UNDERLYING_SYMBOL,QUOTE_DATETIME,SEQUENCE_NUMBER,ROOT,EXPIRATION,STRK_PRC,OPTION_TYPE,EXCHANGE,TRADE_SIZE,TRADE_PRICE,CANCELED_TRADE_CONDITION_ID,BEST_BID,BEST_ASK,order_id,ask_ex,bid_ex,bid_size_ex,ask_size_ex,date,price_all_lead,price_all_lag,optionid,secid,issue_type,price_ex_lead,price_ex_lag,myn

    # do not load 'CANCELED_TRADE_CONDITION_ID' and 'EXCHANGE'
    # make ints nullable, required for CBOE
    # https://pandas.pydata.org/docs/user_guide/basics.html#basics-dtypes
    usecols = [
        "QUOTE_DATETIME",
        "ROOT",
        "EXPIRATION",
        "STRK_PRC",
        "OPTION_TYPE",
        "TRADE_SIZE",
        "TRADE_PRICE",
        "BEST_BID",
        "BEST_ASK",
        "order_id",
        "ask_ex",
        "bid_ex",
        "bid_size_ex",
        "ask_size_ex",
        "date",
        "price_all_lead",
        "price_all_lag",
        "optionid",
        "day_vol",
        "price_ex_lead",
        "price_ex_lag",
        "buy_sell",
        "issue_type",
        "myn",
        "secid_OM",
        # "secid",
    ]

    # date-like columns are read as plain strings and parsed further below.
    # NOTE(review): float32 represents integers exactly only up to 2**24;
    # verify that the ids in "optionid" / "secid_OM" stay below that bound,
    # otherwise a wider dtype is needed.
    dtypes = {
        "QUOTE_DATETIME": "object",
        "ROOT": "category",
        "EXPIRATION": "object",
        "STRK_PRC": "float32",
        "OPTION_TYPE": "category",
        "EXCHANGE": "Int64",
        "TRADE_SIZE": "Int64",
        "TRADE_PRICE": "float32",
        "CANCELED_TRADE_CONDITION_ID": "Int64",
        "BEST_BID": "float32",
        "BEST_ASK": "float32",
        "order_id": "Int64",
        "ask_ex": "float32",
        "bid_ex": "float32",
        "bid_size_ex": "float32",
        "ask_size_ex": "float32",
        "date": "object",
        "price_all_lead": "float32",
        "price_all_lag": "float32",
        "optionid": "float32",
        "day_vol": "float32",
        "price_ex_lead": "float32",
        "price_ex_lag": "float32",
        "buy_sell": "Int8",
        "issue_type": "category",
        "myn": "float32",
        "secid_OM": "float32",
        # "secid": "float32",
    }

    # log raw file in w & b
    dataset.add_reference(input_file, name="raw_csv")

    with pd.read_csv(
        input_file, chunksize=chunksize, usecols=usecols, dtype=dtypes
    ) as reader:
        chunks = list(reader)

    df = pd.concat(chunks, axis=0)

    # categories are inferred per chunk; if they differ between chunks the
    # concatenation falls back to object dtype. Re-cast so that the requested
    # dtype holds for the full frame (no-op if the column is already category).
    categorical_cols = [col for col in usecols if dtypes.get(col) == "category"]
    for col in categorical_cols:
        df[col] = df[col].astype("category")

    # separate names per format, so that the builtin `format` is not shadowed
    date_format = "%d%b%Y"
    df["EXPIRATION"] = pd.to_datetime(df["EXPIRATION"], format=date_format)
    df["date"] = pd.to_datetime(df["date"], format=date_format)

    datetime_format = "%d%b%y:%H:%M:%S"
    df["QUOTE_DATETIME"] = pd.to_datetime(
        df["QUOTE_DATETIME"], format=datetime_format
    )
    return df


In [None]:
def df_to_parquet(
    df: pd.DataFrame, target_dir: str, chunk_size: int = 1000000, **parquet_wargs
) -> None:
    """
    Write pd.DataFrame to parquet files of at most `chunk_size` rows each.

    The file names are built from the module-level constants STRATEGY and
    EXCHANGE and a zero-padded running part number. Every written file is
    added as reference to the module-level w & b artifact `dataset`.

    Args:
        df (pd.DataFrame): input dataframe.
        target_dir (str): directory or url prefix the parquet files are written
        to. A missing trailing "/" is added.
        chunk_size (int, optional): number of rows stored in one parquet file.
        Defaults to 1000000.
        **parquet_wargs: keyword arguments passed on to `DataFrame.to_parquet`.

    Raises:
        ValueError: if `chunk_size` is not a positive number.
    """
    if chunk_size <= 0:
        raise ValueError(f"chunk_size must be positive, got {chunk_size}.")

    # the file name is appended by string concatenation; make sure a directory
    # given without trailing separator does not get merged into the file name.
    if target_dir and not target_dir.endswith("/"):
        target_dir += "/"

    prefix = "matched" if STRATEGY == "supervised" else "unmatched"

    # enumerate yields the part number directly (no float division needed)
    for chunk, start in enumerate(tqdm(range(0, len(df), chunk_size))):
        slc = df.iloc[start : start + chunk_size]
        output_path = (
            f"{target_dir}{prefix}_{EXCHANGE}"
            f"_quotes_min_mem_usage_extended_part_{chunk:04d}.parquet"
        )
        slc.to_parquet(output_path, **parquet_wargs)

        # log in w & b
        dataset.add_reference(output_path, name=f"raw_parquet_{chunk:04d}")


In [None]:
# start dask client (created with default arguments)
client = Client()

# read the raw csv into one dataframe and write it back to the output
# location as parquet parts; both steps also register their files in the
# w & b artifact `dataset`.
df = import_data(FILE_PATH_INPUT)
df_to_parquet(df, FILE_PATH_OUTPUT)


In [None]:
# Log the artifact (references to the raw csv and the parquet parts) to save
# it as an output of this run, then close the w & b run.
run.log_artifact(dataset)
wandb.finish()
