In [0]:
"""
Bitcoin Price Forecasting with ARIMA  
Spring 2025 - DATA605 Final Project 
Author: Ritik | UMD | Databricks CLI Automation
"""
# Config
# DBFS locations: the raw input file read by load_data, and the two files
# written by save_results (forecast CSV and metrics JSON).
DBFS_DATA_PATH = "dbfs:/bitcoin/bitcoin_price.json"
DBFS_OUTPUT_PATH = "dbfs:/bitcoin/forecast_output.csv"
DBFS_OUTPUT_PATH_METRICS = "dbfs:/bitcoin/metrics.json"
# NOTE(review): MODEL_ORDER does not appear to be used by any active code in
# the visible cells; model selection goes through ARIMA_ORDER_CANDIDATES.
MODEL_ORDER = (1, 1, 1)
# Candidate (p, d, q) orders that train_arima fits and compares by AIC.
ARIMA_ORDER_CANDIDATES = [(1, 1, 1), (2, 1, 2), (0, 1, 1)]
# Libraries
from pyspark.sql import SparkSession
import pandas as pd
import numpy as np
import json
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
from statsmodels.tsa.arima.model import ARIMA
import logging
from pyspark.sql.types import StructType, StructField, TimestampType, DoubleType
from pyspark.sql.functions import from_json, col
from sklearn.metrics import mean_absolute_error, mean_squared_error

# Logger setup
# basicConfig() installs a default handler on the root logger; the notebook's
# own logger is then raised to INFO so logger.info(...) calls are emitted.
logging.basicConfig()
logger = logging.getLogger("bitcoin_forecast")
logger.setLevel(logging.INFO)
logger.info(" Libraries loaded and config set")

INFO:bitcoin_forecast: Libraries loaded and config set


In [0]:
def load_data(dbfs_path):
    """Read the raw text file at ``dbfs_path`` into a Spark DataFrame.

    The file is read line by line with ``spark.read.text``. An empty result
    is treated as an error.

    Parameters
    ----------
    dbfs_path : str
        Location of the raw file to read.

    Returns
    -------
    Spark DataFrame
        The unparsed rows as returned by ``spark.read.text``.

    Raises
    ------
    ValueError
        If the DataFrame read from ``dbfs_path`` is empty.
    Exception
        Any other failure is logged and re-raised unchanged.
    """
    try:
        lines_df = spark.read.text(dbfs_path)
        if not lines_df.isEmpty():
            row_total = lines_df.count()
            logger.info(f" Loaded {row_total} rows from DBFS")
            return lines_df
        raise ValueError(f"No data found at {dbfs_path}")
    except Exception as exc:
        # Log for the notebook reader, then let the original error propagate.
        logger.error(f" Data loading failed: {str(exc)}")
        raise

# Execute
# Load the raw rows from the configured input path.
raw_df = load_data(DBFS_DATA_PATH)
# Preview the first 5 raw rows without truncating the column contents.
raw_df.show(5, truncate=False)



INFO:bitcoin_forecast: Loaded 1526 rows from DBFS


+-----------------------------------------------------------+
|value                                                      |
+-----------------------------------------------------------+
|{"timestamp": "2025-03-30T04:25:56.663227", "price": 83060}|
|{"timestamp": "2025-03-30T05:39:57.371254", "price": 83122}|
|{"timestamp": "2025-03-30T05:40:58.257615", "price": 83112}|
|{"timestamp": "2025-03-30T05:41:46.834078", "price": 83112}|
|{"timestamp": "2025-03-30T05:41:49.688149", "price": 83112}|
+-----------------------------------------------------------+
only showing top 5 rows



In [0]:


def clean_data(raw_df):
    """Parse raw JSON rows into a daily price frame with derived columns.

    Steps, in order:
      1. Parse the ``value`` column as JSON with ``timestamp`` and ``price``
         fields and drop rows where either is null.
      2. Convert to pandas and normalise timestamps to tz-naive UTC.
      3. Sort by timestamp, keep the last row per duplicate timestamp, and
         index by timestamp.
      4. Resample ``price`` onto a daily ("D") grid with forward fill, then
         drop days that still have no price.
      5. Add ``price_change_pct`` (``pct_change``) and ``price_ma_5``
         (5-row rolling mean).

    Parameters
    ----------
    raw_df : Spark DataFrame
        Frame with a ``value`` column holding one JSON document per row.

    Returns
    -------
    pd.DataFrame
        Indexed by timestamp, with columns ``price``, ``price_change_pct``
        and ``price_ma_5``.

    Raises
    ------
    ValueError
        If no rows survive JSON parsing.
    AssertionError
        If the cleaned frame is empty, has duplicate timestamps, does not
        have a daily index frequency, or contains missing prices.
    """
    try:
        # Layout expected in each JSON row.
        json_schema = StructType([
            StructField("timestamp", TimestampType(), True),
            StructField("price", DoubleType(), True),
        ])

        # Spark side: parse, flatten the struct, discard incomplete rows.
        json_col = from_json(col("value"), json_schema).alias("j")
        spark_rows = raw_df.select(json_col).select("j.*")
        spark_rows = spark_rows.na.drop(subset=["timestamp", "price"])

        frame = spark_rows.toPandas()
        if frame.empty:
            raise ValueError("No valid JSON rows after parsing.")

        # Interpret timestamps as UTC, then strip the timezone.
        utc_stamps = pd.to_datetime(frame["timestamp"], utc=True)
        frame["timestamp"] = utc_stamps.dt.tz_convert(None)

        # One row per timestamp, in chronological order.
        frame = frame.dropna(subset=["timestamp", "price"])
        frame = frame.sort_values("timestamp")
        frame = frame.drop_duplicates("timestamp", keep="last")
        frame = frame.set_index("timestamp")

        # Daily grid; forward fill leaves NaN only where no earlier value exists.
        daily = frame[["price"]].resample("D").ffill()
        daily = daily.dropna(subset=["price"])

        # Derived features.
        daily["price_change_pct"] = daily["price"].pct_change()
        daily["price_ma_5"] = daily["price"].rolling(5).mean()

        # Invariants the downstream cells rely on.
        assert not daily.empty, "No data after cleaning"
        assert daily.index.is_unique, "Duplicate timestamps remain"

        expected_freq = pd.tseries.frequencies.to_offset("D")
        assert daily.index.freq == expected_freq, f"Index not daily (got {daily.index.freq})"
        assert daily["price"].notna().all(), "Missing price values after cleaning"

        return daily

    except Exception as exc:
        # Log with traceback, then re-raise the original error.
        logger.error(f"clean_data failed: {exc}", exc_info=True)
        raise


# Parse and clean the raw rows into the pandas frame used by the later cells.
pdf = clean_data(raw_df)
# clean_data asserts a daily index frequency; print it as a visible check.
print("Index frequency now:", pdf.index.freq)
# Last expression of the cell: display the most recent rows.
pdf.tail()

Index frequency now: <Day>


Unnamed: 0_level_0,price,price_change_pct,price_ma_5
timestamp,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
2025-05-13,102854.0,-0.00898,103215.2
2025-05-14,104236.0,0.013437,103473.0
2025-05-15,103501.0,-0.007051,103607.6
2025-05-16,103554.0,0.000512,103586.2
2025-05-17,103673.0,0.001149,103563.6


In [0]:
def plot_historical(data: pd.DataFrame):
    """Draw the price series and its 5-point moving average over time.

    Parameters
    ----------
    data : pd.DataFrame
        Frame whose index is used as the x-axis and which provides the
        ``price`` and ``price_ma_5`` columns.

    Returns
    -------
    None
        The figure is shown with ``plt.show()``.
    """
    plt.figure(figsize=(14, 6))
    ax = plt.gca()

    # (column, line styling) for each curve, drawn in this order.
    curves = (
        ("price", {"label": "Price (USD)", "color": "#F7931A"}),
        ("price_ma_5", {"label": "5-Point MA", "linestyle": "--", "color": "#0D2D6C"}),
    )
    for column, line_kwargs in curves:
        ax.plot(data.index, data[column], **line_kwargs)

    ax.set_title("₿ Bitcoin Price Analysis", pad=20)
    ax.set_xlabel("Timestamp", labelpad=10)
    ax.set_ylabel("USD", labelpad=10)
    ax.grid(True, alpha=0.3)
    ax.legend()

    # Large faint "BTC" watermark placed in figure coordinates.
    plt.figtext(0.15, 0.85, "BTC",
                fontsize=120, alpha=0.1, color="#F7931A", ha='center')

    # Automatic date ticks with compact labels, rotated for readability.
    date_locator = mdates.AutoDateLocator()
    ax.xaxis.set_major_locator(date_locator)
    ax.xaxis.set_major_formatter(mdates.ConciseDateFormatter(date_locator))
    plt.xticks(rotation=45, ha='right')

    plt.tight_layout()
    plt.show()

# Execute
# Plot the cleaned price history together with its 5-point moving average.
plot_historical(pdf)



In [0]:

def train_arima(data: pd.Series, orders=None):
    """Fit several ARIMA candidates and keep the one with the lowest AIC.

    Each candidate is fitted with a linear trend term (``trend='t'``) and
    with stationarity/invertibility enforcement disabled. Candidates that
    fail to fit are logged and skipped.

    Parameters
    ----------
    data : pd.Series
        Series to model. If a frequency can be inferred from its index, a
        copy is re-indexed with that frequency set explicitly before fitting.
    orders : iterable of (p, d, q) tuples, optional
        Candidate orders to try. Defaults to ``ARIMA_ORDER_CANDIDATES``.

    Returns
    -------
    tuple
        ``(best_model, best_order)``: the fitted results object with the
        lowest AIC and the order that produced it.

    Raises
    ------
    RuntimeError
        If frequency inference fails or no candidate could be fitted. The
        underlying exception is attached as ``__cause__``.
    """
    candidates = ARIMA_ORDER_CANDIDATES if orders is None else list(orders)
    try:
        freq = pd.infer_freq(data.index)
        if freq is not None:
            # Work on a copy so the caller's series keeps its original index.
            data = data.copy()
            data.index = pd.DatetimeIndex(data.index, freq=freq)

        best_model = None
        best_order = None
        best_aic = np.inf

        for order in candidates:
            try:
                trial_model = ARIMA(
                    data,
                    order=order,
                    trend='t',
                    enforce_stationarity=False,
                    enforce_invertibility=False,
                ).fit()

                trial_aic = trial_model.aic
                logger.info(f" Order {order} AIC: {trial_aic:.1f}")
                # A NaN AIC never compares as smaller, so such fits are skipped.
                if trial_aic < best_aic:
                    best_model = trial_model
                    best_order = order
                    best_aic = trial_aic
            except Exception as e:
                # One bad candidate must not abort the whole search.
                logger.warning(f" ARIMA{order} failed: {e}")

        if best_model is None:
            raise RuntimeError("No valid ARIMA model could be fit.")

        logger.info(f" Selected ARIMA{best_order} | AIC: {best_aic:.1f}")
        return best_model, best_order
    except Exception as e:
        logger.error(f"ARIMA failed: {e}")
        # Chain explicitly so the root cause stays attached to the wrapper.
        raise RuntimeError("ARIMA model training failed. Check input data.") from e

# Fit the candidate models on the price column and keep the best one.
model_fit, best_order = train_arima(pdf['price'])
print("Parameters:", model_fit.params)
names = model_fit.param_names
# Take the first parameter that is not an AR/MA coefficient or the variance.
# Presumably this is the trend term added by trend='t' in train_arima; confirm
# against the printed parameter names. Raises IndexError if none is found.
trend_name = [n for n in names
              if not n.startswith(('ar.','ma.','sigma2'))][0]
print("Drift per day:", model_fit.params[trend_name])
# Diagnostic plots for the selected model.
model_fit.plot_diagnostics(figsize=(10, 6))
plt.tight_layout()
plt.show()
# Forecast 30 steps ahead; these globals are consumed by the later cells.
forecast_results = model_fit.get_forecast(steps=30)
forecast = forecast_results.predicted_mean
conf_int = forecast_results.conf_int()



In [0]:
def plot_forecast(history: pd.DataFrame,
                  forecast: pd.Series,
                  conf_int: pd.DataFrame | None = None):
    """Plot the price history followed by the forecast and optional bounds.

    The forecast is drawn on timestamps that continue ``history.index`` at
    the same frequency. The frequency is taken from the index itself when
    set, otherwise inferred. If neither works, the forecast's own
    ``DatetimeIndex`` is used.

    Parameters
    ----------
    history : pd.DataFrame
        Frame with a datetime index and a ``price`` column.
    forecast : pd.Series
        Forecast values, one per future step.
    conf_int : pd.DataFrame, optional
        Lower bound in the first column and upper bound in the second; when
        given, the band between them is shaded.

    Raises
    ------
    ValueError
        If no frequency can be determined for the future timestamps.
    """
    freq = getattr(history.index, "freq", None)
    if freq is None:
        freq = pd.infer_freq(history.index)

    if freq is not None:
        # A DateOffset handles multiples and anchored frequencies
        # ("15min", "W-SUN", "MS"), which pd.Timedelta(unit=...) rejects.
        step = pd.tseries.frequencies.to_offset(freq)
        future_index = pd.date_range(
            start=history.index[-1] + step,
            periods=len(forecast),
            freq=step
        )
    elif isinstance(forecast.index, pd.DatetimeIndex):
        future_index = forecast.index
    else:
        raise ValueError(
            "Cannot determine a frequency for the forecast timestamps: "
            "history index has no regular frequency."
        )

    plt.figure(figsize=(12, 5))
    plt.plot(history.index, history['price'],
             label='Historical', color="#0D2D6C")
    plt.plot(future_index, forecast,
             label='Forecast', linestyle="--", color="#F7931A")

    if conf_int is not None:
        plt.fill_between(future_index,
                         conf_int.iloc[:, 0],
                         conf_int.iloc[:, 1],
                         alpha=0.1,
                         label='95% CI')

    # Report the actual horizon rather than a hard-coded step count.
    plt.title(f'🔮 Bitcoin Price Forecast (Next {len(forecast)} Intervals)', pad=15)
    plt.xlabel('Time')
    plt.ylabel('USD')
    plt.grid(True, alpha=0.3)

    # cleaner date formatting
    ax = plt.gca()
    ax.xaxis.set_major_locator(mdates.AutoDateLocator())
    ax.xaxis.set_major_formatter(
        mdates.ConciseDateFormatter(ax.xaxis.get_major_locator())
    )
    plt.xticks(rotation=45)

    plt.legend()
    plt.tight_layout()
    plt.show()

# Execute
# Plot history, forecast and the confidence bounds from the fitted model.
plot_forecast(pdf, forecast, forecast_results.conf_int())


# NOTE(review): plot_forecast already ends with plt.show(); this extra call
# looks redundant.
plt.show()


In [0]:
def save_results(forecast: pd.Series, history: pd.DataFrame, model_order,
                 fitted_model=None):
    """Write the forecast (with bounds) and error metrics to DBFS.

    The forecast CSV goes to ``DBFS_OUTPUT_PATH`` and the metrics JSON to
    ``DBFS_OUTPUT_PATH_METRICS``.

    Forecast timestamps continue ``history.index`` at its own frequency.
    Metrics (MAE, RMSE, MAPE) are in-sample: the model's fitted values are
    compared with the actual prices on the same dates, over the most recent
    ``len(forecast)`` observations. Out-of-sample accuracy cannot be computed
    here because no actuals exist yet for the forecast dates.

    Parameters
    ----------
    forecast : pd.Series
        Forecast series; its length sets the number of steps written.
    history : pd.DataFrame
        Frame with a datetime index and a ``price`` column.
    model_order
        Order of the fitted model, stored as a string in the CSV.
    fitted_model : optional
        Fitted results object providing ``get_forecast`` and
        ``fittedvalues``. Defaults to the notebook-level ``model_fit``.

    Raises
    ------
    ValueError
        If the future timestamps cannot be determined or there are no
        overlapping observations to score.
    AssertionError
        If either output file is missing from DBFS after writing.
    Exception
        Any other failure is logged and re-raised unchanged.
    """
    try:
        model = model_fit if fitted_model is None else fitted_model
        steps = len(forecast)

        targets = (DBFS_OUTPUT_PATH, DBFS_OUTPUT_PATH_METRICS)
        for target in targets:
            dbutils.fs.mkdirs(target.rsplit("/", 1)[0])

        # 1) forecast mean and bounds from the fitted model
        fc_res = model.get_forecast(steps=steps)
        fc_mean = fc_res.predicted_mean
        fc_ci = fc_res.conf_int(alpha=0.05)

        # Future timestamps follow the history's own frequency, not a fixed
        # 15-minute step.
        freq = getattr(history.index, "freq", None)
        if freq is None:
            freq = pd.infer_freq(history.index)
        if freq is not None:
            step = pd.tseries.frequencies.to_offset(freq)
            future_index = pd.date_range(
                start=history.index[-1] + step,
                periods=steps,
                freq=step
            )
        elif isinstance(fc_mean.index, pd.DatetimeIndex):
            future_index = fc_mean.index
        else:
            raise ValueError(
                "Cannot determine a frequency for the forecast timestamps: "
                "history index has no regular frequency."
            )

        # 2) build a DataFrame with mean and bounds
        forecast_df = pd.DataFrame({
            'timestamp'      : future_index,
            'forecast_price' : fc_mean.values,
            'lower_ci'       : fc_ci.iloc[:, 0].values,
            'upper_ci'       : fc_ci.iloc[:, 1].values,
            'model'          : str(model_order),
            'last_trained'   : pd.Timestamp.now().isoformat()
        })

        # 3) metrics on aligned dates: actual price vs. fitted value.
        # Assumes model.fittedvalues is a Series indexed like the training
        # data (true when the model was fitted on a pandas Series).
        actual, fitted = history['price'].align(model.fittedvalues, join='inner')
        window = min(steps, len(actual))
        if window == 0:
            raise ValueError("No overlapping observations to compute metrics.")
        y_true = actual.iloc[-window:].to_numpy(dtype=float)
        y_pred = fitted.iloc[-window:].to_numpy(dtype=float)

        metrics = {
            "MAE":  float(mean_absolute_error(y_true, y_pred)),
            # sqrt(MSE) avoids the `squared=` keyword removed from newer
            # scikit-learn releases.
            "RMSE": float(np.sqrt(mean_squared_error(y_true, y_pred))),
            "MAPE (%)": float((np.abs((y_true - y_pred) / y_true) * 100).mean())
        }

        output_path = DBFS_OUTPUT_PATH.replace("dbfs:/", "/dbfs/")
        forecast_df.to_csv(output_path, index=False)
        logger.info(f" Saved forecast to {output_path}")

        metrics_path = DBFS_OUTPUT_PATH_METRICS.replace("dbfs:/", "/dbfs/")
        with open(metrics_path, "w") as f:
            json.dump(metrics, f)
        logger.info(f" Saved metrics to {metrics_path}")

        # Verification: each output file must be listed in its directory.
        # A non-empty directory proves nothing when the input lives there too.
        for target in targets:
            parent, file_name = target.rsplit("/", 1)
            listed = {info.name for info in dbutils.fs.ls(parent)}
            assert file_name in listed, "DBFS write failed"
        logger.info(" DBFS write verified")
    except Exception as e:
        logger.error(f" Failed to save results: {str(e)}")
        raise

# Execute
# Persist the forecast and metrics, tagged with the selected model order.
save_results(forecast, pdf, best_order)



INFO:bitcoin_forecast: Saved forecast to /dbfs/bitcoin/forecast_output.csv
INFO:bitcoin_forecast: Saved metrics to /dbfs/bitcoin/metrics.json
INFO:bitcoin_forecast: DBFS write verified


In [0]:
# Validation
# Re-checks invariants that clean_data also asserts (non-empty, unique index,
# no missing prices), so later changes to pdf in the notebook are caught.
assert not pdf.empty, "DataFrame should not be empty"
assert pdf.index.is_unique, "Timestamps contain duplicates"
assert pdf['price'].isna().sum() == 0, "Missing price values detected"
logger.info(" All data validation tests passed")

# Show output
# List the output directory so the written files are visible in the notebook.
display(dbutils.fs.ls("dbfs:/bitcoin/"))


INFO:bitcoin_forecast: All data validation tests passed


path,name,size,modificationTime
dbfs:/bitcoin/bitcoin_price.json,bitcoin_price.json,93286,1747443823000
dbfs:/bitcoin/forecast_output.csv,forecast_output.csv,3499,1747444274000
dbfs:/bitcoin/metrics.json,metrics.json,86,1747444275000
