In [1]:
from pyspark.sql import SparkSession
# Single Spark session reused by every cell below.
ss = (SparkSession.builder
      .appName('Abhijit')
      .getOrCreate())

## Loading Data

In [3]:
# `sc` is never defined in this notebook (it is presumably a Databricks-injected global);
# take the SparkContext from the session created above so a fresh kernel can run this cell.
bfData = ss.sparkContext.textFile('dbfs:/FileStore/tables/BlackFriday.csv')
bfData.cache()
bfData.count()

In [4]:
# Peek at the first raw lines (the header line is filtered out in the next step).
bfData.take(num=5)

## Filtering out the header line

In [6]:
# Keep only data lines: the header is the one line that contains the "User_ID" column name.
dataLines = bfData.filter(lambda line: 'User_ID' not in line)
dataLines.take(num=5)

### Replacing missing values in columns 9 and 10 (Product_Category_2 and Product_Category_3) with 0

In [8]:
def changefromnull(autoStr, cols=(9, 10)):
  """Split one comma-separated record and coerce selected columns to int.

  Every field listed in ``cols`` that consists only of digits is converted to
  ``int``; any other value (e.g. an empty string for a missing category)
  becomes ``0``. All remaining fields are returned unchanged as strings.

  Args:
    autoStr: one raw CSV line (header already filtered out).
    cols: indices of the columns to coerce. Defaults to 9 and 10, which are
      Product_Category_2 and Product_Category_3 in this dataset.

  Returns:
    list: the split fields with the ``cols`` positions replaced by ints.
  """
  attList = autoStr.split(",")
  for idx in cols:
    # str.isdigit() is False for "" (a missing value), so blanks map to 0.
    attList[idx] = int(attList[idx]) if attList[idx].isdigit() else 0
  return attList

In [9]:
# Parse every data line, filling the two optional product-category columns with 0 when blank.
dataLines1 = dataLines.map(changefromnull)
dataLines1.take(num=10)

### Mapping the RDD to Rows

In [11]:
from pyspark.sql import Row
def _to_row(p):
  """Build a typed Row from one list produced by changefromnull.

  Index 1 (Product_ID) is not carried over. Indices 9 and 10 are already ints
  after changefromnull, so they are passed through as-is.
  """
  return Row(User_ID=int(p[0]),
             Gender=p[2],
             Age=p[3],
             Occupation=int(p[4]),
             City_Category=p[5],
             Stay_Current_City_Years=p[6],
             Marital_Status=int(p[7]),
             Product_Category_1=int(p[8]),
             Product_Category_2=p[9],
             Product_Category_3=p[10],
             Purchase=float(p[11]))

bfMap = dataLines1.map(_to_row)

In [12]:
# Spot-check a few parsed Rows.
bfMap.take(num=4)

### Converting the RDD to a DataFrame

In [14]:
# Schema is inferred from the Row field names and value types.
bfDf = ss.createDataFrame(data=bfMap)
bfDf.cache()


In [15]:
# First 20 rows (the show() default), values truncated for display.
bfDf.show(n=20)

In [16]:
# NOTE(review): bfDf was already cached right after createDataFrame, so this call is redundant.
bfDf.cache()

### Indexing categorical columns for modelling

In [18]:
from pyspark.ml.feature import StringIndexer
# Encode City_Category as a numeric index column, IND_City.
si = StringIndexer(outputCol="IND_City", inputCol="City_Category")
si_model = si.fit(dataset=bfDf)
BfNormDf = si_model.transform(dataset=bfDf)

In [19]:
from pyspark.ml.feature import StringIndexer
# Encode the Age bracket as IND_Age.
# NOTE(review): StringIndexer orders labels by frequency, not by age, so IND_Age is not ordinal.
si = StringIndexer(outputCol="IND_Age", inputCol="Age")
si_model = si.fit(dataset=BfNormDf)
BfNormDf = si_model.transform(dataset=BfNormDf)

In [20]:
from pyspark.ml.feature import StringIndexer
# Encode Gender as IND_Gender.
si = StringIndexer(outputCol="IND_Gender", inputCol="Gender")
si_model = si.fit(dataset=BfNormDf)
BfNormDf = si_model.transform(dataset=BfNormDf)

In [21]:
from pyspark.ml.feature import StringIndexer
# Encode Stay_Current_City_Years (values such as "4+") as IND_Stay_In_City.
si = StringIndexer(outputCol="IND_Stay_In_City", inputCol="Stay_Current_City_Years")
si_model = si.fit(dataset=BfNormDf)
BfNormDf = si_model.transform(dataset=BfNormDf)

### Correlation of each numeric column with Purchase

In [23]:
# Pearson correlation of every non-string column with the target, Purchase.
# Column types are read from the schema (BfNormDf.dtypes). The previous version sampled
# one row per column just to test isinstance(str), which launched a Spark job per column.
for colName, colType in BfNormDf.dtypes:
  if colType != "string":
    print("Correlation to Purchase for", colName,
          BfNormDf.stat.corr("Purchase", colName))

### Summary statistics of the indexed data

In [25]:
# count / mean / stddev / min / max for every column.
BfNormDf.describe().show(n=20)

### Basic Visualisation

In [27]:
# Expose the indexed DataFrame to the %sql cells below.
BfNormDf.createOrReplaceTempView(name="BfNormTable")

In [28]:
%sql
-- Entire indexed table (raw columns plus IND_* columns) for charting
SELECT *
FROM BfNormTable

Age,City_Category,Gender,Marital_Status,Occupation,Product_Category_1,Product_Category_2,Product_Category_3,Purchase,Stay_Current_City_Years,User_ID,IND_City,IND_Age,IND_Gender,IND_Stay_In_City
0-17,A,F,0,10,3,0,0,8370.0,2,1000001,2.0,6.0,1.0,1.0
0-17,A,F,0,10,1,6,14,15200.0,2,1000001,2.0,6.0,1.0,1.0
0-17,A,F,0,10,12,0,0,1422.0,2,1000001,2.0,6.0,1.0,1.0
0-17,A,F,0,10,12,14,0,1057.0,2,1000001,2.0,6.0,1.0,1.0
55+,C,M,0,16,8,0,0,7969.0,4+,1000002,1.0,5.0,0.0,3.0
26-35,A,M,0,15,1,2,0,15227.0,3,1000003,2.0,0.0,0.0,2.0
46-50,B,M,1,7,1,8,17,19215.0,2,1000004,0.0,3.0,0.0,1.0
46-50,B,M,1,7,1,15,0,15854.0,2,1000004,0.0,3.0,0.0,1.0
46-50,B,M,1,7,1,16,0,15686.0,2,1000004,0.0,3.0,0.0,1.0
26-35,A,M,1,20,8,0,0,7871.0,1,1000005,2.0,0.0,0.0,0.0


In [29]:
%sql
-- NOTE(review): same query as the preceding cell; presumably kept for a different
-- Databricks plot configuration. Consider merging or removing.
SELECT *
FROM BfNormTable

Age,City_Category,Gender,Marital_Status,Occupation,Product_Category_1,Product_Category_2,Product_Category_3,Purchase,Stay_Current_City_Years,User_ID,IND_City,IND_Age,IND_Gender,IND_Stay_In_City
0-17,A,F,0,10,3,0,0,8370.0,2,1000001,2.0,6.0,1.0,1.0
0-17,A,F,0,10,1,6,14,15200.0,2,1000001,2.0,6.0,1.0,1.0
0-17,A,F,0,10,12,0,0,1422.0,2,1000001,2.0,6.0,1.0,1.0
0-17,A,F,0,10,12,14,0,1057.0,2,1000001,2.0,6.0,1.0,1.0
55+,C,M,0,16,8,0,0,7969.0,4+,1000002,1.0,5.0,0.0,3.0
26-35,A,M,0,15,1,2,0,15227.0,3,1000003,2.0,0.0,0.0,2.0
46-50,B,M,1,7,1,8,17,19215.0,2,1000004,0.0,3.0,0.0,1.0
46-50,B,M,1,7,1,15,0,15854.0,2,1000004,0.0,3.0,0.0,1.0
46-50,B,M,1,7,1,16,0,15686.0,2,1000004,0.0,3.0,0.0,1.0
26-35,A,M,1,20,8,0,0,7871.0,1,1000005,2.0,0.0,0.0,0.0


In [30]:
%sql
-- NOTE(review): same query as the preceding cells; presumably kept for a different
-- Databricks plot configuration. Consider merging or removing.
SELECT *
FROM BfNormTable

Age,City_Category,Gender,Marital_Status,Occupation,Product_Category_1,Product_Category_2,Product_Category_3,Purchase,Stay_Current_City_Years,User_ID,IND_City,IND_Age,IND_Gender,IND_Stay_In_City
0-17,A,F,0,10,3,0,0,8370.0,2,1000001,2.0,6.0,1.0,1.0
0-17,A,F,0,10,1,6,14,15200.0,2,1000001,2.0,6.0,1.0,1.0
0-17,A,F,0,10,12,0,0,1422.0,2,1000001,2.0,6.0,1.0,1.0
0-17,A,F,0,10,12,14,0,1057.0,2,1000001,2.0,6.0,1.0,1.0
55+,C,M,0,16,8,0,0,7969.0,4+,1000002,1.0,5.0,0.0,3.0
26-35,A,M,0,15,1,2,0,15227.0,3,1000003,2.0,0.0,0.0,2.0
46-50,B,M,1,7,1,8,17,19215.0,2,1000004,0.0,3.0,0.0,1.0
46-50,B,M,1,7,1,15,0,15854.0,2,1000004,0.0,3.0,0.0,1.0
46-50,B,M,1,7,1,16,0,15686.0,2,1000004,0.0,3.0,0.0,1.0
26-35,A,M,1,20,8,0,0,7871.0,1,1000005,2.0,0.0,0.0,0.0


In [31]:
%sql
-- NOTE(review): same query as the preceding cells; presumably kept for a different
-- Databricks plot configuration. Consider merging or removing.
SELECT *
FROM BfNormTable

Age,City_Category,Gender,Marital_Status,Occupation,Product_Category_1,Product_Category_2,Product_Category_3,Purchase,Stay_Current_City_Years,User_ID,IND_City,IND_Age,IND_Gender,IND_Stay_In_City
0-17,A,F,0,10,3,0,0,8370.0,2,1000001,2.0,6.0,1.0,1.0
0-17,A,F,0,10,1,6,14,15200.0,2,1000001,2.0,6.0,1.0,1.0
0-17,A,F,0,10,12,0,0,1422.0,2,1000001,2.0,6.0,1.0,1.0
0-17,A,F,0,10,12,14,0,1057.0,2,1000001,2.0,6.0,1.0,1.0
55+,C,M,0,16,8,0,0,7969.0,4+,1000002,1.0,5.0,0.0,3.0
26-35,A,M,0,15,1,2,0,15227.0,3,1000003,2.0,0.0,0.0,2.0
46-50,B,M,1,7,1,8,17,19215.0,2,1000004,0.0,3.0,0.0,1.0
46-50,B,M,1,7,1,15,0,15854.0,2,1000004,0.0,3.0,0.0,1.0
46-50,B,M,1,7,1,16,0,15686.0,2,1000004,0.0,3.0,0.0,1.0
26-35,A,M,1,20,8,0,0,7871.0,1,1000005,2.0,0.0,0.0,0.0


### Building (label, features) pairs for modelling

In [33]:
from pyspark.ml.linalg import Vectors
def transformToLabeledPoint(row):
    """Turn one indexed row into a ``(label, features)`` tuple.

    The label is ``Purchase``. The features are packed into a dense vector in this order:
    IND_Age, IND_Gender, IND_Stay_In_City, Product_Category_1..3, Marital_Status and
    Occupation.

    NOTE(review): IND_City is computed earlier but is not used as a feature. Intentional?
    """
    featureCols = ("IND_Age", "IND_Gender", "IND_Stay_In_City",
                   "Product_Category_1", "Product_Category_2", "Product_Category_3",
                   "Marital_Status", "Occupation")
    features = Vectors.dense([row[col] for col in featureCols])
    return (row["Purchase"], features)
    

In [34]:
# Row -> (label, features) tuples, then back into a two-column DataFrame.
# NOTE(review): this rebinds bfDf, replacing the raw DataFrame built earlier under the same name.
bfLabPoint = BfNormDf.rdd.map(transformToLabeledPoint)
bfDf = ss.createDataFrame(bfLabPoint, schema=["label", "features"])
bfDf.select("label", "features").show(n=1000)

In [35]:
# Cache the (label, features) DataFrame: it is split next and then scanned by the count, fit and evaluation cells.
bfDf.cache()

### Splitting into test and training data

In [37]:
# Fixed seed so the 80/20 split, and every metric computed from it, is reproducible across runs.
SPLIT_SEED = 42
(trainingData, testData) = bfDf.randomSplit([0.8, 0.2], seed=SPLIT_SEED)
trainingData.count()

In [38]:
# Size of the held-out (roughly 20%) split, for comparison with the training count above.
testData.count()

### Fitting Linear Model

In [40]:
from pyspark.ml.regression import LinearRegression
# Ordinary linear regression on the assembled feature vector, at most 100 solver iterations.
lr = LinearRegression(maxIter=100)
lrModel = lr.fit(dataset=trainingData)

### Model coefficients and intercept

In [42]:
# Fitted parameters: one coefficient per feature, plus the intercept.
print(f"Coefficients: {lrModel.coefficients}")
print(f"Intercept: {lrModel.intercept}")

### Predictions

In [44]:
# Predict on the test data
predictions = lrModel.transform(testData)
# Show a sample only. show(50000) pulled tens of thousands of rows to the driver
# and flooded the notebook output.
predictions.show(20)

### Model accuracy on the test set

In [46]:
from pyspark.ml.evaluation import RegressionEvaluator
# The section was meant to report RMSE, but only R² was computed. Report both;
# RMSE is obtained by overriding metricName for that single evaluate() call.
evaluator = RegressionEvaluator(predictionCol="prediction",
                                labelCol="label", metricName="r2")
r2 = evaluator.evaluate(predictions)
rmse = evaluator.evaluate(predictions, {evaluator.metricName: "rmse"})
{"rmse": rmse, "r2": r2}

# Classification attempt!

In [48]:
%sql
-- One row per customer with a placeholder label (Classification = 0).
-- DISTINCT matters: BfNormTable has one row per purchase, so without it each User_ID
-- repeats and the later join back to BfNormTable fans out (k purchases -> k*k rows).
-- OR REPLACE keeps the cell re-runnable without a separate DROP.
CREATE OR REPLACE TEMP VIEW New_Black_Friday AS
SELECT DISTINCT
    T1.User_ID,
    0 AS Classification
FROM BfNormTable T1

In [49]:
%sql
-- Dropping New_Black_Friday here removed the view that the newdf cell below reads from,
-- so a top-to-bottom run would fail there. Drop the view that cell (re)creates instead,
-- so the newdf cell can be re-run.
DROP VIEW IF EXISTS newdf

In [50]:
%sql
-- Purchases from smallest to largest
SELECT User_ID, Purchase
FROM BFNormTable
ORDER BY Purchase ASC

User_ID,Purchase
1004227,185.0
1004048,185.0
1001968,185.0
1003391,185.0
1000889,186.0
1005039,186.0
1006025,186.0
1001630,186.0
1001198,187.0
1005120,187.0


In [51]:
%sql
-- Attach the per-user Classification label to every purchase row of that user.
-- The DISTINCT subquery guards against duplicate User_IDs in New_Black_Friday, which would
-- otherwise multiply rows in the join. OR REPLACE keeps the cell re-runnable.
CREATE OR REPLACE TEMP VIEW newdf AS
SELECT a.User_ID, a.Classification,
       b.Age, b.City_Category, b.Gender, b.Marital_Status, b.Occupation,
       b.Product_Category_1, b.Product_Category_2, b.Product_Category_3,
       b.Purchase, b.Stay_Current_City_Years,
       b.IND_City, b.IND_Age, b.IND_Gender, b.IND_Stay_In_City
FROM (SELECT DISTINCT User_ID, Classification FROM New_Black_Friday) a
LEFT JOIN BfNormTable b
  ON a.User_ID = b.User_ID



In [52]:
# DataFrame handle on the newdf temp view. table() takes a table/view *name*, not a SQL
# query, so passing "select * from newdf" failed. `ss` is the session created at the top.
blackFridayDf = ss.table("newdf")

In [53]:
%sql
-- Inspect the labelled per-purchase view
SELECT *
FROM newdf

User_ID,Classification,Age,City_Category,Gender,Marital_Status,Occupation,Product_Category_1,Product_Category_2,Product_Category_3,Purchase,Stay_Current_City_Years,IND_City,IND_Age,IND_Gender,IND_Stay_In_City
1000012,0,26-35,C,M,0,12,1,6,0,15246.0,2,1.0,0.0,0.0,1.0
1000012,0,26-35,C,M,0,12,5,8,0,6865.0,2,1.0,0.0,0.0,1.0
1000012,0,26-35,C,M,0,12,1,6,8,19031.0,2,1.0,0.0,0.0,1.0
1000012,0,26-35,C,M,0,12,5,0,0,5196.0,2,1.0,0.0,0.0,1.0
1000012,0,26-35,C,M,0,12,10,13,16,14249.0,2,1.0,0.0,0.0,1.0
1000012,0,26-35,C,M,0,12,5,6,8,5464.0,2,1.0,0.0,0.0,1.0
1000012,0,26-35,C,M,0,12,5,0,0,3576.0,2,1.0,0.0,0.0,1.0
1000012,0,26-35,C,M,0,12,6,0,0,16599.0,2,1.0,0.0,0.0,1.0
1000012,0,26-35,C,M,0,12,8,16,0,9696.0,2,1.0,0.0,0.0,1.0
1000012,0,26-35,C,M,0,12,1,8,17,12091.0,2,1.0,0.0,0.0,1.0
