<a href="https://colab.research.google.com/github/gabrielgsv/ClinicaBack/blob/master/%5BMBA_ES_27%5D_Data_Engineering.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install pyspark



In [None]:
# Download dos datasets e arquivos de exemplos
!wget https://datasets-aulas.s3.amazonaws.com/cars.json
!wget https://datasets-aulas.s3.amazonaws.com/cidades.json
!wget https://datasets-aulas.s3.amazonaws.com/dados+cota%C3%A7%C3%B5es/BBDC4_10y.csv
!wget https://archive.ics.uci.edu/ml/machine-learning-databases/adult/adult.data
!wget https://datasets-aulas.s3.amazonaws.com/Players.csv
!wget https://datasets-aulas.s3.amazonaws.com/czech-financial-dataset-real-anonymized-transactions.zip
!wget https://raw.githubusercontent.com/spark-examples/pyspark-examples/master/resources/zipcodes.json
!unzip czech-financial-dataset-real-anonymized-transactions.zip

--2024-10-28 23:18:28--  https://datasets-aulas.s3.amazonaws.com/cars.json
Resolving datasets-aulas.s3.amazonaws.com (datasets-aulas.s3.amazonaws.com)... 52.217.234.241, 16.15.193.237, 52.216.130.115, ...
Connecting to datasets-aulas.s3.amazonaws.com (datasets-aulas.s3.amazonaws.com)|52.217.234.241|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1580 (1.5K) [application/json]
Saving to: ‘cars.json’


2024-10-28 23:18:29 (42.8 MB/s) - ‘cars.json’ saved [1580/1580]

--2024-10-28 23:18:29--  https://datasets-aulas.s3.amazonaws.com/cidades.json
Resolving datasets-aulas.s3.amazonaws.com (datasets-aulas.s3.amazonaws.com)... 52.217.234.241, 16.15.193.237, 52.216.130.115, ...
Connecting to datasets-aulas.s3.amazonaws.com (datasets-aulas.s3.amazonaws.com)|52.217.234.241|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 6925 (6.8K) [application/json]
Saving to: ‘cidades.json’


2024-10-28 23:18:29 (156 MB/s) - ‘cidades.json’ saved [6925/6925]

--

\
# 👨‍🔬 Testando a instalação
Agora, podemos importar **```SparkSession```** de **```pyspark.sql```** e criar uma **```SparkSession```**. **```SparkSession```** foi introduzido na versão **Spark 2.0** e é um ponto de entrada para a funcionalidade Spark subjacente para criar programaticamente **RDDs**, **DataFrames** e **DataSets**.

Você pode dar um nome à sessão usando **```appName()```** e adicionando algumas configurações com **```config()```** se desejar.

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .master("local[2]")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

spark

# SparkSession
Desde o Spark 2.0, o **```SparkSession```** se tornou um ponto de entrada para o PySpark trabalhar com **```RDD```** e **```DataFrame```**. Antes da versão 2.0, o **```SparkContext```** costumava ser um ponto de entrada. O **```SparkSession```** é uma classe combinada para todos os diferentes contextos que tínhamos antes do lançamento 2.0 ( **```SparkContext```**, **```StreamingContext```**, **```SQLContext```**, **```HiveContext```**, etc).

Para criar **```SparkSession```** programaticamente no PySpark, você precisa usar o método padrão do construtor **```builder()```**. O método **```getOrCreate()```** retorna um **```SparkSession```** já existente.; se não existir, cria uma nova **```SparkSession```**.

**```master()```** – Se você estiver executando no cluster, você precisará usar seu nome mestre como argumento para **```master()```**. Normalmente, será **```yarn```** ou **```mesos```**, dependendo da configuração do seu cluster. Use **```local[x]```** ao executar no modo **Standalone** onde **```x```** deve ser um valor inteiro e deve ser maior que **```0```**. Isso representa quantas partições ele deve criar ao usar um **```RDD```**, **```DataFrame```** ou **```Dataset```**. Idealmente, o valor **```x```** deve ser o número de núcleos de CPU que você possui.

**```appName()```** – Usado para definir o nome do aplicativo.

**```getOrCreate()```** – Retorna um objeto **```SparkSession```** se já existir e cria um novo se não existir.

\
## Usando **```SparkConfig```**
Se você quiser definir algumas configurações para **```SparkSession```**, use o método **```config()```**.

```python
spark = SparkSession.builder \
      .master("local[1]") \
      .appName("Exemplo") \
      .config("spark.some.config.option", "config-value") \
      .getOrCreate()
```

# Primeiro exemplo: Total de países distintos presentes no dataset
🇨🇺 🇮🇳 🇲🇽 🇿🇦 🇵🇷 🇭🇳 🏴󠁧󠁢󠁥󠁮󠁧󠁿 🇨🇦 🇩🇪 🇮🇷 🇮🇹 🇵🇱 🇨🇴 🇰🇭 🇹🇭 🇱🇦 🇭🇹 🇬🇹 🇵🇪 🏴󠁧󠁢󠁳󠁣󠁴󠁿 🇹🇹 🇬🇷 🇳🇮 🇳🇱 🇺🇸 🇯🇲 🇵🇭 🇪🇨 🇹🇼 🇵🇹 🇩🇴 🇸🇻 🇫🇷 🇨🇳 🇯🇵 🇭🇷🇸🇮🇲🇰🇧🇦🇲🇪 🇬🇺 🇻🇳 🇭🇰 🇮🇪 🇭🇺

In [None]:
from pyspark import SparkConf, SparkContext

# Configuração do Spark
spark = SparkSession.builder\
        .master("local[2]")\
        .appName("Total de países distintos presentes no dataset")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

# Carrega o conjunto de dados a partir do arquivo CSV
adultDatasetRDD = spark.sparkContext.textFile("adult.data")

# Divide cada linha do conjunto de dados por vírgula, removendo linhas vazias
splitAdultDatasetRDD = adultDatasetRDD.filter(lambda linha: linha != '').map(lambda linha: linha.split(","))

# Seleciona a coluna correspondente ao país (índice 13) e obtém valores distintos
paises = splitAdultDatasetRDD.map(lambda linha: linha[13]).distinct()

# Conta a quantidade de países distintos
count_paises = paises.count()

# Coleta os países distintos como uma lista
paises_list = paises.collect()

# Imprime o resultado
print(f"Quantidade de países distintos: {count_paises}")
print("Países distintos:", paises_list)

Quantidade de países distintos: 42
Países distintos: [' Cuba', ' India', ' Mexico', ' South', ' Puerto-Rico', ' Honduras', ' England', ' Canada', ' Germany', ' Iran', ' Italy', ' Poland', ' Columbia', ' Cambodia', ' Thailand', ' Laos', ' Haiti', ' Guatemala', ' Peru', ' Scotland', ' Trinadad&Tobago', ' Greece', ' Nicaragua', ' Holand-Netherlands', ' United-States', ' Jamaica', ' ?', ' Philippines', ' Ecuador', ' Taiwan', ' Portugal', ' Dominican-Republic', ' El-Salvador', ' France', ' China', ' Japan', ' Yugoslavia', ' Outlying-US(Guam-USVI-etc)', ' Vietnam', ' Hong', ' Ireland', ' Hungary']


\
# Dataframes
O Apache Spark **DataFrame** é uma estrutura de dados tabular distribuída, oferecendo uma interface de alto nível para manipulação eficiente de grandes conjuntos de dados. Inspirado pela simplicidade das tabelas em bancos de dados relacionais, o DataFrame do Spark simplifica operações complexas, permitindo análises poderosas em ambientes distribuídos.

Sua abstração intuitiva facilita a execução de transformações e consultas SQL, proporcionando flexibilidade e desempenho otimizado para processamento de dados em escala. Com suporte a diversas fontes de dados e integração perfeita com o ecossistema Spark, o DataFrame se tornou uma ferramenta fundamental para cientistas de dados e engenheiros de dados na construção de pipelines de análise robustos e eficientes.

Definição de um [DataFrame](https://www.databricks.com/blog/2015/02/17/introducing-dataframes-in-spark-for-large-scale-data-science.html) pela Databricks:
> DataFrame é uma coleção distribuída de dados organizados em colunas nomeadas. É conceitualmente equivalente a uma tabela em um banco de dados relacional ou a um quadro de dados em R/Python, mas com otimizações mais ricas nos bastidores. DataFrames podem ser construídos a partir de uma ampla variedade de fontes, como arquivos de dados estruturados, tabelas no Hive, bancos de dados externos ou RDDs existentes. – Databricks

\
## Criando um DataFrame
A maneira mais simples de criar um DataFrame é a partir de uma lista de dados Python. O DataFrame também pode ser criado a partir de um RDD e lendo arquivos de diversas fontes.

In [None]:
dados = [('João','','Silva','1991-04-01','M',3000),
  ('Miguel','Ribeiro','','2000-05-19','M',4000),
  ('Roberto','','Ferreira','1978-09-05','M',4000),
  ('Maria','Ana','Oliveira','1967-12-01','F',4000),
  ('Jéssica','Mariana','Brito','1980-02-17','F',-1)
]

columns = ["primeiro_nome","nome_meio","sobrenome","data_nascimento","genero","salario"]
df = spark.createDataFrame(data=dados, schema = columns)

In [None]:
df.show()

+-------------+---------+---------+---------------+------+-------+
|primeiro_nome|nome_meio|sobrenome|data_nascimento|genero|salario|
+-------------+---------+---------+---------------+------+-------+
|         João|         |    Silva|     1991-04-01|     M|   3000|
|       Miguel|  Ribeiro|         |     2000-05-19|     M|   4000|
|      Roberto|         | Ferreira|     1978-09-05|     M|   4000|
|        Maria|      Ana| Oliveira|     1967-12-01|     F|   4000|
|      Jéssica|  Mariana|    Brito|     1980-02-17|     F|     -1|
+-------------+---------+---------+---------------+------+-------+



\
## Lendo um arquivo CSV no DataFrame
PySpark fornece **```csv("path")```** no DataFrameReader para ler um arquivo CSV no PySpark DataFrame e **```dataframeObj.write.csv("path")```** para salvar ou gravar no arquivo CSV.

Este exemplo lê os dados nas colunas DataFrame **```_c0```** para a primeira coluna e **```_c1```** para a segunda e assim por diante. e por padrão o tipo de dados para todas essas colunas é tratado como String.

In [None]:
spark = SparkSession.builder\
        .master("local[2]")\
        .appName("Lendo um arquivo CSV no DataFrame")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

df = spark.read.csv("BBDC4_10y.csv")
df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)
 |-- _c8: string (nullable = true)



In [None]:
df.show()

+----------+------------------+------------------+------------------+------------------+------------------+--------+---------+------------+
|       _c0|               _c1|               _c2|               _c3|               _c4|               _c5|     _c6|      _c7|         _c8|
+----------+------------------+------------------+------------------+------------------+------------------+--------+---------+------------+
|      Date|              Open|              High|               Low|             Close|         Adj Close|  Volume|Dividends|Stock Splits|
|2013-11-21|11.936270713806152|12.002909660339355|11.850031852722168|11.955870628356934| 7.617593288421631|15586647|      0.0|         0.0|
|2013-11-22|11.838272094726562|12.049949645996094|11.556035041809082|11.877471923828125| 7.567643165588379|30884260|      0.0|         0.0|
|2013-11-25|11.916670799255371|11.951951026916504|11.591315269470215| 11.77163314819336| 7.500205039978027|17439983|      0.0|         0.0|
|2013-11-26| 11.6932

In [None]:
df = spark.read.option('header', True).csv("BBDC4_10y.csv")
df.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Open: string (nullable = true)
 |-- High: string (nullable = true)
 |-- Low: string (nullable = true)
 |-- Close: string (nullable = true)
 |-- Adj Close: string (nullable = true)
 |-- Volume: string (nullable = true)
 |-- Dividends: string (nullable = true)
 |-- Stock Splits: string (nullable = true)



In [None]:
df.show()

+----------+------------------+------------------+------------------+------------------+------------------+--------+---------+------------+
|      Date|              Open|              High|               Low|             Close|         Adj Close|  Volume|Dividends|Stock Splits|
+----------+------------------+------------------+------------------+------------------+------------------+--------+---------+------------+
|2013-11-21|11.936270713806152|12.002909660339355|11.850031852722168|11.955870628356934| 7.617593288421631|15586647|      0.0|         0.0|
|2013-11-22|11.838272094726562|12.049949645996094|11.556035041809082|11.877471923828125| 7.567643165588379|30884260|      0.0|         0.0|
|2013-11-25|11.916670799255371|11.951951026916504|11.591315269470215| 11.77163314819336| 7.500205039978027|17439983|      0.0|         0.0|
|2013-11-26| 11.69323444366455|12.104828834533691| 11.63443374633789|11.720672607421875| 7.467736721038818|26836513|      0.0|         0.0|
|2013-11-27|11.75203

In [None]:
df = spark.read.option('header', True).option('inferSchema', True).csv("BBDC4_10y.csv")
df.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Adj Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Dividends: double (nullable = true)
 |-- Stock Splits: double (nullable = true)



# Principais **transformações** e **ações** disponíveis no Apache Spark DataFrame
Depois de criar o DataFrame a partir do arquivo CSV, podemos aplicar todas as **transformações** e **ações** de suporte ao DataFrame. Abaixo estão algumas das principais **transformações** e **ações** disponíveis no Apache Spark DataFrame.

\
## Transformações

\
**```select```**: Seleciona um conjunto específico de colunas.

```python
df.select("coluna1", "coluna2")
```
\
**```filter```**: Filtra as linhas com base em uma condição.

```python
df.filter(df["coluna"] > 10)
```

\
**```groupBy```**: Agrupa o DataFrame por uma ou mais colunas.

```python
df.groupBy("coluna1").agg({"coluna2": "sum"})
```

\
**```withColumn```**: Adiciona ou substitui uma coluna.
```python
df.withColumn("nova_coluna", df["coluna"] * 2)
```

\
**```join```**: Realiza uma junção entre dois DataFrames.
```python
df1.join(df2, df1["coluna1"] == df2["coluna2"], "inner")
```

\
**```orderBy```**: Ordena o DataFrame com base em uma ou mais colunas.

```python
df.orderBy("coluna1", ascending=False)
```

\
**```drop```**: Remove uma coluna do DataFrame.
```python
df.drop("coluna")
```
\
**```distinct```**: Retorna as linhas distintas do DataFrame.
```python
df.distinct()
```
\
##Ações
\
**```show```**: Exibe as primeiras linhas do DataFrame.
```python
df.show()
```

\
**```count```**: Retorna o número de linhas no DataFrame.
```python
df.count()
```

\
**```collect```**: Retorna todas as linhas do DataFrame como uma lista no programa driver.
```python
df.collect()
```

\
**```take```**: Retorna as primeiras **```n```** linhas do DataFrame.
```python
df.take(5)
```

\
**```describe```**: Calcula estatísticas descritivas para colunas numéricas.
```python
df.describe("coluna1")
```

\
**```printSchema```**: Exibe o esquema do DataFrame.
```python
df.printSchema()
```

\
**```write```**: Escreve o DataFrame em fontes externas.
```python
df.write.format("parquet").save("caminho/do/arquivo")
```

\
**```save```**: Salva o DataFrame em um formato específico (por exemplo, parquet, CSV).
```python
df.write.save("caminho/do/arquivo", format="parquet")
```


# 📚 Exercício: Análise de informações dos jogadores da NBA ⛹️
Responda as seguintes questões a partir dos dados de jogadores da NBA [NBA Players stats since 1950](https://www.kaggle.com/datasets/drgilermo/nba-players-stats).

1. Qual a média de altura e de peso dos jogadores?
2. Crie uma nova coluna com o $IMC = \frac{peso}{altura²}$.
3. Qual a maior altura, o maior peso, a menor altura e o menor peso?
4. Em qual estado americano nasceu o maior número de jogadores?
5. Qual ou quais são os jogadores mais altos?

In [None]:
!head Players.csv

,Player,height,weight,collage,born,birth_city,birth_state
0,Curly Armstrong,180,77,Indiana University,1918,,
1,Cliff Barker,188,83,University of Kentucky,1921,Yorktown,Indiana
2,Leo Barnhorst,193,86,University of Notre Dame,1924,,
3,Ed Bartels,196,88,North Carolina State University,1925,,
4,Ralph Beard,178,79,University of Kentucky,1927,Hardinsburg,Kentucky
5,Gene Berce,180,79,Marquette University,1926,,
6,Charlie Black,196,90,University of Kansas,1921,Arco,Idaho
7,Nelson Bobb,183,77,Temple University,1924,Philadelphia,Pennsylvania
8,Jake Bornheimer,196,90,Muhlenberg College,1927,New Brunswick,New Jersey


In [None]:
players_df = spark.read.option('header', True).option('inferSchema', True).csv("Players.csv")
players_df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- Player: string (nullable = true)
 |-- height: integer (nullable = true)
 |-- weight: integer (nullable = true)
 |-- collage: string (nullable = true)
 |-- born: integer (nullable = true)
 |-- birth_city: string (nullable = true)
 |-- birth_state: string (nullable = true)



In [None]:
players_df.show()

+---+----------------+------+------+--------------------+----+-------------+------------+
|_c0|          Player|height|weight|             collage|born|   birth_city| birth_state|
+---+----------------+------+------+--------------------+----+-------------+------------+
|  0| Curly Armstrong|   180|    77|  Indiana University|1918|         NULL|        NULL|
|  1|    Cliff Barker|   188|    83|University of Ken...|1921|     Yorktown|     Indiana|
|  2|   Leo Barnhorst|   193|    86|University of Not...|1924|         NULL|        NULL|
|  3|      Ed Bartels|   196|    88|North Carolina St...|1925|         NULL|        NULL|
|  4|     Ralph Beard|   178|    79|University of Ken...|1927|  Hardinsburg|    Kentucky|
|  5|      Gene Berce|   180|    79|Marquette University|1926|         NULL|        NULL|
|  6|   Charlie Black|   196|    90|University of Kansas|1921|         Arco|       Idaho|
|  7|     Nelson Bobb|   183|    77|   Temple University|1924| Philadelphia|Pennsylvania|
|  8| Jake

#### 1️⃣. Qual a média de altura e de peso dos jogadores?

In [None]:
from pyspark.sql.functions import col, mean, udf, round

In [None]:
players_df.select(round(mean(col('height')/100), 2)).collect()[0][0]

1.99

In [None]:
# Solução alternativa

from pyspark.sql.functions import mean, avg

players_df.select( avg( players_df['height'] ),  mean( players_df.weight )  ).show()

+------------------+-----------------+
|       avg(height)|      avg(weight)|
+------------------+-----------------+
|198.70492221372098|94.78321856669217|
+------------------+-----------------+



In [None]:
players_df.agg( { 'height': 'avg', 'weight':'avg' } ).show()

+-----------------+------------------+
|      avg(weight)|       avg(height)|
+-----------------+------------------+
|94.78321856669217|198.70492221372098|
+-----------------+------------------+



In [None]:
from pyspark.sql.functions import col, mean, udf

media_altura = df.select(round(mean(col('height')/100), 2)).collect()[0][0]
media_peso = df.select(round(mean(col('weight')), 2)).collect()[0][0]

print("""Média de altura dos jogadores: {}m \nMédia de peso dos jogadores: {}kg
      """.format(media_altura, media_peso))