# Spark
This is a simple example of using Spark to load survey data into DataFrames, clean and query the data, and build several classification and regression models that predict a developer's salary from a few features. The Stack Overflow surveys ([here](https://insights.stackoverflow.com/survey)) from 2019 and 2020 are combined. The classification and regression results presented here are not good; the goal is only to show how to use Spark with relatively large datasets.


In [1]:
# This block downloads Spark so it can be used in Colab

# install Java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# install spark (change the version number if needed)
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz

# unzip the spark file to the current folder
!tar xf spark-3.0.0-bin-hadoop3.2.tgz

# point JAVA_HOME and SPARK_HOME at the installed folders
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"


# install findspark using pip
!pip install -q findspark

In [2]:
import findspark
findspark.init()

In [3]:
# initialize and name the Spark application
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("Stack_Overflow") \
    .getOrCreate()

In [4]:
# reading survey_results_public_2020 and printing its shape
data_20 = spark.read.csv('survey_results_public_2020.csv',inferSchema=True, header=True)
from pyspark.sql.functions import lit
data_20 = data_20.withColumn('Year', lit(2020))
data_20.count(), len(data_20.columns)


(64461, 62)

In [8]:
# reading survey_results_public_2019 and printing its shape
data_19 = spark.read.csv('survey_results_public_2019.csv',inferSchema=True, header=True)
from pyspark.sql.functions import lit
data_19 = data_19.withColumn('Year', lit(2019))
data_19.count(), len(data_19.columns)

(88883, 86)

In [9]:
# These are the columns used for the tasks here; you can add or remove columns
columns = ["Year","Age","Age1stCode","YearsCode","ConvertedComp", "WorkWeekHrs", "CompFreq", "Country", "LanguageWorkedWith", "DevType"]

In [10]:
data_20 = data_20.select(columns)
data_20.show(2)

+----+---+----------+---------+-------------+-----------+--------+--------------+--------------------+--------------------+
|Year|Age|Age1stCode|YearsCode|ConvertedComp|WorkWeekHrs|CompFreq|       Country|  LanguageWorkedWith|             DevType|
+----+---+----------+---------+-------------+-----------+--------+--------------+--------------------+--------------------+
|2020| NA|        13|       36|           NA|         50| Monthly|       Germany|C#;HTML/CSS;JavaS...|Developer, deskto...|
|2020| NA|        19|        7|           NA|         NA|      NA|United Kingdom|    JavaScript;Swift|Developer, full-s...|
+----+---+----------+---------+-------------+-----------+--------+--------------+--------------------+--------------------+
only showing top 2 rows



In [11]:
data_19 = data_19.select(columns)
data_19.show(2)

+----+---+----------+---------+-------------+-----------+--------+--------------------+--------------------+--------------------+
|Year|Age|Age1stCode|YearsCode|ConvertedComp|WorkWeekHrs|CompFreq|             Country|  LanguageWorkedWith|             DevType|
+----+---+----------+---------+-------------+-----------+--------+--------------------+--------------------+--------------------+
|2019| 14|        10|        4|           NA|         NA|      NA|      United Kingdom|HTML/CSS;Java;Jav...|                  NA|
|2019| 19|        17|       NA|           NA|         NA|      NA|Bosnia and Herzeg...| C++;HTML/CSS;Python|Developer, deskto...|
+----+---+----------+---------+-------------+-----------+--------+--------------------+--------------------+--------------------+
only showing top 2 rows



In [12]:
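# Combine the 2019 and 2020 surveys
# NOTE: data_19 is appended twice, which duplicates its rows (64461 + 2 * 88883 = 242227);
# one unionByName(data_19) would give 153344 rows, but the outputs below reflect this version.
merged_df = data_20.unionByName(data_19).unionByName(data_19)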
merged_df.show(2)

+----+---+----------+---------+-------------+-----------+--------+--------------+--------------------+--------------------+
|Year|Age|Age1stCode|YearsCode|ConvertedComp|WorkWeekHrs|CompFreq|       Country|  LanguageWorkedWith|             DevType|
+----+---+----------+---------+-------------+-----------+--------+--------------+--------------------+--------------------+
|2020| NA|        13|       36|           NA|         50| Monthly|       Germany|C#;HTML/CSS;JavaS...|Developer, deskto...|
|2020| NA|        19|        7|           NA|         NA|      NA|United Kingdom|    JavaScript;Swift|Developer, full-s...|
+----+---+----------+---------+-------------+-----------+--------+--------------+--------------------+--------------------+
only showing top 2 rows



In [13]:
merged_df.count(), len(merged_df.columns)

(242227, 10)

In [14]:
merged_df.printSchema()

root
 |-- Year: integer (nullable = false)
 |-- Age: string (nullable = true)
 |-- Age1stCode: string (nullable = true)
 |-- YearsCode: string (nullable = true)
 |-- ConvertedComp: string (nullable = true)
 |-- WorkWeekHrs: string (nullable = true)
 |-- CompFreq: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- LanguageWorkedWith: string (nullable = true)
 |-- DevType: string (nullable = true)



In [15]:
merged_df.select("Year","Age","Age1stCode",'YearsCode','ConvertedComp') \
.show(15, truncate=False)

+----+---+----------+---------+-------------+
|Year|Age|Age1stCode|YearsCode|ConvertedComp|
+----+---+----------+---------+-------------+
|2020|NA |13        |36       |NA           |
|2020|NA |19        |7        |NA           |
|2020|NA |15        |4        |NA           |
|2020|25 |18        |7        |NA           |
|2020|31 |16        |15       |NA           |
|2020|NA |14        |6        |NA           |
|2020|NA |18        |6        |NA           |
|2020|36 |12        |17       |116000       |
|2020|30 |20        |6        |NA           |
|2020|22 |14        |8        |32315        |
|2020|23 |13        |10       |40070        |
|2020|49 |42        |7        |14268        |
|2020|53 |14        |35       |38916        |
|2020|27 |13        |5        |66000        |
|2020|NA |13        |4        |NA           |
+----+---+----------+---------+-------------+
only showing top 15 rows



In [17]:
# Show some statistics from these columns
merged_df.describe(["Year","Age","Age1stCode","YearsCode","ConvertedComp"]).show()

+-------+------------------+------------------+--------------------+-----------------+------------------+
|summary|              Year|               Age|          Age1stCode|        YearsCode|     ConvertedComp|
+-------+------------------+------------------+--------------------+-----------------+------------------+
|  count|            242227|            242227|              242227|           242227|            242227|
|   mean|2019.2661181453761|30.447582235389916|   15.46143950545869|12.03466652733875|121566.30907364654|
| stddev|0.4419276913099093| 9.272932398750234|   5.023513148217929|9.125430403926376| 271832.0152550694|
|    min|              2019|                 1|                  10|                1|                 0|
|    max|              2020|                NA|Younger than 5 years|               NA|                NA|
+-------+------------------+------------------+--------------------+-----------------+------------------+



The columns clearly contain NA values and text entries such as "Younger than 5 years", which is why Spark inferred them as strings (see the schema above).
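To see why a single non-numeric token is enough to turn a whole column into strings, here is a minimal pure-Python sketch of the idea. The helper `infer_column_type` is hypothetical and much simpler than Spark's real schema inference; it only illustrates the fallback to string.

```python
def infer_column_type(values):
    """Return "integer" if every value parses as an int, otherwise "string".

    Hypothetical helper for illustration only; Spark's own inference
    handles more types and options than this.
    """
    for value in values:
        try:
            int(value)
        except ValueError:
            return "string"
    return "integer"


# Sample values of the kind seen in the Age1stCode column.
clean_sample = ["15", "16", "14"]
raw_sample = ["15", "NA", "Younger than 5 years"]

print(infer_column_type(clean_sample))  # integer
print(infer_column_type(raw_sample))    # string
```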


In [16]:
merged_df.groupBy('Age').count().orderBy("count", ascending=False).show()

+---+-----+
|Age|count|
+---+-----+
| NA|38361|
| 25|12031|
| 28|11186|
| 26|11185|
| 27|11174|
| 24|11131|
| 30|10506|
| 23|10431|
| 29|10201|
| 22| 8600|
| 32| 7891|
| 31| 7842|
| 21| 7100|
| 33| 6953|
| 35| 6261|
| 34| 6160|
| 20| 5394|
| 36| 5287|
| 37| 4671|
| 38| 4588|
+---+-----+
only showing top 20 rows



In [18]:
merged_df.groupBy('Age1stCode').count().orderBy("count", ascending=False).show(50)

+--------------------+-----+
|          Age1stCode|count|
+--------------------+-----+
|                  15|22980|
|                  16|22729|
|                  14|22418|
|                  18|21272|
|                  12|20551|
|                  13|16941|
|                  17|15803|
|                  10|13546|
|                  19|12242|
|                  20| 9889|
|                  11| 9388|
|                  NA| 9059|
|                   8| 6566|
|                  21| 5101|
|                   9| 5083|
|                  22| 4419|
|                   7| 3768|
|                  23| 2818|
|                  25| 2464|
|                  24| 2261|
|                   6| 2222|
|Younger than 5 years| 1454|
|                   5| 1410|
|                  26| 1324|
|                  27| 1098|
|                  30| 1003|
|                  28|  938|
|                  29|  654|
|                  32|  385|
|                  35|  354|
|                  31|  349|
|             

In [19]:
merged_df.groupBy('YearsCode').count().orderBy("count", ascending=False).show(50)

+------------------+-----+
|         YearsCode|count|
+------------------+-----+
|                 5|18343|
|                10|18064|
|                 6|16192|
|                 4|14712|
|                 8|14129|
|                 7|14117|
|                 3|13309|
|                15|10470|
|                 2| 9864|
|                20| 9834|
|                12| 9513|
|                 9| 9064|
|                NA| 8667|
|                11| 6221|
|                14| 5731|
|                13| 5540|
|                18| 4889|
|                25| 4652|
|                 1| 4530|
|                16| 4389|
|                30| 4178|
|                17| 3755|
|  Less than 1 year| 3491|
|                22| 2824|
|                19| 2693|
|                35| 2345|
|                23| 2062|
|                21| 1980|
|                24| 1835|
|                40| 1529|
|                28| 1244|
|                26| 1196|
|                27| 1151|
|                32| 1149|
|

In [20]:
merged_df.groupBy('ConvertedComp').count().orderBy("count", ascending=True).show()

+-------------+-----+
|ConvertedComp|count|
+-------------+-----+
|        16250|    1|
|       134555|    1|
|        55388|    1|
|       102750|    1|
|       102684|    1|
|       729000|    1|
|        18429|    1|
|        32392|    1|
|        40512|    1|
|        47268|    1|
|       201645|    1|
|        45756|    1|
|        69348|    1|
|       615000|    1|
|        21515|    1|
|        41256|    1|
|        25350|    1|
|        74363|    1|
|        77965|    1|
|       273600|    1|
+-------------+-----+
only showing top 20 rows



In [22]:
merged_df.groupBy('CompFreq').count().orderBy("count", ascending=True).show()

+--------+-----+
|CompFreq|count|
+--------+-----+
|  Weekly| 6764|
|      NA|75622|
| Monthly|77013|
|  Yearly|82828|
+--------+-----+



# Cleaning the Data

In [23]:
# Remove the NA and the text values from the columns
condition1 = (merged_df.Age != "NA") & (merged_df.YearsCode != "NA") & (merged_df.ConvertedComp != "NA") &  (merged_df.Age1stCode != "NA") \
 &  (merged_df.WorkWeekHrs != "NA")  &  (merged_df.CompFreq != "NA")
condition2 = (merged_df.YearsCode != "Less than 1 year") &  (merged_df.YearsCode != "More than 50 years")
condition3 = (merged_df.Age1stCode != "Younger than 5 years") &  (merged_df.Age1stCode != "Older than 85")

clean_data = merged_df.filter(condition1).filter(condition2).filter(condition3)

clean_data.describe(["Year","Age","Age1stCode","YearsCode","ConvertedComp", "WorkWeekHrs"]).show()

+-------+------------------+-----------------+------------------+-----------------+------------------+-----------------+
|summary|              Year|              Age|        Age1stCode|        YearsCode|     ConvertedComp|      WorkWeekHrs|
+-------+------------------+-----------------+------------------+-----------------+------------------+-----------------+
|  count|            134831|           134831|            134831|           134831|            134831|           134831|
|   mean|2019.2203573362208|31.57038737382353|15.180448116531064|13.17738502273216|121530.25807863177|41.70010444366242|
| stddev|0.4144891491842972|8.194636903914772| 4.603424051350229|8.825225993019494| 270869.2951354562|32.62598806404635|
|    min|              2019|                1|                10|                1|                 0|                1|
|    max|              2020|               99|                 9|                9|             9e+05|               99|
+-------+------------------+-----------------+------------------+-----------------+------------------+-----------------+

As shown, the statistics are still not meaningful because the columns are strings, so min and max are compared lexicographically. We therefore need to cast these columns to integer or double.
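The odd minimum and maximum for `Age1stCode` (min `10`, max `9`) follow from how strings are ordered. The short sketch below uses a few made-up values in plain Python, not the survey data, to show the difference between string and numeric comparison.

```python
# Age1stCode-like values stored as strings, as in the uncast column.
as_strings = ["5", "10", "83", "9"]

# Strings compare character by character, so "10" < "5" < "83" < "9".
string_min, string_max = min(as_strings), max(as_strings)

# After converting to integers the comparison is numeric.
as_ints = [int(v) for v in as_strings]
int_min, int_max = min(as_ints), max(as_ints)

print(string_min, string_max)  # 10 9
print(int_min, int_max)        # 5 83
```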

In [24]:
from pyspark.sql.types import DoubleType
from pyspark.sql.types import IntegerType

clean_data = clean_data.withColumn("Age", clean_data["Age"].cast(IntegerType()))
clean_data = clean_data.withColumn("Age1stCode", clean_data["Age1stCode"].cast(IntegerType()))
clean_data = clean_data.withColumn("YearsCode", clean_data["YearsCode"].cast(IntegerType()))
clean_data = clean_data.withColumn("ConvertedComp", clean_data["ConvertedComp"].cast(DoubleType()))
clean_data = clean_data.withColumn("WorkWeekHrs", clean_data["WorkWeekHrs"].cast(IntegerType()))

clean_data.describe(["Year","Age","Age1stCode","YearsCode","ConvertedComp", "WorkWeekHrs"]).show()

+-------+------------------+-----------------+------------------+-----------------+------------------+-----------------+
|summary|              Year|              Age|        Age1stCode|        YearsCode|     ConvertedComp|      WorkWeekHrs|
+-------+------------------+-----------------+------------------+-----------------+------------------+-----------------+
|  count|            134831|           134831|            134831|           134831|            134831|           134831|
|   mean|2019.2203573362208|31.57018786480854|15.180448116531064|13.17738502273216|121530.25807863177|41.68452358878893|
| stddev|0.4144891491842972|8.194663780689153| 4.603424051350229|8.825225993019494| 270869.2951354562|32.62835559874103|
|    min|              2019|                1|                 5|                1|               0.0|                1|
|    max|              2020|              279|                83|               50|         2000000.0|             4850|
+-------+------------------+-----------------+------------------+-----------------+------------------+-----------------+

Now the statistics are correct, but there are some outliers, e.g. a salary of 0, 1 working hour per week, and Age = 1 or 279.

In [26]:
# switch between ascending=True and ascending=False to see the outliers at both ends
clean_data.groupBy('Age').count().orderBy("Age", ascending=True).show()

+---+-----+
|Age|count|
+---+-----+
|  1|   18|
|  2|    8|
|  3|    2|
|  4|    2|
| 10|    1|
| 12|    2|
| 13|    3|
| 14|    2|
| 15|   13|
| 16|   47|
| 17|   96|
| 18|  362|
| 19|  714|
| 20| 1296|
| 21| 2476|
| 22| 4424|
| 23| 6401|
| 24| 7665|
| 25| 8581|
| 26| 8352|
+---+-----+
only showing top 20 rows



In [30]:
clean_data.groupBy('ConvertedComp').count().orderBy("ConvertedComp", ascending=True).show()

+-------------+-----+
|ConvertedComp|count|
+-------------+-----+
|          0.0|  424|
|          1.0|   10|
|          2.0|    2|
|          4.0|    8|
|          5.0|    3|
|          6.0|    1|
|          7.0|    1|
|         10.0|    4|
|         11.0|    2|
|         12.0|   13|
|         14.0|    2|
|         18.0|    1|
|         19.0|    2|
|         20.0|    2|
|         24.0|   11|
|         25.0|    1|
|         26.0|    3|
|         30.0|    2|
|         32.0|    1|
|         34.0|    4|
+-------------+-----+
only showing top 20 rows



# Query the Data

In [31]:
# This cell installs countrygroups to get the list of European Union countries
!pip install countrygroups
from countrygroups import EUROPEAN_UNION



In [32]:
# This is a set of conditions to query the data; you can add/remove/modify them
condition1 = (clean_data.Age > 10) & (clean_data.Age < 80)

condition2 = (clean_data.ConvertedComp > 1000)  & (clean_data.ConvertedComp < 200000)

country = ["United States", "Canada", "United Kingdom"]
condition3 = (clean_data.Country.isin(EUROPEAN_UNION.names) ) | clean_data.Country.isin(country)

condition4 = (clean_data.WorkWeekHrs > 10) & (clean_data.WorkWeekHrs < 70)

condition5 = (clean_data.LanguageWorkedWith.contains("Python")) 
#  & (clean_data.MiscTechWorkedWith.contains("Spark"))

condition6 = (clean_data.DevType.contains("Data scientist")) 

condition7 = (clean_data.CompFreq == 'Yearly')

filt_data = clean_data.filter(condition1).filter(condition2).filter(condition3).filter(condition4)\
.filter(condition5).filter(condition6).filter(condition7)
filt_data.describe(["Year","Age","Age1stCode","YearsCode","ConvertedComp", "WorkWeekHrs"]).show()

+-------+-------------------+------------------+------------------+------------------+-----------------+-----------------+
|summary|               Year|               Age|        Age1stCode|         YearsCode|    ConvertedComp|      WorkWeekHrs|
+-------+-------------------+------------------+------------------+------------------+-----------------+-----------------+
|  count|               3466|              3466|              3466|              3466|             3466|             3466|
|   mean| 2019.2290825158684|31.851413733410272|14.863242931332948|13.427005193306405|87178.74581650317|41.74466243508367|
| stddev|0.42030308668323857| 7.791149714502791| 4.806056098664165| 9.090947121786213|40947.12326223351|6.585239216807744|
|    min|               2019|                18|                 5|                 1|           2106.0|               12|
|    max|               2020|                69|                42|                48|         198480.0|               67|
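+-------+-------------------+------------------+------------------+------------------+-----------------+-----------------+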

# Classification Problem

In [35]:
# Divide the salary column into salary ranges to make it a classification problem
# The 21 split points below define 20 ranges, i.e. classes (the last label, '> 200K', is never assigned)

lower = 0
higher = 200000
step = 10000

import numpy as np
splits = np.arange(lower, higher , step)
splits = splits.tolist()
splits.append(float("inf"))
# splits

labels = []
for i in splits:
  j = i + step
  L = ["[", str(i), ", ", str(j),"]"]
  L = ''.join(L)
  if(i == float("inf")):
    L = '> 200K'
  labels.append(L)

print(len(splits))
print(len(labels))
labels[:2]

21
21


['[0, 10000]', '[10000, 20000]']

In [36]:
# Divide the salary column using the ranges created above
from pyspark.ml.feature import Bucketizer
from pyspark.sql.functions import array, col, lit

bucketizer = Bucketizer(splits=splits, inputCol="ConvertedComp", outputCol="label")

filt_data_clf = bucketizer.transform(filt_data)


label_array = array(*(lit(label) for label in labels))

filt_data_clf = filt_data_clf.withColumn(
    "Salary_Ranges", label_array.getItem(col("label").cast("integer")))

filt_data_clf.select(["ConvertedComp","Salary_Ranges", "label"]).show(10, False)


+-------------+----------------+-----+
|ConvertedComp|Salary_Ranges   |label|
+-------------+----------------+-----+
|77556.0      |[70000, 80000]  |7.0  |
|74970.0      |[70000, 80000]  |7.0  |
|37816.0      |[30000, 40000]  |3.0  |
|48644.0      |[40000, 50000]  |4.0  |
|130000.0     |[130000, 140000]|13.0 |
|94539.0      |[90000, 100000] |9.0  |
|48644.0      |[40000, 50000]  |4.0  |
|54049.0      |[50000, 60000]  |5.0  |
|171000.0     |[170000, 180000]|17.0 |
|125000.0     |[120000, 130000]|12.0 |
+-------------+----------------+-----+
only showing top 10 rows
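The mapping from salary to bucket index can be reproduced without Spark. The following is a minimal sketch, assuming half-open buckets `[x, y)` as in `Bucketizer`; `bucket_index` is a hypothetical helper written for illustration and is not part of PySpark.

```python
from bisect import bisect_right

# Same split points as in the notebook: 0, 10000, ..., 190000, inf.
step = 10000
splits = [float(s) for s in range(0, 200000, step)] + [float("inf")]


def bucket_index(value, splits):
    """Index of the half-open bucket [splits[i], splits[i + 1]) holding value.

    Hypothetical helper; it ignores Bucketizer's handling of invalid values
    and of a value equal to the last split.
    """
    if value < splits[0] or value >= splits[-1]:
        raise ValueError("value outside the split range")
    return bisect_right(splits, value) - 1


print(len(splits) - 1)                 # 20 buckets from 21 split points
print(bucket_index(77556.0, splits))   # 7
print(bucket_index(130000.0, splits))  # 13
```

The indices agree with the `label` column in the table above. Note that a value exactly on a boundary, such as 130000.0, falls into the bucket that starts there.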



In [37]:
filt_data.count(), len(filt_data.columns)

(3466, 10)

In [38]:
# Define the inputs and output in the classification problem
# i.e. choosing the features columns and the target column
from pyspark.ml.feature import VectorAssembler
inputcols = ["Year", "Age",  "Age1stCode", "YearsCode", "WorkWeekHrs"]
assembler = VectorAssembler(inputCols= inputcols, outputCol = "features")

predictors = assembler.transform(filt_data_clf)

model_data = predictors.select("features", "label")
model_data.show(5,truncate=False)

+----------------------------+-----+
|features                    |label|
+----------------------------+-----+
|[2020.0,34.0,30.0,4.0,40.0] |7.0  |
|[2020.0,53.0,10.0,43.0,40.0]|7.0  |
|[2020.0,28.0,15.0,5.0,40.0] |3.0  |
|[2020.0,28.0,9.0,15.0,40.0] |4.0  |
|[2020.0,26.0,17.0,4.0,45.0] |13.0 |
+----------------------------+-----+
only showing top 5 rows



In [39]:
# Divide the dataset into training 80% and testing 20%
train_data,test_data = model_data.randomSplit([0.8,0.2])

**Train Decision Tree as Classifier**

In [48]:
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Train a DecisionTree model.
dt = DecisionTreeClassifier(featuresCol="features", labelCol="label")

# Train model.
model = dt.fit(train_data)

# Make predictions.
predictions = model.transform(test_data)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)

print("Test Accuracy = %g " % accuracy)
print("Test Error = %g " % (1.0 - accuracy))

Test Accuracy = 0.154286 
Test Error = 0.845714 


In [49]:
predictions.show(5)

+--------------------+-----+--------------------+--------------------+----------+
|            features|label|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|[2020.0,19.0,14.0...|  1.0|[0.0,3.0,10.0,7.0...|[0.0,0.0625,0.208...|       2.0|
|[2020.0,22.0,11.0...|  3.0|[0.0,0.0,0.0,5.0,...|[0.0,0.0,0.0,0.09...|       7.0|
|[2020.0,22.0,14.0...|  6.0|[0.0,3.0,16.0,6.0...|[0.0,0.0181818181...|       4.0|
|[2020.0,22.0,14.0...|  9.0|[0.0,2.0,2.0,5.0,...|[0.0,0.0277777777...|       7.0|
|[2020.0,23.0,10.0...|  3.0|[0.0,0.0,0.0,5.0,...|[0.0,0.0,0.0,0.09...|       7.0|
+--------------------+-----+--------------------+--------------------+----------+
only showing top 5 rows



**Train Random Forest as Classifier**

In [50]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Train a RandomForest model.
rf = RandomForestClassifier(featuresCol="features", labelCol="label")

# Train model. 
model = rf.fit(train_data)

# Make predictions.
predictions = model.transform(test_data)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)

print("Test Accuracy = %g " % accuracy)
print("Test Error = %g " % (1.0 - accuracy))

Test Accuracy = 0.154286 
Test Error = 0.845714 


In [51]:
predictions.show(5)

+--------------------+-----+--------------------+--------------------+----------+
|            features|label|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|[2020.0,19.0,14.0...|  1.0|[0.74191597437101...|[0.03709579871855...|       3.0|
|[2020.0,22.0,11.0...|  3.0|[0.05776707559516...|[0.00288835377975...|       7.0|
|[2020.0,22.0,14.0...|  6.0|[0.03832162717932...|[0.00191608135896...|       6.0|
|[2020.0,22.0,14.0...|  9.0|[0.03637050512300...|[0.00181852525615...|       7.0|
|[2020.0,23.0,10.0...|  3.0|[0.03487198419515...|[0.00174359920975...|       7.0|
+--------------------+-----+--------------------+--------------------+----------+
only showing top 5 rows



**Train Naive Bayes as Classifier**

In [52]:
from pyspark.ml.classification import NaiveBayes
# create the trainer and set its parameters
nb = NaiveBayes(smoothing=1.0, modelType="multinomial")

# train the model
model = nb.fit(train_data)

# make predictions on the test set
predictions = model.transform(test_data)

# compute accuracy on the test set
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",
                                              metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = " + str(accuracy))

Test set accuracy = 0.12285714285714286


In [53]:
predictions.show(5)

+--------------------+-----+--------------------+--------------------+----------+
|            features|label|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|[2020.0,19.0,14.0...|  1.0|[-395.62876661755...|[0.00276015221005...|       4.0|
|[2020.0,22.0,11.0...|  3.0|[-463.80278864731...|[0.00804233416249...|       6.0|
|[2020.0,22.0,14.0...|  6.0|[-463.48149424064...|[0.00467291326608...|       4.0|
|[2020.0,22.0,14.0...|  9.0|[-503.01238729143...|[0.00480054735755...|       5.0|
|[2020.0,23.0,10.0...|  3.0|[-477.47241382341...|[0.00968047785031...|       6.0|
+--------------------+-----+--------------------+--------------------+----------+
only showing top 5 rows



**MLP as Classifier**

In [56]:
from pyspark.ml.classification import MultilayerPerceptronClassifier

# specify layers for the neural network:
# input layer of size 5 (features), two hidden layers of size 5 and 4
# and output layer of size 20 (classes)
layers = [5, 5, 4, 20]

# create the trainer and set its parameters
trainer = MultilayerPerceptronClassifier(maxIter=100, layers=layers, blockSize=128, seed=1234)

# train the model
model = trainer.fit(train_data)

# compute accuracy on the test set
result = model.transform(test_data)
predictionAndLabels = result.select("prediction", "label")
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
print("Test set accuracy = " + str(evaluator.evaluate(predictionAndLabels)))

Test set accuracy = 0.11571428571428571


# Results
The classification results are not good. They might improve with more features as classifier inputs and with a refined data query that yields a better dataset with better salary ranges.
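To put the accuracies above (roughly 0.12 to 0.15) in context, it helps to compare them with simple baselines. With 20 salary buckets, uniform random guessing would be right about 5% of the time. The sketch below also computes a majority-class baseline; the helper names and the sample labels are made up for illustration and do not come from the survey data.

```python
from collections import Counter


def accuracy(predictions, labels):
    """Fraction of predictions that equal the true label."""
    matches = sum(1 for p, y in zip(predictions, labels) if p == y)
    return matches / len(labels)


def majority_baseline(train_labels, test_labels):
    """Accuracy of always predicting the most frequent training label."""
    most_common_label, _ = Counter(train_labels).most_common(1)[0]
    return accuracy([most_common_label] * len(test_labels), test_labels)


# Made-up labels for illustration only; not taken from the survey data.
train_labels = [7.0, 7.0, 3.0, 4.0, 13.0, 7.0]
test_labels = [7.0, 3.0, 7.0, 9.0]

num_classes = 20
print(1 / num_classes)                               # 0.05, uniform random guessing
print(majority_baseline(train_labels, test_labels))  # 0.5
```

On the real data, the same comparison would use the `label` columns of `train_data` and `test_data` collected into Python lists.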

# Regression Problem

In [59]:
# For regression
# Select the features that will be used as inputs to the regression model

from pyspark.ml.feature import VectorAssembler
inputcols = ["Year", "Age",  "Age1stCode", "YearsCode", "WorkWeekHrs"]
assembler = VectorAssembler(inputCols= inputcols,
                            outputCol = "predictors")

predictors = assembler.transform(filt_data)
# predictors.columns

model_data = predictors.select("predictors", "ConvertedComp")
model_data.show(5,truncate=False)


+----------------------------+-------------+
|predictors                  |ConvertedComp|
+----------------------------+-------------+
|[2020.0,34.0,30.0,4.0,40.0] |77556.0      |
|[2020.0,53.0,10.0,43.0,40.0]|74970.0      |
|[2020.0,28.0,15.0,5.0,40.0] |37816.0      |
|[2020.0,28.0,9.0,15.0,40.0] |48644.0      |
|[2020.0,26.0,17.0,4.0,45.0] |130000.0     |
+----------------------------+-------------+
only showing top 5 rows



In [60]:
# Split the dataset into train and test
train_data,test_data = model_data.randomSplit([0.8,0.2])

**Linear Regression**

In [61]:
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(
    featuresCol = 'predictors', 
    labelCol = 'ConvertedComp')
lrModel = lr.fit(train_data)
pred = lrModel.evaluate(test_data)

In [62]:
lrModel.coefficients

DenseVector([-892.3027, 1685.5009, -188.4823, 78.8812, 1105.705])

In [63]:
lrModel.intercept

1790777.9553167804
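A linear regression prediction is the intercept plus the dot product of the coefficients and the feature vector. As a rough check, the sketch below applies the printed (rounded) coefficients to the first feature row shown in the predictors table earlier. The `predict` helper is hypothetical, and because the coefficients are rounded the result only approximates what `lrModel` itself would return.

```python
# Coefficients and intercept as printed above (rounded to the digits shown).
coefficients = [-892.3027, 1685.5009, -188.4823, 78.8812, 1105.705]
intercept = 1790777.9553167804

# Feature order: Year, Age, Age1stCode, YearsCode, WorkWeekHrs.
# This row is the first one displayed in the predictors table.
features = [2020.0, 34.0, 30.0, 4.0, 40.0]


def predict(features, coefficients, intercept):
    """Linear model: intercept plus the dot product of weights and features."""
    return intercept + sum(w * x for w, x in zip(coefficients, features))


prediction = predict(features, coefficients, intercept)
print(round(prediction))  # roughly 84.5K, versus an actual salary of 77556
```

The large intercept is mostly cancelled by the negative `Year` coefficient multiplied by a value near 2020, which is why the individual numbers look extreme while the prediction stays in a plausible range.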

In [64]:
pred.predictions.show(5)

+--------------------+-------------+-----------------+
|          predictors|ConvertedComp|       prediction|
+--------------------+-------------+-----------------+
|[2020.0,21.0,20.0...|      50000.0|69787.88251323183|
|[2020.0,22.0,19.0...|      85000.0|66291.10294897179|
|[2020.0,23.0,14.0...|      85000.0|69550.06514861202|
|[2020.0,23.0,18.0...|      19458.0|68401.72981504374|
|[2020.0,23.0,19.0...|      65000.0|73584.01030155248|
+--------------------+-------------+-----------------+
only showing top 5 rows



In [65]:
# Evaluator to evaluate the model performance
from pyspark.ml.evaluation import RegressionEvaluator
eval = RegressionEvaluator(
    labelCol="ConvertedComp", 
    predictionCol="prediction", 
    metricName="rmse")

rmse = eval.evaluate(pred.predictions)
mse = eval.evaluate(pred.predictions, {eval.metricName: "mse"})
mae = eval.evaluate(pred.predictions, {eval.metricName: "mae"})
r2 = eval.evaluate(pred.predictions, {eval.metricName: "r2"})

print(rmse)
print(mse)
print(mae)
print(r2)

39245.70587853254
1540225429.9042838
31681.923064999613
0.10505963649537908
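The four printed values are, in order, RMSE, MSE, MAE and R². The sketch below computes them with the standard textbook definitions on the five prediction rows displayed earlier (predictions rounded to two decimals). `regression_metrics` is a hypothetical helper, not Spark's implementation, and five rows are far too few to reproduce the test-set figures.

```python
import math


def regression_metrics(y_true, y_pred):
    """Return (rmse, mse, mae, r2) for two equal-length lists of numbers."""
    n = len(y_true)
    errors = [t - p for t, p in zip(y_true, y_pred)]
    squared_error_sum = sum(e * e for e in errors)
    mse = squared_error_sum / n
    mae = sum(abs(e) for e in errors) / n
    mean_true = sum(y_true) / n
    total_ss = sum((t - mean_true) ** 2 for t in y_true)
    r2 = 1.0 - squared_error_sum / total_ss
    return math.sqrt(mse), mse, mae, r2


# Labels and predictions from the five rows displayed earlier.
y_true = [50000.0, 85000.0, 85000.0, 19458.0, 65000.0]
y_pred = [69787.88, 66291.10, 69550.07, 68401.73, 73584.01]
rmse, mse, mae, r2 = regression_metrics(y_true, y_pred)
print(rmse, mse, mae, r2)

# The RMSE printed by the notebook is the square root of its MSE.
print(math.sqrt(1540225429.9042838))  # about 39245.7
```

An R² close to 0, such as the 0.105 reported above, means the model explains little more of the variance than always predicting the mean salary.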


**Decision Tree as Regression Model**

In [66]:
from pyspark.ml.regression import DecisionTreeRegressor

#  Train a DecisionTree model.
dt = DecisionTreeRegressor(featuresCol="predictors",labelCol='ConvertedComp')

dtModel = dt.fit(train_data)
# Make predictions.
predictions = dtModel.transform(test_data)

In [67]:
eval = RegressionEvaluator(
    labelCol="ConvertedComp", 
    predictionCol="prediction", 
    metricName="rmse")

rmse = eval.evaluate(predictions)
mse = eval.evaluate(predictions, {eval.metricName: "mse"})
mae = eval.evaluate(predictions, {eval.metricName: "mae"})
r2 = eval.evaluate(predictions, {eval.metricName: "r2"})

print(rmse)
print(mse)
print(mae)
print(r2)

38414.009675299625
1475636139.3340135
30371.89615498184
0.14258892413027768


**Random Forest as Regression Model**

In [68]:
from pyspark.ml.regression import RandomForestRegressor

# Train a RandomForest model.
rf = RandomForestRegressor(featuresCol="predictors",labelCol='ConvertedComp')

rfModel = rf.fit(train_data)
# Make predictions.
predictions = rfModel.transform(test_data)

In [69]:
rmse = eval.evaluate(predictions)
mse = eval.evaluate(predictions, {eval.metricName: "mse"})
mae = eval.evaluate(predictions, {eval.metricName: "mae"})
r2 = eval.evaluate(predictions, {eval.metricName: "r2"})

print(rmse)
print(mse)
print(mae)
print(r2)

38169.53384377282
1456913313.8509188
30464.11204935421
0.15346772921838459


**GBTRegressor**

In [71]:
from pyspark.ml.regression import GBTRegressor

# Train a gradient-boosted trees (GBT) model.
gbt = GBTRegressor(featuresCol="predictors",labelCol='ConvertedComp')

gbtModel = gbt.fit(train_data)
# Make predictions.
predictions = gbtModel.transform(test_data)

In [72]:
rmse = eval.evaluate(predictions)
mse = eval.evaluate(predictions, {eval.metricName: "mse"})
mae = eval.evaluate(predictions, {eval.metricName: "mae"})
r2 = eval.evaluate(predictions, {eval.metricName: "r2"})

print(rmse)
print(mse)
print(mae)
print(r2)

37506.27359098783
1406720558.6820314
29577.593525606662
0.18263198120644208


# Results
As with the classification problem, the results are not good. Again, adding more features and choosing better ranges for the column values might help.