In [2]:
def warn(*args,**kwargs):
    pass
import warnings as wng 
wng.warn = warn
wng.filterwarnings("ignore")

# Make the `pyspark` package importable from the local Spark installation.
# This must run before any of the `pyspark` imports that follow.
import findspark as fs
fs.init()

from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline

from pyspark.ml.evaluation import RegressionEvaluator

In [3]:
# Local Spark session for this notebook:
#   - master local[*]: run Spark in-process using all available cores.
#   - 4 shuffle partitions (Spark's default is 200) keeps shuffles cheap on
#     small data.
#   - Arrow-based conversion and adaptive query execution are switched on.
# NOTE(review): with a local master the executors run inside the driver JVM,
# so "spark.executor.memory" most likely has no effect here. Also,
# getOrCreate() returns an already-running session unchanged, so memory
# settings only apply when the JVM is first launched.
spark_settings = {
    "spark.driver.memory": "4g",
    "spark.executor.memory": "2g",
    "spark.sql.shuffle.partitions": "4",
    "spark.sql.execution.arrow.pyspark.enabled": "true",
    "spark.sql.adaptive.enabled": "true",
}

spark_builder = SparkSession.builder.appName("ML pipelines Using Spark").master("local[*]")
for setting_name, setting_value in spark_settings.items():
    spark_builder = spark_builder.config(setting_name, setting_value)
spark = spark_builder.getOrCreate()

In [4]:
# Load the MPG dataset: first row is the header, column types are inferred.
mpg_df = spark.read.options(header=True, inferSchema=True).csv("mpg.csv")

In [5]:
# Inspect the column names and the types inferred from the CSV.
mpg_df.printSchema()

root
 |-- MPG: double (nullable = true)
 |-- Cylinders: integer (nullable = true)
 |-- Engine Disp: double (nullable = true)
 |-- Horsepower: integer (nullable = true)
 |-- Weight: integer (nullable = true)
 |-- Accelerate: double (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Origin: string (nullable = true)



In [6]:
# Preview the first two rows of the raw data.
mpg_df.show(2)

+----+---------+-----------+----------+------+----------+----+--------+
| MPG|Cylinders|Engine Disp|Horsepower|Weight|Accelerate|Year|  Origin|
+----+---------+-----------+----------+------+----------+----+--------+
|15.0|        8|      390.0|       190|  3850|       8.5|  70|American|
|21.0|        6|      199.0|        90|  2648|      15.0|  70|American|
+----+---------+-----------+----------+------+----------+----+--------+
only showing top 2 rows


In [7]:
# Pack the numeric predictors into the single vector column ("features") that
# LinearRegression reads via featuresCol. The string column "Origin" is not
# used as a predictor.
feature_columns = ["Cylinders", "Engine Disp", "Horsepower", "Weight", "Accelerate", "Year"]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
mpg_plus = assembler.transform(mpg_df)

In [8]:
# Show each label next to its assembled feature vector (first 20 rows by
# default); truncate=False prints the full vectors.
mpg_plus.select("MPG","features").show(truncate=False)

+----+----------------------------------+
|MPG |features                          |
+----+----------------------------------+
|15.0|[8.0,390.0,190.0,3850.0,8.5,70.0] |
|21.0|[6.0,199.0,90.0,2648.0,15.0,70.0] |
|18.0|[6.0,199.0,97.0,2774.0,15.5,70.0] |
|16.0|[8.0,304.0,150.0,3433.0,12.0,70.0]|
|14.0|[8.0,455.0,225.0,3086.0,10.0,70.0]|
|15.0|[8.0,350.0,165.0,3693.0,11.5,70.0]|
|18.0|[8.0,307.0,130.0,3504.0,12.0,70.0]|
|14.0|[8.0,454.0,220.0,4354.0,9.0,70.0] |
|15.0|[8.0,400.0,150.0,3761.0,9.5,70.0] |
|10.0|[8.0,307.0,200.0,4376.0,15.0,70.0]|
|15.0|[8.0,383.0,170.0,3563.0,10.0,70.0]|
|11.0|[8.0,318.0,210.0,4382.0,13.5,70.0]|
|10.0|[8.0,360.0,215.0,4615.0,14.0,70.0]|
|15.0|[8.0,429.0,198.0,4341.0,10.0,70.0]|
|21.0|[6.0,200.0,85.0,2587.0,16.0,70.0] |
|17.0|[8.0,302.0,140.0,3449.0,10.5,70.0]|
|9.0 |[8.0,304.0,193.0,4732.0,18.5,70.0]|
|14.0|[8.0,340.0,160.0,3609.0,8.0,70.0] |
|22.0|[6.0,198.0,95.0,2833.0,15.5,70.0] |
|14.0|[8.0,440.0,215.0,4312.0,8.5,70.0] |
+----+----------------------------

In [9]:
# 70/30 train/test split. The fixed seed makes the split repeatable for the
# same data and partitioning.
training_data, testing_data = mpg_plus.randomSplit(weights=[0.7, 0.3], seed=42)

In [10]:
# Fit a linear regression of MPG on the assembled "features" vector.
# NOTE(review): the pipeline holds only the regression stage, because the
# VectorAssembler was applied beforehand. The fitted (and later saved)
# PipelineModel therefore expects input that already has a "features" column.
# Making the assembler the first stage would make the persisted model
# self-contained.
lr = LinearRegression(labelCol="MPG", featuresCol="features")
pipeline = Pipeline().setStages([lr])
model = pipeline.fit(training_data)

In [11]:
%%bash

# Create the model directory only if it is missing. `-p` makes this cell safe
# to re-run (plain `mkdir` fails with "File exists" once the folder exists).
mkdir -p model_storage
ls

mkdir: cannot create directory ‘model_storage’: File exists


Connecting To a Spark Cluster.ipynb
Feature Extraction and Transformation.ipynb
ML Pipelines Using Spark.ipynb
Model Persistence.ipynb
diamonds.csv
iris.csv
model_storage
movie.json
mpg.csv
mpg.csv.1
proverbs.csv
submit.py
virtual


In [12]:
import os
import sys

# Point to the FOLDER ABOVE the bin folder
os.environ['HADOOP_HOME'] = "C:\\hadoop"
# Add the bin folder to your system path
os.environ['PATH'] = os.environ['PATH'] + ";" + "C:\\hadoop\\bin"

In [13]:


# Persist the fitted pipeline, replacing anything already saved at this path.
# NOTE(review): overwrite() likely deletes the existing target directory before
# writing, so anything else stored directly in model_storage would be lost.
# Consider saving into a dedicated subdirectory.
model_writer = model.write().overwrite()
model_writer.save("./model_storage/")

In [14]:
from pyspark.ml.pipeline import PipelineModel

In [15]:
# Reload the persisted pipeline from disk to demonstrate the save/load round trip.
loaded_model = PipelineModel.read().load("./model_storage")

In [16]:
# Score the held-out rows with the reloaded model; this adds a "prediction"
# column alongside the existing ones.
predictions = loaded_model.transform(testing_data)

In [17]:
# Compare actual MPG with the model's predictions (first 20 rows by default).
# TODO(review): RegressionEvaluator is imported at the top of the notebook but
# never used. An RMSE / R^2 summary would quantify how good these predictions are.
predictions.select("MPG","prediction").show()

+----+------------------+
| MPG|        prediction|
+----+------------------+
|10.0| 6.683344024048662|
|11.0| 8.344953219723493|
|12.0|10.043420590827143|
|12.0| 5.252194346982389|
|13.0|21.473697417345097|
|13.0|17.421344951368052|
|13.0|11.168080223812325|
|13.0|14.154650234586434|
|13.0| 9.853448998812159|
|13.0|10.994457356405125|
|13.0|13.044087217951844|
|13.0|10.850712619404437|
|13.0| 7.021267492937749|
|13.0| 4.102584238938459|
|13.0| 8.389478890010661|
|14.0|10.241735354705064|
|14.0|15.991415050247195|
|14.0|12.234738750650223|
|14.0| 10.67833831392161|
|14.0|10.883442729946164|
+----+------------------+
only showing top 20 rows


In [18]:
# Shut down the Spark session created at the start of the notebook.
spark.stop()