In [1]:
from pyspark.sql import SparkSession
session = SparkSession.builder.appName('MLlib Intro').getOrCreate()

In [20]:
df = session.read.csv('dataset/modifiedBigdata.csv',header=True,inferSchema=True).limit(10000)
df.show(10)
df.count()
df.printSchema()
df.columns

+---+--------+---+-----+----+-----+----+-----+
|_c0|    date|day|month|year|store|item|sales|
+---+--------+---+-----+----+-----+----+-----+
|  0|02-01-13|  2|    1|  13|    1|   1|   11|
|  1|02-02-13|  2|    2|  13|    1|   1|   21|
|  2|02-03-13|  2|    3|  13|    1|   1|   15|
|  3|02-04-13|  2|    4|  13|    1|   1|   14|
|  4|02-05-13|  2|    5|  13|    1|   1|    9|
|  5|02-06-13|  2|    6|  13|    1|   1|   10|
|  6|02-07-13|  2|    7|  13|    1|   1|   13|
|  7|02-08-13|  2|    8|  13|    1|   1|   11|
|  8|02-09-13|  2|    9|  13|    1|   1|   14|
|  9|02-10-13|  2|   10|  13|    1|   1|   11|
+---+--------+---+-----+----+-----+----+-----+
only showing top 10 rows

root
 |-- _c0: integer (nullable = true)
 |-- date: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- store: integer (nullable = true)
 |-- item: integer (nullable = true)
 |-- sales: integer (nullable = true)



['_c0', 'date', 'day', 'month', 'year', 'store', 'item', 'sales']

## Data value analysis

In [18]:
## year magnitude is very bigger than other feature's magnitude. so we need to normalized them
max_year = df.agg({'year':'max'}).collect()
print(max_year[0][0]) #to get value
max_month = df.agg({'month':"max"}).collect()
print(max_month[0][0])
max_sell = df.agg({'sales':'max'}).collect()
print(max_sell[0][0])

17
12
58


In [None]:
## Data nomazitaion
df.withColumn('nor')

## at frist we need to create vector of each row befor train ML 

In [39]:
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=['day', 'month', 'year', 'store', 'item',],outputCol='Independent Features')
output = assembler.transform(df) 
output.select('Independent Features').show()

+--------------------+
|Independent Features|
+--------------------+
|[2.0,1.0,13.0,1.0...|
|[2.0,2.0,13.0,1.0...|
|[2.0,3.0,13.0,1.0...|
|[2.0,4.0,13.0,1.0...|
|[2.0,5.0,13.0,1.0...|
|[2.0,6.0,13.0,1.0...|
|[2.0,7.0,13.0,1.0...|
|[2.0,8.0,13.0,1.0...|
|[2.0,9.0,13.0,1.0...|
|[2.0,10.0,13.0,1....|
|[2.0,11.0,13.0,1....|
|[2.0,12.0,13.0,1....|
|[13.0,2.0,13.0,1....|
|[14.0,2.0,13.0,1....|
|[15.0,2.0,13.0,1....|
|[16.0,2.0,13.0,1....|
|[17.0,2.0,13.0,1....|
|[18.0,2.0,13.0,1....|
|[19.0,2.0,13.0,1....|
|[20.0,2.0,13.0,1....|
+--------------------+
only showing top 20 rows



10000

## data normalize by  minmax scaler

In [43]:
from pyspark.ml.feature import MinMaxScaler
scaler = MinMaxScaler(inputCol='Independent Features',outputCol='Scaled Independent')
scaler_output = scaler.fit(output)
scaler_output = scaler_output.transform(output)
scaler_output.select('Scaled Independent').show(10,truncate=False)
scaler_output.count()

+-----------------------------------------------------+
|Scaled Independent                                   |
+-----------------------------------------------------+
|(5,[0,4],[0.03333333333333333,0.5])                  |
|[0.03333333333333333,0.09090909090909091,0.0,0.0,0.5]|
|[0.03333333333333333,0.18181818181818182,0.0,0.0,0.5]|
|[0.03333333333333333,0.2727272727272727,0.0,0.0,0.5] |
|[0.03333333333333333,0.36363636363636365,0.0,0.0,0.5]|
|[0.03333333333333333,0.4545454545454546,0.0,0.0,0.5] |
|[0.03333333333333333,0.5454545454545454,0.0,0.0,0.5] |
|[0.03333333333333333,0.6363636363636364,0.0,0.0,0.5] |
|[0.03333333333333333,0.7272727272727273,0.0,0.0,0.5] |
|[0.03333333333333333,0.8181818181818182,0.0,0.0,0.5] |
+-----------------------------------------------------+
only showing top 10 rows



10000

In [44]:
final_df = scaler_output.select('Scaled Independent','sales')
final_df.show()

+--------------------+-----+
|  Scaled Independent|sales|
+--------------------+-----+
|(5,[0,4],[0.03333...|   11|
|[0.03333333333333...|   21|
|[0.03333333333333...|   15|
|[0.03333333333333...|   14|
|[0.03333333333333...|    9|
|[0.03333333333333...|   10|
|[0.03333333333333...|   13|
|[0.03333333333333...|   11|
|[0.03333333333333...|   14|
|[0.03333333333333...|   11|
|[0.03333333333333...|   16|
|[0.03333333333333...|   11|
|[0.4,0.0909090909...|   14|
|[0.43333333333333...|   10|
|[0.46666666666666...|   11|
|[0.5,0.0909090909...|    7|
|[0.53333333333333...|   11|
|[0.56666666666666...|   10|
|[0.6,0.0909090909...|   10|
|[0.63333333333333...|    7|
+--------------------+-----+
only showing top 20 rows



## apply ML algo

In [45]:
from pyspark.ml.regression import LinearRegression
train_data,test_data = final_df.randomSplit([0.75,0.25])
reg = LinearRegression(featuresCol='Scaled Independent',labelCol='sales')
reg = reg.fit(train_data)

In [46]:
reg.coefficients

DenseVector([0.5716, 2.3129, 6.7902, -6.6832, 0.0])

In [47]:
reg.intercept

20.346033325486363

In [48]:
## prediction
pred = reg.evaluate(test_data)

In [49]:
pred.predictions.show()

+--------------------+-----+------------------+
|  Scaled Independent|sales|        prediction|
+--------------------+-----+------------------+
|(5,[0,4],[0.26666...|   25| 20.49846961388405|
| (5,[3,4],[0.8,0.5])|   11|14.999466397248295|
|[0.0,0.0,0.25,0.2...|   19|20.706949258082894|
|[0.0,0.0,0.25,1.0...|   13|15.360382329844825|
|[0.0,0.0,0.5,0.4,...|   15|21.067865190679427|
|[0.0,0.0,0.5,0.60...|   20| 19.73122345861991|
|[0.0,0.0,0.75,0.4...|   23|22.765422855335476|
|[0.0,0.0,0.75,0.6...|   17|21.428781123275957|
|[0.0,0.0,1.0,0.4,...|   20|24.462980519991525|
|[0.0,0.0909090909...|    8|17.883015424504954|
|[0.0,0.0909090909...|   12| 16.54637369244544|
|[0.0,0.0909090909...|   17|19.580573089161003|
|[0.0,0.0909090909...|    8| 16.90728962504197|
|[0.0,0.0909090909...|   17| 22.61477248587657|
|[0.0,0.0909090909...|   16|21.278130753817052|
|[0.0,0.0909090909...|   20|23.336604351069635|
|[0.0,0.0909090909...|    5|21.999962619010116|
|[0.0,0.1818181818...|   16|19.790838652

## performance matrix



In [50]:
pred.r2, pred.meanAbsoluteError,pred.meanSquaredError

(0.1651134948649542, 5.970780626976752, 57.37820653111907)