# Analisando dados com PySpark

Neste notebook, utilizaremos o PySpark para exemplificar como:

- Extrair dados de um arquivo
- Transformar os dados para extrair as informações relevantes
- Analisar as estatísticas de interesse

Para isso, utilizaremos o [MovieLens](https://grouplens.org/datasets/movielens/), um banco de dados com avaliações de usuários para filmes cujos arquivos necessários para este notebook se encontram na pasta `./data/input`.

Com base no dataset, queremos responder a seguinte pergunta: 

> **Quantos filmes foram lançados em cada ano do dataset?**

In [1]:
import os
import pyspark
from pyspark.sql import SparkSession

In [2]:
DATA_FOLDER = './data/'

# Spark

## Inicializando uma SparkSession

A `SparkSession` encapsula uma série de funcionalidades e contextos para utilizarmos nas aplicações do Spark, como:

- SparkContext
- SQLContext
- HiveContext
- Streaming Application

Para criar uma `SparkSession`, utilizaremos os seguintes métodos:
1. **Builder()** para definir as configurações da sessão
2. **appName(**_nome_**)** para definir um nome da sessão
3. **getOrCreate()** para criar uma sessão com tais configurações (caso uma sessão com as mesmas configurações já exista, ela será retornada)

In [3]:
spark = SparkSession.Builder()\
    .appName('EscolaDNC.dados')\
    .getOrCreate()

## Extraindo os dados

Os dados de entrada se encontram na pasta `input`. Note que os dados estão em múltiplos arquivos csv. Para isso, utilizaremos os seguintes métodos do módulo `SparkSession.read`:
1. **format(**_formato_**)**: para definir o formato do arquivo 
2. **option(**_chave_, _valor_**)**: para definir possíveis configurações de leitura
3. **load(**_caminho_do_arquivo_**)**: para carregar o arquivo

In [4]:
df_movies = spark.read\
    .format("csv")\
    .option("header", "true")\
    .load(os.path.join(DATA_FOLDER, 'input', '*.csv'))

In [5]:
df_movies.show(5)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
| 129717|My Best Friend's ...|              Comedy|
| 207796|The Aeronauts (2019)|Action|Adventure|...|
| 142322|The Ark of the Su...|    Action|Adventure|
| 196447|  Wolf's Hole (1987)|Adventure|Drama|H...|
| 167090|A Year and Change...|        Comedy|Drama|
+-------+--------------------+--------------------+
only showing top 5 rows



## Transformando os dados

Agora que temos um dataframe em mãos, podemos fazer transformações em suas colunas. Em particular, utilizaremos os seguintes métodos:

1. **withColumnRenamed(**_nome_antigo_, _novo_nome_**)** para renomear a coluna `movieId` para `movie_id`
2. **withColumn(**_nova_coluna_, _funcao_**)** para aplicar uma função em uma coluna
3. **pyspark.sql.functions.regexp_extract(**_coluna_, _regexp_**)** para extrair o ano dofilme da coluna `title` (utilize a expressão regular `\((\d{4})\)$`)



In [6]:
import pyspark.sql.functions as f

In [7]:
df_movies = df_movies\
    .withColumnRenamed('movieId', 'movie_id')\
    .withColumn("year", f.regexp_extract("title", '\((\d{4})\)$', 1))

In [8]:
df_movies.show(5)

+--------+--------------------+--------------------+----+
|movie_id|               title|              genres|year|
+--------+--------------------+--------------------+----+
|  129717|My Best Friend's ...|              Comedy|1998|
|  207796|The Aeronauts (2019)|Action|Adventure|...|2019|
|  142322|The Ark of the Su...|    Action|Adventure|1984|
|  196447|  Wolf's Hole (1987)|Adventure|Drama|H...|1987|
|  167090|A Year and Change...|        Comedy|Drama|2015|
+--------+--------------------+--------------------+----+
only showing top 5 rows



## Analisando os dados

Uma das funcionalidades que facilita a análise de dados utilizando o Spark é o `SparkSQL`, em que podemos fazer consultas nos dados como se estivéssmos utilizando um banco de dados relacional.

A utilização do `SparkSQL` pode ser feita através de 2 métodos:

1. **DataFrame.createOrReplaceTempView(**_nome_**)** para criarmos uma view consultável a partir do dataframe
2. **SessionSpark.sql(**_query_**)** para realizar a consulta

In [9]:
df_movies.createOrReplaceTempView("MOVIES")

In [10]:
sql_string = """
    SELECT 
        year, 
        COUNT(DISTINCT movie_id) AS n_movies 
    FROM MOVIES
    GROUP BY year
    ORDER BY year DESC
"""
spark.sql(sql_string).show()

+----+--------+
|year|n_movies|
+----+--------+
|2019|     993|
|2018|    2032|
|2017|    2373|
|2016|    2488|
|2015|    2512|
|2014|    2406|
|2013|    2173|
|2012|    1978|
|2011|    1838|
|2010|    1691|
|2009|    1724|
|2008|    1632|
|2007|    1498|
|2006|    1446|
|2005|    1255|
|2004|    1171|
|2003|    1028|
|2002|    1023|
|2001|     971|
|2000|     929|
+----+--------+
only showing top 20 rows

