# Feature Engineering

BRAUX Owen and CAMBIER Elliot

This notebook combines on-chain transaction data with market price data to create features for a prediction model.

## Set up Spark

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_unixtime

In [None]:
# Build the Spark session shared by every cell below.
spark = (
    SparkSession.builder
    .appName("BDA - Feature Engineering")
    .master("local[*]")                    # local mode, using the machine's cores
    .config("spark.driver.memory", "4g")   # driver heap size
    .getOrCreate()                         # returns the active session if one already exists
)

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/11/15 10:16:41 WARN Utils: Your hostname, OBPC, resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
25/11/15 10:16:41 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/15 10:16:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Load Prepared Datasets

In [None]:
# Load the market price data (the CSV referred to as coming from notebook 1)
price_data_path = "../data/prices/btcusd_1-min_data.csv"
df_prices_raw = spark.read.csv(price_data_path, header=True, inferSchema=True)
df_prices_raw.printSchema()

# Clean up the price data columns: map raw CSV headers to the snake_case names
# used by the rest of the notebook. withColumnRenamed() is a no-op when the
# source column is missing, so the optional columns ("Volume_(Currency)",
# "Weighted_Price") need no explicit existence check.
price_column_renames = {
    "Timestamp": "unix_timestamp",
    "Open": "price_open",
    "High": "price_high",
    "Low": "price_low",
    "Close": "price_close",
    "Volume": "volume_btc",
    "Volume_(Currency)": "volume_currency",
    "Weighted_Price": "weighted_price",
}
df_prices = df_prices_raw
for raw_name, clean_name in price_column_renames.items():
    df_prices = df_prices.withColumnRenamed(raw_name, clean_name)

# Convert Unix epoch seconds straight to a timestamp.
# The previous from_unixtime(...).cast("timestamp") round trip rendered the
# instant as a string in the Spark session time zone and parsed it back. When
# that zone observes daylight saving time, the repeated wall-clock hour at the
# autumn change maps two different instants to the same string, so distinct
# minutes collapsed onto one timestamp. A numeric -> timestamp cast interprets
# the value as seconds since the epoch and involves no time zone at all.
# (The value is truncated to whole seconds first, as from_unixtime did.)
df_prices = df_prices.withColumn(
    "timestamp_utc",
    col("unix_timestamp").cast("long").cast("timestamp"),
)

# NOTE: show() renders timestamps in the Spark session time zone, so the values
# printed below read as UTC only if spark.sql.session.timeZone is set to UTC.
print("\n Cleaned Price Data Schema and Sample :")
df_prices.printSchema()
df_prices.select("timestamp_utc", "price_open", "price_close", "volume_btc").show(5)

                                                                                

root
 |-- Timestamp: double (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: double (nullable = true)


 Cleaned Price Data Schema and Sample :
root
 |-- unix_timestamp: double (nullable = true)
 |-- price_open: double (nullable = true)
 |-- price_high: double (nullable = true)
 |-- price_low: double (nullable = true)
 |-- price_close: double (nullable = true)
 |-- volume_btc: double (nullable = true)
 |-- timestamp_utc: timestamp (nullable = true)

+-------------------+----------+-----------+----------+
|      timestamp_utc|price_open|price_close|volume_btc|
+-------------------+----------+-----------+----------+
|2012-01-01 11:01:00|      4.58|       4.58|       0.0|
|2012-01-01 11:02:00|      4.58|       4.58|       0.0|
|2012-01-01 11:03:00|      4.58|       4.58|       0.0|
|2012-01-01 11:04:00|      4.58|       4.58|       0.0|
|2012-01-01 11:05:00|      4

## Aggregate Data

In [4]:
from pyspark.sql.functions import window, sum, count, avg

In [5]:
# Read the processed transactions from parquet
transactions_path = "../data/processed/transactions.parquet"
df_transactions = spark.read.parquet(transactions_path)

print("\n data schema :")
df_transactions.printSchema()
df_transactions.show(5)

# Bucket transactions into tumbling (non-overlapping) 1-minute windows so the
# on-chain data has the same granularity as the 1-minute price data.
minute_window = window(col("timestamp_utc"), "1 minute")

# Per-window aggregates: transaction count, total BTC moved, and the mean
# number of inputs / outputs per transaction.
onchain_aggregates = [
    count("*").alias("tx_count"),
    sum("total_amount_btc").alias("tx_volume_btc"),
    avg("n_inputs").alias("avg_inputs"),
    avg("n_outputs").alias("avg_outputs"),
]
onchain_features_df = df_transactions.groupBy(minute_window).agg(*onchain_aggregates)

print("\n--- On-Chain Features Schema and Sample ---")
onchain_features_df.printSchema()

# Display the earliest windows first
onchain_features_df.orderBy("window").show(10, truncate=False)


 data schema :
root
 |-- block_timestamp: long (nullable = true)
 |-- n_inputs: integer (nullable = true)
 |-- n_outputs: integer (nullable = true)
 |-- total_amount_satoshi: long (nullable = true)
 |-- total_amount_btc: double (nullable = true)
 |-- timestamp_utc: timestamp (nullable = true)

+---------------+--------+---------+--------------------+----------------+-------------------+
|block_timestamp|n_inputs|n_outputs|total_amount_satoshi|total_amount_btc|      timestamp_utc|
+---------------+--------+---------+--------------------+----------------+-------------------+
|     1339713349|       1|        1|          5025512500|       50.255125|2012-06-15 00:35:49|
|     1339713349|       4|        2|          9779326418|     97.79326418|2012-06-15 00:35:49|
|     1339713349|       3|        2|         14502850000|        145.0285|2012-06-15 00:35:49|
|     1339713349|       1|        1|           100000000|             1.0|2012-06-15 00:35:49|
|     1339713349|       2|        2|   



+------------------------------------------+--------+------------------+------------------+------------------+
|window                                    |tx_count|tx_volume_btc     |avg_inputs        |avg_outputs       |
+------------------------------------------+--------+------------------+------------------+------------------+
|{2012-06-10 00:54:00, 2012-06-10 00:55:00}|87      |750.3537802699998 |2.4597701149425286|1.9540229885057472|
|{2012-06-10 01:54:00, 2012-06-10 01:55:00}|132     |1869.5332449499995|1.7954545454545454|1.946969696969697 |
|{2012-06-10 03:25:00, 2012-06-10 03:26:00}|189     |977.1499989299997 |1.8783068783068784|2.5396825396825395|
|{2012-06-10 05:14:00, 2012-06-10 05:15:00}|554     |4660.899853159999 |2.108303249097473 |3.267148014440433 |
|{2012-06-10 06:46:00, 2012-06-10 06:47:00}|233     |1512.524592979997 |1.793991416309013 |2.3433476394849784|
|{2012-06-10 08:18:00, 2012-06-10 08:19:00}|81      |3584.4071749400005|1.5925925925925926|1.9876543209876543|
|

                                                                                

## Join On-Chain Features with Market Prices

In [6]:
from pyspark.sql.functions import col

In [7]:
# Expose each window's start as 'timestamp_utc' (the name of the join key in
# the price DataFrame) and drop the window struct, which is no longer needed.
onchain_features_to_join = (
    onchain_features_df
    .withColumn("timestamp_utc", col("window.start"))
    .drop("window")
)

# On-chain feature columns that are null for price rows without a matching window
feature_columns = ["tx_count", "tx_volume_btc", "avg_inputs", "avg_outputs"]

# Left join keeps every price row; unmatched rows get their features set to 0
print("Joining price data with on-chain features...")
df_combined = (
    df_prices
    .join(onchain_features_to_join, on="timestamp_utc", how="left")
    .fillna(0, subset=feature_columns)
)

df_combined.printSchema()  # Quick check

print("\n Sample of the Final Combined DataFrame : ")
sample_columns = ["timestamp_utc", "price_close", "tx_count", "tx_volume_btc"]
df_combined.select(*sample_columns).orderBy(col("timestamp_utc").desc()).show(15)

# Compare row counts before and after the join as a sanity check on the price data
price_row_count = df_prices.count()
print(f"Total rows in price data: {price_row_count:,}")
combined_row_count = df_combined.count()
print(f"Total rows in combined data: {combined_row_count:,}")

Joining price data with on-chain features...
root
 |-- timestamp_utc: timestamp (nullable = true)
 |-- unix_timestamp: double (nullable = true)
 |-- price_open: double (nullable = true)
 |-- price_high: double (nullable = true)
 |-- price_low: double (nullable = true)
 |-- price_close: double (nullable = true)
 |-- volume_btc: double (nullable = true)
 |-- tx_count: long (nullable = true)
 |-- tx_volume_btc: double (nullable = false)
 |-- avg_inputs: double (nullable = false)
 |-- avg_outputs: double (nullable = false)


 Sample of the Final Combined DataFrame : 


                                                                                

+-------------------+-----------+--------+-------------+
|      timestamp_utc|price_close|tx_count|tx_volume_btc|
+-------------------+-----------+--------+-------------+
|2025-10-14 01:56:00|   115242.0|       0|          0.0|
|2025-10-14 01:55:00|   115217.0|       0|          0.0|
|2025-10-14 01:54:00|   115221.0|       0|          0.0|
|2025-10-14 01:53:00|   115218.0|       0|          0.0|
|2025-10-14 01:52:00|   115232.0|       0|          0.0|
|2025-10-14 01:51:00|   115213.0|       0|          0.0|
|2025-10-14 01:50:00|   115211.0|       0|          0.0|
|2025-10-14 01:49:00|   115174.0|       0|          0.0|
|2025-10-14 01:48:00|   115159.0|       0|          0.0|
|2025-10-14 01:47:00|   115107.0|       0|          0.0|
|2025-10-14 01:46:00|   115109.0|       0|          0.0|
|2025-10-14 01:45:00|   115134.0|       0|          0.0|
|2025-10-14 01:44:00|   115282.0|       0|          0.0|
|2025-10-14 01:43:00|   115293.0|       0|          0.0|
|2025-10-14 01:42:00|   115265.

[Stage 23:>                                                         (0 + 8) / 8]

Total rows in combined data: 7,248,636


                                                                                

In [9]:
print(" Inspecting the combined data around June 2012 :")

# Spot-check a 15-minute slice: rows inside a transaction window should carry
# non-zero on-chain features, the others the 0 fill value.
june_2012_slice = df_combined.filter(
    "timestamp_utc between '2012-06-15 00:30:00' and '2012-06-15 00:45:00'"
)
(
    june_2012_slice
    .select("timestamp_utc", "price_close", "tx_count", "tx_volume_btc")
    .orderBy("timestamp_utc")
    .show()
)

 Inspecting the combined data around June 2012 :




+-------------------+-----------+--------+------------------+
|      timestamp_utc|price_close|tx_count|     tx_volume_btc|
+-------------------+-----------+--------+------------------+
|2012-06-15 00:30:00|       5.87|       0|               0.0|
|2012-06-15 00:31:00|       5.87|       0|               0.0|
|2012-06-15 00:32:00|       5.87|       0|               0.0|
|2012-06-15 00:33:00|       5.87|       0|               0.0|
|2012-06-15 00:34:00|       5.87|       0|               0.0|
|2012-06-15 00:35:00|       5.87|     413|1413.6811811800012|
|2012-06-15 00:36:00|       5.87|     263|1029.1968248699995|
|2012-06-15 00:37:00|       5.87|       0|               0.0|
|2012-06-15 00:38:00|       5.87|       0|               0.0|
|2012-06-15 00:39:00|       5.87|       0|               0.0|
|2012-06-15 00:40:00|       5.87|       0|               0.0|
|2012-06-15 00:41:00|       5.87|       0|               0.0|
|2012-06-15 00:42:00|       5.87|       0|               0.0|
|2012-06

                                                                                

In [11]:
# Persist the combined feature table as parquet, replacing any previous output
features_path = "../data/processed/features.parquet"
features_writer = df_combined.write.mode("overwrite")
features_writer.parquet(features_path)
print("Saving in parquet format Done !")



Saving in parquet format Done !


                                                                                