## PySpark Data Preprocessing, Regression

In [1]:
from pyspark.sql import SparkSession
# Create the Spark session for this notebook (or reuse one that is already
# running) under the application name 'practice'.
spark = (
    SparkSession.builder
    .appName('practice')
    .getOrCreate()
)
# Bare expression so the cell renders the session object.
spark

In [2]:
# Load the tips dataset: the first row supplies the column names and Spark
# infers each column's type from the data instead of reading everything as strings.
df = spark.read.csv(
    'tips.csv',
    header=True,
    inferSchema=True,
)
df.show()

+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|
|     25.29|4.71|  Male|    No|Sun|Dinner|   4|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2|
|     26.88|3.12|  Male|    No|Sun|Dinner|   4|
|     15.04|1.96|  Male|    No|Sun|Dinner|   2|
|     14.78|3.23|  Male|    No|Sun|Dinner|   2|
|     10.27|1.71|  Male|    No|Sun|Dinner|   2|
|     35.26| 5.0|Female|    No|Sun|Dinner|   4|
|     15.42|1.57|  Male|    No|Sun|Dinner|   2|
|     18.43| 3.0|  Male|    No|Sun|Dinner|   4|
|     14.83|3.02|Female|    No|Sun|Dinner|   2|
|     21.58|3.92|  Male|    No|Sun|Dinner|   2|
|     10.33|1.67|Female|    No|Sun|Dinner|   3|
|     16.29|3.71|  Male|    No|Sun|Dinne

In [3]:
# Print the column names and the types that inferSchema assigned to them.
df.printSchema()

root
 |-- total_bill: double (nullable = true)
 |-- tip: double (nullable = true)
 |-- sex: string (nullable = true)
 |-- smoker: string (nullable = true)
 |-- day: string (nullable = true)
 |-- time: string (nullable = true)
 |-- size: integer (nullable = true)



### Handle categorical features

In [4]:
# Import StringIndexer to encode categorical (string) features as numeric indices
from pyspark.ml.feature import StringIndexer

In [5]:
# Ordinal-encode a single categorical column: every distinct `sex` label is
# replaced by a numeric index, written to the new `sex_indexed` column.
indexer = StringIndexer(
    inputCol='sex',
    outputCol='sex_indexed',
)
df_1 = indexer.fit(df).transform(df)
df_1.show()

+----------+----+------+------+---+------+----+-----------+
|total_bill| tip|   sex|smoker|day|  time|size|sex_indexed|
+----------+----+------+------+---+------+----+-----------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|        1.0|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|        0.0|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|        0.0|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|        0.0|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|        1.0|
|     25.29|4.71|  Male|    No|Sun|Dinner|   4|        0.0|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2|        0.0|
|     26.88|3.12|  Male|    No|Sun|Dinner|   4|        0.0|
|     15.04|1.96|  Male|    No|Sun|Dinner|   2|        0.0|
|     14.78|3.23|  Male|    No|Sun|Dinner|   2|        0.0|
|     10.27|1.71|  Male|    No|Sun|Dinner|   2|        0.0|
|     35.26| 5.0|Female|    No|Sun|Dinner|   4|        1.0|
|     15.42|1.57|  Male|    No|Sun|Dinner|   2|        0.0|
|     18.43| 3.0|  Male|    No|Sun|Dinne

In [6]:
# Ordinal-encode several categorical columns in one pass; inputCols and
# outputCols are paired positionally (sex -> sex_indexed, and so on).
# NOTE(review): `smoker` is also a string column but is not indexed here —
# confirm that leaving it out of the model is intentional.
indexer = StringIndexer(
    inputCols=['sex', 'day', 'time'],
    outputCols=['sex_indexed', 'day_indexed', 'time_indexed'],
)
df_2 = indexer.fit(df).transform(df)
df_2.show()

+----------+----+------+------+---+------+----+-----------+-----------+------------+
|total_bill| tip|   sex|smoker|day|  time|size|sex_indexed|day_indexed|time_indexed|
+----------+----+------+------+---+------+----+-----------+-----------+------------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|        1.0|        1.0|         0.0|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|        0.0|        1.0|         0.0|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|        0.0|        1.0|         0.0|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|        0.0|        1.0|         0.0|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|        1.0|        1.0|         0.0|
|     25.29|4.71|  Male|    No|Sun|Dinner|   4|        0.0|        1.0|         0.0|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2|        0.0|        1.0|         0.0|
|     26.88|3.12|  Male|    No|Sun|Dinner|   4|        0.0|        1.0|         0.0|
|     15.04|1.96|  Male|    No|Sun|Dinner|   2|        0.0|      

In [7]:
# Assemble the independent (predictor) features into a single vector column
# using VectorAssembler; that column is what the regression cell reads.
# NOTE(review): the *_indexed columns are category indices, which a linear
# model will treat as ordinary numbers — consider one-hot encoding `day_indexed`.
from pyspark.ml.feature import VectorAssembler
featureAssembler = VectorAssembler(
    inputCols=[
        'tip',
        'size',
        'sex_indexed',
        'day_indexed',
        'time_indexed',
    ],
    outputCol="Independent Features",
)
output = featureAssembler.transform(df_2)

In [9]:
# Preview the assembled frame; the new "Independent Features" vector column
# appears alongside the original and indexed columns.
output.show()

+----------+----+------+------+---+------+----+-----------+-----------+------------+--------------------+
|total_bill| tip|   sex|smoker|day|  time|size|sex_indexed|day_indexed|time_indexed|Independent Features|
+----------+----+------+------+---+------+----+-----------+-----------+------------+--------------------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|        1.0|        1.0|         0.0|[1.01,2.0,1.0,1.0...|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|        0.0|        1.0|         0.0|[1.66,3.0,0.0,1.0...|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|        0.0|        1.0|         0.0|[3.5,3.0,0.0,1.0,...|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|        0.0|        1.0|         0.0|[3.31,2.0,0.0,1.0...|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|        1.0|        1.0|         0.0|[3.61,4.0,1.0,1.0...|
|     25.29|4.71|  Male|    No|Sun|Dinner|   4|        0.0|        1.0|         0.0|[4.71,4.0,0.0,1.0...|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2

In [10]:
# Keep only what the model needs: the assembled feature vector and the
# `total_bill` column that serves as the regression label.
final_df = output.select(
    'Independent Features',
    'total_bill',
)
final_df.show()

+--------------------+----------+
|Independent Features|total_bill|
+--------------------+----------+
|[1.01,2.0,1.0,1.0...|     16.99|
|[1.66,3.0,0.0,1.0...|     10.34|
|[3.5,3.0,0.0,1.0,...|     21.01|
|[3.31,2.0,0.0,1.0...|     23.68|
|[3.61,4.0,1.0,1.0...|     24.59|
|[4.71,4.0,0.0,1.0...|     25.29|
|[2.0,2.0,0.0,1.0,...|      8.77|
|[3.12,4.0,0.0,1.0...|     26.88|
|[1.96,2.0,0.0,1.0...|     15.04|
|[3.23,2.0,0.0,1.0...|     14.78|
|[1.71,2.0,0.0,1.0...|     10.27|
|[5.0,4.0,1.0,1.0,...|     35.26|
|[1.57,2.0,0.0,1.0...|     15.42|
|[3.0,4.0,0.0,1.0,...|     18.43|
|[3.02,2.0,1.0,1.0...|     14.83|
|[3.92,2.0,0.0,1.0...|     21.58|
|[1.67,3.0,1.0,1.0...|     10.33|
|[3.71,3.0,0.0,1.0...|     16.29|
|[3.5,3.0,1.0,1.0,...|     16.97|
|(5,[0,1],[3.35,3.0])|     20.65|
+--------------------+----------+
only showing top 20 rows



In [11]:
from pyspark.ml.regression import LinearRegression
# Split 75/25 into train and test sets. A fixed seed makes the split — and
# therefore the fitted coefficients and the evaluation metrics further down —
# repeatable across Restart & Run All; without it every run draws a different
# sample. Outputs stored from an earlier unseeded run will not match a re-run.
SPLIT_SEED = 42
train_data, test_data = final_df.randomSplit([0.75, 0.25], seed=SPLIT_SEED)

# Keep the unfitted estimator and the fitted model under separate names so
# that `regressor` always refers to one kind of object.
lr_estimator = LinearRegression(
    featuresCol='Independent Features',
    labelCol='total_bill',
)
# `regressor` is the fitted model; later cells read its coefficients and
# intercept and call evaluate() on it.
regressor = lr_estimator.fit(train_data)

In [12]:
# Fitted weights, one per entry of the "Independent Features" vector
# (assembled from tip, size, sex_indexed, day_indexed, time_indexed).
regressor.coefficients

DenseVector([3.4436, 2.988, -1.4709, 0.198, -1.2901])

In [13]:
# Fitted intercept (bias) term of the linear model.
regressor.intercept

2.5772833793876204

In [14]:
# Score the held-out test split. The returned summary carries the per-row
# predictions as well as the aggregate metrics inspected in the next cells.
pred_results = regressor.evaluate(test_data)
# Show features, actual total_bill and the model's prediction side by side.
pred_results.predictions.show()



+--------------------+----------+------------------+
|Independent Features|total_bill|        prediction|
+--------------------+----------+------------------+
|(5,[0,1],[1.17,2.0])|     32.83|12.582175755278332|
|(5,[0,1],[1.47,2.0])|     10.77| 13.61524786537155|
|(5,[0,1],[1.64,2.0])|     15.36|14.200655394424377|
|(5,[0,1],[1.75,2.0])|     17.82|14.579448501458558|
|(5,[0,1],[1.97,2.0])|     12.02|15.337034715526919|
| (5,[0,1],[2.0,2.0])|     12.69|15.440341926536242|
| (5,[0,1],[2.0,3.0])|     16.31|18.428297499799818|
| (5,[0,1],[2.0,3.0])|     30.06|18.428297499799818|
|(5,[0,1],[2.09,2.0])|     15.01|15.750263559564207|
|(5,[0,1],[2.72,2.0])|     13.28| 17.91971499075997|
| (5,[0,1],[3.0,2.0])|      14.0|18.883915626846974|
| (5,[0,1],[3.0,4.0])|     38.01|24.859826773374127|
|(5,[0,1],[3.27,2.0])|     17.78|19.813680525930874|
|(5,[0,1],[3.39,2.0])|     11.61|20.226909369968165|
|(5,[0,1],[4.08,2.0])|     17.92|22.602975223182572|
| (5,[0,1],[5.0,3.0])|     31.27|28.7590186007

In [15]:
# Mean absolute error and mean squared error on the test split, as a tuple.
pred_results.meanAbsoluteError, pred_results.meanSquaredError

(4.207579924819375, 32.476560899589145)

In [16]:
# R² (coefficient of determination) on the test split.
pred_results.r2

0.448655053225852