# Spark - Exercício prático

Este notebook contém o exercício prático de Spark usando python. Você pode realizá-lo localmente em sua máquina (se estiver utilizando Linux) ou subir o código no Google Colab. Se estiver usando Windows, verifique o doc [2-2-spark-exercicios](2-2-spark-exercicios.md) para um tutorial de instalação do Spark no Windows.

## Configuração do ambiente. 

As próximos células fazem a instalação e configuração do java-8 e do spark em sua máquina.

In [None]:
%%bash

# Install Java
# apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Install PySpark
pip install -q pyspark

In [None]:
import os
os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-8-openjdk-amd64'

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()

## 01 - Leitura de dados

Para começar, faça a leitura do arquivo `titanic.csv` para o ambiente spark. Algumas informações importantes:

- O arquivo tem ";" como separador,
- O arquivo possui os nomes das colunas na primeira linha,
- O arquivo deve ser lido com os tipos corretos.

In [58]:
import csv
import requests
from pathlib import Path
from urllib.parse import urlparse

import pyspark.sql.functions as func
from pyspark.sql.functions import when
from pyspark.sql.types import BooleanType

In [59]:
def baixar_dados(nome_arquivo, url):
  requisicao = requests.get(url)
  conteudo = requisicao.content
  arquivo_csv = open(nome_arquivo, 'wb')
  arquivo_csv.write(conteudo)
  arquivo_csv.close()

In [60]:
url = "https://raw.githubusercontent.com/neylsoncrepalde/titanic_data_with_semicolon/main/titanic.csv"
nome_arquivo = Path(urlparse(url).path).name
baixar_dados(nome_arquivo, url)

# Desenvolva seu código de leitura aqui:
df = spark.read.csv(path=nome_arquivo, header=True, sep=";", inferSchema=True)

## 02 - Algumas análises preliminares

Exiba na tela:

1) O schema da tabela
2) Os 10 primeiros casos
3) Apenas as pessoas que sobreviveram
4) Apenas as pessoas que não sobreviveram e eram do sexo masculino
5) A média de tarifa paga para cada classe
6) A média de tarifa paga para cada classe e sexo
7) Número de sobreviventes por classe e sexo
8) Número de sobreviventes por sexo e categorias de idade (as categorias de idade devem ser construídas como: abaixo de 18 = criança, entre 18 e 40 = jovem adulto, maior de 40 = adulto maduro).

In [61]:
df_casted = df.withColumn("Survived", df.Survived.cast(BooleanType()))

In [62]:
# 1)
# Desenvolva sua resposta aqui:
df_casted.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: boolean (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [63]:
# 2)
# Desenvolva sua resposta aqui:
df_casted.show(10)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|   false|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|    true|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|    true|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|    true|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|   false|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
|          6|   false|     3|    Moran, Mr. James|  male|null|    0|    0|      

In [64]:
# 3)
# Desenvolva sua resposta aqui:
df_casted.filter(df_casted.Survived == True).show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+--------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|    Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+--------+-----+--------+
|          2|    true|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599| 71.2833|  C85|       C|
|          3|    true|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|   7.925| null|       S|
|          4|    true|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|    53.1| C123|       S|
|          9|    true|     3|Johnson, Mrs. Osc...|female|27.0|    0|    2|          347742| 11.1333| null|       S|
|         10|    true|     2|Nasser, Mrs. Nich...|female|14.0|    1|    0|          237736| 30.0708| null|       C|
|         11|    true|     3|Sandstrom, Miss. ...|female| 4.0|    1|    

In [65]:
# 4)
# Desenvolva sua resposta aqui:
df_casted.filter((df_casted.Survived == False) & (df_casted.Sex == "male")).show()

+-----------+--------+------+--------------------+----+----+-----+-----+---------------+-------+-----------+--------+
|PassengerId|Survived|Pclass|                Name| Sex| Age|SibSp|Parch|         Ticket|   Fare|      Cabin|Embarked|
+-----------+--------+------+--------------------+----+----+-----+-----+---------------+-------+-----------+--------+
|          1|   false|     3|Braund, Mr. Owen ...|male|22.0|    1|    0|      A/5 21171|   7.25|       null|       S|
|          5|   false|     3|Allen, Mr. Willia...|male|35.0|    0|    0|         373450|   8.05|       null|       S|
|          6|   false|     3|    Moran, Mr. James|male|null|    0|    0|         330877| 8.4583|       null|       Q|
|          7|   false|     1|McCarthy, Mr. Tim...|male|54.0|    0|    0|          17463|51.8625|        E46|       S|
|          8|   false|     3|Palsson, Master. ...|male| 2.0|    3|    1|         349909| 21.075|       null|       S|
|         13|   false|     3|Saundercock, Mr. ...|male|2

In [66]:
# 5)
# Desenvolva sua resposta aqui:
df_casted.groupBy("Pclass").agg(func.round(mean("Fare"), 2).alias("AvgFare")).sort("Pclass").show()

+------+-------+
|Pclass|AvgFare|
+------+-------+
|     1|  84.15|
|     2|  20.66|
|     3|  13.68|
+------+-------+



In [67]:
# 6)
# Desenvolva sua resposta aqui:
df_casted.groupBy("Pclass", "Sex").agg(func.round(mean("Fare"), 2).alias("AvgFare")).sort("Pclass", "Sex").show()

+------+------+-------+
|Pclass|   Sex|AvgFare|
+------+------+-------+
|     1|female| 106.13|
|     1|  male|  67.23|
|     2|female|  21.97|
|     2|  male|  19.74|
|     3|female|  16.12|
|     3|  male|  12.66|
+------+------+-------+



In [68]:
# 7)
# Desenvolva sua resposta aqui:
df_casted.filter(df_casted.Survived == True).groupBy("Pclass", "Sex").count().sort("Pclass", "Sex").show()

+------+------+-----+
|Pclass|   Sex|count|
+------+------+-----+
|     1|female|   91|
|     1|  male|   45|
|     2|female|   70|
|     2|  male|   17|
|     3|female|   72|
|     3|  male|   47|
+------+------+-----+



In [74]:
# 8)
# Desenvolva sua resposta aqui:
# Categorias de idade devem ser construídas como: abaixo de 18 = criança, entre 18 e 40 = jovem adulto, maior de 40 = adulto maduro).
df_casted.filter(df_casted.Survived == True) \
    .withColumn("Category", \
        when(df_casted.Age < 18, "children") \
        .when((df_casted.Age >= 18) & (df_casted.Age <= 40), "young adult") \
        .otherwise("mature adult")
        ) \
    .groupBy("Sex", "Category").count().sort("Sex", "Category").show()

+------+------------+-----+
|   Sex|    Category|count|
+------+------------+-----+
|female|    children|   38|
|female|mature adult|   73|
|female| young adult|  122|
|  male|    children|   23|
|  male|mature adult|   34|
|  male| young adult|   52|
+------+------------+-----+



## 03 - Escreve dados

Faça uma seleção apenas das pessoas que sobreviveram e escreva os dados em um arquivo **parquet** particionado por sexo e classe.

In [77]:
# Desenvolva a resposta aqui...
survived = df_casted.filter(df_casted.Survived == True)

survived.write.partitionBy("Sex", "Pclass").mode("overwrite").parquet("survived")


                                                                                