In [None]:
crypto_vol_project/
├── data/
│   └── dataset.csv                # provided by user
├── src/
│   ├── data_processing.py         # load & clean
│   ├── feature_engineering.py     # create features
│   ├── model_training.py          # training + hyperparam tuning
│   ├── eval_reports.py            # evaluation metrics & plots
│   └── utils.py                   # helper functions
├── deploy/
│   └── app_streamlit.py           # Streamlit app for local deployment
├── notebooks/
│   └── eda.ipynb                  # optional EDA notebook
├── requirements.txt
├── README.md
└── docs/
    ├── HLD.md
    └── LLD.md

requirements.txt

pandas
numpy
scikit-learn
xgboost
lightgbm
matplotlib
seaborn
streamlit
joblib
pmdarima
statsmodels
scipy
optuna

In [3]:
# src/utils.py
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
import joblib


def load_dataset(path: str) -> pd.DataFrame:
    """Read the CSV file at ``path`` and return it as a DataFrame.

    ``parse_dates=True`` is forwarded to ``pandas.read_csv`` unchanged.
    NOTE(review): no ``index_col`` is passed, so date-like columns are
    presumably left as read (not converted) — confirm against callers.
    """
    return pd.read_csv(path, parse_dates=True)


def save_model(model, path: str) -> None:
    """Serialize ``model`` to the file at ``path`` using ``joblib.dump``."""
    joblib.dump(model, path)


def load_model(path: str):
    """Load and return the object stored at ``path`` via ``joblib.load``.

    NOTE(review): joblib files are pickle-based, so only load files that
    come from a trusted source.
    """
    model = joblib.load(path)
    return model


def train_test_split_time_series(X, y, test_size: float = 0.2):
    """
    Simple time-aware split: keep the last `test_size` portion as test.

    Assumes data is already sorted by time; rows are never shuffled, so the
    test set is always the chronological tail.

    Parameters
    ----------
    X, y : objects supporting ``len`` and positional ``.iloc`` slicing
        Features and target; they must have the same number of rows.
    test_size : float
        Fraction of rows (between 0 and 1 inclusive) placed in the test set.

    Returns
    -------
    X_train, X_test, y_train, y_test

    Raises
    ------
    ValueError
        If `test_size` is outside [0, 1] or `X` and `y` differ in length.
    """
    # A fraction above 1 would make the split index negative, and a negative
    # index silently slices from the END of the data instead of failing.
    if not 0.0 <= test_size <= 1.0:
        raise ValueError(f'test_size must be between 0 and 1, got {test_size!r}')

    n = len(X)
    if len(y) != n:
        raise ValueError(f'X and y must have the same length, got {n} and {len(y)}')

    split_idx = int(n * (1 - test_size))
    X_train, X_test = X.iloc[:split_idx], X.iloc[split_idx:]
    y_train, y_test = y.iloc[:split_idx], y.iloc[split_idx:]
    return X_train, X_test, y_train, y_test

In [None]:
# src/data_processing.py
import pandas as pd
import numpy as np
from src.utils import load_dataset


def basic_cleaning(df: pd.DataFrame) -> pd.DataFrame:
    """Basic cleaning: parse dates, sort chronologically, fill numeric gaps.

    The first column whose name contains 'time' or 'date' (case-insensitive)
    is parsed into a 'timestamp' column. If there is no such column but the
    index is a ``DatetimeIndex``, the index is used instead.

    Parameters
    ----------
    df : pd.DataFrame
        Raw input; it is not modified.

    Returns
    -------
    pd.DataFrame
        A new frame sorted by 'timestamp' with a fresh 0..n-1 index and with
        numeric columns interpolated, then forward- and back-filled.

    Raises
    ------
    ValueError
        If neither a date-like column nor a datetime index is available.
    """
    # Work on a copy so the caller's frame does not gain a 'timestamp' column.
    df = df.copy()

    # str(c) keeps non-string column labels (e.g. integers) from crashing .lower().
    date_cols = [c for c in df.columns if 'time' in str(c).lower() or 'date' in str(c).lower()]
    if date_cols:
        df['timestamp'] = pd.to_datetime(df[date_cols[0]])
    elif isinstance(df.index, pd.DatetimeIndex):
        # No date column, but the index carries the dates: materialize it so the
        # sort below has a 'timestamp' column to work with.
        df['timestamp'] = df.index
    else:
        raise ValueError('No date-like column found; ensure dataset has a timestamp/date column')
    df = df.sort_values('timestamp').reset_index(drop=True)

    # Interpolate interior gaps, then ffill/bfill whatever remains at the edges.
    num_cols = df.select_dtypes(include='number').columns.tolist()
    if num_cols:
        df[num_cols] = df[num_cols].interpolate().ffill().bfill()

    return df


if __name__ == '__main__':
    import sys

    # Input path: first CLI argument if given, otherwise the raw dataset location
    # from the project layout (data/dataset.csv). A machine-specific absolute
    # path is avoided so the script runs from any checkout of the project.
    raw_path = sys.argv[1] if len(sys.argv) > 1 else 'data/dataset.csv'
    df = load_dataset(raw_path)
    df_clean = basic_cleaning(df)
    df_clean.to_csv('data/dataset_clean.csv', index=False)
    print('Saved cleaned dataset to data/dataset_clean.csv')

In [None]:
# src/feature_engineering.py
import pandas as pd
import numpy as np


def add_return_features(df: pd.DataFrame, price_col: str = 'close') -> pd.DataFrame:
    """Derive return, volatility, lag and moving-average features from a price column.

    Works on a copy of ``df`` and appends, in this order: ``log_return``,
    ``ret_1``, ``rolling_vol_7``, ``rolling_vol_21``, ``logret_lag_{1,2,3,5,7}``,
    ``ma_7``, ``ma_21`` and ``ma_ratio``. Each rolling window ends at (and
    includes) the current row. Finally every row containing a NaN in any
    column is dropped and the index is reset.
    """
    out = df.copy()

    # One-step log return and simple percentage return.
    log_price = np.log(out[price_col])
    out['log_return'] = log_price - log_price.shift(1)
    out['ret_1'] = out[price_col].pct_change()

    # Rolling volatility (std of log returns) - target candidate.
    for window in (7, 21):
        out[f'rolling_vol_{window}'] = out['log_return'].rolling(window=window).std()

    # Lagged log returns.
    for lag in (1, 2, 3, 5, 7):
        out[f'logret_lag_{lag}'] = out['log_return'].shift(lag)

    # Price moving averages and their ratio; the epsilon guards a zero denominator.
    for window in (7, 21):
        out[f'ma_{window}'] = out[price_col].rolling(window=window).mean()
    out['ma_ratio'] = out['ma_7'] / (out['ma_21'] + 1e-9)

    # Shifting/rolling leaves NaNs in the leading rows; drop them.
    return out.dropna().reset_index(drop=True)


if __name__ == '__main__':
    import sys

    # An optional first CLI argument overrides the default cleaned-data path.
    cli_args = sys.argv[1:]
    path = cli_args[0] if cli_args else 'data/dataset_clean.csv'

    df = pd.read_csv(path, parse_dates=['timestamp'])
    df_fe = add_return_features(df, price_col='close')

    # Persist the engineered features for the training step.
    df_fe.to_csv('data/dataset_fe.csv', index=False)
    print('Saved feature-engineered dataset to data/dataset_fe.csv')

In [None]:
# src/model_training.py
import pandas as pd
import numpy as np
from sklearn.model_selection import TimeSeriesSplit, RandomizedSearchCV
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error, r2_score
import joblib

from src.utils import train_test_split_time_series


def train_model(df_path='data/dataset_fe.csv', target='rolling_vol_7', model_out='models/rf_vol_model.joblib'):
    """Tune, fit and evaluate a RandomForestRegressor on the feature-engineered data.

    Parameters
    ----------
    df_path : str
        CSV with a 'timestamp' column, the target column and feature columns.
    target : str
        Name of the column to predict.
    model_out : str
        Where the best estimator is dumped. The fitted search object is dumped
        next to it with '_search' inserted before the file extension.

    Returns
    -------
    (best_estimator, metrics)
        ``metrics`` is ``{'rmse': ..., 'r2': ...}`` computed on the held-out
        chronological tail (last 20% of rows).

    Raises
    ------
    KeyError
        If 'timestamp' or ``target`` is missing from the CSV.
    ValueError
        If no numeric feature column remains.

    Side effects: writes the model, the search object and
    'models/eval_results.csv' (y_true / y_pred for the test rows).
    """
    import os  # local import: only needed to prepare output paths

    eval_out = 'models/eval_results.csv'
    # Insert '_search' before the extension. A plain str.replace('.joblib', ...)
    # is a no-op for other extensions and would make the search dump overwrite
    # the model file.
    root, ext = os.path.splitext(model_out)
    search_out = f'{root}_search{ext}'

    # Create output directories up front so a missing folder fails fast rather
    # than after the (expensive) hyper-parameter search.
    for out_path in (model_out, eval_out):
        out_dir = os.path.dirname(out_path)
        if out_dir:
            os.makedirs(out_dir, exist_ok=True)

    df = pd.read_csv(df_path, parse_dates=['timestamp'])
    # The split and the CV below both assume chronological row order.
    df = df.sort_values('timestamp').reset_index(drop=True)

    # Keep numeric/boolean columns only: string columns (for example an original
    # date column kept alongside 'timestamp') cannot be fed to the regressor.
    # NOTE(review): columns such as log_return / rolling_vol_21 overlap the
    # window the default target is computed from — confirm this is intended.
    X = df.drop(columns=['timestamp', target]).select_dtypes(include=['number', 'bool'])
    if X.shape[1] == 0:
        raise ValueError(f'No numeric feature columns found in {df_path}')
    y = df[target]

    X_train, X_test, y_train, y_test = train_test_split_time_series(X, y, test_size=0.2)

    # baseline model
    rf = RandomForestRegressor(n_jobs=-1, random_state=42)

    # time-series friendly CV: each fold validates on data after its training span
    tscv = TimeSeriesSplit(n_splits=5)

    param_dist = {
        'n_estimators': [50, 100, 200, 400],
        'max_depth': [3, 5, 8, 12, None],
        'min_samples_split': [2, 5, 10],
        'min_samples_leaf': [1, 2, 4]
    }

    rsearch = RandomizedSearchCV(rf, param_distributions=param_dist, n_iter=20,
                                 cv=tscv, scoring='neg_mean_squared_error', n_jobs=-1, random_state=42, verbose=1)
    rsearch.fit(X_train, y_train)

    best = rsearch.best_estimator_
    preds = best.predict(X_test)

    mse = mean_squared_error(y_test, preds)
    rmse = np.sqrt(mse)
    r2 = r2_score(y_test, preds)

    print('Best params:', rsearch.best_params_)
    print('Test RMSE:', rmse)
    print('Test R2:', r2)

    # save model and the full search object
    joblib.dump(best, model_out)
    joblib.dump(rsearch, search_out)

    # save an evaluation CSV
    pd.DataFrame({'y_true': y_test.values, 'y_pred': preds}).to_csv(eval_out, index=False)

    return best, {'rmse': rmse, 'r2': r2}


if __name__ == '__main__':
    import os
    # Make sure the default output directory exists before training writes into it.
    os.makedirs('models', exist_ok=True)
    # Run training with the function's default input path, target and model path.
    train_model()

In [None]:
# src/eval_reports.py
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import mean_squared_error, r2_score


def plot_preds(eval_csv='models/eval_results.csv', out_path='models/plot_true_vs_pred.png'):
    """Plot true vs. predicted values from an evaluation CSV and save the figure.

    Parameters
    ----------
    eval_csv : str
        CSV containing 'y_true' and 'y_pred' columns.
    out_path : str
        Image file to write; its parent directory is created if missing.

    The figure is closed after saving so repeated calls do not accumulate
    open pyplot figures.
    """
    import os  # local import: only needed to prepare the output directory

    df = pd.read_csv(eval_csv)

    out_dir = os.path.dirname(out_path)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)

    fig, ax = plt.subplots(figsize=(10, 4))
    try:
        ax.plot(df['y_true'].values, label='True')
        ax.plot(df['y_pred'].values, label='Pred')
        ax.set_xlabel('Test sample index')
        ax.set_ylabel('Volatility')
        ax.legend()
        ax.set_title('True vs Predicted Volatility (Test set)')
        fig.savefig(out_path, dpi=150)
    finally:
        # pyplot keeps every figure alive until it is explicitly closed.
        plt.close(fig)
    print(f'Saved plot to {out_path}')


if __name__ == '__main__':
    # Script entry point: render the plot using plot_preds' default arguments.
    plot_preds()

In [None]:
# deploy/app_streamlit.py
import streamlit as st
import pandas as pd
import numpy as np
import joblib
from src.feature_engineering import add_return_features

MODEL_PATH = 'models/rf_vol_model.joblib'
# Columns excluded from the inputs only when the loaded model does not record
# the feature names it was trained on.
FALLBACK_EXCLUDED_COLS = ['timestamp', 'rolling_vol_7', 'rolling_vol_21']


def _select_model_inputs(model, df_fe):
    """Return the columns of `df_fe` to feed `model`, in the model's training order.

    Uses the estimator's `feature_names_in_` when available so inference sees
    exactly the columns used at fit time; otherwise falls back to every column
    not listed in FALLBACK_EXCLUDED_COLS.

    Raises ValueError if the upload lacks a column the model was trained on.
    """
    trained_cols = getattr(model, 'feature_names_in_', None)
    if trained_cols is None:
        return df_fe[[c for c in df_fe.columns if c not in FALLBACK_EXCLUDED_COLS]]
    missing = [c for c in trained_cols if c not in df_fe.columns]
    if missing:
        raise ValueError(f'Uploaded data is missing columns the model was trained on: {missing}')
    return df_fe[list(trained_cols)]


st.title('Crypto Volatility Predictor')
st.markdown('Upload CSV (ohclv) — columns should include timestamp and close price')

uploaded = st.file_uploader('Choose a CSV', type=['csv'])
if uploaded is not None:
    df = pd.read_csv(uploaded, parse_dates=True)
    st.write('Raw preview:')
    st.dataframe(df.head())

    # basic FE
    try:
        # Returns, lags and rolling windows are only meaningful on chronologically
        # ordered rows, so sort by time whenever the column is present.
        if 'timestamp' in df.columns:
            df['timestamp'] = pd.to_datetime(df['timestamp'])
            df = df.sort_values('timestamp').reset_index(drop=True)
        df_fe = add_return_features(df, price_col='close')
        if df_fe.empty:
            # The 21-period rolling features need 22 consecutive clean prices.
            raise ValueError('no rows left after feature engineering; upload at least 22 rows without missing values')
    except Exception as e:
        st.error(f'Feature engineering failed: {e}')
    else:
        st.write('Feature preview:')
        st.dataframe(df_fe.head())

        # load model and predict; surface failures in the UI instead of a traceback
        try:
            model = joblib.load(MODEL_PATH)
            X = _select_model_inputs(model, df_fe)
            preds = model.predict(X)
        except Exception as e:
            st.error(f'Prediction failed: {e}')
        else:
            df_fe['pred_vol'] = preds

            # Use the timestamp as the chart's x-axis when the upload provides one.
            chart_df = df_fe.set_index('timestamp') if 'timestamp' in df_fe.columns else df_fe
            st.line_chart(chart_df[['rolling_vol_7', 'pred_vol']].tail(200))
            st.download_button('Download predictions CSV', df_fe.to_csv(index=False), file_name='predi.csv')

In [None]:
pip install -r requirements.txt
streamlit run deploy/app_streamlit.py

In [None]:
pip install -r requirements.txt
streamlit run deploy/app_streamlit.py

Final Report (guidance)

Include: dataset description, key EDA plots (trend, distribution, correlation heatmap), description of features, model selection rationale, hyperparameter tuning results, final metrics table, top failure cases and limitations, and suggestions for next steps (ensemble, volatility regimes, more exogenous features like orderbook or sentiment).

How to run the pipeline quickly

In [None]:
# 1. Clean data:
python src/data_processing.py

In [None]:
# 2. Feature engineering:

python src/featu
