## Computing the `Who X also X` model
Computes a study model based on product-view interactions in an e-commerce store, producing a `who-viewed-also-viewed` recommendation model.
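
To make the idea concrete before the Spark version, here is a minimal pure-Python sketch of who-viewed-also-viewed counting. The toy `views` data and the `co_view_counts` helper are hypothetical and exist only for illustration. As a simplification, the sketch counts each customer at most once per product pair.

```python
from collections import Counter
from itertools import permutations

# Hypothetical toy data: the set of products each customer viewed.
views = {
    "c1": {"A", "B", "C"},
    "c2": {"A", "B"},
    "c3": {"B", "C"},
}


def co_view_counts(views_by_customer):
    """Count, for each ordered (source, target) pair, how many customers viewed both."""
    counts = Counter()
    for products in views_by_customer.values():
        # permutations yields both (A, B) and (B, A), so every product
        # can act as a source item with its own recommended targets.
        for source, target in permutations(sorted(products), 2):
            counts[(source, target)] += 1
    return counts


pair_counts = co_view_counts(views)
print(pair_counts[("A", "B")])  # customers c1 and c2 viewed both A and B
print(pair_counts[("A", "C")])  # only c1 viewed both A and C
```

The higher the count for a pair, the stronger the case for recommending the target item to someone viewing the source item.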

In [1]:
# Import used to locate and load Spark
import findspark
findspark.init()

### Importing the libraries


In [2]:
import pandas as pd

from pyspark.sql.functions import col, collect_set, dense_rank, struct
from pyspark.sql import SparkSession
 
from pyspark.sql.types import DoubleType
from pyspark.sql.window import Window

spark = (
    SparkSession.builder
    .master("local")
    .appName("bart-calcullate-wxax")
    .getOrCreate()
)

### Computing recommendations

In [3]:
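# Constants
action_type = 1  # action_id to model (presumably product views, per the introduction)
attributes = ['source_item_id']  # columns that key each recommendation list
min_occurrence = 2  # co-occurrence threshold applied to each product pair
max_recommendations = 200  # highest rank kept per source item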

In [4]:
# Load the dataset
actions = spark.createDataFrame(
    pd.read_csv("../datasets/interactions.csv")
)
actions.printSchema()

root
 |-- Unnamed: 0: long (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- action_id: long (nullable = true)
 |-- timestamp: double (nullable = true)



In [5]:
# Inputs: the same filtered interactions under two aliases, ready for a self-join
source_actions = actions.filter(actions.action_id == action_type).alias(
    'source'
)
target_actions = actions.filter(actions.action_id == action_type).alias(
    'target'
)

raw_recommendations = (
    source_actions.join(target_actions, 'customer_id', 'inner')
    .filter('source.product_id <> target.product_id')
    .groupBy('source.product_id', 'target.product_id')
    .count()
    .filter(col('count') > min_occurrence)  # strict: keeps pairs counted more than min_occurrence times
    .selectExpr(
        'source.product_id as source_item_id',
        'target.product_id as recommended_item_id',
        'count as score',
    )
)
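
One property of the self-join above is worth spelling out. It pairs rows rather than distinct customers, so if the dataset contains repeated views of the same product by the same customer, each repeat adds to the pair count. The sketch below emulates the join in plain Python and applies the same strict `> min_occurrence` filter. The `rows` data and the `self_join_pair_counts` helper are hypothetical and not part of the notebook.

```python
from collections import Counter

# Hypothetical interaction rows (customer_id, product_id), all of one action type.
rows = [
    ("c1", "A"), ("c1", "A"), ("c1", "B"), ("c1", "C"),
    ("c2", "A"), ("c2", "B"),
]


def self_join_pair_counts(rows):
    """Emulate an inner self-join on customer_id followed by a count per product pair."""
    counts = Counter()
    for src_customer, src_product in rows:
        for tgt_customer, tgt_product in rows:
            # Join condition plus the "different product" filter.
            if src_customer == tgt_customer and src_product != tgt_product:
                counts[(src_product, tgt_product)] += 1
    return counts


counts = self_join_pair_counts(rows)
min_occurrence = 2
# Same strict comparison as the notebook: a count equal to min_occurrence is dropped.
kept = {pair: n for pair, n in counts.items() if n > min_occurrence}
print(counts[("A", "B")], counts[("A", "C")])
print(kept)
```

Here customer `c1` viewed `A` twice, so the pair `(A, B)` gets two contributions from `c1` and one from `c2`. If one vote per customer is preferred, deduplicating `(customer_id, product_id)` rows before the join is one option.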

### Content recommendations for the user

In [6]:
# Keep only the top-ranked recommendations for each source item
window = Window.partitionBy(*attributes).orderBy(
    raw_recommendations.score.cast(DoubleType()).desc()
)
limit_recommendations = raw_recommendations.withColumn(
    'rank', dense_rank().over(window)
).filter(f'rank <= {max_recommendations}')
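
Because the cell above ranks with `dense_rank`, tied scores share a rank, so a source item can keep more than `max_recommendations` rows. The following plain-Python sketch reproduces dense-rank semantics on hypothetical scores. The `dense_rank_desc` helper is illustrative only and is not a Spark API.

```python
def dense_rank_desc(scores):
    """Return the dense rank of each score, with higher scores ranked first."""
    # Equal scores share a rank; the next distinct score gets the next rank (no gaps).
    distinct = sorted(set(scores), reverse=True)
    rank_of = {score: rank for rank, score in enumerate(distinct, start=1)}
    return [rank_of[score] for score in scores]


scores = [10.0, 10.0, 8.0, 7.0]  # hypothetical scores for one source item
ranks = dense_rank_desc(scores)
max_recommendations = 2
kept = [s for s, r in zip(scores, ranks) if r <= max_recommendations]
print(ranks)
print(len(kept))  # can exceed max_recommendations when scores tie
```

If a hard cap per source item is required, PySpark's `row_number` window function assigns a unique rank to every row, at the cost of breaking ties arbitrarily unless a secondary sort key is added.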

In [7]:
# Group recommendations by source item
group_by = limit_recommendations.groupBy(*attributes)
group_recommendations = group_by.agg(
    collect_set(
        struct(
            limit_recommendations.recommended_item_id,
            limit_recommendations.score.cast(DoubleType()).alias('score'),
        )
    ).alias('recommendations')
)
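
Note that `collect_set` does not guarantee the order of the collected elements, so the ranking computed earlier is not preserved inside the `recommendations` array. A consumer can restore it from the stored scores. The sketch below shows one way to do that in plain Python. The sample pairs and the `sort_by_score` helper are hypothetical.

```python
# Hypothetical collected value for one source item:
# (recommended_item_id, score) pairs in arbitrary order.
recommendations = [("p3", 2.0), ("p1", 9.0), ("p2", 5.0)]


def sort_by_score(recommendations):
    """Order (item, score) pairs by score descending, breaking ties by item id."""
    return sorted(recommendations, key=lambda rec: (-rec[1], rec[0]))


ranked = sort_by_score(recommendations)
top_items = [item for item, _ in ranked]
print(top_items)
```

Breaking ties by item id keeps the output stable between runs, which makes results easier to compare.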

In [8]:
# Show the recommendations for each source product
group_recommendations.show()

+--------------+--------------------+
|source_item_id|     recommendations|
+--------------+--------------------+
|      Dce730CE|[[39D90E8F, 1351....|
|      a827F4CD|[[2E305BAe, 1547....|
|      24d1E961|[[4b15bf94, 1517....|
|      aFaAe1F4|[[CCcd595A, 1464....|
|      3cDfe90c|[[FA1CacBa, 1465....|
|      CCcd595A|[[BfDd34cA, 1431....|
|      e895dE1F|[[03cd416f, 1503....|
|      DFcAD8bB|[[5cFCEC19, 1471....|
|      B2CBd81B|[[99E50C86, 1488....|
|      bEDFB29C|[[d596D579, 1465....|
|      cDF93A2f|[[d0E68bDD, 1499....|
|      a2b9FfdB|[[2E305BAe, 1424....|
|      Fa0FeA74|[[F3aCFE4f, 1484....|
|      c47DeCf5|[[60cDbc95, 1304....|
|      d0E68bDD|[[a71bB9E3, 1410....|
|      42D0EBaF|[[9af8e012, 1491....|
|      20DD1ea3|[[dfe00AC7, 1488....|
|      8BABfAcE|[[11e0bdf8, 1470....|
|      A8312a0A|[[faeDe05d, 1347....|
|      0dd89C7A|[[dfe00AC7, 1385....|
+--------------+--------------------+
only showing top 20 rows



In [9]:
partition = 20  # Number of output files (partitions) to generate
group_recommendations.repartition(partition)\
    .write.mode("overwrite")\
    .parquet("/tmp/recommendations_wxax")

In [10]:
! ls -lhs /tmp/recommendations_wxax/ | awk '{print $6,$10}'

 
4,6K part-00000-e65af357-04ed-4af0-8904-f5596bd56250-c000.snappy.parquet
3,2K part-00001-e65af357-04ed-4af0-8904-f5596bd56250-c000.snappy.parquet
3,9K part-00002-e65af357-04ed-4af0-8904-f5596bd56250-c000.snappy.parquet
3,8K part-00003-e65af357-04ed-4af0-8904-f5596bd56250-c000.snappy.parquet
3,5K part-00004-e65af357-04ed-4af0-8904-f5596bd56250-c000.snappy.parquet
4,4K part-00005-e65af357-04ed-4af0-8904-f5596bd56250-c000.snappy.parquet
4,1K part-00006-e65af357-04ed-4af0-8904-f5596bd56250-c000.snappy.parquet
4,3K part-00007-e65af357-04ed-4af0-8904-f5596bd56250-c000.snappy.parquet
4,0K part-00008-e65af357-04ed-4af0-8904-f5596bd56250-c000.snappy.parquet
3,5K part-00009-e65af357-04ed-4af0-8904-f5596bd56250-c000.snappy.parquet
5,0K part-00010-e65af357-04ed-4af0-8904-f5596bd56250-c000.snappy.parquet
3,9K part-00011-e65af357-04ed-4af0-8904-f5596bd56250-c000.snappy.parquet
3,9K part-00012-e65af357-04ed-4af0-8904-f5596bd56250-c000.snappy.parquet
4,1K part-00013-e65af357-04ed-4af0-