In [1]:
from pyspark.sql import SparkSession
# Start (or reuse) the Spark session that every later cell reads data through.
spark = (
    SparkSession.builder
    .appName('Missing')
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/01/01 19:00:19 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/01/01 19:00:19 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [14]:
# Load the salary CSV: the first row supplies column names, and column types
# are inferred from the data instead of defaulting to strings.
training = spark.read.csv(
    'test2.csv',
    header=True,
    inferSchema=True,
)

In [15]:
# Preview the loaded rows to check that the CSV parsed as expected.
training.show()

+-------+---+----------+------+-----------+
|   Name|age|Experience|Salary|Departments|
+-------+---+----------+------+-----------+
|Michael| 25|        10|100000|       COAF|
|    Tom| 26|         8|150000| Commercial|
| Antony| 27|         4|175000|       Card|
| George| 28|         2|200000|     Retail|
|   Bill| 29|         5|250000|      Cyber|
|   Mark| 30|         3|300000|       Card|
|Patrick| 29|         4|350000|       COAF|
+-------+---+----------+------+-----------+



In [16]:
# Print each column's inferred type and nullability.
training.printSchema()

root
 |-- Name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- Experience: integer (nullable = true)
 |-- Salary: integer (nullable = true)
 |-- Departments: string (nullable = true)



In [17]:
# List the column names available for feature selection.
training.columns

['Name', 'age', 'Experience', 'Salary', 'Departments']

Combine the `age` and `Experience` columns into a single new vector column; that column holds the independent (input) features for the model.

In [18]:
from pyspark.ml.feature import VectorAssembler
# Pack the `age` and `Experience` columns into one vector column.
featureassembler = VectorAssembler(
    inputCols=["age", "Experience"],
    outputCol="Independent Features",
)

In [19]:
# Append the assembled "Independent Features" vector column to the training data.
output=featureassembler.transform(training)

In [20]:
# Inspect the result: the original columns plus the new vector column.
output.show()

+-------+---+----------+------+-----------+--------------------+
|   Name|age|Experience|Salary|Departments|Independent Features|
+-------+---+----------+------+-----------+--------------------+
|Michael| 25|        10|100000|       COAF|         [25.0,10.0]|
|    Tom| 26|         8|150000| Commercial|          [26.0,8.0]|
| Antony| 27|         4|175000|       Card|          [27.0,4.0]|
| George| 28|         2|200000|     Retail|          [28.0,2.0]|
|   Bill| 29|         5|250000|      Cyber|          [29.0,5.0]|
|   Mark| 30|         3|300000|       Card|          [30.0,3.0]|
|Patrick| 29|         4|350000|       COAF|          [29.0,4.0]|
+-------+---+----------+------+-----------+--------------------+



In [21]:
# Confirm the vector column now appears in the column list.
output.columns

['Name', 'age', 'Experience', 'Salary', 'Departments', 'Independent Features']

In [22]:
# Keep only what the regression needs: the feature vector and the Salary label.
finalized_data=output.select("Independent Features","Salary")

In [23]:
# Preview the (features, label) pairs that will be split for training and testing.
finalized_data.show()

+--------------------+------+
|Independent Features|Salary|
+--------------------+------+
|         [25.0,10.0]|100000|
|          [26.0,8.0]|150000|
|          [27.0,4.0]|175000|
|          [28.0,2.0]|200000|
|          [29.0,5.0]|250000|
|          [30.0,3.0]|300000|
|          [29.0,4.0]|350000|
+--------------------+------+



In [24]:
from pyspark.ml.regression import LinearRegression
# Train/test split using weights 0.75 / 0.25.
# The fixed seed makes the random split repeatable, so the fitted coefficients
# and the metrics computed in later cells do not change from run to run.
train_data,test_data=finalized_data.randomSplit([0.75,0.25], seed=42)
# Linear regression of Salary on the assembled feature vector.
regressor=LinearRegression(featuresCol='Independent Features',labelCol='Salary')
# fit() returns the trained model; `regressor` is rebound to it because later
# cells read .coefficients / .intercept and call .evaluate() on this name.
regressor=regressor.fit(train_data)

24/01/01 20:10:49 WARN Instrumentation: [c5d050d7] regParam is zero, which might cause numerical instability and overfitting.
24/01/01 20:10:50 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
24/01/01 20:10:50 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
24/01/01 20:10:50 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK


In [25]:
# Fitted coefficients, one per entry of the feature vector (age, Experience).
regressor.coefficients

DenseVector([100000.0, 25000.0])

In [26]:
# Fitted intercept (constant term) of the linear model.
regressor.intercept

-2650000.0000002002

In [28]:
# Evaluate the fitted model on the held-out rows; the returned summary carries
# the per-row predictions and the error metrics read in the following cells.
pred_results=regressor.evaluate(test_data)

In [29]:
# Show the actual Salary next to the model's prediction for each test row.
pred_results.predictions.show()

+--------------------+------+------------------+
|Independent Features|Salary|        prediction|
+--------------------+------+------------------+
|          [26.0,8.0]|150000|149999.99999999814|
|          [27.0,4.0]|175000|149999.99999999348|
|          [29.0,5.0]|250000| 375000.0000000098|
|          [30.0,3.0]|300000| 425000.0000000112|
+--------------------+------+------------------+



In [30]:
# Test-set error metrics: (mean absolute error, mean squared error).
pred_results.meanAbsoluteError,pred_results.meanSquaredError

(68750.00000000733, 7968750000.00139)

In [31]:
# Load the tips CSV: the first row supplies column names, and column types
# are inferred from the data.
df = spark.read.csv(
    'tips.csv',
    header=True,
    inferSchema=True,
)

In [32]:
# Preview the tips rows.
df.show()

+----------+------+------+------+---+------+----+
|total_bill|   tip|gender|smoker|day|  time|size|
+----------+------+------+------+---+------+----+
|     16.99|  1.01|Female|    No|Sun|Dinner|   2|
|     10.34|  1.66|  Male|    No|Sun|Dinner|   3|
|     21.01|   3.5|  Male|    No|Sun|Dinner|   3|
|    161.99| 12.01|Female|    No|Sun|Dinner|  12|
|    116.99| 11.01|Female|    No|Sun|Dinner|  22|
|   1246.99| 12.01|Female|    No|Sun|Dinner|  32|
|    169.99| 19.01|Female|    No|Sun|Dinner|  20|
|    106.99| 18.01|Female|    No|Sun|Dinner|  42|
|    196.99|123.01|Female|    No|Sun|Dinner|  52|
|    316.99| 14.01|Female|    No|Sun|Dinner|  25|
|     76.99|  7.01|Female|    No|Sun| Lunch|  62|
|     96.99|  9.01|Female|    No|Sat|Dinner|   2|
+----------+------+------+------+---+------+----+



In [33]:
# Check which columns were inferred as numeric and which are strings.
df.printSchema()

root
 |-- total_bill: double (nullable = true)
 |-- tip: double (nullable = true)
 |-- gender: string (nullable = true)
 |-- smoker: string (nullable = true)
 |-- day: string (nullable = true)
 |-- time: string (nullable = true)
 |-- size: integer (nullable = true)



In [34]:
# List the column names of the tips data.
df.columns

['total_bill', 'tip', 'gender', 'smoker', 'day', 'time', 'size']

In [35]:
# Handling Categorical Features
from pyspark.ml.feature import StringIndexer

In [36]:
# Look at the raw string columns once more before indexing them.
df.show()

+----------+------+------+------+---+------+----+
|total_bill|   tip|gender|smoker|day|  time|size|
+----------+------+------+------+---+------+----+
|     16.99|  1.01|Female|    No|Sun|Dinner|   2|
|     10.34|  1.66|  Male|    No|Sun|Dinner|   3|
|     21.01|   3.5|  Male|    No|Sun|Dinner|   3|
|    161.99| 12.01|Female|    No|Sun|Dinner|  12|
|    116.99| 11.01|Female|    No|Sun|Dinner|  22|
|   1246.99| 12.01|Female|    No|Sun|Dinner|  32|
|    169.99| 19.01|Female|    No|Sun|Dinner|  20|
|    106.99| 18.01|Female|    No|Sun|Dinner|  42|
|    196.99|123.01|Female|    No|Sun|Dinner|  52|
|    316.99| 14.01|Female|    No|Sun|Dinner|  25|
|     76.99|  7.01|Female|    No|Sun| Lunch|  62|
|     96.99|  9.01|Female|    No|Sat|Dinner|   2|
+----------+------+------+------+---+------+----+



In [37]:
# Encode the string `gender` column as a numeric `gender_indexed` column.
indexer = StringIndexer(inputCol="gender", outputCol="gender_indexed")
# fit() learns the label-to-index mapping from df; transform() appends the new column.
gender_index_model = indexer.fit(df)
df_r = gender_index_model.transform(df)
df_r.show()

+----------+------+------+------+---+------+----+--------------+
|total_bill|   tip|gender|smoker|day|  time|size|gender_indexed|
+----------+------+------+------+---+------+----+--------------+
|     16.99|  1.01|Female|    No|Sun|Dinner|   2|           0.0|
|     10.34|  1.66|  Male|    No|Sun|Dinner|   3|           1.0|
|     21.01|   3.5|  Male|    No|Sun|Dinner|   3|           1.0|
|    161.99| 12.01|Female|    No|Sun|Dinner|  12|           0.0|
|    116.99| 11.01|Female|    No|Sun|Dinner|  22|           0.0|
|   1246.99| 12.01|Female|    No|Sun|Dinner|  32|           0.0|
|    169.99| 19.01|Female|    No|Sun|Dinner|  20|           0.0|
|    106.99| 18.01|Female|    No|Sun|Dinner|  42|           0.0|
|    196.99|123.01|Female|    No|Sun|Dinner|  52|           0.0|
|    316.99| 14.01|Female|    No|Sun|Dinner|  25|           0.0|
|     76.99|  7.01|Female|    No|Sun| Lunch|  62|           0.0|
|     96.99|  9.01|Female|    No|Sat|Dinner|   2|           0.0|
+----------+------+------

In [44]:
# Index all four string columns in one pass; the two lists are positional
# pairs, so each input column maps to the output name at the same position.
categorical_inputs = ["gender", "smoker", "day", "time"]
indexed_outputs = ["gender_indexed", "smoker_indexed", "day_indexed", "time_indexed"]
indexer = StringIndexer(inputCols=categorical_inputs, outputCols=indexed_outputs)
# Fit on df and append the indexed columns to the same rows.
df_r = indexer.fit(df).transform(df)
df_r.show()

+----------+------+------+------+---+------+----+--------------+--------------+-----------+------------+
|total_bill|   tip|gender|smoker|day|  time|size|gender_indexed|smoker_indexed|day_indexed|time_indexed|
+----------+------+------+------+---+------+----+--------------+--------------+-----------+------------+
|     16.99|  1.01|Female|    No|Sun|Dinner|   2|           0.0|           0.0|        0.0|         0.0|
|     10.34|  1.66|  Male|    No|Sun|Dinner|   3|           1.0|           0.0|        0.0|         0.0|
|     21.01|   3.5|  Male|    No|Sun|Dinner|   3|           1.0|           0.0|        0.0|         0.0|
|    161.99| 12.01|Female|    No|Sun|Dinner|  12|           0.0|           0.0|        0.0|         0.0|
|    116.99| 11.01|Female|    No|Sun|Dinner|  22|           0.0|           0.0|        0.0|         0.0|
|   1246.99| 12.01|Female|    No|Sun|Dinner|  32|           0.0|           0.0|        0.0|         0.0|
|    169.99| 19.01|Female|    No|Sun|Dinner|  20|      

In [45]:
# Verify that the four *_indexed columns were added with type double.
df_r.printSchema()

root
 |-- total_bill: double (nullable = true)
 |-- tip: double (nullable = true)
 |-- gender: string (nullable = true)
 |-- smoker: string (nullable = true)
 |-- day: string (nullable = true)
 |-- time: string (nullable = true)
 |-- size: integer (nullable = true)
 |-- gender_indexed: double (nullable = false)
 |-- smoker_indexed: double (nullable = false)
 |-- day_indexed: double (nullable = false)
 |-- time_indexed: double (nullable = false)



In [46]:
from pyspark.ml.feature import VectorAssembler
# Columns that feed the model: tip and size plus the four indexed categoricals.
feature_cols = [
    'tip', 'size',
    'gender_indexed', 'smoker_indexed', 'day_indexed', 'time_indexed',
]
featureassembler = VectorAssembler(inputCols=feature_cols, outputCol="Independent Features")
# Append the assembled vector column to the indexed tips DataFrame.
output = featureassembler.transform(df_r)

In [48]:
# Inspect the assembled vectors (rows may print in sparse or dense form).
output.select('Independent Features').show()

+--------------------+
|Independent Features|
+--------------------+
|(6,[0,1],[1.01,2.0])|
|[1.66,3.0,1.0,0.0...|
|[3.5,3.0,1.0,0.0,...|
|(6,[0,1],[12.01,1...|
|(6,[0,1],[11.01,2...|
|(6,[0,1],[12.01,3...|
|(6,[0,1],[19.01,2...|
|(6,[0,1],[18.01,4...|
|(6,[0,1],[123.01,...|
|(6,[0,1],[14.01,2...|
|[7.01,62.0,0.0,0....|
|[9.01,2.0,0.0,0.0...|
+--------------------+



In [50]:
# Keep the feature vector and total_bill, which is the regression label here.
finalized_data=output.select('Independent Features','total_bill')

In [51]:
# Preview the (features, label) pairs before splitting.
finalized_data.show()

+--------------------+----------+
|Independent Features|total_bill|
+--------------------+----------+
|(6,[0,1],[1.01,2.0])|     16.99|
|[1.66,3.0,1.0,0.0...|     10.34|
|[3.5,3.0,1.0,0.0,...|     21.01|
|(6,[0,1],[12.01,1...|    161.99|
|(6,[0,1],[11.01,2...|    116.99|
|(6,[0,1],[12.01,3...|   1246.99|
|(6,[0,1],[19.01,2...|    169.99|
|(6,[0,1],[18.01,4...|    106.99|
|(6,[0,1],[123.01,...|    196.99|
|(6,[0,1],[14.01,2...|    316.99|
|[7.01,62.0,0.0,0....|     76.99|
|[9.01,2.0,0.0,0.0...|     96.99|
+--------------------+----------+



In [53]:
from pyspark.ml.regression import LinearRegression
# Seeded split with weights 0.75 / 0.25 so the held-out rows are repeatable.
tips_splits = finalized_data.randomSplit([0.75, 0.25], seed=42)
train_data, test_data = tips_splits
# Fit a linear model predicting total_bill from the assembled feature vector;
# `regressor` ends up bound to the fitted model, which later cells query.
regressor = LinearRegression(
    featuresCol='Independent Features',
    labelCol='total_bill',
).fit(train_data)


24/01/02 11:24:03 WARN Instrumentation: [e3a79c5f] regParam is zero, which might cause numerical instability and overfitting.
24/01/02 11:24:03 WARN Instrumentation: [e3a79c5f] Cholesky solver failed due to singular covariance matrix. Retrying with Quasi-Newton solver.


In [54]:
# Fitted coefficients, in the order the columns were given to the assembler.
regressor.coefficients

DenseVector([-5.2701, 14.735, 0.0, 0.0, 53.8417, -860.7991])

In [55]:
# Fitted intercept (constant term) of the tips model.
regressor.intercept

61.1614443202161

In [56]:
# Evaluate the fitted model on the held-out rows; the returned summary carries
# the per-row predictions and the metrics read in the following cells.
pred_results = regressor.evaluate(test_data)

In [57]:
# Final comparison: actual total_bill beside the predicted value for each test row.
pred_results.predictions.show()

+--------------------+----------+------------------+
|Independent Features|total_bill|        prediction|
+--------------------+----------+------------------+
|(6,[0,1],[12.01,1...|    161.99|174.68824694045358|
|(6,[0,1],[19.01,2...|    169.99|255.67796947930884|
|[1.66,3.0,1.0,0.0...|     10.34| 96.61819215736222|
|[3.5,3.0,1.0,0.0,...|     21.01| 86.92129322623401|
+--------------------+----------+------------------+



In [60]:
# Performance metrics on the test rows: (R-squared, mean absolute error, mean squared error).
pred_results.r2,pred_results.meanAbsoluteError,pred_results.meanSquaredError

(0.14951932151497982, 62.64392545083966, 4822.974651386256)