# **🚀 Machine Learning Pyshark**

In [41]:
!pip install -q numpy findspark

In [1]:
from pyspark.sql import SparkSession
from pyspark import SparkContext

from pyspark.sql import Row
from pyspark.ml.linalg import Vectors
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

spark = SparkSession.\
        builder.\
        appName("linear-regression-spark").\
        master("spark://spark-master:7077").\
        config("spark.executor.memory", "512m").\
        getOrCreate()

sc = SparkContext.getOrCreate()

22/12/12 19:38:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


## **🌀 Load and prepare data**

In [3]:
autoData = sc.textFile("auto-mpg.csv")
autoData.cache()
autoData.take(5)

                                                                                

['mpg,cylinders,displacement,horsepower,weight,acceleration,model year,origin,car name',
 '18,8,307,130,3504,12,70,1,chevrolet chevelle malibu',
 '15,8,350,165,3693,11.5,70,1,buick skylark 320',
 '18,8,318,150,3436,11,70,1,plymouth satellite',
 '16,8,304,150,3433,12,70,1,amc rebel sst']

Remove header line.


In [4]:
dataLines = autoData.filter(lambda x: "cylinders" not in x)
dataLines.count()

                                                                                

398

### **🛸 Transform data**

In [5]:
averageHorsePower = sc.broadcast(80.0)

def mapToRows(inputStr):
    global averageHorsePower
    attList = inputStr.split(",")
    
    # ❓️ Replace ? values with a normal value
    horsePowerValue = attList[3]
    if horsePowerValue == "?":
        horsePowerValue = averageHorsePower.value # 80
       
    # ✅ Create a row with cleaned up and converted data
    values = Row(     
        MPG=float(attList[0]),\
        CYLINDERS=float(attList[1]), \
        DISPLACEMENT=float(attList[2]), 
        HORSEPOWER=float(horsePowerValue),\
        WEIGHT=float(attList[4]), \
        ACCELERATION=float(attList[5]), \
        MODELYEAR=float(attList[6]),\
        NAME=attList[7],
    ) 
    return values

autoMap = dataLines.map(mapToRows)
autoMap.cache()
autoMap.take(3)

[Row(MPG=18.0, CYLINDERS=8.0, DISPLACEMENT=307.0, HORSEPOWER=130.0, WEIGHT=3504.0, ACCELERATION=12.0, MODELYEAR=70.0, NAME='1'),
 Row(MPG=15.0, CYLINDERS=8.0, DISPLACEMENT=350.0, HORSEPOWER=165.0, WEIGHT=3693.0, ACCELERATION=11.5, MODELYEAR=70.0, NAME='1'),
 Row(MPG=18.0, CYLINDERS=8.0, DISPLACEMENT=318.0, HORSEPOWER=150.0, WEIGHT=3436.0, ACCELERATION=11.0, MODELYEAR=70.0, NAME='1')]

### **㊗️ Transform RDD to DataFrame**

In [6]:
autoDf = spark.createDataFrame(autoMap)
autoDf.show(2)

+----+---------+------------+----------+------+------------+---------+----+
| MPG|CYLINDERS|DISPLACEMENT|HORSEPOWER|WEIGHT|ACCELERATION|MODELYEAR|NAME|
+----+---------+------------+----------+------+------------+---------+----+
|18.0|      8.0|       307.0|     130.0|3504.0|        12.0|     70.0|   1|
|15.0|      8.0|       350.0|     165.0|3693.0|        11.5|     70.0|   1|
+----+---------+------------+----------+------+------------+---------+----+
only showing top 2 rows



                                                                                

### **🕒️ Data Analytics**

Basic statistics and correlations.

In [7]:
autoDf.select("MPG","CYLINDERS").describe().show()



+-------+-----------------+------------------+
|summary|              MPG|         CYLINDERS|
+-------+-----------------+------------------+
|  count|              398|               398|
|   mean|23.51457286432161| 5.454773869346734|
| stddev|7.815984312565783|1.7010042445332125|
|    min|              9.0|               3.0|
|    max|             46.6|               8.0|
+-------+-----------------+------------------+



                                                                                

In [8]:
for column in autoDf.columns:
  if not isinstance(autoDf.select(column).take(1)[0][0], str):
    print( "Correlation to MPG for ", column, autoDf.stat.corr('MPG', column))

Correlation to MPG for  MPG 1.0
Correlation to MPG for  CYLINDERS -0.7753962854205549
Correlation to MPG for  DISPLACEMENT -0.804202824805898
Correlation to MPG for  HORSEPOWER -0.7746308409203809
Correlation to MPG for  WEIGHT -0.8317409332443345
Correlation to MPG for  ACCELERATION 0.42028891210165126
Correlation to MPG for  MODELYEAR 0.5792671330833099


### **🔥 Label encoder**

Transform to a Data Frame for input to Machine Learing. Drop columns that are not required (low correlation).

In [9]:
def transformToLabeledPoint(row) :
    labeledPoint = (
        row["MPG"], 
        Vectors.dense([
            row["ACCELERATION"],
            row["DISPLACEMENT"],
            row["WEIGHT"],
        ])
    )
    return labeledPoint


autoLp = autoMap.map(transformToLabeledPoint)
autoDF = spark.createDataFrame(autoLp, ["label", "features"])
autoDF.select("label", "features").show(2)

+-----+-------------------+
|label|           features|
+-----+-------------------+
| 18.0|[12.0,307.0,3504.0]|
| 15.0|[11.5,350.0,3693.0]|
+-----+-------------------+
only showing top 2 rows



## **🚀 Linear Regression**

In [14]:
(trainingData, testData) = autoDF.randomSplit([0.9, 0.1])

print(trainingData.count())
print(testData.count())

360
38


In [15]:
lr = LinearRegression(maxIter=10)
lrModel = lr.fit(trainingData)

print("Coefficients: " + str(lrModel.coefficients))
print("Intercept: " + str(lrModel.intercept))

22/12/12 12:20:22 WARN Instrumentation: [8fb2e7ed] regParam is zero, which might cause numerical instability and overfitting.


Coefficients: [0.22742460655840563,-0.0064283955436110285,-0.006596397457056918]
Intercept: 40.79518707982478


In [16]:
predictions = lrModel.transform(testData)

predictions.select("prediction","label","features").show()

+------------------+-----+-------------------+
|        prediction|label|           features|
+------------------+-----+-------------------+
| 8.445575261284624| 11.0|[14.0,400.0,4997.0]|
| 7.713173969735415| 12.0|[11.0,455.0,4951.0]|
| 7.987428159691454| 12.0|[11.5,429.0,4952.0]|
|13.081341186800518| 13.0|[12.0,350.0,4274.0]|
|15.195335466077204| 13.0|[13.0,350.0,3988.0]|
|16.622531024945857| 14.0| [8.0,340.0,3609.0]|
| 19.78803062058817| 14.0|[10.0,455.0,3086.0]|
|11.898513924758753| 14.0|[16.0,302.0,4638.0]|
| 16.67917168363256| 15.0|[12.5,318.0,3777.0]|
|18.924617643191493| 16.0|[12.0,304.0,3433.0]|
|19.272188383593708| 16.0|[18.0,258.0,3632.0]|
|18.702501232161573| 18.0|[19.0,225.0,3785.0]|
|28.045825505367866| 19.0| [13.5,70.0,2330.0]|
|21.988985390590152| 19.0|[17.0,232.0,3211.0]|
| 20.93418342675683| 20.0|[13.5,262.0,3221.0]|
|28.445866455015086| 20.0|[19.5,140.0,2408.0]|
|28.160596818990072| 22.0|[16.5,108.0,2379.0]|
| 22.95658623411437| 22.5|[17.6,232.0,3085.0]|
| 28.41792900

### **✅ Evaluate model**

In [17]:
evaluator = RegressionEvaluator(
    predictionCol="prediction",
    labelCol="label",
    metricName="r2",
)
evaluator.evaluate(predictions)

0.7083646989273807

In [10]:
spark.stop()