# ETL para Redis

## Instalações

In [1]:
# tqdm provides progress bars.
# NOTE(review): no cell visible in this notebook imports tqdm — confirm it is still needed.
%pip install tqdm
# pyarrow: presumably installed for the @pandas_udf used further down (PySpark pandas UDFs rely on Arrow).
%pip install pyarrow

Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.


## Importações

In [2]:
import os
import sys
import json

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import *
from pyspark.sql.functions import lit

from neo4j_consulta import generate_recommendations

## Constantes

In [3]:
# Folder (relative to this notebook) that holds every CSV read below.
DATASET_PATH = '../../dataset'

# Input files, each resolved against DATASET_PATH:
#   USER_PATH           -> user profile details
#   USER_FILTERED_PATH  -> (user, anime) consumption pairs
#   ANIME_FILTERED_PATH -> anime catalogue
USER_PATH, USER_FILTERED_PATH, ANIME_FILTERED_PATH = (
    os.path.join(DATASET_PATH, csv_name)
    for csv_name in ('users-details-2023.csv', 'user-filtered.csv', 'anime-filtered.csv')
)

## Código para resolver um problema de versão encontrado pelo PySpark

A resolução do erro foi encontrada em uma resposta no [StackOverflow](https://stackoverflow.com/a/65010346)

In [4]:
# Point both the PySpark workers and the driver at the interpreter running this kernel,
# so they cannot end up on different Python versions.
os.environ.update({
    'PYSPARK_PYTHON': sys.executable,
    'PYSPARK_DRIVER_PYTHON': sys.executable,
})

## Configurando inicialização do "conector" Spark-Redis

In [5]:
# Create (or reuse) the Spark session with the spark-redis connector JAR loaded,
# which the final cell needs for the "org.apache.spark.sql.redis" write format.
spark = (
    SparkSession.builder
    .config("spark.jars", "../../spark-redis/target/spark-redis_2.12-3.1.0-SNAPSHOT-jar-with-dependencies.jar")
    .config('spark.executor.memory', '8g')
    .getOrCreate()
)

## Lendo o arquivo users-details-2023.csv

In [6]:
# Read the user details CSV (header row, inferred types, quoted multi-line fields)
# and keep only the columns that end up in the per-user document.
df_user = (
    spark.read
    .csv(USER_PATH, header=True, inferSchema=True, escape='"', multiLine=True)
    .select('mal ID', 'username', 'gender', 'days watched', 'completed', 'dropped', 'location')
)

In [7]:
# Check which types Spark inferred for the selected user columns.
df_user.dtypes

[('mal ID', 'int'),
 ('username', 'string'),
 ('gender', 'string'),
 ('days watched', 'double'),
 ('completed', 'double'),
 ('dropped', 'double'),
 ('location', 'string')]

In [8]:
# The counters were inferred as double (see the dtypes listing above); store them as integers.
for counter_column in ("completed", "dropped"):
    df_user = df_user.withColumn(counter_column, col(counter_column).cast(IntegerType()))

## Lendo o arquivo anime-filtered.csv

In [9]:
# Read the anime catalogue and keep just the id and the title.
df_anime_filtered = (
    spark.read
    .csv(ANIME_FILTERED_PATH, header=True, inferSchema=True, escape='"', multiLine=True)
    .select('anime_id', 'name')
)

## Lendo o arquivo user-filtered.csv

In [10]:
# Read the user/anime pairs and keep only the two identifier columns.
df_user_filtered = (
    spark.read
    .csv(USER_FILTERED_PATH, header=True, inferSchema=True, escape='"', multiLine=True)
    .select('anime_id', 'user_id')
)

## Fazendo join entre anime-filtered.csv e user-filtered.csv

In [11]:
# Attach the anime title to every (user, anime) pair; pairs whose anime_id is
# missing from the catalogue are dropped (inner join).
df_consumed_animes = df_user_filtered.join(df_anime_filtered, on='anime_id', how='inner')

In [None]:
# Add a constant 'img' column holding the literal 'img-path' on every row.
# NOTE(review): looks like a placeholder until real image paths are available — confirm.
df_consumed_animes = df_consumed_animes.withColumn("img", lit('img-path'))

## Gerando os agregados

### Coletando as recomendações para os usuários

In [12]:
# Fetch the recommendations through the project-local neo4j_consulta module.
# NOTE(review): presumably keyed by user id, with a list of {id, name, img} dicts per user
# (that is how later cells index and print it) — confirm against generate_recommendations.
recommendations = generate_recommendations()

In [13]:
# Spot-check: print the entry stored under key 148.
print(recommendations[148])

[{'id': 1735, 'name': 'Naruto: Shippuuden', 'img': 'img-path'}, {'id': 1535, 'name': 'Death Note', 'img': 'img-path'}, {'id': 5114, 'name': 'Fullmetal Alchemist: Brotherhood', 'img': 'img-path'}]


### Criando os agregados iniciais

In [14]:
# Bring every user row to the driver; the aggregates are assembled in plain Python.
users = df_user.collect()
qnt_users = len(users)

# One document per user, keyed by the user's 'mal ID'.
agregados = {}
for user in users:
    user_id = user['mal ID']

    # Copy the profile fields in a fixed order (the order is kept when the document is serialised).
    document = {
        field: user[field]
        for field in ('username', 'gender', 'days watched', 'completed', 'dropped', 'location')
    }

    # Only users that have recommendations get the key at all.
    # The key spelling 'recomendations' is part of the stored document and is kept as-is.
    if user_id in recommendations:
        document['recomendations'] = recommendations[user_id]

    agregados[user_id] = document

100%|██████████| 731290/731290 [00:10<00:00, 68762.40it/s] 


## Adicionando os últimos animes assistidos aos usuários

In [16]:
# Return type of the aggregation: a list of {id, name, img} structs per user.
anime_list_schema = ArrayType(
    StructType([
        StructField('id', IntegerType()),
        StructField('name', StringType()),
        StructField('img', StringType()),
    ])
)


@pandas_udf(anime_list_schema, PandasUDFType.GROUPED_AGG)
def last_3_udf(anime_ids, anime_names, anime_imgs):
    """Collapse one user's consumed animes into (at most) their last three entries.

    Called once per ``user_id`` group; the three arguments are the group's
    ``anime_id``, ``name`` and ``img`` columns.

    Returns:
        A list of up to three dicts with the keys ``id``, ``name`` and ``img``,
        taken from the tail of the group.

    NOTE(review): "last" means the last rows as they arrive in the group; nothing
    in this notebook sorts them, so the choice depends on Spark's row order — confirm.
    """
    return [
        {'id': anime_id, 'name': anime_name, 'img': anime_img}
        for anime_id, anime_name, anime_img
        in zip(anime_ids[-3:], anime_names[-3:], anime_imgs[-3:])
    ]


# The UDF takes three columns, so 'img' must be passed as well; calling it with only
# anime_id and name fails because the third parameter is never supplied.
# The result is aliased to the column name the following cell reads.
results = (
    df_consumed_animes
    .groupby('user_id')
    .agg(
        last_3_udf(
            df_consumed_animes.anime_id,
            df_consumed_animes.name,
            df_consumed_animes.img,
        ).alias('last_3_udf(anime_id, name)')
    )
    .collect()
)



In [17]:
# Attach each user's watched-anime list to the aggregate built earlier.
for result in results:
    watcher_id = result['user_id']

    # Users that appear in the consumption data but not in the user table are ignored.
    if watcher_id not in agregados:
        continue

    # Convert the Spark rows into plain dicts so the document can be JSON-serialised.
    agregados[watcher_id]['watched animes'] = [
        {'id': row['id'], 'name': row['name'], 'img': row['img']}
        for row in result['last_3_udf(anime_id, name)']
    ]

In [18]:
# Spot-check: show the complete aggregate stored for user id 1.
agregados[1]

{'username': 'Xinil',
 'gender': 'Male',
 'days watched': 142.3,
 'completed': 233,
 'dropped': 93,
 'location': 'California',
 'recomendations': [{'id': 5114,
   'name': 'Fullmetal Alchemist: Brotherhood',
   'img': 'img-path'},
  {'id': 9253, 'name': 'Steins;Gate', 'img': 'img-path'},
  {'id': 2904,
   'name': 'Code Geass: Hangyaku no Lelouch R2',
   'img': 'img-path'}],
 'watched animes': [{'id': 3588, 'name': 'Soul Eater', 'img': 'img-path'},
  {'id': 41487,
   'name': 'Tensei shitara Slime Datta Ken 2nd Season Part 2',
   'img': 'img-path'},
  {'id': 41488,
   'name': 'Tensura Nikki: Tensei shitara Slime Datta Ken',
   'img': 'img-path'}]}

### Transformando agregados de dicionário para DataFrame Spark

In [19]:
# One record per user: the id plus the whole aggregate serialised as a JSON string.
redis_records = [
    dict(id=user_id, documento=json.dumps(values))
    for user_id, values in agregados.items()
]

# Spark infers the schema from the dict records.
final_user_df = spark.createDataFrame(redis_records)

In [20]:
# Preview the records (show() prints the first 20 rows and truncates long strings).
final_user_df.show()

+--------------------+---+
|           documento| id|
+--------------------+---+
|{"username": "Xin...|  1|
|{"username": "Aok...|  3|
|{"username": "Cry...|  4|
|{"username": "Arc...|  9|
|{"username": "Mad...| 18|
|{"username": "von...| 20|
|{"username": "Amu...| 23|
|{"username": "Bam...| 36|
|{"username": "bed...| 44|
|{"username": "kei...| 47|
|{"username": "Lad...| 53|
|{"username": "Hir...| 66|
|{"username": "Cru...| 70|
|{"username": "Smo...| 71|
|{"username": "Emp...| 77|
|{"username": "koa...| 80|
|{"username": "Ach...| 82|
|{"username": "jaa...| 83|
|{"username": "xic...| 90|
|{"username": "mar...| 91|
+--------------------+---+
only showing top 20 rows



In [21]:
# Check the column types Spark inferred for the records.
final_user_df.dtypes

[('documento', 'string'), ('id', 'bigint')]

### Salvando os dados no Redis

In [22]:
# Write the records through the spark-redis data source, under the "users" table,
# using the 'id' column as the key column.
# NOTE(review): no .mode(...) is set, so the DataFrameWriter default applies —
# confirm that is the desired behaviour when this notebook is re-run.
(
    final_user_df.write
    .format("org.apache.spark.sql.redis")
    .option("table", "users")
    .option("key.column", "id")
    .save()
)