# Spark - Exercício prático

Este notebook contém o exercício prático de Spark usando python. Você pode realizá-lo localmente em sua máquina (se estiver utilizando Linux) ou subir o código no Google Colab. Se estiver usando Windows, verifique o doc [2-2-spark-exercicios](2-2-spark-exercicios.md) para um tutorial de instalação do Spark no Windows.

## Configuração do ambiente. 

As próximos células fazem a instalação e configuração do java-8 e do spark em sua máquina.

Obs.: Já instalado ambos na máquina local.

In [None]:
"""%%bash

# Instal Java
apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Install PySpark
pip install -q pyspark"""

In [None]:
"""import os
os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-8-openjdk-amd64'

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()"""

## 01 - Leitura de dados

Para começar, faça a leitura do arquivo `titanic.csv` para o ambiente spark. Algumas informações importantes:

- O arquivo tem ";" como separador,
- O arquivo possui os nomes das colunas na primeira linha,
- O arquivo deve ser lido com os tipos corretos.

In [54]:
url = "https://raw.githubusercontent.com/neylsoncrepalde/titanic_data_with_semicolon/main/titanic.csv"

# Desenvolva seu código de leitura aqui:

from pyspark.sql import SparkSession
from pyspark import SparkFiles

spark = SparkSession.builder.appName('Classify Urls').getOrCreate() # Inicia a sessão Spark (driver)
spark.sparkContext.addFile(url) # Adiciona o arquivo temporário


df_titanic = (spark.read.csv('file:///'+SparkFiles.get('titanic.csv'), 
                             header=True, 
                             inferSchema=True, 
                             sep=';') 
             ) # Faz a leitura do csv

## 02 - Algumas análises preliminares

Exiba na tela:

1) O schema da tabela
2) Os 10 primeiros casos
3) Apenas as pessoas que sobreviveram
4) Apenas as pessoas que não sobreviveram e eram do sexo masculino
5) A média de tarifa paga para cada classe
6) A média de tarifa paga para cada classe e sexo
7) Número de sobreviventes por classe e sexo
8) Número de sobreviventes por sexo e categorias de idade (as categorias de idade devem ser construídas como: abaixo de 18 = criança, entre 18 e 40 = jovem adulto, maior de 40 = adulto maduro).

In [5]:
# 1)
# Desenvolva sua resposta aqui:
df_titanic.schema
df_titanic.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [10]:
# 2)
# Desenvolva sua resposta aqui:
df_titanic.show(10)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
|          6|       0|     3|    Moran, Mr. James|  male|null|    0|    0|      

In [16]:
# 3)
# Desenvolva sua resposta aqui:
df_titanic.filter('Survived=1').show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+--------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|    Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+--------+-----+--------+
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599| 71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|   7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|    53.1| C123|       S|
|          9|       1|     3|Johnson, Mrs. Osc...|female|27.0|    0|    2|          347742| 11.1333| null|       S|
|         10|       1|     2|Nasser, Mrs. Nich...|female|14.0|    1|    0|          237736| 30.0708| null|       C|
|         11|       1|     3|Sandstrom, Miss. ...|female| 4.0|    1|    

In [17]:
# 4)
# Desenvolva sua resposta aqui:
df_titanic.filter("Survived='0' and sex='male'").show()

+-----------+--------+------+--------------------+----+----+-----+-----+---------------+-------+-----------+--------+
|PassengerId|Survived|Pclass|                Name| Sex| Age|SibSp|Parch|         Ticket|   Fare|      Cabin|Embarked|
+-----------+--------+------+--------------------+----+----+-----+-----+---------------+-------+-----------+--------+
|          1|       0|     3|Braund, Mr. Owen ...|male|22.0|    1|    0|      A/5 21171|   7.25|       null|       S|
|          5|       0|     3|Allen, Mr. Willia...|male|35.0|    0|    0|         373450|   8.05|       null|       S|
|          6|       0|     3|    Moran, Mr. James|male|null|    0|    0|         330877| 8.4583|       null|       Q|
|          7|       0|     1|McCarthy, Mr. Tim...|male|54.0|    0|    0|          17463|51.8625|        E46|       S|
|          8|       0|     3|Palsson, Master. ...|male| 2.0|    3|    1|         349909| 21.075|       null|       S|
|         13|       0|     3|Saundercock, Mr. ...|male|2

In [33]:
# 5)
# Desenvolva sua resposta aqui:
from pyspark.sql.functions import col, round

# Para converter todo o dataframe:
#df_titanic.select(*(col(Fare).cast('numeric').alias(Fare) for Fare in df_titanic.columns)).printSchema()

# Para converter somente a coluna requisitada:
df_titanic = df_titanic.withColumn('Fare', col('Fare').cast('float'))
df_titanic_med = df_titanic.groupBy('Pclass').avg('Fare')
df_titanic_med = df_titanic_med.select('Pclass', round('avg(Fare)',2).alias('med_tarifa_paga')).orderBy('Pclass')

df_titanic_med.show()

+------+---------------+
|Pclass|med_tarifa_paga|
+------+---------------+
|     1|          84.15|
|     2|          20.66|
|     3|          13.68|
+------+---------------+



In [64]:
# 6)
# Desenvolva sua resposta aqui:
df_titanic_med_2 = df_titanic.groupBy('Pclass','Sex').avg('Fare')
df_titanic_med_2 = df_titanic_med_2.select('Pclass','Sex', round('avg(Fare)',2).alias('med_tarifa_paga')).orderBy('Sex', 'Pclass')
df_titanic_med_2.show()

+------+------+---------------+
|Pclass|   Sex|med_tarifa_paga|
+------+------+---------------+
|     1|female|         106.13|
|     2|female|          21.97|
|     3|female|          16.12|
|     1|  male|          67.23|
|     2|  male|          19.74|
|     3|  male|          12.66|
+------+------+---------------+



In [57]:
# 7)
# Desenvolva sua resposta aqui:
df_titanic.select('Pclass', 'Sex', 'Survived').groupBy('Pclass', 'Sex').sum('Survived').orderBy('Sex','Pclass').show(10)

+------+------+-------------+
|Pclass|   Sex|sum(Survived)|
+------+------+-------------+
|     1|female|           91|
|     2|female|           70|
|     3|female|           72|
|     1|  male|           45|
|     2|  male|           17|
|     3|  male|           47|
+------+------+-------------+



In [49]:
# 8)
# Desenvolva sua resposta aqui:
# abaixo de 18 = criança, entre 18 e 40 = jovem adulto, maior de 40 = adulto maduro
df_titanic = df_titanic.withColumn('Age', col('Age'))

from pyspark.sql.functions import when
df_titanic = df_titanic.withColumn('Age', \
                      when(df_titanic['Age']<18, 'criança')\
                      .when(df_titanic['Age']==18, 'jovem adulto')\
                      .when(df_titanic['Age']<=40 , 'jovem adulto')\
                      .when(df_titanic['Age']>40 , 'adulto maduro')
                      .otherwise(df_titanic['Age']))
table_categ = df_titanic.groupBy('Sex', 'Age').count().orderBy('Sex', 'Age').show()

+------+-------------+-----+
|   Sex|          Age|count|
+------+-------------+-----+
|female|         null|   53|
|female|adulto maduro|   48|
|female|      criança|   55|
|female| jovem adulto|  158|
|  male|         null|  124|
|  male|adulto maduro|  100|
|  male|      criança|   58|
|  male| jovem adulto|  295|
+------+-------------+-----+



## 03 - Escreve dados

Faça uma seleção apenas das pessoas que sobreviveram e escreva os dados em um arquivo **parquet** particionado por sexo e classe.

In [45]:
# Desenvolva a resposta aqui...
from pyspark.sql import Row

df_titanic = spark.read.csv('file:///'+SparkFiles.get('titanic.csv'), header=True, sep=';')
df_titanic_surv = df_titanic.filter('Survived=1')

df_titanic_surv.write.partitionBy("Pclass","Sex").mode("append").parquet("/Users/anacr/onboarding-ed-a3data/3-spark/titanic")

#Ex.: Consulta Pclass=1 Sex=male
# spark.read.parquet("/Users/anacr/onboarding-ed-a3data/3-spark/titanic.parquet/Pclass=1/Sex=male")
#titanic_parquet.explain()

Py4JJavaError: An error occurred while calling o505.parquet.
: org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:231)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:188)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:108)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:106)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:131)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:132)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:131)
	at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:293)
	at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:874)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 167.0 failed 1 times, most recent failure: Lost task 0.0 in stage 167.0 (TID 3048) (DESKTOP-HBRS09Q.mshome.net executor driver): org.apache.spark.SparkException: Task failed while writing rows.
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:296)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:210)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: ExitCodeException exitCode=-1073741515: 
	at org.apache.hadoop.util.Shell.runCommand(Shell.java:585)
	at org.apache.hadoop.util.Shell.run(Shell.java:482)
	at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:776)
	at org.apache.hadoop.util.Shell.execCommand(Shell.java:869)
	at org.apache.hadoop.util.Shell.execCommand(Shell.java:852)
	at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:733)
	at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:225)
	at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:209)
	at org.apache.hadoop.fs.RawLocalFileSystem.createOutputStreamWithMode(RawLocalFileSystem.java:307)
	at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:296)
	at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:328)
	at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSOutputSummer.<init>(ChecksumFileSystem.java:398)
	at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:461)
	at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:440)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:911)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:892)
	at org.apache.parquet.hadoop.util.HadoopOutputFile.create(HadoopOutputFile.java:74)
	at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:248)
	at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:390)
	at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:349)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.<init>(ParquetOutputWriter.scala:36)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.newInstance(ParquetFileFormat.scala:149)
	at org.apache.spark.sql.execution.datasources.DynamicPartitionDataWriter.newOutputWriter(FileFormatDataWriter.scala:241)
	at org.apache.spark.sql.execution.datasources.DynamicPartitionDataWriter.write(FileFormatDataWriter.scala:262)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:278)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1473)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:286)
	... 9 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2258)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2207)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2206)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2206)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1079)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1079)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1079)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2445)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2387)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2376)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2196)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:200)
	... 33 more
Caused by: org.apache.spark.SparkException: Task failed while writing rows.
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:296)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:210)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more
Caused by: ExitCodeException exitCode=-1073741515: 
	at org.apache.hadoop.util.Shell.runCommand(Shell.java:585)
	at org.apache.hadoop.util.Shell.run(Shell.java:482)
	at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:776)
	at org.apache.hadoop.util.Shell.execCommand(Shell.java:869)
	at org.apache.hadoop.util.Shell.execCommand(Shell.java:852)
	at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:733)
	at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:225)
	at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:209)
	at org.apache.hadoop.fs.RawLocalFileSystem.createOutputStreamWithMode(RawLocalFileSystem.java:307)
	at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:296)
	at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:328)
	at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSOutputSummer.<init>(ChecksumFileSystem.java:398)
	at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:461)
	at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:440)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:911)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:892)
	at org.apache.parquet.hadoop.util.HadoopOutputFile.create(HadoopOutputFile.java:74)
	at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:248)
	at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:390)
	at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:349)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.<init>(ParquetOutputWriter.scala:36)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.newInstance(ParquetFileFormat.scala:149)
	at org.apache.spark.sql.execution.datasources.DynamicPartitionDataWriter.newOutputWriter(FileFormatDataWriter.scala:241)
	at org.apache.spark.sql.execution.datasources.DynamicPartitionDataWriter.write(FileFormatDataWriter.scala:262)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:278)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1473)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:286)
	... 9 more
