In [1]:
import pyspark
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark.sql.types import *

# Spark configuration for a single-machine run on all local cores ('local[*]').
conf = (
    SparkConf()
    .setAppName("upsell-fortnight")
    .setMaster('local[*]')
    .set('spark.executor.memory', '4G')
    .set('spark.driver.memory', '8G')
    .set('spark.driver.maxResultSize', '8G')
    .set('spark.debug.maxToStringFields', '200')
)

# getOrCreate() hands back the running context when this cell is executed a
# second time in the same kernel; constructing SparkContext directly would
# raise "Cannot run multiple SparkContexts at once" in that situation.
# Note: when a context already exists, the settings in `conf` are not re-applied.
sc = pyspark.SparkContext.getOrCreate(conf=conf)

sqlcontext = SQLContext(sc)
# Use 10 shuffle partitions for Spark SQL operations in this session.
sqlcontext.sql("set spark.sql.shuffle.partitions=10")
import pyspark.sql.functions as F
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import *
from pyspark.ml.feature import *
from pyspark.sql import Row
from pyspark.sql.window import Window

In [2]:
from pyspark.mllib.classification import LogisticRegressionWithSGD
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.mllib.regression import LabeledPoint
from pyspark.ml.classification import MultilayerPerceptronClassifier,MultilayerPerceptronClassificationModel
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.linalg import Vectors
from pyspark.mllib.linalg.distributed import RowMatrix
from pyspark.sql.types import _infer_schema, IntegerType, DateType, DoubleType
from pyspark.ml.feature import VectorAssembler, CountVectorizer
from pyspark.sql import types as T

In [3]:
import numpy as np
import pickle as pk
import pandas as pd
import csv
import json
import sys
import os
import shutil
import tempfile
import time
from datetime import date, timedelta
# Calendar date on which the notebook is being run.
today = date.today()
# Last expression of the cell: shows the interpreter version in the cell output.
sys.version

'3.6.5 (v3.6.5:f59c0932b4, Mar 28 2018, 17:00:18) [MSC v.1900 64 bit (AMD64)]'

<font size="6" face="calibri">
    <p><b>DATA & ITEM</b></p>
    <p></p>
</font>

<font size= "3" color='blue' face="verdana">
    <p><i> 1. The sales data and the Item Master are needed at this stage.</i> </p>
   <p><i> 2. Join the two (inner join) on "ITEM_NO" to get the corresponding details for each item from the Item Master.</i></p>
   <p><i> 3. have the number, number (and other broader categories), units and other details</i> </p>
   <p><i> 4. create a fortnight feature which denotes a single integer for every fortnight till date.</i></p>
   <p><i> 5. create/train an ML model for every item. For one item, the train data will be for every single customer that bought (or didn't) that item.</i></p>
</font>


<font color='green' size='4'> Convert sales excel files to pyspark through Pandas </font>

<font color='green' size='4'> Save CSVs as parquet </font>

In [4]:
# Load the sales data from parquet into a Spark DataFrame; it is later passed
# to CreateFortnight, which reads its TRANSACTION_DATE and UNITS_NET columns.
sales_data = sqlcontext.read.parquet('<DATA>')

In [1]:
# Sanity check: number of rows in the loaded sales data (runs a Spark job).
sales_data.count()

<font color="green" size="4"><p>Incoming item master will be an Excel - convert it to spark df</p><p>Change: couldn't read xls format</p></font>

In [None]:
# Read the item master from its CSV export; header=True takes the column names
# from the first line of the file.
IM = sqlcontext.read.csv('IM.csv',header=True)

In [None]:
# Clear a previous parquet export of the item master before it is rewritten.
# The existence check lets the cell run on a fresh checkout too: an unguarded
# shutil.rmtree raises FileNotFoundError when 'item' has not been created yet.
if os.path.isdir('item'):
    shutil.rmtree('item')
    # Pause kept from the original cell after a deletion
    # (presumably to let the filesystem settle before rewriting - TODO confirm).
    time.sleep(5)

In [None]:
# Persist the item master DataFrame as parquet under the 'item' directory.
IM.write.parquet('item')

<font color='green' size='4'> Creating Fortnights and saving them separately </font>

In [None]:
def _fortnightWindows(latestDate, dayGap, maxFortnights):
    """Yield ``(index, windowStart, windowEnd)`` for back-to-back date windows.

    Window 1 ends on ``latestDate``; each following window ends where the
    previous one started. Every window is ``dayGap`` days long and is meant to
    be read as the half-open interval ``(windowStart, windowEnd]``. At most
    ``maxFortnights`` windows are produced, numbered from 1.
    """
    windowEnd = latestDate
    for index in range(1, maxFortnights + 1):
        windowStart = windowEnd - timedelta(days=dayGap)
        yield index, windowStart, windowEnd
        windowEnd = windowStart


def CreateFortnight(salesData: "pyspark.sql.DataFrame", today: "datetime.date or str", dayGap: int,
                    maxFortnights: int = 24, outputDir: str = 'Sales_data_with_latest_fortnights'):
    """Slice the sales data into consecutive ``dayGap``-day windows and save each one.

    The date part of ``TRANSACTION_DATE`` (the text before the first space) is
    kept as ``OnlyDate``. Starting from the latest ``OnlyDate`` that is not
    after ``today`` and walking backwards, every window ``(start, end]`` is
    tagged with an integer ``fortnight`` column (1 = most recent), restricted
    to rows with ``UNITS_NET > 0`` and written as parquet to
    ``<outputDir>/<fortnight>``.

    Dates are compared as strings, so ``OnlyDate`` is assumed to be in ISO
    ``yyyy-MM-dd`` form (the year/month/day parsing below relies on the same
    layout) - TODO confirm against the source data.

    Parameters
    ----------
    salesData : Spark DataFrame with at least ``TRANSACTION_DATE`` and ``UNITS_NET``.
    today : upper bound for the most recent window; ``str(today)`` is compared
        against ``OnlyDate``.
    dayGap : length of one window in days.
    maxFortnights : maximum number of windows to write (default 24). The
        previous implementation stopped one short of its documented limit of
        24 because it broke out of the loop before writing the 24th window.
    outputDir : directory that receives one sub-directory per window.

    Returns
    -------
    ``None`` after writing, or the string ``"There is no data before the given
    date"`` when no row has ``OnlyDate <= today``.

    Notes
    -----
    Processing stops at the first window that contains no rows at all, so a
    gap in the data ends the walk. Each write uses Spark's default save mode,
    so the target sub-directories must not exist beforehand.
    """
    sales_data = salesData.withColumn(
        'OnlyDate', F.split(F.col("TRANSACTION_DATE"), " ").getItem(0)
    ).drop('TRANSACTION_DATE')

    # Latest transaction date that is not after `today`.
    latestRow = (sales_data
                 .where(F.col("OnlyDate") <= F.lit(str(today)))
                 .select(F.max("OnlyDate").alias('mD'))
                 .collect()[0])
    if not latestRow.mD:
        return "There is no data before the given date"

    # Parsed by hand (rather than date.fromisoformat) to stay compatible with
    # the Python 3.6 interpreter this notebook was run on.
    year, month, day = (int(part) for part in latestRow.mD.split('-')[:3])
    latestDate = date(year, month, day)

    for fortnight, windowStart, windowEnd in _fortnightWindows(latestDate, dayGap, maxFortnights):
        windowSales = sales_data.filter(
            (F.col("OnlyDate") <= F.lit(str(windowEnd)))
            & (F.col("OnlyDate") > F.lit(str(windowStart)))
        )

        # An empty window ends the walk (checked before the UNITS_NET filter,
        # exactly as before).
        if len(windowSales.head(1)) == 0:
            break

        (windowSales
         .withColumn('fortnight', F.lit(fortnight))
         .where(F.col('UNITS_NET') > 0)
         .write.parquet('{}/{}'.format(outputDir.rstrip('/'), fortnight)))

        print("Time period: "+str(windowStart)+" To "+str(windowEnd))
    

In [None]:
# CreateFortnight writes one parquet directory per fortnight under
# 'Sales_data_with_latest_fortnights', and a parquet write fails when its
# target already exists. Clear that directory (the original cell removed the
# differently named 'S_with_latest_fortnights', which nothing here writes to),
# and only when it is present so a first run does not raise FileNotFoundError.
fortnight_output_dir = 'Sales_data_with_latest_fortnights'
if os.path.isdir(fortnight_output_dir):
    shutil.rmtree(fortnight_output_dir)
    time.sleep(5)

# Build 15-day windows going back from the last transaction on or before 2019-01-31.
CreateFortnight(sales_data, date(2019,1,31), 15)

<font color='green' size='4'> Get Item and set the product value field </font>

In [6]:
# Load the item master from parquet.
# NOTE(review): an earlier cell writes the item master to 'item', while this
# cell reads 'item_r' - confirm which directory is the intended source.
item_master = sqlcontext.read.parquet('item_r')

In [7]:
# Name of the item-master column that will be used for training; the next cell
# renames that column to the fixed name 'product_value'.
product_value = 'abcd'

In [8]:
# Expose the chosen column under the fixed name 'product_value' so later cells
# do not depend on which source column was selected.
item_master = item_master.withColumnRenamed(product_value,'product_value')

In [5]:
# Sanity check: number of rows in the item master.
item_master.count()
# Further ad-hoc checks, left disabled: null product values, and the set of
# product values seen per ITEM_NO.
#item_master.where(F.col('product_value').isNull()).count()
#i = item_master.groupBy('ITEM_NO').agg(F.collect_set('product_value').alias('listt'))

In [10]:
# Keep exactly one row per ITEM_NO, preferring the largest product_value
# (descending order puts nulls last, so a non-null value wins when one exists).
# row_number() over a window makes that choice explicit; orderBy() followed by
# dropDuplicates() does not guarantee which of the duplicate rows is retained.
_item_rank_window = Window.partitionBy('ITEM_NO').orderBy(F.col('product_value').desc())
item_master = (
    item_master
    .withColumn('_item_rank', F.row_number().over(_item_rank_window))
    .where(F.col('_item_rank') == 1)
    .drop('_item_rank')
    # Items without a product value fall back to their own ITEM_NO.
    .withColumn('product_value', F.coalesce(F.col('product_value'), F.col('ITEM_NO')))
)

<font color='green' size='4'> 
    <p>Read the Sale Data</p>
</font>

In [11]:
# Load the files found under the sub-directories of
# 'Sales_data_with_latest_fortnights' (CreateFortnight writes one sub-directory
# per fortnight there) into a single DataFrame.
SALES = sqlcontext.read.parquet('Sales_data_with_latest_fortnights/**/*')

In [12]:
# Add 'year' and 'month' columns holding the first and second '-'-separated
# fields of the OnlyDate string.
SALES = (
    SALES
    .withColumn('year', F.split(F.col('OnlyDate'), "-").getItem(0))
    .withColumn('month', F.split(F.col('OnlyDate'), "-").getItem(1))
)

In [None]:
# Write the sales data as parquet under 'SALES_', partitioned on disk by
# year, month and fortnight.
SALES.write.partitionBy('year','month','fortnight').parquet("SALES_")

<font color='green' size='4'> 
    <p>Connect to item master</p>
</font>

In [2]:
# Row count of the sales data before joining with the item master.
SALES.count()

In [14]:
# Attach item-master attributes to every sale via ITEM_NO. No `how` argument
# is given, so Spark's default inner join applies: sales whose ITEM_NO is
# absent from the item master are dropped.
sales_item_master = SALES.join(item_master,'ITEM_NO')

In [3]:
# Row count after the join; compare with SALES.count() to see how many sales
# rows were kept.
sales_item_master.count()

<font color='green' size='4'> 
    <p>Save it</p>
</font>

In [16]:
# Replace any previous export of the joined data. The directory is removed only
# when it exists, so the cell also runs on a first pass (an unguarded
# shutil.rmtree raises FileNotFoundError when the directory is missing).
if os.path.isdir('sales_item_master'):
    shutil.rmtree('sales_item_master')
    time.sleep(5)
sales_item_master.write.parquet('sales_item_master')

<font color='green' size='4'> 
    <p>Fortnight as features</p>
</font>

In [59]:
# Re-read the joined sales/item data from the parquet export written above.
sales_item_master = sqlcontext.read.parquet('sales_item_master')

# Clear the previous feature output, if any; the existence check avoids a
# FileNotFoundError on a first run.
# NOTE(review): the visible part of this notebook never writes
# 'sales_features_fortnight_only' - the aggregated result is saved to
# 'customer_fortnights' instead. Confirm which directory should be cleared here.
if os.path.isdir('sales_features_fortnight_only'):
    shutil.rmtree('sales_features_fortnight_only')
    time.sleep(5)

In [54]:
# Represent the fortnight index as a string before it is collected per customer.
sales_item_master_ = sales_item_master.withColumn(
    'fortnight', F.col('fortnight').cast(StringType())
)

In [55]:
# Collapse to one row per CUST_SHIP_TO_NO carrying the set of distinct
# fortnight values seen for that customer, as 'fortnight_array'.
sales_item_master_ = (
    sales_item_master_
    .groupBy('CUST_SHIP_TO_NO')
    .agg(F.collect_set('fortnight').alias('fortnight_array'))
)

In [60]:
#sales_item_master_.show(truncate=False)

In [None]:
#sales_item_master_ = sales_item_master_.withColumn('fortnight_array',F.array(F.col('fortnight')))

In [None]:
#cv_fortnight = CountVectorizer(inputCol='fortnight_array', outputCol='fortnight_feature')

In [None]:
#cv_f = cv_fortnight.fit(sales_item_master_)

In [None]:
#sales_item_master_ = cv_f.transform(sales_item_master_).drop('fortnight_array')

In [57]:
# Persist the per-customer fortnight sets as parquet under 'customer_fortnights'.
sales_item_master_.write.parquet('customer_fortnights')

<font size='4' color='green'> 
      <p>End of this script, below is all testing </p>
</font>

In [None]:
#sales_item_master_.show()

In [4]:
#sales_item_master_.show()

In [None]:
#sales_data1 = sales_data.withColumn('FF',F.split( F.date_format(F.to_date(F.col("TRANSACTION_DATE"), "MM/dd/yy"), "yyyy-MM-dd hh:mm:ss")," ")\
#                                    .getItem(0))\
#.withColumn("year",F.year('FF')).withColumn("month",F.month('FF'))

In [None]:
#sales_data11 = sales_data1.withColumn('FFF',F.when( (F.split(sales_data1['FF'],"-").getItem(2))<=15,
#                                                   ( str(F.split(sales_data1['FF'],"-").getItem(0))+str("_15") ))\
#                                      .otherwise( str(F.split(sales_data1['FF'],"-").getItem(0))+str("_30")))

#sales_data11 = sales_data1.withColumn('FFF',F.when( (F.split(sales_data1['FF'],"-").getItem(2))<=15,
#                                                   (F.concat( F.col('year'),F.lit("_"),F.col('month'),F.lit("_15") )))\
#                                      .otherwise( F.concat( F.col('year'),F.lit("_"),F.col('month'),F.lit("_30") )) )

In [None]:
#sales_data = sales_data11.drop('TRANSACTION_DATE','FF','year','month').withColumnRenamed('FFF','fortnight')

In [None]:
#sales_data = sales_data.where(F.col('UNITS_NET')>0).drop('TRANSACTION_DATE').dropDuplicates()

In [None]:
#sales_data.show(truncate=False)