# Exercício: tags que mais co-ocorrem

Dado um dataset com avaliações de usuários sobre livros, vamos checar que tags co-ocorrem.

Motivação: ser capaz de dizer que tags têm afinidade entre si.

In [1]:
import pyspark.sql.functions as F
import pyspark.sql.types as T

## Baixando o dataset

Vamos continuar usando o dataset `goodbooks-10k` criado para ser usado em problemas de recomendação. Só para lembrar, ele contém cerca de 6 milhões de avaliações para os 10 mil livros mais populares.

Leia mais no [fast-ml](http://fastml.com/goodbooks-10k-a-new-dataset-for-book-recommendations/).

Dessa vez, vamos baixar os arquivos `book_tags.csv` e `tags.csv`.

Novamente, se tiver erro na execução da célula abaixo, baixe manualmente os arquivos do [github](https://github.com/zygmuntz/goodbooks-10k) e coloque os arquivos `book_tags.csv` e `tags.csv` em uma pasta chamada `data` neste diretório.

In [2]:
# !wget -P data https://github.com/zygmuntz/goodbooks-10k/raw/master/book_tags.csv
# !wget -P data https://github.com/zygmuntz/goodbooks-10k/raw/master/tags.csv

In [None]:
path = 'data' # se local
#path = 'dbfs:/FileStore/tables' # se usar notebook databricks

## Leitura dos datasets

In [3]:
def read(filename):
    return spark.read.csv(f'{path}/{filename}', inferSchema=True, header=True)

In [4]:
books_df = read('books.csv')
ratings_df = read('ratings.csv')
book_tags_df = read('book_tags.csv')
tags_df = read('tags.csv')

## Remoção de tags que não desejamos no dataset

In [5]:
tags_df = tags_df.filter('tag_name not rlike "(read|own|favorit)"')

## Estrutura dos dataframes `book_tags_df` e `tags_df`

In [6]:
book_tags_df.printSchema()

root
 |-- goodreads_book_id: integer (nullable = true)
 |-- tag_id: integer (nullable = true)
 |-- count: integer (nullable = true)



In [7]:
tags_df.printSchema()

root
 |-- tag_id: integer (nullable = true)
 |-- tag_name: string (nullable = true)



## Parte 1: tag mais popular para cada livro

**Exercício:** una os dataframes `books_tags_df` e `tags_df`. Atenção: pela estrutura acima, é possível saber qual é a coluna que deve ser utilizada na operação de `join`.

In [8]:
book_tags_df = book_tags_df \
    .join(tags_df, on='tag_id', how='inner')

**Exercício:** filtre o dataset para que ele só permita linhas em que a coluna `tag_name` tenha tamanho estritamente maior do que 3.

In [9]:
book_tags_df = book_tags_df \
    .filter('LENGTH(tag_name) > 3')

Agora, basta ranquearmos as tags de acordo com sua popularidade para cada livro. Para isso, vamos usar novamente uma `Window`.

In [10]:
from pyspark.sql import Window

In [11]:
w = Window.partitionBy('goodreads_book_id').orderBy(F.desc('count'))

In [12]:
book_tags_df = book_tags_df \
    .withColumn('rank', F.row_number().over(w)) \
    .filter('rank = 1') \
    .drop('rank')

In [13]:
book_tags_df.show(n=5)

+------+-----------------+-----+-----------------+
|tag_id|goodreads_book_id|count|         tag_name|
+------+-----------------+-----+-----------------+
|  7563|             1591|  410|         clàssics|
| 11743|             2122| 3692|          fiction|
|  7116|             2142|   56|christian-fiction|
|  7457|             4900| 8128|         classics|
| 11305|             7993| 1290|          fantasy|
+------+-----------------+-----+-----------------+
only showing top 5 rows



### Extra: remoção de acentos

É possível ver que algumas tags têm acentos e gostaríamos de removê-los. Para isso, vamos utilizar uma função customizada, a chamada `UDF` no Spark.

In [14]:
from unicodedata import normalize

In [15]:
def strip_accents(text):
    return normalize('NFKD', text).encode('ascii', 'ignore').decode('utf-8')

**Exercício:** crie a UDF para aplicarmos a função `strip_accents`.

Dica: veja um exemplo [aqui](https://gist.github.com/zoltanctoth/2deccd69e3d1cde1dd78).

Note que, em nosso caso, já temos importados os módulos `pyspark.sql.functions` como `F` e `pyspark.sql.types` como `T`.

In [16]:
strip_accents_udf = F.udf(strip_accents, T.StringType())

**Exercício:** sobrescreva a coluna `tag_name` aplicando a UDF `strip_accents_udf` nela mesma.

In [17]:
book_tags_df = book_tags_df \
    .withColumn('tag_name', strip_accents_udf('tag_name'))

In [18]:
book_tags_df.show(n=2)

+------+-----------------+-----+--------+
|tag_id|goodreads_book_id|count|tag_name|
+------+-----------------+-----+--------+
|  7563|             1591|  410|classics|
| 11743|             2122| 3692| fiction|
+------+-----------------+-----+--------+
only showing top 2 rows



## Parte 2: identificação dos livros avaliados pelas tags

**Execício:** adicione a coluna `book_id` no dataframe `book_tags_df`.

Dica: faça uma operação de `join` com o dataframe `books_df` usando como chave (o argumento do parâmetro `on`) uma coluna que ambos os dataframes têm em comum.

In [19]:
book_tags_df = book_tags_df \
    .join(books_df.select('book_id', 'goodreads_book_id') , on='goodreads_book_id', how='inner')

**Execício:** crie um novo dataframe `user_ratings_tags_df`, que é o dataframe `ratings_df`, com a adição da coluna `tag_name`.

Dica: faça uma operação de `join` com o dataframe `book_tags_df` usando como chave (o argumento do parâmetro `on`) a coluna `book_id`. Note que queremos manter o total de linhas do dataset `ratings_df`, assim, é aconselhado que o método do join (parâmetro `how`) seja `left` (caso coloque o `ratings_df` primeiro).

In [20]:
user_ratings_tags_df = ratings_df \
    .join(book_tags_df.select('book_id', 'tag_name'), on='book_id', how='left')

Caso um livro não tenha tags em `book_tags_df`, queremos que ele fique com a tag `unknown`.

In [21]:
user_ratings_tags_df = user_ratings_tags_df \
    .withColumn('tag_name', F.coalesce('tag_name', F.lit('unknown')))

## Parte 3: criação dos pares de tags de cada usuário

O primeiro passo é a agregação, para cada usuário, de todas as tags.

**Exercício:** agrupe o dataframe `user_ratings_tags_df` por usuário e use uma função de agregação que liste todas as `tag_name` com as quais esse usuário interagiu. Dê a essa coluna o nome de `tags`.

Dica: existem duas funções que listam todos valores para cada chave de agregação: `collect_set` e `collect_list`. Neste caso, estamos interessados em uma lista com objetos únicos, assim, `collect_set` é a função mais indicada.

In [22]:
user_tags_df = user_ratings_tags_df \
    .groupBy('user_id') \
    .agg(F.collect_set('tag_name').alias('tags'))

Agora, vamos montar os pares usando a função `combinations` do módulo `itertools`.

In [23]:
from itertools import combinations

In [24]:
def make_pairs(list_objs):
    return list(combinations(sorted(list_objs), 2))

Como é uma função customizada, precisamos criar uma UDF para aplicá-la!

Nesse caso, o retorno é uma lista de tuplas de `string` e, por isso, vamos criar um tipo especial para tupla.

(Alternativamente, poderíamos modificar o retorno para que ele fosse lista de lista de `string`. Quer tentar?)

In [25]:
tuple_type = T.StructType([
    T.StructField('pair_1', T.StringType(), nullable=False),
    T.StructField('pair_2', T.StringType(), nullable=False)])

**Exercício:** crie a UDF `make_pairs_udf`. Note que o tipo do retorno será dado por `T.ArrayType(tuple_type)`.

In [26]:
make_pairs_udf = F.udf(make_pairs, T.ArrayType(tuple_type))

Alternativamente, a chamada abaixo cria a UDF `make_pairs_df` e também registra a função para uso dentro de uma query Spark SQL.

```python
make_pairs_udf = spark.udf.register('make_pairs_udf', make_pairs, T.ArrayType(tuple_type))
```

**Exercício:** aplique `make_pairs_udf` na coluna `tags`, criando uma coluna chamada `pairs`. Ao final, delete a coluna `tags`, usando o comando `drop`.

In [27]:
pairs_df = user_tags_df \
    .withColumn('pairs', make_pairs_udf('tags')) \
    .drop('tags')

In [28]:
pairs_df.show(n=2)

+-------+--------------------+
|user_id|               pairs|
+-------+--------------------+
|    148|[[africa, audiobo...|
|    463|[[adventure, chic...|
+-------+--------------------+
only showing top 2 rows



In [29]:
pairs_df.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- pairs: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- pair_1: string (nullable = false)
 |    |    |-- pair_2: string (nullable = false)



## Parte 4: contagem dos pares

O primeiro passo aqui é transformar nosso dataframe, de forma que cada um dos elementos da lista contida na coluna `pairs` esteja em uma linha. Isso pode ser facilmente feito através do método `explode`:

In [30]:
one_pair_per_row_df = pairs_df \
    .select('user_id', F.explode('pairs').alias('pair'))

In [31]:
one_pair_per_row_df.show(n=2, truncate=False)

+-------+-------------------+
|user_id|pair               |
+-------+-------------------+
|148    |[africa, audiobook]|
|148    |[africa, book-club]|
+-------+-------------------+
only showing top 2 rows



**Exercício:** para cada par, conte a quantidade de usuários que interagiram com ele. Ao final, faça uma ordenação decrescente do valor da coluna `count`.

In [32]:
pair_count_df = one_pair_per_row_df \
    .groupBy('pair') \
    .count() \
    .orderBy(F.desc('count'))

## Parte 5: os pares mais comuns são...

In [33]:
pair_count_df.show(n=10, truncate=False)

+-----------------------------+-----+
|pair                         |count|
+-----------------------------+-----+
|[fantasy, fiction]           |48531|
|[classics, fiction]          |47076|
|[classics, fantasy]          |44802|
|[fiction, non-fiction]       |43409|
|[fiction, young-adult]       |43322|
|[fantasy, young-adult]       |42684|
|[fantasy, non-fiction]       |40720|
|[classics, non-fiction]      |40696|
|[fiction, historical-fiction]|39634|
|[classics, young-adult]      |39549|
+-----------------------------+-----+
only showing top 10 rows

