In [1]:
import datetime
from dotenv import load_dotenv
import logging
import os
import sys
import findspark
import pytz
import schedule
import time
import traceback
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import StructType, StructField, StringType, BooleanType, LongType, DoubleType, IntegerType, MapType
from pyspark.sql.functions import col, when, to_json, struct, lit, udf, collect_list, round as round_, max as max_, min as min_, sum as sum_, first, last, from_unixtime
from google.cloud import storage

from config.config import config
from data_processing import DataProcessor
from slack_package import SlackChannel, get_slack_decorators
from utils.gcs_module import gcsModule
from config import BUCKET_NAME
from sparkManager.spark_config import SparkManager

2024-08-08 16:19:16,695 - INFO - Initialized Google Cloud Storage client with key file ./key.json
2024-08-08 16:19:17,435 - INFO - Bucket 'production-trustia-raw-data' is accessible. Listing contents:
2024-08-08 16:19:17,437 - INFO - - Raw/Binance/BTCUSDT/aggTrades/2024/08/07/1723006440000.parquet
2024-08-08 16:19:17,438 - INFO - - Raw/Binance/BTCUSDT/aggTrades/2024/08/07/1723006500000.parquet
2024-08-08 16:19:17,439 - INFO - - Raw/Binance/BTCUSDT/aggTrades/2024/08/07/1723006560000.parquet
2024-08-08 16:19:17,440 - INFO - - Raw/Binance/BTCUSDT/aggTrades/2024/08/07/1723006620000.parquet
2024-08-08 16:19:17,440 - INFO - - Raw/Binance/BTCUSDT/aggTrades/2024/08/07/1723006680000.parquet
2024-08-08 16:19:17,441 - INFO - - Raw/Binance/BTCUSDT/aggTrades/2024/08/07/1723006740000.parquet
2024-08-08 16:19:17,442 - INFO - - Raw/Binance/BTCUSDT/aggTrades/2024/08/07/1723006800000.parquet
2024-08-08 16:19:17,443 - INFO - - Raw/Binance/BTCUSDT/aggTrades/2024/08/07/1723006860000.parquet
2024-08-08 16:1

In [2]:
# Initialize findspark
# Locate the local Spark installation so pyspark can be used from this kernel;
# this runs before the Spark session is requested further down in this cell.
findspark.init()

# Load environment variables from .env file
# NOTE(review): `config` was already imported in the first cell, i.e. before
# this call. If config reads os.environ at import time, values that only exist
# in .env are not picked up — confirm, and if so call load_dotenv() earlier.
load_dotenv()

# Settings read from the project config object.
SLACK_WEBHOOK_URL = config.SLACK_WEBHOOK_URL
PROCESSED_FILES_LOG = config.PROCESSED_FILES_LOG

# Project helpers: GCS access for BUCKET_NAME, Slack notification helpers,
# and the DataProcessor used for the candle/footprint transformations below.
gcs_module = gcsModule(bucket_name=BUCKET_NAME)
slack_decorators = get_slack_decorators()
slack_channel = SlackChannel(SLACK_WEBHOOK_URL)
data_processor = DataProcessor()
spark_manager = SparkManager(app_name="AggTrades_Transformation")

# Initialize and start the Spark session
spark = spark_manager.get_spark_session()

2024-08-08 16:19:30,517 - INFO - Initialized GCS client successfully


:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.hadoop#hadoop-aws added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-c4ee90bd-b5d6-4779-97f8-c55c7a8b921e;1.0
	confs: [default]
	found org.apache.hadoop#hadoop-aws;3.2.2 in central
	found com.amazonaws#aws-java-sdk-bundle;1.11.563 in central
:: resolution report :: resolve 281ms :: artifacts dl 9ms
	:: modules in use:
	com.amazonaws#aws-java-sdk-bundle;1.11.563 from central in [default]
	org.apache.hadoop#hadoop-aws;3.2.2 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   2   |   0   |   0   |   0   ||   2   |   0   |
	----------------------------------------------

In [3]:
import datetime

# Number of raw files that make up each supported timeframe. Presumably one raw
# file per minute (file names are millisecond timestamps 60 000 ms apart) — confirm.
TIMEFRAME_FILE_COUNTS = {"5m": 5, "15m": 15, "1h": 60}


def _file_timestamp_key(path):
    """Sort key for a raw file path: the numeric value of its file-name stem.

    Numeric stems (e.g. ``1723126680000.parquet``) compare by integer value, so
    timestamps with different digit counts still order correctly. Non-numeric
    names sort below every numeric one, so they end up last when sorting newest-first.
    """
    stem = path.rsplit("/", 1)[-1].split(".", 1)[0]
    if stem.isdigit():
        return (1, int(stem), "")
    return (0, 0, stem)


def get_files_for_timeframe(symbol, date, timeframe, gcs_client=None):
    """Return GCS URIs of the most recent raw aggTrades files for ``symbol`` on ``date``.

    Files are listed under ``Raw/Binance/{symbol}/aggTrades/YYYY/MM/DD/``. The
    newest ``TIMEFRAME_FILE_COUNTS[timeframe]`` of them are returned, newest first.

    NOTE: the selection is by file *count* within a single day's prefix, not by
    wall-clock window. Shortly after midnight UTC, fewer files than the timeframe
    needs may exist, and files from the previous day are not considered.

    Args:
        symbol: Trading pair used in the storage path, e.g. ``"BTCUSDT"``.
        date: ``"YYYY-MM-DD"`` string (only the first 10 characters are parsed),
            or a ``datetime.date`` / ``datetime.datetime``.
        timeframe: One of the keys of ``TIMEFRAME_FILE_COUNTS``.
        gcs_client: Object exposing ``get_gcs_files(prefix=...)`` and
            ``bucket_name``. Defaults to the notebook's ``gcs_module``.

    Returns:
        List of ``gs://<bucket>/<object>`` URIs, newest first.

    Raises:
        ValueError: If ``timeframe`` is unsupported or ``date`` is malformed.
    """
    try:
        file_count = TIMEFRAME_FILE_COUNTS[timeframe]
    except KeyError:
        raise ValueError(f"Unsupported timeframe: {timeframe}") from None

    client = gcs_module if gcs_client is None else gcs_client

    # Validate the date instead of slicing blindly, so a malformed value fails
    # loudly rather than producing a wrong prefix.
    if isinstance(date, datetime.date):
        day = date
    else:
        day = datetime.datetime.strptime(str(date)[:10], "%Y-%m-%d")
    prefix = f"Raw/Binance/{symbol}/aggTrades/{day:%Y/%m/%d}/"

    files = client.get_gcs_files(prefix=prefix) or []

    # Sort numerically on the timestamp in the file name; plain string sorting
    # misorders timestamps with different digit counts.
    latest = sorted(files, key=_file_timestamp_key, reverse=True)[:file_count]

    return [f"gs://{client.bucket_name}/{name}" for name in latest]


In [4]:
# Example usage: list the 60 most recent raw BTCUSDT aggTrades files for today (UTC date).
symbol = "BTCUSDT"
timeframe = "1h"
today = datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%d")

files = get_files_for_timeframe(symbol, today, timeframe)
print(files)

2024-08-08 16:19:39,744 - INFO - Retrieved 842 files from bucket 'production-trustia-raw-data' with prefix 'Raw/Binance/BTCUSDT/aggTrades/2024/08/08/'


['gs://production-trustia-raw-data/Raw/Binance/BTCUSDT/aggTrades/2024/08/08/1723126680000.parquet', 'gs://production-trustia-raw-data/Raw/Binance/BTCUSDT/aggTrades/2024/08/08/1723126620000.parquet', 'gs://production-trustia-raw-data/Raw/Binance/BTCUSDT/aggTrades/2024/08/08/1723126560000.parquet', 'gs://production-trustia-raw-data/Raw/Binance/BTCUSDT/aggTrades/2024/08/08/1723126500000.parquet', 'gs://production-trustia-raw-data/Raw/Binance/BTCUSDT/aggTrades/2024/08/08/1723126440000.parquet', 'gs://production-trustia-raw-data/Raw/Binance/BTCUSDT/aggTrades/2024/08/08/1723126380000.parquet', 'gs://production-trustia-raw-data/Raw/Binance/BTCUSDT/aggTrades/2024/08/08/1723126320000.parquet', 'gs://production-trustia-raw-data/Raw/Binance/BTCUSDT/aggTrades/2024/08/08/1723126260000.parquet', 'gs://production-trustia-raw-data/Raw/Binance/BTCUSDT/aggTrades/2024/08/08/1723126200000.parquet', 'gs://production-trustia-raw-data/Raw/Binance/BTCUSDT/aggTrades/2024/08/08/1723126140000.parquet', 'gs://pro

In [8]:
from pyspark.sql import SparkSession, DataFrame
from typing import List
import logging

# Initialize Spark session
# NOTE(review): an earlier cell already obtained a session via
# spark_manager.get_spark_session(). Presumably getOrCreate() hands back that
# existing session here, in which case this appName has no effect — confirm,
# and consider dropping this line so the notebook has a single session source.
spark = SparkSession.builder.appName("GCS_Parquet_Aggregation").getOrCreate()

def read_and_aggregate_files(files: List[str]) -> DataFrame:
    """Read parquet files and stack them into one DataFrame of trade columns.

    Each file is reduced to ``price``, ``quantity``, ``timestamp`` and
    ``is_buyer_maker`` (columns a file lacks are added as null literals).
    Files that raise while being read or projected are logged and skipped.
    Files are processed in reverse-sorted path order and combined with
    positional ``union``.

    Args:
        files: Paths or URIs accepted by ``spark.read.parquet``.

    Returns:
        The union of all readable files.

    Raises:
        FileNotFoundError: If no file could be read.
    """
    frames = []

    for path in sorted(files, reverse=True):
        try:
            frame = spark.read.parquet(path)

            # Keep only the columns needed downstream.
            wanted = ["price", "quantity", "timestamp", "is_buyer_maker"]
            present = frame.columns

            # Fill missing columns with nulls so every frame has the same layout.
            for name in (c for c in wanted if c not in present):
                frame = frame.withColumn(name, lit(None))

            frames.append(frame.select(wanted))
        except Exception as e:
            logging.warning(f"File not found or cannot be read: {path}. Skipping. Error: {e}")

    if not frames:
        raise FileNotFoundError("No valid files to read and aggregate.")

    combined, *rest = frames
    for frame in rest:
        combined = combined.union(frame)
    return combined

In [43]:
# Load the selected raw aggTrades files into a single DataFrame.
df = read_and_aggregate_files(files)

# `timestamp` is in milliseconds: derive a human-readable time from it, add a
# long-typed copy as the sort key, and order the trades chronologically.
df = (
    df.withColumn("readable_timestamp", from_unixtime(col("timestamp") / 1000))
    .withColumn("timestamp_int", col("timestamp").cast("long"))
    .orderBy(col("timestamp_int"))
)


In [44]:
# Expose the trade time as `time` (presumably the name data_processor.calc_df expects — confirm).
df = df.withColumnRenamed(existing="timestamp", new="time")

In [45]:
# Decimal places used to round `price_level` per symbol; symbols not listed are left unrounded.
PRICE_LEVEL_DECIMALS = {"BTCUSDT": 0, "ETHUSDT": 0, "SOLUSDT": 1}


def _explode_foot_json(df, column_name):
    """Turn the footprint column into one JSON-object string per output row.

    Adds ``{column_name}_str``, ``{column_name}_json_array`` and
    ``{column_name}_json`` to ``df``. The last one holds one JSON object per row
    (rows are multiplied by ``explode``).
    """
    str_col = f"{column_name}_str"
    array_col = f"{column_name}_json_array"
    json_col = f"{column_name}_json"

    # Join all array elements into one string with no separator ...
    out = df.withColumn(str_col, F.concat_ws("", F.col(column_name)))
    # ... then make adjacent objects comma-separated: "}{" / "} {" -> "},{".
    out = out.withColumn(str_col, F.regexp_replace(str_col, r"\}\s*\{", "},{"))

    # Splitting on ",{" strips the "{" from every object after the first, so "{"
    # is prepended to every piece; the first piece thereby starts with "{{" ...
    out = out.withColumn(array_col, F.split(str_col, r",\{"))
    out = out.withColumn(
        array_col, F.expr(f"TRANSFORM({array_col}, x -> concat('{{', x))")
    )
    out = out.withColumn(json_col, F.explode(array_col))

    # ... which is collapsed back to "{" here. "}}" is collapsed too.
    # NOTE(review): this would also mangle legitimately nested objects; it assumes
    # flat objects, as the parsing schema in process_foot_data is flat.
    out = out.withColumn(json_col, F.regexp_replace(F.col(json_col), r"\{\{", "{"))
    out = out.withColumn(json_col, F.regexp_replace(F.col(json_col), r"\}\}", "}"))
    return out


def process_foot_data(df, column_name, prefix, symbol, price_decimals=None):
    """Parse one side of the footprint and aggregate it per candle and price level.

    Args:
        df: DataFrame with a ``time_rounded`` column and the footprint column
            ``column_name``. Presumably ``array<string>`` of JSON objects — TODO confirm.
        column_name: Footprint column to parse, e.g. ``"foot_bid"`` / ``"foot_ask"``.
        prefix: Side name used in the JSON keys (``{prefix}_qty``,
            ``{prefix}_trades``, ``{prefix}_trades_aggr``) and in output column names.
        symbol: Trading pair; selects how ``price_level`` is rounded.
        price_decimals: Optional mapping symbol -> decimal places for rounding
            ``price_level``. Defaults to ``PRICE_LEVEL_DECIMALS``. Symbols absent
            from the mapping are not rounded.

    Returns:
        ``(df_agg_sorted, df_qty)``:
        ``df_agg_sorted`` has ``time_rounded``, ``price_level``,
        ``total_{prefix}_qty``, ``total_{prefix}_trades``, ordered by
        (``time_rounded``, ``price_level``).
        ``df_qty`` has ``time_rounded`` and ``sum_total_{prefix}_qty``.
    """
    if price_decimals is None:
        price_decimals = PRICE_LEVEL_DECIMALS

    # All fields are parsed as strings first and cast explicitly below.
    json_schema = StructType(
        [
            StructField("price_level", StringType(), True),
            StructField(f"{prefix}_qty", StringType(), True),
            StructField(f"{prefix}_trades", StringType(), True),
            StructField(f"{prefix}_trades_aggr", StringType(), True),
        ]
    )

    df_with_foot = _explode_foot_json(df, column_name)

    parsed = df_with_foot.select(
        F.col("time_rounded"),
        F.from_json(F.col(f"{column_name}_json"), json_schema).alias("data"),
    ).select(
        "time_rounded",
        F.col("data.price_level").cast("double").alias("price_level"),
        F.col(f"data.{prefix}_qty").cast("double").alias("qty"),
        F.col(f"data.{prefix}_trades").cast("integer").alias("trades"),
        F.col(f"data.{prefix}_trades_aggr").cast("integer").alias("trades_aggr"),
    )

    # Bucket prices per symbol (e.g. whole units for BTC/ETH, 0.1 for SOL).
    decimals = price_decimals.get(symbol)
    if decimals is not None:
        parsed = parsed.withColumn(
            "price_level", F.round(F.col("price_level"), decimals)
        )

    # Drop rows whose JSON did not parse or lacks a field. trades_aggr is not
    # aggregated below, but a null there still excludes the row.
    parsed = parsed.filter(
        F.col("price_level").isNotNull()
        & F.col("qty").isNotNull()
        & F.col("trades").isNotNull()
        & F.col("trades_aggr").isNotNull()
    )

    # Per (candle, price level) totals for this side.
    df_agg = parsed.groupBy("time_rounded", "price_level").agg(
        F.sum("qty").alias(f"total_{prefix}_qty"),
        F.sum("trades").alias(f"total_{prefix}_trades"),
    )
    df_agg_sorted = df_agg.orderBy("time_rounded", "price_level")

    # Per-candle total quantity for this side. This is computed from the unsorted
    # aggregate because the sort does not change the group sums.
    df_qty = df_agg.groupBy("time_rounded").agg(
        F.sum(f"total_{prefix}_qty").alias(f"sum_total_{prefix}_qty")
    )

    return df_agg_sorted, df_qty

def _merge_footprint_side(aggregated_data, items, side):
    """Add one side's (``"bid"`` / ``"ask"``) per-price qty and trade counts into ``aggregated_data``.

    ``items`` is an iterable of mappings price -> entry. Each entry must expose
    ``{side}_qty`` and ``{side}_trades`` via ``entry[key]``; a missing key raises
    KeyError.
    """
    qty_key = f"{side}_qty"
    trades_key = f"{side}_trades"
    for item in items:
        for price, entry in item.items():
            # Start at 0.0 (not 0): the UDF's declared return schema uses
            # DoubleType, and PySpark yields null for int values in such fields.
            bucket = aggregated_data.setdefault(
                price,
                {"bid_qty": 0.0, "bid_trades": 0.0, "ask_qty": 0.0, "ask_trades": 0.0},
            )
            bucket[qty_key] += float(entry[qty_key])
            bucket[trades_key] += float(entry[trades_key])


def transform_footprint(row):
    """Merge a candle's bid and ask footprints into one map keyed by price.

    Args:
        row: Object exposing ``time_rounded``, ``aggregated_foot_bid`` and
            ``aggregated_foot_ask``. The footprint fields are iterables of
            price -> entry mappings, or empty/None.

    Returns:
        ``{price: {"bid_qty", "bid_trades", "ask_qty", "ask_trades"}}`` with float
        values (0.0 for a side with no data at that price). Returns ``None`` if an
        entry lacks an expected key; that error is logged, not raised.

    Raises:
        Exception: Any other error is logged and re-raised.
    """
    try:
        aggregated_data = {}

        if row.aggregated_foot_bid:
            _merge_footprint_side(aggregated_data, row.aggregated_foot_bid, "bid")
        if row.aggregated_foot_ask:
            _merge_footprint_side(aggregated_data, row.aggregated_foot_ask, "ask")

        # Runs once per row inside a Python UDF, so keep it at debug level.
        logging.debug(f"Transformed footprint for row: {row.time_rounded}")
        return aggregated_data

    except KeyError as e:
        # Best-effort: a malformed entry yields a null footprint for this row.
        logging.error(f"KeyError in row {row.time_rounded}: {e}")
        return None
    except Exception as e:
        logging.error(f"Unexpected error in row {row.time_rounded}: {e}")
        raise

In [46]:
# Run time (naive UTC datetime); used later in the output file name.
date = datetime.datetime.utcnow()

# Project transformation of the trade frame for `timeframe`. Presumably produces
# the `time_rounded`, `foot_bid` and `foot_ask` columns consumed below (see
# DataProcessor.calc_df for the meaning of the boolean flag).
df = data_processor.calc_df(df, True, timeframe)

# Per-price aggregates and per-candle total quantity, bid side then ask side.
df_bid_agg_sorted, df_bid_total_qty = process_foot_data(df, "foot_bid", "bid", symbol)
df_ask_agg_sorted, df_ask_total_qty = process_foot_data(df, "foot_ask", "ask", symbol)

# Reshape each side's per-price aggregates into footprint structures.
df_footprint_bids = data_processor.transform_and_aggregate_footprint(df_bid_agg_sorted, "bid")
df_footprint_asks = data_processor.transform_and_aggregate_footprint(df_ask_agg_sorted, "ask")

# Combine the transformed frame with both sides' totals and footprints.
df_joined_final = data_processor.join_bid_ask_footprints(
    df, df_bid_total_qty, df_ask_total_qty, df_footprint_bids, df_footprint_asks
)

# Return type of transform_footprint: price (string key) -> merged bid/ask stats.
footprint_schema = MapType(
    StringType(),
    StructType(
        [
            StructField(name, DoubleType(), True)
            for name in ("bid_qty", "bid_trades", "ask_qty", "ask_trades")
        ]
    ),
)
footprint_udf = F.udf(transform_footprint, footprint_schema)

In [50]:
# Display the UDF object for inspection only; this line does not change any data.
footprint_udf

<function __main__.transform_footprint(row)>

In [52]:
# Add the merged per-price `footprint` map. transform_footprint reads
# `time_rounded`, `aggregated_foot_bid` and `aggregated_foot_ask` from the struct,
# so all three columns must be present on df_joined_final.
df_joined_final = df_joined_final.withColumn(
    "footprint",
    footprint_udf(
        F.struct(
            df_joined_final["time_rounded"],
            df_joined_final["aggregated_foot_bid"],
            df_joined_final["aggregated_foot_ask"],
        )
    ),
)

In [55]:
# The per-side footprint arrays are now merged into `footprint`: drop them and
# order the rows chronologically before writing.
df_joined_final = df_joined_final.drop(
    "aggregated_foot_bid", "aggregated_foot_ask"
).orderBy("time_rounded")

# TODO: if one file per candle is needed, implement it explicitly (e.g. a
# partitioned write on `time_rounded`). Iterating a Spark DataFrame in a Python
# loop does not yield rows.

# Save the final DataFrame. `date` (a datetime) is formatted explicitly so the
# object name has no spaces, colons or microseconds, which str(datetime) would add.
run_stamp = date.strftime("%Y%m%dT%H%M%S")
output_path = f"gs://{BUCKET_NAME}/live/{symbol}/{timeframe}/{symbol}_{timeframe}_{run_stamp}.parquet"
df_joined_final.write.mode("overwrite").parquet(output_path)

24/08/08 17:03:22 WARN DAGScheduler: Broadcasting large task binary with size 1013.0 KiB
24/08/08 17:03:22 WARN DAGScheduler: Broadcasting large task binary with size 1013.1 KiB
24/08/08 17:03:22 WARN DAGScheduler: Broadcasting large task binary with size 1197.2 KiB
                                                                                

In [56]:
# Preview the first 10 rows without truncating the wide footprint column.
df_joined_final.show(n=10, truncate=False)

                                                                                

+-------------------+--------+--------+--------+--------+------------------+------------------+------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

24/08/08 17:03:50 WARN DAGScheduler: Broadcasting large task binary with size 1011.7 KiB


# TEST