In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.window import Window
import pandas as pd
# Minio
from minio import Minio
from io import BytesIO
from minio.error import S3Error
# Utility
import json, sys, os, datetime
from gecko_transform import *
from create_date_dim import create_date_dim


In [18]:
def connect_to_minio(endpoint=None, access_key=None, secret_key=None, secure=False):
    """Create a MinIO client for the object store used by this notebook.

    Credentials are no longer hard-coded in the notebook: pass them explicitly
    or export MINIO_ACCESS_KEY / MINIO_SECRET_KEY before starting the kernel.

    Args:
        endpoint: ``host:port`` of the MinIO server (no scheme). Defaults to the
            MINIO_ENDPOINT environment variable, falling back to 'localhost:9000'.
        access_key: MinIO access key; defaults to $MINIO_ACCESS_KEY.
        secret_key: MinIO secret key; defaults to $MINIO_SECRET_KEY.
        secure: use HTTPS when True. The default False matches a server
            listening on its plain-HTTP port.

    Returns:
        A configured ``Minio`` client.

    Raises:
        KeyError: if a credential is neither passed nor set in the environment.
    """
    if endpoint is None:
        endpoint = os.environ.get("MINIO_ENDPOINT", "localhost:9000")
    # NOTE(review): the keys that used to be committed here should be rotated.
    if access_key is None:
        access_key = os.environ["MINIO_ACCESS_KEY"]
    if secret_key is None:
        secret_key = os.environ["MINIO_SECRET_KEY"]
    minio_client = Minio(
        endpoint=endpoint,
        access_key=access_key,
        secret_key=secret_key,
        secure=secure,
    )
    return minio_client

def create_spark_session(endpoint=None, access_key=None, secret_key=None):
    """Create (or reuse) a SparkSession that reads/writes MinIO via the Hadoop s3a connector.

    Args:
        endpoint: full URL of the S3 endpoint, e.g. "http://localhost:9000".
            Defaults to "http://" + $MINIO_ENDPOINT, falling back to localhost:9000.
        access_key: S3 access key; defaults to $MINIO_ACCESS_KEY.
        secret_key: S3 secret key; defaults to $MINIO_SECRET_KEY.

    Returns:
        The active ``SparkSession``, with ``spark.sql.caseSensitive`` enabled.

    Raises:
        KeyError: if a credential is neither passed nor set in the environment.
    """
    if endpoint is None:
        endpoint = "http://" + os.environ.get("MINIO_ENDPOINT", "localhost:9000")
    if access_key is None:
        access_key = os.environ["MINIO_ACCESS_KEY"]
    if secret_key is None:
        secret_key = os.environ["MINIO_SECRET_KEY"]
    # Only enable TLS when the endpoint actually is HTTPS.
    ssl_enabled = "true" if endpoint.lower().startswith("https://") else "false"

    # NOTE(review): getOrCreate() returns an already-running session as-is for
    # SparkContext-level settings (jars, spark.hadoop.*); restart the kernel
    # after changing them.
    spark = (
        SparkSession.builder
        .appName("Spark with Minio")
        .config("spark.log.level", "WARN")
        # TODO confirm: hadoop-aws should match the Hadoop version bundled with Spark.
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.1")
        .config("spark.hadoop.fs.s3a.access.key", access_key)
        .config("spark.hadoop.fs.s3a.secret.key", secret_key)
        .config("spark.hadoop.fs.s3a.endpoint", endpoint)
        # Path-style addressing (http://host/bucket), as typically needed for local MinIO.
        .config("spark.hadoop.fs.s3a.path.style.access", "true")
        .config("spark.hadoop.fs.s3a.connection.ssl.enabled", ssl_enabled)
        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
        .getOrCreate()
    )

    spark.conf.set('spark.sql.caseSensitive', True)
    print("> Spark session created !")
    return spark

# Module-level clients shared by the cells below: the MinIO client is used for
# bucket management, the SparkSession for reading/writing s3a:// paths.
minio_client = connect_to_minio()
spark = create_spark_session()



> Spark session created !


In [3]:

# Explicit schema for the bar CSV files; every column is nullable.
# NOTE: with an explicit schema the CSV reader maps columns by position by
# default, so this order must stay in sync with the column order of the files.
bars_schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in (
        ("date", StringType()),
        ("open_time", LongType()),
        ("symbol_name", StringType()),
        ("open", DoubleType()),
        ("high", DoubleType()),
        ("low", DoubleType()),
        ("close", DoubleType()),
        ("vol", DoubleType()),
        ("close_time", LongType()),
        ("quote_asset_vol", DoubleType()),
        ("num_trades", IntegerType()),
        ("taker_base_vol", DoubleType()),
        ("taker_quote_vol", DoubleType()),
        ("ignore", IntegerType()),
    )
])


In [15]:

########################         Read Files from Minio         ########################
def read_multiline_json(path):
    """Read JSON file(s) at ``path`` into a DataFrame, parsing each file as a whole.

    ``multiline`` lets a JSON value span several lines (instead of JSON Lines).
    The JSON reader has no ``header`` option, so none is set.
    """
    return spark.read.option("multiline", "true").json(path)

# Binance
bars_df = spark.read.csv("s3a://binance-bars/*.csv", header=True, schema=bars_schema)
symbols_df = read_multiline_json("s3a://binance-symbols/*.json")

# Gecko
category_df = read_multiline_json("s3a://gecko-category/category.json")
category_details_df = read_multiline_json("s3a://gecko-category-details/category-details.json")


# Create fact and 2 dims
# One row per (symbol, order type) pair from the raw symbols payload.
symbol_order_types_df = (
    symbols_df
    .withColumn("symbols", explode(col("symbols")))
    .select("symbols.symbol", "symbols.orderTypes")
    .withColumn("orderTypes", explode("orderTypes"))
)

# Surrogate keys come from row_number() over the sorted natural key instead of
# monotonically_increasing_id(): the latter depends on partition layout after
# the distinct() shuffle, so the lazily re-evaluated dims could receive
# different ids when written than when joined into the fact table.
# A Window without partitionBy uses a single partition; fine for these small dims.
distinct_order_types_df = (
    symbol_order_types_df.select("orderTypes").distinct()
    .withColumn("order_type_id", row_number().over(Window.orderBy("orderTypes")))
)

distinct_symbols_df = (
    symbol_order_types_df.select("symbol").distinct()
    .withColumn("symbol_id", row_number().over(Window.orderBy("symbol")))
)

# Fact table: replace the natural keys with the surrogate keys.
order_types_df = (
    symbol_order_types_df
    .join(distinct_symbols_df, on="symbol", how="left")
    .join(distinct_order_types_df, on="orderTypes", how="inner")
    .select("symbol_id", "order_type_id")
)

### Create date_dim
# Calendar range passed to create_date_dim (defined in create_date_dim.py;
# presumably both bounds are inclusive — TODO confirm).
DATE_DIM_START, DATE_DIM_END = "2024-01-01", "2024-12-31"
date_dim_df = spark.createDataFrame(create_date_dim(DATE_DIM_START, DATE_DIM_END))


In [16]:

########################            Transform          ####################
# Flatten the CoinGecko payloads with the project helpers from gecko_transform.
category_df = flatten_category(category_df, spark)
category_details_df = flatten_category_details(category_details_df, spark)

# NOTE: this cell rebinds its own inputs. Re-running it without re-running the
# read cell fails, because symbols_df no longer has a `symbols` column
# (the gecko helpers presumably also expect raw input — verify).
symbol_columns = ["serverTime", "symbols.symbol", "symbols.status", "symbols.baseAsset"]
symbols_df = (
    symbols_df
    .withColumn("symbols", explode(col("symbols")))
    .select(*[col(c) for c in symbol_columns])
)

# Attach category details by matching Binance baseAsset to the gecko symbol.
# Inner join: base assets without details are dropped, and a base asset that
# matches several detail rows yields several output rows.
asset_matches = symbols_df.baseAsset == category_details_df.symbol
symbols_df = (
    symbols_df
    .join(category_details_df, asset_matches)
    .withColumnRenamed("category_name", "categoryName")
    .drop(category_details_df["name"], category_details_df["url"], category_details_df["symbol"])
)




In [22]:
# Sanity check (triggers a Spark job): rows in the enriched symbols table after the gecko join.
symbols_df.count()

1218

In [19]:
# Bucket that receives every table written below; only created when missing,
# so re-running this cell is harmless.
spark_bucket = "spark-output"
bucket_missing = not minio_client.bucket_exists(spark_bucket)
if bucket_missing:
    minio_client.make_bucket(spark_bucket)

In [None]:

################################       Write to Minio        #################
def write_table(df, table_name, bucket=None, fmt="csv"):
    """Overwrite ``s3a://<bucket>/<table_name>`` with ``df`` (header row included) and log it.

    Args:
        df: Spark DataFrame to persist.
        table_name: object prefix inside the bucket, e.g. "binance.bars".
        bucket: target bucket; defaults to ``spark_bucket`` from the bucket cell.
        fmt: Spark output format, CSV by default.
    """
    target = f"s3a://{bucket or spark_bucket}/{table_name}"
    df.write \
        .format(fmt) \
        .option("header", "true") \
        .mode("overwrite") \
        .save(target)
    print(f"Wrote {table_name} successfully")

# (table name, DataFrame) pairs, written in this order; a failure stops the run.
output_tables = [
    # binance
    ("binance.bars", bars_df),
    ("binance.symbols", symbols_df),
    # gecko
    ("gecko.category_overview", category_df),
    # fact and 2 dims
    ("binance.order_types_fact", order_types_df),
    ("binance.order_types_dim", distinct_order_types_df),
    ("binance.symbols_dim", distinct_symbols_df),
    ("binance.date_dim", date_dim_df),
]
for table_name, table_df in output_tables:
    write_table(table_df, table_name)

# Finally
print(">> Spark run successfully! << ")

