In [1]:
# Imports

import warnings
warnings.filterwarnings('ignore')
import pandas as pd
import nltk
# nltk.download('wordnet')

from pyspark.sql import SparkSession
from pyspark.sql.functions import isnan, when, count, col,udf, col, lower, regexp_replace
from pyspark.ml.feature import Tokenizer,StopWordsRemover, CountVectorizer,IDF,StringIndexer
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector
from pyspark.ml.classification import NaiveBayes
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import lit
from sklearn import preprocessing

In [2]:
# Importing and Initializing Spark session

spark = SparkSession.builder.appName('guess_the_product').getOrCreate()

In [3]:
# Importing Required Data Files in Spark dataframe

train_data = spark.read.csv('train_set.csv',inferSchema=True,header=True)
test_data = spark.read.csv('test_set.csv',inferSchema=True,header=True)

In [4]:
# Addition of blank product  category to test data.
test_data = test_data.withColumn("Product_Category", lit('vald'))

In [5]:
train_data.columns == test_data.columns

True

In [6]:
final_df = train_data.union(test_data)

In [7]:
# final_df.Product_Category.count()

In [8]:
# # Lowercase text data.
final_df = final_df.select('*', (lower(col('Item_Description')).alias('Lower_Item_Description'))).drop('Item_Description')

In [9]:
# Representing target categorical feature to numeric.

product_categories = pd.Series(list(final_df.select('Product_Category').distinct().toPandas()['Product_Category']))
product_categories = product_categories.to_dict()
product_categories = {value : str(key) for (key, value) in product_categories.items()}

final_df = final_df.replace(product_categories,subset='Product_Category')
final_df = final_df.withColumn("label", col("Product_Category").cast(IntegerType()))

In [10]:
final_df.show(5)

+------+-----------+----------+-------+----------------+----------------------+-----+
|Inv_Id|Vendor_Code|   GL_Code|Inv_Amt|Product_Category|Lower_Item_Description|label|
+------+-----------+----------+-------+----------------+----------------------+-----+
| 15001|VENDOR-1676|GL-6100410|  83.24|              28|  artworking/typese...|   28|
| 15002|VENDOR-1883|GL-2182000|  51.18|              10|  auto leasing corp...|   10|
| 15004|VENDOR-1999|GL-6050100|  79.02|              25|  store management ...|   25|
| 15005|VENDOR-1771|GL-6101400|   48.5|              22|  store constructio...|   22|
| 15006|VENDOR-1331|GL-2182000|  63.35|              29|  jul 2015 aydin co...|   29|
+------+-----------+----------+-------+----------------+----------------------+-----+
only showing top 5 rows



In [11]:
# Assembling ML Pipeline to transform the data

# Representing categorical feature to numeric.
encoder = StringIndexer(inputCol= 'Vendor_Code', outputCol= 'Vendor_Code_index')

# Tokenization of text data for proccessing.
tokenizer = Tokenizer(inputCol="Lower_Item_Description", outputCol="token_text")

# Stopwords removal ( Cleaning phase ).
stopwords = StopWordsRemover(inputCol='token_text',outputCol='stop_tokens')

# Words by their counts.
count_vect = CountVectorizer(inputCol='stop_tokens',outputCol='c_vec')

# Term frequency * Inverse document frquency for obtaining text data weights.
tf_idf = IDF(inputCol="c_vec", outputCol="tf_idf")

# Representing target categorical feature to numeric.
# target_encoding = StringIndexer(inputCol='Product_Category',outputCol='label')

# Representing target categorical feature to numeric.
important_features = VectorAssembler(inputCols=['Vendor_Code_index','Inv_Amt','tf_idf'],outputCol='features')

In [12]:
# Initialization
transorfer_pipeline = Pipeline(stages=[encoder,tokenizer,stopwords,count_vect,tf_idf,important_features]) #target_encoding

In [13]:
# Fitting the transformation process on training data.
transformer = transorfer_pipeline.fit(final_df)

# Final transformation on training data.
final_train_data = transformer.transform(final_df)

In [14]:
# Spliting data

# ML Model data preparation.
# final_train_data.select('Product_Category').distinct().collect()

validation_key = product_categories['vald']
train_test_df = final_train_data.filter((col("Product_Category") !=validation_key ))
validation_df = final_train_data.filter((col("Product_Category") ==validation_key ))
(train_df,test_df) = train_test_df.randomSplit([0.7,0.3])

train_df = train_df.select('label','features')
test_df = test_df.select('label','features')
validation_df = validation_df.select('Inv_Id','label','features')

# train_df = train_df.select('label','features')
# test_df = test_df.select('label','features')
# validation_df = validation_df.select('label','features')

In [15]:
# Initializing Naive Bayes model.
# Better Model for Multiclassification having a NLP data.
nb = NaiveBayes(featuresCol='features',smoothing=1.0,modelType='multinomial')

product_predictor = nb.fit(train_df)
test_results = product_predictor.transform(test_df)

In [29]:
# Evaluation metric
# 

acc_eval = MulticlassClassificationEvaluator()
acc = acc_eval.evaluate(test_results)
print("Accuracy of model at predicting Product category is: {}".format(acc*100))

Accuracy of model at predicting Product category is: 90.44736809309904


In [26]:
test_results.show(2)

+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|   22|(2654,[0,1,10,11,...|[-435.73812815629...|[9.16052783553018...|      22.0|
|   22|(2654,[0,1,10,11,...|[-430.86398681131...|[5.06491753031984...|      22.0|
+-----+--------------------+--------------------+--------------------+----------+
only showing top 2 rows



In [27]:
# Validation submission data
valid_results = product_predictor.transform(validation_df.select('label','features'))
valid_results.show(2)

+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|   30|(2654,[0,1,10,21,...|[-878.30675586042...|[1.73960516366362...|      25.0|
|   30|(2654,[0,1,2,9,28...|[-568.12377705213...|[5.59753144454322...|      28.0|
+-----+--------------------+--------------------+--------------------+----------+
only showing top 2 rows



In [19]:
valid_results.count(),validation_df.count()

(278, 278)

In [20]:
from pyspark.sql.functions import desc, row_number, monotonically_increasing_id
from pyspark.sql.window import Window

validation_df = validation_df.withColumn('id', row_number().over(Window.orderBy(monotonically_increasing_id())) - 1)
valid_results = valid_results.withColumn('id', row_number().over(Window.orderBy(monotonically_increasing_id())) - 1)

In [21]:
output_df = validation_df.select('id','Inv_Id').join(valid_results.select('id','prediction'),'id','outer')

In [23]:
output_df = output_df.withColumn("ProductCategory", col("prediction").cast(IntegerType())).drop('prediction')
output_df.show(2)

+---+------+---------------+
| id|Inv_Id|ProductCategory|
+---+------+---------------+
|  0| 15041|             25|
|  1| 15094|             28|
+---+------+---------------+
only showing top 2 rows



In [24]:
# Decoding Target feature
product_categories_decode = {int(value) : key for (key, value) in product_categories.items()}
map_func = udf(lambda row : product_categories_decode.get(row,row))

# Finalising output
output_df = output_df.withColumn("Product_Category", map_func(col("ProductCategory"))).drop('id','ProductCategory')
output_df.show(5)

+------+----------------+
|Inv_Id|Product_Category|
+------+----------------+
| 15041|      CLASS-1274|
| 15094|      CLASS-1963|
| 15112|      CLASS-1758|
| 15179|      CLASS-1522|
| 15212|      CLASS-1758|
+------+----------------+
only showing top 5 rows



In [25]:
# Exporting Submission CSV
output_df.write.csv('submission.csv')

In [None]:
# Saving model to disk
nb.save("N_B_classifier")