In [47]:
import os
import sys
os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3"
os.environ["JAVA_HOME"] = "/usr/java/jdk1.8.0_161/jre"
os.environ["SPARK_HOME"] = "/home/ec2-user/spark-2.4.4-bin-hadoop2.7"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
sys.path.insert(0, os.environ["PYLIB"] +"/py4j-0.10.7-src.zip")
sys.path.insert(0, os.environ["PYLIB"] +"/pyspark.zip")

In [48]:
from pyspark.sql import SparkSession

MAX_MEMORY = "5g"

# Creating the SparkSession object

spark = SparkSession.builder \
                    .appName('apriori')\
                    .config("spark.executor.memory", MAX_MEMORY) \
                    .config("spark.driver.memory", MAX_MEMORY) \
                    .getOrCreate()

In [49]:
##STRATEGY 1

In [50]:
#fetching assocation rule from mycsv.csv( saved earlier)
from pyspark.sql.functions import col
association_rules = spark.read.csv("mycsv.csv", header=True, inferSchema=True)

In [51]:
association_rules = association_rules.drop('_c0')
association_rules = association_rules.sort(col("Confidence").desc())

In [52]:
association_rules.show()

+--------------------+-----------------+-------------------+
|          Antecedent|       Consequent|         Confidence|
+--------------------+-----------------+-------------------+
|['ground beef', '...|['mineral water']| 0.5066666666666667|
|['ground beef', '...|['mineral water']|  0.503030303030303|
|['ground beef', '...|['mineral water']|0.47398843930635837|
|['milk', 'frozen ...|['mineral water']|0.46892655367231634|
|            ['soup']|['mineral water']|0.45646437994722955|
|['spaghetti', 'pa...|['mineral water']|  0.455026455026455|
|['spaghetti', 'ol...|['mineral water']| 0.4476744186046512|
|['spaghetti', 'mi...|['mineral water']|0.44360902255639095|
|['milk', 'chocola...|['mineral water']|0.43568464730290457|
|['spaghetti', 'gr...|['mineral water']| 0.4353741496598639|
|['spaghetti', 'fr...|['mineral water']|0.43062200956937796|
|    ['milk', 'eggs']|['mineral water']|0.42424242424242425|
|       ['olive oil']|['mineral water']| 0.4190283400809717|
|['ground beef', '...|  

In [53]:
#load the april transaction
import ast as ast
apr_df = spark.read.csv("transactions_apr.csv")
apr_lst = []
for row in apr_df.rdd.collect():
    apr_lst.append(ast.literal_eval(row[0]))

In [54]:
apr_lst

[['burgers', 'avocado'],
 ['low fat yogurt', 'salmon'],
 ['low fat yogurt'],
 ['olive oil'],
 ['turkey', 'salmon'],
 ['low fat yogurt', 'milk'],
 ['green tea'],
 ['shrimp', 'salmon'],
 ['french fries', 'cooking oil'],
 ['avocado'],
 ['red wine', 'chocolate'],
 ['champagne', 'milk'],
 ['frozen smoothie', 'fresh bread'],
 ['herb & pepper', 'green tea'],
 ['mineral water', 'burgers', 'chocolate']]

In [55]:
#Find the recommended item(Consequent)
recommended_records = []
for apr_tran in apr_lst:
    itemset = set(apr_tran)
    found = False
    recommended_item = ''
    for row in association_rules.collect():
        antecedent_list = (ast.literal_eval(row.asDict()['Antecedent']))
        antecedent_as_set = set(antecedent_list)
        if itemset == antecedent_as_set:
            #print(row)
            recommended_item = row
            found = True
            break
        
    if not found:
        #find the item with maximum confidence
        max_conf = 0
        for item in apr_tran:
            single_item_as_set = set([item])
            for row in association_rules.collect():
                trans_item = (ast.literal_eval(row.asDict()['Antecedent']))
                ruleset = set(trans_item)
                #print(single_item_as_set, ruleset)
                if single_item_as_set == ruleset:
                    confidence = row.asDict()['Confidence']
                    if(max_conf < confidence):
                        max_conf = confidence
                        recommended_item = row
    recommended_records.append(recommended_item)

In [56]:
recommended_records

[Row(Antecedent="['avocado']", Consequent="['mineral water']", Confidence=0.348),
 Row(Antecedent="['salmon']", Consequent="['mineral water']", Confidence=0.4012539184952978),
 Row(Antecedent="['low fat yogurt']", Consequent="['mineral water']", Confidence=0.313588850174216),
 Row(Antecedent="['olive oil']", Consequent="['mineral water']", Confidence=0.4190283400809717),
 Row(Antecedent="['salmon']", Consequent="['mineral water']", Confidence=0.4012539184952978),
 Row(Antecedent="['milk']", Consequent="['mineral water']", Confidence=0.3703703703703704),
 Row(Antecedent="['green tea']", Consequent="['mineral water']", Confidence=0.23511604439959638),
 Row(Antecedent="['salmon']", Consequent="['mineral water']", Confidence=0.4012539184952978),
 Row(Antecedent="['cooking oil']", Consequent="['mineral water']", Confidence=0.3942558746736292),
 Row(Antecedent="['avocado']", Consequent="['mineral water']", Confidence=0.348),
 Row(Antecedent="['red wine']", Consequent="['mineral water']", Con

In [58]:
#association rule for recommendation
for row in recommended_records:
    print(row.asDict()['Antecedent'])


['avocado']
['salmon']
['low fat yogurt']
['olive oil']
['salmon']
['milk']
['green tea']
['salmon']
['cooking oil']
['avocado']
['red wine']
['milk']
['frozen smoothie']
['herb & pepper']
['burgers']


In [59]:
item_details_df = spark.read.csv('item_details.csv', inferSchema=True, header=True)

In [60]:
item_details_df.show()

+-----------------+------------------+---------------+----------------+----+----+
|             Item|Selling Price/unit|Cost Price/unit|Share of Revenue| _c4| _c5|
+-----------------+------------------+---------------+----------------+----+----+
|          avocado|                90|             60|            1.55|null|null|
|          burgers|                90|             60|            2.27|null|null|
|             cake|               150|             70|            3.79|null|null|
|          cereals|                80|             40|            3.11|null|null|
|        champagne|               230|            170|            2.34|null|null|
|          chicken|                50|             40|            5.12|null|null|
|        chocolate|                30|             21|             1.3|null|null|
|          cookies|                25|             15|            1.34|null|null|
|      cooking oil|               160|            130|            4.37|null|null|
|             eg

In [61]:
item_details_df = item_details_df.drop('_c4')

In [62]:
item_details_df = item_details_df.drop('_c5')

In [63]:
item_details_df.show()

+-----------------+------------------+---------------+----------------+
|             Item|Selling Price/unit|Cost Price/unit|Share of Revenue|
+-----------------+------------------+---------------+----------------+
|          avocado|                90|             60|            1.55|
|          burgers|                90|             60|            2.27|
|             cake|               150|             70|            3.79|
|          cereals|                80|             40|            3.11|
|        champagne|               230|            170|            2.34|
|          chicken|                50|             40|            5.12|
|        chocolate|                30|             21|             1.3|
|          cookies|                25|             15|            1.34|
|      cooking oil|               160|            130|            4.37|
|             eggs|                65|             40|            5.56|
|         escalope|                85|             60|          

In [64]:
#total april revenue without recommendation
apr_price = []
for transaction in apr_lst:
    total_sales = 0;
    for item in transaction:
        total_sales += item_details_df.filter(col('Item') == item).first().asDict()['Selling Price/unit']
    apr_price.append(total_sales)
    print(transaction," ", total_sales)

['burgers', 'avocado']   180
['low fat yogurt', 'salmon']   75
['low fat yogurt']   15
['olive oil']   180
['turkey', 'salmon']   170
['low fat yogurt', 'milk']   65
['green tea']   50
['shrimp', 'salmon']   130
['french fries', 'cooking oil']   210
['avocado']   90
['red wine', 'chocolate']   210
['champagne', 'milk']   280
['frozen smoothie', 'fresh bread']   70
['herb & pepper', 'green tea']   75
['mineral water', 'burgers', 'chocolate']   140


In [65]:
def getExpectedIncreasedSalesOfAItem(confidence):
    if confidence >= 0.0 and confidence <=0.3:
        return 0.00
    elif confidence >= 0.31 and confidence <=0.4:
        return 0.05
    elif confidence >= 0.41 and confidence <=0.5:
        return 0.08
    elif confidence >= 0.51 and confidence <=0.6:
        return 0.10
    elif confidence >= 0.61 and confidence <=0.7:
        return 0.12
    elif confidence >= 0.71 and confidence <=0.8:
        return 0.15
    elif confidence >= 0.81 and confidence <=0.9:
        return 0.20
    elif confidence >= 0.91 and confidence <=1.0:
        return 0.25
    else:
        return 0.00

In [66]:
#Recommended item for increased sale calculation
may_recommended_item_sale = []
lst_recommended_items = []
lst_expected_perc = []
for row in recommended_records:
    new_items = ast.literal_eval(row.asDict()['Consequent'])
    lst_recommended_items.append(new_items)
    confidence = row.asDict()['Confidence']
    additional_sales = 0.0
    expected_perc = 0.0
    for item in new_items:
        expected_perc = expected_perc + getExpectedIncreasedSalesOfAItem(round(confidence, 2))
        selling_price = item_details_df.filter(col('Item') == item).first().asDict()['Selling Price/unit']
        additional_sales = additional_sales + (selling_price * getExpectedIncreasedSalesOfAItem(round(confidence, 2)))
    may_recommended_item_sale.append(additional_sales)
    lst_expected_perc.append(expected_perc)
print(may_recommended_item_sale)
print('expected perc', lst_expected_perc)

[1.0, 1.0, 1.0, 1.6, 1.0, 1.0, 0.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 3.25]
expected perc [0.05, 0.05, 0.05, 0.08, 0.05, 0.05, 0.0, 0.05, 0.05, 0.05, 0.05, 0.05, 0.05, 0.05, 0.05]


In [67]:
#show may transaction if recommendation was there
i = 0
for i in range(len(apr_price)):
    apr_lst[i].extend(lst_recommended_items[i])
    print(apr_lst[i]," ", apr_price[i] + may_recommended_item_sale[i])

['burgers', 'avocado', 'mineral water']   181.0
['low fat yogurt', 'salmon', 'mineral water']   76.0
['low fat yogurt', 'mineral water']   16.0
['olive oil', 'mineral water']   181.6
['turkey', 'salmon', 'mineral water']   171.0
['low fat yogurt', 'milk', 'mineral water']   66.0
['green tea', 'mineral water']   50.0
['shrimp', 'salmon', 'mineral water']   131.0
['french fries', 'cooking oil', 'mineral water']   211.0
['avocado', 'mineral water']   91.0
['red wine', 'chocolate', 'mineral water']   211.0
['champagne', 'milk', 'mineral water']   281.0
['frozen smoothie', 'fresh bread', 'mineral water']   71.0
['herb & pepper', 'green tea', 'mineral water']   76.0
['mineral water', 'burgers', 'chocolate', 'eggs']   143.25


In [68]:
#Revennue for strategy 1
after_MBA = sum(apr_price) + sum(may_recommended_item_sale) 
Actual_sales = sum(apr_price)
after_MBA - Actual_sales
    

16.84999999999991

In [69]:
##STRATEGY 2

In [70]:
#arrange associated rules in ascending order for min confidence
association_rules = association_rules.sort(col("Confidence").asc())

In [71]:
association_rules.show()

+--------------------+--------------------+-------------------+
|          Antecedent|          Consequent|         Confidence|
+--------------------+--------------------+-------------------+
|       ['chocolate']|       ['olive oil']| 0.1000813669650122|
|   ['mineral water']|  ['low fat yogurt']|0.10067114093959731|
|            ['milk']|['eggs', 'mineral...|0.10082304526748971|
|            ['milk']|  ['low fat yogurt']|0.10185185185185186|
|   ['mineral water']|         ['burgers']|0.10234899328859061|
|   ['mineral water']|        ['tomatoes']|0.10234899328859061|
|            ['milk']|            ['cake']|0.10288065843621401|
|     ['ground beef']|['eggs', 'mineral...|0.10312075983717775|
|       ['spaghetti']|            ['cake']|0.10413476263399693|
|       ['chocolate']|         ['burgers']|0.10414971521562244|
|    ['french fries']|            ['cake']| 0.1045241809672387|
|            ['eggs']|            ['cake']|0.10608308605341245|
|['frozen vegetabl...|  ['low fat yogurt

In [72]:
#load april transaction
#load the april transaction
import ast as ast
apr_df = spark.read.csv("transactions_apr.csv")
apr_lst = []
for row in apr_df.rdd.collect():
    apr_lst.append(ast.literal_eval(row[0]))

In [73]:
#Find the recommended item(Consequent)
recommended_records = []
for apr_tran in apr_lst:
    itemset = set(apr_tran)
    found = False
    recommended_item = ''
    for row in association_rules.collect():
        antecedent_list = (ast.literal_eval(row.asDict()['Antecedent']))
        antecedent_as_set = set(antecedent_list)
        if itemset == antecedent_as_set:
            #print(row)
            recommended_item = row
            found = True
            break
        
    if not found:
        #find the item with maximum confidence
        min_conf = 999
        for item in apr_tran:
            single_item_as_set = set([item])
            for row in association_rules.collect():
                trans_item = (ast.literal_eval(row.asDict()['Antecedent']))
                ruleset = set(trans_item)
                #print(single_item_as_set, ruleset)
                if single_item_as_set == ruleset:
                    confidence = row.asDict()['Confidence']
                    if(min_conf > confidence):
                        min_conf = confidence
                        recommended_item = row
    recommended_records.append(recommended_item)

In [74]:
#Consequents with low confidence score for discounting
recommended_records

[Row(Antecedent="['burgers']", Consequent="['pancakes']", Confidence=0.12079510703363913),
 Row(Antecedent="['low fat yogurt']", Consequent="['frozen vegetables']", Confidence=0.13240418118466898),
 Row(Antecedent="['low fat yogurt']", Consequent="['frozen vegetables']", Confidence=0.13240418118466898),
 Row(Antecedent="['olive oil']", Consequent="['spaghetti', 'mineral water']", Confidence=0.15587044534412955),
 Row(Antecedent="['turkey']", Consequent="['burgers']", Confidence=0.1705756929637527),
 Row(Antecedent="['milk']", Consequent="['eggs', 'mineral water']", Confidence=0.10082304526748971),
 Row(Antecedent="['green tea']", Consequent="['cake']", Confidence=0.10696266397578205),
 Row(Antecedent="['shrimp']", Consequent="['pancakes']", Confidence=0.14738805970149252),
 Row(Antecedent="['french fries']", Consequent="['cake']", Confidence=0.1045241809672387),
 Row(Antecedent="['avocado']", Consequent="['mineral water']", Confidence=0.348),
 Row(Antecedent="['chocolate']", Consequent

In [75]:
#print Consequent or discount item
lst_discount_item = []
for row in recommended_records:
    lst_discount_item.append(ast.literal_eval(row.asDict()['Consequent']))
    print(row.asDict()['Consequent'])

['pancakes']
['frozen vegetables']
['frozen vegetables']
['spaghetti', 'mineral water']
['burgers']
['eggs', 'mineral water']
['cake']
['pancakes']
['cake']
['mineral water']
['olive oil']
['eggs', 'mineral water']
['eggs']
['cake']
['olive oil']


In [76]:
#print Confidence
for row in recommended_records:
    print(row.asDict()['Confidence'])

0.12079510703363913
0.13240418118466898
0.13240418118466898
0.15587044534412955
0.1705756929637527
0.10082304526748971
0.10696266397578205
0.14738805970149252
0.1045241809672387
0.348
0.1000813669650122
0.10082304526748971
0.17473684210526316
0.10696266397578205
0.1000813669650122


In [77]:
def getExpectedIncreasedSalesOfAItemForDiscounting(confidence):
    if confidence >= 0.0 and confidence <=0.1:
        return 0.00
    elif confidence >= 0.11 and confidence <=0.2:
        return 0.05
    elif confidence >= 0.21 and confidence <=0.3:
        return 0.08
    elif confidence >= 0.31 and confidence <=0.4:
        return 0.10
    elif confidence >= 0.41 and confidence <=0.5:
        return 0.12
    elif confidence >= 0.51 and confidence <=0.6:
        return 0.15
    elif confidence >= 0.61 and confidence <=0.7:
        return 0.18
    elif confidence >= 0.71 and confidence <=0.8:
        return 0.20
    elif confidence >= 0.81 and confidence <=0.9:
        return 0.25
    elif confidence >= 0.91 and confidence <=1.0:
        return 0.30
    else:
        return 0.00

In [78]:
#print expected increase
for row in recommended_records:
    print(getExpectedIncreasedSalesOfAItemForDiscounting(row.asDict()['Confidence']))

0.05
0.05
0.05
0.05
0.05
0.0
0.0
0.05
0.0
0.1
0.0
0.0
0.05
0.0
0.0


In [79]:
#print Antecedent
for row in recommended_records:
    print(row.asDict()['Antecedent'])

['burgers']
['low fat yogurt']
['low fat yogurt']
['olive oil']
['turkey']
['milk']
['green tea']
['shrimp']
['french fries']
['avocado']
['chocolate']
['milk']
['frozen smoothie']
['green tea']
['chocolate']


In [80]:
# discounting price that could be added in the april transaction to get may transaction 
lst_discount_perc = []
lst_additional_revenue = []
for row in recommended_records:
    new_items = ast.literal_eval(row.asDict()['Consequent'])
    lst_recommended_items.append(new_items)
    confidence = row.asDict()['Confidence']
    expected_increase = getExpectedIncreasedSalesOfAItemForDiscounting(round(confidence, 2))
    additional_revenue = 0.0
    discount_items = []
    for item in new_items:      
        cost_price = item_details_df.filter(col('Item') == item).first().asDict()['Cost Price/unit']
        selling_price = item_details_df.filter(col('Item') == item).first().asDict()['Selling Price/unit']
        share_revenue = item_details_df.filter(col('Item') == item).first().asDict()['Share of Revenue']
        profit_ratio = (selling_price - cost_price) / selling_price
        discount_factor = (1.0 / share_revenue) * profit_ratio
        discount_percentage = discount_factor * (1.0 / confidence)
        discount_items.append(discount_percentage)
        additional_revenue = additional_revenue + ((expected_increase * selling_price) - (1-discount_percentage))
        #print('item ', item , 'discounting Percentage ', discount_percentage)
    lst_additional_revenue.append(additional_revenue)
    lst_discount_perc.append(discount_items)

In [81]:
#discount for discounted item
lst_discount_perc

[[1.8937701662943662],
 [0.7840101985066472],
 [0.7840101985066472],
 [0.5148944153759564, 1.329195666212241],
 [0.8608663729809102],
 [0.6861073150898435, 2.0549103619348297],
 [1.3156104279716563],
 [1.5520807479109788],
 [1.3463027868572701],
 [0.5953514955229569],
 [3.2653169668951594],
 [0.6861073150898435, 2.0549103619348297],
 [0.39588347857395273],
 [1.3156104279716563],
 [3.2653169668951594]]

In [82]:
#additional revenue for discount items
lst_additional_revenue

[5.893770166294367,
 2.284010198506647,
 2.284010198506647,
 4.344090081588197,
 4.36086637298091,
 0.7410176770246731,
 7.815610427971656,
 5.552080747910979,
 0.3463027868572701,
 1.5953514955229569,
 2.2653169668951594,
 0.7410176770246731,
 2.6458834785739525,
 7.815610427971656,
 2.2653169668951594]

In [83]:
# sum april transaction and addtional discounting revenue
apr_price

[180, 75, 15, 180, 170, 65, 50, 130, 210, 90, 210, 280, 70, 75, 140]

In [84]:
for i in range(len(apr_price)):
    print(apr_price[i] + lst_additional_revenue[i])

185.89377016629436
77.28401019850665
17.284010198506646
184.3440900815882
174.36086637298092
65.74101767702467
57.815610427971656
135.552080747911
210.34630278685728
91.59535149552296
212.26531696689517
280.7410176770247
72.64588347857395
82.81561042797165
142.26531696689517


In [86]:
#total revenue is april_transaction + addition revenue of recomendation item + additional revenue of discounting item
lst_total_revenue_after_strategy1_and_2 = []
for i in range(len(apr_price)):
    lst_total_revenue_after_strategy1_and_2.append(apr_price[i] + may_recommended_item_sale[i] + lst_additional_revenue[i])
    print(apr_price[i] + may_recommended_item_sale[i] + lst_additional_revenue[i])

186.89377016629436
78.28401019850665
18.284010198506646
185.9440900815882
175.36086637298092
66.74101767702467
57.815610427971656
136.552080747911
211.34630278685728
92.59535149552296
213.26531696689517
281.7410176770247
73.64588347857395
83.81561042797165
145.51531696689517


In [87]:
sum(lst_total_revenue_after_strategy1_and_2) - sum(apr_price)

67.80025567052508

# Comment
### There is net increase revenue of 67.80 after implmenting strategy 1 and strategy 2. Company should go for it..
