
# 1. Bibliotecas

In [0]:
from pyspark.ml.fpm import FPGrowth
from datetime import datetime
from pyspark.sql.types import DateType, StringType, IntegerType
from pyspark.sql.functions import size, col, collect_list, collect_set, concat_ws, lit, udf

---
# 2. Importando Dados

In [0]:
df_order_items = spark.table('datum.silver.olist_order_items')
df_products = spark.table('datum.silver.olist_products')
df_products = df_products.dropna(subset=['product_category_name'])

---
# 3. Aplicando modelo de ML

Foi usado o seguinte artigo como base para aplicação de um modelo já existente dentro do PySpark chamado de FPGrowth

<a href="https://www.databricks.com/blog/2018/09/18/simplify-market-basket-analysis-using-fp-growth-on-databricks.html">Marker Basket</a>

In [0]:
df_join = (df_order_items.join(
    df_products, 
    df_order_items.product_id == df_products.product_id, 
    how='left').
           drop(df_products.product_id))

In [0]:
df_grouped = (df_join
              .groupBy('order_id')
              .agg(collect_set('product_category_name').alias('products')))

In [0]:
fpGrowth = FPGrowth(itemsCol='products', minSupport=0.00001, minConfidence=0.001)
model = fpGrowth.fit(df_grouped)

In [0]:
frequent_itemsets = model.freqItemsets
df_most_common_product_pair = frequent_itemsets.filter(size(frequent_itemsets['items']) == 2)

In [0]:
association_rules = model.associationRules

In [0]:
association_rules.limit(15).display()

antecedent,consequent,confidence,lift,support
List(consoles_games),List(brinquedos),0.0018832391713747,0.0478156654870979,2.0270407232481305e-05
List(relogios_presentes),List(audio),0.0010668563300142,0.3007498475919529,6.08112216974439e-05
List(relogios_presentes),List(moveis_decoracao),0.0012446657183499,0.0190426713857519,7.094642531368455e-05
List(artes_e_artesanato),List(ferramentas_jardim),0.0434782608695652,1.2193934300615468,1.0135203616240653e-05
List(artes_e_artesanato),List(brinquedos),0.0434782608695652,1.103918190158652,1.0135203616240653e-05
List(cine_foto),List(esporte_lazer),0.0153846153846153,0.1966241530490235,1.0135203616240653e-05
List(cine_foto),List(telefonia),0.0153846153846153,0.36149999084031,1.0135203616240653e-05
List(cine_foto),List(cool_stuff),0.0153846153846153,0.4179345984412064,1.0135203616240653e-05
"List(relogios_presentes, utilidades_domesticas)",List(moveis_decoracao),0.5,7.649713133819198,1.0135203616240653e-05
"List(moveis_escritorio, cama_mesa_banho)",List(moveis_decoracao),0.2,3.0598852535276797,1.0135203616240653e-05



Conforme vemos no artigo o modelo procura prever qual a próxima categoria de produto que será comprada

In [0]:
df_most_common_product_pair = df_most_common_product_pair.withColumn(
    'categoria_produtos', concat_ws(', ', 'items')).select('categoria_produtos', 'freq')

In [0]:
df_most_common_product_pair = df_most_common_product_pair.withColumn('date_ref_carga', lit(datetime.now()).cast(DateType())).withColumn('freq', col('freq').cast(StringType()))

In [0]:
def order_category(text:str) -> str:
    final = text.split(',')
    final = ', '.join(sorted([x.strip() for x in list(set(final))]))
    return final

udf_order_category = udf(lambda x: order_category(str(x)), StringType())

In [0]:
df_most_common_product_pair = (df_most_common_product_pair
                               .withColumn('freq', col('freq').cast(IntegerType()))\
                               .orderBy('freq', ascending=False)\
                               .withColumn('categoria_produtos', udf_order_category('categoria_produtos')))

In [0]:
df_most_common_product_pair.limit(10).display()

categoria_produtos,freq,date_ref_carga
"cama_mesa_banho, moveis_decoracao",70,2024-04-14
"cama_mesa_banho, casa_conforto",43,2024-04-14
"moveis_decoracao, utilidades_domesticas",24,2024-04-14
"cama_mesa_banho, utilidades_domesticas",20,2024-04-14
"bebes, cool_stuff",20,2024-04-14
"bebes, brinquedos",19,2024-04-14
"bebes, cama_mesa_banho",17,2024-04-14
"ferramentas_jardim, moveis_decoracao",17,2024-04-14
"beleza_saude, esporte_lazer",14,2024-04-14
"casa_construcao, moveis_decoracao",13,2024-04-14


---
# 4. Agrupando dados para termos a mesma visualização do modelo mas com os dados disponíveis

In [0]:
_sql = """
WITH
orders AS (
  SELECT DISTINCT order_id, product_id
  FROM datum.silver.olist_order_items
),

products AS (
  SELECT DISTINCT product_id, product_category_name
  FROM datum.silver.olist_products
  ORDER BY product_category_name
),

agrupado AS (
  SELECT order_id, 
         COLLECT_SET(product_category_name) AS product_category_name
  FROM orders AS o
    LEFT JOIN products AS p
      ON o.product_id = p.product_id
  GROUP BY order_id
  HAVING SIZE(COLLECT_SET(p.product_category_name)) == 2)

SELECT REPLACE(REPLACE(CAST(product_category_name AS STRING), '[', ''), ']', '') AS categoria_produtos, 
       CAST(COUNT(*) AS INTEGER) AS total,
       CAST(NOW() AS DATE) AS date_ref_carga
FROM agrupado
GROUP BY product_category_name
ORDER BY COUNT(*) DESC
"""

pares_mais_comuns_atuais = spark.sql(_sql)

In [0]:
pares_mais_comuns_atuais.limit(15).display()

categoria_produtos,total,date_ref_carga
"cama_mesa_banho, moveis_decoracao",67,2024-04-14
"cama_mesa_banho, casa_conforto",42,2024-04-14
"utilidades_domesticas, moveis_decoracao",22,2024-04-14
"utilidades_domesticas, cama_mesa_banho",20,2024-04-14
"brinquedos, bebes",19,2024-04-14
"bebes, cool_stuff",19,2024-04-14
"ferramentas_jardim, moveis_decoracao",16,2024-04-14
"bebes, cama_mesa_banho",15,2024-04-14
"beleza_saude, esporte_lazer",13,2024-04-14
"perfumaria, beleza_saude",12,2024-04-14


---
# 5. Delta Lake

In [0]:
%sql

USE CATALOG datum

In [0]:
%sql

USE DATABASE gold

In [0]:
%sql

CREATE TABLE IF NOT EXISTS olist_ml_pares_produtos
(
  categoria_produtos STRING,
  freq               INTEGER,
  date_ref_carga     DATE
)
USING DELTA
LOCATION 'abfss://unity-datum@datumunity.dfs.core.windows.net/gold/olist_ml_pares_produtos'
PARTITIONED BY (date_ref_carga)

In [0]:
if df_most_common_product_pair.count() != 0 and df_most_common_product_pair is not None:
    df_most_common_product_pair.write.format('delta').mode('overwrite').save('abfss://unity-datum@datumunity.dfs.core.windows.net/gold/olist_ml_pares_produtos')

In [0]:
%sql

CREATE TABLE IF NOT EXISTS olist_pares_produtos
(
  categoria_produtos STRING,
  total              INTEGER,
  date_ref_carga     DATE
)
USING DELTA
LOCATION 'abfss://unity-datum@datumunity.dfs.core.windows.net/gold/olist_pares_produtos'
PARTITIONED BY (date_ref_carga)

In [0]:
if pares_mais_comuns_atuais.count() != 0 and pares_mais_comuns_atuais is not None:
    pares_mais_comuns_atuais.write.format('delta').mode('overwrite').save('abfss://unity-datum@datumunity.dfs.core.windows.net/gold/olist_pares_produtos')

In [0]:
del df_most_common_product_pair, pares_mais_comuns_atuais