### 🧠 Feature Engineering Overview

**Purpose**: The features below are engineered from hourly Bitcoin price and volume data to help the model learn short-term dynamics, longer-term trends, market momentum, and behavioral signals such as trading activity. Each category is aimed at directional prediction accuracy (whether the price moves up or down), which is especially important in financial forecasting.

---

#### ⏰ Time Features
- `hour`, `day`, `day_of_week`: Capture diurnal and weekly patterns, helping the model learn periodic behaviors such as increased volatility during certain hours or days.
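
Spark's `F.dayofweek` numbers days from 1 (Sunday) to 7 (Saturday), unlike Python's `datetime.weekday()` (0 = Monday). The pure-Python sketch below reproduces the three time features with Spark's convention, which can help when spot-checking values by hand; the helper name `time_features` is hypothetical and not part of the notebook.

```python
from datetime import datetime


def time_features(ts: datetime) -> dict:
    """Return hour, day of month, and Spark-style day_of_week (1 = Sunday ... 7 = Saturday)."""
    # isoweekday(): Monday = 1 ... Sunday = 7; shift so that Sunday = 1 and Saturday = 7
    spark_dow = ts.isoweekday() % 7 + 1
    return {"hour": ts.hour, "day": ts.day, "day_of_week": spark_dow}


# 2025-04-08 (the date of the first rows shown in the data) was a Tuesday
print(time_features(datetime(2025, 4, 8, 9, 5)))
```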

---

#### 💹 Price Returns
- `price_return_1h`, `price_return_24h`: Fractional price change relative to the price 1 and 24 rows (hours) earlier, e.g. 0.01 for a 1% rise, highlighting momentum shifts and changes in market sentiment.
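
Both returns compare each row with the row `n` positions earlier, so they assume exactly one row per hour; the first `n` rows have no lagged value and come out null. A pure-Python sketch of `(x - lag(x, n)) / lag(x, n)`, with `None` standing in for Spark's null (the same formula gives `volume_return_1h` when applied to volume); `lagged_return` is a hypothetical helper:

```python
from typing import List, Optional


def lagged_return(values: List[float], n: int) -> List[Optional[float]]:
    """Fractional change vs. the value n rows earlier, like (x - lag(x, n)) / lag(x, n)."""
    out: List[Optional[float]] = []
    for i, x in enumerate(values):
        if i < n:
            out.append(None)  # F.lag returns null for the first n rows
        else:
            prev = values[i - n]
            out.append((x - prev) / prev)
    return out


# Toy hourly prices (made up): +10% then -10%
returns_1h = lagged_return([100.0, 110.0, 99.0], 1)
print(returns_1h)
```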

---

#### 📉 Moving Averages
- `ma_6h`, `ma_24h`, `ma_72h`: Trailing simple averages of price over the last 6, 24, and 72 rows (hours), smoothing out noise to reveal short-, medium-, and long-term trends. Often used to identify trend direction and potential support/resistance levels.
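
In the code, `ma_6h` uses the frame `rowsBetween(-5, 0)`: the current row plus the five before it. Near the start of the series that frame holds fewer rows, and Spark averages whatever is there instead of returning null. A pure-Python sketch of those frame semantics; `trailing_mean` is a hypothetical helper:

```python
from typing import List


def trailing_mean(values: List[float], window: int) -> List[float]:
    """Mean over the current row and up to window-1 preceding rows,
    like F.avg(...).over(Window.orderBy(...).rowsBetween(-(window - 1), 0))."""
    out = []
    for i in range(len(values)):
        frame = values[max(0, i - window + 1): i + 1]  # shorter frame near the start
        out.append(sum(frame) / len(frame))
    return out


print(trailing_mean([1.0, 2.0, 3.0, 4.0], 3))  # [1.0, 1.5, 2.0, 3.0]
```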

---

#### ⏪ Lag Features
- `price_lag_1h`, `price_lag_24h`: Provide access to recent historical price levels, essential for time series modeling and understanding sequential dependencies.

---

#### 📊 Volatility
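- `volatility_6h`, `volatility_24h`: Rolling sample standard deviation of price over the last 6 and 24 hours, signaling market uncertainty or risk. Because it is computed on price levels rather than returns, it is expressed in USD and scales with the price. High volatility may precede trend reversals or accompany strong momentum.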
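
`F.stddev` is Spark's sample standard deviation (`stddev_samp`, divisor n - 1), taken here over raw prices in the trailing frame. A pure-Python sketch using `statistics.stdev`; returning `None` for one-row frames mirrors Spark 3.x, where `stddev_samp` of a single value is null. `trailing_stdev` is a hypothetical helper:

```python
import statistics
from typing import List, Optional


def trailing_stdev(values: List[float], window: int) -> List[Optional[float]]:
    """Sample standard deviation over the current row and up to window-1 preceding rows.
    Returns None for one-row frames (Spark yields null there)."""
    out: List[Optional[float]] = []
    for i in range(len(values)):
        frame = values[max(0, i - window + 1): i + 1]
        out.append(statistics.stdev(frame) if len(frame) > 1 else None)
    return out


print(trailing_stdev([10.0, 12.0, 14.0, 14.0], 3))
```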

---

#### ⚡ Momentum Indicators
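- `momentum_1h`, `momentum_6h`, `momentum_24h`, `momentum_72h`: `momentum_1h` is the price change from the previous hour; `momentum_6h`, `momentum_24h`, and `momentum_72h` are the gap between the current price and the matching moving average. Together they quantify the speed and magnitude of price moves to help detect emerging trends or reversals.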
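
As implemented, all four momentum features are in USD: one is a one-row difference, the others are distances from a trailing mean. A pure-Python sketch on a made-up series; `momentum_features` and `trailing_mean` are hypothetical helpers, and `ma_window` stands in for 6, 24, or 72:

```python
from typing import List, Optional, Tuple


def trailing_mean(values: List[float], window: int) -> List[float]:
    # Mean of the current row and up to window-1 preceding rows (partial frames at the start)
    out = []
    for i in range(len(values)):
        frame = values[max(0, i - window + 1): i + 1]
        out.append(sum(frame) / len(frame))
    return out


def momentum_features(prices: List[float], ma_window: int) -> Tuple[List[Optional[float]], List[float]]:
    """Return (momentum_1h, price minus its trailing mean) as computed in the notebook."""
    # momentum_1h = price - price_lag_1h; None where there is no previous row
    mom_1h: List[Optional[float]] = [None] + [b - a for a, b in zip(prices, prices[1:])]
    # momentum_Nh = price - ma_Nh
    ma = trailing_mean(prices, ma_window)
    mom_ma = [p - m for p, m in zip(prices, ma)]
    return mom_1h, mom_ma


print(momentum_features([100.0, 103.0, 101.0, 106.0], 3))
```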

---

#### 📦 Volume-Based Features
- `volume_lag_1h`, `volume_lag_24h`: Trading volume 1 and 24 hours earlier, reflecting recent activity.
- `volume_ma_6h`, `volume_ma_24h`: Trailing 6- and 24-hour average volume, to identify trends in trading activity.
- `volume_return_1h`: Fractional change in volume from the previous hour, helpful for detecting surges or drops in interest.

---

#### 📈 Relative Strength Index (RSI)
- `rsi_14`: Momentum oscillator (0 to 100) that compares average gains and losses over the last 14 rows (hours) to flag overbought or oversold conditions. This implementation uses simple averages rather than Wilder's smoothing.
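
The notebook averages gains and losses with a plain 14-row mean (`F.avg` over `rowsBetween(-13, 0)`), a simple-moving-average variant sometimes called Cutler's RSI, so values may differ from charting tools that use Wilder's exponential smoothing. A pure-Python sketch of the same arithmetic, including the `1e-9` guard; `rsi_simple` is a hypothetical name:

```python
from typing import List


def rsi_simple(prices: List[float], period: int = 14) -> List[float]:
    """RSI from simple trailing averages of gains and losses, mirroring the notebook's Spark code."""
    # First delta is null in Spark, and when/otherwise turns it into 0 gain and 0 loss
    deltas = [0.0] + [b - a for a, b in zip(prices, prices[1:])]
    gains = [d if d > 0 else 0.0 for d in deltas]
    losses = [-d if d < 0 else 0.0 for d in deltas]
    out = []
    for i in range(len(prices)):
        lo = max(0, i - period + 1)  # frame rowsBetween(-(period - 1), 0)
        n = i + 1 - lo
        avg_gain = sum(gains[lo:i + 1]) / n
        avg_loss = sum(losses[lo:i + 1]) / n
        rs = avg_gain / (avg_loss + 1e-9)  # same epsilon as the Spark code
        out.append(100 - 100 / (1 + rs))
    return out


print(rsi_simple([10.0, 11.0, 10.0, 11.0, 10.0], period=4)[-1])  # close to 50
```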

---

#### 🔄 Rolling Trend Indicator
- `trend_up_count_6h`: Counts how many of the last 6 hourly price changes were increases (a value from 0 to 6), a simple measure of short-term trend consistency.
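
In the code, `price_increase` is 1 when `delta > 0` (unchanged prices and the first row count as 0), and `trend_up_count_6h` sums it over `rowsBetween(-5, 0)`. A pure-Python sketch of the same count; `trend_up_count` is a hypothetical helper:

```python
from typing import List


def trend_up_count(prices: List[float], window: int = 6) -> List[int]:
    """Number of rows in the trailing window (current + window-1 previous) whose price rose vs. the prior row."""
    ups = [0] + [1 if b > a else 0 for a, b in zip(prices, prices[1:])]  # first row: null delta -> 0
    return [sum(ups[max(0, i - window + 1): i + 1]) for i in range(len(prices))]


print(trend_up_count([5.0, 6.0, 7.0, 6.0, 8.0, 9.0, 9.0, 10.0]))  # [0, 1, 2, 2, 3, 4, 4, 4]
```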

---

#### 📉 MACD (Moving Average Convergence Divergence)
- `ema12`, `ema26`, `macd`: 12- and 26-period exponential moving averages of price, and their difference `macd = ema12 - ema26`. A positive MACD means the short-term average sits above the long-term one; changes in its sign and slope help anticipate trend shifts and convergence/divergence patterns.
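
For reference, the recursion implemented by the notebook's `ema` helper and the resulting MACD, where $p_t$ is the price at row $t$ and $p_0$ is the oldest price in the 101-row history passed to the UDF:

$$
\alpha = \frac{2}{\text{span} + 1}, \qquad
\text{EMA}_0 = p_0, \qquad
\text{EMA}_t = \alpha\, p_t + (1 - \alpha)\, \text{EMA}_{t-1}
$$

$$
\text{MACD}_t = \text{EMA}^{(12)}_t - \text{EMA}^{(26)}_t
$$

With span 12, $\alpha = 2/13 \approx 0.154$; with span 26, $\alpha = 2/27 \approx 0.074$. The conventional MACD signal line (a 9-period EMA of the MACD) is not computed in this notebook.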

---

#### 🧹 Data Cleaning
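- Rows with missing values (nulls produced by lags and rolling windows at the start of the series) are dropped so that every remaining row has a complete feature set for training and evaluation. Rolling averages computed over partial windows at the start are not null, so this step does not remove them.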

---

Taken together, these features are intended to help the model capture temporal structure, momentum dynamics, and market sentiment, all of which matter for accurate and directionally consistent Bitcoin price forecasting.



In [0]:
import numpy as np
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import ArrayType, DoubleType
from pyspark.sql.functions import collect_list, size, expr

In [0]:
# Load the hourly BTC data from the Delta table
df = spark.read.table("bitcoin_prices.btc_data")

# Show the Spark DataFrame
df.show()

+--------------------+-----------------+--------------------+--------------------+----------+
|           timestamp|            price|          market_cap|              volume|  hour_key|
+--------------------+-----------------+--------------------+--------------------+----------+
|2025-04-08 09:05:...|79411.29383844102|1.576133830964966...|7.942999980693149E10|2025040809|
|2025-04-08 10:04:...|79446.76021326096|1.576845423451024...|7.501415642523972E10|2025040810|
|2025-04-08 11:07:...|79090.38478456163|1.569534776726553...|7.184955018901585E10|2025040811|
|2025-04-08 12:05:...|78932.56330430893|1.567005815002176E12| 7.06165115924608E10|2025040812|
|2025-04-08 13:03:...|78970.69563692114|1.568271491229540...|6.855373497740481E10|2025040813|
|2025-04-08 14:09:...|79860.58407289122|1.586005815663762...|6.538772873357519...|2025040814|
|2025-04-08 15:05:...|79830.81203946835|1.584360309939240...|6.592033073258972E10|2025040815|
|2025-04-08 16:04:...|79839.36409691784|1.585428515676060...

In [0]:
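# EMA helper: exponential moving average over a list of prices.
# alpha = 2 / (span + 1); the series is seeded with the oldest value in the list.
def ema(values, span):
    if not values:  # collect_list skips nulls, so guard against an empty history
        return []
    alpha = 2 / (span + 1)
    ema_values = [float(values[0])]

    for price in values[1:]:
        ema_values.append(alpha * price + (1 - alpha) * ema_values[-1])

    return ema_values

# Wrap the helper as UDFs that return the full EMA series for each row's price history
from pyspark.sql.functions import udf

ema12_udf = udf(lambda x: ema(x, 12), ArrayType(DoubleType()))
ema26_udf = udf(lambda x: ema(x, 26), ArrayType(DoubleType()))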

In [0]:
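# Collect up to the last 101 prices (current row + 100 previous rows) for each row.
# Note: orderBy without partitionBy moves all rows into a single partition, which is
# acceptable for a small hourly table but will not scale to large datasets.
w_price = Window.orderBy("timestamp").rowsBetween(-100, 0)
df = df.withColumn("price_history", collect_list("price").over(w_price))

# Apply the EMA UDFs to each row's price history
df = df.withColumn("ema12_series", ema12_udf("price_history"))
df = df.withColumn("ema26_series", ema26_udf("price_history"))

# Keep only the last element of each series, i.e. the EMA at the current row (SQL arrays are 0-indexed)
df = df.withColumn("ema12", expr("ema12_series[size(ema12_series)-1]"))
df = df.withColumn("ema26", expr("ema26_series[size(ema26_series)-1]"))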

# Calculate MACD
df = df.withColumn("macd", df["ema12"] - df["ema26"])

# Clean up temporary columns
df = df.drop("price_history", "ema12_series", "ema26_series")
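
Because the UDF recomputes each EMA from only the last 101 prices, seeded with the oldest of them, the result approximates a full-history EMA. The seed's weight decays as (1 - alpha)^100, about 4.5e-4 for span 26, so the two should agree closely once 101 rows are available. A pure-Python check on a synthetic random walk (made-up data, not the Bitcoin table); the `ema` function here re-implements the notebook's recursion so the block runs on its own:

```python
import random
from typing import List


def ema(values: List[float], span: int) -> List[float]:
    """Same recursion as the notebook's helper: seed with the first value, then smooth."""
    alpha = 2 / (span + 1)
    out = [values[0]]
    for v in values[1:]:
        out.append(alpha * v + (1 - alpha) * out[-1])
    return out


random.seed(0)
prices = [80_000.0]
for _ in range(499):  # synthetic random walk, NOT real BTC data
    prices.append(prices[-1] + random.gauss(0, 100))

span = 26
full = ema(prices, span)  # EMA over the entire history
# EMA recomputed from the trailing 101 rows only, as collect_list over rowsBetween(-100, 0) does
windowed = [ema(prices[max(0, t - 100): t + 1], span)[-1] for t in range(len(prices))]

max_gap = max(abs(f - w) for f, w in zip(full[100:], windowed[100:]))
print(f"max |full - windowed| from row 100 on: {max_gap:.4f} USD")
```

Unrolling the recursion over 100 steps shows the gap at row t is exactly (1 - alpha)^100 * (p[t-100] - EMA[t-100]), so widening the `rowsBetween` frame shrinks it geometrically.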



In [0]:
# All rolling features below are row-based: they assume exactly one row per hour, ordered by timestamp.
w = Window.orderBy("timestamp")

def trailing(n):
    """Window frame covering the current row and the n - 1 rows before it."""
    return w.rowsBetween(-(n - 1), 0)

# Step 1: Extract time features
df = df.withColumn("hour", F.hour("timestamp"))
df = df.withColumn("day", F.dayofmonth("timestamp"))
df = df.withColumn("day_of_week", F.dayofweek("timestamp"))  # 1 = Sunday, 7 = Saturday

# Step 2: Price returns (fractional change vs. 1 and 24 rows earlier)
df = df.withColumn("price_return_1h", (F.col("price") - F.lag("price", 1).over(w)) / F.lag("price", 1).over(w))
df = df.withColumn("price_return_24h", (F.col("price") - F.lag("price", 24).over(w)) / F.lag("price", 24).over(w))

# Step 3: Trailing moving averages of price (6, 24, 72 hours)
df = df.withColumn("ma_6h", F.avg("price").over(trailing(6)))
df = df.withColumn("ma_24h", F.avg("price").over(trailing(24)))
df = df.withColumn("ma_72h", F.avg("price").over(trailing(72)))

# Step 4: Lag features
df = df.withColumn("price_lag_1h", F.lag("price", 1).over(w))
df = df.withColumn("price_lag_24h", F.lag("price", 24).over(w))

# Step 5: Volatility (sample standard deviation of price)
df = df.withColumn("volatility_6h", F.stddev("price").over(trailing(6)))
df = df.withColumn("volatility_24h", F.stddev("price").over(trailing(24)))

# Step 6: Momentum (1h price change, and distance of price from each moving average)
df = df.withColumn("momentum_1h", F.col("price") - F.col("price_lag_1h"))
df = df.withColumn("momentum_6h", F.col("price") - F.col("ma_6h"))
df = df.withColumn("momentum_24h", F.col("price") - F.col("ma_24h"))
df = df.withColumn("momentum_72h", F.col("price") - F.col("ma_72h"))

# Step 7: Volume lags and moving averages
df = df.withColumn("volume_lag_1h", F.lag("volume", 1).over(w))
df = df.withColumn("volume_lag_24h", F.lag("volume", 24).over(w))
df = df.withColumn("volume_ma_6h", F.avg("volume").over(trailing(6)))
df = df.withColumn("volume_ma_24h", F.avg("volume").over(trailing(24)))

# Step 8: Volume return (fractional change vs. previous hour)
df = df.withColumn("volume_return_1h", (F.col("volume") - F.col("volume_lag_1h")) / F.col("volume_lag_1h"))

# Step 9: RSI over 14 rows, using simple averages of gains and losses
df = df.withColumn("delta", F.col("price") - F.lag("price", 1).over(w))  # price change vs. previous row
df = df.withColumn("gain", F.when(F.col("delta") > 0, F.col("delta")).otherwise(0))
df = df.withColumn("loss", F.when(F.col("delta") < 0, -F.col("delta")).otherwise(0))
df = df.withColumn("avg_gain", F.avg("gain").over(trailing(14)))
df = df.withColumn("avg_loss", F.avg("loss").over(trailing(14)))
df = df.withColumn("rs", F.col("avg_gain") / (F.col("avg_loss") + F.lit(1e-9)))  # epsilon avoids division by zero
df = df.withColumn("rsi_14", 100 - (100 / (1 + F.col("rs"))))

# Step 10: Hourly rolling trend: count of price increases over the last 6 rows
df = df.withColumn("price_increase", F.when(F.col("delta") > 0, 1).otherwise(0))
df = df.withColumn("trend_up_count_6h", F.sum("price_increase").over(trailing(6)))

# Step 11: Drop intermediate RSI columns (price_increase is kept), then drop rows
# with nulls left by the lags and one-row stddev frames at the start of the series
df = df.drop("delta", "gain", "loss", "avg_gain", "avg_loss", "rs")
df = df.dropna()

# Show the resulting DataFrame
df.show()




+--------------------+-----------------+--------------------+--------------------+----------+-----------------+-----------------+-------------------+----+---+-----------+--------------------+--------------------+-----------------+-----------------+-----------------+-----------------+-----------------+------------------+------------------+-------------------+-------------------+-------------------+-------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+--------------+-----------------+
|           timestamp|            price|          market_cap|              volume|  hour_key|            ema12|            ema26|               macd|hour|day|day_of_week|     price_return_1h|    price_return_24h|            ma_6h|           ma_24h|           ma_72h|     price_lag_1h|    price_lag_24h|     volatility_6h|    volatility_24h|        momentum_1h|        momentum_6h|       momentum_24h|       momentum_72h|  