# Initial Setup

In [2]:
# basic imports 

import os # OS e.g directory structure
import sys
import numpy as np # linear algebra
import scipy as sc  # scientific computing
import pandas as pd # data processing, file I/O
import seaborn as sns  # visualization
import matplotlib.pyplot as plt # visualization
import warnings
warnings.filterwarnings("ignore")

In [3]:
# Spark related imports

from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import *
from pyspark.sql.types import *

from pyspark.ml.feature import StringIndexer
from pyspark.ml.fpm import FPGrowth
from pyspark.ml.fpm import PrefixSpan
from pyspark.ml.evaluation import RegressionEvaluator


# Data Exploration

In [None]:
# Peek at the first and last five lines of each raw monthly CSV using shell commands.
! echo "Oct-2019"
! head -n 5 dataset/2019-Oct.csv
! tail -n 5 dataset/2019-Oct.csv
! echo "Nov-2019"
! head -n 5 dataset/2019-Nov.csv
! tail -n 5 dataset/2019-Nov.csv

In [4]:
from pyspark.sql import SparkSession

# Obtain the Spark session used by every cell below (created under the given app name
# if none is active yet).
spark = (
    SparkSession.builder
    .appName("abd_recommendation")
    .getOrCreate()
)

In [5]:
# Load every file matching the "2019-*-small.csv" pattern into one DataFrame,
# taking column names from the header row and letting Spark infer the column types.
sales = spark.read.csv(
    "dataset/2019-*-small.csv",
    sep=",",
    header="true",
    inferSchema="true",
)

In [6]:
# Earlier per-month variants of this check, kept disabled for reference:
#sales_oct.printSchema()
#so = sales_oct.count()

#sales_nov.printSchema()
#sv = sales_nov.count()

# Print the inferred schema, then return the total row count as the cell output.
sales.printSchema()
sales.count()

root
 |-- event_time: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- category_id: long (nullable = true)
 |-- category_code: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- price: double (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- user_session: string (nullable = true)



199998

In [9]:
# Number of rows per event type (purchase / view / cart), printed without truncation.
(
    sales
    .groupBy('event_type')
    .count()
    .show(truncate=False)
)

+----------+------+
|event_type|count |
+----------+------+
|purchase  |3077  |
|view      |194617|
|cart      |2304  |
+----------+------+



In [7]:
# https://stackoverflow.com/questions/40163106/cannot-find-col-function-in-pyspark

def _missing_count(name, dtype):
    """Build the aggregate that counts the missing values of one column.

    Args:
        name: column name.
        dtype: the column's type string as reported by ``DataFrame.dtypes``.

    Returns:
        A Column counting the rows where the value is NULL or, for float/double
        columns only, NaN. The result is aliased to the column name.
    """
    is_missing = col(name).isNull()
    # isnan() is a floating-point test, so it is applied only to float/double columns
    # instead of being forced onto string and integer columns.
    if dtype in ("float", "double"):
        is_missing = is_missing | isnan(col(name))
    return count(when(is_missing, name)).alias(name)


# One row with the number of missing values in each column of `sales`.
sales.select([_missing_count(name, dtype) for name, dtype in sales.dtypes]).show()

+----------+----------+----------+-----------+-------------+-----+-----+-------+------------+
|event_time|event_type|product_id|category_id|category_code|brand|price|user_id|user_session|
+----------+----------+----------+-----------+-------------+-----+-----+-------+------------+
|         0|         0|         0|          0|        66427|30168|    0|      0|           0|
+----------+----------+----------+-----------+-------------+-----+-----+-------+------------+



In [10]:
# Number of rows per brand. No ordering is applied, so the 20 rows displayed are
# whichever groups come back first.
(
    sales
    .groupBy('brand')
    .count()
    .show(truncate=False)
)

+-------------+-----+
|brand        |count|
+-------------+-----+
|yokohama     |672  |
|welss        |17   |
|tuffoni      |10   |
|tmnt         |1    |
|serebro      |14   |
|edifier      |11   |
|globo        |1    |
|tega         |1    |
|sonel        |34   |
|nutricia     |13   |
|bombbar      |4    |
|alutec       |9    |
|goo.n        |6    |
|keenway      |5    |
|sigma        |10   |
|nocnezna     |1    |
|trianglegroup|2    |
|fitwell      |2    |
|belaakalitva |2    |
|ariston      |346  |
+-------------+-----+
only showing top 20 rows



In [11]:
# Summary statistics (count, mean, stddev, min, max) for the price column.
sales.describe("price").show()

+-------+-----------------+
|summary|            price|
+-------+-----------------+
|  count|           199998|
|   mean|284.1862630626338|
| stddev|351.4834010830769|
|    min|              0.0|
|    max|          2574.07|
+-------+-----------------+



In [None]:
# Rows whose price is exactly zero: display a sample, then return how many there are.
no_price = sales.where(col('price') == 0)
no_price.show()
no_price.count()

Group by user session to build market baskets, since we have no explicit user ratings.

In [None]:
# https://stackoverflow.com/questions/48406304/groupby-and-concat-array-columns-pyspark
# For each (event type, session) pair, gather the product ids seen into one list.
(
    sales
    .groupBy("event_type", 'user_session')
    .agg(collect_list('product_id').alias('products'))
    .show(20, truncate=False)
)

In [None]:
# List the distinct values of event_type.
sales.select("event_type").distinct().show(truncate=False)

In [None]:
# List distinct values of category_code (show() prints at most 20 rows by default).
sales.select("category_code").distinct().show()

Drop the columns `event_time`, `price` and `category_code`.

In [12]:
# Working copy of `sales` without the price, event_time and category_code columns.
sales_short = sales.drop("price", "event_time", "category_code")

In [None]:
# Confirm which columns remain after the drop.
sales_short.printSchema()

In [13]:
# Distinct (user, brand) pairs. The de-duplicated frame is built once and reused both
# for the reported number of removed rows and as the final result.
user_brand_all = sales_short.select('user_id', 'brand')
df_user_brand = user_brand_all.dropDuplicates()
print(f'duplicate rows = {user_brand_all.count() - df_user_brand.count()}')

duplicate rows = 120652


Recommendation based on brand and on the products viewed and purchased

In [19]:
# Distinct (user, product) pairs.
df_s_b = sales_short.select("user_id", "product_id").dropDuplicates()

# One basket per user: the list of distinct products that user appears with.
# (Despite the name, the grouping key is user_id, not user_session.)
df_session_basket = (
    df_s_b
    .groupBy('user_id')
    .agg(collect_list('product_id').alias('products'))
)
df_session_basket.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- products: array (nullable = true)
 |    |-- element: integer (containsNull = false)



In [21]:
# Count, per column of df_s_b, the rows whose value is NaN or NULL.
df_s_b.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_s_b.columns]).show()

+-------+----------+
|user_id|product_id|
+-------+----------+
|      0|         0|
+-------+----------+



In [20]:
# Preview the per-user product baskets (first 20 rows, long lists truncated).
df_session_basket.show()

+---------+--------------------+
|  user_id|            products|
+---------+--------------------+
|445162060|  [2601695, 2602161]|
|509481306|           [3500116]|
|512366675|[4700634, 4700542...|
|512513760|[4802159, 2840077...|
|512552482|[22700420, 227000...|
|512697003|[17300211, 173007...|
|512818893|[15700033, 15700074]|
|512907846|          [12100020]|
|512975815|          [18100019]|
|513101609|[13103078, 50600021]|
|513136911|[1005153, 6300731...|
|513161211|          [22000069]|
|513165061|[26004578, 260059...|
|513168011|  [1004720, 1004875]|
|513172205|           [4100339]|
|513216657|           [1005117]|
|513247621|[31401133, 15100147]|
|513266555|[12300559, 123009...|
|513291250|          [12500691]|
|513360681|[15700176, 480397...|
+---------+--------------------+
only showing top 20 rows



In [15]:
# One basket per user holding the brands from that user's distinct (user, brand) pairs.
df_user_basket = (
    df_user_brand
    .groupBy('user_id')
    .agg(collect_list('brand').alias('products'))
)
df_user_basket.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- products: array (nullable = true)
 |    |-- element: string (containsNull = false)



In [58]:
# Preview the per-user brand baskets.
df_user_basket.show()
# Cell output: number of rows in the per-user *product* basket frame
# (note this counts df_session_basket, not df_user_basket).
df_session_basket.count()

+---------+--------------------+
|  user_id|            products|
+---------+--------------------+
|445162060|    [flama, shivaki]|
|509481306|          [moulinex]|
|512366675|[anytek, incar, i...|
|512513760|        [jbl, baden]|
|512552482|[bosch, marshal, ...|
|512697003|[lanvin, versace,...|
|512818893|                  []|
|512907846|              [zinc]|
|512975815|           [tarkett]|
|513101609|     [trebl, denzel]|
|513136911|[braun, xiaomi, v...|
|513161211|           [higashi]|
|513165061|                  []|
|513168011|   [huawei, samsung]|
|513172205|         [microsoft]|
|513216657|             [apple]|
|513247621|  [lider, pasabahce]|
|513266555|[makita, dewalt, ...|
|513291250|             [bosch]|
|513360681|  [bts, samsung, sv]|
+---------+--------------------+
only showing top 20 rows



40822

In [22]:
# Configure FP-Growth to read baskets from the "products" column, with both the
# minimum-support and minimum-confidence thresholds set to 0.001.
fpGrowth = FPGrowth(itemsCol="products", minSupport=0.001, minConfidence=0.001)
# Fit on the per-user product baskets.
model = fpGrowth.fit(df_session_basket)

In [42]:
# Open question: should baskets be built per session or per user, and from products or brands?

# Disabled experiment on a separate test frame
# (NOTE(review): `dftest` is not defined in the visible cells):
#predictions = model.transform(dftest)
#predictions.show(truncate=False)

# Display frequent itemsets.
model.freqItemsets.show()

# Display generated association rules.
model.associationRules.show()

# transform examines the input items against all the association rules and summarizes the
# consequents as the prediction column.
model.transform(df_session_basket).show()

+------------------+----+
|             items|freq|
+------------------+----+
|         [1004655]|  57|
|         [1005115]|1233|
|         [1004856]|1229|
|[1004856, 1005115]|  46|
|         [4803879]|  57|
|         [1004767]| 999|
|[1004767, 1004856]| 167|
|[1004767, 1005115]|  42|
|        [28718083]|  57|
|         [1005105]| 769|
|[1005105, 1005115]| 241|
|         [1002099]|  57|
|         [1004249]| 641|
|[1004249, 1005105]|  62|
|[1004249, 1005115]| 159|
|         [1003709]|  57|
|         [1004870]| 607|
|[1004870, 1004767]| 149|
|[1004870, 1004856]|  81|
|         [1003712]|  57|
+------------------+----+
only showing top 20 rows

+------------------+----------+--------------------+------------------+
|        antecedent|consequent|          confidence|              lift|
+------------------+----------+--------------------+------------------+
|         [1004857]| [1004856]| 0.45161290322580644|15.000603690385574|
|         [1002544]| [1004249]| 0.11844660194174757|  7.543256

In [43]:
# Keep only the baskets for which the rules produced at least one predicted item.
# The built-in size() is used rather than the `filter_len` Python UDF: that UDF is
# defined in a cell further down the notebook, so relying on it here breaks a
# top-to-bottom run.
model.transform(df_session_basket).where(size(col("prediction")) >= 1).show()

+---------+--------------------+--------------------+
|  user_id|            products|          prediction|
+---------+--------------------+--------------------+
|513431118|  [1003312, 1004246]|  [1004249, 1004258]|
|513595928|           [1003317]|[1004249, 1005105...|
|513818082|           [1004249]|[1005105, 1005115...|
|514569249|  [1005115, 1002629]|[1004856, 1004767...|
|514714578|           [1004249]|[1005105, 1005115...|
|514764444|           [1004857]|           [1004856]|
|515687124|[3601127, 2702590...|           [3600666]|
|517267340|[1004768, 1004871...|[1004856, 1005115...|
|517341202|           [1002544]|[1004249, 1005115...|
|517450831|[1004767, 1004209...|[1004856, 1005115...|
|517531224|[1005177, 1004653...|[1004873, 1005105...|
|517644942|[1004856, 1004836...|[1005115, 1004249...|
|518413677|  [1004873, 1002544]|[1004249, 1005115...|
|518619616|[1005100, 1004857...|[1004833, 1004836...|
|518638475|  [1004873, 1004659]|[1004767, 1004870...|
|518740148|           [10048

In [62]:
# Rules whose antecedent is exactly one given product (1004246, then 1003312).
for antecedent_id in (1004246, 1003312):
    model.associationRules.where(col("antecedent") == array([lit(antecedent_id)])).show()

# Frequencies of the pair [1004857, 1004856] and of each of its two members.
for itemset in ([1004857, 1004856], [1004857], [1004856]):
    model.freqItemsets.where(col("items") == array([lit(item) for item in itemset])).show()

+----------+----------+-------------------+------------------+
|antecedent|consequent|         confidence|              lift|
+----------+----------+-------------------+------------------+
| [1004246]| [1004249]|0.25961538461538464|16.533571342853715|
| [1004246]| [1004258]|0.21634615384615385| 28.30667529585799|
+----------+----------+-------------------+------------------+

+----------+----------+----------+----+
|antecedent|consequent|confidence|lift|
+----------+----------+----------+----+
+----------+----------+----------+----+

+------------------+----+
|             items|freq|
+------------------+----+
|[1004857, 1004856]|  70|
+------------------+----+

+---------+----+
|    items|freq|
+---------+----+
|[1004857]| 155|
+---------+----+

+---------+----+
|    items|freq|
+---------+----+
|[1004856]|1229|
+---------+----+



In [57]:
# Association rules with the highest confidence first.
model.associationRules.orderBy(col("confidence").desc()).show()
# Association rules with the lowest lift first.
model.associationRules.orderBy(col("lift").asc()).show()

+------------------+----------+-------------------+------------------+
|        antecedent|consequent|         confidence|              lift|
+------------------+----------+-------------------+------------------+
|         [3600666]| [3600661]| 0.5316455696202531|126.17927583161611|
|[1004833, 1004767]| [1004856]|0.46078431372549017|15.305237798943825|
|         [1004839]| [1004838]|0.45555555555555555|   67.379307568438|
|         [1004857]| [1004856]|0.45161290322580644|15.000603690385574|
|         [4804055]| [4804056]| 0.4105691056910569| 35.35918150320744|
|         [1004858]| [1004856]|              0.375| 12.45585842148088|
|         [4802036]| [4804056]|0.36904761904761907|31.783252963632712|
|         [1004768]| [1004767]|0.35428571428571426|14.477128557128555|
|         [1005098]| [1004856]| 0.3493150684931507|11.602717433708216|
|         [1005100]| [1004856]| 0.3301282051282051|10.965413824038722|
|         [1005104]| [1005115]|0.32642487046632124|10.807231193979048|
|     

In [1]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType, StringType, BooleanType

# Brand pairs whose joint presence in user baskets is measured below.
brand = [["samsung", "xiaomi"], ["samsung", "apple"], ["meizu", "honor"]]


def _make_contains_all(required):
    """Build a boolean UDF that is true when a basket holds every item in `required`.

    Each call captures its own `required` value, so the resulting UDF does not depend
    on a loop variable that changes after the UDF has been created.
    """
    required_items = frozenset(required)

    def contains_all(x):
        # A missing basket yields None, exactly as before.
        if x is not None:
            return required_items.issubset(x)

    return udf(contains_all, BooleanType())


# [pair, number of user baskets containing both brands] for every pair.
a = [
    [b, df_user_basket.where(_make_contains_all(b)(col("products"))).count()]
    for b in brand
]

# Show the matching baskets for the LAST pair only. The original relied on `cols`
# leaking out of the loop, which had the same effect; here it is explicit.
cols = _make_contains_all(brand[-1])
# The frame is built once and filtered on the boolean column itself, not on a
# comparison with the string "true".
matching = df_user_basket.withColumn("contains", cols(col("products"))).where(col("contains"))
matching.show()
print(matching.count())

NameError: name 'df_user_basket' is not defined

In [29]:
from itertools import combinations
import threading

def contains_all(x, y):
    """Tell whether every element of `y` is also present in `x`.

    Args:
        x: iterable of basket items, or None.
        y: iterable of required items, or None.

    Returns:
        True or False, or None when either argument is None.
    """
    # The per-call debug prints were removed; they ran once for every evaluated row.
    if x is None or y is None:
        return None
    return set(y).issubset(x)

def get_subsets(x, c):
    """Return all `c`-element combinations of `x`, each as a list, in itertools order."""
    return list(map(list, combinations(x, c)))

def filter_len(x, c):
    """Tell whether `x` has at least `c` elements.

    Args:
        x: a sized collection, or None.
        c: minimum number of elements required.

    Returns:
        True when len(x) >= c; False otherwise, including when `x` is None
        (a missing collection cannot satisfy a minimum length).
    """
    if x is None:
        return False
    return len(x) >= c
    
# Wrap the plain-Python helpers above as Spark UDFs.
cols = udf(contains_all, BooleanType())
# get_subsets returns a list of lists, so the declared return type is an array of
# string arrays (it was previously declared as a flat array of strings).
subset = udf(get_subsets, ArrayType(ArrayType(StringType())))
# NOTE: this rebinds `filter_len` from the Python function to its UDF wrapper;
# later cells call the name as a UDF.
filter_len = udf(filter_len, BooleanType())

def t(items, df):
    """Count, for each candidate in `items`, the rows of `df` whose "basket" holds all of it.

    Each (candidate, count) tuple is printed as soon as it is computed, and the full
    list of tuples is returned in input order.
    """
    results = []
    for candidate in items:
        required = array([lit(member) for member in candidate])
        matching_rows = df.select("*").where(cols(col("basket"), required))
        entry = (candidate, matching_rows.count())
        results.append(entry)
        print(entry)

    return results

def apriori(dataframe, item_column, user_column, min_support):
    """Count how many multi-item baskets contain each pair of frequent items.

    This is the pair-counting pass of an Apriori-style search: items are kept when
    their row count exceeds `min_support`, every unordered pair of kept items is
    generated, and each pair is counted against the per-user baskets via `t`.

    Args:
        dataframe: Spark DataFrame holding `item_column` and `user_column`
            (presumably one row per observed user/item pair - verify against caller).
        item_column: name of the column holding the item.
        user_column: name of the column used to group rows into baskets.
        min_support: an item is kept only when its row count is strictly greater
            than this value.

    Returns:
        The list of (pair, count) tuples produced by `t`, or an empty list when
        fewer than two items pass the support threshold.
    """
    # Row count per item, restricted to the items above the support threshold.
    item_counts = dataframe.groupBy(item_column).count()
    accepted_items = item_counts.where(col("count") > min_support)

    # Collect once and read the configured column. This replaces the hard-coded
    # `.brand` attribute and the separate count() action.
    frequent_items = [row[item_column] for row in accepted_items.select(item_column).collect()]

    # Every unordered pair of frequent items, in the same order as the nested index loops.
    items_pairs = [list(pair) for pair in combinations(frequent_items, 2)]
    if not items_pairs:
        return []

    # One basket per user; single-item baskets cannot contain a pair, so drop them.
    df_user_basket = dataframe.groupBy(user_column).agg(collect_list(item_column).alias('basket'))
    basket_temp = df_user_basket.filter(size(col("basket")) > 1)

    # Count all pairs. The original passed only items_pairs[0][0], a single string,
    # which made `t` iterate over its characters.
    return t(items_pairs, basket_temp)
    
    


In [75]:
# (brand, user) pairs restricted to rows that actually carry a brand ...
brands_users = (
    sales
    .select("brand", "user_id")
    .filter(col("brand").isNotNull())
)
# ... and reduced to distinct pairs. Note this rebinds `df_user_brand`.
df_user_brand = brands_users.dropDuplicates()

In [116]:
# Disabled call to the hand-written apriori() defined above:
#cenas = apriori(df_user_brand, "brand", "user_id", 150)
# Quick check of the 2-element combinations produced for a three-brand basket.
[list(i) for i in combinations(["hyundai", "magnetta", "leo"], 2 )]

[['hyundai', 'magnetta'], ['hyundai', 'leo'], ['magnetta', 'leo']]

In [125]:
# One brand basket per user.
basket = df_user_brand.groupBy("user_id").agg(collect_list("brand").alias('basket'))

# Keep baskets with at least two brands and attach their 2-element subsets.
multi_brand_baskets = basket.where(filter_len(col("basket"), lit(2)))
multi_brand_baskets.withColumn("subsets", subset(col("basket"), lit(2))).show(truncate=False)

+---------+------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|user_id  |basket                                                            |subsets                                                                                                                                                                                                                                                                                                                             

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType, StringType, BooleanType

# Count, for every candidate pair, the baskets that contain all of its items.
# NOTE(review): in the visible cells `items_pairs` and `basket_temp` exist only as local
# variables inside apriori(); confirm they are defined at notebook scope before running.
a=[]
for b in items_pairs:
    # Redefined on every iteration; reads the loop variable `b` from the enclosing scope.
    def contains_all(x):
        if x is not None:
            return set(b).issubset(set(x))

    # Rebinds the notebook-level name `cols` (and `contains_all` above) on each iteration.
    cols = udf(contains_all, BooleanType())

    # Record [pair, number of baskets containing every item of the pair].
    a.append([b,basket_temp.select("*").where(cols(col("basket"))).count()])

In [None]:
# Build a two-column DataFrame (frequent_item: string, Count: integer) from `data`.
# NOTE(review): `data` is not defined in the visible cells - confirm its origin and that
# its records match this schema.
schema = StructType([
    StructField('frequent_item', StringType(), True),
    StructField('Count', IntegerType(), True)
])
# Distribute the records as an RDD, then apply the explicit schema.
rdd = spark.sparkContext.parallelize(data)
df = spark.createDataFrame(rdd,schema)
print(df.schema)
df.show()

1. Como validar o output dos algoritmos?
    não avaliar; justificar bem e descrever as regras de associação
2. FPgrowth vs apriori
    fpgrowth
3. exploração de dados
    analisar os parâmetros do modelo
4. split dos dados para validar o output?
    não; ver a resposta na 1
    
http://rasbt.github.io/mlxtend/user_guide/frequent_patterns/fpgrowth/#example-2-apriori-versus-fpgrowth