In [5]:
import findspark
findspark.init()
from pyspark import SparkContext, SQLContext
import pandas as pd

In [6]:
# Create sparkContext
sc = SparkContext.getOrCreate('local')

sqlContext = SQLContext(sc)


In [7]:
# fetch training data
train = sqlContext.read.format('com.databricks.spark.csv').options(header=True, inferSchema=True).load('./data/train.csv')

train_pd = train.toPandas()

In [8]:
# check fetched data
train.head(5)

[Row(User_ID=1000001, Product_ID='P00069042', Gender='F', Age='0-17', Occupation=10, City_Category='A', Stay_In_Current_City_Years='2', Marital_Status=0, Product_Category_1=3, Product_Category_2=None, Product_Category_3=None, Purchase=8370),
 Row(User_ID=1000001, Product_ID='P00248942', Gender='F', Age='0-17', Occupation=10, City_Category='A', Stay_In_Current_City_Years='2', Marital_Status=0, Product_Category_1=1, Product_Category_2=6, Product_Category_3=14, Purchase=15200),
 Row(User_ID=1000001, Product_ID='P00087842', Gender='F', Age='0-17', Occupation=10, City_Category='A', Stay_In_Current_City_Years='2', Marital_Status=0, Product_Category_1=12, Product_Category_2=None, Product_Category_3=None, Purchase=1422),
 Row(User_ID=1000001, Product_ID='P00085442', Gender='F', Age='0-17', Occupation=10, City_Category='A', Stay_In_Current_City_Years='2', Marital_Status=0, Product_Category_1=12, Product_Category_2=14, Product_Category_3=None, Purchase=1057),
 Row(User_ID=1000002, Product_ID='P0

In [9]:
train.count()

550068

In [10]:
# fetch testing data
test = sqlContext.read.format('com.databricks.spark.csv').options(header=True, inferSchema=True).load('./data/test.csv')


In [11]:
# check fetched data
test.printSchema()

root
 |-- User_ID: integer (nullable = true)
 |-- Product_ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Occupation: integer (nullable = true)
 |-- City_Category: string (nullable = true)
 |-- Stay_In_Current_City_Years: string (nullable = true)
 |-- Marital_Status: integer (nullable = true)
 |-- Product_Category_1: integer (nullable = true)
 |-- Product_Category_2: integer (nullable = true)
 |-- Product_Category_3: integer (nullable = true)



In [12]:
# Check for null values
train.na.drop().count()

166821

In [13]:
test.na.drop('any').count()

71037

In [14]:
train.head(5)

[Row(User_ID=1000001, Product_ID='P00069042', Gender='F', Age='0-17', Occupation=10, City_Category='A', Stay_In_Current_City_Years='2', Marital_Status=0, Product_Category_1=3, Product_Category_2=None, Product_Category_3=None, Purchase=8370),
 Row(User_ID=1000001, Product_ID='P00248942', Gender='F', Age='0-17', Occupation=10, City_Category='A', Stay_In_Current_City_Years='2', Marital_Status=0, Product_Category_1=1, Product_Category_2=6, Product_Category_3=14, Purchase=15200),
 Row(User_ID=1000001, Product_ID='P00087842', Gender='F', Age='0-17', Occupation=10, City_Category='A', Stay_In_Current_City_Years='2', Marital_Status=0, Product_Category_1=12, Product_Category_2=None, Product_Category_3=None, Purchase=1422),
 Row(User_ID=1000001, Product_ID='P00085442', Gender='F', Age='0-17', Occupation=10, City_Category='A', Stay_In_Current_City_Years='2', Marital_Status=0, Product_Category_1=12, Product_Category_2=14, Product_Category_3=None, Purchase=1057),
 Row(User_ID=1000002, Product_ID='P0

In [15]:
# fill -1 for null values in train and test
train = train.fillna(-1)
test = test.fillna(-1)

In [16]:
train.select('User_ID').show()

+-------+
|User_ID|
+-------+
|1000001|
|1000001|
|1000001|
|1000001|
|1000002|
|1000003|
|1000004|
|1000004|
|1000004|
|1000005|
|1000005|
|1000005|
|1000005|
|1000005|
|1000006|
|1000006|
|1000006|
|1000006|
|1000007|
|1000008|
+-------+
only showing top 20 rows



In [17]:
train.select('User_ID','Product_ID', 'Gender', 'Age').show()

+-------+----------+------+-----+
|User_ID|Product_ID|Gender|  Age|
+-------+----------+------+-----+
|1000001| P00069042|     F| 0-17|
|1000001| P00248942|     F| 0-17|
|1000001| P00087842|     F| 0-17|
|1000001| P00085442|     F| 0-17|
|1000002| P00285442|     M|  55+|
|1000003| P00193542|     M|26-35|
|1000004| P00184942|     M|46-50|
|1000004| P00346142|     M|46-50|
|1000004|  P0097242|     M|46-50|
|1000005| P00274942|     M|26-35|
|1000005| P00251242|     M|26-35|
|1000005| P00014542|     M|26-35|
|1000005| P00031342|     M|26-35|
|1000005| P00145042|     M|26-35|
|1000006| P00231342|     F|51-55|
|1000006| P00190242|     F|51-55|
|1000006|  P0096642|     F|51-55|
|1000006| P00058442|     F|51-55|
|1000007| P00036842|     M|36-45|
|1000008| P00249542|     M|26-35|
+-------+----------+------+-----+
only showing top 20 rows



In [18]:
train.select('Product_ID').distinct().count(), test.select('Product_ID').distinct().count()

(3631, 3491)

In [19]:
diff_cat_in_test_train = test.select('Product_ID').subtract(train.select('Product_ID'))
diff_cat_in_test_train.distinct().count()  # For distinct count

46

In [20]:
diff_cat_in_train_test = train.select('Product_ID').subtract(test.select('Product_ID'))
diff_cat_in_train_test.distinct().count()

186

In [21]:
from pyspark.ml.feature import StringIndexer
plan_indexer = StringIndexer(inputCol='Product_ID', outputCol='product_ID')
labeller_train = plan_indexer.fit(train)

In [22]:
Train1 = labeller_train.transform(train)

In [23]:
Train1.show()

+-------+----------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+----------+
|User_ID|Product_ID|Gender|  Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|product_ID|
+-------+----------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+----------+
|1000001| P00069042|     F| 0-17|        10|            A|                         2|             0|                 3|                -1|                -1|    8370|     766.0|
|1000001| P00248942|     F| 0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|     183.0|
|1000001| P00087842|     F| 0-17|        10|            A|                         2|             0|          

In [24]:
# labeller_test = plan_indexer.fit(test)
Test1 = labeller_train.transform(test)
# Test1.show()

In [25]:
from pyspark.ml.feature import RFormula
formula = RFormula(formula="Purchase ~ Age+ Occupation +City_Category+Stay_In_Current_City_Years+Product_Category_1+Product_Category_2+ Product_Category_3 + Gender", featuresCol="features", labelCol="label")

In [26]:
t1 = formula.fit(Train1)
train1 = t1.transform(Train1)
# t2 = formula.
# test1 = t1.transform(Test1)

AnalysisException: "Reference 'Product_ID' is ambiguous, could be: Product_ID, Product_ID.;"