
# Installing the PySpark package

In [1]:
!pip install PySpark

Collecting PySpark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
  Preparing metadata (setup.py) ... done
Collecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
Building wheels for collected packages: PySpark
  Building wheel for PySpark (setup.py) ... done
  Created wheel for PySpark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=95e186f5674c718607bfb1d0f49efded3a15400

In [2]:
import pyspark

# Creating the first PySpark session

In [3]:
# The entry point to all PySpark functionality is the SparkSession class.
# SparkSession.builder configures a session; getOrCreate() returns the existing one or creates a new one.

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('CostofLiving').getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/05/29 18:25:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
#Checking the input file directory
import os
print(os.listdir("../input"))

['cost-of-living-index-2022']


# Read the input file and check the columns

In [5]:
# Without header=True, Spark names the columns _c0, _c1, _c2, ... and treats the header row as data.
# header=True makes Spark use the first line of the file as the column names.

# Likewise, without inferSchema every column is read with the default datatype (string).
# With inferSchema=True, PySpark scans the CSV file and infers the datatype of each column.
df_train = spark.read.csv("../input/cost-of-living-index-2022/Cost_of_Living_Index_2022.csv", header=True, inferSchema=True)
df_train.show()

                                                                                

+----+--------------------+--------------------+----------+------------------------------+---------------+----------------------+----------------------------+
|Rank|             Country|Cost of Living Index|Rent Index|Cost of Living Plus Rent Index|Groceries Index|Restaurant Price Index|Local Purchasing Power Index|
+----+--------------------+--------------------+----------+------------------------------+---------------+----------------------+----------------------------+
|   1|         Afghanistan|               20.37|      2.72|                         12.09|          14.92|                 12.41|                       23.04|
|   2|             Albania|                35.5|      8.47|                         22.83|          29.32|                 25.82|                       30.19|
|   3|             Algeria|               26.87|      4.59|                         16.43|          28.82|                 14.48|                       24.63|
|   4|           Argentina|               34.6

In [6]:
df_train.printSchema()

root
 |-- Rank: integer (nullable = true)
 |-- Country: string (nullable = true)
 |-- Cost of Living Index: double (nullable = true)
 |-- Rent Index: double (nullable = true)
 |-- Cost of Living Plus Rent Index: double (nullable = true)
 |-- Groceries Index: double (nullable = true)
 |-- Restaurant Price Index: double (nullable = true)
 |-- Local Purchasing Power Index: double (nullable = true)
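
To make the effect of `header` and `inferSchema` concrete, here is a small pure-Python sketch that mimics the idea on two rows copied from the table above. It is a simplified illustration and not Spark's actual inference code: the helper names `infer_type` and `infer_column_types` are hypothetical, and only three types (integer, double, string) are modelled.

```python
import csv
import io

# Two data rows copied from the df_train.show() output (first three columns only).
raw = "Rank,Country,Cost of Living Index\n1,Afghanistan,20.37\n2,Albania,35.5\n"

def infer_type(value):
    """Return the narrowest of 'integer', 'double', 'string' that can hold value."""
    try:
        int(value)
        return "integer"
    except ValueError:
        pass
    try:
        float(value)
        return "double"
    except ValueError:
        return "string"

def infer_column_types(text, header):
    rows = list(csv.reader(io.StringIO(text)))
    if header:
        names, data = rows[0], rows[1:]
    else:
        # No header: generate positional names and keep the first line as data.
        names, data = [f"_c{i}" for i in range(len(rows[0]))], rows
    widest = {"integer": 0, "double": 1, "string": 2}
    schema = {}
    for i, name in enumerate(names):
        types = [infer_type(row[i]) for row in data]
        # A column takes the widest type needed by any of its values.
        schema[name] = max(types, key=widest.get)
    return schema

with_header = infer_column_types(raw, header=True)
without_header = infer_column_types(raw, header=False)
print(with_header)
print(without_header)
```

When the header line is treated as data, every column falls back to string, which is why both options are passed to `spark.read.csv` above.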



In [7]:
df_train.columns

['Rank',
 'Country',
 'Cost of Living Index',
 'Rent Index',
 'Cost of Living Plus Rent Index',
 'Groceries Index',
 'Restaurant Price Index',
 'Local Purchasing Power Index']

In PySpark, all the independent features are combined into a single input vector column (X), and the target column (Y) is kept separate. In this case, every column except Rank and Country (which play no significant role in determining the Cost of Living Index) and the target itself is used as an input column. The Cost of Living Index column is the target (Y), which the model learns to predict during training.

**[Rent Index, Cost of Living Plus Rent Index, Groceries Index, Restaurant Price Index, Local Purchasing Power Index] --> new vector column --> independent features**
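
As a rough picture of that split into X and Y, the plain-Python sketch below applies it to the first row of the table (Afghanistan). The helper `split_features_and_label` is a hypothetical name used only for illustration; it is not part of PySpark.

```python
# Column names as they appear in df_train.columns.
feature_cols = [
    "Rent Index",
    "Cost of Living Plus Rent Index",
    "Groceries Index",
    "Restaurant Price Index",
    "Local Purchasing Power Index",
]
label_col = "Cost of Living Index"

# First row of the df_train.show() output (Afghanistan).
row = {
    "Rank": 1,
    "Country": "Afghanistan",
    "Cost of Living Index": 20.37,
    "Rent Index": 2.72,
    "Cost of Living Plus Rent Index": 12.09,
    "Groceries Index": 14.92,
    "Restaurant Price Index": 12.41,
    "Local Purchasing Power Index": 23.04,
}

def split_features_and_label(record):
    """Return (x, y): the feature values in a fixed order, and the target value."""
    x = [record[name] for name in feature_cols]
    y = record[label_col]
    return x, y

x, y = split_features_and_label(row)
print(x, y)
```

The order of `feature_cols` matters: each position in the vector always refers to the same column, which is also how the fitted coefficients are later matched to features.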

# Invoking VectorAssembler for grouping the required features

In [8]:
#VectorAssembler is a transformer that combines a given list of columns into a single vector column. 
#It is useful for combining raw features and features generated by different feature transformers 
#into a single feature vector, in order to train ML models like logistic regression and decision trees. 


#VectorAssembler accepts the following input column types: all numeric types, boolean type, and vector type. 
#In each row, the values of the input columns will be concatenated into a vector in the specified order.


from pyspark.ml.feature import VectorAssembler
featureassembler = VectorAssembler(
    inputCols=['Rent Index', 'Cost of Living Plus Rent Index', 'Groceries Index',
               'Restaurant Price Index', 'Local Purchasing Power Index'],
    outputCol='Independant Features',
)

In [9]:
# transform() returns a new DataFrame with the 'Independant Features' vector column appended to every row
output=featureassembler.transform(df_train)

In [10]:
output.show()

+----+--------------------+--------------------+----------+------------------------------+---------------+----------------------+----------------------------+--------------------+
|Rank|             Country|Cost of Living Index|Rent Index|Cost of Living Plus Rent Index|Groceries Index|Restaurant Price Index|Local Purchasing Power Index|Independant Features|
+----+--------------------+--------------------+----------+------------------------------+---------------+----------------------+----------------------------+--------------------+
|   1|         Afghanistan|               20.37|      2.72|                         12.09|          14.92|                 12.41|                       23.04|[2.72,12.09,14.92...|
|   2|             Albania|                35.5|      8.47|                         22.83|          29.32|                 25.82|                       30.19|[8.47,22.83,29.32...|
|   3|             Algeria|               26.87|      4.59|                         16.43|          

In [11]:
output.columns

['Rank',
 'Country',
 'Cost of Living Index',
 'Rent Index',
 'Cost of Living Plus Rent Index',
 'Groceries Index',
 'Restaurant Price Index',
 'Local Purchasing Power Index',
 'Independant Features']

# Create the final output with the desired target variable

In [12]:
# select() is a transformation that returns a new DataFrame containing only the named columns
fin_output = output.select("Independant Features", "Cost of Living Index")

In [13]:
fin_output.show()

+--------------------+--------------------+
|Independant Features|Cost of Living Index|
+--------------------+--------------------+
|[2.72,12.09,14.92...|               20.37|
|[8.47,22.83,29.32...|                35.5|
|[4.59,16.43,28.82...|               26.87|
|[7.71,22.04,28.17...|               34.69|
|[11.61,23.45,27.5...|               33.89|
|[36.84,58.57,77.4...|               77.75|
|[27.13,50.46,65.8...|               71.04|
|[7.86,19.48,26.57...|               29.73|
|[35.34,61.19,70.5...|                84.0|
|[29.22,42.79,44.5...|               54.77|
|[4.42,19.67,30.41...|               33.13|
|[21.99,59.38,87.8...|               92.37|
|[9.81,21.01,27.24...|               30.89|
|[25.79,50.67,63.3...|               72.61|
|[11.64,32.71,48.7...|                51.3|
|[98.58,123.8,148....|              146.04|
|[10.18,23.24,31.2...|               34.77|
|[6.82,22.39,31.14...|               36.12|
|[10.21,26.12,35.1...|               40.17|
|[8.27,21.54,28.16...|          

# Baseline Model Training using Linear Regression

In [14]:
from pyspark.ml.regression import LinearRegression

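# Train/test split: randomSplit divides the rows roughly 80/20 (pass a seed argument for a reproducible split).
# featuresCol is the input vector column and labelCol is the target column.
train_X, test_X = fin_output.randomSplit([0.8, 0.2])
reg = LinearRegression(featuresCol='Independant Features', labelCol='Cost of Living Index')
# fit() returns a trained LinearRegressionModel, which replaces the estimator stored in reg.
reg = reg.fit(train_X)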

22/05/29 18:25:34 WARN Instrumentation: [eb143e76] regParam is zero, which might cause numerical instability and overfitting.
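
`randomSplit` does not cut the DataFrame at an exact 80% mark. Each row is assigned to a split at random, so the sizes are only approximately 80/20 and vary between runs unless a seed is supplied. The sketch below imitates that behaviour in plain Python; `random_split` is a hypothetical helper and not Spark's implementation.

```python
import random

def random_split(rows, weights, seed=None):
    """Assign each row to one split at random, in proportion to weights."""
    rng = random.Random(seed)
    total = sum(weights)
    # Cumulative upper bounds in [0, 1], e.g. [0.8, 1.0] for weights [0.8, 0.2].
    bounds = []
    running = 0.0
    for w in weights:
        running += w / total
        bounds.append(running)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for i, upper in enumerate(bounds):
            # The last split also catches any draw left over by rounding.
            if draw < upper or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

rows = list(range(100))  # stand-in for 100 DataFrame rows
train, test = random_split(rows, [0.8, 0.2], seed=42)
print(len(train), len(test))
```

Fixing the seed makes the split repeatable, which is useful when the coefficients and error metrics need to be compared across runs.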


In [15]:
# Getting the fitted coefficients and the intercept.
reg.coefficients

DenseVector([-0.8827, 1.883, -0.0002, -0.0002, 0.0])

In [16]:
reg.intercept

-0.002324515882577834
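
A linear regression prediction is the intercept plus the weighted sum of the features. As a sanity check, the sketch below applies the printed (rounded) coefficients to Algeria's row from the table. The function name `predict` is hypothetical, and because the coefficients are rounded to four decimals the result only approximates the 26.877 that Spark reports for this row in the evaluation output.

```python
# Rounded coefficients and intercept as printed above.
coefficients = [-0.8827, 1.883, -0.0002, -0.0002, 0.0]
intercept = -0.002324515882577834

# Algeria's feature values from the df_train.show() output, in assembler order:
# Rent, Cost of Living Plus Rent, Groceries, Restaurant Price, Local Purchasing Power.
features = [4.59, 16.43, 28.82, 14.48, 24.63]
actual = 26.87  # Algeria's Cost of Living Index

def predict(x, weights, bias):
    """Linear regression prediction: bias + sum of weight * feature."""
    return bias + sum(w * v for w, v in zip(weights, x))

prediction = predict(features, coefficients, intercept)
print(round(prediction, 3), actual)
```

Nearly all of the weight sits on the first two features (Rent Index and Cost of Living Plus Rent Index), which suggests that the combined index already carries most of the information about the target.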

# Model Evaluation

In [17]:
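# evaluate() scores the model on the test set and returns a summary holding the predictions and error metrics
pred = reg.evaluate(test_X)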

In [18]:
pred.predictions.show()



+--------------------+--------------------+------------------+
|Independant Features|Cost of Living Index|        prediction|
+--------------------+--------------------+------------------+
|[4.59,16.43,28.82...|               26.87|26.877443908207724|
|[5.17,17.7,29.34,...|               28.75| 28.75467504356115|
|[5.32,17.3,27.36,...|               27.87| 27.87141221342993|
|[6.06,19.51,26.95...|               31.38| 31.37911933227736|
|[8.68,25.13,38.13...|               39.64|39.645330407688334|
|[9.29,18.86,26.83...|               27.31| 27.30302274397029|
|[9.59,24.18,37.84...|               37.06| 37.05466557632836|
|[11.22,23.72,36.1...|               34.74|  34.7496685300143|
|[11.64,32.71,48.7...|                51.3| 51.30335838875973|
|[11.67,32.25,42.5...|               50.41| 50.41355821086917|
|[12.87,26.72,31.7...|               38.95| 38.94150365377775|
|[13.09,24.87,29.8...|               35.26|35.263888121669794|
|[13.19,29.75,44.3...|               44.37| 44.36155284

In [19]:
pred.meanAbsoluteError, pred.meanSquaredError

(0.005779935360378658, 4.568607018254735e-05)
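
The two metrics can be reproduced by hand. The sketch below computes the mean absolute error and mean squared error from the complete rows visible in the `pred.predictions.show()` output. Since that table is truncated, the results cover only a subset of the test set and will not match the reported values exactly.

```python
# (label, prediction) pairs copied from the complete rows of pred.predictions.show().
pairs = [
    (26.87, 26.877443908207724),
    (28.75, 28.75467504356115),
    (27.87, 27.87141221342993),
    (31.38, 31.37911933227736),
    (39.64, 39.645330407688334),
    (27.31, 27.30302274397029),
    (37.06, 37.05466557632836),
    (34.74, 34.7496685300143),
    (51.3, 51.30335838875973),
    (50.41, 50.41355821086917),
    (38.95, 38.94150365377775),
    (35.26, 35.263888121669794),
]

errors = [prediction - label for label, prediction in pairs]
mae = sum(abs(e) for e in errors) / len(errors)  # mean absolute error
mse = sum(e * e for e in errors) / len(errors)   # mean squared error
print(mae, mse)
```

Both values land in the same order of magnitude as the figures Spark reports for the full test set.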