In [47]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.functions import when

In [48]:
# Start (or reuse) a Spark session configured with the MySQL JDBC connector jar.
session_builder = SparkSession.builder.appName("PySpark SQL Server Connection")
session_builder = session_builder.config(
    "spark.jars", "/usr/share/java/mysql-connector-java-9.1.0.jar"
)
spark = session_builder.getOrCreate()

In [None]:
# Define MySQL connection parameters.
# Credentials are read from the environment so that no secret is stored in the
# notebook or its version history. Set MYSQL_USER and MYSQL_PASSWORD before
# starting the kernel; MYSQL_HOST and MYSQL_DATABASE are optional overrides.
import os
import warnings

mysql_host = os.environ.get("MYSQL_HOST", "192.168.0.110")
mysql_database = os.environ.get("MYSQL_DATABASE", "auto_mpg")
jdbc_url = f"jdbc:mysql://{mysql_host}/{mysql_database}"

connection_properties = {
    "user": os.environ.get("MYSQL_USER", ""),
    "password": os.environ.get("MYSQL_PASSWORD", ""),
    "driver": "com.mysql.cj.jdbc.Driver",
}

# Warn early instead of letting the problem surface later as an opaque JDBC error.
if not (connection_properties["user"] and connection_properties["password"]):
    warnings.warn(
        "MYSQL_USER and/or MYSQL_PASSWORD are not set; "
        "the JDBC connection is likely to fail authentication."
    )

In [64]:
# Read the `auto_mpg` table over JDBC into a Spark DataFrame.
df = spark.read.jdbc(
    table="auto_mpg",
    url=jdbc_url,
    properties=connection_properties,
)

In [65]:
# Preview the first five rows of the loaded table.
df.show(n=5)

+----+---------+------------+----------+------+------------+----------+------+--------------------+
| mpg|cylinders|displacement|horsepower|weight|acceleration|model year|origin|            car name|
+----+---------+------------+----------+------+------------+----------+------+--------------------+
|18.0|        8|       307.0|       130|  3504|        12.0|        70|     1|chevrolet chevell...|
|15.0|        8|       350.0|       165|  3693|        11.5|        70|     1|   buick skylark 320|
|18.0|        8|       318.0|       150|  3436|        11.0|        70|     1|  plymouth satellite|
|16.0|        8|       304.0|       150|  3433|        12.0|        70|     1|       amc rebel sst|
|17.0|        8|       302.0|       140|  3449|        10.5|        70|     1|         ford torino|
+----+---------+------------+----------+------+------------+----------+------+--------------------+
only showing top 5 rows



In [66]:
# Inspect the (column name, Spark type) pairs of the loaded frame.
df.dtypes

[('mpg', 'decimal(3,1)'),
 ('cylinders', 'int'),
 ('displacement', 'decimal(4,1)'),
 ('horsepower', 'string'),
 ('weight', 'int'),
 ('acceleration', 'decimal(3,1)'),
 ('model year', 'int'),
 ('origin', 'int'),
 ('car name', 'string')]

In [67]:
# Convert horsepower from string to int.
# NOTE(review): strings that are not valid integers presumably become null under
# Spark's default (non-ANSI) cast — confirm against the null counts computed below.
horsepower_as_int = col("horsepower").cast("int")
df = df.withColumn("horsepower", horsepower_as_int)

In [68]:
# Re-check the column types to confirm horsepower is now an int column.
df.dtypes

[('mpg', 'decimal(3,1)'),
 ('cylinders', 'int'),
 ('displacement', 'decimal(4,1)'),
 ('horsepower', 'int'),
 ('weight', 'int'),
 ('acceleration', 'decimal(3,1)'),
 ('model year', 'int'),
 ('origin', 'int'),
 ('car name', 'string')]

In [69]:
# Report the total row count, then display the first two rows.
row_total = df.count()
print(f'There are total {row_total} row. Let print first 2 data rows:')
df.limit(2).show()

There are total 398 row. Let print first 2 data rows:
+----+---------+------------+----------+------+------------+----------+------+--------------------+
| mpg|cylinders|displacement|horsepower|weight|acceleration|model year|origin|            car name|
+----+---------+------------+----------+------+------------+----------+------+--------------------+
|18.0|        8|       307.0|       130|  3504|        12.0|        70|     1|chevrolet chevell...|
|15.0|        8|       350.0|       165|  3693|        11.5|        70|     1|   buick skylark 320|
+----+---------+------------+----------+------+------------+----------+------+--------------------+



In [None]:
# Count the null values in every column of df.
from pyspark.sql.functions import col, sum as _sum

null_count_exprs = []
for column_name in df.columns:
    # isNull() gives a boolean; cast to int so the flags can be summed.
    null_flag = col(column_name).isNull().cast("int")
    null_count_exprs.append(_sum(null_flag).alias(column_name))

null_counts = df.select(null_count_exprs)
null_counts.show()  # single row: number of nulls per column

+---+---------+------------+----------+------+------------+----------+------+--------+
|mpg|cylinders|displacement|horsepower|weight|acceleration|model year|origin|car name|
+---+---------+------------+----------+------+------------+----------+------+--------+
|  0|        0|           0|         6|     0|           0|         0|     0|       0|
+---+---------+------------+----------+------+------------+----------+------+--------+



In [74]:
from pyspark.ml.feature import Imputer

# Fill missing horsepower values with the column mean.
# The output column has the same name as the input, so horsepower is replaced.
imputer = Imputer(
    strategy="mean",
    inputCols=["horsepower"],
    outputCols=["horsepower"],
)

imputer_model = imputer.fit(df)
df_imputed = imputer_model.transform(df)
df_imputed.show()

[Stage 69:>                                                         (0 + 1) / 1]

+----+---------+------------+----------+------+------------+----------+------+--------------------+
| mpg|cylinders|displacement|horsepower|weight|acceleration|model year|origin|            car name|
+----+---------+------------+----------+------+------------+----------+------+--------------------+
|18.0|        8|       307.0|       130|  3504|        12.0|        70|     1|chevrolet chevell...|
|15.0|        8|       350.0|       165|  3693|        11.5|        70|     1|   buick skylark 320|
|18.0|        8|       318.0|       150|  3436|        11.0|        70|     1|  plymouth satellite|
|16.0|        8|       304.0|       150|  3433|        12.0|        70|     1|       amc rebel sst|
|17.0|        8|       302.0|       140|  3449|        10.5|        70|     1|         ford torino|
|15.0|        8|       429.0|       198|  4341|        10.0|        70|     1|    ford galaxie 500|
|14.0|        8|       454.0|       220|  4354|         9.0|        70|     1|    chevrolet impala|


                                                                                

In [87]:
# Verify the imputation: count the null values in every column of df_imputed.
# The column list is taken from df_imputed itself (not from df), so the check
# always covers exactly the columns of the frame being inspected.
from pyspark.sql.functions import col, sum as _sum

null_counts = df_imputed.select(
    [_sum(col(c).isNull().cast("int")).alias(c) for c in df_imputed.columns]
)
null_counts.show()  # single row: number of nulls per column

+---+---------+------------+----------+------+------------+----------+------+--------+
|mpg|cylinders|displacement|horsepower|weight|acceleration|model year|origin|car name|
+---+---------+------------+----------+------+------------+----------+------+--------+
|  0|        0|           0|         0|     0|           0|         0|     0|       0|
+---+---------+------------+----------+------+------------+----------+------+--------+



In [88]:
# Map the numeric origin codes to region names for reporting.
# Built from df_imputed rather than the pre-imputation df, so this frame also
# carries the filled-in horsepower values. It is kept as a separate frame:
# df_imputed retains the numeric origin column used as a model feature.
origin_code = col("origin")
df_with_region = df_imputed.withColumn(
    "origin",
    when(origin_code == 1, "United States")
    .when(origin_code == 2, "Europe")
    .when(origin_code == 3, "Japan")
    .otherwise("Unknown"),  # any code outside 1-3
)
df_with_region.show()

+----+---------+------------+----------+------+------------+----------+-------------+--------------------+
| mpg|cylinders|displacement|horsepower|weight|acceleration|model year|       origin|            car name|
+----+---------+------------+----------+------+------------+----------+-------------+--------------------+
|18.0|        8|       307.0|       130|  3504|        12.0|        70|United States|chevrolet chevell...|
|15.0|        8|       350.0|       165|  3693|        11.5|        70|United States|   buick skylark 320|
|18.0|        8|       318.0|       150|  3436|        11.0|        70|United States|  plymouth satellite|
|16.0|        8|       304.0|       150|  3433|        12.0|        70|United States|       amc rebel sst|
|17.0|        8|       302.0|       140|  3449|        10.5|        70|United States|         ford torino|
|15.0|        8|       429.0|       198|  4341|        10.0|        70|United States|    ford galaxie 500|
|14.0|        8|       454.0|       2

In [77]:
# Count the number of cars per region.
cars_by_origin = df_with_region.groupBy("origin")
df_grouped = cars_by_origin.count()
df_grouped.show()

+-------------+-----+
|       origin|count|
+-------------+-----+
|       Europe|   70|
|United States|  249|
|        Japan|   79|
+-------------+-----+



# Feature Engineering
Spark ML estimators expect all input features in a single vector column, so we merge the individual feature columns using `VectorAssembler`, a feature transformer that combines multiple columns into one vector column. You choose which columns to use as input features and pass only those to the `VectorAssembler`. In our case, we pass all seven input columns (every column except the target `mpg` and the identifier `car name`) to create a single feature vector column.

In [78]:
from pyspark.ml.linalg import Vector
from pyspark.ml.feature import VectorAssembler

In [79]:
# Input columns that are packed into the single `features` vector column.
# NOTE(review): `origin` is a region code (1/2/3) but enters the vector as a plain
# number — confirm whether it should be one-hot encoded instead.
feature_columns = [
    "cylinders",
    "displacement",
    "horsepower",
    "weight",
    "acceleration",
    "model year",
    "origin",
]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

In [80]:
# Display the assembler object to confirm it was created.
assembler

VectorAssembler_650b0858746c

In [89]:
# Apply the assembler to the imputed frame; this adds the `features` vector column.
df_transformed=assembler.transform(df_imputed)

In [90]:
# Preview the frame with the new `features` column appended.
df_transformed.show()

+----+---------+------------+----------+------+------------+----------+------+--------------------+--------------------+
| mpg|cylinders|displacement|horsepower|weight|acceleration|model year|origin|            car name|            features|
+----+---------+------------+----------+------+------------+----------+------+--------------------+--------------------+
|18.0|        8|       307.0|       130|  3504|        12.0|        70|     1|chevrolet chevell...|[8.0,307.0,130.0,...|
|15.0|        8|       350.0|       165|  3693|        11.5|        70|     1|   buick skylark 320|[8.0,350.0,165.0,...|
|18.0|        8|       318.0|       150|  3436|        11.0|        70|     1|  plymouth satellite|[8.0,318.0,150.0,...|
|16.0|        8|       304.0|       150|  3433|        12.0|        70|     1|       amc rebel sst|[8.0,304.0,150.0,...|
|17.0|        8|       302.0|       140|  3449|        10.5|        70|     1|         ford torino|[8.0,302.0,140.0,...|
|15.0|        8|       429.0|   

In [91]:
# Preview only the model inputs (features) and the target (mpg).
features_and_target = df_transformed.select("features", "mpg")
features_and_target.show(n=5)

+--------------------+----+
|            features| mpg|
+--------------------+----+
|[8.0,307.0,130.0,...|18.0|
|[8.0,350.0,165.0,...|15.0|
|[8.0,318.0,150.0,...|18.0|
|[8.0,304.0,150.0,...|16.0|
|[8.0,302.0,140.0,...|17.0|
+--------------------+----+
only showing top 5 rows



In [92]:
# Keep only the feature vector and the mpg label for modelling.
final_data = df_transformed.select("features", "mpg")

In [None]:
# Split roughly 70% / 30% into training and test sets.
# randomSplit samples randomly, so a fixed seed is passed to keep the split (and
# every metric computed from it) repeatable across runs on the same input data.
SPLIT_SEED = 42
train_data, test_data = final_data.randomSplit([0.7, 0.3], seed=SPLIT_SEED)

In [94]:
# Summary statistics (count, mean, stddev, min, max) of mpg in the training split.
train_data.describe().show()

+-------+-----------------+
|summary|              mpg|
+-------+-----------------+
|  count|              294|
|   mean|         23.56803|
| stddev|7.736624361074818|
|    min|              9.0|
|    max|             46.6|
+-------+-----------------+



In [95]:
# Summary statistics (count, mean, stddev, min, max) of mpg in the test split.
test_data.describe().show()

+-------+-----------------+
|summary|              mpg|
+-------+-----------------+
|  count|              104|
|   mean|         23.36346|
| stddev|8.072331476979532|
|    min|             10.0|
|    max|             44.6|
+-------+-----------------+



# PySpark Linear Regression

In [96]:
from pyspark.ml.regression import LinearRegression

In [97]:
# Linear regression that predicts mpg from the assembled feature vector.
lr = LinearRegression(labelCol="mpg", featuresCol="features")

In [98]:
# Fit the linear regression on the training split.
trained_lr_model=lr.fit(train_data)

24/12/02 21:43:49 WARN Instrumentation: [2a90dcb3] regParam is zero, which might cause numerical instability and overfitting.
24/12/02 21:43:49 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
                                                                                

In [99]:
# Evaluate the fitted model on the data it was trained on (in-sample metrics).
results=trained_lr_model.evaluate(train_data)

In [None]:
# Report the in-sample (training data) evaluation metrics.
train_rmse = results.rootMeanSquaredError
train_mae = results.meanAbsoluteError
train_r2 = results.r2
print(f"Train data Root Mean Squared Error (RMSE): {train_rmse}")
print(f"Train data Mean Absolute Error (MAE): {train_mae}")
print(f"Train data R-squared (R²): {train_r2}")

Root Mean Squared Error (RMSE): 3.3034701706649665
Mean Absolute Error (MAE): 2.5028361312990053
R-squared (R²): 0.8170562961740342


In [101]:
# Drop the mpg label from the test split so only the feature vectors remain.
unlabeled_data = test_data.select("features")
unlabeled_data.show(n=5)

+--------------------+
|            features|
+--------------------+
|[3.0,70.0,100.0,2...|
|[4.0,76.0,52.0,16...|
|[4.0,79.0,67.0,19...|
|[4.0,79.0,70.0,20...|
|[4.0,81.0,60.0,17...|
+--------------------+
only showing top 5 rows



In [102]:
# Score the feature-only test rows; the result gains a `prediction` column.
predictions=trained_lr_model.transform(unlabeled_data)
predictions.show()

+--------------------+------------------+
|            features|        prediction|
+--------------------+------------------+
|[3.0,70.0,100.0,2...|30.505967677205696|
|[4.0,76.0,52.0,16...|31.660666142364292|
|[4.0,79.0,67.0,19...|30.135740160653924|
|[4.0,79.0,70.0,20...|25.898597479270443|
|[4.0,81.0,60.0,17...| 36.03016228444024|
|[4.0,85.0,65.0,20...| 33.45639810081174|
|[4.0,89.0,60.0,19...| 34.52958714699049|
|[4.0,89.0,62.0,20...| 34.44650480053773|
|[4.0,90.0,70.0,19...|28.912113080827503|
|[4.0,91.0,53.0,17...|31.832471003572238|
|[4.0,91.0,67.0,18...| 34.43439557987582|
|[4.0,91.0,67.0,19...| 35.34939111789171|
|[4.0,97.0,67.0,19...| 31.90701208114558|
|[4.0,97.0,71.0,18...| 30.16669781556415|
|[4.0,97.0,75.0,21...|30.038189916132872|
|[4.0,97.0,78.0,19...|30.534700482700657|
|[4.0,97.0,78.0,21...|31.318922036503686|
|[4.0,97.0,88.0,21...|27.473818217194328|
|[4.0,97.0,88.0,21...|25.457643587219835|
|[4.0,98.0,63.0,20...|28.980485387116453|
+--------------------+------------

In [103]:
# Evaluate the fitted model on the held-out test split.
test_results=trained_lr_model.evaluate(test_data)

In [104]:
# Report the held-out (test data) evaluation metrics.
test_rmse = test_results.rootMeanSquaredError
test_mae = test_results.meanAbsoluteError
test_r2 = test_results.r2
print(f"Test data Root Mean Squared Error (RMSE): {test_rmse}")
print(f"Test data Mean Absolute Error (MAE): {test_mae}")
print(f"Test data R-squared (R²): {test_r2}")

Test data Root Mean Squared Error (RMSE): 3.339541974679029
Test data Mean Absolute Error (MAE): 2.5302759675353386
Test data R-squared (R²): 0.8271887732437024
