# Apache Spark

## Instalando o ambiente

O jeito mais simples de começar a trabalhar com Spark é instalar um container com tudo pronto! No site https://hub.docker.com/r/jupyter/pyspark-notebook vemos uma imagem Docker que já vem com `pyspark` e `jupyter lab`. Instale a imagem com o comando:

```bash
docker pull jupyter/pyspark-notebook
```

Vamos iniciar o ambiente de trabalho com o comando `docker run`. Para isso precisamos tomar alguns cuidados:

1) Temos que mapear nosso diretorio local de trabalho para um diretório interno do container, de modo que alterações feitas dentro do container (nesta pasta escolhida) sejam gravadas no nosso diretorio local. No container temos um usuário padrão com *username* `jovyan`. No *homedir* desse usuario temos uma pasta vazia `work`, que vai servir como local de mapeamento do nosso diretorio local de trabalho. Podemos então fazer esse mapeamendo com a opção `-v` do comando `docker run` da seguinte forma:

```bash
-v <diretorio>:/home/jovyan/work
```

onde `<diretorio>` representa seu diretorio local de trabalho.

2) Para acessar o `jupyter notebook` e o *dashboard* do Spark a partir do nosso *browser* favorito temos que abrir algumas portas do container com a opção `-p`. As portas são `8888` (para o próprio `jupyter notebook`) e `4040` (para o *dashboard* do Spark). Ou seja, adicionaremos às opções do `docker run`o seguinte:

```bash
-p 8888:8888 -p 4040:4040
```

Desta forma, ao acessar `localhost:8888` na nossa máquina, estaremos acessando o servidor Jupyter na porta 8888 interna do container.

3) Vamos iniciar o container no modo interativo, e vamos especificar que o container deve ser encerrado ao fechar o servidor Jupyter. Faremos isso com as opções `-it` e `-rm`

Antes de executar, garanta que as portas 4040 e 8888 estão livres (sem jupyter já executando) ou altere o comando. Ainda, esteja na pasta da aula ao executar, assim apenas ela será exposta ao container.

Portanto, o comando completo que eu uso na minha máquina Linux para iniciar o container é:

```bash
docker run \
    -it \
    --rm \
    -p 8888:8888 \
    -p 4040:4040 \
    -v "`pwd`":/home/jovyan/work \
    jupyter/pyspark-notebook


```


Se estiver no Windows estes comandos, utilize:

- No Powershell: `docker run -it --rm -p 8888:8888 -p 4040:4040 -v ${PWD}:/home/jovyan/work jupyter/pyspark-notebook`

- No Prompt de comando: `docker run -it --rm -p 8888:8888 -p 4040:4040 -v %cd%:/home/jovyan/work jupyter/pyspark-notebook`


Para facilitar a vida eu coloco esse comando em um arquivo `inicia.sh`. Engenheiros, façam do jeito que preferirem!

Agora abra esse notebook lá no container!


## Iniciando o Spark

Vamos iniciar o ambiente Spark. Para isso vamos:

1) Criar um objeto de configuração do ambiente Spark. Nossa configuração será simples: vamos especificar que o nome da nossa aplicação Spark é "Minha aplicação", e que o *master node* é a máquina local, usando todos os *cores* disponíveis. Aplicações reais de Spark são configuradas de modo ligeiramente diferente: ao especificar o *master node* passamos uma URL real, com o endereço do nó gerente do *cluster* Spark.

2) Vamos criar um objeto do tipo `SparkContext` com essa configuração

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("MinhaAplicacao").getOrCreate()
sc = spark.sparkContext

A partir deste momento você pode monitorar seus *jobs* Spark em http://localhost:4040

## Spark: SQL

Nas últimas aulas de Spark, aprendemos sobre RDD (Resilient Distributed Datasets), que é a estrutura de dados fundamental do Spark. Com o RDD, podemos executar operações paralelas e distribuídas em grandes conjuntos de dados em um cluster.

<img src="cluster-overview.png">

Fonte: https://spark.apache.org/docs/latest/img/cluster-overview.png

Aprendemos que os RDDs são imutáveis e podem ser criados a partir de dados armazenados em arquivos ou gerados por transformações de outros RDDs. Além disso, vimos como aplicar diferentes tipos de transformações, como `map`, `filter` e `reduce` para manipular nossos dados.

Agora, vamos aprender sobre a interface de DataFrames do PySpark. Os DataFrames são uma abstração de alto nível construída em cima dos RDDs, que permitem a manipulação de dados estruturados de forma mais eficiente. Eles fornecem uma API mais fácil e intuitiva para trabalhar com dados tabulares, além de permitir a execução de consultas SQL diretamente nos dados.

Sim, você não leu errado, conseguiremos utilizar SQL! Em nossas aulas, discutimos como geralmente servidores de banco de dados são otimizados para IO (leitura e escrita de dados) e não para processamento. Com o uso do Spark e a interface SQL, conseguiremos aplicar nossos conhecimentos em um ambiente otimizado para processamento de dados em larga escala enquanto utilizamos uma interface mais amigável, permitindo que as tarefas de análise sejam realizadas de forma mais rápida e eficiente.

Com os DataFrames, podemos executar tarefas como filtragem, agregação, junção e ordenação dos dados com muito mais facilidade. Além disso, eles oferecem suporte a diversos formatos de arquivos, incluindo CSV, Parquet, JSON e avro.

**Obs**: apesar de serem DataFrames, não são Pandas DataFrames! Apesar de podermos transformar estes DataFrames em Pandas, esta é uma operação que deve ser evitada ao máximo, pois assim perdemos a característica distribuída do Spark!

## Base de Dados

Iremos utilizar a base de dados **SF Bay Area Bike Share** do [Kaggle](https://www.kaggle.com/datasets/benhamner/sf-bay-area-bike-share?resource=download). Para fazer o download, acesse https://www.kaggle.com/datasets/benhamner/sf-bay-area-bike-share?resource=download

Provavelmente o download pelo Kaggle irá demorar. Enquanto ele não finalize, utilize como alternativa o zip disponível no Blackboard. Ele possui os mesmos CSVs, exceto um gigantesco (`status.csv`)!

## Primeiro exemplo

Vamos abrir o arquivo `station.csv`. Deixe ele em uma pasta `data` dentro da pasta da aula.

In [2]:
df_station = spark.read.csv("data/station.csv", header=True, inferSchema=True)

Vamos exibir algumas linhas

In [3]:
df_station.show(5)

+---+--------------------+------------------+-------------------+----------+--------+-----------------+
| id|                name|               lat|               long|dock_count|    city|installation_date|
+---+--------------------+------------------+-------------------+----------+--------+-----------------+
|  2|San Jose Diridon ...|         37.329732|-121.90178200000001|        27|San Jose|         8/6/2013|
|  3|San Jose Civic Ce...|         37.330698|        -121.888979|        15|San Jose|         8/5/2013|
|  4|Santa Clara at Al...|         37.333988|        -121.894902|        11|San Jose|         8/6/2013|
|  5|    Adobe on Almaden|         37.331415|          -121.8932|        19|San Jose|         8/5/2013|
|  6|    San Pedro Square|37.336721000000004|        -121.894074|        15|San Jose|         8/7/2013|
+---+--------------------+------------------+-------------------+----------+--------+-----------------+
only showing top 5 rows



Uma maneira alternativa de exibir de forma mais bonita! **Cuidado**: transformações para Pandas em geral devem ser evitadas!

In [None]:
df_station.limit(3).toPandas()

Para ver o *schema* do DataFrame, utilize:

In [None]:
df_station.printSchema()

Podemos utilizar `count` e `columns` para descobrir o *shape* do DataFrame:

In [None]:
df_station.count()

In [None]:
df_station.columns

In [None]:
print('{} linhas e {} colunas'.format(df_station.count(), len(df_station.columns)))

Vamos encontrar todas as cidades diferentes nas quais temos estações em nossa base de dados

In [None]:
from pyspark.sql.functions import col

distinct_cities = df_station.select(col("city")).distinct()

distinct_cities.show()

Acesse a documentação do `select` em 
https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.DataFrame.select.html
e do `col` em https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.col.html

## Criar Databases

Vamos tentar trabalhar de forma mais semelhante ao praticado no MySQL!

Será que conseguimos criar nossas próprias databases e tabelas?!

Sim! Para criar uma database `bike`, podemos fazer:

In [None]:
spark.sql("CREATE DATABASE IF NOT EXISTS bike")

## Criar Tabelas

Em um ambiente de processamento em larga escala como o Spark, as tabelas podem surgir tanto em um processo de ingestão, quando arquivos externos a base são ingeridos para disponibilizar dados aos cientistas ou analistas de dados, quanto quando tabelas são reprocessadas para gerar visões necessárias para satisfazer algum projeto (Ex: um projeto Machine Learning para predição de custos).

Para exportar um DataFrame como tabela, podemos fazer

In [None]:
df_station.write.mode('overwrite').saveAsTable("bike.station")

Para criar uma view, utilize

In [None]:
df_station.createOrReplaceTempView("view_station")

Vá até seu navegador de arquivos (Windows Explorer, Nautilius). Verifique que foi criada a pasta `spark-warehouse`. Acesse esta pasta e confira seu conteúdo!

Perceba que nossas tabelas foram salvas em arquivos `.parquet`. O Apache Parquet é um formato de arquivo open-source projetado para armazenar dados em colunas. Ele é otimizado para consulta e processamento eficiente de grandes quantidades de dados estruturados e semi-estruturados, especialmente em ambientes de big data. Ao contrário de outros formatos de arquivo que armazenam dados em linhas, o Parquet armazena dados em colunas, o que permite uma compressão mais eficiente e um desempenho de consulta mais rápido.

<img src="parquet.gif">
Fonte: https://parquet.apache.org/images/FileLayout.gif

O Parquet foi projetado para trabalhar bem com frameworks distribuídos como Hadoop, Spark e Hive, permitindo que os usuários consultem e processem dados em escala. Ele também suporta tipos de dados complexos, como arrays e estruturas aninhadas, o que o torna flexível o suficiente para lidar com uma ampla variedade de casos de uso. Em resumo, o Apache Parquet é uma ferramenta útil para gerenciar e processar grandes volumes de dados de forma eficiente e escalável.

Veja mais em https://parquet.apache.org/

Também podemos especificar partições na criação das tabelas. Considere um caso em que a tabela de `station` será dividida pelas cidades

In [None]:
df_station.write.partitionBy("city").mode("overwrite").saveAsTable("bike.station")

Volte ao navegador de arquivos e perceba as alterações. Deve ter sido criada uma pasta por cidade.

## SQL Query

Pronto, agora podemos utilizar queries como fizemos no MySQL!

In [None]:
minha_query = """
SELECT *
  FROM bike.station
 LIMIT 2
 """

df_exemplo = spark.sql(minha_query)
df_exemplo.show(5)

Utilizando a view...

In [None]:
minha_query = """
SELECT s.name,
       s.city,
       s.dock_count
  FROM view_station s
 LIMIT 3
 """

df_exemplo = spark.sql(minha_query)
df_exemplo.show(5)

**Exercício 1**: Crie na base `bike` uma tabela `weather` a partir do arquivo `weather.csv`

In [None]:
# Seu código AQUI!

**Exercício 2**: Crie na base `bike` uma tabela `trip` a partir do arquivo `trip.csv`

In [None]:
# Seu código AQUI!

**Exercício 3**: Crie na base `bike` uma tabela `status` a partir do arquivo `status.csv`

In [None]:
# Seu código AQUI!

**Exercício 4**: Conte a quantidade de linhas em cada tabela.

In [None]:
# Seu código AQUI!

**Exercício 5**: Conte a quantidade de corridas (`trip`) com cada estação como **origem**.

In [None]:
# Seu código AQUI!

**Exercício 6**: Conte a quantidade de corridas (`trip`) com cada estação como **destino**.

In [None]:
# Seu código AQUI!

### Joins

Podemos utilizar **join** e os demais recursos (funções de agregação, agrupamentos...) que utilizávamos no MySQL. Veja um exemplo onde retornaremos algumas informações da estação fazendo um **INNER JOIN** e retornando as informações de `status` da estação.

In [None]:
minha_query = """
SELECT s.name,
       s.city,
       s.dock_count,
       t.*
  FROM bike.station s,
       bike.status t
WHERE t.station_id = s.id
 LIMIT 3
 """

df_info_st = spark.sql(minha_query)
df_info_st.show(8)

Você irá perceber uma certa demora para retorno dos resultados. Estamos em um ambiente que propicia processamento em larga escala de forma distribuída. Conseguimos recuperar, agrupar, resumir e até treinar modelos de Machine Learning, mas isto virá com um custo!

**Exercício 7**: Crie um DataFrame a partir de uma **Query SQL** que retorne o `id`, `name`, `city` além da quantidade média de bicicletas disponíveis nos `status` de cada estação.

Dicas: junção e agrupamento!

In [None]:
# Seu código AQUI!

### Resolvendo com API DataFrame

Vamos ver como resolver o Exercício 4 utilizando a API de DataFrames.

Inicialmente, iremos ler a tabela de status. Já temos um DataFrame para esta tabela, entretanto, iremos criar outro para fins ditáticos! Perceba que desta vez iremos fazer o processo inverso e ler a partir da tabela! 

In [None]:
df_status1 = spark.read.table("bike.status")
df_status1.show(2)

Então, lemos a tabela de estações em um DataFrame

In [None]:
df_station1 = spark.read.table("bike.station")
df_station1.show(2)

Para fazer o join, utilizamos:

In [None]:
df_join = df_station1.join(df_status1, col("id") == col("station_id"), "inner")
df_join.show(2)

E utilizamos `avg` como função de agregação para obter os resultados desejados:

In [None]:
from pyspark.sql.functions import avg

df_join.groupBy("id", "name", "city").agg(avg("bikes_available")).show(10)

Perceba que os resultados serão os mesmos da opção resolvida com SQL.

Em geral, a escolha pela interface de SQL ou DataFrame em um ambiente de trabalho com Spark levará em consideração questões como padronização e a familiaridade dos desenvolvedores com uma interface ou outra. Seja qual a escolha, o tempo de processamento também deverá ser muito parecido, uma vez que o Spark realiza uma "tradução" no momento de execução.