In [1]:
import pyspark
import kagglehub
from pyspark.sql import SparkSession
import pyspark.sql.functions as F  #  to_date, log, count, col
from pyspark.sql.window import Window
from pyspark.rdd import RDD
from collections import deque
import numpy as np
import mlflow
import mlflow.keras
import pandas as pd
import tensorflow as tf
import os

2025-09-26 15:38:13.242740: I external/local_xla/xla/tsl/cuda/cudart_stub.cc:31] Could not find cuda drivers on your machine, GPU will not be used.
2025-09-26 15:38:13.242968: I tensorflow/core/util/port.cc:153] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2025-09-26 15:38:13.272537: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 AVX512F AVX512_VNNI AVX512_BF16 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.
2025-09-26 15:38:14.089128: I tensorflow/core/util/port.cc:153] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation or

In [2]:
DATE_COL: str = "date_col"
RATIO: str = "ratio"
MIN_SAMPLE_SIZE: int = 1000
TRAIN_RATIO: float = 0.7  # how much of the data is for training
VALID_RATIO: float = 0.15  # how much is for validation
SEQUENCE_LENGTH: int = 21  # the last trade days are in all sequences
BATCH_SIZE: int = 32  # Batch size
STOCK_NAME: str = "stock_name"  # Stock column name

# PySpark & MLflow Demo Notebook Checklist

## Setup
- [x] Install PySpark and MLflow in the environment
- [x] Import necessary Python libraries (pandas, numpy, matplotlib/seaborn)
- [x] Initialize Spark session

## Data Loading
- [x] Load sample dataset (CSV, Parquet, or Delta Lake)
- [x] Inspect dataset: `.show()`, `.describe()`, `.printSchema()`
- [x] Handle missing values and data types

## Data Preprocessing
- [x] Select relevant features for modeling
- [ ] Perform transformations (scaling, normalization, encoding)
- [ ] Split dataset into training and test sets

## PySpark Feature Engineering
- [ ] Create PySpark DataFrame from preprocessed data
- [ ] Apply transformations using PySpark (e.g., `VectorAssembler`, `StandardScaler`)
- [ ] Generate new features if needed

## Model Training
- [ ] Train a simple ML model (e.g., Linear Regression, Random Forest) in PySpark or Scikit-learn
- [ ] Evaluate model performance (RMSE, R², accuracy)
- [ ] Log model metrics to MLflow

## MLflow Tracking
- [ ] Start MLflow run (`mlflow.start_run()`)
- [ ] Log parameters (learning rate, max depth, etc.)
- [ ] Log metrics (RMSE, accuracy, etc.)
- [ ] Log trained model (`mlflow.sklearn.log_model` or `mlflow.spark.log_model`)
- [ ] End MLflow run

## MLflow Model Registry (Optional)
- [ ] Register model in MLflow Model Registry
- [ ] Promote model to `Staging` or `Production` stage
- [ ] Load registered model for inference

## Deployment & Inference
- [ ] Perform predictions on test set using MLflow model
- [ ] Compare predictions vs. actual values
- [ ] (Optional) Save predictions to a file or Delta Table

## Visualization & Reporting
- [ ] Plot feature importance or coefficients
- [ ] Plot predicted vs. actual values
- [ ] (Optional) Create a simple dashboard in Notebook

## Cleanup
- [ ] Stop Spark session
- [ ] Close MLflow run/session if not already done

In [3]:
spark = (
    SparkSession.builder
    .appName("Datacamp Pyspark Tutorial")
    .config("spark.memory.offHeap.enabled", "true")
    .config("spark.memory.offHeap.size", "10g")
    .getOrCreate()
)

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/09/26 15:38:15 WARN Utils: Your hostname, laptop, resolves to a loopback address: 127.0.1.1; using 192.168.178.43 instead (on interface wlp2s0)
25/09/26 15:38:15 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/09/26 15:38:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# getting some test data
path: str = kagglehub.dataset_download("farukece/semiconductor-stocks-and-the-ai-surge")
file_path: str = os.path.join(path, os.listdir(path)[0])
assert os.path.isfile(file_path)

In [5]:
df = spark.read.option("multiline", "true").csv(file_path, header=True, inferSchema=True)  # ignoring the header warning, this is a Kaggle data set which is a bit off
df = df.withColumn("date", F.col("date").cast("string"))
df = df.withColumn(DATE_COL, F.to_date(F.col('date'))).filter(F.col(DATE_COL).isNotNull())

                                                                                

In [6]:
stock_counts = df.groupBy('stock_name').agg(F.count('*').alias('count')).filter(F.col('count') > MIN_SAMPLE_SIZE)
df_filtered = df.join(stock_counts, on="stock_name", how="inner")

In [7]:
df_filtered = df_filtered.withColumn(DATE_COL, F.to_date(F.col('date'))).filter(F.col(DATE_COL).isNotNull())
df_filtered = df_filtered.withColumn(RATIO, F.log(F.col('close')) - F.log(F.col('open')))
stock_names = df_filtered.select(STOCK_NAME).distinct()

In [8]:
def sequence_generator(rdd: RDD, seq_length: int = SEQUENCE_LENGTH,
                       batch_size: int = BATCH_SIZE):
    buffer = deque(maxlen=seq_length + 1)
    X_batch, y_batch = [], []
    
    for row in rdd.toLocalIterator():
        val = row[0]
        buffer.append(val)
        if len(buffer) == seq_length + 1:
            X = np.array(list(buffer)[:-1]).reshape(seq_length, 1)
            y = np.array([buffer[-1]])
            X_batch.append(X)
            y_batch.append(y)
            if len(X_batch) == batch_size:
                yield X_batch, y_batch
                X_batch, y_batch = [], []

    if X_batch:
        yield X_batch, y_batch

In [9]:
def to_tf_dataset(generator, seq_length: int = SEQUENCE_LENGTH, batch_size: int = BATCH_SIZE):
    dataset = tf.data.Dataset.from_generator(
        generator,
        output_signature=(
            tf.TensorSpec(shape=(None, seq_length, 1), dtype=tf.float32),
            tf.TensorSpec(shape=(None, 1), dtype=tf.float32)
        )
    )
    return dataset

In [10]:
w = Window.partitionBy(STOCK_NAME).orderBy(F.col(DATE_COL))

for stock_name in stock_names.rdd.toLocalIterator():
    stock = stock_name[STOCK_NAME]
    full_data = df_filtered.where(F.col(STOCK_NAME) == stock) \
        .orderBy(F.col(DATE_COL)) \
        .withColumn("row_num", F.row_number().over(w)) \
        .select(F.col(RATIO), F.col("row_num"))
    n_samples = full_data.count()
    n_train = int(TRAIN_RATIO * n_samples)
    n_valid = int(VALID_RATIO * n_samples)
    train_data = full_data.where(F.col("row_num") <= n_train).select(RATIO).rdd
    val_data = full_data.where((F.col("row_num") > n_train) & (F.col("row_num") <= n_train + n_valid)).rdd
    test_data = full_data.where(F.col("row_num") > n_train + n_valid).rdd
    train_dataset = to_tf_dataset(lambda: sequence_generator(train_data))
    val_dataset = to_tf_dataset(lambda: sequence_generator(val_data))
    test_dataset = to_tf_dataset(lambda: sequence_generator(test_data))
    # todo create MinMaxScaler
    model = tf.keras.Sequential([
        tf.keras.layers.Input(shape=(SEQUENCE_LENGTH, 1)),
        tf.keras.layers.LSTM(4),
        tf.keras.layers.Dense(1)
    ])
    model.compile(optimizer="adam", loss="mse")
    early_stop = tf.keras.callbacks.EarlyStopping(
        monitor="val_loss",
        patience=3,
        restore_best_weights=True
    )
    model.fit(train_dataset,
              validation_data=val_dataset,
              validation_steps=None,
              callbacks=[early_stop])
    break

25/09/26 15:38:20 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , company_name, stock_name, date, open, high, low, close, volume
 Schema: _c0, company_name, stock_name, date, open, high, low, close, volume
Expected: _c0 but found: 
CSV file: file:///home/dierck/.cache/kagglehub/datasets/farukece/semiconductor-stocks-and-the-ai-surge/versions/1/semi_conductor_se.csv
25/09/26 15:38:20 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , company_name, stock_name, date, open, high, low, close, volume
 Schema: _c0, company_name, stock_name, date, open, high, low, close, volume
Expected: _c0 but found: 
CSV file: file:///home/dierck/.cache/kagglehub/datasets/farukece/semiconductor-stocks-and-the-ai-surge/versions/1/semi_conductor_se.csv
25/09/26 15:38:21 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , company_name, stock_name, date, open, high, low, close, volume
 Schema: _c0, company_name, stock_name, date, op

[1m24/24[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 10ms/step - loss: 0.0012 - val_loss: 9.7682e-04


2025-09-26 15:38:24.316673: I tensorflow/core/framework/local_rendezvous.cc:407] Local rendezvous is aborting with status: OUT_OF_RANGE: End of sequence
	 [[{{node IteratorGetNext}}]]
2025-09-26 15:38:24.483233: I tensorflow/core/framework/local_rendezvous.cc:407] Local rendezvous is aborting with status: OUT_OF_RANGE: End of sequence
	 [[{{node IteratorGetNext}}]]
