<p style="text-align:center">
    <a href="https://skills.network/?utm_medium=Exinfluencer&utm_source=Exinfluencer&utm_content=000026UJ&utm_term=10006555&utm_id=NA-SkillsNetwork-Channel-SkillsNetworkCoursesIBMSkillsNetworkBD0231ENCoursera2789-2023-01-01">
    <img src="https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/assets/logos/SN_web_lightmode.png" width="200" alt="Skills Network Logo">
    </a>
</p>


## Practice Project - Create a machine learning pipeline for a regression project


Estimated time needed: **90** minutes


## Scenario


You are a data engineer at a data analytics consulting company. Your company prides itself on being able to handle huge datasets efficiently. Data scientists in your office need to work with different algorithms and with data in different formats. While they are good at machine learning, they count on you to do ETL jobs and build ML pipelines.



## Objectives

In this four-part assignment you will:

- Part 1 ETL
  - Load a csv dataset
  - Remove duplicates if any
  - Drop rows with null values if any
  - Make transformations
  - Store the cleaned data in parquet format
- Part 2 Machine Learning Pipeline creation
  - Create a machine learning pipeline for prediction
- Part 3 Model evaluation
  - Evaluate the model using metrics
  - Print the intercept and the coefficients
- Part 4 Model Persistence
  - Save the model for future production use
  - Load and verify the stored model


## Datasets

In this lab you will be using the following dataset:

 - Modified version of car mileage dataset. Original dataset available at https://archive.ics.uci.edu/ml/datasets/auto+mpg 
 


----


## Setup


For this lab, we will be using the following libraries:

*   [`PySpark`](https://spark.apache.org/docs/latest/api/python/index.html?utm_medium=Exinfluencer&utm_source=Exinfluencer&utm_content=000026UJ&utm_term=10006555&utm_id=NA-SkillsNetwork-Channel-SkillsNetworkCoursesIBMSkillsNetworkBD0231ENCoursera2789-2023-01-01) for connecting to the Spark Cluster


### Installing Required Libraries

The Spark cluster is pre-installed in the Skills Network Labs environment. However, you need libraries like `pyspark` and `findspark` to connect to this cluster.


The following required libraries are __not__ pre-installed in the Skills Network Labs environment. __You will need to run the following cell__ to install them:


In [2]:
!pip install pyspark==3.1.2 -q
!pip install findspark -q

### Importing Required Libraries

_We recommend you import all required libraries in one place (here):_


In [3]:
# You can also use this section to suppress warnings generated by your code:
def warn(*args, **kwargs):
    pass
import warnings
warnings.warn = warn
warnings.filterwarnings('ignore')

# FindSpark simplifies the process of using Apache Spark with Python

import findspark
findspark.init()

## Part 1 - ETL


### Task 1 - Import required libraries


In [4]:
# Suppress warnings
def warn(*args, **kwargs):
    pass
import warnings
warnings.warn = warn
warnings.filterwarnings('ignore')

# Import required libraries
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Initialize Spark Session
spark = SparkSession.builder.appName("CarMileageETL").getOrCreate()

24/08/15 08:41:04 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


<details>
    <summary>Click here for a Hint</summary>
    
Import all the required libraries
    
</details>


<details>
    <summary>Click here for Solution</summary>

```python
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.pipeline import PipelineModel
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import StandardScaler

```

</details>


### Task 2 - Create a spark session


In [5]:
# Initialize Spark Session
spark = SparkSession.builder \
    .appName("CarMileageETL") \
    .getOrCreate()

# Verify the Spark session creation
print("Spark session created successfully!")

Spark session created successfully!


<details>
    <summary>Click here for a Hint</summary>
    
Use the SparkSession.builder

</details>


<details>
    <summary>Click here for Solution</summary>

```python
spark = SparkSession.builder.appName("Practice Project").getOrCreate()
```

</details>


### Task 3 - Load the csv file into a dataframe


Download the data file


In [6]:
!wget https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMSkillsNetwork-BD0231EN-Coursera/datasets/mpg-raw.csv

--2024-08-15 08:41:44--  https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMSkillsNetwork-BD0231EN-Coursera/datasets/mpg-raw.csv
Resolving cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud (cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud)... 169.63.118.104, 169.63.118.104
Connecting to cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud (cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud)|169.63.118.104|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 14354 (14K) [text/csv]
Saving to: ‘mpg-raw.csv’


2024-08-15 08:41:44 (40.0 MB/s) - ‘mpg-raw.csv’ saved [14354/14354]



Load the dataset into the spark dataframe


In [7]:
# Define the path to the CSV file
csv_file_path = "mpg-raw.csv"  # Assuming the file is in your current working directory

# Load the CSV file into a Spark DataFrame
df = spark.read.csv(csv_file_path, header=True, inferSchema=True)

# Show the first few rows to verify the data load
df.show(5)

                                                                                

+----+---------+-----------+----------+------+----------+----+--------+
| MPG|Cylinders|Engine Disp|Horsepower|Weight|Accelerate|Year|  Origin|
+----+---------+-----------+----------+------+----------+----+--------+
|46.6|        4|       86.0|        65|  2110|      17.9|  80|Japanese|
|44.6|        4|       91.0|        67|  1850|      13.8|  80|Japanese|
|44.3|        4|       90.0|        48|  2085|      21.7|  80|European|
|44.0|        4|       97.0|        52|  2130|      24.6|  82|European|
|43.4|        4|       90.0|        48|  2335|      23.7|  80|European|
+----+---------+-----------+----------+------+----------+----+--------+
only showing top 5 rows



<details>
    <summary>Click here for a Hint</summary>
    
Use  spark.read.csv

</details>


<details>
    <summary>Click here for Solution</summary>

```python
df = spark.read.csv("mpg-raw.csv", header=True, inferSchema=True)

```

</details>


### Task 4 - Print top 5 rows of the dataset


In [8]:
df.show(5)

+----+---------+-----------+----------+------+----------+----+--------+
| MPG|Cylinders|Engine Disp|Horsepower|Weight|Accelerate|Year|  Origin|
+----+---------+-----------+----------+------+----------+----+--------+
|46.6|        4|       86.0|        65|  2110|      17.9|  80|Japanese|
|44.6|        4|       91.0|        67|  1850|      13.8|  80|Japanese|
|44.3|        4|       90.0|        48|  2085|      21.7|  80|European|
|44.0|        4|       97.0|        52|  2130|      24.6|  82|European|
|43.4|        4|       90.0|        48|  2335|      23.7|  80|European|
+----+---------+-----------+----------+------+----------+----+--------+
only showing top 5 rows



### Task 5 - Print the number of cars in each Origin


In [9]:
# Group by the 'Origin' column and count the number of occurrences
origin_counts = df.groupBy("Origin").count()

# Show the results
origin_counts.show()



+--------+-----+
|  Origin|count|
+--------+-----+
|European|   70|
|    null|    1|
|Japanese|   88|
|American|  247|
+--------+-----+



                                                                                

<details>
    <summary>Click here for a Hint</summary>
    
Use  df.groupBy

</details>


<details>
    <summary>Click here for Solution</summary>

```python
df.groupBy('Origin').count().orderBy('count').show()
```

</details>


### Task 6 - Print the total number of rows in the dataset


In [11]:
# Print the total number of rows in the dataset
total_rows = df.count()

print(f"Total number of rows in the dataset: {total_rows}")

Total number of rows in the dataset: 406


### Task 7 - Drop all the duplicate rows from the dataset


In [12]:
# Drop duplicate rows from the dataset
df_no_duplicates = df.dropDuplicates()

# Verify by printing the total number of rows after dropping duplicates
total_rows_after_dropping_duplicates = df_no_duplicates.count()
print(f"Total number of rows after dropping duplicates: {total_rows_after_dropping_duplicates}")



Total number of rows after dropping duplicates: 392


                                                                                

<details>
    <summary>Click here for a Hint</summary>
    
Use  df.dropDuplicates

</details>


<details>
    <summary>Click here for Solution</summary>

```python
df = df.dropDuplicates()
```

</details>


### Task 8 - Print the total number of rows in the dataset


In [13]:
# Print the total number of rows in the dataset after dropping duplicates
total_rows_after_dropping_duplicates = df_no_duplicates.count()

print(f"Total number of rows in the dataset after dropping duplicates: {total_rows_after_dropping_duplicates}")



Total number of rows in the dataset after dropping duplicates: 392


                                                                                

### Task 9 - Drop all the rows that contain null values from the dataset


In [14]:
# Drop rows that contain null values from the dataset
df_cleaned = df_no_duplicates.dropna()

# Verify by printing the total number of rows after dropping null values
total_rows_after_dropping_nulls = df_cleaned.count()
print(f"Total number of rows after dropping rows with null values: {total_rows_after_dropping_nulls}")

                                                                                

Total number of rows after dropping rows with null values: 385


<details>
    <summary>Click here for a Hint</summary>
    
Use df.dropna

</details>


<details>
    <summary>Click here for Solution</summary>

```python
df=df.dropna()
```

</details>
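To make the two cleaning steps concrete, here is a minimal plain-Python sketch (no Spark needed) of what `dropDuplicates()` and `dropna()` do to a table. The rows are made up for illustration and are not taken from `mpg-raw.csv`; `None` stands in for a null value. Unlike this sketch, Spark does not guarantee which copy of a duplicate survives or the order of the remaining rows.

```python
# Hypothetical rows: (MPG, Cylinders, Origin). Not real data from mpg-raw.csv.
rows = [
    (24.0, 4, "Japanese"),
    (18.0, 6, "American"),
    (24.0, 4, "Japanese"),   # exact duplicate of the first row
    (29.0, 4, None),         # null Origin
    (None, 8, "American"),   # null MPG
]

# Like dropDuplicates(): keep one copy of each fully identical row.
deduplicated = list(dict.fromkeys(rows))

# Like dropna(): drop any row that has a null in any column.
cleaned = [row for row in deduplicated if all(value is not None for value in row)]

print(len(rows), len(deduplicated), len(cleaned))
```

A row is only removed as a duplicate when every column matches, and a single null in any column is enough for `dropna()` to drop the row, which is why the row count falls in two separate steps.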


### Task 10 - Print the total number of rows in the dataset


In [15]:
# Print the total number of rows in the dataset after dropping null values
total_rows_after_dropping_nulls = df_cleaned.count()

print(f"Total number of rows in the dataset after dropping null values: {total_rows_after_dropping_nulls}")



Total number of rows in the dataset after dropping null values: 385


                                                                                

### Task 11 - Rename the column "Engine Disp" to "Engine_Disp"


In [16]:
# Rename the column "Engine Disp" to "Engine_Disp"
df_renamed = df_cleaned.withColumnRenamed("Engine Disp", "Engine_Disp")

# Show the first few rows to verify the column rename
df_renamed.show(5)

+----+---------+-----------+----------+------+----------+----+--------+
| MPG|Cylinders|Engine_Disp|Horsepower|Weight|Accelerate|Year|  Origin|
+----+---------+-----------+----------+------+----------+----+--------+
|24.0|        4|      134.0|        96|  2702|      13.5|  75|Japanese|
|18.0|        6|      250.0|        88|  3139|      14.5|  71|American|
|29.0|        4|       68.0|        49|  1867|      19.5|  73|European|
|22.4|        6|      231.0|       110|  3415|      15.8|  81|American|
|20.5|        6|      231.0|       105|  3425|      16.9|  77|American|
+----+---------+-----------+----------+------+----------+----+--------+
only showing top 5 rows



<details>
    <summary>Click here for a Hint</summary>
    
Use df.withColumnRenamed

</details>


<details>
    <summary>Click here for Solution</summary>

```python
df = df.withColumnRenamed("Engine Disp","Engine_Disp")
```

</details>


### Task 12 - Save the dataframe in parquet format, name the file as "mpg-cleaned.parquet"


In [17]:
# Save the DataFrame in Parquet format
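# mode("overwrite") lets this cell be re-run without failing when the path already exists
df_renamed.write.mode("overwrite").parquet("mpg-cleaned.parquet")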

print("DataFrame has been saved in Parquet format as 'mpg-cleaned.parquet'.")

Scaling row group sizes to 96.54% for 7 writers
24/08/15 08:46:01 WARN hadoop.MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 84.47% for 8 writers
24/08/15 08:46:01 WARN hadoop.MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers
                                                                                

DataFrame has been saved in Parquet format as 'mpg-cleaned.parquet'.


<details>
    <summary>Click here for a Hint</summary>
    
Use df.write.mode("overwrite").parquet

</details>


<details>
    <summary>Click here for Solution</summary>

```python

df.write.mode("overwrite").parquet("mpg-cleaned.parquet")
```

</details>


#### Part 1 - Evaluation



Run the code cell below.<br>
If the code throws any errors, go back and review the code you have written.


In [21]:
# Calculate row counts
rowcount1 = df.count()  # Total rows after the initial load
rowcount2 = df_no_duplicates.count()  # Total rows after dropping duplicate rows
rowcount3 = df_cleaned.count()  # Total rows after dropping duplicate rows and rows with null values

# Run the evaluation code
print("Part 1 - Evaluation")

print("Total rows = ", rowcount1)
print("Total rows after dropping duplicate rows = ", rowcount2)
print("Total rows after dropping duplicate rows and rows with null values = ", rowcount3)
print("Renamed column name = ", df_renamed.columns[2])



Part 1 - Evaluation
Total rows =  406
Total rows after dropping duplicate rows =  392
Total rows after dropping duplicate rows and rows with null values =  385
Renamed column name =  Engine_Disp


                                                                                

## Part 2 - Machine Learning Pipeline creation


### Task 1 - Load data from "mpg-cleaned.parquet" into a dataframe


In [22]:
# Load the Parquet file into a Spark DataFrame
df_loaded = spark.read.parquet("mpg-cleaned.parquet")

# Show the first few rows to verify the data load
df_loaded.show(5)

+----+---------+-----------+----------+------+----------+----+--------+
| MPG|Cylinders|Engine_Disp|Horsepower|Weight|Accelerate|Year|  Origin|
+----+---------+-----------+----------+------+----------+----+--------+
|32.2|        4|      108.0|        75|  2265|      15.2|  80|Japanese|
|28.0|        4|      107.0|        86|  2464|      15.5|  76|European|
|26.0|        4|      156.0|        92|  2585|      14.5|  82|American|
|25.0|        4|      104.0|        95|  2375|      17.5|  70|European|
|25.0|        4|      140.0|        75|  2542|      17.0|  74|American|
+----+---------+-----------+----------+------+----------+----+--------+
only showing top 5 rows



In [23]:
# Show the top 5 rows of the loaded DataFrame
df_loaded.show(5)

+----+---------+-----------+----------+------+----------+----+--------+
| MPG|Cylinders|Engine_Disp|Horsepower|Weight|Accelerate|Year|  Origin|
+----+---------+-----------+----------+------+----------+----+--------+
|32.2|        4|      108.0|        75|  2265|      15.2|  80|Japanese|
|28.0|        4|      107.0|        86|  2464|      15.5|  76|European|
|26.0|        4|      156.0|        92|  2585|      14.5|  82|American|
|25.0|        4|      104.0|        95|  2375|      17.5|  70|European|
|25.0|        4|      140.0|        75|  2542|      17.0|  74|American|
+----+---------+-----------+----------+------+----------+----+--------+
only showing top 5 rows



In [24]:
# Print the schema of the DataFrame
df_loaded.printSchema()

root
 |-- MPG: double (nullable = true)
 |-- Cylinders: integer (nullable = true)
 |-- Engine_Disp: double (nullable = true)
 |-- Horsepower: integer (nullable = true)
 |-- Weight: integer (nullable = true)
 |-- Accelerate: double (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Origin: string (nullable = true)



### Task 2 - Define the StringIndexer pipeline stage


In [25]:
from pyspark.ml.feature import StringIndexer

# Initialize StringIndexer to convert the "Origin" column into "OriginIndex"
indexer = StringIndexer(inputCol="Origin", outputCol="OriginIndex")

# Fit the indexer to the DataFrame and transform the data
df_indexed = indexer.fit(df_loaded).transform(df_loaded)

# Show the top 5 rows to verify the transformation
df_indexed.show(5)

                                                                                

+----+---------+-----------+----------+------+----------+----+--------+-----------+
| MPG|Cylinders|Engine_Disp|Horsepower|Weight|Accelerate|Year|  Origin|OriginIndex|
+----+---------+-----------+----------+------+----------+----+--------+-----------+
|32.2|        4|      108.0|        75|  2265|      15.2|  80|Japanese|        1.0|
|28.0|        4|      107.0|        86|  2464|      15.5|  76|European|        2.0|
|26.0|        4|      156.0|        92|  2585|      14.5|  82|American|        0.0|
|25.0|        4|      104.0|        95|  2375|      17.5|  70|European|        2.0|
|25.0|        4|      140.0|        75|  2542|      17.0|  74|American|        0.0|
+----+---------+-----------+----------+------+----------+----+--------+-----------+
only showing top 5 rows



<details>
    <summary>Click here for a Hint</summary>
    
Use StringIndexer class

</details>


<details>
    <summary>Click here for Solution</summary>

```python
indexer = StringIndexer(inputCol="Origin", outputCol="OriginIndex")
```

</details>
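The index values in the output above follow from label frequency: by default `StringIndexer` orders labels by descending frequency, so the most common label gets `0.0`. The plain-Python sketch below mimics that ordering using the `Origin` counts printed in Part 1, Task 5. Those counts come from the raw data, so the exact numbers may differ slightly after cleaning; only the ordering matters here. The variable names are illustrative.

```python
from collections import Counter

# Origin counts copied from the Task 5 output in Part 1 (the null row is left out).
origin_counts = Counter({"American": 247, "Japanese": 88, "European": 70})

# Most frequent label first, mirroring StringIndexer's default "frequencyDesc" ordering.
ordered_labels = [label for label, _ in origin_counts.most_common()]

# Assign 0.0, 1.0, 2.0, ... in that order.
origin_index = {label: float(i) for i, label in enumerate(ordered_labels)}

print(origin_index)
```

The resulting mapping (American 0.0, Japanese 1.0, European 2.0) agrees with the `OriginIndex` column shown above.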


### Task 3 - Define the VectorAssembler pipeline stage


In [26]:
from pyspark.ml.feature import VectorAssembler

# Initialize the VectorAssembler to combine the input columns into a single "features" column
assembler = VectorAssembler(
    inputCols=['Cylinders', 'Engine_Disp', 'Horsepower', 'Weight', 'Accelerate', 'Year', 'OriginIndex'],
    outputCol="features"
)

# Transform the data using the assembler
df_assembled = assembler.transform(df_indexed)

# Show the top 5 rows to verify the transformation
df_assembled.select("features", *df_assembled.columns[:-1]).show(5, truncate=False)

+-------------------------------------+----+---------+-----------+----------+------+----------+----+--------+-----------+
|features                             |MPG |Cylinders|Engine_Disp|Horsepower|Weight|Accelerate|Year|Origin  |OriginIndex|
+-------------------------------------+----+---------+-----------+----------+------+----------+----+--------+-----------+
|[4.0,108.0,75.0,2265.0,15.2,80.0,1.0]|32.2|4        |108.0      |75        |2265  |15.2      |80  |Japanese|1.0        |
|[4.0,107.0,86.0,2464.0,15.5,76.0,2.0]|28.0|4        |107.0      |86        |2464  |15.5      |76  |European|2.0        |
|[4.0,156.0,92.0,2585.0,14.5,82.0,0.0]|26.0|4        |156.0      |92        |2585  |14.5      |82  |American|0.0        |
|[4.0,104.0,95.0,2375.0,17.5,70.0,2.0]|25.0|4        |104.0      |95        |2375  |17.5      |70  |European|2.0        |
|[4.0,140.0,75.0,2542.0,17.0,74.0,0.0]|25.0|4        |140.0      |75        |2542  |17.0      |74  |American|0.0        |
+-------------------------------------+----+---------+-----------+----------+------+----------+----+--------+-----------+

<details>
    <summary>Click here for a Hint</summary>
    
Use the VectorAssembler class

</details>


<details>
    <summary>Click here for Solution</summary>

```python
assembler = VectorAssembler(inputCols=['Cylinders','Engine_Disp','Horsepower','Weight','Accelerate','Year'], outputCol="features")
```

</details>


### Task 4 - Define the StandardScaler pipeline stage


In [27]:
from pyspark.ml.feature import StandardScaler

# Initialize the StandardScaler to scale the "features" column
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", withStd=True, withMean=False)

# Fit the scaler to the DataFrame and transform the data
scaler_model = scaler.fit(df_assembled)
df_scaled = scaler_model.transform(df_assembled)

# Show the top 5 rows to verify the transformation
df_scaled.select("scaledFeatures", *df_scaled.columns[:-1]).show(5, truncate=False)

                                                                                

+------------------------------------------------------------------------------------------------------------------------------------+----+---------+-----------+----------+------+----------+----+--------+-----------+-------------------------------------+
|scaledFeatures                                                                                                                      |MPG |Cylinders|Engine_Disp|Horsepower|Weight|Accelerate|Year|Origin  |OriginIndex|features                             |
+------------------------------------------------------------------------------------------------------------------------------------+----+---------+-----------+----------+------+----------+----+--------+-----------+-------------------------------------+
|[2.337745829755739,1.027489209607369,1.9395119792463553,2.6577769322671063,5.507434227818882,21.833602544207945,1.2988371546369353] |32.2|4        |108.0      |75        |2265  |15.2      |80  |Japanese|1.0        |[4.0,108.0,75.0,226

<details>
    <summary>Click here for a Hint</summary>
    
Use the StandardScaler class

</details>


<details>
    <summary>Click here for Solution</summary>

```python
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")

```

</details>
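As a rough illustration of what `withStd=True, withMean=False` means, the plain-Python sketch below divides a single feature by its sample standard deviation without subtracting the mean. It uses the `Weight` values of the five rows shown in Task 1, so the standard deviation here describes those five rows only and the scaled numbers will not match the `scaledFeatures` output above, which is computed over the full dataset. It assumes the scaler uses the sample standard deviation (n - 1 in the denominator).

```python
import statistics

# Weight values of the five rows displayed in Task 1 (not the full column).
weights = [2265.0, 2464.0, 2585.0, 2375.0, 2542.0]

# Sample standard deviation (n - 1 in the denominator).
std = statistics.stdev(weights)

# withStd=True, withMean=False: divide by the standard deviation, do not subtract the mean.
scaled = [w / std for w in weights]

print(std)
print(scaled)
```

Because the mean is not subtracted, the scaled values keep their sign and relative spacing; only their units change so that the column has unit standard deviation.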


### Task 5 - Define the Model creation pipeline stage


In [28]:
from pyspark.ml.regression import LinearRegression

# Initialize the LinearRegression model to predict "MPG"
lr = LinearRegression(featuresCol="scaledFeatures", labelCol="MPG", predictionCol="prediction")

# Fit the model to the data
lr_model = lr.fit(df_scaled)

# Show the model summary
print(f"Coefficients: {lr_model.coefficients}")
print(f"Intercept: {lr_model.intercept}")

# Make predictions using the model
df_predictions = lr_model.transform(df_scaled)

# Show the top 5 predictions
df_predictions.select("MPG", "prediction").show(5)

24/08/15 08:54:01 WARN util.Instrumentation: [a21822fc] regParam is zero, which might cause numerical instability and overfitting.
24/08/15 08:54:01 WARN netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
24/08/15 08:54:01 WARN netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
24/08/15 08:54:04 WARN netlib.LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
24/08/15 08:54:04 WARN netlib.LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK
                                                                                

Coefficients: [-0.6626863070802211,2.1302823948333454,-0.34367945292672525,-6.044930400353515,0.2527452043446002,2.9107740567426537,1.0187609457448632]
Intercept: -18.870325526818064
+----+------------------+
| MPG|        prediction|
+----+------------------+
|32.2|31.304543513546683|
|28.0| 27.94801149509903|
|26.0|30.057960287367045|
|25.0|23.855219488968103|
|25.0|24.063459157595283|
+----+------------------+
only showing top 5 rows



<details>
    <summary>Click here for a Hint</summary>
    
Use the LinearRegression class

</details>


<details>
    <summary>Click here for Solution</summary>

```python
lr = LinearRegression(featuresCol="scaledFeatures", labelCol="MPG")
```

</details>
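A fitted linear regression predicts by adding the intercept to the dot product of the coefficients and the (scaled) feature vector. The sketch below redoes that arithmetic by hand in plain Python, using the coefficients and intercept printed above and the first `scaledFeatures` row from the Task 4 output. All numbers are copied from those cell outputs; nothing is recomputed by Spark here.

```python
# Coefficients and intercept copied from the Task 5 cell output.
coefficients = [
    -0.6626863070802211, 2.1302823948333454, -0.34367945292672525,
    -6.044930400353515, 0.2527452043446002, 2.9107740567426537,
    1.0187609457448632,
]
intercept = -18.870325526818064

# First scaledFeatures row copied from the Task 4 cell output (MPG = 32.2).
scaled_features = [
    2.337745829755739, 1.027489209607369, 1.9395119792463553,
    2.6577769322671063, 5.507434227818882, 21.833602544207945,
    1.2988371546369353,
]

# prediction = intercept + sum(coefficient_i * feature_i)
prediction = intercept + sum(c * x for c, x in zip(coefficients, scaled_features))

print(prediction)
```

The result should come out close to 31.3045, the prediction shown for the first row in the table above.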


### Task 6 - Build the pipeline


In [29]:
from pyspark.ml import Pipeline

# Define the pipeline stages
pipeline = Pipeline(stages=[indexer, assembler, scaler, lr])

# Fit the pipeline to the data
pipeline_model = pipeline.fit(df_loaded)

# Make predictions using the pipeline model
df_pipeline_predictions = pipeline_model.transform(df_loaded)

# Show the top 5 predictions
df_pipeline_predictions.select("MPG", "prediction").show(5)

24/08/15 08:54:40 WARN util.Instrumentation: [5ecb919d] regParam is zero, which might cause numerical instability and overfitting.
                                                                                

+----+------------------+
| MPG|        prediction|
+----+------------------+
|32.2|31.304543513546683|
|28.0| 27.94801149509903|
|26.0|30.057960287367045|
|25.0|23.855219488968103|
|25.0|24.063459157595283|
+----+------------------+
only showing top 5 rows



<details>
    <summary>Click here for a Hint</summary>
    
Use the Pipeline class

</details>


<details>
    <summary>Click here for Solution</summary>

```python
pipeline = Pipeline(stages=[indexer,assembler, scaler, lr])
```

</details>


### Task 7 - Split the data


In [30]:
# Split the data into training (70%) and testing (30%) sets with seed 42
(trainingData, testingData) = df_loaded.randomSplit([0.7, 0.3], seed=42)

# Verify the number of rows in each set
print(f"Training Data Count: {trainingData.count()}")
print(f"Testing Data Count: {testingData.count()}")

                                                                                

Training Data Count: 256




Testing Data Count: 129


                                                                                

<details>
    <summary>Click here for a Hint</summary>
    
Use the randomSplit method

</details>


<details>
    <summary>Click here for Solution</summary>

```python
(trainingData, testingData) = df.randomSplit([0.7, 0.3], seed=42)
```

</details>
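The split above produced 256 and 129 rows rather than an exact 70/30 division of 385 rows. That is expected: `randomSplit` decides row by row with a random draw, so the weights are probabilities, not exact proportions. The plain-Python sketch below shows the idea with a hypothetical helper, `bernoulli_split`. Python's random generator differs from Spark's, so the counts it prints will not match the Spark output even with the same seed.

```python
import random


def bernoulli_split(rows, train_fraction, seed):
    """Send each row to train or test based on one uniform draw per row."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        # Each row lands in train with probability train_fraction.
        (train if rng.random() < train_fraction else test).append(row)
    return train, test


rows = list(range(385))  # stand-in for the 385 cleaned rows
train, test = bernoulli_split(rows, 0.7, seed=42)

print(len(train), len(test))
```

Fixing the seed makes the split repeatable, which is why the lab asks for `seed=42`: the row counts in the evaluation cells only match when everyone uses the same seed on the same data.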


### Task 8 - Fit the pipeline


In [31]:
# Fit the pipeline to the training data
pipeline_model = pipeline.fit(trainingData)

print("Pipeline has been successfully fitted to the training data.")

24/08/15 08:56:00 WARN util.Instrumentation: [33a83706] regParam is zero, which might cause numerical instability and overfitting.

Pipeline has been successfully fitted to the training data.


                                                                                

<details>
    <summary>Click here for a Hint</summary>
    
Use the pipeline.fit method

</details>


<details>
    <summary>Click here for Solution</summary>

```python
pipelineModel = pipeline.fit(trainingData)
```

</details>


#### Part 2 - Evaluation



Run the code cell below.<br>
If the code throws any errors, go back and review the code you have written.


In [34]:
# Assuming rowcount4 is the count of the total number of rows in the testing dataset
rowcount4 = testingData.count()

# Get the stages of the pipeline and extract their names
ps = [str(stage).split("_")[0] for stage in pipeline.getStages()]

# Print Part 2 - Evaluation
print("Part 2 - Evaluation")
print("Total rows = ", rowcount4)
print("Pipeline Stage 1 = ", ps[0])
print("Pipeline Stage 2 = ", ps[1])
print("Pipeline Stage 3 = ", ps[2])

# Print the label column used in Linear Regression
print("Label column = ", lr.getLabelCol())



Part 2 - Evaluation
Total rows =  129
Pipeline Stage 1 =  StringIndexer
Pipeline Stage 2 =  VectorAssembler
Pipeline Stage 3 =  StandardScaler
Label column =  MPG


                                                                                

## Part 3 - Model Evaluation


### Task 1 - Predict using the model


In [35]:
# Make predictions on the testing data
predictions = pipeline_model.transform(testingData)

# Show the top 5 predictions
predictions.select("MPG", "prediction").show(5)

[Stage 92:>                                                         (0 + 1) / 1]

+----+------------------+
| MPG|        prediction|
+----+------------------+
|13.0|10.947222765312034|
|13.0|14.626657902314012|
|13.0|10.731547156600616|
|15.0|13.276559495758608|
|15.5|16.110272322579608|
+----+------------------+
only showing top 5 rows



                                                                                

<details>
    <summary>Click here for a Hint</summary>
    
Use the transform method of the model

</details>


<details>
    <summary>Click here for Solution</summary>

```python
predictions = pipelineModel.transform(testingData)
```

</details>


### Task 2 - Print the MSE


In [36]:
from pyspark.ml.evaluation import RegressionEvaluator

# Initialize the evaluator for Mean Squared Error (MSE)
evaluator = RegressionEvaluator(labelCol="MPG", predictionCol="prediction", metricName="mse")

# Calculate the Mean Squared Error (MSE) on the predictions
mse = evaluator.evaluate(predictions)

# Print the MSE
print(mse)



9.961706393688939


                                                                                

<details>
    <summary>Click here for a Hint</summary>
    
Use the RegressionEvaluator

</details>


<details>
    <summary>Click here for Solution</summary>

```python
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="MPG", metricName="mse")
mse = evaluator.evaluate(predictions)
print(mse)

```

</details>


### Task 3 - Print the MAE


In [37]:
from pyspark.ml.evaluation import RegressionEvaluator

# Initialize the evaluator for Mean Absolute Error (MAE)
evaluator = RegressionEvaluator(labelCol="MPG", predictionCol="prediction", metricName="mae")

# Calculate the Mean Absolute Error (MAE) on the predictions
mae = evaluator.evaluate(predictions)

# Print the MAE
print(mae)



2.5562598012608344


                                                                                

<details>
    <summary>Click here for a Hint</summary>
    
Use the RegressionEvaluator

</details>


<details>
    <summary>Click here for Solution</summary>

```python
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="MPG", metricName="mae")
mae = evaluator.evaluate(predictions)
print(mae)

```

</details>


### Task 4 - Print the R-Squared(R2)


In [38]:
from pyspark.ml.evaluation import RegressionEvaluator

# Initialize the evaluator for R-squared (R2)
evaluator = RegressionEvaluator(labelCol="MPG", predictionCol="prediction", metricName="r2")

# Calculate the R-squared (R2) on the predictions
r2 = evaluator.evaluate(predictions)

# Print the R-squared (R2)
print(r2)



0.8357346511587201


                                                                                

<details>
    <summary>Click here for a Hint</summary>
    
Use the RegressionEvaluator

</details>


<details>
    <summary>Click here for Solution</summary>

```python
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="MPG", metricName="r2")
r2 = evaluator.evaluate(predictions)
print(r2)
```

</details>
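For reference, the three metrics used in Tasks 2 to 4 can be written out in a few lines of plain Python. The sketch below uses a hypothetical helper, `regression_metrics`, and made-up labels and predictions, so its numbers are unrelated to the values printed above. It assumes the usual definitions: MSE is the mean squared error, MAE is the mean absolute error, and R2 is one minus the ratio of the residual sum of squares to the total sum of squares.

```python
def regression_metrics(labels, predictions):
    """Return (mse, mae, r2) for two equal-length lists of numbers."""
    n = len(labels)
    errors = [y - p for y, p in zip(labels, predictions)]

    ss_res = sum(e * e for e in errors)          # residual sum of squares
    mse = ss_res / n                             # mean squared error
    mae = sum(abs(e) for e in errors) / n        # mean absolute error

    mean_label = sum(labels) / n
    ss_tot = sum((y - mean_label) ** 2 for y in labels)  # total sum of squares
    r2 = 1.0 - ss_res / ss_tot

    return mse, mae, r2


# Made-up values for illustration only.
labels = [20.0, 25.0, 30.0, 35.0]
predictions = [22.0, 24.0, 29.0, 37.0]

mse, mae, r2 = regression_metrics(labels, predictions)
print(mse, mae, r2)
```

MSE penalizes large errors more heavily than MAE because the errors are squared, and R2 compares the model against always predicting the mean label, so a value near 1 indicates that most of the variance in `MPG` is explained.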


#### Part 3 - Evaluation



Run the code cell below.<br>
If the code throws any errors, go back and review the code you have written.


In [39]:
print("Part 3 - Evaluation")

print("Mean Squared Error = ", round(mse, 2))
print("Mean Absolute Error = ", round(mae, 2))
print("R Squared = ", round(r2, 2))

# Extract the LinearRegression model from the pipeline
lr_model = pipeline_model.stages[-1]

print("Intercept = ", round(lr_model.intercept, 2))

Part 3 - Evaluation
Mean Squared Error =  9.96
Mean Absolute Error =  2.56
R Squared =  0.84
Intercept =  -17.44


## Part 4 - Model persistence


### Task 1 - Save the model to the path "Practice_Project"


In [None]:
# Save the pipeline model (overwrite so the cell can be re-run)
pipeline_model.write().overwrite().save("Practice_Project")


<details>
    <summary>Click here for a Hint</summary>
    
Use the model.write().save method

</details>


<details>
    <summary>Click here for Solution</summary>

```python
pipelineModel.write().save("Practice_Project")
```

</details>


### Task 2 - Load the model from the path "Practice_Project"


In [None]:
# Load the pipeline model
from pyspark.ml import PipelineModel
loadedPipelineModel = PipelineModel.load("Practice_Project")


<details>
    <summary>Click here for a Hint</summary>
    
Use the load method of the model

</details>


<details>
    <summary>Click here for Solution</summary>

```python
loadedPipelineModel = PipelineModel.load("Practice_Project")
```

</details>


### Task 3 - Make predictions using the loaded model on the test data


In [None]:
# Use the loaded pipeline model for predictions
predictions = loadedPipelineModel.transform(testingData)


<details>
    <summary>Click here for a Hint</summary>
    
Use the transform method

</details>


<details>
    <summary>Click here for Solution</summary>

```python
predictions = loadedPipelineModel.transform(testingData)
```

</details>


### Task 4 - Show the predictions


In [None]:
# Show the label next to the prediction from the loaded model
predictions.select("MPG", "prediction").show()


<details>
    <summary>Click here for a Hint</summary>
    
Use the select method of the dataframe

</details>


<details>
    <summary>Click here for Solution</summary>

```python
predictions.select("MPG","prediction").show()
```

</details>


#### Part 4 - Evaluation



Run the code cell below.<br>
If the code throws any errors, go back and review the code you have written.


In [None]:
print("Part 4 - Evaluation")

loadedmodel = loadedPipelineModel.stages[-1]
totalstages = len(loadedPipelineModel.stages)
inputcolumns = loadedPipelineModel.stages[1].getInputCols()

print("Number of stages in the pipeline = ", totalstages)
for i,j in zip(inputcolumns, loadedmodel.coefficients):
    print(f"Coefficient for {i} is {round(j,4)}")

### Task 5 - Stop Spark Session


In [None]:
spark.stop()

Congratulations! You have successfully finished the practice project.


## Authors


[Ramesh Sannareddy](https://www.linkedin.com/in/rsannareddy/?utm_medium=Exinfluencer&utm_source=Exinfluencer&utm_content=000026UJ&utm_term=10006555&utm_id=NA-SkillsNetwork-Channel-SkillsNetworkCoursesIBMBD0231ENSkillsNetwork866-2023-01-01)


## Change Log


|Date (YYYY-MM-DD)|Version|Changed By|Change Description|
|-|-|-|-|
|2023-05-26|0.1|Ramesh Sannareddy|Initial Version Created|


Copyright © 2023 IBM Corporation. All rights reserved.
