In [1]:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName('Practice').getOrCreate()


In [2]:
path = "hdfs://localhost:9000/tips.csv"

In [3]:
df=spark.read.format("csv").option("header",True).option("seperator",",").load(path)
    

In [4]:
df.show()

+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|
|     25.29|4.71|  Male|    No|Sun|Dinner|   4|
|      8.77|   2|  Male|    No|Sun|Dinner|   2|
|     26.88|3.12|  Male|    No|Sun|Dinner|   4|
|     15.04|1.96|  Male|    No|Sun|Dinner|   2|
|     14.78|3.23|  Male|    No|Sun|Dinner|   2|
|     10.27|1.71|  Male|    No|Sun|Dinner|   2|
|     35.26|   5|Female|    No|Sun|Dinner|   4|
|     15.42|1.57|  Male|    No|Sun|Dinner|   2|
|     18.43|   3|  Male|    No|Sun|Dinner|   4|
|     14.83|3.02|Female|    No|Sun|Dinner|   2|
|     21.58|3.92|  Male|    No|Sun|Dinner|   2|
|     10.33|1.67|Female|    No|Sun|Dinner|   3|
|     16.29|3.71|  Male|    No|Sun|Dinne

In [5]:
df.printSchema()

root
 |-- total_bill: string (nullable = true)
 |-- tip: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- smoker: string (nullable = true)
 |-- day: string (nullable = true)
 |-- time: string (nullable = true)
 |-- size: string (nullable = true)



In [6]:
# df.columns
from pyspark.sql.functions import col
from pyspark.sql.types import StringType,BooleanType,DateType,IntegerType,DoubleType
df = df.withColumn("total_bill",col("total_bill").cast(DoubleType()))
df = df.withColumn("size",col("size").cast(IntegerType()))
df = df.withColumn("tip",col("tip").cast(DoubleType()))


In [7]:
df.columns

['total_bill', 'tip', 'sex', 'smoker', 'day', 'time', 'size']

In [8]:
### HANDLING CATEGORICAL FEATURES
from pyspark.ml.feature import StringIndexer

In [9]:
indexer = StringIndexer(inputCol="sex",outputCol="sex_indexed")
df_r =indexer.fit(df).transform(df)
df_r.show()

+----------+----+------+------+---+------+----+-----------+
|total_bill| tip|   sex|smoker|day|  time|size|sex_indexed|
+----------+----+------+------+---+------+----+-----------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|        1.0|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|        0.0|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|        0.0|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|        0.0|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|        1.0|
|     25.29|4.71|  Male|    No|Sun|Dinner|   4|        0.0|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2|        0.0|
|     26.88|3.12|  Male|    No|Sun|Dinner|   4|        0.0|
|     15.04|1.96|  Male|    No|Sun|Dinner|   2|        0.0|
|     14.78|3.23|  Male|    No|Sun|Dinner|   2|        0.0|
|     10.27|1.71|  Male|    No|Sun|Dinner|   2|        0.0|
|     35.26| 5.0|Female|    No|Sun|Dinner|   4|        1.0|
|     15.42|1.57|  Male|    No|Sun|Dinner|   2|        0.0|
|     18.43| 3.0|  Male|    No|Sun|Dinne

In [10]:
indexer = StringIndexer(inputCols=["sex","smoker","day","time"],outputCols=["sex_index","smoker_index","day_index","time_index"])
df_r =indexer.fit(df_r).transform(df_r)


In [11]:
df_r.show()

+----------+----+------+------+---+------+----+-----------+------------+---------+----------+
|total_bill| tip|   sex|smoker|day|  time|size|sex_indexed|smoker_index|day_index|time_index|
+----------+----+------+------+---+------+----+-----------+------------+---------+----------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|        1.0|         0.0|      1.0|       0.0|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|        0.0|         0.0|      1.0|       0.0|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|        0.0|         0.0|      1.0|       0.0|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|        0.0|         0.0|      1.0|       0.0|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|        1.0|         0.0|      1.0|       0.0|
|     25.29|4.71|  Male|    No|Sun|Dinner|   4|        0.0|         0.0|      1.0|       0.0|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2|        0.0|         0.0|      1.0|       0.0|
|     26.88|3.12|  Male|    No|Sun|Dinner|   4|        0.0| 

In [12]:
from pyspark.ml.feature import VectorAssembler
featureassembler=VectorAssembler(inputCols=['tip','size','sex_indexed','smoker_index','day_index','time_index'],outputCol='Independent Features')

In [13]:
output = featureassembler.transform(df_r)

In [14]:
output.select('Independent Features').show()

+--------------------+
|Independent Features|
+--------------------+
|[1.01,2.0,1.0,0.0...|
|[1.66,3.0,0.0,0.0...|
|[3.5,3.0,0.0,0.0,...|
|[3.31,2.0,0.0,0.0...|
|[3.61,4.0,1.0,0.0...|
|[4.71,4.0,0.0,0.0...|
|[2.0,2.0,0.0,0.0,...|
|[3.12,4.0,0.0,0.0...|
|[1.96,2.0,0.0,0.0...|
|[3.23,2.0,0.0,0.0...|
|[1.71,2.0,0.0,0.0...|
|[5.0,4.0,1.0,0.0,...|
|[1.57,2.0,0.0,0.0...|
|[3.0,4.0,0.0,0.0,...|
|[3.02,2.0,1.0,0.0...|
|[3.92,2.0,0.0,0.0...|
|[1.67,3.0,1.0,0.0...|
|[3.71,3.0,0.0,0.0...|
|[3.5,3.0,1.0,0.0,...|
|(6,[0,1],[3.35,3.0])|
+--------------------+
only showing top 20 rows



In [15]:
finaldata=output.select("Independent Features","total_bill")
finaldata.show()

+--------------------+----------+
|Independent Features|total_bill|
+--------------------+----------+
|[1.01,2.0,1.0,0.0...|     16.99|
|[1.66,3.0,0.0,0.0...|     10.34|
|[3.5,3.0,0.0,0.0,...|     21.01|
|[3.31,2.0,0.0,0.0...|     23.68|
|[3.61,4.0,1.0,0.0...|     24.59|
|[4.71,4.0,0.0,0.0...|     25.29|
|[2.0,2.0,0.0,0.0,...|      8.77|
|[3.12,4.0,0.0,0.0...|     26.88|
|[1.96,2.0,0.0,0.0...|     15.04|
|[3.23,2.0,0.0,0.0...|     14.78|
|[1.71,2.0,0.0,0.0...|     10.27|
|[5.0,4.0,1.0,0.0,...|     35.26|
|[1.57,2.0,0.0,0.0...|     15.42|
|[3.0,4.0,0.0,0.0,...|     18.43|
|[3.02,2.0,1.0,0.0...|     14.83|
|[3.92,2.0,0.0,0.0...|     21.58|
|[1.67,3.0,1.0,0.0...|     10.33|
|[3.71,3.0,0.0,0.0...|     16.29|
|[3.5,3.0,1.0,0.0,...|     16.97|
|(6,[0,1],[3.35,3.0])|     20.65|
+--------------------+----------+
only showing top 20 rows



In [16]:
from pyspark.ml.regression import LinearRegression
train_data,test_data=finaldata.randomSplit([0.75,0.25])
regressor=LinearRegression(featuresCol='Independent Features',labelCol='total_bill')
model=regressor.fit(train_data)

In [17]:
### PREDICTIONS
pred_results=model.evaluate(test_data)
pred_results.predictions.show()

+--------------------+----------+------------------+
|Independent Features|total_bill|        prediction|
+--------------------+----------+------------------+
|(6,[0,1],[1.45,2.0])|      9.55| 13.34498405371758|
|(6,[0,1],[1.97,2.0])|     12.02|14.903790313821965|
|(6,[0,1],[2.01,2.0])|     20.23|15.023698487676148|
| (6,[0,1],[2.5,4.0])|     18.35|23.571782708547495|
| (6,[0,1],[3.0,2.0])|      14.0| 17.99142579056719|
|(6,[0,1],[3.18,2.0])|     19.82| 18.53101257291102|
|(6,[0,1],[3.35,3.0])|     20.65|22.580226857370096|
|(6,[0,1],[4.08,2.0])|     17.92| 21.22894648463015|
|(6,[0,1],[5.92,3.0])|     29.03| 30.28432702750139|
|[1.0,2.0,0.0,1.0,...|      12.6|14.611984359594619|
|[1.0,2.0,1.0,1.0,...|      5.75|12.809450850251162|
|[1.1,2.0,1.0,1.0,...|      12.9|13.499954753048376|
|[1.25,2.0,1.0,0.0...|      8.51| 9.839772956866799|
|[1.44,2.0,0.0,1.0...|      7.74|15.930974271990639|
|[1.5,2.0,0.0,0.0,...|     12.46|13.104135802873554|
|[1.5,2.0,0.0,1.0,...|     11.59|16.1108365327

In [26]:
model_path = "hdfs://localhost:9000/models/linear_regression_model"
model.save(model_path)


In [31]:
from pyspark.ml.regression import LinearRegressionModel
model_path ="hdfs://localhost:9000/models/linear_regression_model"
loaded_model = LinearRegressionModel.load(model_path)


In [153]:
path = "hdfs://localhost:9000/random_tips_data.csv"
from pyspark.sql.functions import monotonically_increasing_id, row_number

randomtips=spark.read.format("csv").option("header",True).option("seperator",",").load(path)


# df.columns
from pyspark.sql.functions import col
from pyspark.sql.types import StringType,BooleanType,DateType,IntegerType,DoubleType
randomtips = randomtips.withColumn("total_bill",col("total_bill").cast(DoubleType()))
randomtips = randomtips.withColumn("size",col("size").cast(IntegerType()))
randomtips = randomtips.withColumn("tip",col("tip").cast(DoubleType()))
randomtips.printSchema()
randomtips.show()

root
 |-- total_bill: double (nullable = true)
 |-- tip: double (nullable = true)
 |-- sex: string (nullable = true)
 |-- smoker: string (nullable = true)
 |-- day: string (nullable = true)
 |-- time: string (nullable = true)
 |-- size: integer (nullable = true)

+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     28.79|3.23|Female|    No|Thu| Lunch|   6|
|     36.61|6.93|  Male|   Yes|Thu| Lunch|   4|
|     31.33|3.93|Female|    No|Mon|Dinner|   4|
|     28.61|2.46|  Male|   Yes|Fri| Lunch|   4|
|     22.91|1.26|  Male|   Yes|Fri|Dinner|   1|
|     33.36|2.12|Female|   Yes|Fri|Dinner|   1|
|     23.57|4.38|Female|    No|Sun|Dinner|   6|
|     44.91|6.32|  Male|   Yes|Sat|Dinner|   5|
|     48.29| 7.6|  Male|    No|Sun|Dinner|   3|
|     21.02|4.82|Female|    No|Sun|Dinner|   3|
|     40.21|9.97|Female|   Yes|Sat| Lunch|   2|
|     27.86| 2.6|Female|   Yes|Sat| Lunch|   5|
|      29.7|5.42

In [154]:
randomtips = randomtips.withColumn("id", monotonically_increasing_id())


In [155]:
randomtips.show()

+----------+----+------+------+---+------+----+---+
|total_bill| tip|   sex|smoker|day|  time|size| id|
+----------+----+------+------+---+------+----+---+
|     28.79|3.23|Female|    No|Thu| Lunch|   6|  0|
|     36.61|6.93|  Male|   Yes|Thu| Lunch|   4|  1|
|     31.33|3.93|Female|    No|Mon|Dinner|   4|  2|
|     28.61|2.46|  Male|   Yes|Fri| Lunch|   4|  3|
|     22.91|1.26|  Male|   Yes|Fri|Dinner|   1|  4|
|     33.36|2.12|Female|   Yes|Fri|Dinner|   1|  5|
|     23.57|4.38|Female|    No|Sun|Dinner|   6|  6|
|     44.91|6.32|  Male|   Yes|Sat|Dinner|   5|  7|
|     48.29| 7.6|  Male|    No|Sun|Dinner|   3|  8|
|     21.02|4.82|Female|    No|Sun|Dinner|   3|  9|
|     40.21|9.97|Female|   Yes|Sat| Lunch|   2| 10|
|     27.86| 2.6|Female|   Yes|Sat| Lunch|   5| 11|
|      29.7|5.42|  Male|   Yes|Wed|Dinner|   1| 12|
|      46.5|4.77|  Male|    No|Thu| Lunch|   1| 13|
|      6.34|0.34|  Male|   Yes|Thu| Lunch|   4| 14|
|       7.1|1.43|  Male|    No|Tue|Dinner|   5| 15|
|      3.95|

In [156]:
from pyspark.ml.feature import VectorAssembler
indexer = StringIndexer(inputCols=["sex","smoker","day","time"],outputCols=["sex_index","smoker_index","day_index","time_index"])
randomtips2=indexer.fit(randomtips).transform(randomtips)
featureassembler=VectorAssembler(inputCols=['tip','size','sex_index','smoker_index','day_index','time_index'],outputCol='Independent Features')
output = featureassembler.transform(randomtips2)
finalrandomtips2=output.select("Independent Features","total_bill")
finalrandomtips2.show()

+--------------------+----------+
|Independent Features|total_bill|
+--------------------+----------+
|[3.23,6.0,1.0,1.0...|     28.79|
|[6.93,4.0,0.0,0.0...|     36.61|
|[3.93,4.0,1.0,1.0...|     31.33|
|[2.46,4.0,0.0,0.0...|     28.61|
|(6,[0,1],[1.26,1.0])|     22.91|
|[2.12,1.0,1.0,0.0...|     33.36|
|[4.38,6.0,1.0,1.0...|     23.57|
|[6.32,5.0,0.0,0.0...|     44.91|
|[7.6,3.0,0.0,1.0,...|     48.29|
|[4.82,3.0,1.0,1.0...|     21.02|
|[9.97,2.0,1.0,0.0...|     40.21|
|[2.6,5.0,1.0,0.0,...|     27.86|
|[5.42,1.0,0.0,0.0...|      29.7|
|[4.77,1.0,0.0,1.0...|      46.5|
|[0.34,4.0,0.0,0.0...|      6.34|
|[1.43,5.0,0.0,1.0...|       7.1|
|[0.45,5.0,0.0,1.0...|      3.95|
|[5.34,1.0,1.0,1.0...|     42.13|
|[6.63,5.0,0.0,0.0...|     39.57|
|[9.49,4.0,0.0,1.0...|     43.89|
+--------------------+----------+
only showing top 20 rows



In [157]:
from pyspark.ml.regression import LinearRegressionModel
model_path ="hdfs://localhost:9000/models/linear_regression_model"
loaded_model = LinearRegressionModel.load(model_path)


In [158]:
predictions = loaded_model.transform(finalrandomtips2)
predictions.show()

+--------------------+----------+------------------+
|Independent Features|total_bill|        prediction|
+--------------------+----------+------------------+
|[3.23,6.0,1.0,1.0...|     28.79| 32.02863504915167|
|[6.93,4.0,0.0,0.0...|     36.61| 34.83676481895115|
|[3.93,4.0,1.0,1.0...|     31.33|28.802178165614954|
|[2.46,4.0,0.0,0.0...|     28.61|22.218493327069652|
|(6,[0,1],[1.26,1.0])|     22.91|  9.23581568233141|
|[2.12,1.0,1.0,0.0...|     33.36|10.402041379014653|
|[4.38,6.0,1.0,1.0...|     23.57|36.969865233857604|
|[6.32,5.0,0.0,0.0...|     44.91|38.171884389039064|
|[7.6,3.0,0.0,1.0,...|     48.29|37.415459633564694|
|[4.82,3.0,1.0,1.0...|     21.02|27.670041509517237|
|[9.97,2.0,1.0,0.0...|     40.21| 35.84951036769156|
|[2.6,5.0,1.0,0.0,...|     27.86|24.375242971794638|
|[5.42,1.0,0.0,0.0...|      29.7|21.576021273779247|
|[4.77,1.0,0.0,1.0...|      46.5|20.358877055825456|
|[0.34,4.0,0.0,0.0...|      6.34|15.081893176474415|
|[1.43,5.0,0.0,1.0...|       7.1| 25.868588418

In [159]:
randomtips.show()

+----------+----+------+------+---+------+----+---+
|total_bill| tip|   sex|smoker|day|  time|size| id|
+----------+----+------+------+---+------+----+---+
|     28.79|3.23|Female|    No|Thu| Lunch|   6|  0|
|     36.61|6.93|  Male|   Yes|Thu| Lunch|   4|  1|
|     31.33|3.93|Female|    No|Mon|Dinner|   4|  2|
|     28.61|2.46|  Male|   Yes|Fri| Lunch|   4|  3|
|     22.91|1.26|  Male|   Yes|Fri|Dinner|   1|  4|
|     33.36|2.12|Female|   Yes|Fri|Dinner|   1|  5|
|     23.57|4.38|Female|    No|Sun|Dinner|   6|  6|
|     44.91|6.32|  Male|   Yes|Sat|Dinner|   5|  7|
|     48.29| 7.6|  Male|    No|Sun|Dinner|   3|  8|
|     21.02|4.82|Female|    No|Sun|Dinner|   3|  9|
|     40.21|9.97|Female|   Yes|Sat| Lunch|   2| 10|
|     27.86| 2.6|Female|   Yes|Sat| Lunch|   5| 11|
|      29.7|5.42|  Male|   Yes|Wed|Dinner|   1| 12|
|      46.5|4.77|  Male|    No|Thu| Lunch|   1| 13|
|      6.34|0.34|  Male|   Yes|Thu| Lunch|   4| 14|
|       7.1|1.43|  Male|    No|Tue|Dinner|   5| 15|
|      3.95|

In [160]:
predictions.select('prediction').show()

+------------------+
|        prediction|
+------------------+
| 32.02863504915167|
| 34.83676481895115|
|28.802178165614954|
|22.218493327069652|
|  9.23581568233141|
|10.402041379014653|
|36.969865233857604|
|38.171884389039064|
|37.415459633564694|
|27.670041509517237|
| 35.84951036769156|
|24.375242971794638|
|21.576021273779247|
|20.358877055825456|
|15.081893176474415|
| 25.86858841832723|
|21.697456951276077|
|22.540372146625778|
|39.361661715183494|
| 45.51758867551726|
+------------------+
only showing top 20 rows



In [161]:

#predictions = predictions.withColumn("id", monotonically_increasing_id())

In [162]:
import pyspark.sql.functions as f
randomtips.show()
test3 = randomtips.join(predictions,"total_bill","left")

+----------+----+------+------+---+------+----+---+
|total_bill| tip|   sex|smoker|day|  time|size| id|
+----------+----+------+------+---+------+----+---+
|     28.79|3.23|Female|    No|Thu| Lunch|   6|  0|
|     36.61|6.93|  Male|   Yes|Thu| Lunch|   4|  1|
|     31.33|3.93|Female|    No|Mon|Dinner|   4|  2|
|     28.61|2.46|  Male|   Yes|Fri| Lunch|   4|  3|
|     22.91|1.26|  Male|   Yes|Fri|Dinner|   1|  4|
|     33.36|2.12|Female|   Yes|Fri|Dinner|   1|  5|
|     23.57|4.38|Female|    No|Sun|Dinner|   6|  6|
|     44.91|6.32|  Male|   Yes|Sat|Dinner|   5|  7|
|     48.29| 7.6|  Male|    No|Sun|Dinner|   3|  8|
|     21.02|4.82|Female|    No|Sun|Dinner|   3|  9|
|     40.21|9.97|Female|   Yes|Sat| Lunch|   2| 10|
|     27.86| 2.6|Female|   Yes|Sat| Lunch|   5| 11|
|      29.7|5.42|  Male|   Yes|Wed|Dinner|   1| 12|
|      46.5|4.77|  Male|    No|Thu| Lunch|   1| 13|
|      6.34|0.34|  Male|   Yes|Thu| Lunch|   4| 14|
|       7.1|1.43|  Male|    No|Tue|Dinner|   5| 15|
|      3.95|

In [168]:
test3.show()
test4 = test3.select("id","total_bill","tip","sex","smoker","day","time","size","prediction")

+----------+----+------+------+---+------+----+---+--------------------+------------------+
|total_bill| tip|   sex|smoker|day|  time|size| id|Independent Features|        prediction|
+----------+----+------+------+---+------+----+---+--------------------+------------------+
|     28.79|3.23|Female|    No|Thu| Lunch|   6|  0|[3.23,6.0,1.0,1.0...| 32.02863504915167|
|     36.61|6.93|  Male|   Yes|Thu| Lunch|   4|  1|[6.93,4.0,0.0,0.0...| 34.83676481895115|
|     31.33|3.93|Female|    No|Mon|Dinner|   4|  2|[3.93,4.0,1.0,1.0...|28.802178165614954|
|     28.61|2.46|  Male|   Yes|Fri| Lunch|   4|  3|[2.46,4.0,0.0,0.0...|22.218493327069652|
|     22.91|1.26|  Male|   Yes|Fri|Dinner|   1|  4|(6,[0,1],[1.26,1.0])|  9.23581568233141|
|     33.36|2.12|Female|   Yes|Fri|Dinner|   1|  5|[2.12,1.0,1.0,0.0...|10.402041379014653|
|     23.57|4.38|Female|    No|Sun|Dinner|   6|  6|[4.38,6.0,1.0,1.0...|36.969865233857604|
|     44.91|6.32|  Male|   Yes|Sat|Dinner|   5|  7|[6.32,5.0,0.0,0.0...|38.17188

In [171]:
test4.write.option("header", "true").csv("hdfs://localhost:9000/randomtips.csv")
