# Recomendação de produtos usando Spark GraphFrames

In [1]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages graphframes:graphframes:0.8.1-spark3.0-s_2.12 pyspark-shell'

# from pyspark import SparkContext
from pyspark.sql import SQLContext, SparkSession
spark = SparkSession.builder.master("local[1]").appName("gboaviagemApp").getOrCreate()

In [2]:
sqlc = SQLContext(spark.sparkContext)

Uma vez que este notebook foi executado dentro de um container, rodando a imagem `jupyter/pyspark-notebook`, o path estará diferente. Para enviar o arquivo da máquina host para o sistema de arquivos do container, basta usar o comando `sudo docker cp`.

In [3]:
wdir = "/home/jovyan/work/neo4jdata-200/"

n_users = spark.read.csv(wdir + "neo4j_node_list-Usuario.csv", header=True, inferSchema=True)
n_prods = spark.read.csv(wdir + "neo4j_node_list-Produto.csv", header=True, inferSchema=True)
e_comprou = spark.read.csv(wdir + "neo4j_edge_list-COMPROU.csv", header=True, inferSchema=True)

In [4]:
n_users.show()

+-----------------+---+
|            email| uf|
+-----------------+---+
|user010@gmail.com| CE|
|user007@gmail.com| AM|
|user008@gmail.com| MS|
|user003@gmail.com| MA|
|user001@gmail.com| PB|
|user004@gmail.com| MS|
|user002@gmail.com| PR|
|user006@gmail.com| AP|
|user005@gmail.com| PE|
|user009@gmail.com| SE|
+-----------------+---+



In [5]:
n_users.printSchema()

root
 |-- email: string (nullable = true)
 |-- uf: string (nullable = true)



In [6]:
n_prods.printSchema()

root
 |-- produto_id: string (nullable = true)



In [7]:
e_comprou.printSchema()

root
 |-- email: string (nullable = true)
 |-- produto_id: string (nullable = true)
 |-- valor_pago: double (nullable = true)
 |-- quando: string (nullable = true)



## Dando casting da coluna "quando" de `str` para `timestamp`

Isso poderia ser importante, caso fosse preciso alguma query explorando a natureza temporal das compras. Para a estratégia simples de recomendação construída aqui, isso não era necessário.

In [8]:
from pyspark.sql import functions as F

e_comprou = e_comprou.withColumn('quando', F.current_timestamp())

In [9]:
e_comprou.printSchema()

root
 |-- email: string (nullable = true)
 |-- produto_id: string (nullable = true)
 |-- valor_pago: double (nullable = true)
 |-- quando: timestamp (nullable = false)



## Criando o GraphFrame

In [10]:
nodes1 = n_users.\
    drop_duplicates(subset=['email']).\
    withColumnRenamed('email', 'id').\
    withColumn('tipo', F.lit('Usuario'))

nodes2 = n_prods.\
    drop_duplicates().\
    withColumnRenamed('produto_id', 'id').\
    withColumn('tipo', F.lit('Produto'))

In [11]:
nodes1.count()

10

In [12]:
nodes2.count()

50

In [13]:
nodes = nodes1.join(nodes2, ['id', 'tipo'], 'outer')

In [14]:
nodes.printSchema()

root
 |-- id: string (nullable = true)
 |-- tipo: string (nullable = true)
 |-- uf: string (nullable = true)



In [15]:
nodes.limit(5).show()

+---------+-------+----+
|       id|   tipo|  uf|
+---------+-------+----+
|Produto38|Produto|null|
| Produto9|Produto|null|
|Produto36|Produto|null|
| Produto8|Produto|null|
|Produto30|Produto|null|
+---------+-------+----+



In [16]:
e_comprou.count()

169

In [17]:
e_comprou.printSchema()

root
 |-- email: string (nullable = true)
 |-- produto_id: string (nullable = true)
 |-- valor_pago: double (nullable = true)
 |-- quando: timestamp (nullable = false)



In [18]:
edges = e_comprou.\
    withColumnRenamed('email', 'src').\
    withColumnRenamed('produto_id', 'dst').\
    withColumn('tipo', F.lit('COMPROU'))

In [19]:
edges.show()

+-----------------+---------+----------+--------------------+-------+
|              src|      dst|valor_pago|              quando|   tipo|
+-----------------+---------+----------+--------------------+-------+
|user010@gmail.com|Produto30|     148.0|2021-03-20 22:53:...|COMPROU|
|user007@gmail.com|Produto14|     165.0|2021-03-20 22:53:...|COMPROU|
|user010@gmail.com|Produto44|     153.0|2021-03-20 22:53:...|COMPROU|
|user008@gmail.com|Produto48|     196.0|2021-03-20 22:53:...|COMPROU|
|user003@gmail.com|Produto22|      89.0|2021-03-20 22:53:...|COMPROU|
|user001@gmail.com|Produto30|     148.0|2021-03-20 22:53:...|COMPROU|
|user004@gmail.com|Produto34|     108.0|2021-03-20 22:53:...|COMPROU|
|user001@gmail.com|Produto49|     182.0|2021-03-20 22:53:...|COMPROU|
|user002@gmail.com|Produto28|     172.0|2021-03-20 22:53:...|COMPROU|
|user007@gmail.com|Produto36|     130.0|2021-03-20 22:53:...|COMPROU|
|user006@gmail.com|Produto22|      89.0|2021-03-20 22:53:...|COMPROU|
|user005@gmail.com| 

In [20]:
from graphframes import GraphFrame

g = GraphFrame(nodes, edges)

In [21]:
q = g.find("(usuario)-[]->(prod)").withColumn('usuario', F.col('usuario').id)

In [25]:
# Top compradores
# q.groupBy('user').count().sort(F.col('count').desc(), 'user').show()
q = q.groupBy('usuario')\
    .agg(F.count('prod').alias('qtd_produtos'))\
    .sort(F.col('qtd_produtos').desc(), 'usuario')

In [26]:
q.show()

+-----------------+------------+
|          usuario|qtd_produtos|
+-----------------+------------+
|user003@gmail.com|          20|
|user005@gmail.com|          20|
|user007@gmail.com|          20|
|user002@gmail.com|          19|
|user004@gmail.com|          17|
|user006@gmail.com|          16|
|user008@gmail.com|          16|
|user009@gmail.com|          15|
|user010@gmail.com|          14|
|user001@gmail.com|          12|
+-----------------+------------+



In [None]:
q.write.csv("top_compradores")

Reference: https://sparkbyexamples.com/pyspark/pyspark-groupby-explained-with-example/

In [27]:
# Top produtos
q = g.find("(u)-[e]->(produto)") #.withColumn('user', F.col('user').id)

Para filtrar, bastaria adicionar antes do `show`

```python
where(F.col('qtd_compradores') > 10)
```

In [28]:
q = q.withColumn('valor_pago', F.col('e').valor_pago)\
    .groupBy("produto")\
    .agg(
        F.count("u").alias("qtd_compradores"),
        F.avg("valor_pago").alias("preco_medio"))\
    .withColumn('produto', F.col('produto').id)\
    .sort(F.col('qtd_compradores').desc(), 'produto')
    #.write.csv("top_produtos")

In [29]:
q.show()

+---------+---------------+-----------+
|  produto|qtd_compradores|preco_medio|
+---------+---------------+-----------+
| Produto1|              6|      153.0|
|Produto16|              6|      191.0|
|Produto18|              6|      131.0|
|Produto36|              6|      130.0|
|Produto13|              5|      166.0|
| Produto2|              5|      140.0|
|Produto20|              5|      199.0|
|Produto24|              5|       87.0|
|Produto29|              5|      157.0|
| Produto3|              5|      182.0|
|Produto33|              5|      147.0|
|Produto34|              5|      108.0|
| Produto5|              5|      124.0|
|Produto50|              5|      130.0|
|Produto10|              4|      126.0|
|Produto12|              4|      130.0|
|Produto22|              4|       89.0|
|Produto28|              4|      172.0|
|Produto38|              4|       94.0|
| Produto4|              4|      171.0|
+---------+---------------+-----------+
only showing top 20 rows



### Recomendação em grão usuário (offline)

Dado um usuário A, quais os produtos comprados por usuários B que compraram os mesmos produtos que A?

In [30]:
q = g.find("(u1)-[]->(p1); (u2)-[]->(p1); (u2)-[]->(p2)")\
    .withColumn('u1', F.col('u1').id)\
    .withColumn('u2', F.col('u2').id)\
    .withColumn('p1', F.col('p1').id)\
    .withColumn('p2', F.col('p2').id)\
    .where(
        (F.col('u1') == 'user003@gmail.com') &
        (F.col('u1') != F.col('u2')) &
        (F.col('p1') != F.col('p2')))
    

# More complex queries can be expressed by applying filters.
# motifs.filter("b.age > 30").show()

In [31]:
q.limit(5).show()

+-----------------+---------+-----------------+---------+
|               u1|       p1|               u2|       p2|
+-----------------+---------+-----------------+---------+
|user003@gmail.com|Produto11|user004@gmail.com|Produto43|
|user003@gmail.com|Produto11|user004@gmail.com| Produto1|
|user003@gmail.com|Produto11|user004@gmail.com|Produto17|
|user003@gmail.com|Produto11|user004@gmail.com|Produto44|
|user003@gmail.com|Produto11|user004@gmail.com|Produto12|
+-----------------+---------+-----------------+---------+



In [80]:
q.write.csv('graph_recomendacao_usuario003.csv')