In [48]:
%config Completer.use_jedi = False

from pyspark.sql import SparkSession
import numpy
import pandas

import os
os.environ['PYSPARK_PYTHON'] = '/var/www/py_spark_ccf/PY_SPARK_CCF_ENV/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/var/www/py_spark_ccf/PY_SPARK_CCF_ENV/bin/python3'
os.getcwd()

'/var/www/py_spark_ccf/notebooks'

In [49]:
spark_session = SparkSession.builder.master("spark://costrategix-pc:7077")\
    .appName('product_prediction').getOrCreate()

In [50]:
spark_session.sparkContext.getConf().getAll()

[('spark.driver.memory', '18g'),
 ('spark.driver.port', '40647'),
 ('spark.rdd.compress', 'True'),
 ('spark.app.name', 'product_prediction'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.app.id', 'app-20210404095804-0003'),
 ('spark.master', 'spark://costrategix-pc:7077'),
 ('spark.submit.pyFiles', ''),
 ('spark.executor.id', 'driver'),
 ('spark.submit.deployMode', 'client'),
 ('spark.app.startTime', '1617510482955'),
 ('spark.driver.host', '192.168.0.7'),
 ('spark.ui.showConsoleProgress', 'true')]

In [51]:
audit_data_frame = spark_session.read.csv('../data/audit_data_frame_2021_04_02.csv',
    inferSchema=True, header=True)

In [52]:
audit_data_frame.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- PRIMARY_KEY: string (nullable = true)
 |-- AUDIT_CLASSIFICATION_CORRECT: integer (nullable = true)
 |-- AUDIT_CORRECTED_PACKAGE_ID: string (nullable = true)
 |-- AUDIT_KEG_FLAG_CORRECT: integer (nullable = true)
 |-- AUDIT_NEW_PACKAGE: string (nullable = true)
 |-- AUDIT_PACKAGE_CORRECT: integer (nullable = true)
 |-- AUDIT_PRODUCT_CORRECT: integer (nullable = true)
 |-- AUDIT_SUB_CLASSIFICATION_CORRECT: integer (nullable = true)
 |-- INVOICE_CASE_EQUIVALENT: double (nullable = true)
 |-- INVOICE_ITEM_CLASSIFICATION: string (nullable = true)
 |-- INVOICE_ITEM_QUANTITY_PRICE: double (nullable = true)
 |-- INVOICE_ITEM_SAC_AMOUNT: double (nullable = true)
 |-- INVOICE_ITEM_SUB_CLASSIFICATION: string (nullable = true)
 |-- INVOICE_ITEM_TOTAL_PRICE: double (nullable = true)
 |-- INVOICE_LLQ: integer (nullable = true)
 |-- INVOICE_PACKAGE_DESCRIPTION: string (nullable = true)
 |-- INVOICE_PROCESS_DATE: string (nullable = true)
 |-- INVOICE_RAW_P

In [53]:
for column in audit_data_frame.columns:
    audit_data_frame.select(column).describe().show()

+-------+------------------+
|summary|               _c0|
+-------+------------------+
|  count|             32111|
|   mean|16219.392980598548|
| stddev| 9333.034190894623|
|    min|                 0|
|    max|             32336|
+-------+------------------+

+-------+--------------------+
|summary|         PRIMARY_KEY|
+-------+--------------------+
|  count|               32111|
|   mean|                null|
| stddev|                null|
|    min|PACKAGE_2020-02-1...|
|    max|  PACKAGE_2319995064|
+-------+--------------------+

+-------+----------------------------+
|summary|AUDIT_CLASSIFICATION_CORRECT|
+-------+----------------------------+
|  count|                       32111|
|   mean|          0.9923702158138955|
| stddev|         0.08701612712820837|
|    min|                           0|
|    max|                           1|
+-------+----------------------------+

+-------+--------------------------+
|summary|AUDIT_CORRECTED_PACKAGE_ID|
+-------+-----------------------

+-------+-----------------------------+
|summary|INVOICE_VENDOR_PRODUCT_NUMBER|
+-------+-----------------------------+
|  count|                        24488|
|   mean|          2.752755590820742E9|
| stddev|         1.040340486191066...|
|    min|                           /6|
|    max|          it-vwg-cos-fr-fr-18|
+-------+-----------------------------+

+-------+--------------------------------+
|summary|INVOICE_PACKAGE_MATCH_CONFIDENCE|
+-------+--------------------------------+
|  count|                           18596|
|   mean|                84.8564524628949|
| stddev|              19.592832351997668|
|    min|                           43.55|
|    max|                           100.0|
+-------+--------------------------------+

+-------+---------------------------+
|summary|INVOICE_PACKAGE_MATCH_MODEL|
+-------+---------------------------+
|  count|                      18242|
|   mean|                       null|
| stddev|                       null|
|    min|           AUD

# feature extraction

In [54]:
from tokenizer import tokenize
from pyspark.sql.functions import split
spark_tokenize = spark_session.udf.register('tokenizer', tokenize)
audit_data_frame = audit_data_frame.withColumn('INVOICE_PACKAGE_DESCRIPTION_CLEANED',
                                               split(spark_tokenize('INVOICE_PACKAGE_DESCRIPTION'), " "))
audit_data_frame.head(1)

[Row(_c0=0, PRIMARY_KEY='PACKAGE_2020-02-10:761503452610:INV-66448:11:2347048832', AUDIT_CLASSIFICATION_CORRECT=1, AUDIT_CORRECTED_PACKAGE_ID=None, AUDIT_KEG_FLAG_CORRECT=1, AUDIT_NEW_PACKAGE=None, AUDIT_PACKAGE_CORRECT=0, AUDIT_PRODUCT_CORRECT=0, AUDIT_SUB_CLASSIFICATION_CORRECT=None, INVOICE_CASE_EQUIVALENT=0.166666555, INVOICE_ITEM_CLASSIFICATION='PRODUCT', INVOICE_ITEM_QUANTITY_PRICE=20.79, INVOICE_ITEM_SAC_AMOUNT=0.0, INVOICE_ITEM_SUB_CLASSIFICATION=None, INVOICE_ITEM_TOTAL_PRICE=41.58, INVOICE_LLQ=2, INVOICE_PACKAGE_DESCRIPTION='QUAGLIA FERNET 750ML', INVOICE_PROCESS_DATE='2021-01-25', INVOICE_RAW_PACKS_PER_CASE=0, INVOICE_RAW_QUANTITY=2.0, INVOICE_RAW_QUANTITY_PACKAGING='EA', INVOICE_SYSTEM_ID='2020-02-10:761503452610:INV-66448:11:2347048832', INVOICE_TAG_KEG_FLAG=False, INVOICE_UPC_CASE='||', INVOICE_UPC_PACK='|761503452610|', PACKAGE_DISPLAY_NAME='DUBAR FENETTI FERNET AMARGO 1/12 750 ML BOTTLE', PACKAGE_FDC_ID=3472809, UNIT_PRICE=20.79, DISTRIBUTOR_FDC_ID=None, INVOICE_VENDOR_

In [55]:
from pyspark.ml.feature import CountVectorizer, NGram, StringIndexer

In [56]:
ngram_generator = NGram(n=2, inputCol='INVOICE_PACKAGE_DESCRIPTION_CLEANED',
                        outputCol='INVOICE_PACKAGE_DESCRIPTION_NGRAM')
audit_data_frame = ngram_generator.transform(audit_data_frame)
audit_data_frame.head(1)

[Row(_c0=0, PRIMARY_KEY='PACKAGE_2020-02-10:761503452610:INV-66448:11:2347048832', AUDIT_CLASSIFICATION_CORRECT=1, AUDIT_CORRECTED_PACKAGE_ID=None, AUDIT_KEG_FLAG_CORRECT=1, AUDIT_NEW_PACKAGE=None, AUDIT_PACKAGE_CORRECT=0, AUDIT_PRODUCT_CORRECT=0, AUDIT_SUB_CLASSIFICATION_CORRECT=None, INVOICE_CASE_EQUIVALENT=0.166666555, INVOICE_ITEM_CLASSIFICATION='PRODUCT', INVOICE_ITEM_QUANTITY_PRICE=20.79, INVOICE_ITEM_SAC_AMOUNT=0.0, INVOICE_ITEM_SUB_CLASSIFICATION=None, INVOICE_ITEM_TOTAL_PRICE=41.58, INVOICE_LLQ=2, INVOICE_PACKAGE_DESCRIPTION='QUAGLIA FERNET 750ML', INVOICE_PROCESS_DATE='2021-01-25', INVOICE_RAW_PACKS_PER_CASE=0, INVOICE_RAW_QUANTITY=2.0, INVOICE_RAW_QUANTITY_PACKAGING='EA', INVOICE_SYSTEM_ID='2020-02-10:761503452610:INV-66448:11:2347048832', INVOICE_TAG_KEG_FLAG=False, INVOICE_UPC_CASE='||', INVOICE_UPC_PACK='|761503452610|', PACKAGE_DISPLAY_NAME='DUBAR FENETTI FERNET AMARGO 1/12 750 ML BOTTLE', PACKAGE_FDC_ID=3472809, UNIT_PRICE=20.79, DISTRIBUTOR_FDC_ID=None, INVOICE_VENDOR_

In [57]:
count_vec_1 = CountVectorizer(inputCol='INVOICE_PACKAGE_DESCRIPTION_CLEANED',outputCol='cnt_vec_1', minDF=4)
audit_data_frame = count_vec_1.fit(audit_data_frame).transform(audit_data_frame)
audit_data_frame.head(1)

[Row(_c0=0, PRIMARY_KEY='PACKAGE_2020-02-10:761503452610:INV-66448:11:2347048832', AUDIT_CLASSIFICATION_CORRECT=1, AUDIT_CORRECTED_PACKAGE_ID=None, AUDIT_KEG_FLAG_CORRECT=1, AUDIT_NEW_PACKAGE=None, AUDIT_PACKAGE_CORRECT=0, AUDIT_PRODUCT_CORRECT=0, AUDIT_SUB_CLASSIFICATION_CORRECT=None, INVOICE_CASE_EQUIVALENT=0.166666555, INVOICE_ITEM_CLASSIFICATION='PRODUCT', INVOICE_ITEM_QUANTITY_PRICE=20.79, INVOICE_ITEM_SAC_AMOUNT=0.0, INVOICE_ITEM_SUB_CLASSIFICATION=None, INVOICE_ITEM_TOTAL_PRICE=41.58, INVOICE_LLQ=2, INVOICE_PACKAGE_DESCRIPTION='QUAGLIA FERNET 750ML', INVOICE_PROCESS_DATE='2021-01-25', INVOICE_RAW_PACKS_PER_CASE=0, INVOICE_RAW_QUANTITY=2.0, INVOICE_RAW_QUANTITY_PACKAGING='EA', INVOICE_SYSTEM_ID='2020-02-10:761503452610:INV-66448:11:2347048832', INVOICE_TAG_KEG_FLAG=False, INVOICE_UPC_CASE='||', INVOICE_UPC_PACK='|761503452610|', PACKAGE_DISPLAY_NAME='DUBAR FENETTI FERNET AMARGO 1/12 750 ML BOTTLE', PACKAGE_FDC_ID=3472809, UNIT_PRICE=20.79, DISTRIBUTOR_FDC_ID=None, INVOICE_VENDOR_

In [58]:
count_vec_2 = CountVectorizer(inputCol='INVOICE_PACKAGE_DESCRIPTION_NGRAM',outputCol='cnt_vec_2', minDF=4)
audit_data_frame = count_vec_2.fit(audit_data_frame).transform(audit_data_frame)
audit_data_frame.head(1)

[Row(_c0=0, PRIMARY_KEY='PACKAGE_2020-02-10:761503452610:INV-66448:11:2347048832', AUDIT_CLASSIFICATION_CORRECT=1, AUDIT_CORRECTED_PACKAGE_ID=None, AUDIT_KEG_FLAG_CORRECT=1, AUDIT_NEW_PACKAGE=None, AUDIT_PACKAGE_CORRECT=0, AUDIT_PRODUCT_CORRECT=0, AUDIT_SUB_CLASSIFICATION_CORRECT=None, INVOICE_CASE_EQUIVALENT=0.166666555, INVOICE_ITEM_CLASSIFICATION='PRODUCT', INVOICE_ITEM_QUANTITY_PRICE=20.79, INVOICE_ITEM_SAC_AMOUNT=0.0, INVOICE_ITEM_SUB_CLASSIFICATION=None, INVOICE_ITEM_TOTAL_PRICE=41.58, INVOICE_LLQ=2, INVOICE_PACKAGE_DESCRIPTION='QUAGLIA FERNET 750ML', INVOICE_PROCESS_DATE='2021-01-25', INVOICE_RAW_PACKS_PER_CASE=0, INVOICE_RAW_QUANTITY=2.0, INVOICE_RAW_QUANTITY_PACKAGING='EA', INVOICE_SYSTEM_ID='2020-02-10:761503452610:INV-66448:11:2347048832', INVOICE_TAG_KEG_FLAG=False, INVOICE_UPC_CASE='||', INVOICE_UPC_PACK='|761503452610|', PACKAGE_DISPLAY_NAME='DUBAR FENETTI FERNET AMARGO 1/12 750 ML BOTTLE', PACKAGE_FDC_ID=3472809, UNIT_PRICE=20.79, DISTRIBUTOR_FDC_ID=None, INVOICE_VENDOR_

# add product_fdc_id

In [59]:
entity_package_data_frame = pandas.read_csv('../data/catalog_with_price.csv')

In [60]:
entity_package_data_frame.head()

Unnamed: 0,PACKAGE_FDC_ID,UPC_PACK,PACKAGE_ESD_DISPLAY_NAME,PRODUCT_NAME,ESD_PRODUCT_FDC_ID,BRAND_NAME,BRAND_FDC_ID,PRODUCT_CLASS,PRODUCT_TYPE,UNIT_PRICE,CASE_PRICE,PACK_PRICE
0,3462233,851538002444,10 BARREL PRAY FOR SNOW 1/6 BBL 5.1 GAL KEG,10 BARREL PRAY FOR SNOW,3792448,10 BARREL,4468240,BEER,,126.0,126.0,126.0
1,3462235,851538002451,10 BARREL PRAY FOR SNOW 1/2 BBL 15.5 GAL KEG,10 BARREL PRAY FOR SNOW,3792448,10 BARREL,4468240,BEER,,210.0,210.0,210.0
2,3414169,798449002111,ACE PERRY CIDER 4/6 12 OZ BOTTLE,ACE PERRY CIDER,3792483,ACE,4472408,BEER,,5.5,33.0,33.0
3,3474151,777770000608,ACE PERRY CIDER 1/2 BBL 15.5 GAL KEG,ACE PERRY CIDER,3792483,ACE,4472408,BEER,,210.0,210.0,210.0
4,3478299,798449002005,ACE PERRY CIDER 1/12 22 OZ BOTTLE,ACE PERRY CIDER,3792483,ACE,4472408,BEER,,3.0,36.0,36.0


In [61]:
package_id_product_id_map = entity_package_data_frame.dropna(subset=['PACKAGE_FDC_ID', 'ESD_PRODUCT_FDC_ID'])\
    .set_index('PACKAGE_FDC_ID')['ESD_PRODUCT_FDC_ID'].to_dict()

In [62]:
# package_id_product_id_map

In [63]:
from pyspark.sql.types import NullType
audit_data_frame = audit_data_frame.dropna(subset=['PACKAGE_FDC_ID'])
get_product_id = spark_session.udf.register('get_product_id',
    lambda package_id: package_id_product_id_map[package_id] \
    if package_id in package_id_product_id_map else NullType())
audit_data_frame = audit_data_frame.withColumn('PRODUCT_FDC_ID', get_product_id('PACKAGE_FDC_ID'))
audit_data_frame = audit_data_frame.dropna(subset=['PRODUCT_FDC_ID'])
audit_data_frame.head(1)

[Row(_c0=0, PRIMARY_KEY='PACKAGE_2020-02-10:761503452610:INV-66448:11:2347048832', AUDIT_CLASSIFICATION_CORRECT=1, AUDIT_CORRECTED_PACKAGE_ID=None, AUDIT_KEG_FLAG_CORRECT=1, AUDIT_NEW_PACKAGE=None, AUDIT_PACKAGE_CORRECT=0, AUDIT_PRODUCT_CORRECT=0, AUDIT_SUB_CLASSIFICATION_CORRECT=None, INVOICE_CASE_EQUIVALENT=0.166666555, INVOICE_ITEM_CLASSIFICATION='PRODUCT', INVOICE_ITEM_QUANTITY_PRICE=20.79, INVOICE_ITEM_SAC_AMOUNT=0.0, INVOICE_ITEM_SUB_CLASSIFICATION=None, INVOICE_ITEM_TOTAL_PRICE=41.58, INVOICE_LLQ=2, INVOICE_PACKAGE_DESCRIPTION='QUAGLIA FERNET 750ML', INVOICE_PROCESS_DATE='2021-01-25', INVOICE_RAW_PACKS_PER_CASE=0, INVOICE_RAW_QUANTITY=2.0, INVOICE_RAW_QUANTITY_PACKAGING='EA', INVOICE_SYSTEM_ID='2020-02-10:761503452610:INV-66448:11:2347048832', INVOICE_TAG_KEG_FLAG=False, INVOICE_UPC_CASE='||', INVOICE_UPC_PACK='|761503452610|', PACKAGE_DISPLAY_NAME='DUBAR FENETTI FERNET AMARGO 1/12 750 ML BOTTLE', PACKAGE_FDC_ID=3472809, UNIT_PRICE=20.79, DISTRIBUTOR_FDC_ID=None, INVOICE_VENDOR_

In [64]:
audit_data_frame.count()

31069

# data exploration

In [65]:
audit_data_frame.createOrReplaceTempView("table1")
spark_session.sql("""
select PRODUCT_FDC_ID from table1
group by PRODUCT_FDC_ID having count(*) > 100;
""").count()

35

In [66]:
product_row_list = spark_session.sql("""
select PRODUCT_FDC_ID from table1
group by PRODUCT_FDC_ID having count(*) > 100;
""").collect()

product_list = [row['PRODUCT_FDC_ID'] for row in product_row_list]
audit_data_frame = audit_data_frame.filter(audit_data_frame['PRODUCT_FDC_ID'].isin(product_list))
audit_data_frame.count()

10213

In [67]:
from pyspark.ml.feature import StringIndexer
str_indexer = StringIndexer(inputCol='PRODUCT_FDC_ID', outputCol='label')
audit_data_frame = str_indexer.fit(audit_data_frame).transform(audit_data_frame)
audit_data_frame.head(1)

[Row(_c0=10, PRIMARY_KEY='PACKAGE_2020-03-24:00320:553258:3:2332841234', AUDIT_CLASSIFICATION_CORRECT=1, AUDIT_CORRECTED_PACKAGE_ID=None, AUDIT_KEG_FLAG_CORRECT=1, AUDIT_NEW_PACKAGE=None, AUDIT_PACKAGE_CORRECT=1, AUDIT_PRODUCT_CORRECT=1, AUDIT_SUB_CLASSIFICATION_CORRECT=None, INVOICE_CASE_EQUIVALENT=1.999998239, INVOICE_ITEM_CLASSIFICATION='PRODUCT', INVOICE_ITEM_QUANTITY_PRICE=17.65, INVOICE_ITEM_SAC_AMOUNT=0.0, INVOICE_ITEM_SUB_CLASSIFICATION=None, INVOICE_ITEM_TOTAL_PRICE=35.3, INVOICE_LLQ=48, INVOICE_PACKAGE_DESCRIPTION='BUD 2/12 CAN', INVOICE_PROCESS_DATE='2021-01-13', INVOICE_RAW_PACKS_PER_CASE=2, INVOICE_RAW_QUANTITY=2.0, INVOICE_RAW_QUANTITY_PACKAGING='CA', INVOICE_SYSTEM_ID='2020-03-24:00320:553258:3:2332841234', INVOICE_TAG_KEG_FLAG=False, INVOICE_UPC_CASE='|00018200110474|', INVOICE_UPC_PACK='|00018200110474|', PACKAGE_DISPLAY_NAME='BUDWEISER 2/12 12 OZ CAN', PACKAGE_FDC_ID=3435433, UNIT_PRICE=None, DISTRIBUTOR_FDC_ID=None, INVOICE_VENDOR_PRODUCT_NUMBER=None, INVOICE_PACKAGE

In [68]:
from pyspark.ml.feature import VectorAssembler
vec_assembler = VectorAssembler(inputCols=['cnt_vec_1', 'cnt_vec_2'], outputCol='features')
audit_data_frame = vec_assembler.transform(audit_data_frame)
audit_data_frame.head(1)

[Row(_c0=10, PRIMARY_KEY='PACKAGE_2020-03-24:00320:553258:3:2332841234', AUDIT_CLASSIFICATION_CORRECT=1, AUDIT_CORRECTED_PACKAGE_ID=None, AUDIT_KEG_FLAG_CORRECT=1, AUDIT_NEW_PACKAGE=None, AUDIT_PACKAGE_CORRECT=1, AUDIT_PRODUCT_CORRECT=1, AUDIT_SUB_CLASSIFICATION_CORRECT=None, INVOICE_CASE_EQUIVALENT=1.999998239, INVOICE_ITEM_CLASSIFICATION='PRODUCT', INVOICE_ITEM_QUANTITY_PRICE=17.65, INVOICE_ITEM_SAC_AMOUNT=0.0, INVOICE_ITEM_SUB_CLASSIFICATION=None, INVOICE_ITEM_TOTAL_PRICE=35.3, INVOICE_LLQ=48, INVOICE_PACKAGE_DESCRIPTION='BUD 2/12 CAN', INVOICE_PROCESS_DATE='2021-01-13', INVOICE_RAW_PACKS_PER_CASE=2, INVOICE_RAW_QUANTITY=2.0, INVOICE_RAW_QUANTITY_PACKAGING='CA', INVOICE_SYSTEM_ID='2020-03-24:00320:553258:3:2332841234', INVOICE_TAG_KEG_FLAG=False, INVOICE_UPC_CASE='|00018200110474|', INVOICE_UPC_PACK='|00018200110474|', PACKAGE_DISPLAY_NAME='BUDWEISER 2/12 12 OZ CAN', PACKAGE_FDC_ID=3435433, UNIT_PRICE=None, DISTRIBUTOR_FDC_ID=None, INVOICE_VENDOR_PRODUCT_NUMBER=None, INVOICE_PACKAGE

# train test split

In [69]:
final_data = audit_data_frame[['features', 'label']]
final_data.head(1)

[Row(features=SparseVector(8163, {2: 1.0, 3: 1.0, 14: 1.0, 2811: 1.0, 2998: 1.0}), label=4.0)]

In [70]:
train_data, test_data = final_data.randomSplit([0.7, 0.3])

# model training

In [71]:
from pyspark.ml.classification import NaiveBayes

In [72]:
model = NaiveBayes()
model = model.fit(train_data)

# model evaluation

In [73]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [74]:
acc_eval = MulticlassClassificationEvaluator()

In [75]:
test_results = model.transform(test_data)

In [76]:
test_results = test_results.filter(test_results['prediction'] > 0)

In [77]:
test_results.count()

2541

In [78]:
print('F1')
acc_eval.evaluate(test_results)

F1


0.924552968780588

In [79]:
print('accuracy')
acc_eval.evaluate(test_results, {acc_eval.metricName: "accuracy"})

accuracy


0.9287682014954742