# Raw Trade Data Aggregation
  
  **Environment:** Databricks Runtime
 
  **Purpose:** This notebook reads raw trade data from the S3 `raw/` directory, transforms it into 1-minute OHLCV candles, and writes the processed data back to the `processed/` directory in Parquet format.

In [0]:
# Root of the raw (unprocessed) trade files in the data lake bucket.
s3_path = "s3://digital-asset-analytics-data-lake/raw/"

# Show what is currently in the raw directory so the input file can be confirmed before loading.
raw_listing = dbutils.fs.ls(s3_path)
display(raw_listing)

path,name,size,modificationTime
s3://digital-asset-analytics-data-lake/raw/BTCUSDT-trades-2025-05.csv,BTCUSDT-trades-2025-05.csv,8382487926,1749846581000


In [0]:
from pyspark.sql.types import StructType, StructField, LongType, DoubleType, BooleanType

# Column layout of the headerless trades CSV, in file order, as given by the data
# provider's documentation. Every column is declared nullable.
_trade_columns = [
    ("trade_id", LongType()),
    ("price", DoubleType()),
    ("qty", DoubleType()),
    ("quote_qty", DoubleType()),
    ("time", LongType()),
    ("is_buyer_maker", BooleanType()),
    ("is_best_match", BooleanType()),
]
trade_schema = StructType(
    [StructField(column_name, column_type, True) for column_name, column_type in _trade_columns]
)

# Full S3 path of the monthly trades file to ingest.
file_path = "s3://digital-asset-analytics-data-lake/raw/BTCUSDT-trades-2025-05.csv"

# The file has no header row, so columns are bound by position to trade_schema
# (an explicit schema also avoids a costly inference pass over the file).
csv_reader = spark.read.schema(trade_schema).option("header", "false")
raw_trades_df = csv_reader.csv(file_path)

# Sanity-check the load: show a small sample of rows, then the resolved schema.
print("Successfully loaded data into a Spark DataFrame. Showing the first 10 rows:")
display(raw_trades_df.limit(10))

print("DataFrame Schema:")
raw_trades_df.printSchema()

Successfully loaded data into a Spark DataFrame. Showing the first 10 rows:


trade_id,price,qty,quote_qty,time,is_buyer_maker,is_best_match
4865299889,94172.0,0.00067,63.09524,1746057600079126,True,True
4865299890,94172.0,0.00011,10.35892,1746057600079126,True,True
4865299891,94172.0,0.00095,89.4634,1746057600079126,True,True
4865299892,94172.01,0.00022,20.7178422,1746057600219133,False,True
4865299893,94172.0,0.00012,11.30064,1746057600466644,True,True
4865299894,94172.0,0.00423,398.34756,1746057600502604,True,True
4865299895,94172.0,0.00202,190.22744,1746057600502604,True,True
4865299896,94172.0,6e-05,5.65032,1746057600502604,True,True
4865299897,94172.0,6e-05,5.65032,1746057600502604,True,True
4865299898,94172.0,6e-05,5.65032,1746057600502604,True,True


DataFrame Schema:
root
 |-- trade_id: long (nullable = true)
 |-- price: double (nullable = true)
 |-- qty: double (nullable = true)
 |-- quote_qty: double (nullable = true)
 |-- time: long (nullable = true)
 |-- is_buyer_maker: boolean (nullable = true)
 |-- is_best_match: boolean (nullable = true)



In [0]:
from pyspark.sql.functions import col, expr

# 'time' holds epoch microseconds (16-digit values in the sample above). Convert it with
# Spark's built-in timestamp_micros, which maps the integer exactly onto a timestamp.
# The previous `(time / 1000000).cast("timestamp")` went through a double: at this
# magnitude a double cannot hold every microsecond exactly, and Spark truncates when
# casting double -> timestamp, so some trade_time values could come out 1 µs early.
# Backticks keep `time` from being parsed as a keyword in newer Spark SQL dialects.
transformed_df = raw_trades_df.withColumn(
    "trade_time",
    expr("timestamp_micros(`time`)")
)

print("Verification: Displaying original 'time' and new 'trade_time' columns.")
display(transformed_df.select("time", "trade_time").limit(10))

print("\nNew Schema with the 'trade_time' column:")
transformed_df.printSchema()

Verification: Displaying original 'time' and new 'trade_time' columns.


time,trade_time
1746057600079126,2025-05-01T00:00:00.079126Z
1746057600079126,2025-05-01T00:00:00.079126Z
1746057600079126,2025-05-01T00:00:00.079126Z
1746057600219133,2025-05-01T00:00:00.219133Z
1746057600466644,2025-05-01T00:00:00.466644Z
1746057600502604,2025-05-01T00:00:00.502604Z
1746057600502604,2025-05-01T00:00:00.502604Z
1746057600502604,2025-05-01T00:00:00.502604Z
1746057600502604,2025-05-01T00:00:00.502604Z
1746057600502604,2025-05-01T00:00:00.502604Z



New Schema with the 'trade_time' column:
root
 |-- trade_id: long (nullable = true)
 |-- price: double (nullable = true)
 |-- qty: double (nullable = true)
 |-- quote_qty: double (nullable = true)
 |-- time: long (nullable = true)
 |-- is_buyer_maker: boolean (nullable = true)
 |-- is_best_match: boolean (nullable = true)
 |-- trade_time: timestamp (nullable = true)



In [0]:
# Import the functions module under an alias instead of `from ... import max, min, sum`,
# which would shadow Python's built-in max/min/sum for the rest of the notebook.
from pyspark.sql import functions as F

print("Aggregating raw trades into 1-minute OHLCV candles")

# Ordering key that decides which trade opens / closes a window: the raw epoch-microsecond
# 'time', with trade_id breaking ties between trades that share the same timestamp.
# F.first / F.last are deliberately NOT used for open/close: inside a groupBy they return
# whichever row Spark sees first/last after the shuffle, which is not guaranteed to be
# time order when the input is split across many partitions.
trade_order = F.struct(F.col("time"), F.col("trade_id"))

candles_df = (
    transformed_df
    # Tumbling (non-overlapping) 1-minute windows over trade_time. This yields a
    # `window` struct column with `start` and `end` timestamps.
    .groupBy(F.window(F.col("trade_time"), "1 minute", "1 minute"))
    .agg(
        F.min_by("price", trade_order).alias("open"),   # price of the earliest trade in the window
        F.max("price").alias("high"),                   # highest traded price
        F.min("price").alias("low"),                    # lowest traded price
        F.max_by("price", trade_order).alias("close"),  # price of the latest trade in the window
        F.sum("qty").alias("volume"),                   # total traded quantity (qty) in the window
    )
    .orderBy(F.col("window").asc())  # chronological order of candles
)

# Verify the results
print("Displaying the first 20 1-minute candles:")
display(candles_df.limit(20))

print("\nSchema of new candles_df:")
candles_df.printSchema()

Aggregating raw trades into 1-minute OHLCV candles
Displaying the first 20 1-minute candles:


window,open,high,low,close,volume
"List(2025-05-01T00:00:00Z, 2025-05-01T00:01:00Z)",94172.0,94177.96,94130.43,94147.3,6.474360000000202
"List(2025-05-01T00:01:00Z, 2025-05-01T00:02:00Z)",94147.31,94190.47,94147.3,94187.02,3.977809999999966
"List(2025-05-01T00:02:00Z, 2025-05-01T00:03:00Z)",94187.01,94198.77,94187.01,94198.75,2.241029999999981
"List(2025-05-01T00:03:00Z, 2025-05-01T00:04:00Z)",94198.75,94198.76,94169.45,94169.45,3.3184799999999632
"List(2025-05-01T00:04:00Z, 2025-05-01T00:05:00Z)",94169.46,94238.1,94156.73,94238.09,7.514380000000452
"List(2025-05-01T00:05:00Z, 2025-05-01T00:06:00Z)",94238.09,94278.12,94221.4,94221.4,17.57883000000025
"List(2025-05-01T00:06:00Z, 2025-05-01T00:07:00Z)",94221.4,94248.49,94221.4,94231.71,6.250770000000031
"List(2025-05-01T00:07:00Z, 2025-05-01T00:08:00Z)",94231.71,94255.08,94181.43,94248.0,10.17593000000036
"List(2025-05-01T00:08:00Z, 2025-05-01T00:09:00Z)",94248.0,94285.72,94240.16,94285.71,12.365790000000038
"List(2025-05-01T00:09:00Z, 2025-05-01T00:10:00Z)",94285.71,94304.0,94285.71,94303.99,3.6578399999999607



Schema of new candles_df:
root
 |-- window: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- open: double (nullable = true)
 |-- high: double (nullable = true)
 |-- low: double (nullable = true)
 |-- close: double (nullable = true)
 |-- volume: double (nullable = true)



In [0]:
# Destination for the aggregated candles in the bucket's processed zone.
output_path = "s3://digital-asset-analytics-data-lake/processed/candles/"

print(f"Saving the candle data to: {output_path}")

# Persist the candles as Parquet. "overwrite" replaces everything currently under
# output_path. NOTE(review): if other months are ever written to this same path, each
# run will discard the previous ones; consider a per-month path or partitioning.
candles_writer = candles_df.write.mode("overwrite")
candles_writer.parquet(output_path)

print("Save complete")

Saving the candle data to: s3://digital-asset-analytics-data-lake/processed/candles/
Save complete


In [0]:
# List the output directory to confirm the Parquet part files and commit markers were written.
written_files = dbutils.fs.ls(output_path)
display(written_files)

path,name,size,modificationTime
s3://digital-asset-analytics-data-lake/processed/candles/_SUCCESS,_SUCCESS,0,1749928556000
s3://digital-asset-analytics-data-lake/processed/candles/_committed_2868418332403867721,_committed_2868418332403867721,224,1749928556000
s3://digital-asset-analytics-data-lake/processed/candles/_started_2868418332403867721,_started_2868418332403867721,0,1749928555000
s3://digital-asset-analytics-data-lake/processed/candles/part-00000-tid-2868418332403867721-2cce465c-ad59-4385-9024-cd1dcbcab9c1-208-1.c000.snappy.parquet,part-00000-tid-2868418332403867721-2cce465c-ad59-4385-9024-cd1dcbcab9c1-208-1.c000.snappy.parquet,878416,1749928555000
s3://digital-asset-analytics-data-lake/processed/candles/part-00001-tid-2868418332403867721-2cce465c-ad59-4385-9024-cd1dcbcab9c1-209-1.c000.snappy.parquet,part-00001-tid-2868418332403867721-2cce465c-ad59-4385-9024-cd1dcbcab9c1-209-1.c000.snappy.parquet,859825,1749928555000


In [0]:
# --- Final Verification ---
# Read the candles back from S3 (rather than reusing the in-memory candles_df) so this
# checks what was actually written. Reuses output_path so the two cells cannot drift apart.

print("Reading the processed Parquet data from S3")
processed_df = spark.read.parquet(output_path)

row_count = processed_df.count()

# One candle per minute over a full 31-day month (May 2025): 31 days * 24 h * 60 min.
expected_rows = 31 * 24 * 60

# Each window must appear exactly once; a duplicate start would indicate a bad aggregation.
distinct_window_count = processed_df.select("window.start").distinct().count()
assert distinct_window_count == row_count, (
    f"Found {row_count - distinct_window_count:,} duplicate candle windows"
)
# A missing minute means the month had a minute with no trades, or data was lost upstream.
assert row_count == expected_rows, (
    f"Expected {expected_rows:,} 1-minute candles but found {row_count:,}"
)

print("Verification complete.")
print(f"The processed candle data has a total of {row_count:,} rows (1-minute candles).")
print(f"For a full 31-day month, expected number of rows is {expected_rows:,}.")

Reading the processed Parquet data from S3
Verification complete.
The aggregated 'candles_df' has a total of 44,640 rows (1-minute candles).
For a full 31-day month, expected number of rows is 44,640.
