In [None]:
!pip install synapseml

Collecting synapseml
  Downloading synapseml-1.0.11-py2.py3-none-any.whl.metadata (774 bytes)
Downloading synapseml-1.0.11-py2.py3-none-any.whl (584 kB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/584.7 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━━━━━━━[0m[90m╺[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m102.4/584.7 kB[0m [31m2.8 MB/s[0m eta [36m0:00:01[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m583.7/584.7 kB[0m [31m9.1 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m584.7/584.7 kB[0m [31m7.5 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: synapseml
Successfully installed synapseml-1.0.11


In [None]:
from pyspark.ml import Pipeline
from pyspark.sql import SparkSession
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
from pyspark.ml.evaluation import RegressionEvaluator
from synapse.ml.lightgbm import LightGBMRegressor
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from synapse.ml.lightgbm import LightGBMRegressor
import pandas as pd
import numpy as np
from sklearn.feature_selection import SelectKBest, f_regression
from pyspark.sql.functions import col

# Initialize the Spark session used by every later cell.
#
# NOTE(review): the SynapseML JVM package requested below is 0.11.2, while the pip
# install log in this notebook shows the Python package at 1.0.11 and the session
# reports Spark 3.5.1. Confirm against the SynapseML installation docs which Maven
# coordinate matches this Python/Spark combination before relying on it.
spark = (
    SparkSession.builder
    # The app name is what shows up in the Spark UI; this notebook trains LightGBM.
    .appName("LightGBM regressor")
    .config("spark.driver.memory", "4g")
    .config("spark.executor.memory", "4g")
    .config("spark.jars.packages", "com.microsoft.azure:synapseml_2.12:0.11.2")
    .getOrCreate()
)

print(spark.version)


# Load data
def load_data(file_path):
    """Read a CSV file into a Spark DataFrame.

    The read goes through the module-level ``spark`` session, with the
    ``header`` and ``inferSchema`` reader options both switched on.

    Args:
        file_path: Location of the CSV file to read.

    Returns:
        The DataFrame produced by ``spark.read.csv``.
    """
    csv_options = {"header": True, "inferSchema": True}
    return spark.read.csv(file_path, **csv_options)

# Read the preprocessed dataset into a Spark DataFrame
data = load_data("data_preprocessed_V3.csv")


# LightGBM regressor settings, gathered in one mapping so they are easy to scan.
# "price" is the label column; "features" is the assembled feature-vector column.
lgbm_settings = {
    "labelCol": "price",
    "featuresCol": "features",
    "objective": "regression",
    "verbosity": -1,
    "boostingType": "gbdt",
}
lgbm = LightGBMRegressor(**lgbm_settings)


# Step 1: Feature Selection using SelectKBest
def select_features(data, target_col, k=10):
    """Pick the ``k`` feature columns that score best against the target.

    The input is collected to the driver as a pandas DataFrame and scored with
    scikit-learn's ``SelectKBest`` using ``f_regression``.

    Args:
        data: Spark DataFrame (any object exposing ``toPandas()``) that holds
            the candidate feature columns and the target column.
        target_col: Name of the label column; it is never returned as a feature.
        k: Number of features to keep, or ``"all"``. Values larger than the
            number of candidate columns are clamped to that number.

    Returns:
        list[str]: Names of the selected columns, in their original column order.

    Raises:
        KeyError: If ``target_col`` is not a column of ``data``.
        ValueError: If no numeric or boolean candidate column is left to score.
    """
    # toPandas() pulls every row onto the driver, so the input must fit in driver memory.
    pandas_df = data.toPandas()
    if target_col not in pandas_df.columns:
        raise KeyError(f"target column {target_col!r} not found in data")

    y = pandas_df[target_col]

    # Only numeric/boolean columns can be scored by f_regression; other dtypes
    # (e.g. strings) are left out of the candidates instead of failing the fit.
    X = pandas_df.drop(columns=[target_col]).select_dtypes(include=["number", "bool"])
    n_candidates = X.shape[1]
    if n_candidates == 0:
        raise ValueError("no numeric or boolean feature columns available for selection")

    # Asking for more features than exist is treated as "keep every candidate".
    if k != "all":
        k = min(k, n_candidates)

    # Apply SelectKBest
    selector = SelectKBest(score_func=f_regression, k=k)
    selector.fit(X, y)

    # Map the boolean support mask back to column names
    return X.columns[selector.get_support()].tolist()

target_col = 'price'

# Step 2: Split data into train and test BEFORE selecting features, so the
# held-out test rows never influence which columns the model is trained on.
train_data, test_data = data.randomSplit([0.8, 0.2], seed=42)

# Fit the feature selector on the training split only (avoids test-set leakage)
selected_features = select_features(train_data, target_col, k=10)

# Step 3: Pack the selected feature columns into the single vector column "features"
assembler = VectorAssembler(outputCol="features", inputCols=selected_features)


# Chain feature assembly and the LightGBM regressor into one estimator
pipeline_stages = [assembler, lgbm]
pipeline = Pipeline(stages=pipeline_stages)

# Build the param grid
#
# Tree / regularization hyper-parameters are combined exhaustively
# (3 * 3 * 2 * 3 * 2 * 2 = 216 combinations).
base_grid = (
    ParamGridBuilder()
    .addGrid(lgbm.learningRate, [0.01, 0.05, 0.1])
    .addGrid(lgbm.numLeaves, [15, 31, 63])
    .addGrid(lgbm.featureFraction, [0.8, 1.0])
    .addGrid(lgbm.maxDepth, [5, 10, -1])
    .addGrid(lgbm.lambdaL1, [0.0, 0.1])
    .addGrid(lgbm.lambdaL2, [0.0, 1.0])
    .build()
)

# Per the LightGBM parameter docs, bagging is only active when baggingFreq > 0 AND
# baggingFraction < 1.0. A full cross product of fraction [0.7, 1.0] x freq [0, 5]
# would therefore train three equivalent "no bagging" variants for every base
# combination, so only the two distinct settings are enumerated here.
bagging_settings = [
    {lgbm.baggingFraction: 1.0, lgbm.baggingFreq: 0},  # bagging disabled
    {lgbm.baggingFraction: 0.7, lgbm.baggingFreq: 5},  # 70% of rows, re-drawn every 5 iterations
]

# Final grid: every base combination paired with each bagging setting (432 param maps)
paramGrid = [
    {**base_params, **bagging_params}
    for base_params in base_grid
    for bagging_params in bagging_settings
]

# Evaluator shared by tuning and by the later scoring cell; metricName is left
# at the evaluator's default.
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="price")

# Tune with a single train/validation split (TrainValidationSplit) rather than
# CrossValidator.
tvs_settings = dict(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    trainRatio=0.8,
    parallelism=4,
    seed=42,
)
tvs = TrainValidationSplit(**tvs_settings)

# Fit every candidate in the grid on the training split
tvsModel = tvs.fit(train_data)


3.5.1


In [None]:
# 11. Score the tuned model on the held-out test split, then on the training split
def _regression_scores(predictions):
    """Return the three scores reported below for a predictions DataFrame.

    The first value uses the shared ``evaluator`` with its own metric setting
    (reported as 'rmse' in the printout); the other two override
    ``metricName`` to "mae" and "r2" for that call only.
    """
    base_score = evaluator.evaluate(predictions)
    mae_score = evaluator.evaluate(predictions, {evaluator.metricName: "mae"})
    r2_score = evaluator.evaluate(predictions, {evaluator.metricName: "r2"})
    return base_score, mae_score, r2_score


test_predictions = tvsModel.transform(test_data)
rmse_test, mae_test, r2_test = _regression_scores(test_predictions)

print(f"Test data: {{'rmse': {rmse_test}, 'mae': {mae_test}, 'r2': {r2_test}}}")

# Same scores on the training split, for comparison with the test figures
train_predictions = tvsModel.transform(train_data)
rmse_train, mae_train, r2_train = _regression_scores(train_predictions)

print(f"Train data: {{'rmse': {rmse_train}, 'mae': {mae_train}, 'r2': {r2_train}}}")

Test data: {'rmse': 72112.70273157304, 'mae': 40468.73687482394, 'r2': 0.6310948468295978}
Train data: {'rmse': 65192.40440527915, 'mae': 37111.00446411725, 'r2': 0.7053967781604382}


In [None]:
# Stop the Spark session now that training and evaluation are finished
spark.stop()