# LightGBM Training with PySpark & GPU

This notebook trains a LightGBM model on the M5 Sales data using:
- **PySpark**: For efficient data loading and memory management.
- **LightGBM**: For gradient boosting.
- **GPU (RTX 3050)**: To accelerate training via LightGBM's OpenCL-based GPU learner (`device='gpu'`).

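**NOTE**: A `ConnectionRefusedError` means the Spark JVM is no longer running (for example, after it crashed with an `OutOfMemoryError`). **Restart the Kernel** to clear the crashed Spark session before re-running.

## 1. Initialize Spark Session

In [1]: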
import os
import gc
import pandas as pd
import numpy as np
import lightgbm as lgb
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

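# 1. Initialize Spark Session with Memory Limits
# Arrow optimization ('spark.sql.execution.arrow.pyspark.enabled': 'false') is disabled
# because it caused JVM crashes/memory leaks on this specific setup.
# spark.driver.memory is the driver JVM heap; in local mode the executors run inside
# that JVM too, and the setting only applies when the JVM is first launched.
# toPandas() collects every row into this heap, so 2g is too small for the full
# ~58M-row table (it fails with OutOfMemoryError in step 3): raise it to what the
# machine can spare, or reduce the data before collecting.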
spark = SparkSession.builder \
    .appName("M5_LGBM_Training") \
    .config("spark.driver.memory", "2g") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "false") \
    .getOrCreate()

print(f"Spark Version: {spark.version}")

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/12/14 13:52:27 WARN Utils: Your hostname, parrot, resolves to a loopback address: 127.0.1.1; using 192.168.1.12 instead (on interface wlo1)
25/12/14 13:52:27 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/12/14 13:52:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Spark Version: 4.0.1


## 2. Load Data from Parquet
Loading the optimized dataset created in the ETL step.

In [2]:
# Path to the optimized parquet file
# Adjust this path if 'final_optimized.parquet' is in a different subdirectory
DATA_PATH = "ETL Process/final_optimized.parquet"
MODEL_DIR = "models/LightGBM"
os.makedirs(MODEL_DIR, exist_ok=True)

# Read Parquet file
df_spark = spark.read.parquet(DATA_PATH)

print(f"Total Rows: {df_spark.count():,}")
df_spark.printSchema()

Total Rows: 58,327,370
root
 |-- sell_price: float (nullable = true)
 |-- wm_yr_wk: integer (nullable = true)
 |-- sales: integer (nullable = true)
 |-- wday: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- snap_CA: integer (nullable = true)
 |-- snap_TX: integer (nullable = true)
 |-- snap_WI: integer (nullable = true)
 |-- store_id: string (nullable = true)
 |-- item_id: string (nullable = true)
 |-- d: string (nullable = true)
 |-- id: string (nullable = true)
 |-- dept_id: string (nullable = true)
 |-- cat_id: string (nullable = true)
 |-- state_id: string (nullable = true)
 |-- event_name_1: string (nullable = true)
 |-- event_type_1: string (nullable = true)
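A back-of-envelope check against the schema above suggests why the 2 GB driver heap cannot hold this table. The sketch below counts only the fixed-width columns (one 4-byte `float` and eight 4-byte `integer` columns) and deliberately ignores the nine string columns and all JVM and pandas overhead, so it is a lower bound rather than an estimate of the real requirement.

```python
# Back-of-envelope lower bound for holding the M5 table in memory.
# Row count and column types are taken from the printSchema() output above.
N_ROWS = 58_327_370

# Fixed-width columns: sell_price (float32) + 8 integer columns (int32)
FIXED_WIDTH_COLS = {"float32": 1, "int32": 8}
BYTES_PER_TYPE = {"float32": 4, "int32": 4}

bytes_per_row = sum(n * BYTES_PER_TYPE[t] for t, n in FIXED_WIDTH_COLS.items())
numeric_bytes = N_ROWS * bytes_per_row

print(f"Fixed-width bytes per row: {bytes_per_row}")
print(f"Numeric columns alone: {numeric_bytes / 2**30:.2f} GiB")
# The 9 string columns (ids, day labels, event names) come on top of this,
# as do per-row JVM overhead during collect() and pandas object overhead.
```

The numeric columns alone come to roughly 2 GiB, about the entire configured heap before a single string is stored.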



## 3. Data Preparation
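Converting the Spark DataFrame to pandas, because LightGBM's Python API trains on in-memory data.
**Memory Optimization**: We explicitly cast columns to smaller dtypes to save RAM. The casts run after `toPandas()` has collected every row, so they shrink the resident DataFrame but not the peak memory of the conversion itself.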
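Numeric downcasting is only part of the picture: in pandas, the string columns (`item_id`, `id`, `d`, event names, ...) are likely the largest consumers, because an `object` column stores one Python string reference per row, while a `category` column stores small integer codes plus each unique string once. The sketch below uses synthetic data (10 hypothetical store ids repeated over a million rows) to show the difference; the actual savings on the M5 columns depend on their cardinality.

```python
import numpy as np
import pandas as pd

# Synthetic stand-in for a low-cardinality M5 string column (illustrative only)
rng = np.random.default_rng(42)
stores = np.array([f"STORE_{i}" for i in range(10)], dtype=object)
s_obj = pd.Series(rng.choice(stores, size=1_000_000), dtype=object)

# Same values stored as a categorical: int8 codes + the 10 unique strings
s_cat = s_obj.astype("category")

# deep=True counts the Python string objects, not just the pointers
obj_mb = s_obj.memory_usage(deep=True) / 1e6
cat_mb = s_cat.memory_usage(deep=True) / 1e6
print(f"object:   {obj_mb:.1f} MB")
print(f"category: {cat_mb:.1f} MB")
```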

In [3]:
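# Define the target column ('sales' holds the daily unit sales)
TARGET = 'sales'

# toPandas() collects every row into the driver JVM heap (spark.driver.memory)
# and then into Python memory. Arrow is disabled in the session config because it
# crashed the JVM on this setup; the non-Arrow path is generally slower and is not
# a memory saving in itself.
print("Converting to Pandas (this might take a minute)...")
df = df_spark.toPandas()

# Shrink dtypes to save memory (this runs after the full collect, so it lowers
# steady-state RAM, not the peak during conversion)
INT32_MIN, INT32_MAX = np.iinfo(np.int32).min, np.iinfo(np.int32).max
for col in df.columns:
    if df[col].dtype == 'float64':
        df[col] = df[col].astype('float32')
    elif df[col].dtype == 'int64':
        # int32 is only safe if the whole range fits, so check both min and max
        if df[col].min() >= INT32_MIN and df[col].max() <= INT32_MAX:
            df[col] = df[col].astype('int32')
    elif df[col].dtype == 'object':
        # Repeated strings (ids, day labels, event names) are smaller as categories
        df[col] = df[col].astype('category')

print("Data loaded into Pandas via Spark.")
df.info()  # info() prints directly and returns None

# Free up Spark memory
spark.stop()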

Converting to Pandas (this might take a minute)...
25/12/14 13:52:34 ERROR Executor: Exception in task 0.0 in stage 4.0 (TID 22)
java.lang.OutOfMemoryError: Java heap space
	at java.base/java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:64)
	at java.base/java.nio.ByteBuffer.allocate(ByteBuffer.java:363)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$2(SparkPlan.scala:382)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$2$adapted(SparkPlan.scala:382)
	at org.apache.spark.sql.execution.SparkPlan$$Lambda$3817/0x00007fbe31044000.apply(Unknown Source)
	at org.apache.spark.util.io.ChunkedByteBufferOutputStream.allocateNewChunkIfNeeded(ChunkedByteBufferOutputStream.scala:87)
	at org.apache.spark.util.io.ChunkedByteBufferOutputStream.write(ChunkedByteBufferOutputStream.scala:75)
	at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:225)
	at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:178)
	at java.base/java.io.DataOutputStream.write(DataOutput

ConnectionRefusedError: [Errno 111] Connection refused
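The int32 check in step 3 is conservative. `pd.to_numeric(..., downcast=...)` picks the smallest dtype that covers a column's actual range; for example, `wday`, `month` and the `snap_*` flags fit in `int8`. A minimal sketch on a toy frame (the helper name `downcast_numeric` and the column values are made up for illustration):

```python
import numpy as np
import pandas as pd


def downcast_numeric(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy with numeric columns shrunk to the smallest dtype that fits."""
    out = df.copy()
    for col in out.select_dtypes(include=[np.integer]).columns:
        # Smallest signed integer type (int8, int16, ...) whose range covers min..max
        out[col] = pd.to_numeric(out[col], downcast="integer")
    for col in out.select_dtypes(include=[np.floating]).columns:
        # float32 is the smallest float type pandas downcasts to
        out[col] = pd.to_numeric(out[col], downcast="float")
    return out


# Toy frame with M5-like columns (values are illustrative only)
toy = pd.DataFrame({
    "wday": [1, 2, 7],
    "wm_yr_wk": [11101, 11102, 11621],
    "sales": [0, 3, 120],
    "sell_price": [1.97, 2.5, 9.98],
})
small = downcast_numeric(toy)
print(small.dtypes)
```

Because it inspects the data rather than assuming a target width, this also picks `int16` for week codes such as `wm_yr_wk` and leaves wider columns alone.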

## 4. LightGBM Training (GPU Optimized)
Configuring LGBM parameters for RTX 3050.
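Why `'objective': 'tweedie'`? Daily M5 item sales are non-negative counts with many zeros, which a Tweedie distribution with variance power $1 < \rho < 2$ (compound Poisson-gamma) can model directly; LightGBM's `tweedie_variance_power` defaults to 1.5. The sketch below computes the standard Tweedie unit deviance, which has the same minimizer as the Tweedie negative log-likelihood (it differs by a factor of 2 and terms that do not depend on the prediction). It is an illustration in NumPy, not LightGBM's internal code.

```python
import numpy as np


def tweedie_deviance(y, mu, rho=1.5):
    """Mean Tweedie unit deviance for 1 < rho < 2.

    y: observed non-negative targets; mu: strictly positive predicted means.
    """
    y = np.asarray(y, dtype=float)
    mu = np.asarray(mu, dtype=float)
    # y**(2 - rho) is 0 when y == 0, so zero-sales rows only pay the mu**(2 - rho) term
    dev = 2 * (
        np.power(y, 2 - rho) / ((1 - rho) * (2 - rho))
        - y * np.power(mu, 1 - rho) / (1 - rho)
        + np.power(mu, 2 - rho) / (2 - rho)
    )
    return dev.mean()


# Perfect predictions give (numerically) zero deviance
print(tweedie_deviance([1.0, 4.0], [1.0, 4.0]))
# For a zero-sales row the penalty grows with the predicted mean
print(tweedie_deviance([0.0], [0.5]), tweedie_deviance([0.0], [2.0]))
```

Note that with `'metric': 'rmse'`, early stopping tracks RMSE rather than this loss; LightGBM also accepts `'metric': 'tweedie'` if the monitored metric should match the objective.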

In [None]:
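# Prepare Dataset
# Drop the target plus the row identifiers: 'id' (item x store key) and 'd'
# (day label) are strings that would otherwise reach LightGBM as object columns,
# which it rejects. errors='ignore' keeps this working if they are already gone.
X = df.drop(columns=[TARGET, 'id', 'd'], errors='ignore')
y = df[TARGET]

# Categorical features (common M5 columns). The membership test skips names
# that are not in this dataset, such as event_name_2 / event_type_2.
cat_feats = [c for c in X.columns if c in [
    'item_id', 'dept_id', 'cat_id', 'store_id', 'state_id',
    'year', 'month', 'wday', 'event_name_1', 'event_type_1',
    'event_name_2', 'event_type_2'
]]

print(f"Categorical Features: {cat_feats}")

# LightGBM consumes pandas 'category' columns directly
for c in cat_feats:
    X[c] = X[c].astype('category')

# Time-based hold-out for early stopping: the last 4 Walmart weeks (wm_yr_wk),
# roughly the 28-day M5 horizon. Early stopping on the training set itself would
# only track training error. The 4-week window is a choice, not a requirement.
last_weeks = np.sort(df['wm_yr_wk'].unique())[-4:]
valid_mask = df['wm_yr_wk'].isin(last_weeks).to_numpy()

train_data = lgb.Dataset(X[~valid_mask], label=y[~valid_mask],
                         categorical_feature=cat_feats)
# reference=train_data reuses the training bin boundaries for the validation set
valid_data = lgb.Dataset(X[valid_mask], label=y[valid_mask], reference=train_data)

# GPU Configuration
params = {
    'objective': 'tweedie',   # zero-inflated, non-negative sales counts
    'metric': 'rmse',
    'boosting_type': 'gbdt',
    'learning_rate': 0.05,
    'num_leaves': 63,
    'feature_fraction': 0.8,
    'bagging_fraction': 0.8,
    'bagging_freq': 5,
    'verbose': 1,
    'n_jobs': -1,
    'seed': 42,

    # GPU Parameters for RTX 3050 (requires a LightGBM build with GPU/OpenCL support)
    'device': 'gpu',
    'gpu_platform_id': 0,
    'gpu_device_id': 0,
    # 'gpu_use_dp': False, # Use double precision only if needed (default False is faster)
    'force_col_wise': True  # CPU histogram setting; has no effect with device='gpu'
}

print("Starting training with GPU...")
model = lgb.train(
    params,
    train_data,
    num_boost_round=1000,
    valid_sets=[valid_data],
    valid_names=['valid'],
    callbacks=[
        lgb.early_stopping(stopping_rounds=50),
        lgb.log_evaluation(period=50)
    ]
)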

print("Training completed!")

## 5. Save Model and Feature Importance

In [None]:
# Save model
model_path = os.path.join(MODEL_DIR, 'lgb_gpu_model.txt')
model.save_model(model_path)
print(f"Model saved to {model_path}")

# Feature Importance
importance = pd.DataFrame({
    'Feature': model.feature_name(),
    'Importance': model.feature_importance(importance_type='gain')
}).sort_values(by='Importance', ascending=False)

print("\nTop 10 Features:")
print(importance.head(10))
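Raw `gain` importances are sums of loss reduction and have no fixed scale, so expressing them as shares of the total makes runs easier to compare. A sketch with invented numbers standing in for `model.feature_importance(importance_type='gain')` (the feature names come from the schema; the values are hypothetical):

```python
import pandas as pd

# Hypothetical gain values standing in for model.feature_importance('gain')
importance = pd.DataFrame({
    "Feature": ["sell_price", "item_id", "wday", "snap_CA"],
    "Importance": [500.0, 300.0, 150.0, 50.0],
}).sort_values(by="Importance", ascending=False)

# Share of total gain (percent) and running total, to see how few features
# account for most of the model's loss reduction
importance["Share_%"] = 100 * importance["Importance"] / importance["Importance"].sum()
importance["Cumulative_%"] = importance["Share_%"].cumsum()
print(importance.to_string(index=False))
```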