In [1]:
# Import findspark to read SPARK_HOME and HADOOP_HOME
import findspark
findspark.init()

In [2]:
from pyspark.sql.functions import collect_list, col

In [3]:
# Import required library
from pyspark.sql import SparkSession

# Create Spark Session
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL Association Rules") \
    .getOrCreate()

In [4]:
print(spark)

<pyspark.sql.session.SparkSession object at 0x000001B9A2606EF0>


## Load data

In [5]:
# Hilangkan data yang nilainya kosong dengan .na.drop()
df_orders = spark.read.csv("olist/olist_orders_dataset.csv", header=True, inferSchema=True)
df_order_items = spark.read.csv("olist/olist_order_items_dataset.csv", header=True, inferSchema=True)
df_products = spark.read.csv("olist/olist_products_dataset.csv", header=True, inferSchema=True)

In [6]:
df_orders.count()

99441

In [7]:
df_order_items.count()

112650

In [8]:
df_products.count()

32951

In [9]:
df_orders.createOrReplaceTempView("orders")
df_order_items.createOrReplaceTempView("order_items")
df_products.createOrReplaceTempView("products")

In [10]:
df_orders.show()

+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|            order_id|         customer_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|
+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+
|e481f51cbdc54678b...|9ef432eb625129730...|   delivered|     2017-10-02 10:56:33|2017-10-02 11:07:15|         2017-10-04 19:55:00|          2017-10-10 21:25:13|          2017-10-18 00:00:00|
|53cdb2fc8bc7dce0b...|b0830fb4747a6c6d2...|   delivered|     2018-07-24 20:41:37|2018-07-26 03:24:27|         2018-07-26 14:31:00|          2018-08-07 15:27:45|          2018-08-13 00:00:00|
|47770eb9100c2d0c4...|41ce2a54c0b03bf34...|  

In [11]:
df_order_items.show()

+--------------------+-------------+--------------------+--------------------+-------------------+------+-------------+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_date| price|freight_value|
+--------------------+-------------+--------------------+--------------------+-------------------+------+-------------+
|00010242fe8c5a6d1...|            1|4244733e06e7ecb49...|48436dade18ac8b2b...|2017-09-19 09:45:35|  58.9|        13.29|
|00018f77f2f0320c5...|            1|e5f2d52b802189ee6...|dd7ddc04e1b6c2c61...|2017-05-03 11:05:13| 239.9|        19.93|
|000229ec398224ef6...|            1|c777355d18b72b67a...|5b51032eddd242adc...|2018-01-18 14:48:30| 199.0|        17.87|
|00024acbcdf0a6daa...|            1|7634da152a4610f15...|9d7a1d34a50524090...|2018-08-15 10:10:18| 12.99|        12.79|
|00042b26cf59d7ce6...|            1|ac6c3623068f30de0...|df560393f3a51e745...|2017-02-13 13:57:51| 199.9|        18.14|
|00048cc3ae777c65d...|            1|ef92

In [12]:
df_products.show()

+--------------------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+
|          product_id|product_category_name|product_name_lenght|product_description_lenght|product_photos_qty|product_weight_g|product_length_cm|product_height_cm|product_width_cm|
+--------------------+---------------------+-------------------+--------------------------+------------------+----------------+-----------------+-----------------+----------------+
|1e9e8ef04dbcff454...|           perfumaria|                 40|                       287|                 1|             225|               16|               10|              14|
|3aa071139cb16b67c...|                artes|                 44|                       276|                 1|            1000|               30|               18|              20|
|96bd76ec8810374ed...|        esporte_lazer|                 46|                       250|    

In [13]:
# # Using Row Number
# https://tableplus.io/blog/2018/09/how-to-use-row-number-function-in-sql-server.html
# # Using Row Number with Distinct
# https://stackoverflow.com/questions/20644382/distinct-and-row-number-do-not-work-together

# Simplify order_id value
df_orders_distinct = spark.sql("SELECT order_id, ROW_NUMBER() OVER(ORDER BY order_id) as ID_order\
                                FROM (SELECT DISTINCT order_id\
                                        FROM orders)")

In [14]:
df_orders_distinct.show()

+--------------------+--------+
|            order_id|ID_order|
+--------------------+--------+
|00010242fe8c5a6d1...|       1|
|00018f77f2f0320c5...|       2|
|000229ec398224ef6...|       3|
|00024acbcdf0a6daa...|       4|
|00042b26cf59d7ce6...|       5|
|00048cc3ae777c65d...|       6|
|00054e8431b9d7675...|       7|
|000576fe39319847c...|       8|
|0005a1a1728c9d785...|       9|
|0005f50442cb953dc...|      10|
|00061f2a7bc09da83...|      11|
|00063b381e2406b52...|      12|
|0006ec9db01a64e59...|      13|
|0008288aa423d2a3f...|      14|
|0009792311464db53...|      15|
|0009c9a17f916a706...|      16|
|000aed2e25dbad2f9...|      17|
|000c3e6612759851c...|      18|
|000e562887b1f2006...|      19|
|000e63d38ae8c00bb...|      20|
+--------------------+--------+
only showing top 20 rows



In [15]:
# Repeat previous step to other table
# Simplify product_id value
df_order_items_distinct = spark.sql("SELECT product_id, ROW_NUMBER() OVER(ORDER BY product_id) as ID_product\
                                FROM (SELECT DISTINCT product_id\
                                        FROM order_items)")

In [16]:
df_order_items_distinct.show()

+--------------------+----------+
|          product_id|ID_product|
+--------------------+----------+
|00066f42aeeb9f300...|         1|
|00088930e925c41fd...|         2|
|0009406fd7479715e...|         3|
|000b8f95fcb9e0096...|         4|
|000d9be29b5207b54...|         5|
|0011c512eb256aa0d...|         6|
|00126f27c81360368...|         7|
|001795ec6f1b187d3...|         8|
|001b237c0e9bb435f...|         9|
|001b72dfd63e9833e...|        10|
|001c5d71ac6ad696d...|        11|
|00210e41887c2a8ef...|        12|
|002159fe700ed3521...|        13|
|0021a87d4997a48b6...|        14|
|00250175f79f584c1...|        15|
|002552c0663708129...|        16|
|002959d7a0b0990fe...|        17|
|002af88741ba70c7b...|        18|
|002c6dab60557c48c...|        19|
|002d4ea7c04739c13...|        20|
+--------------------+----------+
only showing top 20 rows



In [17]:
# # Join simplified order_id with other table
# https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=dataframe%20join#pyspark.sql.DataFrame.join
# # Prodcut will be simplified too
df_join_12 = df_order_items\
                    .join(df_orders_distinct, df_orders_distinct.order_id == df_order_items.order_id, 'inner')\
                    .select(df_order_items.order_id, df_orders_distinct.ID_order, df_order_items.product_id)

In [18]:
df_join_12.show()

+--------------------+--------+--------------------+
|            order_id|ID_order|          product_id|
+--------------------+--------+--------------------+
|00010242fe8c5a6d1...|       1|4244733e06e7ecb49...|
|00018f77f2f0320c5...|       2|e5f2d52b802189ee6...|
|000229ec398224ef6...|       3|c777355d18b72b67a...|
|00024acbcdf0a6daa...|       4|7634da152a4610f15...|
|00042b26cf59d7ce6...|       5|ac6c3623068f30de0...|
|00048cc3ae777c65d...|       6|ef92defde845ab845...|
|00054e8431b9d7675...|       7|8d4f2bb7e93e6710a...|
|000576fe39319847c...|       8|557d850972a7d6f79...|
|0005a1a1728c9d785...|       9|310ae3c140ff94b03...|
|0005f50442cb953dc...|      10|4535b0e1091c278df...|
|00061f2a7bc09da83...|      11|d63c1011f49d98b97...|
|00063b381e2406b52...|      12|f177554ea93259a5b...|
|0006ec9db01a64e59...|      13|99a4788cb24856965...|
|0008288aa423d2a3f...|      14|368c6c730842d7801...|
|0008288aa423d2a3f...|      14|368c6c730842d7801...|
|0009792311464db53...|      15|8cab8abac591587

In [19]:
# Join simple order_id value with product_id value
df_join_23 = df_join_12\
                .join(df_order_items_distinct, df_join_12.product_id == df_order_items_distinct.product_id, 'inner')

In [20]:
df_join_23.createOrReplaceTempView("joined")

In [21]:
# Select only simple value id
df_join = spark.sql("SELECT distinct ID_order, id_product from joined order by id_order asc")
df_join.show()
df_join.count()

+--------+----------+
|ID_order|id_product|
+--------+----------+
|       1|      8629|
|       2|     29598|
|       3|     25668|
|       4|     15323|
|       5|     22080|
|       6|     30848|
|       7|     18182|
|       8|     11123|
|       9|      6385|
|      10|      9013|
|      11|     27628|
|      12|     31102|
|      13|     19743|
|      14|      7080|
|      15|     18083|
|      16|      8238|
|      17|     10354|
|      18|     23217|
|      19|     12287|
|      20|     11081|
+--------+----------+
only showing top 20 rows



102425

## RDD based FP-Growth
https://spark.apache.org/docs/2.3.0/mllib-frequent-pattern-mining.html

In [22]:
# https://spark.apache.org/docs/2.3.0/api/python/pyspark.mllib.html#pyspark.mllib.fpm.FPGrowth
# Import RDD FP-Growth library
from pyspark.mllib.fpm import FPGrowth

In [23]:
# # Prepare rdd data for input
# https://community.hortonworks.com/questions/96309/prepare-transactional-data-set-for-fpgrowth-in-pys.html
transactions = df_join.groupBy("ID_order")\
                .agg(collect_list("ID_product").alias("pid"))\
                .rdd\
                .map(lambda x: x.pid)

In [24]:
transactions.collect()

[[8629],
 [29598],
 [25668],
 [15323],
 [22080],
 [30848],
 [18182],
 [11123],
 [6385],
 [9013],
 [27628],
 [31102],
 [19743],
 [7080],
 [18083],
 [8238],
 [10354],
 [23217],
 [12287],
 [11081],
 [11395],
 [3716],
 [12118],
 [11708],
 [4491],
 [25143],
 [3719],
 [17663],
 [7375],
 [13335],
 [5369],
 [30035],
 [4664],
 [10514],
 [22725],
 [8410],
 [32726],
 [25577],
 [5369],
 [1415],
 [28317],
 [28328],
 [17176],
 [29660],
 [6347],
 [15483],
 [17557],
 [24377],
 [21188],
 [29064],
 [29697],
 [2629],
 [10176],
 [3247],
 [1585],
 [6942],
 [32686],
 [8734],
 [17659],
 [12989],
 [14729],
 [28775],
 [677],
 [28066],
 [15312],
 [237],
 [25051],
 [26307],
 [21385],
 [5995],
 [19504],
 [27509],
 [20428],
 [27347, 17533],
 [22321],
 [24339],
 [25874],
 [11035],
 [1041],
 [5742],
 [17486],
 [29079],
 [4187, 2572],
 [17117],
 [1994],
 [17783],
 [30384],
 [14708],
 [9882],
 [7566],
 [7413],
 [19796],
 [12748],
 [21140],
 [9010],
 [32286],
 [28301],
 [8491],
 [8302],
 [10400],
 [13102],
 [9845],
 [1

In [25]:
# # If there's uniqueness problem troubleshoot with link below
# https://stackoverflow.com/questions/37249291/fp-growth-items-in-a-transaction-must-be-unique
# # and use unique instead of normal transaction
# unique = transactions.map(lambda x: list(set(x))).cache()

In [26]:
# 1st default model according to documentation. Minimal Support : 0.3
model1 = FPGrowth.train(transactions, minSupport=0.3, numPartitions=10)
result1 = model1.freqItemsets().collect()
for fi in result1:
    print(fi)

In [27]:
# 2nd Model Minimal Support : 0.001
model2 = FPGrowth.train(transactions, minSupport=0.001, numPartitions=10)
result2 = model2.freqItemsets().collect()
for fi in result2:
    print(fi)

FreqItemset(items=[15682], freq=100)
FreqItemset(items=[3414], freq=155)
FreqItemset(items=[30403], freq=116)
FreqItemset(items=[8052], freq=255)
FreqItemset(items=[4275], freq=134)
FreqItemset(items=[19743], freq=467)
FreqItemset(items=[10751], freq=114)
FreqItemset(items=[16087], freq=225)
FreqItemset(items=[28967], freq=131)
FreqItemset(items=[22113], freq=431)
FreqItemset(items=[14069], freq=151)
FreqItemset(items=[1518], freq=100)
FreqItemset(items=[889], freq=130)
FreqItemset(items=[8614], freq=352)
FreqItemset(items=[31145], freq=150)
FreqItemset(items=[11599], freq=100)
FreqItemset(items=[28971], freq=194)
FreqItemset(items=[17663], freq=114)
FreqItemset(items=[9122], freq=144)
FreqItemset(items=[11747], freq=194)
FreqItemset(items=[11029], freq=114)
FreqItemset(items=[27040], freq=323)
FreqItemset(items=[18045], freq=129)
FreqItemset(items=[7132], freq=111)
FreqItemset(items=[24087], freq=187)
FreqItemset(items=[7365], freq=311)
FreqItemset(items=[27129], freq=122)
FreqItemset

In [28]:
# 3rd Model Minimal Support :0.01
model3 = FPGrowth.train(transactions, minSupport=0.01, numPartitions=10)
result3 = model3.freqItemsets().collect()
for fi in result3:
    print(fi)

## Dataframe based FP-Growth
https://spark.apache.org/docs/2.3.0/ml-frequent-pattern-mining.html

In [29]:
# https://spark.apache.org/docs/2.3.0/ml-frequent-pattern-mining.html
# Import FP Growth
from pyspark.ml.fpm import FPGrowth

In [30]:
# Prepare dataframe for input
transactions = df_join_23.dropDuplicates()\
                .groupBy("ID_order")\
                .agg(collect_list("ID_product").alias("pid"))

In [31]:
transactions.show()

+--------+-------+
|ID_order|    pid|
+--------+-------+
|     148|[14727]|
|     463|[18007]|
|     471|[26519]|
|     496|[32080]|
|     833| [4354]|
|    1088|[30823]|
|    1238|[11747]|
|    1342|[28512]|
|    1580| [9269]|
|    1591|[21193]|
|    1645|[31840]|
|    1829| [4046]|
|    1959|[21827]|
|    2122|[10029]|
|    2142|[25039]|
|    2366|[11209]|
|    2659| [1065]|
|    2866| [7003]|
|    3175|[20913]|
|    3749| [6847]|
+--------+-------+
only showing top 20 rows



### 1st Model Minimum Supoort : 0.005, Minimum Confidence 0.001

In [32]:
fpGrowth1 = FPGrowth(itemsCol="pid", minSupport=0.005, minConfidence=0.001)

In [33]:
model1 = fpGrowth1.fit(transactions)

In [34]:
model1.freqItemsets.show()

+-----+----+
|items|freq|
+-----+----+
+-----+----+



In [35]:
model1.associationRules.show()

+----------+----------+----------+
|antecedent|consequent|confidence|
+----------+----------+----------+
+----------+----------+----------+



In [36]:
model1.transform(transactions).show()

+--------+-------+----------+
|ID_order|    pid|prediction|
+--------+-------+----------+
|     148|[14727]|        []|
|     463|[18007]|        []|
|     471|[26519]|        []|
|     496|[32080]|        []|
|     833| [4354]|        []|
|    1088|[30823]|        []|
|    1238|[11747]|        []|
|    1342|[28512]|        []|
|    1580| [9269]|        []|
|    1591|[21193]|        []|
|    1645|[31840]|        []|
|    1829| [4046]|        []|
|    1959|[21827]|        []|
|    2122|[10029]|        []|
|    2142|[25039]|        []|
|    2366|[11209]|        []|
|    2659| [1065]|        []|
|    2866| [7003]|        []|
|    3175|[20913]|        []|
|    3749| [6847]|        []|
+--------+-------+----------+
only showing top 20 rows



### 2nd Model Minimum Supoort : 0.001, Minimum Confidence 0.001

In [37]:
fpGrowth2 = FPGrowth(itemsCol="pid", minSupport=0.001, minConfidence=0.001)

In [38]:
model2 = fpGrowth2.fit(transactions)

In [39]:
model2.freqItemsets.show()

+-------+----+
|  items|freq|
+-------+----+
|[19743]| 467|
|[22113]| 431|
| [8614]| 352|
|[27040]| 323|
| [7365]| 311|
|[10868]| 306|
| [7080]| 291|
|[10841]| 287|
| [2795]| 269|
| [5693]| 259|
| [8052]| 255|
|[16087]| 225|
|[11747]| 194|
|[28971]| 194|
|[24087]| 187|
|[21303]| 172|
| [8685]| 160|
|[21672]| 158|
|[29514]| 156|
| [6980]| 156|
+-------+----+
only showing top 20 rows



In [40]:
model2.freqItemsets.count()

53

In [41]:
model2.associationRules.show()

+----------+----------+----------+
|antecedent|consequent|confidence|
+----------+----------+----------+
+----------+----------+----------+



In [42]:
model1.transform(transactions).show()

+--------+-------+----------+
|ID_order|    pid|prediction|
+--------+-------+----------+
|     148|[14727]|        []|
|     463|[18007]|        []|
|     471|[26519]|        []|
|     496|[32080]|        []|
|     833| [4354]|        []|
|    1088|[30823]|        []|
|    1238|[11747]|        []|
|    1342|[28512]|        []|
|    1580| [9269]|        []|
|    1591|[21193]|        []|
|    1645|[31840]|        []|
|    1829| [4046]|        []|
|    1959|[21827]|        []|
|    2122|[10029]|        []|
|    2142|[25039]|        []|
|    2366|[11209]|        []|
|    2659| [1065]|        []|
|    2866| [7003]|        []|
|    3175|[20913]|        []|
|    3749| [6847]|        []|
+--------+-------+----------+
only showing top 20 rows



### 3rd Model Minimum Support : 0.0001, Minimum Confidence 0.001

In [43]:
fpGrowth3 = FPGrowth(itemsCol="pid", minSupport=0.0001, minConfidence=0.001)

In [44]:
model3 = fpGrowth3.fit(transactions)

In [45]:
model3.freqItemsets.show()

+-------+----+
|  items|freq|
+-------+----+
| [3099]|  42|
|[17906]|  26|
| [1323]|  20|
|[24455]|  16|
|[29130]|  14|
| [8683]|  12|
| [3328]|  11|
| [2531]|  10|
|[19743]| 467|
|[20058]|  16|
|[22930]|  14|
|[29287]|  12|
| [8530]|  11|
|[25331]|  10|
|[22113]| 431|
| [7904]|  42|
| [1511]|  26|
|[23938]|  20|
|[27530]|  11|
| [8614]| 352|
+-------+----+
only showing top 20 rows



In [46]:
model3.associationRules.show()

+----------+----------+-------------------+
|antecedent|consequent|         confidence|
+----------+----------+-------------------+
|    [7365]|    [8614]|0.03536977491961415|
|   [10377]|   [31536]|0.19101123595505617|
|    [6980]|   [19743]| 0.1858974358974359|
|   [19743]|    [6980]|0.06209850107066381|
|    [8614]|    [7365]|            0.03125|
|    [8228]|    [7132]|0.14814814814814814|
|   [29514]|    [7132]|0.21794871794871795|
|   [31536]|   [10377]|0.30357142857142855|
|    [7132]|   [29514]| 0.3063063063063063|
|    [7132]|    [8228]|0.10810810810810811|
+----------+----------+-------------------+



In [47]:
model3.transform(transactions).show()

+--------+-------+----------+
|ID_order|    pid|prediction|
+--------+-------+----------+
|     148|[14727]|        []|
|     463|[18007]|        []|
|     471|[26519]|        []|
|     496|[32080]|        []|
|     833| [4354]|        []|
|    1088|[30823]|        []|
|    1238|[11747]|        []|
|    1342|[28512]|        []|
|    1580| [9269]|        []|
|    1591|[21193]|        []|
|    1645|[31840]|        []|
|    1829| [4046]|        []|
|    1959|[21827]|        []|
|    2122|[10029]|        []|
|    2142|[25039]|        []|
|    2366|[11209]|        []|
|    2659| [1065]|        []|
|    2866| [7003]|        []|
|    3175|[20913]|        []|
|    3749| [6847]|        []|
+--------+-------+----------+
only showing top 20 rows



### 4th Model Minimum Support : 0.0001, Minimum Confidence 0.001

In [48]:
fpGrowth4 = FPGrowth(itemsCol="pid", minSupport=0.00001, minConfidence=0.001)

In [49]:
model4 = fpGrowth4.fit(transactions)

In [50]:
model4.freqItemsets.show()

+--------------+----+
|         items|freq|
+--------------+----+
|       [25145]|   1|
|        [3099]|  42|
|       [28246]|   1|
|       [11593]|   2|
|        [8779]|   1|
|        [3616]|   2|
|       [10667]|   2|
|       [20496]|   1|
|       [21064]|   5|
|[21064, 25547]|   1|
|        [8324]|   4|
|       [27394]|   1|
|        [5674]|   2|
|       [16396]|   1|
|       [22462]|   1|
|       [13311]|   1|
|       [22400]|   1|
|       [11614]|   1|
|        [3747]|   1|
|       [17906]|  26|
+--------------+----+
only showing top 20 rows



In [51]:
model4.associationRules.show()

+--------------------+----------+------------------+
|          antecedent|consequent|        confidence|
+--------------------+----------+------------------+
|             [25621]|   [11747]|               1.0|
|             [24509]|   [18028]|               0.5|
|             [23797]|      [78]|0.1111111111111111|
|              [6531]|   [30096]|0.3333333333333333|
|      [27108, 30036]|    [3138]|               1.0|
|      [27108, 30036]|   [19703]|               1.0|
|      [27108, 30036]|   [13723]|               1.0|
|      [27108, 30036]|    [7477]|               1.0|
|        [1956, 5827]|   [23148]|               1.0|
|        [1956, 5827]|    [1544]|               1.0|
|               [121]|   [12287]|               0.2|
|             [19390]|   [28345]|0.3333333333333333|
|[7477, 30036, 19703]|    [3138]|               1.0|
|[7477, 30036, 19703]|   [13723]|               1.0|
|[7477, 30036, 19703]|   [27108]|               1.0|
| [764, 25804, 30254]|   [24388]|             

In [52]:
model4.transform(transactions).show()

+--------+-------+--------------------+
|ID_order|    pid|          prediction|
+--------+-------+--------------------+
|     148|[14727]|                  []|
|     463|[18007]|                  []|
|     471|[26519]|             [28840]|
|     496|[32080]|                  []|
|     833| [4354]|                  []|
|    1088|[30823]|[6520, 23142, 276...|
|    1238|[11747]|      [31745, 25621]|
|    1342|[28512]|                  []|
|    1580| [9269]|      [13832, 16170]|
|    1591|[21193]|             [12820]|
|    1645|[31840]|                  []|
|    1829| [4046]|                  []|
|    1959|[21827]|[26978, 6547, 254...|
|    2122|[10029]|                  []|
|    2142|[25039]|                  []|
|    2366|[11209]|                  []|
|    2659| [1065]|             [19614]|
|    2866| [7003]|                  []|
|    3175|[20913]|                  []|
|    3749| [6847]|                  []|
+--------+-------+--------------------+
only showing top 20 rows



In [53]:
# df_try = spark.createDataFrame([
#     (0,[8614])
# ],["id","pid"])

In [54]:
# model.transform(df_try).show()

In [55]:
# model2.transform(df_try).show()

In [56]:
# model3.transform(df_try).show()

In [57]:
# model4.transform(df_try).show()