In [1]:
import os
# Tell PySpark where the local Spark distribution lives, then echo the value back.
os.environ.update(SPARK_HOME='/mnt/c/spark')
print(os.getenv('SPARK_HOME'))

/mnt/c/spark


In [2]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression, RandomForestRegressor, DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
import time

# Start (or reuse) the Spark session used by the whole notebook.
spark = (
    SparkSession.builder
    .appName("prediction-models")
    .getOrCreate()
)

# Load the factorized dataset; the first line of the file holds the column names.
df = (
    spark.read
    .options(sep=',', header=True)
    .csv("preprocessed_datasets/factorized_data.csv")
)

24/12/17 14:58:06 WARN Utils: Your hostname, Laptok resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
24/12/17 14:58:06 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/12/17 14:58:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/12/17 14:58:09 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
24/12/17 14:58:09 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
                                                                                

In [3]:
from pyspark.sql.types import IntegerType, DoubleType
from pyspark.sql.functions import col

# Target Spark type for every column the models use. The dict's insertion
# order is the order in which the casts are applied.
for column_name, spark_type in {
    "model_year": IntegerType(),
    "milage": DoubleType(),
    "engine_capacity": DoubleType(),
    "engine_horsepower": DoubleType(),
    "brand_numeric": IntegerType(),
    "transmission_numeric": IntegerType(),
    "fuel_type_numeric": IntegerType(),
    "ext_col_numeric": IntegerType(),
    "int_col_numeric": IntegerType(),
    "accident_numeric": IntegerType(),
    "price": IntegerType(),
}.items():
    # Replace the column in place with its typed version.
    df = df.withColumn(column_name, col(column_name).cast(spark_type))

In [4]:
# Model inputs, in the order they are packed into the feature vector.
feature_columns = [
    # vehicle attributes
    "model_year",
    "milage",
    "engine_capacity",
    "engine_horsepower",
    # integer-coded categorical attributes
    "brand_numeric",
    "transmission_numeric",
    "fuel_type_numeric",
    "ext_col_numeric",
    "int_col_numeric",
    "accident_numeric",
]

In [5]:
# Packs the individual feature columns into the single "features" vector column.
assembler = (
    VectorAssembler()
    .setInputCols(feature_columns)
    .setOutputCol("features")
)

In [6]:
# Append the assembled "features" vector column to the typed dataset.
df_vectorized = assembler.transform(dataset=df)

In [7]:
# Keep only what the regressors need: the feature vector and the label.
final_data = df_vectorized.select(["features", "price"])

In [8]:
# 80/20 train/test split; the fixed seed keeps the split repeatable.
train_data, test_data = final_data.randomSplit(weights=[0.8, 0.2], seed=42)

### Linear Regression

In [9]:
# Initialize the model
lr = LinearRegression(featuresCol="features", labelCol="price")

# Define the parameter grid for hyperparameter tuning
# (3 * 4 * 3 * 2 = 72 combinations, each fitted once per fold).
paramGrid = (
    ParamGridBuilder()
    .addGrid(lr.maxIter, [50, 100, 200])
    .addGrid(lr.regParam, [0.01, 0.1, 0.5, 1.0])
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
    .addGrid(lr.tol, [1e-6, 1e-8])
    .build()
)

# Define the evaluator to minimize RMSE; it is reused below for the test-set RMSE.
evaluator = RegressionEvaluator(
    labelCol="price",
    predictionCol="prediction",
    metricName="rmse"
)

# Set up CrossValidator for hyperparameter tuning.
# The seed fixes the fold assignment so repeated runs select the same model.
crossval = CrossValidator(
    estimator=lr,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=5,
    seed=42
)

# Perform the hyperparameter tuning
cv_model = crossval.fit(train_data)

# Report the winning parameters through the public Params API
# (same approach as the Random Forest and Decision Tree cells).
best_model = cv_model.bestModel
print("Best Parameters:")
print(f"  MaxIter: {best_model.getOrDefault('maxIter')}")
print(f"  RegParam: {best_model.getOrDefault('regParam')}")
print(f"  ElasticNetParam: {best_model.getOrDefault('elasticNetParam')}")
print(f"  Tol: {best_model.getOrDefault('tol')}")

# Make predictions
predictions = best_model.transform(test_data)

# Show predictions
predictions.select("features", "price", "prediction").show()

# Evaluate the best model on the held-out test data
evaluator_mae = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="mae")
evaluator_r2 = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="r2")
rmse = evaluator.evaluate(predictions)
mae = evaluator_mae.evaluate(predictions)
r2 = evaluator_r2.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE): {rmse}")
print(f"Mean Absolute Error (MAE): {mae}")
print(f"R-squared or Coefficient of Determination (R2): {r2}")

# Save with a forward-slash path: a backslash is not a separator on Linux/WSL
# and would create a directory literally named "prediction_models\LR".
best_model.write().overwrite().save("prediction_models/LR")
print("Best model Linear Regression saved")

24/12/17 14:58:23 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
24/12/17 14:58:24 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK

KeyboardInterrupt

                                                            

### Random Forest

In [None]:
# maxMemoryInMB is a resource setting for histogram aggregation, not a
# model-quality hyperparameter, so it is fixed on the estimator instead of
# being searched over (searching it doubled the grid for no benefit).
rf = RandomForestRegressor(
    featuresCol="features",
    labelCol="price",
    seed=42,
    maxMemoryInMB=2048
)

# Define the parameter grid for hyperparameter tuning
# (2 * 2 * 3 * 3 * 3 * 3 = 324 combinations, each fitted once per fold).
paramGrid = (
    ParamGridBuilder()
    .addGrid(rf.numTrees, [50, 100])
    .addGrid(rf.maxDepth, [5, 10])
    .addGrid(rf.minInstancesPerNode, [1, 2, 4])
    .addGrid(rf.maxBins, [32, 64, 128])
    .addGrid(rf.subsamplingRate, [0.5, 0.7, 1.0])
    .addGrid(rf.featureSubsetStrategy, ['all', 'sqrt', 'log2'])
    .build()
)

# Define the evaluator to minimize RMSE
evaluator_rmse = RegressionEvaluator(
    labelCol="price",
    predictionCol="prediction",
    metricName="rmse"
)

# Set up CrossValidator for hyperparameter tuning.
# The seed fixes the fold assignment so repeated runs select the same model.
crossval = CrossValidator(
    estimator=rf,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator_rmse,
    numFolds=3,  # 3-fold cross-validation
    seed=42
)

start_time = time.time()

# Perform the hyperparameter tuning
cv_model = crossval.fit(train_data)

end_time = time.time()
training_time = end_time - start_time

# Best model and parameters
best_model = cv_model.bestModel
print("Best Parameters:")
# NOTE: on the fitted PySpark tree-ensemble model getNumTrees is a property, not a method.
print(f"  NumTrees: {best_model.getNumTrees}")
print(f"  MaxDepth: {best_model.getOrDefault('maxDepth')}")
print(f"  MinInstancesPerNode: {best_model.getOrDefault('minInstancesPerNode')}")
print(f"  MaxBins: {best_model.getOrDefault('maxBins')}")
print(f"  SubsamplingRate: {best_model.getOrDefault('subsamplingRate')}")
print(f"  FeatureSubsetStrategy: {best_model.getOrDefault('featureSubsetStrategy')}")
print(f"  MaxMemoryInMB: {best_model.getOrDefault('maxMemoryInMB')}")
print(f"Training time: {training_time:.2f} seconds")

# Evaluate the best model on the test data
predictions = best_model.transform(test_data)
evaluator_mae = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="mae")
evaluator_r2 = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="r2")
rmse = evaluator_rmse.evaluate(predictions)
mae = evaluator_mae.evaluate(predictions)
r2 = evaluator_r2.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE): {rmse}")
print(f"Mean Absolute Error (MAE): {mae}")
print(f"R-squared or Coefficient of Determination (R2): {r2}")

# Show some predictions
predictions.select("features", "price", "prediction").show()

# Save the best model (this path is what the prediction cells load)
best_model.write().overwrite().save("prediction_models/RF_best")
print("Best Random Forest model saved.")

                                                                                

### Decision Tree

In [None]:
# maxMemoryInMB, cacheNodeIds and checkpointInterval are performance/resource
# settings rather than model-quality hyperparameters. Searching over them
# multiplied the grid by 18 (2592 combinations x 5 folds), so they are left
# out of the grid; maxMemoryInMB is fixed on the estimator and the other two
# keep their defaults.
dt = DecisionTreeRegressor(
    featuresCol="features",
    labelCol="price",
    seed=42,
    maxMemoryInMB=1024
)

# Define the parameter grid for hyperparameter tuning
# (4 * 4 * 3 * 3 = 144 combinations, each fitted once per fold).
paramGrid = (
    ParamGridBuilder()
    .addGrid(dt.maxDepth, [5, 10, 15, 20])
    .addGrid(dt.maxBins, [20, 30, 40, 50])
    .addGrid(dt.minInstancesPerNode, [1, 2, 4])
    .addGrid(dt.minInfoGain, [0.0, 0.1, 0.2])
    .build()
)

# Define the evaluator to minimize RMSE
evaluator = RegressionEvaluator(
    labelCol="price",
    predictionCol="prediction",
    metricName="rmse"
)

# Set up CrossValidator for hyperparameter tuning.
# The seed fixes the fold assignment so repeated runs select the same model.
crossval = CrossValidator(
    estimator=dt,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=5,  # 5-fold cross-validation
    seed=42
)

# Record the start time
start_time = time.time()

# Perform the hyperparameter tuning
cv_model = crossval.fit(train_data)

# Record the end time and calculate the training time
end_time = time.time()
training_time = end_time - start_time

# Best model and parameters
best_model = cv_model.bestModel
print("Best Parameters:")
print(f"  MaxDepth: {best_model.getOrDefault('maxDepth')}")
print(f"  MaxBins: {best_model.getOrDefault('maxBins')}")
print(f"  MinInstancesPerNode: {best_model.getOrDefault('minInstancesPerNode')}")
print(f"  MinInfoGain: {best_model.getOrDefault('minInfoGain')}")
print(f"  MaxMemoryInMB: {best_model.getOrDefault('maxMemoryInMB')}")
print(f"  CacheNodeIds: {best_model.getOrDefault('cacheNodeIds')}")
print(f"  CheckpointInterval: {best_model.getOrDefault('checkpointInterval')}")
print(f"Training time: {training_time:.2f} seconds")

# Evaluate the best model on the test data
predictions = best_model.transform(test_data)
evaluator_mae = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="mae")
evaluator_r2 = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="r2")
rmse = evaluator.evaluate(predictions)
mae = evaluator_mae.evaluate(predictions)
r2 = evaluator_r2.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE): {rmse}")
print(f"Mean Absolute Error (MAE): {mae}")
print(f"R-squared or Coefficient of Determination (R2): {r2}")

# Show some predictions
predictions.select("features", "price", "prediction").show()

# Save the best model
best_model.write().overwrite().save("prediction_models/DT_best")
print("Best Decision Tree model saved.")


### How to make a prediction on raw numbers

In [None]:
from pyspark.ml.regression import RandomForestRegressionModel

# Restore the Random Forest model saved by the training cell.
model = RandomForestRegressionModel.read().load("prediction_models/RF_best")

In [None]:
# Column names for the hand-written sample, in the same order as its values.
feature_columns = [
    "model_year",
    "milage",
    "engine_capacity",
    "engine_horsepower",
    "brand_numeric",
    "transmission_numeric",
    "fuel_type_numeric",
    "ext_col_numeric",
    "int_col_numeric",
    "accident_numeric",
]

# One-row DataFrame holding the sample to score.
new_data = spark.createDataFrame(
    data=[(2007, 213000, 1.6, 150.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)],
    schema=feature_columns,
)

# Build the feature vector with the assembler defined earlier, then score it.
new_vector = assembler.transform(new_data)
prediction = model.transform(new_vector)

prediction.select(["features", "prediction"]).show()


### How to make a prediction using column mappings

In [None]:
import json

# Load mappings from the JSON file.
# JSON is read explicitly as UTF-8 so the result does not depend on the
# platform's default locale encoding.
with open("column_mappings.json", "r", encoding="utf-8") as json_file:
    mappings_dict = json.load(json_file)


In [None]:
def map_to_index(column_name, value, mappings):
    """
    Translate a raw categorical value into its position in the column's mapping.

    :param column_name: Key into ``mappings`` (e.g., "brand", "transmission").
    :param value: Raw categorical value to look up (e.g., "Ford", "Automatic").
    :param mappings: Dictionary holding, per column, the sequence of known values.
    :return: Position of ``value`` within ``mappings[column_name]``, or -1 when
        the value is not present.
    :raises KeyError: If ``column_name`` is not a key of ``mappings``.
    """
    # Guard clause: an unknown column is a caller error, not a missing value.
    if column_name not in mappings:
        raise KeyError(f"Column '{column_name}' not found in mappings.")

    try:
        return mappings[column_name].index(value)
    except ValueError:
        # .index() signals "not present" with ValueError; report it as -1.
        return -1


In [None]:
# Categorical attributes of the example car, given as human-readable labels.
raw_input = {
    "brand": "Ford",
    "transmission": "Automatic",
    "fuel_type": "Gasoline",
    "ext_col": "Black",
    "int_col": "Black",
    "accident": "None reported"
}

# Replace every label with its index from the loaded mappings.
mapped_input = {}
for column_name, raw_value in raw_input.items():
    mapped_input[column_name] = map_to_index(column_name, raw_value, mappings_dict)

# Numeric attributes come first, followed by the mapped categorical indices.
numeric_input = {
    "model_year": 2007,
    "milage": 213000,
    "engine_capacity": 1.6,
    "engine_horsepower": 150,
    **mapped_input,
}

print(numeric_input)
print("Features MUST be in correct order. Model input vectors are created from them.")

In [None]:
from pyspark.sql import Row

# Feature columns in the canonical order used when the model was trained.
# createDataFrame(..., schema=<list of names>) assigns names BY POSITION, so
# listing them in a different order than the values silently swaps milage,
# engine_capacity and engine_horsepower.
feature_columns = ["model_year", "milage", "engine_capacity", "engine_horsepower",
                   "brand_numeric", "transmission_numeric",
                   "fuel_type_numeric", "ext_col_numeric",
                   "int_col_numeric", "accident_numeric"]

# Pick every value by name instead of relying on dict/Row ordering.
# Categorical entries in numeric_input are keyed by the raw column name
# ("brand"), while the model expects the "<name>_numeric" column.
feature_values = []
for name in feature_columns:
    raw_key = name[:-len("_numeric")] if name.endswith("_numeric") else name
    if name in numeric_input:
        value = numeric_input[name]
    elif raw_key in numeric_input:
        value = numeric_input[raw_key]
    else:
        raise KeyError(f"Missing value for feature '{name}'")
    # Use one numeric type throughout so schema inference is unambiguous.
    feature_values.append(float(value))

# Convert the mapped input to a one-row Spark DataFrame
new_data = spark.createDataFrame([tuple(feature_values)], schema=feature_columns)

# Transform data using the same assembler
new_vector = assembler.transform(new_data)

# Make a prediction
prediction = model.transform(new_vector)
prediction.select("features", "prediction").show()
