In [3]:
# typical
import os
import re
import pandas as pd
from collections import Counter
import matplotlib.pyplot as plt

# pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, desc
from pyspark.sql.functions import col, lag
from pyspark.sql import functions as F
from pyspark.sql.window import Window
import time

In [4]:
# Create (or reuse) the notebook's Spark session, using the 'local[*]' master.
session_builder = SparkSession.builder.master('local[*]')
spark = session_builder.getOrCreate()

Note: Check the `lab2.py` file for proof of concept.

# Executive Summary

# Introduction

# EDA

**Loading the File**

In [5]:
# Folder containing the Binance history parquet files
directory = '/mnt/data/public/binance-full-history'
# Read the parquet data under that folder into a single Spark DataFrame
binance = spark.read.parquet(directory)

The schema is printed below to inspect the column names and to identify which columns will be referenced once all the parquet files are combined into a single dataframe.

In [6]:
# Print the column names, types and nullability of the loaded DataFrame
binance.printSchema()

root
 |-- open: float (nullable = true)
 |-- high: float (nullable = true)
 |-- low: float (nullable = true)
 |-- close: float (nullable = true)
 |-- volume: float (nullable = true)
 |-- quote_asset_volume: float (nullable = true)
 |-- number_of_trades: integer (nullable = true)
 |-- taker_buy_base_asset_volume: float (nullable = true)
 |-- taker_buy_quote_asset_volume: float (nullable = true)
 |-- open_time: timestamp_ntz (nullable = true)



In [7]:
# List only the parquet data files in the folder, in a stable (sorted) order.
# os.listdir() returns every directory entry in arbitrary order; anything that
# is not a "*.parquet" file (e.g. hidden or marker files) is skipped so that the
# later "BASE-QUOTE" parsing of each filename cannot fail on it.
files = sorted(
    name for name in os.listdir(directory)
    if name.endswith('.parquet')
)

**Filtering the Files According to Coin Classification**

First, all the files were classified into one of three coin types: cryptocurrency, stablecoin, or fiat-backed. Note that a filename such as `BTC-USDT.parquet` is classified by its *base asset*, which is the first coin in the name (BTC); the second coin (USDT) is the *quote asset*, i.e. the currency in which the price is expressed. So, the given example would be classified as a cryptocurrency.

In [8]:
# Base Classifications (cryptocurrencies, stablecoins, and fiats)
cryptos = [
    '1INCH', 'AAVE', 'ACM', 'ADA', 'ADADOWN', 'ADAUP', 'ADX', 'AE', 'AERGO', 'AGI',
    'AION', 'AKRO', 'ALGO', 'ALICE', 'ALPHA', 'AMB', 'ANKR', 'ANT', 'APPC', 'AR',
    'ARDR', 'ARK', 'ARN', 'ARPA', 'ASR', 'AST', 'ATA', 'ATM', 'ATOM', 'AUCTION',
    'AUDIO', 'BCH', 'BEAM', 'BEL', 'BETA', 'BETH', 'BIFI', 'BLZ', 'BNB', 'BNBDOWN',
    'BNBUP', 'BNT', 'BQX', 'BRD', 'BTC', 'BTCDOWN', 'BTCST', 'BTCUP', 'BTG', 'BTS',
    'BTT', 'BURGER', 'BZRX', 'C98', 'CAKE', 'CTK', 'CTSI', 'CTXC', 'CVC', 'CVP',
    'DAR', 'DASH', 'DATA', 'DCR', 'DEGO', 'DENT', 'DEXE', 'DF', 'DGB', 'DGD', 'DIA',
    'DLT', 'DNT', 'DOCK', 'DODO', 'DOGE', 'DOT', 'DOTDOWN', 'DOTUP', 'DREP', 'DUSK',
    'DYDX', 'EDO', 'EGLD', 'ELF', 'ENG', 'ENJ', 'EOS', 'ETC', 'ETH', 'FET', 'FIL',
    'FIO', 'FIRO', 'FIS', 'FLM', 'FLOW', 'FOR', 'FORTH', 'FRONT', 'FTM', 'FTT',
    'FUEL', 'FUN', 'FXS', 'GALA', 'GAS', 'GHST', 'GLM', 'GNT', 'GO', 'GRS', 'GRT',
    'GTC', 'GTO', 'GVT', 'GXS', 'HARD', 'HBAR', 'HC', 'HIVE', 'HNT', 'HOT', 'ICP',
    'ICX', 'IOTA', 'KSM', 'LAZIO', 'LEND', 'LINA', 'LINK', 'LINKDOWN', 'LINKUP',
    'LIT', 'LOOM', 'LPT', 'LRC', 'LSK', 'LTC', 'LTCDOWN', 'LTCUP', 'LTO', 'LUN',
    'LUNA', 'MANA', 'MASK', 'MATIC', 'MBL', 'MBOX', 'MCO', 'MDA', 'MDT', 'MDX',
    'MFT', 'MINA', 'MIR', 'MITH', 'NEO', 'OCEAN', 'OG', 'OGN', 'OM', 'OMG', 'ONE',
    'ONG', 'ONT', 'ORN', 'OST', 'OXT', 'PAXG', 'PERL', 'PERP', 'PHA', 'PHB', 'PIVX',
    'PNT', 'POA', 'POE', 'POLS', 'POLY', 'POND', 'POWR', 'PPT', 'PROM', 'PROS',
    'PSG', 'PUNDIX', 'QKC', 'QLC', 'QNT', 'QSP', 'QTUM', 'RAMP', 'RCN', 'RDN',
    'REEF', 'REN', 'REP', 'REQ', 'SNGLS', 'SNM', 'SNT', 'SNX', 'SOL', 'SPARTA',
    'SRM', 'STEEM', 'STMX', 'STORJ', 'STORM', 'STPT', 'STRAT', 'STRAX', 'STX',
    'SUN', 'SUPER', 'SUSHI', 'SUSHIDOWN', 'SUSHIUP', 'SXP', 'SXPUP', 'SYS', 'TCT',
    'TFUEL', 'THETA', 'TKO', 'TLM', 'TNB', 'TNT', 'TOMO', 'TORN', 'TRB', 'TROY',
    'TRX', 'VET', 'VIA', 'VIB', 'VIBE', 'VIDT', 'VITE', 'VTHO', 'WABI', 'WAN',
    'WAVES', 'WBTC', 'WIN', 'WING', 'WNXM', 'WPR', 'WRX', 'WTC', 'XEC', 'XEM',
    'XLM', 'XMR', 'XRP', 'XRPDOWN', 'XRPUP', 'XTZ']
stablecoins = ['USDT','USDC','BUSD','TUSD','DAI','PAX','BIDR','IDRT']
fiats = ['EUR','GBP','AUD','TRY','BRL','RUB','NGN','UAH']

# Base symbol -> category lookup (constant-time instead of scanning the lists).
# Filled in reverse priority order so that, should a symbol ever appear in more
# than one list, the cryptos -> stablecoins -> fiats precedence is preserved.
category_by_base = {}
for label, symbols in (
    ("fiat_backed", fiats),
    ("stablecoin", stablecoins),
    ("cryptocurrency", cryptos),
):
    category_by_base.update(dict.fromkeys(symbols, label))

# Filenames grouped by the category of their base asset.
classified = {
    "cryptocurrency": [],
    "stablecoin": [],
    "fiat_backed": []
}

parquet_suffix = '.parquet'
for f in files:
    if not f.endswith(parquet_suffix):
        # not a "<BASE>-<QUOTE>.parquet" data file, so there is nothing to classify
        continue

    # The base asset is the text before the first hyphen. partition() never
    # raises, whereas unpacking split('-') fails on names that do not contain
    # exactly one hyphen.
    pair = f[:-len(parquet_suffix)]
    base = pair.partition('-')[0]

    # unlisted base assets default to cryptocurrency
    classified[category_by_base.get(base, "cryptocurrency")].append(f)

In [9]:
# Flatten the per-category filename lists into a {pair name: category} lookup,
# where the pair name is the filename with ".parquet" removed.
classification = {
    filename.replace(".parquet", ""): label
    for label, filenames in classified.items()
    for filename in filenames
}

After getting the coin classifications of the files, all the parquet files were combined into a single dataframe, which was then saved as CSV and parquet copies after execution.

In [None]:
# Combine every trading pair into one DataFrame, attach its coin classification,
# compute per-pair simple and log returns, and write the result as parquet
# partitioned by year.
#
# Spark evaluates lazily, so every action (show / distinct / write) re-runs the
# whole read + join + window pipeline. To pay that cost only once, the dataset
# is written first and the sanity checks are then run against the written files.
spark = SparkSession.builder.getOrCreate()

# set shuffle partitions BEFORE the transformations / writes
spark.conf.set("spark.sql.shuffle.partitions", "200")  # or 300, 400, etc.

output_path = "final_output_by_year"

# Add stock column from filename: the regex keeps the last path segment of the
# source file's path, without the ".parquet" extension (e.g. "BTC-USDT").
binance_with_stock = binance.withColumn(
    "stock",
    F.regexp_extract(F.input_file_name(), r'([^/]+)\.parquet$', 1)
)

# Build proper classification DataFrame: one (pair, category) row per file
class_rows = list(classification.items())

class_df = spark.createDataFrame(class_rows, ["stock", "classification"])

# Window per token: rows of one pair in chronological order, so lag() yields
# the previous candle's close for the same pair.
w = Window.partitionBy("stock").orderBy("open_time")

final_spark_df = (
    binance_with_stock
    # class_df has one row per file, so hint Spark to broadcast it rather than
    # shuffle the large candle table for the join
    .join(F.broadcast(class_df), on="stock", how="left")

    # split stock into base + quote
    .withColumn("base_currency", F.split(F.col("stock"), "-")[0])
    .withColumn("quote_currency", F.split(F.col("stock"), "-")[1])

    .withColumn("prev_close", F.lag("close").over(w))

    # returns
    .withColumn("return", (F.col("close") - F.col("prev_close")) / F.col("prev_close"))
    .withColumn("abs_return", F.abs(F.col("return")))

    # log returns
    .withColumn("log_return", F.log(F.col("close") / F.col("prev_close")))
    .withColumn("abs_log_return", F.abs(F.col("log_return")))

    # drops rows whose return could not be computed, e.g. the first row of each
    # pair, which has no previous close
    .dropna(subset=["return", "log_return"])

    .select(
        "base_currency",
        "quote_currency",
        "classification",
        "open",
        "high",
        "low",
        "close",
        "volume",
        "quote_asset_volume",
        "number_of_trades",
        "taker_buy_base_asset_volume",
        "taker_buy_quote_asset_volume",
        "open_time",
        "return",
        "abs_return",
        "log_return",
        "abs_log_return"
    )
)

# add the year column once
final_with_year = final_spark_df.withColumn("year", F.year("open_time"))

# single write, partitioned by year
(
    final_with_year
    .repartition("year")          # co-locates each year's rows in one shuffle partition,
                                  # so each year directory is written by a single task
    .write
    .mode("overwrite")            # overwrite the whole dataset
    .partitionBy("year")          # creates year=2017, year=2018, ...
    .parquet(output_path)
)

# Run the checks on the written dataset instead of recomputing the pipeline.
written_df = spark.read.parquet(output_path)

# sanity check (year is dropped so the preview shows the final_spark_df columns)
written_df.drop("year").show(5)

# see what years you have
(
    written_df
    .select("year")
    .distinct()
    .orderBy("year")
    .show()
)

print("Done.")