# PBDA Project
## Analysis and Forecasting of Black Friday Sales

Tejas Shetty - trs389@nyu.edu

Sanjana Jangnure - sbj286@nyu.edu

Aishwarya Kore - adk497@nyu.edu

Richanshu Jha - rj1469@nyu.edu

--- 

### Required Python modules
1. PySpark
2. findSpark

### To Run:
1. Please ensure the required Python modules are installed.
2. This ```.ipynb``` file can be run as a jupyter notebook.

In [1]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType

spark = SparkSession\
        .builder\
        .appName("PBDA_Project")\
        .getOrCreate()

# Prepare training and test data.
data = spark.read.format("csv")\
    .option("header","true")\
    .option("inferSchema", "true")\
    .load("E:\#Notes and Study Stuff\PBDA\Final Project\data\\train.csv")

df = data

In [2]:
GenderF = F.udf(lambda x : '1' if x=='F' else 0)

df=df.withColumn("newgender",GenderF(df.Gender).cast(IntegerType()))
df.select("Gender","newgender").show()

productidF=F.udf(lambda x:str(x).strip('P'))

df=df.withColumn("newpid",productidF(df.Product_ID).cast(IntegerType()))
df.select("Product_ID","newpid").show()

cityCategoryF=F.udf(lambda x : 0 if x=='A' else(1 if x=='B' else 2))

df=df.withColumn("newcityCategory",cityCategoryF(df.City_Category).cast(IntegerType()))
df.select("City_Category","newcityCategory").show()

df.printSchema()

+------+---------+
|Gender|newgender|
+------+---------+
|     F|        1|
|     F|        1|
|     F|        1|
|     F|        1|
|     M|        0|
|     M|        0|
|     M|        0|
|     M|        0|
|     M|        0|
|     M|        0|
|     M|        0|
|     M|        0|
|     M|        0|
|     M|        0|
|     F|        1|
|     F|        1|
|     F|        1|
|     F|        1|
|     M|        0|
|     M|        0|
+------+---------+
only showing top 20 rows

+----------+------+
|Product_ID|newpid|
+----------+------+
| P00069042| 69042|
| P00248942|248942|
| P00087842| 87842|
| P00085442| 85442|
| P00285442|285442|
| P00193542|193542|
| P00184942|184942|
| P00346142|346142|
|  P0097242| 97242|
| P00274942|274942|
| P00251242|251242|
| P00014542| 14542|
| P00031342| 31342|
| P00145042|145042|
| P00231342|231342|
| P00190242|190242|
|  P0096642| 96642|
| P00058442| 58442|
| P00036842| 36842|
| P00249542|249542|
+----------+------+
only showing top 20 rows

+-----------

In [3]:
from pyspark.ml.feature import Word2Vec
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.ml.feature import RegexTokenizer, CountVectorizer
from pyspark.sql import functions as F

merged = F.udf(lambda a,b,c,d,e,f,g,h,i,j: 'a'+str(a)+' b'+str(b)+' c'+str(c)+ \
               ' d'+str(d)+' e'+str(e)+' f'+str(f)+' g'+str(g)+' h'+str(h)+' i'+str(i)+ \
               ' j'+str(j))


df = df.withColumn('X',merged(df['Age'], df['newgender'], df['Occupation'], \
                              df['newpid'], df['newcityCategory'], \
                              df['Stay_In_Current_City_Years'], df['Marital_Status'], \
                              df['Product_Category_1'], df['Product_Category_2'], df['Product_Category_3']))

df.select('Age', 'newgender', 'Occupation', 'X').show(10)

+-----+---------+----------+--------------------+
|  Age|newgender|Occupation|                   X|
+-----+---------+----------+--------------------+
| 0-17|        1|        10|a0-17 b1 c10 d690...|
| 0-17|        1|        10|a0-17 b1 c10 d248...|
| 0-17|        1|        10|a0-17 b1 c10 d878...|
| 0-17|        1|        10|a0-17 b1 c10 d854...|
|  55+|        0|        16|a55+ b0 c16 d2854...|
|26-35|        0|        15|a26-35 b0 c15 d19...|
|46-50|        0|         7|a46-50 b0 c7 d184...|
|46-50|        0|         7|a46-50 b0 c7 d346...|
|46-50|        0|         7|a46-50 b0 c7 d972...|
|26-35|        0|        20|a26-35 b0 c20 d27...|
+-----+---------+----------+--------------------+
only showing top 10 rows



In [4]:
#Initializing Tokenizer
regexTokenizer = RegexTokenizer(\
                    inputCol="X", \
                    outputCol="X2", \
                    pattern=" ")

df = regexTokenizer.transform(df)
df.select('Age', 'newgender', 'Occupation','X', 'X2').show(10)

+-----+---------+----------+--------------------+--------------------+
|  Age|newgender|Occupation|                   X|                  X2|
+-----+---------+----------+--------------------+--------------------+
| 0-17|        1|        10|a0-17 b1 c10 d690...|[a0-17, b1, c10, ...|
| 0-17|        1|        10|a0-17 b1 c10 d248...|[a0-17, b1, c10, ...|
| 0-17|        1|        10|a0-17 b1 c10 d878...|[a0-17, b1, c10, ...|
| 0-17|        1|        10|a0-17 b1 c10 d854...|[a0-17, b1, c10, ...|
|  55+|        0|        16|a55+ b0 c16 d2854...|[a55+, b0, c16, d...|
|26-35|        0|        15|a26-35 b0 c15 d19...|[a26-35, b0, c15,...|
|46-50|        0|         7|a46-50 b0 c7 d184...|[a46-50, b0, c7, ...|
|46-50|        0|         7|a46-50 b0 c7 d346...|[a46-50, b0, c7, ...|
|46-50|        0|         7|a46-50 b0 c7 d972...|[a46-50, b0, c7, ...|
|26-35|        0|        20|a26-35 b0 c20 d27...|[a26-35, b0, c20,...|
+-----+---------+----------+--------------------+--------------------+
only s

In [5]:
countVectorizer = CountVectorizer(inputCol="X2",  outputCol="features")
cvModel = countVectorizer.fit(df)
df = cvModel.transform(df)

df.select('X2','features').show(10)

+--------------------+--------------------+
|                  X2|            features|
+--------------------+--------------------+
|[a0-17, b1, c10, ...|(3725,[1,2,7,10,1...|
|[a0-17, b1, c10, ...|(3725,[2,10,11,12...|
|[a0-17, b1, c10, ...|(3725,[1,2,7,10,1...|
|[a0-17, b1, c10, ...|(3725,[1,2,10,12,...|
|[a55+, b0, c16, d...|(3725,[0,1,2,7,8,...|
|[a26-35, b0, c15,...|(3725,[0,1,2,5,10...|
|[a46-50, b0, c7, ...|(3725,[0,3,4,11,1...|
|[a46-50, b0, c7, ...|(3725,[0,1,3,4,11...|
|[a46-50, b0, c7, ...|(3725,[0,1,3,4,11...|
|[a26-35, b0, c20,...|(3725,[0,1,4,5,6,...|
+--------------------+--------------------+
only showing top 10 rows



In [6]:
#REDUCING NO OF LABELS BY PUTTING PRICE INTO BUCKETS
from pyspark.ml.feature import StringIndexer
from pyspark.sql.types import IntegerType
from pyspark.sql.types import FloatType

df = df.withColumnRenamed('Purchase','label')
df = df.withColumn('labelCategory',((df['label']/1000).cast(IntegerType())).cast(FloatType()))
df.select('label','labelCategory').show(10)

+-----+-------------+
|label|labelCategory|
+-----+-------------+
| 8370|          8.0|
|15200|         15.0|
| 1422|          1.0|
| 1057|          1.0|
| 7969|          7.0|
|15227|         15.0|
|19215|         19.0|
|15854|         15.0|
|15686|         15.0|
| 7871|          7.0|
+-----+-------------+
only showing top 10 rows



In [7]:
# CREATING RANDOM SPLITS
dfFinal = df.select('features','labelCategory') \
            .withColumnRenamed('labelCategory','label')

(trainData, testData) = dfFinal.randomSplit([0.7, 0.3], seed = 888)
print('Training split')
trainData.groupBy('features').count().show(10)
print('Testing split')
testData.groupBy('features').count().show(10)

print('Train Dataset Count : ' + str(trainData.count()))
print('Test Dataset Count  : ' + str(testData.count()))

Training split
+--------------------+-----+
|            features|count|
+--------------------+-----+
|(3725,[0,1,2,3,5,...|    2|
|(3725,[0,1,2,3,5,...|    1|
|(3725,[0,1,2,3,5,...|    1|
|(3725,[0,1,2,3,5,...|    1|
|(3725,[0,1,2,3,5,...|    2|
|(3725,[0,1,2,3,5,...|    1|
|(3725,[0,1,2,3,5,...|    1|
|(3725,[0,1,2,3,5,...|    1|
|(3725,[0,1,2,3,5,...|    1|
|(3725,[0,1,2,3,5,...|    1|
+--------------------+-----+
only showing top 10 rows

Testing split
+--------------------+-----+
|            features|count|
+--------------------+-----+
|(3725,[0,1,2,3,5,...|    1|
|(3725,[0,1,2,3,5,...|    1|
|(3725,[0,1,2,3,5,...|    1|
|(3725,[0,1,2,3,5,...|    1|
|(3725,[0,1,2,3,5,...|    1|
|(3725,[0,1,2,3,5,...|    1|
|(3725,[0,1,2,3,5,...|    1|
|(3725,[0,1,2,3,5,...|    2|
|(3725,[0,1,2,3,5,...|    1|
|(3725,[0,1,2,3,6,...|    1|
+--------------------+-----+
only showing top 10 rows

Train Dataset Count : 384838
Test Dataset Count  : 165230


In [8]:
#Training
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

#trainData.printSchema()
#testData.printSchema()

lr = LogisticRegression()
lrModel = lr.fit(trainData)
predictions = lrModel.transform(testData)
print('Model Training Complete')

Model Training Complete


In [9]:
predictions.groupBy('prediction').count().orderBy('prediction').show(31)

+----------+-----+
|prediction|count|
+----------+-----+
|       0.0| 3148|
|       1.0| 2968|
|       2.0| 3648|
|       3.0| 2824|
|       4.0| 6231|
|       5.0|24835|
|       6.0| 3442|
|       7.0|23683|
|       8.0|17188|
|       9.0|14803|
|      10.0| 3395|
|      11.0| 9261|
|      12.0| 2291|
|      13.0| 2951|
|      14.0|  169|
|      15.0|26111|
|      16.0| 6884|
|      17.0|  211|
|      18.0|  403|
|      19.0| 5526|
|      20.0| 3258|
|      21.0|  705|
|      23.0| 1295|
+----------+-----+



In [10]:
predictions = predictions.withColumn('predictedLabelCategory', predictions['prediction'])
dfPredicted = df.join(predictions, df.features == predictions.features)
dfPredicted.printSchema()

root
 |-- User_ID: integer (nullable = true)
 |-- Product_ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Occupation: integer (nullable = true)
 |-- City_Category: string (nullable = true)
 |-- Stay_In_Current_City_Years: string (nullable = true)
 |-- Marital_Status: integer (nullable = true)
 |-- Product_Category_1: integer (nullable = true)
 |-- Product_Category_2: integer (nullable = true)
 |-- Product_Category_3: integer (nullable = true)
 |-- label: integer (nullable = true)
 |-- newgender: integer (nullable = true)
 |-- newpid: integer (nullable = true)
 |-- newcityCategory: integer (nullable = true)
 |-- X: string (nullable = true)
 |-- X2: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- features: vector (nullable = true)
 |-- labelCategory: float (nullable = true)
 |-- features: vector (nullable = true)
 |-- label: float (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- pro

In [11]:
from  pyspark.sql.functions import abs

dfPredicted = dfPredicted.withColumn('PriceRange', dfPredicted['labelCategory']*1000)
dfPredicted = dfPredicted.withColumn('PredictedPriceRange', dfPredicted['predictedLabelCategory']*1000)

pp = dfPredicted.groupBy('PriceRange').count().count()
print('No. of Price Points = %i'%(pp))

# +1 to deal with the NULL values 
dfPredicted = dfPredicted.withColumn('error', abs(((dfPredicted['PriceRange']) - dfPredicted['PredictedPriceRange'])))
dfPredicted.select(['PriceRange','PredictedPriceRange','error']).show()

statistics = dfPredicted.select('error').summary()

err = statistics.collect()[1]['error']
print('Average Error = %0.4f'%(float(err)))

statistics.show()

No. of Price Points = 24
+----------+-------------------+------+
|PriceRange|PredictedPriceRange| error|
+----------+-------------------+------+
|    1000.0|             1000.0|   0.0|
|    7000.0|             7000.0|   0.0|
|   19000.0|            19000.0|   0.0|
|   19000.0|            19000.0|   0.0|
|    3000.0|             7000.0|4000.0|
|   15000.0|            15000.0|   0.0|
|    5000.0|             5000.0|   0.0|
|   19000.0|            15000.0|4000.0|
|    5000.0|             7000.0|2000.0|
|    5000.0|             7000.0|2000.0|
|    7000.0|             5000.0|2000.0|
|   12000.0|             9000.0|3000.0|
|    8000.0|             5000.0|3000.0|
|   15000.0|            11000.0|4000.0|
|   10000.0|             8000.0|2000.0|
|   19000.0|            19000.0|   0.0|
|    8000.0|             9000.0|1000.0|
|   12000.0|            11000.0|1000.0|
|    5000.0|             7000.0|2000.0|
|   19000.0|            15000.0|4000.0|
+----------+-------------------+------+
only showing to

In [12]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(spark.sparkContext)

jdf=df.join(predictions,df.features==predictions.features)
jdf.show(5)
jdf.registerTempTable("table1")
spark.sql("select * from table1").show()

predcount=spark.sql("SELECT Product_ID, \
                     prediction, \
                     count(prediction) as countpred from table1 group by Product_ID, \
                     prediction order by Product_ID,countpred desc")
predcount.registerTempTable("table2")

predcount2=spark.sql("SELECT Product_ID, \
                      max(countpred) as maxpred from table2 group by Product_ID order by Product_ID")
predcount2.registerTempTable("table3")

ppp=spark.sql("SELECT table2.Product_ID, \
               table2.prediction, \
               table3.maxpred from table2 inner join table3 on \
                  table2.Product_ID=table3.Product_ID \
                  and table2.countpred = table3.maxpred ")

+-------+----------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+-----+---------+------+---------------+--------------------+--------------------+--------------------+-------------+--------------------+-----+--------------------+--------------------+----------+----------------------+
|User_ID|Product_ID|Gender|  Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|label|newgender|newpid|newcityCategory|                   X|                  X2|            features|labelCategory|            features|label|       rawPrediction|         probability|prediction|predictedLabelCategory|
+-------+----------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+-----+---------+------+---------------+--------------------+--------------------+--------

In [16]:
ppp = ppp.withColumn('PredictedTotalSalesAmountInMillions', (ppp['prediction']*1000*ppp['maxpred'])/1000000)
ppp.select(['product_ID', \
            'prediction', \
            'maxpred', \
            'PredictedTotalSalesAmountInMillions']) \
                .orderBy('PredictedTotalSalesAmountInMillions', ascending = False) \
                .show(40)

+----------+----------+-------+-----------------------------------+
|product_ID|prediction|maxpred|PredictedTotalSalesAmountInMillions|
+----------+----------+-------+-----------------------------------+
| P00025442|      19.0|   1238|                             23.522|
| P00110742|      19.0|   1062|                             20.178|
| P00010742|      19.0|   1048|                             19.912|
| P00237542|      19.0|   1020|                              19.38|
| P00110942|      19.0|   1007|                             19.133|
| P00184942|      19.0|    999|                             18.981|
| P00255842|      20.0|    905|                               18.1|
| P00046742|      15.0|   1138|                              17.07|
| P00059442|      20.0|    804|                              16.08|
| P00110842|      19.0|    792|                             15.048|
| P00057642|      15.0|    976|                              14.64|
| P00028842|      20.0|    727|                 