## Tratamento de dados - Spark

**Objetivo:** Realizar o pré-processamento do dataset Movielens utilizando a linguagem de programação pyspark.

**Etapas a serem desenvolvidas:**

**1.** Utilizar o Dataset recomendação.csv disponibilizado no canvas: <br/>
    Schema: user_id int, item_id int, rating int, item string

**2. ==CANCELADA==** Utilizar a ferramenta pydeequ (Links para um site externo.) para verificar se todos os dados se enquadram no schema definido acima. <br/>
    Imprimir o Data Quality Report gerado pela ferramenta. <br/>
    OBS: Para executar a ferramenta <br/>
    Fazer download do deequ-1.0.5.jar e salvar na pasta que será executado o pyspark <br/>
    Links para um site externo. <br/>
    Instalar o pydeequ via pip ou pelo git. <br/>
    Rodar o comando pyspark --driver-class-path deequ-1.0.5.jar **==CANCELADA==** <br/>

**3.** Remover do Dataset os dados incorretos.

**4.** Responder as perguntas: <br/>
**4.1.** Qual o filme mais assistido? <br/>
**4.2.** Qual o usuário que mais pontuou os filmes? <br/>
**4.3.** Quantos usuários deram nota(rating) = 5? <br/>
**4.4.** Agrupe o Dataset por user e some todas as suas notas. Qual o usuário possui a maior soma dos rating?

**5.** Gerar Dataset tratado para o algoritmo de recomendação no formato csv com o schema user_id, item_id, rating.


In [1]:
##Bibliotecas
from pyspark.sql.types import StructType, IntegerType, DateType, StringType,StructField, FloatType
from pyspark.sql.functions import col


#### 1) Utilizar o Dataset recomendação.csv disponibilizado no canvas: <br/> Schema: user_id int, item_id int, rating int, item string

In [2]:
schema = StructType([StructField('user_id', IntegerType()),
                     StructField('item_id', IntegerType()),
                     StructField('rating', IntegerType()),
                     StructField('item', StringType()),
                    ])

In [3]:
#Importando Dataset
data = spark.read \
        .schema(schema)\
        .format("csv")\
        .option("header",True)\
        .load("recomendacao.csv")

In [4]:
type(data)

pyspark.sql.dataframe.DataFrame

In [5]:
#Exibindo dataset e summary
data.show(5)
data.summary().show()

+-------+-------+------+--------------------+
|user_id|item_id|rating|                item|
+-------+-------+------+--------------------+
|    186|    302|     3|L.A. Confidential...|
|     22|    377|     1| Heavyweights (1994)|
|    244|     51|     2|Legends of the Fa...|
|    166|    346|     1| Jackie Brown (1997)|
|    298|    474|     4|Dr. Strangelove o...|
+-------+-------+------+--------------------+
only showing top 5 rows

+-------+------------------+-----------------+------------------+--------------------+
|summary|           user_id|          item_id|            rating|                item|
+-------+------------------+-----------------+------------------+--------------------+
|  count|             99999|            99999|             99999|               99547|
|   mean| 462.4874148741487|425.5319653196532|3.5298652986529864|                null|
| stddev|266.61442141060337|330.7995012117287| 1.125677980540639|                null|
|    min|                 1|           

In [6]:
data.describe()

DataFrame[summary: string, user_id: string, item_id: string, rating: string, item: string]

#### 3) Remover do Dataset os dados incorretos. <br/> Não foi possível utilizar a erramenta pydeequ. <br/> Removendo os dados nulos de cada coluna.  

In [7]:
data=data.where(col("user_id").isNotNull())

In [8]:
data=data.where(col("item_id").isNotNull())

In [9]:
data=data.where(col("rating").isNotNull())

In [10]:
data=data.where(col("item").isNotNull())

#### Dados após limpeza

In [11]:
data.show(5)
data.summary().show()

+-------+-------+------+--------------------+
|user_id|item_id|rating|                item|
+-------+-------+------+--------------------+
|    186|    302|     3|L.A. Confidential...|
|     22|    377|     1| Heavyweights (1994)|
|    244|     51|     2|Legends of the Fa...|
|    166|    346|     1| Jackie Brown (1997)|
|    298|    474|     4|Dr. Strangelove o...|
+-------+-------+------+--------------------+
only showing top 5 rows

+-------+-----------------+------------------+------------------+--------------------+
|summary|          user_id|           item_id|            rating|                item|
+-------+-----------------+------------------+------------------+--------------------+
|  count|            99547|             99547|             99547|               99547|
|   mean|462.4214692557284| 427.4595819060343|3.5282831225451297|                null|
| stddev| 266.581647089376|330.30761865180295|1.1262548341580603|                null|
|    min|                1|            

In [12]:
##Criação de view para usar o sql.
data.createOrReplaceTempView('viewdata')

In [13]:
#Mostrando 05 primeiras linhas da view criada.
spark.sql('select * from viewdata').show(5)

+-------+-------+------+--------------------+
|user_id|item_id|rating|                item|
+-------+-------+------+--------------------+
|    186|    302|     3|L.A. Confidential...|
|     22|    377|     1| Heavyweights (1994)|
|    244|     51|     2|Legends of the Fa...|
|    166|    346|     1| Jackie Brown (1997)|
|    298|    474|     4|Dr. Strangelove o...|
+-------+-------+------+--------------------+
only showing top 5 rows



#### 4) Responder as perguntas

#### 4.1) Qual o filme mais assistido?

In [14]:
#Verificação da quantidade de vezes que cada filme foi assistido em ordem decrescente. Exibindo os 10 mais.

spark.sql('select item_id , item , count(item) as qtd_assistido \
          from viewdata \
          group by item_id, item \
          order by qtd_assistido desc').show(10)


+-------+--------------------+-------------+
|item_id|                item|qtd_assistido|
+-------+--------------------+-------------+
|     50|    Star Wars (1977)|          583|
|    258|      Contact (1997)|          509|
|    100|        Fargo (1996)|          508|
|    181|Return of the Jed...|          507|
|    294|    Liar Liar (1997)|          485|
|    286|English Patient, ...|          481|
|    288|       Scream (1996)|          478|
|    300|Air Force One (1997)|          431|
|    121|Independence Day ...|          429|
|    174|Raiders of the Lo...|          420|
+-------+--------------------+-------------+
only showing top 10 rows



In [15]:
#Retorno do Filme mais assistido
spark.sql('select * \
            from (select item as Filme_Mais_Assistido, item_id as ID, count(item_id) as qtd_assistido \
                  from viewdata \
                  group by ID, Filme_Mais_Assistido) viewdata \
            where qtd_assistido== \
                (select max(contador) \
                 from (select count(item_id) as contador\
                       from viewdata \
                       group by item_id ) viewdata) ').show(truncate=False)

+--------------------+---+-------------+
|Filme_Mais_Assistido|ID |qtd_assistido|
+--------------------+---+-------------+
|Star Wars (1977)    |50 |583          |
+--------------------+---+-------------+



#### 4.2)Qual o usuário que mais pontuou os filmes?

In [16]:
#Verificação da quantidade de votos dos usuários em ordem decrescente. Exibindo os 10 que mais pontuaram os filmes

spark.sql('select user_id as ID_Usuario , count(user_id) as qtd_votos \
          from viewdata \
          group by ID_Usuario \
          order by qtd_votos desc').show(10)

+----------+---------+
|ID_Usuario|qtd_votos|
+----------+---------+
|       405|      737|
|       655|      684|
|        13|      635|
|       450|      539|
|       276|      517|
|       416|      492|
|       537|      489|
|       303|      483|
|       234|      479|
|       393|      447|
+----------+---------+
only showing top 10 rows



In [17]:
#Retorno do usuário que mais pontuou
spark.sql('select * \
            from (select user_id as ID_Usuario_Mais_Pontuou, count(user_id) as Total_Voto \
                  from viewdata \
                  group by ID_Usuario_Mais_Pontuou) viewdata \
            where Total_Voto== \
                (select max(contador) \
                 from (select count(user_id) as contador\
                       from viewdata \
                       group by user_id ) viewdata) ').show(truncate=False)

+-----------------------+----------+
|ID_Usuario_Mais_Pontuou|Total_Voto|
+-----------------------+----------+
|405                    |737       |
+-----------------------+----------+



#### 4.3) Quantos usuários deram nota(rating) = 5? 

In [18]:
#Retornar os 10 primeiros usuários que deram nota 5 em ordem decrescente
spark.sql('select distinct (user_id) as ID_Usuario, rating as Nota \
           from viewdata \
           where rating == "5" \
           order by ID_Usuario').show(10, truncate=False)

+----------+----+
|ID_Usuario|Nota|
+----------+----+
|1         |5   |
|2         |5   |
|3         |5   |
|4         |5   |
|5         |5   |
|6         |5   |
|7         |5   |
|8         |5   |
|9         |5   |
|10        |5   |
+----------+----+
only showing top 10 rows



In [19]:
#Retorno de quantidade de usuários que deram nota 5
spark.sql('select rating as Nota, count(*) as Qtde_Usuarios_que_deram_essa_nota \
            from (select distinct (user_id), rating \
                   from viewdata \
                   where rating == "5") \
            group by rating').show(truncate=False)



+----+---------------------------------+
|Nota|Qtde_Usuarios_que_deram_essa_nota|
+----+---------------------------------+
|5   |927                              |
+----+---------------------------------+



#### 4.4) Agrupe o Dataset por user e some todas as suas notas. 

In [20]:
#conferindo notas atribuída a filmes por usuário

spark.sql('select user_id as ID_Usuario, rating as Notas_atribuidas, item as Filme \
           from viewdata \
           where user_id == "19" ').show(truncate=False)

+----------+----------------+--------------------------------------------------------+
|ID_Usuario|Notas_atribuidas|Filme                                                   |
+----------+----------------+--------------------------------------------------------+
|19        |4               |Get Shorty (1995)                                       |
|19        |5               |Butch Cassidy and the Sundance Kid (1969)               |
|19        |3               |Stand by Me (1986)                                      |
|19        |4               |Fish Called Wanda, A (1988)                             |
|19        |2               |Titanic (1997)                                          |
|19        |4               |Rainmaker, The (1997)                                   |
|19        |3               |American President, The (1995)                          |
|19        |3               |Adventures of Priscilla, Queen of the Desert, The (1994)|
|19        |3               |Evil Dead II (

In [21]:
#Retorno da soma das notas dos 20 primeiros usuários

spark.sql('select user_id as ID_Usuario , sum(rating) as Soma_Notas \
          from viewdata \
          group by ID_Usuario \
          order by ID_Usuario asc').show(20)

+----------+----------+
|ID_Usuario|Soma_Notas|
+----------+----------+
|         1|       977|
|         2|       226|
|         3|       151|
|         4|       104|
|         5|       499|
|         6|       763|
|         7|      1598|
|         8|       224|
|         9|        94|
|        10|       770|
|        11|       627|
|        12|       224|
|        13|      1967|
|        14|       401|
|        15|       298|
|        16|       601|
|        17|        81|
|        18|      1070|
|        19|        71|
|        20|       146|
+----------+----------+
only showing top 20 rows



#### 4.5) Qual o usuário possui a maior soma dos rating?

In [22]:
#Validando as 3 maiores somas das notas em ordem decrescente
spark.sql('select user_id as ID_Usuario , sum(rating) as Nota_maxima \
            from viewdata \
            group by ID_Usuario order by Nota_maxima desc').show(3,truncate=False)

+----------+-----------+
|ID_Usuario|Nota_maxima|
+----------+-----------+
|450       |2083       |
|655       |1990       |
|13        |1967       |
+----------+-----------+
only showing top 3 rows



In [23]:
#Retorno do usuário com maior soma de nota
user_max = data.groupBy('user_id').sum('rating').sort('sum(rating)', ascending=False)
user_max.limit(1).toPandas().set_index('user_id').rename(columns={'sum(rating)':'Total rating'})

Unnamed: 0_level_0,Total rating
user_id,Unnamed: 1_level_1
450,2083


#### 5) Gerar Dataset tratado para o algoritmo de recomendação no formato csv com o schema user_id, item_id, rating.

In [None]:
# Existe varios meios de exportar, neste trabalho foi selecionado um deles.
data[['user_id', 'item_id', 'rating']].toPandas().to_csv('dataset.csv')

**Conferindo dataset gerado**

In [None]:
df = spark.read \
        .schema(schema)\
        .format("csv")\
        .option("header",True)\
        .load("dataset.csv")

In [None]:
type(df)
df.describe()

In [None]:
df.describe().show()