## Objectives

 - Calculate <font color='crimson'>pairwise substitutability score</font> using spark with <font color='crimson'>demo data</font>

### High-level steps

- Derive <font color='crimson'>pairwise product combination</font> per member basket <font color='BlueViolet'>[basket]</font> and member lifetime <font color='BlueViolet'>[lifetime]</font>
- Calculate <font color='crimson'>difference between unique members count</font> in <font color='BlueViolet'>[lifetime]</font> and <font color='BlueViolet'>[basket]</font> [Unique member for those who bought product A and product B in <font color='crimson'>different basket</font>]
- Calculate <font color='crimson'>substitutability score</font> by 

    <br>
    <font size = "4">Count<sub> productA+productB (Diff basket)</sub> / (Count<sub> productA</sub> + Count<sub> productB</sub> - Count<sub> productA + productB (Diff basket)</sub> - Count<sub> productA + productB (Same basket)</sub>)</font>
    <br/>
    <br/>
    <br/>
    <font size = "2" color='darkgrey'>where Count is the no of unique members in the group specified in the subscript</font>

### Detailed steps

- <font color='crimson'>Transaction data</font>
<br/>
                Member  OrderNumber  Product
                111        222           2
                111        222           3
                111        333           3
                111        555           4
                222        666           6
                222        666           7
                222        666           8
                333        777           3
                333        888           4
<br/>                
- Group product list <font color='crimson'>[Per Member, per order]</font>
<br>
                Member  OrderNumber  ProductList
                111        222           [2,3]
                111        333           [3]
                111        555           [4]
                222        666           [6,7,8]
                333        777           [3]
                333        888           [4]
<br>
- Split product list with <font color='crimson'>pairwise combination</font> (Sort first, <sub>n</sub>C<sub>2</sub>) <font color='crimson'>[Per Member, per order]</font>
<br>
                Member  OrderNumber  ProductList
                111        222           [[2,3]]
                111        333           [3]
                111        555           [4]
                222        666           [[6,7], [6,8], [7,8]]
                333        777           [3]
                333        888           [4]
<br>
- <font color='crimson'>Explode and Split</font> product pairs to different columns - <font color='BlueViolet'>Basket</font>
<br>
                Member  OrderNumber      ProductOne        ProductTwo
                111        222                2               3
                111        333                3               3
                111        555                4               4
                222        666                6               7
                222        666                6               8
                222        666                7               8
                333        777                3               3
                333        888                4               4
<br>
- <font color='crimson'>Repeat</font> the procedure with Product list <font color='crimson'>[Per Member, per lifetime]</font> - <font color='BlueViolet'>Lifetime</font>
<br>
                Member  ProductList
                111      [2,3,4]
                222      [6,7,8]
                333      [3, 4]
<br>
                Member  ProductOne      ProductTwo
                111         2               3
                111         2               4
                111         3               4
                222         6               7
                222         6               8
                222         7               8
                333         3               4
<br>
- Group by and count different members <font color='crimson'>[Per ProductOne, per ProductTwo]</font> for both <font color='BlueViolet'>[Basket]</font> and <font color='BlueViolet'>[Lifetime]</font>
<br>
<font color='BlueViolet'>[Basket]</font>
<br>
             ProductOne  ProductTwo      SameBasketCount   SameBasketMember
                2            3                 1               [111]
                3            3                 2               [111, 333]
                4            4                 2               [111, 333]
                6            7                 1               [222]
                6            8                 1               [222]
                7            8                 1               [222]
<br>
<font color='BlueViolet'>[Lifetime]</font>
<br>
             ProductOne  ProductTwo      LifeTimeCount   LifeTimeMember
                2            3                 1               [111]
                2            4                 1               [111]
                3            4                 2               [111,333]
                6            7                 1               [222]
                6            8                 1               [222]
                7            8                 1               [222]
<br>
- Left join <font color='crimson'>[Lifetime]</font> to <font color='crimson'>[Basket]</font>
<br>
             ProductOne  ProductTwo      LifeTimeCount   LifeTimeMember       SameBasketCount     SameBasketMember
                2            3                 1               [111]                1                   [111]
                2            4                 1               [111]                0                   []
                3            4                 2               [111,333]            0                   []
                6            7                 1               [222]                1                   [222]
                6            8                 1               [222]                1                   [222]
                7            8                 1               [222]                1                   [222]
<br>
- Derive <font color='crimson'>DiffBasketCount</font> <font color='BlueViolet'>(LifeTimeCount - SameBasketCount)</font> - <font color='BlueViolet'>[Result]</font>
<br>
             ProductOne  ProductTwo      LifeTimeCount   LifeTimeMember       SameBasketCount     SameBasketMember       DiffBasketCount
                2            3                 1               [111]                1                   [111]                  0
                2            4                 1               [111]                0                   []                     1
                3            4                 2               [111,333]            0                   []                     2
                6            7                 1               [222]                1                   [222]                  0
                6            8                 1               [222]                1                   [222]                  0
                7            8                 1               [222]                1                   [222]                  0
<br>
- Derive <font color='crimson'>Individual Product count</font> <font color='BlueViolet'>[By Product, distinct member count]</font> - <font color='BlueViolet'>[ProductCount]</font>
<br>
             Product  ProductMemberCount
                2            1
                3            2
                4            2
                6            1
                7            1
                8            1
- Join with total Product Count, <font color='BlueViolet'>[Result]</font> inner join <font color='BlueViolet'>[ProductCount]</font>
<br>
             ProductOne  ProductTwo      LifeTimeCount   SameBasketCount     DiffBasketCount       ProductOneCount       ProductTwoCount
                2            3                 1               1                   0                      1                     2 
                2            4                 1               0                   1                      1                     2
                3            4                 2               0                   2                      2                     2
                6            7                 1               1                   0                      1                     1
                6            8                 1               1                   0                      1                     1
                7            8                 1               1                   0                      1                     1
<br>
- Derive <font color='crimson'>Substitutabilty Score</font> <font color='BlueViolet'> [DiffBasketCount]</font> / (<font color='Orchid'>[ProductOneCount]</font> + <font color='RoyalBlue'>[ProductTwoCount]</font> - <font color='Tomato'>[SameBasketCount]</font> - <font color='BlueViolet'>[DiffBasketCount])</font>
<br>
             ProductOne  ProductTwo      LifeTimeCount   SameBasketCount     DiffBasketCount       ProductOneCount       ProductTwoCount        SubstituteScore
                2            3                 1               1                   0                      1                     2                      0 
                2            4                 1               0                   1                      1                     2                      1/(1+2-1)
                3            4                 2               0                   2                      2                     2                      2/(2+2-2)
                6            7                 1               1                   0                      1                     1                      0
                6            8                 1               1                   0                      1                     1                      0                       
                7            8                 1               1                   0                      1                     1                      0
<br>
- Derive <font color='crimson'>full product list pairwise combination</font>, then <font color='crimson'>Left join</font><br/>

- <font color='crimson'>Long to wide</font> format

|num1  |431|1322|1323|110478|134740|151038|153539|168830|169508|169512|173437|180256|400008|401459|713853|
|------|---|----|----|------|------|------|------|------|------|------|------|------|------|------|------|
|123   |0  |25  |0   |0     |0     |0     |0     |0     |0     |0     |0     |33    |20    |0     |66    |
|431   |0  |0   |0   |0     |0     |0     |0     |0     |0     |0     |0     |0     |0     |0     |0     |
|1322  |0  |0   |0   |0     |50    |0     |50    |50    |50    |50    |0     |0     |33    |50    |33    |
|1323  |0  |0   |0   |0     |100   |0     |100   |100   |100   |100   |0     |0     |33    |100   |0     |
|110478|0  |0   |0   |0     |0     |0     |0     |0     |0     |0     |0     |0     |0     |0     |0     |
|134740|0  |0   |0   |0     |0     |0     |0     |100   |0     |0     |0     |0     |33    |0     |0     |
|151038|0  |0   |0   |0     |0     |0     |0     |0     |0     |0     |0     |0     |0     |0     |0     |
|153539|0  |0   |0   |0     |0     |0     |0     |100   |0     |0     |0     |0     |33    |0     |0     |
|168830|0  |0   |0   |0     |0     |0     |0     |0     |100   |100   |0     |0     |0     |100   |0     |
|169508|0  |0   |0   |0     |0     |0     |0     |0     |0     |0     |0     |0     |33    |0     |0     |
|169512|0  |0   |0   |0     |0     |0     |0     |0     |0     |0     |0     |0     |33    |0     |0     |
|173437|0  |0   |0   |0     |0     |0     |0     |0     |0     |0     |0     |0     |0     |0     |0     |
|180256|0  |0   |0   |0     |0     |0     |0     |0     |0     |0     |0     |0     |0     |0     |0     |
|400008|0  |0   |0   |0     |0     |0     |0     |0     |0     |0     |0     |0     |0     |33    |25    |
|401459|0  |0   |0   |0     |0     |0     |0     |0     |0     |0     |0     |0     |0     |0     |0     |


In [1]:
sc

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
2,application_1576167157617_0004,pyspark3,idle,Link,Link,✔


SparkSession available as 'spark'.
<SparkContext master=yarn appName=remotesparkmagics>

### Library


In [2]:
# from pyspark.sql.functions import udf
from pyspark.sql.types import *
from itertools import combinations
import pyspark.sql.functions as F

### Transaction

In [3]:
df_transaction = spark.createDataFrame([("Billy", "Order_100", 110478),
                                      ("Billy", "Order_100", 173437),
                                      ("Billy", "Order_100", 151038),
                                      ("Amy", "Order_200", 401459), 
                                      ("Amy", "Order_200", 169508), 
                                      ("Amy", "Order_200", 169512), 
                                      ("Amy", "Order_200", 134740), 
                                      ("Amy", "Order_200", 153539), 
                                      ("Amy", "Order_300", 168830), 
                                      ("Amy", "Order_300", 400008), 
                                      ("Amy", "Order_400", 1322), 
                                      ("Amy", "Order_400", 1323), 
                                      ("Cathy", "Order_500", 123), 
                                      ("Cathy", "Order_500", 431), 
                                      ("Danny", "Order_600", 123), 
                                      ("Danny", "Order_700", 180256),
                                      ("Danny", "Order_700", 713853),
                                      ("Ella", "Order_800", 713853),
                                      ("Ella", "Order_900", 713853),
                                       ("Ella", "Order_1000", 123),
                                       ("Ella", "Order_1100", 1322),
                                       ("Ella", "Order_1100", 400008),
                                       ("Flora", "Order_1200", 400008)],
                             ["Member", "OrderNumber", "Product"])


In [4]:
df_transaction.show(30, False)

+------+-----------+-------+
|Member|OrderNumber|Product|
+------+-----------+-------+
|Billy |Order_100  |110478 |
|Billy |Order_100  |173437 |
|Billy |Order_100  |151038 |
|Amy   |Order_200  |401459 |
|Amy   |Order_200  |169508 |
|Amy   |Order_200  |169512 |
|Amy   |Order_200  |134740 |
|Amy   |Order_200  |153539 |
|Amy   |Order_300  |168830 |
|Amy   |Order_300  |400008 |
|Amy   |Order_400  |1322   |
|Amy   |Order_400  |1323   |
|Cathy |Order_500  |123    |
|Cathy |Order_500  |431    |
|Danny |Order_600  |123    |
|Danny |Order_700  |180256 |
|Danny |Order_700  |713853 |
|Ella  |Order_800  |713853 |
|Ella  |Order_900  |713853 |
|Ella  |Order_1000 |123    |
|Ella  |Order_1100 |1322   |
|Ella  |Order_1100 |400008 |
|Flora |Order_1200 |400008 |
+------+-----------+-------+

### Per member per order

In [5]:
df_order = df_transaction.groupby(["Member", "OrderNumber"]).agg(F.collect_set("Product").alias("ProductList")).orderBy(['Member', 'OrderNumber'])
df_order.show(50, False)

+------+-----------+----------------------------------------+
|Member|OrderNumber|ProductList                             |
+------+-----------+----------------------------------------+
|Amy   |Order_200  |[169508, 134740, 169512, 153539, 401459]|
|Amy   |Order_300  |[168830, 400008]                        |
|Amy   |Order_400  |[1322, 1323]                            |
|Billy |Order_100  |[173437, 151038, 110478]                |
|Cathy |Order_500  |[431, 123]                              |
|Danny |Order_600  |[123]                                   |
|Danny |Order_700  |[713853, 180256]                        |
|Ella  |Order_1000 |[123]                                   |
|Ella  |Order_1100 |[1322, 400008]                          |
|Ella  |Order_800  |[713853]                                |
|Ella  |Order_900  |[713853]                                |
|Flora |Order_1200 |[400008]                                |
+------+-----------+----------------------------------------+

### UDF for pairwise split

In [6]:
seventh_schema = ArrayType(StructType([
    StructField('num1', IntegerType(), nullable=True),
    StructField('num2', IntegerType(), nullable=True)])                
    , True)


def seventh(arr):
    
    if len(arr) == 1:
        return([[arr[0], arr[0]]])
    
    arr.sort()
    res = list(combinations(arr, 2))
    return res
    # return [[arr[0], arr[0]], [arr[0], arr[0]]]

seventh_udf = F.udf(seventh, seventh_schema)

In [7]:
df_order_pairwise = df_order.select("Member", "OrderNumber", "ProductList", seventh_udf("ProductList").alias('PairwiseProduct'))
df_order_pairwise.show(20, False)

+------+-----------+----------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|Member|OrderNumber|ProductList                             |PairwiseProduct                                                                                                                                                                     |
+------+-----------+----------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|Amy   |Order_200  |[169508, 134740, 169512, 153539, 401459]|[[134740, 153539], [134740, 169508], [134740, 169512], [134740, 401459], [153539, 169508], [153539, 169512], [153539, 401459], [169508, 169512], [169508, 401459], [169512, 401459]]|
|Amy   |Order_300  |[168830,

In [8]:
df_order_pair = df_order_pairwise.withColumn('Pairs', F.explode('PairwiseProduct'))
df_order_pair.select('Member', 'OrderNumber', 'Pairs').show(100, False)

+------+-----------+----------------+
|Member|OrderNumber|Pairs           |
+------+-----------+----------------+
|Amy   |Order_200  |[134740, 153539]|
|Amy   |Order_200  |[134740, 169508]|
|Amy   |Order_200  |[134740, 169512]|
|Amy   |Order_200  |[134740, 401459]|
|Amy   |Order_200  |[153539, 169508]|
|Amy   |Order_200  |[153539, 169512]|
|Amy   |Order_200  |[153539, 401459]|
|Amy   |Order_200  |[169508, 169512]|
|Amy   |Order_200  |[169508, 401459]|
|Amy   |Order_200  |[169512, 401459]|
|Amy   |Order_300  |[168830, 400008]|
|Amy   |Order_400  |[1322, 1323]    |
|Billy |Order_100  |[110478, 151038]|
|Billy |Order_100  |[110478, 173437]|
|Billy |Order_100  |[151038, 173437]|
|Cathy |Order_500  |[123, 431]      |
|Danny |Order_600  |[123, 123]      |
|Danny |Order_700  |[180256, 713853]|
|Ella  |Order_1000 |[123, 123]      |
|Ella  |Order_1100 |[1322, 400008]  |
|Ella  |Order_800  |[713853, 713853]|
|Ella  |Order_900  |[713853, 713853]|
|Flora |Order_1200 |[400008, 400008]|
+------+----

In [9]:
df_order_res = df_order_pair.select('Member', 'OrderNumber', 'Pairs.*').orderBy(['Member', 'OrderNumber', 'num1', 'num2'])
df_order_res.show(100, False)

+------+-----------+------+------+
|Member|OrderNumber|num1  |num2  |
+------+-----------+------+------+
|Amy   |Order_200  |134740|153539|
|Amy   |Order_200  |134740|169508|
|Amy   |Order_200  |134740|169512|
|Amy   |Order_200  |134740|401459|
|Amy   |Order_200  |153539|169508|
|Amy   |Order_200  |153539|169512|
|Amy   |Order_200  |153539|401459|
|Amy   |Order_200  |169508|169512|
|Amy   |Order_200  |169508|401459|
|Amy   |Order_200  |169512|401459|
|Amy   |Order_300  |168830|400008|
|Amy   |Order_400  |1322  |1323  |
|Billy |Order_100  |110478|151038|
|Billy |Order_100  |110478|173437|
|Billy |Order_100  |151038|173437|
|Cathy |Order_500  |123   |431   |
|Danny |Order_600  |123   |123   |
|Danny |Order_700  |180256|713853|
|Ella  |Order_1000 |123   |123   |
|Ella  |Order_1100 |1322  |400008|
|Ella  |Order_800  |713853|713853|
|Ella  |Order_900  |713853|713853|
|Flora |Order_1200 |400008|400008|
+------+-----------+------+------+

### Per member lifetime

In [10]:
df_lifetime = df_transaction.groupby(["Member"]).agg(F.collect_set("Product").alias("ProductList")).orderBy(['Member'])

In [11]:
df_lifetime.show(10, False)

+------+--------------------------------------------------------------------+
|Member|ProductList                                                         |
+------+--------------------------------------------------------------------+
|Amy   |[169508, 1322, 134740, 169512, 1323, 153539, 168830, 400008, 401459]|
|Billy |[173437, 151038, 110478]                                            |
|Cathy |[431, 123]                                                          |
|Danny |[713853, 180256, 123]                                               |
|Ella  |[713853, 1322, 400008, 123]                                         |
|Flora |[400008]                                                            |
+------+--------------------------------------------------------------------+

In [12]:
df_lifetime_pairwise = df_lifetime.select("Member", "ProductList", seventh_udf("ProductList").alias('PairwiseProduct'))

In [13]:
df_lifetime_pairwise.show(20, False)

+------+--------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|Member|ProductList                                                         |PairwiseProduct                                                                                                                                                                                                                     

In [14]:
df_lifetime_pair = df_lifetime_pairwise.withColumn('Pairs', F.explode('PairwiseProduct'))
df_lifetime_pair.select('Member', 'Pairs').show(100, False)

+------+----------------+
|Member|Pairs           |
+------+----------------+
|Amy   |[1322, 1323]    |
|Amy   |[1322, 134740]  |
|Amy   |[1322, 153539]  |
|Amy   |[1322, 168830]  |
|Amy   |[1322, 169508]  |
|Amy   |[1322, 169512]  |
|Amy   |[1322, 400008]  |
|Amy   |[1322, 401459]  |
|Amy   |[1323, 134740]  |
|Amy   |[1323, 153539]  |
|Amy   |[1323, 168830]  |
|Amy   |[1323, 169508]  |
|Amy   |[1323, 169512]  |
|Amy   |[1323, 400008]  |
|Amy   |[1323, 401459]  |
|Amy   |[134740, 153539]|
|Amy   |[134740, 168830]|
|Amy   |[134740, 169508]|
|Amy   |[134740, 169512]|
|Amy   |[134740, 400008]|
|Amy   |[134740, 401459]|
|Amy   |[153539, 168830]|
|Amy   |[153539, 169508]|
|Amy   |[153539, 169512]|
|Amy   |[153539, 400008]|
|Amy   |[153539, 401459]|
|Amy   |[168830, 169508]|
|Amy   |[168830, 169512]|
|Amy   |[168830, 400008]|
|Amy   |[168830, 401459]|
|Amy   |[169508, 169512]|
|Amy   |[169508, 400008]|
|Amy   |[169508, 401459]|
|Amy   |[169512, 400008]|
|Amy   |[169512, 401459]|
|Amy   |[400

In [15]:
df_lifetime_res = df_lifetime_pair.select('Member', 'Pairs.*').orderBy(['Member', 'num1', 'num2'])
df_lifetime_res.show(1000, False)

+------+------+------+
|Member|num1  |num2  |
+------+------+------+
|Amy   |1322  |1323  |
|Amy   |1322  |134740|
|Amy   |1322  |153539|
|Amy   |1322  |168830|
|Amy   |1322  |169508|
|Amy   |1322  |169512|
|Amy   |1322  |400008|
|Amy   |1322  |401459|
|Amy   |1323  |134740|
|Amy   |1323  |153539|
|Amy   |1323  |168830|
|Amy   |1323  |169508|
|Amy   |1323  |169512|
|Amy   |1323  |400008|
|Amy   |1323  |401459|
|Amy   |134740|153539|
|Amy   |134740|168830|
|Amy   |134740|169508|
|Amy   |134740|169512|
|Amy   |134740|400008|
|Amy   |134740|401459|
|Amy   |153539|168830|
|Amy   |153539|169508|
|Amy   |153539|169512|
|Amy   |153539|400008|
|Amy   |153539|401459|
|Amy   |168830|169508|
|Amy   |168830|169512|
|Amy   |168830|400008|
|Amy   |168830|401459|
|Amy   |169508|169512|
|Amy   |169508|400008|
|Amy   |169508|401459|
|Amy   |169512|400008|
|Amy   |169512|401459|
|Amy   |400008|401459|
|Billy |110478|151038|
|Billy |110478|173437|
|Billy |151038|173437|
|Cathy |123   |431   |
|Danny |123

In [16]:
left_table = df_lifetime_res.groupBy("num1", "num2").agg(F.countDistinct("Member").alias('LifeTimeCount'), F.collect_set('Member').alias('LifeTimeMember'))

In [17]:
left_table.show(1000, False)

+------+------+-------------+--------------+
|num1  |num2  |LifeTimeCount|LifeTimeMember|
+------+------+-------------+--------------+
|1322  |168830|1            |[Amy]         |
|110478|173437|1            |[Billy]       |
|1323  |401459|1            |[Amy]         |
|134740|169512|1            |[Amy]         |
|1323  |134740|1            |[Amy]         |
|134740|168830|1            |[Amy]         |
|169508|169512|1            |[Amy]         |
|400008|400008|1            |[Flora]       |
|169508|400008|1            |[Amy]         |
|110478|151038|1            |[Billy]       |
|169508|401459|1            |[Amy]         |
|151038|173437|1            |[Billy]       |
|169512|400008|1            |[Amy]         |
|134740|169508|1            |[Amy]         |
|400008|713853|1            |[Ella]        |
|1323  |400008|1            |[Amy]         |
|123   |1322  |1            |[Ella]        |
|1322  |153539|1            |[Amy]         |
|1322  |400008|2            |[Amy, Ella]   |
|1323  |16

In [18]:
right_table = df_order_res.groupBy("num1", "num2").agg(F.countDistinct("Member").alias('SameBasketCount'), F.collect_set('Member').alias('SameBasketMember'))

In [19]:
right_table.show(1000, False)

+------+------+---------------+----------------+
|num1  |num2  |SameBasketCount|SameBasketMember|
+------+------+---------------+----------------+
|110478|173437|1              |[Billy]         |
|134740|169512|1              |[Amy]           |
|169508|169512|1              |[Amy]           |
|400008|400008|1              |[Flora]         |
|110478|151038|1              |[Billy]         |
|713853|713853|1              |[Ella]          |
|169508|401459|1              |[Amy]           |
|151038|173437|1              |[Billy]         |
|123   |123   |2              |[Danny, Ella]   |
|134740|169508|1              |[Amy]           |
|1322  |400008|1              |[Ella]          |
|169512|401459|1              |[Amy]           |
|168830|400008|1              |[Amy]           |
|180256|713853|1              |[Danny]         |
|134740|401459|1              |[Amy]           |
|153539|169512|1              |[Amy]           |
|123   |431   |1              |[Cathy]         |
|153539|169508|1    

In [20]:
df_basket = left_table.join(right_table, ['num1','num2'], how='left').fillna(0)

In [21]:
df_basket.show(1000, False)

+------+------+-------------+--------------+---------------+----------------+
|num1  |num2  |LifeTimeCount|LifeTimeMember|SameBasketCount|SameBasketMember|
+------+------+-------------+--------------+---------------+----------------+
|1322  |168830|1            |[Amy]         |0              |null            |
|110478|173437|1            |[Billy]       |1              |[Billy]         |
|1323  |401459|1            |[Amy]         |0              |null            |
|134740|169512|1            |[Amy]         |1              |[Amy]           |
|1323  |134740|1            |[Amy]         |0              |null            |
|134740|168830|1            |[Amy]         |0              |null            |
|169508|169512|1            |[Amy]         |1              |[Amy]           |
|400008|400008|1            |[Flora]       |1              |[Flora]         |
|169508|400008|1            |[Amy]         |0              |null            |
|110478|151038|1            |[Billy]       |1              |[Bil

In [22]:
df_res = df_basket.withColumn('DiffBasketCount', df_basket["LifeTimeCount"] - df_basket["SameBasketCount"])

In [23]:
df_res.orderBy('num1').show(10, False)

+----+------+-------------+--------------+---------------+----------------+---------------+
|num1|num2  |LifeTimeCount|LifeTimeMember|SameBasketCount|SameBasketMember|DiffBasketCount|
+----+------+-------------+--------------+---------------+----------------+---------------+
|123 |180256|1            |[Danny]       |0              |null            |1              |
|123 |1322  |1            |[Ella]        |0              |null            |1              |
|123 |713853|2            |[Danny, Ella] |0              |null            |2              |
|123 |400008|1            |[Ella]        |0              |null            |1              |
|123 |431   |1            |[Cathy]       |1              |[Cathy]         |0              |
|1322|168830|1            |[Amy]         |0              |null            |1              |
|1322|153539|1            |[Amy]         |0              |null            |1              |
|1322|169508|1            |[Amy]         |0              |null            |1    

In [24]:
df_final = df_res.select("num1", "num2", "SameBasketCount", "DiffBasketCount", "LifeTimeCount").orderBy('num1', 'num2')

In [25]:
df_final.show(200, False)

+------+------+---------------+---------------+-------------+
|num1  |num2  |SameBasketCount|DiffBasketCount|LifeTimeCount|
+------+------+---------------+---------------+-------------+
|123   |431   |1              |0              |1            |
|123   |1322  |0              |1              |1            |
|123   |180256|0              |1              |1            |
|123   |400008|0              |1              |1            |
|123   |713853|0              |2              |2            |
|1322  |1323  |1              |0              |1            |
|1322  |134740|0              |1              |1            |
|1322  |153539|0              |1              |1            |
|1322  |168830|0              |1              |1            |
|1322  |169508|0              |1              |1            |
|1322  |169512|0              |1              |1            |
|1322  |400008|1              |1              |2            |
|1322  |401459|0              |1              |1            |
|1322  |

### Total individual product count

In [26]:
df_item = df_transaction.groupBy("Product").agg(F.countDistinct("Member").alias('ProductMemberCount')).orderBy('Product')

In [27]:
df_item.show()

+-------+------------------+
|Product|ProductMemberCount|
+-------+------------------+
|    123|                 3|
|    431|                 1|
|   1322|                 2|
|   1323|                 1|
| 110478|                 1|
| 134740|                 1|
| 151038|                 1|
| 153539|                 1|
| 168830|                 1|
| 169508|                 1|
| 169512|                 1|
| 173437|                 1|
| 180256|                 1|
| 400008|                 3|
| 401459|                 1|
| 713853|                 2|
+-------+------------------+

### Create two dataframe for joining (product1, product2 Total Count)

In [28]:
df_num1 = df_item.selectExpr("Product as num1", "ProductMemberCount as NumOneCount")
df_num1.show(5)

+------+-----------+
|  num1|NumOneCount|
+------+-----------+
|   123|          3|
|   431|          1|
|  1322|          2|
|  1323|          1|
|110478|          1|
+------+-----------+
only showing top 5 rows

In [29]:
df_num2 = df_item.selectExpr("Product as num2", "ProductMemberCount as NumTwoCount")
df_num2.show(5)

+------+-----------+
|  num2|NumTwoCount|
+------+-----------+
|   123|          3|
|   431|          1|
|  1322|          2|
|  1323|          1|
|110478|          1|
+------+-----------+
only showing top 5 rows

### Join with Total count

In [30]:
result = df_final.join(df_num1, ['num1'], how='left').join(df_num2, ['num2'], how='left')

In [31]:
result = result.withColumn('SubstituteScore', (F.col("DiffBasketCount") * 100 / (F.col("NumOneCount") + F.col("NumTwoCount") - F.col("SameBasketCount") - F.col("DiffBasketCount"))).cast("integer"))

### Calculate Substitutability Score

In [32]:
result.orderBy("num1", "num2").show(1000, False)

+------+------+---------------+---------------+-------------+-----------+-----------+---------------+
|num2  |num1  |SameBasketCount|DiffBasketCount|LifeTimeCount|NumOneCount|NumTwoCount|SubstituteScore|
+------+------+---------------+---------------+-------------+-----------+-----------+---------------+
|431   |123   |1              |0              |1            |3          |1          |0              |
|1322  |123   |0              |1              |1            |3          |2          |25             |
|180256|123   |0              |1              |1            |3          |1          |33             |
|400008|123   |0              |1              |1            |3          |3          |20             |
|713853|123   |0              |2              |2            |3          |2          |66             |
|1323  |1322  |1              |0              |1            |2          |1          |0              |
|134740|1322  |0              |1              |1            |2          |1        

### Long to wide

In [35]:
full_product_list = df_transaction.select('Product').distinct().rdd.flatMap(lambda x: x).collect()
full_product_list.sort()

In [36]:
full_product_comb = seventh(full_product_list)

In [37]:
df_full_product = sqlContext.createDataFrame(full_product_comb, ['num1', 'num2'])
df_full_product.show()

+----+------+
|num1|  num2|
+----+------+
| 123|   431|
| 123|  1322|
| 123|  1323|
| 123|110478|
| 123|134740|
| 123|151038|
| 123|153539|
| 123|168830|
| 123|169508|
| 123|169512|
| 123|173437|
| 123|180256|
| 123|400008|
| 123|401459|
| 123|713853|
| 431|  1322|
| 431|  1323|
| 431|110478|
| 431|134740|
| 431|151038|
+----+------+
only showing top 20 rows

In [38]:
result.show(5)

+------+------+---------------+---------------+-------------+-----------+-----------+---------------+
|  num2|  num1|SameBasketCount|DiffBasketCount|LifeTimeCount|NumOneCount|NumTwoCount|SubstituteScore|
+------+------+---------------+---------------+-------------+-----------+-----------+---------------+
|  1323|  1322|              1|              0|            1|          2|          1|              0|
|400008|169508|              0|              1|            1|          1|          3|             33|
|400008|134740|              0|              1|            1|          1|          3|             33|
|400008|168830|              1|              0|            1|          1|          3|              0|
|400008|  1323|              0|              1|            1|          1|          3|             33|
+------+------+---------------+---------------+-------------+-----------+-----------+---------------+
only showing top 5 rows

### Left join from Full list (Left: Full, Right: Previous result)

In [39]:
res = df_full_product.join(result, ["num1", "num2"], how = 'left').orderBy("num1", "num2")

In [40]:
res.show(5)

+----+------+---------------+---------------+-------------+-----------+-----------+---------------+
|num1|  num2|SameBasketCount|DiffBasketCount|LifeTimeCount|NumOneCount|NumTwoCount|SubstituteScore|
+----+------+---------------+---------------+-------------+-----------+-----------+---------------+
| 123|   431|              1|              0|            1|          3|          1|              0|
| 123|  1322|              0|              1|            1|          3|          2|             25|
| 123|  1323|           null|           null|         null|       null|       null|           null|
| 123|110478|           null|           null|         null|       null|       null|           null|
| 123|134740|           null|           null|         null|       null|       null|           null|
+----+------+---------------+---------------+-------------+-----------+-----------+---------------+
only showing top 5 rows

In [41]:
result_wide = res.groupBy('num1').pivot('num2').agg(F.sum(F.col("SubstituteScore"))).fillna(0).orderBy('num1')

In [42]:
result_wide.show(1000, False)

+------+---+----+----+------+------+------+------+------+------+------+------+------+------+------+------+
|num1  |431|1322|1323|110478|134740|151038|153539|168830|169508|169512|173437|180256|400008|401459|713853|
+------+---+----+----+------+------+------+------+------+------+------+------+------+------+------+------+
|123   |0  |25  |0   |0     |0     |0     |0     |0     |0     |0     |0     |33    |20    |0     |66    |
|431   |0  |0   |0   |0     |0     |0     |0     |0     |0     |0     |0     |0     |0     |0     |0     |
|1322  |0  |0   |0   |0     |50    |0     |50    |50    |50    |50    |0     |0     |33    |50    |33    |
|1323  |0  |0   |0   |0     |100   |0     |100   |100   |100   |100   |0     |0     |33    |100   |0     |
|110478|0  |0   |0   |0     |0     |0     |0     |0     |0     |0     |0     |0     |0     |0     |0     |
|134740|0  |0   |0   |0     |0     |0     |0     |100   |0     |0     |0     |0     |33    |0     |0     |
|151038|0  |0   |0   |0     |0     |0

In [43]:
print((result_wide.count(), len(result_wide.columns)))

(15, 16)