# import

In [3]:
import pyspark
from pyspark.sql import SparkSession

# session

In [4]:
# creating sessoin
spark = SparkSession.builder.appName('practice').getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/04/22 23:17:40 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/04/22 23:17:42 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
23/04/22 23:17:42 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
23/04/22 23:17:42 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.


# data

In [8]:
#  read data                                                       #infer schema prevent all values automatically taking 'string' values
df = spark.read.option('header','true').csv('data.csv', inferSchema= True)

df.show()

+-------+----+------+---------+
|Country| Age|Salary|Purchased|
+-------+----+------+---------+
| France|  44| 72000|       No|
|  Spain|  27| 48000|      Yes|
|Germany|  30| 54000|       No|
|  Spain|  38| 61000|       No|
|Germany|  40|  null|      Yes|
| France|  35| 58000|      Yes|
|  Spain|null| 52000|       No|
| France|  48| 79000|      Yes|
|Germany|  50| 83000|       No|
| France|  37| 67000|      Yes|
+-------+----+------+---------+



In [11]:
df.printSchema()

root
 |-- Country: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Salary: integer (nullable = true)
 |-- Purchased: string (nullable = true)



In [9]:
# drop where null
df = df.na.drop()

In [85]:
# 
from pyspark.sql.functions import lit
# 

df = df.withColumn("height", lit(40))
# 
df = df.withColumn("height", df["height"] + df["Age"])

In [87]:
df.show()

+-------+---+------+---------+------+
|Country|Age|Salary|Purchased|height|
+-------+---+------+---------+------+
| France| 44| 72000|       No|    84|
|  Spain| 27| 48000|      Yes|    67|
|Germany| 30| 54000|       No|    70|
|  Spain| 38| 61000|       No|    78|
| France| 35| 58000|      Yes|    75|
| France| 48| 79000|      Yes|    88|
|Germany| 50| 83000|       No|    90|
| France| 37| 67000|      Yes|    77|
+-------+---+------+---------+------+



# PySpark ML methods

In [None]:
# just using numericals for now
# [Age,Salary]

In [88]:
# create vector assembler
from pyspark.ml.feature import VectorAssembler 

# create instance with parameters
featureassembler = VectorAssembler(inputCols=['Age','height'], outputCol='independentVar')


In [89]:
# transform the df
output = featureassembler.transform(df)

In [90]:
output.show()

+-------+---+------+---------+------+--------------+
|Country|Age|Salary|Purchased|height|independentVar|
+-------+---+------+---------+------+--------------+
| France| 44| 72000|       No|    84|   [44.0,84.0]|
|  Spain| 27| 48000|      Yes|    67|   [27.0,67.0]|
|Germany| 30| 54000|       No|    70|   [30.0,70.0]|
|  Spain| 38| 61000|       No|    78|   [38.0,78.0]|
| France| 35| 58000|      Yes|    75|   [35.0,75.0]|
| France| 48| 79000|      Yes|    88|   [48.0,88.0]|
|Germany| 50| 83000|       No|    90|   [50.0,90.0]|
| France| 37| 67000|      Yes|    77|   [37.0,77.0]|
+-------+---+------+---------+------+--------------+



In [91]:
# select the DV and new column for model
final_data = output.select('independentVar','Salary')

In [92]:
final_data.show()

+--------------+------+
|independentVar|Salary|
+--------------+------+
|   [44.0,84.0]| 72000|
|   [27.0,67.0]| 48000|
|   [30.0,70.0]| 54000|
|   [38.0,78.0]| 61000|
|   [35.0,75.0]| 58000|
|   [48.0,88.0]| 79000|
|   [50.0,90.0]| 83000|
|   [37.0,77.0]| 67000|
+--------------+------+



# Linear Regression

In [93]:
# import regressor model
from pyspark.ml.regression import LinearRegression
# split data for future data
train_data, test_data = final_data.randomSplit([0.75,0.25])

In [94]:
# create model, determine indVars and dVars
regressor = LinearRegression(featuresCol= "independentVar", labelCol = 'Salary')

In [95]:
# fit model to training data
regressor = regressor.fit(train_data)

23/04/22 23:43:32 WARN Instrumentation: [fee0e39e] regParam is zero, which might cause numerical instability and overfitting.


# Coefficients

In [96]:
# coeff's
regressor.coefficients

DenseVector([916.0622, 200.3762])

# intercepts

In [97]:
# the intercept
regressor.intercept

10950.706312836628

# predictions

In [98]:
test_data.show()

+--------------+------+
|independentVar|Salary|
+--------------+------+
|   [37.0,77.0]| 67000|
|   [44.0,84.0]| 72000|
|   [48.0,88.0]| 79000|
|   [50.0,90.0]| 83000|
+--------------+------+



In [99]:
# creating predictins
preds = regressor.evaluate(test_data)

In [100]:
# show predicts usings the .predictions to see how accurate the model was/
preds.predictions.show()

+--------------+------+------------------+
|independentVar|Salary|        prediction|
+--------------+------+------------------+
|   [37.0,77.0]| 67000|60273.972602739705|
|   [44.0,84.0]| 72000| 68089.04109589034|
|   [48.0,88.0]| 79000| 72554.79452054785|
|   [50.0,90.0]| 83000|  74787.6712328766|
+--------------+------+------------------+



# other evaluation metrics

In [103]:
preds.meanAbsoluteError , preds.meanSquaredError

(6323.630136986378, 42379515.38750334)