# Bringing data to explotation zone for the training of a Linear Regression

## Data Loading

In [7]:
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.sql.functions import UserDefinedFunction, col, mean, log1p, UserDefinedFunction, explode, rand, when, lit, split, explode
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder, MinMaxScaler
import matplotlib.pyplot as plt
import pandas as pd
from pyspark.ml import Pipeline
from pyspark.sql.types import FloatType
import os

"""
Spark session variables declaration and spark initialization
"""

path = os.getcwd()

conf = SparkConf() \
    .setAppName("PostgreSQL Writing to Formatted Table") \
    .set("spark.jars", path+"/../.."+"/driver/postgresql-42.7.3.jar")

spark = SparkSession.builder \
    .config(conf=conf) \
    .getOrCreate()


"""
Connection details for table from formatted zone read
"""

jdbc_url = "jdbc:postgresql://localhost:5432/bda_project1_db"
driver_class = "org.postgresql.Driver"
user = "postgres"
password = "hola123"
connectionProperties = {"user": "postgres", "password": "hola123"}


df_caract = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:5432/bda_project1_db") \
    .option("dbtable", "dog_caract_trusted") \
    .option("user", connectionProperties["user"]) \
    .option("password", connectionProperties["password"]) \
    .option("driver", "org.postgresql.Driver") \
    .load()

df_caract = df_caract.withColumnRenamed("Breed", "breed")

df_caract.show()

+-------+-----------+--------+------+------------------+--------------------+-------------------+--------+--------------------+-----------+--------------+--------+------------+-----------------+---------------+-------------------+------------------------------+--------------------------------+
|barking|coat_length|drooling|energy|good_with_children|good_with_other_dogs|good_with_strangers|grooming|               breed|playfulness|protectiveness|shedding|trainability|avg_height_female|avg_height_male|avg_life_expectancy|avg_weight_male_log_normalized|avg_weight_female_log_normalized|
+-------+-----------+--------+------+------------------+--------------------+-------------------+--------+--------------------+-----------+--------------+--------+------------+-----------------+---------------+-------------------+------------------------------+--------------------------------+
|      1|          1|       0|     0|                 3|                   3|                  0|       0|    Ameri

In [8]:
df_intel = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:5432/bda_project1_db") \
    .option("dbtable", "dog_intelligence_trusted") \
    .option("user", connectionProperties["user"]) \
    .option("password", connectionProperties["password"]) \
    .option("driver", "org.postgresql.Driver") \
    .load()

df_intel.show()

+--------------------+--------------------+----+--------+
|               breed|      classification|obey|avg_reps|
+--------------------+--------------------+----+--------+
|       Border Collie|      Brightest Dogs|  95|     2.5|
|              Poodle|      Brightest Dogs|  95|     2.5|
|     German Shepherd|      Brightest Dogs|  95|     2.5|
|    Golden Retriever|      Brightest Dogs|  95|     2.5|
|   Doberman Pinscher|      Brightest Dogs|  95|     2.5|
|   Shetland Sheepdog|      Brightest Dogs|  95|     2.5|
|  Labrador Retriever|      Brightest Dogs|  95|     2.5|
|            Papillon|      Brightest Dogs|  95|     2.5|
|          Rottweiler|      Brightest Dogs|  95|     2.5|
|Australian Cattle...|      Brightest Dogs|  95|     2.5|
|Pembroke Welsh Corgi|Excellent Working...|  85|    10.0|
| Miniature Schnauzer|Excellent Working...|  85|    10.0|
|English Springer ...|Excellent Working...|  85|    10.0|
|Belgian Shepherd ...|Excellent Working...|  85|    10.0|
|          Sch

In [3]:
df_caract2 = spark.read \
    .format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", "dog_caract2_trusted") \
    .option("user", connectionProperties["user"]) \
    .option("password", connectionProperties["password"]) \
    .option("driver", driver_class) \
    .load()

df_caract2.show()

+--------------------+------------+---+--------------------+--------------------+
|            bred_for| breed_group| id|               Breed|         temperament|
+--------------------+------------+---+--------------------+--------------------+
|Small rodent hunt...|         Toy|  1|       Affenpinscher|Stubborn, Curious...|
|Coursing and hunting|       Hound|  2|        Afghan Hound|Aloof, Clownish, ...|
|  A wild pack animal|     Working|  3| African Hunting Dog|Wild, Hardworking...|
|Badger, otter hun...|     Terrier|  4|    Airedale Terrier|Outgoing, Friendl...|
|      Sheep guarding|     Working|  5|          Akbash Dog|Loyal, Independen...|
|       Hunting bears|     Working|  6|               Akita|Docile, Alert, Re...|
|            Guarding|       Mixed|  7|Alapaha Blue Bloo...|Loving, Protectiv...|
|        Sled pulling|       Mixed|  8|       Alaskan Husky|Friendly, Energet...|
|Hauling heavy fre...|     Working|  9|    Alaskan Malamute|Friendly, Affecti...|
|              L

In [5]:
unique_categories = df_caract2.select(explode(split(df_caract2["temperament"], ",\s*")).alias("temperament")).distinct()

# Creem les columnes dummy
for category_row in unique_categories.collect():
    category = category_row["temperament"]
    df_caract2 = df_caract2.withColumn(category, df_caract2["temperament"].contains(category).cast("int"))

df_caract2 = df_caract2.drop(*["id", "temperament", "bred_for"])

df_caract2.show()

24/04/25 22:19:06 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+------------+--------------------+------+--------+------+------+--------+----------+--------+--------+-----+--------+-----+--------+-----+---+-------------+-----+---------+----+------+---------+----+-----+-------------+--------+---------+------------+-----------+------+---------+---------+-------+-------------+------+--------+-------------+--------+-----+----+---------+------+----+--------------+----------+-----+-----------+--------+-----+------+--------------+-----+-------+--------+-------+----------+------+--------+------+-----------+--------+-----------+---------+-----+------+------+------------+--------+--------+-----------+-----------+-----------+--------+-----------+-------+----+------+---------------+--------+--------+-----+-------+------+------+---------+-------+----------+--------+------------+--------+------+---------------+----------+--------+---------+----------+-------+----+---------+------------+-------------+----------+------+--------+-------+-----------+-----+--------+

## Table Join

In [9]:
df = df_caract.join(df_intel, 
               df_caract.breed == df_intel.breed, 
               "inner").drop(df_intel.breed) # Drop allows to remove duplicates
df = df.join(df_caract2, 
               df.breed == df_caract2.Breed, 
               "inner").drop(df_caract2.Breed) # Drop allows to remove duplicates
df = df.distinct()

print(df.count())
df.show()

92
+-------+-----------+--------+------+------------------+--------------------+-------------------+--------+--------------------+-----------+--------------+--------+------------+-----------------+---------------+-------------------+------------------------------+--------------------------------+--------------------+----+--------+------------+------+--------+------+------+--------+----------+--------+--------+-----+--------+-----+--------+-----+---+-------------+-----+---------+----+------+---------+----+-----+-------------+--------+---------+------------+-----------+------+---------+---------+-------+-------------+------+--------+-------------+--------+-----+----+---------+------+----+--------------+----------+-----+-----------+--------+-----+------+--------------+-----+-------+--------+-------+----------+------+--------+------+-----------+--------+-----------+---------+-----+------+------+------------+--------+--------+-----------+-----------+-----------+--------+-----------+-------+

## Categorical Variables

### Classification Variable

In [10]:
print("Classification variable has",len(set(df.rdd.map(lambda r: r.classification).collect())),"modalities")

[Stage 38:>                                                         (0 + 1) / 1]

Classification variable has 5 modalities


                                                                                

In [11]:
pivoted = df.groupBy("breed").pivot("classification").agg(lit(1))
pivoted = pivoted.na.fill(0)
df = df.join(pivoted, 
               df.breed == pivoted.breed, 
               "inner").drop(pivoted.breed) # Drop allows to remove duplicates
df = df.distinct()
print(df.count())
df.show()

                                                                                

92
+-------+-----------+--------+------+------------------+--------------------+-------------------+--------+-----------+--------------+--------+------------+-----------------+---------------+-------------------+------------------------------+--------------------------------+--------------------+----+--------+------------+------+--------+------+------+--------+----------+--------+--------+-----+--------+-----+--------+-----+---+-------------+-----+---------+----+------+---------+----+-----+-------------+--------+---------+------------+-----------+------+---------+---------+-------+-------------+------+--------+-------------+--------+-----+----+---------+------+----+--------------+----------+-----+-----------+--------+-----+------+--------------+-----+-------+--------+-------+----------+------+--------+------+-----------+--------+-----------+---------+-----+------+------+------------+--------+--------+-----------+-----------+-----------+--------+-----------+-------+----+------+---------

### Breed Group Variable

In [14]:
print("Breed Group variable has",len(set(df.rdd.map(lambda r: r.breed_group).collect())),"modalities")

Breed Group variable has 7 modalities


In [12]:
pivoted = df.groupBy("breed").pivot("breed_group").agg(lit(1))
pivoted = pivoted.na.fill(0)
df = df.join(pivoted, 
               df.breed == pivoted.breed, 
               "inner").drop(pivoted.breed) # Drop allows to remove duplicates
df = df.distinct()
print(df.count())
df.show()

                                                                                

92
+-------+-----------+--------+------+------------------+--------------------+-------------------+--------+-----------+--------------+--------+------------+-----------------+---------------+-------------------+------------------------------+--------------------------------+--------------------+----+--------+------------+------+--------+------+------+--------+----------+--------+--------+-----+--------+-----+--------+-----+---+-------------+-----+---------+----+------+---------+----+-----+-------------+--------+---------+------------+-----------+------+---------+---------+-------+-------------+------+--------+-------------+--------+-----+----+---------+------+----+--------------+----------+-----+-----------+--------+-----+------+--------------+-----+-------+--------+-------+----------+------+--------+------+-----------+--------+-----------+---------+-----+------+------+------------+--------+--------+-----------+-----------+-----------+--------+-----------+-------+----+------+---------

## Numerical Variables

In [15]:
df = df.drop('breed')
assemblers = []
scalers = []
for name, dtype in df.dtypes:
    if name == 'avg_reps':
        continue
    if dtype in ["int","bigint","double"]:
        assemblers.append(VectorAssembler(inputCols=[name], outputCol=name + "_vec"))
        scalers.append(MinMaxScaler(inputCol=name + "_vec", outputCol=name + "_scaled"))

pipeline = Pipeline(stages=assemblers + scalers)
scalerModel = pipeline.fit(df)
scaledData = scalerModel.transform(df)
scaledData.show()

24/04/25 22:42:30 WARN DAGScheduler: Broadcasting large task binary with size 1650.5 KiB
[Stage 3908:>                                                       (0 + 1) / 1]

+-------+-----------+--------+------+------------------+--------------------+-------------------+--------+-----------+--------------+--------+------------+-----------------+---------------+-------------------+------------------------------+--------------------------------+--------------------+----+--------+------------+------+--------+------+------+--------+----------+--------+--------+-----+--------+-----+--------+-----+---+-------------+-----+---------+----+------+---------+----+-----+-------------+--------+---------+------------+-----------+------+---------+---------+-------+-------------+------+--------+-------------+--------+-----+----+---------+------+----+--------------+----------+-----+-----------+--------+-----+------+--------------+-----+-------+--------+-------+----------+------+--------+------+-----------+--------+-----------+---------+-----+------+------+------------+--------+--------+-----------+-----------+-----------+--------+-----------+-------+----+------+------------

                                                                                

## Variable Selection

In [16]:
scaled_vars = [var for var, dtype in scaledData.dtypes if var[-6:] == "scaled"]
df_regression = scaledData.select('avg_reps',*scaled_vars)
df_regression.show()

24/04/25 22:44:17 WARN DAGScheduler: Broadcasting large task binary with size 1484.3 KiB
[Stage 3928:>                                                       (0 + 1) / 1]

+--------+--------------+------------------+---------------+-------------+-------------------------+---------------------------+--------------------------+---------------+------------------+---------------------+---------------+--------------------+------------------------+----------------------+--------------------------+-------------------------------------+---------------------------------------+--------------------+-------------+---------------+-------------+-------------+---------------+-----------------+---------------+---------------+------------+---------------+------------+---------------+------------+----------+--------------------+------------+----------------+-----------+-------------+----------------+-----------+------------+--------------------+---------------+----------------+-------------------+------------------+-------------+----------------+----------------+--------------+--------------------+-------------+---------------+--------------------+---------------+--------

                                                                                

# Linear Regression Training

In [17]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

In [18]:
df_regression = df_regression.withColumn('isVal', when(rand() > 0.5, 1).otherwise(0))

train = df_regression.where(col("isVal") > 0.5)

test = df_regression.where(col("isVal") < 0.5)

train = train.drop('isVal')

test = test.drop('isVal')

assembler = VectorAssembler(inputCols=scaled_vars, outputCol="features")

linear_regression = LinearRegression(labelCol="avg_reps",predictionCol="predicted_avg_reps", featuresCol="features",regParam=0.1)

pipeline = Pipeline(stages=[assembler, linear_regression])

model = pipeline.fit(train)

model

24/04/25 22:47:32 WARN DAGScheduler: Broadcasting large task binary with size 1569.8 KiB
24/04/25 22:49:13 WARN DAGScheduler: Broadcasting large task binary with size 1562.6 KiB
                                                                                

PipelineModel_6988ef1ec6eb

In [19]:
predictions = model.transform(test)

# Evaluate the model using MSE

evaluator = RegressionEvaluator(labelCol="avg_reps", predictionCol="predicted_avg_reps", metricName="mse")
mse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data: {:.3f}".format(mse))

24/04/25 22:51:30 WARN DAGScheduler: Broadcasting large task binary with size 1564.4 KiB
[Stage 4005:>                                                       (0 + 1) / 1]

Root Mean Squared Error (RMSE) on test data: 85.467


                                                                                

In [None]:
predictions

DataFrame[avg_reps: double, classification_onehot: vector, breed_onehot: vector, barking_scaled: vector, coat_length_scaled: vector, drooling_scaled: vector, energy_scaled: vector, good_with_children_scaled: vector, good_with_other_dogs_scaled: vector, good_with_strangers_scaled: vector, grooming_scaled: vector, playfulness_scaled: vector, protectiveness_scaled: vector, shedding_scaled: vector, trainability_scaled: vector, avg_height_female_scaled: vector, avg_height_male_scaled: vector, avg_life_expectancy_scaled: vector, avg_weight_male_log_normalized_scaled: vector, avg_weight_female_log_normalized_scaled: vector, obey_scaled: vector, avg_reps_scaled: vector, breed_indexed_scaled: vector, classification_indexed_scaled: vector, features: vector, prediction: double]