In [173]:
import os
import time
import pandas as pd
import numpy as np
#import yfinance as yf
import matplotlib.pyplot as plt
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, DateType, IntegerType, DoubleType

In [174]:
# Crear carpeta temporal Spark
temp_path = os.path.join(os.getcwd(), 'spark-temp')
os.makedirs(temp_path, exist_ok=True)

# Definir variables de entorno
os.environ["JAVA_HOME"] = r"C:\java\jdk"
os.environ['SPARK_LOCAL_DIRS'] = temp_path

print('JAVA_HOME:', os.environ.get('JAVA_HOME'))
print('SPARK_LOCAL_DIRS:', os.environ.get('SPARK_LOCAL_DIRS'))

JAVA_HOME: C:\java\jdk
SPARK_LOCAL_DIRS: C:\Users\TESTER\Desktop\Laboral\GIT\btc-3-asset-portfolio-extension\notebooks\spark-temp


In [175]:
# Crear Spark Session y medir tiempo
start_time = time.time()

spark = SparkSession.builder \
.appName('btcproject') \
    .getOrCreate()
#.config('spark.driver.memory', '8g') \
#.config('spark.executor.memory', '8g') \
#.config('spark.local.dir', temp_path) \


#spark.sparkContext.setLogLevel('ERROR')

end_time = time.time()
print('Spark Version:', spark.version)
print(f'Tiempo total en crear SparkSession: {round(end_time - start_time, 2)} segundos')

Spark Version: 3.5.5
Tiempo total en crear SparkSession: 0.01 segundos


# Data Cleaning
## Btc Dataset

df_btc = spark.read \
.option("header", True) \
.option("sep", ",") \
.option("inferSchema", True) \
.csv('../data/Bitcoin Historical Data.csv')

df_btc.show(20)
df_btc.printSchema()

## About the Bitcoin Dataset

This dataset contains **monthly data** for Bitcoin from **August 1, 2010** to **April 1, 2025**.

> ⚠️ **Note**: The data for **April 2025** is not final, as this project was created during the same month. Interpret that row with caution.

---

### Key Information:

- Each row represents **a full calendar month**.
- The `Date` column uses the **first day of the month** as a label (e.g. `2025-04-01` refers to data from April 2025).
- The values shown reflect **the entire month**, not just that specific day.

---

### Column Descriptions:

| Column   | Description                                                |
|----------|------------------------------------------------------------|
| `Date`   | First day of the month (used as time label)                |
| `Open`   | Price at the start of the month                            |
| `Price`  | Closing price at the end of the month                      |
| `High`   | Maximum price reached during the month                     |
| `Low`    | Minimum price reached during the month                     |
| `Change` | % change between the opening and closing price of the month|

> ℹ️ Later on, I rename the column `Price` to `Close` to make it clearer in context.


df_btc = df_btc.withColumn("Date", F.to_date("Date", "MM/dd/yyyy"))

df_btc = df_btc.select(#F.date_format("Date", "dd-MM-yyyy").alias("Date"), Cambia a String y no quiero
              F.col("Date"),
              F.col("Price").alias("Close"),
              F.col("Open"),
              F.col("High"),
              F.col("Low"),
              F.col("Change %").alias("Change")
             )
df_btc.show()

df_btc.printSchema()

df_btc.select(
    F.min("Date").alias("Fecha mínima"),
    F.max("Date").alias("Fecha máxima")
).show()


df_btc = df_btc.withColumn(
    "block_reward",
    F.when(F.col("Date") <= F.lit("2012-11-01"), 50)
     .when((F.col("Date") > F.lit("2012-11-01")) & (F.col("Date") <= F.lit("2016-06-01")), 25)
     .when((F.col("Date") > F.lit("2016-06-01")) & (F.col("Date") <= F.lit("2020-04-01")), 12.5)
     .when((F.col("Date") > F.lit("2020-04-01")) & (F.col("Date") <= F.lit("2024-04-01")), 6.25)
     .otherwise(3.125)
)

df_btc.orderBy(F.col("Date").desc()).toPandas().head(13)

## Block Reward Column

To account for Bitcoin’s changing monetary policy, I added a `block_reward` column that reflects the number of BTC miners received per block over time.

---

### Halving Schedule:

| Halving | Date             | Block Reward |
|---------|------------------|--------------|
| 1st     | Nov 28, 2012     | 25 BTC       |
| 2nd     | Jul 9, 2016      | 12.5 BTC     |
| 3rd     | May 11, 2020     | 6.25 BTC     |
| 4th     | Apr 20, 2024     | 3.125 BTC    |

---

### Logic:

- My dataset is monthly (each row = 1st day of the month).
- Since the 2012 and 2024 halvings occurred at the **end of the month**, I assigned those entire months (`2012-11` and `2024-04`) to the **previous reward**.
- This better reflects the fact that most of the blocks mined during those months still followed the old reward.

---

### Summary:

| Period                 | Assigned Block Reward |
|------------------------|------------------------|
| ≤ November 2012        | 50 BTC                 |
| December 2012 – June 2016 | 25 BTC             |
| July 2016 – April 2020    | 12.5 BTC            |
| May 2020 – April 2024     | 6.25 BTC            |
| ≥ May 2024               | 3.125 BTC            |

This column will help in analyzing how price and volatility behave across halving cycles.


df_btc.printSchema()

cols_to_convert = ["Close", "Open", "High", "Low"]

for col in cols_to_convert:
    df_btc = df_btc.withColumn(col, F.regexp_replace(F.col(col), ",", "").cast(DoubleType()))

df_btc = df_btc.withColumn(
    "Change",
    F.regexp_replace(F.col("Change"), "%", "").cast(DoubleType())
)
df_btc.printSchema()

df_btc.show(10)

df_extended = spark.read \
    .option("header", True) \
    .option("sep", ",") \
    .option("inferSchema", True) \
    .csv('../data/final_df_extended.csv')

df_extended = df_extended.withColumn(
    "Date",
    F.to_date(F.substring(F.col("month"), 1, 10), "yyyy-MM-dd")
)

df_extended = df_extended.drop("month")



df_extended.show(10)


df_combined = df_btc.join(
    df_extended.drop("close_btc", "high_btc", "low_btc"),  # Evitar duplicação de colunas
    on="Date",
    how="left"
)

df_combined.show(5)
df_combined.printSchema()


df_combined.filter(
    F.col("close_sp500").isNull()
).show()

## Daily Bitcoin

In [176]:
df_btc = spark.read \
    .option("header", True) \
    .option("sep", ",") \
    .option("inferSchema", True) \
    .csv('../data/bitcoin_2010-07-17_2025-04-22.csv')

In [177]:
df_btc.show()
df_btc.printSchema()

+----------+----------+--------+--------+--------+--------+--------------------+--------------------+
|     Start|       End|    Open|    High|     Low|   Close|              Volume|          Market Cap|
+----------+----------+--------+--------+--------+--------+--------------------+--------------------+
|2025-04-21|2025-04-22|85190.58|88358.48|85190.58|87525.71|4.610174416478397E10|1.733086941755306...|
|2025-04-20|2025-04-21|85059.77|85320.15|84018.53|85095.42|2.363965210136934E10|1.683010284160163...|
|2025-04-19|2025-04-20|84484.98|85510.04| 84368.6|85127.56|2.200324311390941E10|1.689659242621345E12|
|2025-04-18|2025-04-19|84892.79|85109.28|84344.03|84464.64|3.094039408019164E10|1.679968878910571...|
|2025-04-17|2025-04-18|84081.07|85426.94|83814.62|84931.22|4.330348236714634E10|1.678993522204320...|
|2025-04-16|2025-04-17|83652.14|85332.95| 83163.1|84051.96|4.534236508933449...|1.667600790769742...|
|2025-04-15|2025-04-16|84521.75|86413.41|83632.78|83677.65|5.796099945000349E10|1.

In [178]:
df_btc = df_btc.select(
    F.col("Start").alias("Date"),
    F.col("Open").alias("open_btc"),
    F.col("High").alias("high_btc"),
    F.col("Low").alias("low_btc"),
    F.col("Close").alias("close_btc"),
    F.col("Volume").alias("volume_btc")
)

In [179]:
df_btc.select(
    F.min("Date").alias("Fecha Minima"),
    F.max("Date").alias("Fecha Maxima")
).show()

+------------+------------+
|Fecha Minima|Fecha Maxima|
+------------+------------+
|  2010-07-17|  2025-04-21|
+------------+------------+



In [180]:
df_btc.show()

+----------+--------+--------+--------+---------+--------------------+
|      Date|open_btc|high_btc| low_btc|close_btc|          volume_btc|
+----------+--------+--------+--------+---------+--------------------+
|2025-04-21|85190.58|88358.48|85190.58| 87525.71|4.610174416478397E10|
|2025-04-20|85059.77|85320.15|84018.53| 85095.42|2.363965210136934E10|
|2025-04-19|84484.98|85510.04| 84368.6| 85127.56|2.200324311390941E10|
|2025-04-18|84892.79|85109.28|84344.03| 84464.64|3.094039408019164E10|
|2025-04-17|84081.07|85426.94|83814.62| 84931.22|4.330348236714634E10|
|2025-04-16|83652.14|85332.95| 83163.1| 84051.96|4.534236508933449...|
|2025-04-15|84521.75|86413.41|83632.78| 83677.65|5.796099945000349E10|
|2025-04-14| 83779.1| 85756.0| 83773.4| 84495.07|7.025542211409059E10|
|2025-04-13|85285.14|85780.08| 83171.6|  83693.1|6.052662212396167E10|
|2025-04-12|83359.29|85786.19|82810.94| 85288.46|6.935436243821603E10|
|2025-04-11|79575.97|84102.66| 79006.6| 83520.03|6.817255156309059E10|
|2025-

In [181]:
df_nulls = df_btc.filter(
    F.col("open_btc").isNull() |
    F.col("high_btc").isNull() |
    F.col("low_btc").isNull() |
    F.col("close_btc").isNull() |
    F.col("volume_btc").isNull()
)
df_nulls.show()

+----+--------+--------+-------+---------+----------+
|Date|open_btc|high_btc|low_btc|close_btc|volume_btc|
+----+--------+--------+-------+---------+----------+
+----+--------+--------+-------+---------+----------+



In [182]:
df_btc = df_btc.withColumn(
    "block_reward",
    F.when(F.col("Date") <= F.lit("2012-11-28"), 50)      # 1er halving: 28/11/2012
     .when((F.col("Date") > F.lit("2012-11-28")) & 
           (F.col("Date") <= F.lit("2016-07-09")), 25)    # 2do halving: 09/07/2016
     .when((F.col("Date") > F.lit("2016-07-09")) & 
           (F.col("Date") <= F.lit("2020-05-11")), 12.5)  # 3er halving: 11/05/2020
     .when((F.col("Date") > F.lit("2020-05-11")) & 
           (F.col("Date") <= F.lit("2024-04-20")), 6.25)  # 4to halving: 20/04/2024
     .otherwise(3.125)                                    # Post 4to halving
)
df_btc.show()

+----------+--------+--------+--------+---------+--------------------+------------+
|      Date|open_btc|high_btc| low_btc|close_btc|          volume_btc|block_reward|
+----------+--------+--------+--------+---------+--------------------+------------+
|2025-04-21|85190.58|88358.48|85190.58| 87525.71|4.610174416478397E10|       3.125|
|2025-04-20|85059.77|85320.15|84018.53| 85095.42|2.363965210136934E10|       3.125|
|2025-04-19|84484.98|85510.04| 84368.6| 85127.56|2.200324311390941E10|       3.125|
|2025-04-18|84892.79|85109.28|84344.03| 84464.64|3.094039408019164E10|       3.125|
|2025-04-17|84081.07|85426.94|83814.62| 84931.22|4.330348236714634E10|       3.125|
|2025-04-16|83652.14|85332.95| 83163.1| 84051.96|4.534236508933449...|       3.125|
|2025-04-15|84521.75|86413.41|83632.78| 83677.65|5.796099945000349E10|       3.125|
|2025-04-14| 83779.1| 85756.0| 83773.4| 84495.07|7.025542211409059E10|       3.125|
|2025-04-13|85285.14|85780.08| 83171.6|  83693.1|6.052662212396167E10|      

## Daily SP500

In [183]:
df_sp500 = spark.read \
    .option("header", True) \
    .option("sep", ",") \
    .option("inferSchema", True) \
    .csv('../data/spx_d.csv')

In [184]:
df_sp500.show()
df_sp500.printSchema()

+----------+-------+-------+-------+-------+-------------+
|      Date|   Open|   High|    Low|  Close|       Volume|
+----------+-------+-------+-------+-------+-------------+
|2010-01-04|1116.56|1133.87|1116.56|1132.99|2.217444444E9|
|2010-01-05|1132.66|1136.63|1129.66|1136.52|     1.3839E9|
|2010-01-06|1135.71|1139.19|1133.95|1137.14|2.762588889E9|
|2010-01-07|1136.27|1142.46|1131.32|1141.69|2.928155556E9|
|2010-01-08|1140.52|1145.39|1136.22|1144.98|2.438661111E9|
|2010-01-11|1145.96|1149.74|1142.02|1146.98|2.364322222E9|
|2010-01-12|1143.81|1143.81|1131.77|1136.22|2.620088889E9|
|2010-01-13|1137.31| 1148.4|1133.18|1145.68|2.316866667E9|
|2010-01-14|1145.68|1150.41| 1143.8|1148.46|2.175111111E9|
|2010-01-15|1147.72|1147.77|1131.39|1136.03|2.643738889E9|
|2010-01-19|1136.03|1150.45|1135.77|1150.23|2.624905556E9|
|2010-01-20|1147.95|1147.95|1129.25|1138.04|2.672533333E9|
|2010-01-21|1138.68|1141.58|1114.84|1116.48|    3.81905E9|
|2010-01-22|1115.49|1115.49|1090.18|1091.76|    3.44925E

In [185]:
df_sp500.select(
    F.min("Date").alias("Fecha Minima"),
    F.max("Date").alias("Fecha Maxima")
).show()

+------------+------------+
|Fecha Minima|Fecha Maxima|
+------------+------------+
|  2010-01-04|  2025-04-21|
+------------+------------+



In [186]:
df_sp500 = df_sp500.select(
    F.col("Date"),
    F.col("Open").alias("open_sp500"),
    F.col("High").alias("high_sp500"),
    F.col("Low").alias("low_sp500"),
    F.col("Close").alias("close_sp500"),
    F.col("Volume").alias("volume_sp500")
)


In [187]:
df_sp500.show()

+----------+----------+----------+---------+-----------+-------------+
|      Date|open_sp500|high_sp500|low_sp500|close_sp500| volume_sp500|
+----------+----------+----------+---------+-----------+-------------+
|2010-01-04|   1116.56|   1133.87|  1116.56|    1132.99|2.217444444E9|
|2010-01-05|   1132.66|   1136.63|  1129.66|    1136.52|     1.3839E9|
|2010-01-06|   1135.71|   1139.19|  1133.95|    1137.14|2.762588889E9|
|2010-01-07|   1136.27|   1142.46|  1131.32|    1141.69|2.928155556E9|
|2010-01-08|   1140.52|   1145.39|  1136.22|    1144.98|2.438661111E9|
|2010-01-11|   1145.96|   1149.74|  1142.02|    1146.98|2.364322222E9|
|2010-01-12|   1143.81|   1143.81|  1131.77|    1136.22|2.620088889E9|
|2010-01-13|   1137.31|    1148.4|  1133.18|    1145.68|2.316866667E9|
|2010-01-14|   1145.68|   1150.41|   1143.8|    1148.46|2.175111111E9|
|2010-01-15|   1147.72|   1147.77|  1131.39|    1136.03|2.643738889E9|
|2010-01-19|   1136.03|   1150.45|  1135.77|    1150.23|2.624905556E9|
|2010-

In [188]:
df_nulls = df_sp500.filter(
    F.col("open_sp500").isNull() |
    F.col("high_sp500").isNull() |
    F.col("low_sp500").isNull() |
    F.col("close_sp500").isNull() |
    F.col("volume_sp500").isNull()
)
df_nulls.show()

+----+----------+----------+---------+-----------+------------+
|Date|open_sp500|high_sp500|low_sp500|close_sp500|volume_sp500|
+----+----------+----------+---------+-----------+------------+
+----+----------+----------+---------+-----------+------------+



## CPI 
## 100 = 2010-01-01

In [189]:
df_cpi = spark.read \
    .option("header", True) \
    .option("sep", ",") \
    .option("inferSchema", True) \
    .csv('../data/CPIAUCSL_NBD20100101.csv')

In [190]:
df_cpi.show()
df_cpi.printSchema()

+----------------+--------------------+
|observation_date|CPIAUCSL_NBD20100101|
+----------------+--------------------+
|      2010-01-01|               100.0|
|      2010-02-01|            99.90482|
|      2010-03-01|            99.93793|
|      2010-04-01|            99.96092|
|      2010-05-01|            99.90896|
|      2010-06-01|            99.86712|
|      2010-07-01|            100.0538|
|      2010-08-01|           100.20001|
|      2010-09-01|           100.36186|
|      2010-10-01|            100.7113|
|      2010-11-01|           100.96649|
|      2010-12-01|           101.37203|
|      2011-01-01|           101.70078|
|      2011-02-01|            102.0277|
|      2011-03-01|           102.55554|
|      2011-04-01|           103.03695|
|      2011-05-01|           103.36478|
|      2011-06-01|           103.36478|
|      2011-07-01|            103.6356|
|      2011-08-01|           103.96252|
+----------------+--------------------+
only showing top 20 rows

root
 |-- obse

In [191]:
df_cpi = df_cpi.select(
    F.col("observation_date").alias("Date"),
    F.col("CPIAUCSL_NBD20100101").alias("cpi")
)

In [192]:
df_cpi.show()

+----------+---------+
|      Date|      cpi|
+----------+---------+
|2010-01-01|    100.0|
|2010-02-01| 99.90482|
|2010-03-01| 99.93793|
|2010-04-01| 99.96092|
|2010-05-01| 99.90896|
|2010-06-01| 99.86712|
|2010-07-01| 100.0538|
|2010-08-01|100.20001|
|2010-09-01|100.36186|
|2010-10-01| 100.7113|
|2010-11-01|100.96649|
|2010-12-01|101.37203|
|2011-01-01|101.70078|
|2011-02-01| 102.0277|
|2011-03-01|102.55554|
|2011-04-01|103.03695|
|2011-05-01|103.36478|
|2011-06-01|103.36478|
|2011-07-01| 103.6356|
|2011-08-01|103.96252|
+----------+---------+
only showing top 20 rows



## Join

In [193]:
from pyspark.sql.functions import year, month

# 1. Crear columnas de año y mes en ambos datasets
df_btc_month = df_btc.withColumn("year", year("Date")) \
                    .withColumn("month", month("Date"))

df_btc_month.show(5)
df_cpi_month = df_cpi.withColumn("year", year("Date")) \
                    .withColumn("month", month("Date"))

# 2. Hacer join usando las columnas de año y mes
df_combined = df_btc_month.join(
    df_cpi_month.drop("Date"),
    ["year", "month"],
    "left"
).drop("year", "month")

# 3. Ordenar por fecha
df_combined = df_combined.orderBy(F.col("Date").desc())

df_combined.show(30)


+----------+--------+--------+--------+---------+--------------------+------------+----+-----+
|      Date|open_btc|high_btc| low_btc|close_btc|          volume_btc|block_reward|year|month|
+----------+--------+--------+--------+---------+--------------------+------------+----+-----+
|2025-04-21|85190.58|88358.48|85190.58| 87525.71|4.610174416478397E10|       3.125|2025|    4|
|2025-04-20|85059.77|85320.15|84018.53| 85095.42|2.363965210136934E10|       3.125|2025|    4|
|2025-04-19|84484.98|85510.04| 84368.6| 85127.56|2.200324311390941E10|       3.125|2025|    4|
|2025-04-18|84892.79|85109.28|84344.03| 84464.64|3.094039408019164E10|       3.125|2025|    4|
|2025-04-17|84081.07|85426.94|83814.62| 84931.22|4.330348236714634E10|       3.125|2025|    4|
+----------+--------+--------+--------+---------+--------------------+------------+----+-----+
only showing top 5 rows

+----------+--------+--------+--------+---------+--------------------+------------+---------+
|      Date|open_btc|high_

In [194]:
df_combined = df_combined.join(
    df_sp500,
    on = "Date",
    how = "left"
)

df_combined = df_combined.orderBy(F.col("Date").desc())

df_combined.orderBy(F.col("Date").desc()).toPandas().head(30)

Unnamed: 0,Date,open_btc,high_btc,low_btc,close_btc,volume_btc,block_reward,cpi,open_sp500,high_sp500,low_sp500,close_sp500,volume_sp500
0,2025-04-21,85190.58,88358.48,85190.58,87525.71,46101740000.0,3.125,,5232.94,5232.94,5101.63,5158.2,2964605000.0
1,2025-04-20,85059.77,85320.15,84018.53,85095.42,23639650000.0,3.125,,,,,,
2,2025-04-19,84484.98,85510.04,84368.6,85127.56,22003240000.0,3.125,,,,,,
3,2025-04-18,84892.79,85109.28,84344.03,84464.64,30940390000.0,3.125,,,,,,
4,2025-04-17,84081.07,85426.94,83814.62,84931.22,43303480000.0,3.125,,5305.45,5328.31,5255.58,5282.7,3153700000.0
5,2025-04-16,83652.14,85332.95,83163.1,84051.96,45342370000.0,3.125,,5335.75,5367.24,5220.79,5275.7,3352866000.0
6,2025-04-15,84521.75,86413.41,83632.78,83677.65,57961000000.0,3.125,,5411.99,5450.41,5386.44,5396.63,2856819000.0
7,2025-04-14,83779.1,85756.0,83773.4,84495.07,70255420000.0,3.125,,5441.96,5459.46,5358.02,5405.97,3287278000.0
8,2025-04-13,85285.14,85780.08,83171.6,83693.1,60526620000.0,3.125,,,,,,
9,2025-04-12,83359.29,85786.19,82810.94,85288.46,69354360000.0,3.125,,,,,,


In [195]:
from pyspark.sql.window import Window
window_ffill = Window.orderBy("Date").rowsBetween(Window.unboundedPreceding, 0)

for colname in ["open_sp500", "high_sp500", "low_sp500", "close_sp500", "volume_sp500"]:
    df_combined = df_combined.withColumn(
        colname,
        F.last(colname, ignorenulls=True).over(window_ffill)
    )

In [196]:
df_combined.orderBy(F.col("Date").desc()).toPandas().head(30)

Unnamed: 0,Date,open_btc,high_btc,low_btc,close_btc,volume_btc,block_reward,cpi,open_sp500,high_sp500,low_sp500,close_sp500,volume_sp500
0,2025-04-21,85190.58,88358.48,85190.58,87525.71,46101740000.0,3.125,,5232.94,5232.94,5101.63,5158.2,2964605000.0
1,2025-04-20,85059.77,85320.15,84018.53,85095.42,23639650000.0,3.125,,5305.45,5328.31,5255.58,5282.7,3153700000.0
2,2025-04-19,84484.98,85510.04,84368.6,85127.56,22003240000.0,3.125,,5305.45,5328.31,5255.58,5282.7,3153700000.0
3,2025-04-18,84892.79,85109.28,84344.03,84464.64,30940390000.0,3.125,,5305.45,5328.31,5255.58,5282.7,3153700000.0
4,2025-04-17,84081.07,85426.94,83814.62,84931.22,43303480000.0,3.125,,5305.45,5328.31,5255.58,5282.7,3153700000.0
5,2025-04-16,83652.14,85332.95,83163.1,84051.96,45342370000.0,3.125,,5335.75,5367.24,5220.79,5275.7,3352866000.0
6,2025-04-15,84521.75,86413.41,83632.78,83677.65,57961000000.0,3.125,,5411.99,5450.41,5386.44,5396.63,2856819000.0
7,2025-04-14,83779.1,85756.0,83773.4,84495.07,70255420000.0,3.125,,5441.96,5459.46,5358.02,5405.97,3287278000.0
8,2025-04-13,85285.14,85780.08,83171.6,83693.1,60526620000.0,3.125,,5255.56,5381.46,5220.77,5363.36,3769018000.0
9,2025-04-12,83359.29,85786.19,82810.94,85288.46,69354360000.0,3.125,,5255.56,5381.46,5220.77,5363.36,3769018000.0


In [197]:
df_nulls = df_combined.filter(
    F.col("cpi").isNull() )
df_nulls.show(25)

+----------+--------+--------+--------+---------+--------------------+------------+----+----------+----------+---------+-----------+-------------+
|      Date|open_btc|high_btc| low_btc|close_btc|          volume_btc|block_reward| cpi|open_sp500|high_sp500|low_sp500|close_sp500| volume_sp500|
+----------+--------+--------+--------+---------+--------------------+------------+----+----------+----------+---------+-----------+-------------+
|2025-04-01|82612.64|85417.94|82470.01| 85216.81|4.768397020478049E10|       3.125|NULL|   5597.53|   5650.57|  5558.52|    5633.07|2.806321225E9|
|2025-04-02|85170.68|87898.01| 82487.4| 82548.31|5.237611157841463...|       3.125|NULL|   5580.76|   5695.31|  5571.48|    5670.97|2.785811866E9|
|2025-04-03|82259.03| 83781.7|81307.75| 83199.95|7.766843260005576E10|       3.125|NULL|   5492.74|   5499.53|  5390.83|    5396.52|5.005321892E9|
|2025-04-04|83259.08|84676.27|81767.53| 83879.86|6.263226227224738E10|       3.125|NULL|   5292.14|   5292.14|   5069.

In [198]:
# Definir ventana para forward fill
window_ffill = Window.orderBy("Date").rowsBetween(Window.unboundedPreceding, 0)

# Rellenar valores NULL en la columna 'cpi' con el último valor no nulo anterior
df_combined = df_combined.withColumn(
    "cpi",
    F.last("cpi", ignorenulls=True).over(window_ffill)
)

# (Opcional) Verifica el resultado
df_combined.select("Date", "cpi").orderBy(F.col("Date").desc()).show(25)

+----------+---------+
|      Date|      cpi|
+----------+---------+
|2025-04-21|146.95753|
|2025-04-20|146.95753|
|2025-04-19|146.95753|
|2025-04-18|146.95753|
|2025-04-17|146.95753|
|2025-04-16|146.95753|
|2025-04-15|146.95753|
|2025-04-14|146.95753|
|2025-04-13|146.95753|
|2025-04-12|146.95753|
|2025-04-11|146.95753|
|2025-04-10|146.95753|
|2025-04-09|146.95753|
|2025-04-08|146.95753|
|2025-04-07|146.95753|
|2025-04-06|146.95753|
|2025-04-05|146.95753|
|2025-04-04|146.95753|
|2025-04-03|146.95753|
|2025-04-02|146.95753|
|2025-04-01|146.95753|
|2025-03-31|146.95753|
|2025-03-30|146.95753|
|2025-03-29|146.95753|
|2025-03-28|146.95753|
+----------+---------+
only showing top 25 rows



In [199]:
df_combined.orderBy(F.col("Date").desc()).toPandas().head(30)

Unnamed: 0,Date,open_btc,high_btc,low_btc,close_btc,volume_btc,block_reward,cpi,open_sp500,high_sp500,low_sp500,close_sp500,volume_sp500
0,2025-04-21,85190.58,88358.48,85190.58,87525.71,46101740000.0,3.125,146.95753,5232.94,5232.94,5101.63,5158.2,2964605000.0
1,2025-04-20,85059.77,85320.15,84018.53,85095.42,23639650000.0,3.125,146.95753,5305.45,5328.31,5255.58,5282.7,3153700000.0
2,2025-04-19,84484.98,85510.04,84368.6,85127.56,22003240000.0,3.125,146.95753,5305.45,5328.31,5255.58,5282.7,3153700000.0
3,2025-04-18,84892.79,85109.28,84344.03,84464.64,30940390000.0,3.125,146.95753,5305.45,5328.31,5255.58,5282.7,3153700000.0
4,2025-04-17,84081.07,85426.94,83814.62,84931.22,43303480000.0,3.125,146.95753,5305.45,5328.31,5255.58,5282.7,3153700000.0
5,2025-04-16,83652.14,85332.95,83163.1,84051.96,45342370000.0,3.125,146.95753,5335.75,5367.24,5220.79,5275.7,3352866000.0
6,2025-04-15,84521.75,86413.41,83632.78,83677.65,57961000000.0,3.125,146.95753,5411.99,5450.41,5386.44,5396.63,2856819000.0
7,2025-04-14,83779.1,85756.0,83773.4,84495.07,70255420000.0,3.125,146.95753,5441.96,5459.46,5358.02,5405.97,3287278000.0
8,2025-04-13,85285.14,85780.08,83171.6,83693.1,60526620000.0,3.125,146.95753,5255.56,5381.46,5220.77,5363.36,3769018000.0
9,2025-04-12,83359.29,85786.19,82810.94,85288.46,69354360000.0,3.125,146.95753,5255.56,5381.46,5220.77,5363.36,3769018000.0


In [200]:
df_combined.filter(
    F.col("open_sp500").isNull()
).show()

+----------+--------+--------+-------+---------+----------+------------+--------+----------+----------+---------+-----------+------------+
|      Date|open_btc|high_btc|low_btc|close_btc|volume_btc|block_reward|     cpi|open_sp500|high_sp500|low_sp500|close_sp500|volume_sp500|
+----------+--------+--------+-------+---------+----------+------------+--------+----------+----------+---------+-----------+------------+
|2010-07-17|    0.05|    0.05|   0.05|     0.05|       0.0|        50.0|100.0538|      NULL|      NULL|     NULL|       NULL|        NULL|
|2010-07-18|  0.0858|  0.0858| 0.0858|   0.0858|       0.0|        50.0|100.0538|      NULL|      NULL|     NULL|       NULL|        NULL|
+----------+--------+--------+-------+---------+----------+------------+--------+----------+----------+---------+-----------+------------+



In [206]:
df_combined = df_combined.filter(F.col("open_sp500").isNotNull())

In [207]:
df_combined.orderBy(F.col("Date").asc()).show()

+----------+--------+--------+-------+---------+----------+------------+---------+----------+----------+---------+-----------+-------------+
|      Date|open_btc|high_btc|low_btc|close_btc|volume_btc|block_reward|      cpi|open_sp500|high_sp500|low_sp500|close_sp500| volume_sp500|
+----------+--------+--------+-------+---------+----------+------------+---------+----------+----------+---------+-----------+-------------+
|2010-07-19|  0.0808|  0.0808| 0.0808|   0.0808|       0.0|        50.0| 100.0538|   1066.85|    1074.7|  1061.11|    1071.25|2.271944444E9|
|2010-07-20|  0.0747|  0.0747| 0.0747|   0.0747|       0.0|        50.0| 100.0538|   1064.53|   1083.94|  1056.88|    1083.48|2.618488889E9|
|2010-07-21|  0.0792|  0.0792| 0.0792|   0.0792|       0.0|        50.0| 100.0538|   1086.67|   1088.96|  1065.25|    1069.59|2.637322222E9|
|2010-07-22|  0.0505|  0.0505| 0.0505|   0.0505|       0.0|        50.0| 100.0538|   1072.14|    1097.5|  1072.14|    1093.67|2.681611111E9|
|2010-07-23| 

In [208]:
df_combined.toPandas().to_csv("../data/final_btc_ml_dataset.csv", index=False)