### Enviando Arquivos para o HDFS

- Enviar os dados para o diretório /semantix/challenge/input 
- Acessar o Namenode docker exec -it namenode bash
- Criar diretório no HDFS /user/NOME_USER/data/covid -- hdfs dfs -mkdir /user/NOME_USER/data/covid
- Enviar todos os dados .csv para o HDFS -- hdfs dfs -put /input/dados_covid_19/*.csv /user/NOME_USER/data/covid
- Enviar dados para a tabela HIVE -- load data inpath '/user/NOME_USER/data/covid' overwrite into table NOME_TABLE;

In [30]:
!hdfs dfs -ls /user/hive/warehouse/covid.db/dados_cov_br

Found 4 items
-rwxrwxr-x   3 root supergroup   62492959 2022-04-29 15:26 /user/hive/warehouse/covid.db/dados_cov_br/HIST_PAINEL_COVIDBR_2020_Parte1_06jul2021.csv
-rwxrwxr-x   3 root supergroup   76520681 2022-04-29 15:26 /user/hive/warehouse/covid.db/dados_cov_br/HIST_PAINEL_COVIDBR_2020_Parte2_06jul2021.csv
-rwxrwxr-x   3 root supergroup   91120916 2022-04-29 15:26 /user/hive/warehouse/covid.db/dados_cov_br/HIST_PAINEL_COVIDBR_2021_Parte1_06jul2021.csv
-rwxrwxr-x   3 root supergroup    3046774 2022-04-29 15:26 /user/hive/warehouse/covid.db/dados_cov_br/HIST_PAINEL_COVIDBR_2021_Parte2_06jul2021.csv


In [171]:
!hdfs dfs -ls /user/bross/data/covid

Found 2 items
drwxr-xr-x   - root supergroup          0 2022-04-29 17:53 /user/bross/data/covid/ano=2020
drwxr-xr-x   - root supergroup          0 2022-04-29 17:54 /user/bross/data/covid/ano=2021


In [217]:
df_covid = spark.read.csv('/user/bross/data/covid', header='true', sep=';', inferSchema=False)

In [175]:
from pyspark.sql.types import *

In [218]:
df_covid.printSchema()

root
 |-- regiao: string (nullable = true)
 |-- estado: string (nullable = true)
 |-- municipio: string (nullable = true)
 |-- coduf: string (nullable = true)
 |-- codmun: string (nullable = true)
 |-- codRegiaoSaude: string (nullable = true)
 |-- nomeRegiaoSaude: string (nullable = true)
 |-- data: string (nullable = true)
 |-- semanaEpi: string (nullable = true)
 |-- populacaoTCU2019: string (nullable = true)
 |-- casosAcumulado: string (nullable = true)
 |-- casosNovos: string (nullable = true)
 |-- obitosAcumulado: string (nullable = true)
 |-- obitosNovos: string (nullable = true)
 |-- Recuperadosnovos: string (nullable = true)
 |-- emAcompanhamentoNovos: string (nullable = true)
 |-- interior/metropolitana: string (nullable = true)
 |-- ano: integer (nullable = true)



In [219]:
schema_df = StructType([
    StructField('regiao', StringType(), nullable=False),
    StructField('estado', StringType(), nullable=False),
    StructField('municipio', StringType()),
    StructField('coduf', IntegerType()),
    StructField('codmun', IntegerType()),
    StructField('codRegiaoSaude', IntegerType()),
    StructField('nomeRegiaoSaude', StringType()),
    StructField('data', StringType()),
    StructField('semanaEpi', StringType()),
    StructField('populacaoTCU2019', StringType()),
    StructField('casosAcumulado', IntegerType()),
    StructField('casosNovos', IntegerType()),
    StructField('obitosAcumulado', IntegerType()),
    StructField('obitosNovos', IntegerType()),
    StructField('Recuperadosnovos', IntegerType()),
    StructField('emAcompanhamentoNovos', IntegerType()),
    StructField('interior/metropolitana', IntegerType()),
    StructField('ano', IntegerType())
])

In [220]:
df_covid = spark.read.csv('/user/bross/data/covid/ano=2020', header=True, sep=';', schema=schema_df)

In [221]:
df_covid.printSchema()

root
 |-- regiao: string (nullable = true)
 |-- estado: string (nullable = true)
 |-- municipio: string (nullable = true)
 |-- coduf: integer (nullable = true)
 |-- codmun: integer (nullable = true)
 |-- codRegiaoSaude: integer (nullable = true)
 |-- nomeRegiaoSaude: string (nullable = true)
 |-- data: string (nullable = true)
 |-- semanaEpi: string (nullable = true)
 |-- populacaoTCU2019: string (nullable = true)
 |-- casosAcumulado: integer (nullable = true)
 |-- casosNovos: integer (nullable = true)
 |-- obitosAcumulado: integer (nullable = true)
 |-- obitosNovos: integer (nullable = true)
 |-- Recuperadosnovos: integer (nullable = true)
 |-- emAcompanhamentoNovos: integer (nullable = true)
 |-- interior/metropolitana: integer (nullable = true)
 |-- ano: integer (nullable = true)



In [222]:
type(df_covid)

pyspark.sql.dataframe.DataFrame

In [223]:
df_covid_reduced = df_covid.select('regiao', 'estado', 'municipio', 'data', 'casosAcumulado', 'obitosAcumulado', 'Recuperadosnovos', 'emAcompanhamentoNovos', 'ano')

df_covid_reduced.show(5)

+------+------+---------+----------+--------------+---------------+----------------+---------------------+----+
|regiao|estado|municipio|      data|casosAcumulado|obitosAcumulado|Recuperadosnovos|emAcompanhamentoNovos| ano|
+------+------+---------+----------+--------------+---------------+----------------+---------------------+----+
|Brasil|  null|     null|2020-02-25|             0|              0|            null|                 null|null|
|Brasil|  null|     null|2020-02-26|             1|              0|            null|                 null|null|
|Brasil|  null|     null|2020-02-27|             1|              0|            null|                 null|null|
|Brasil|  null|     null|2020-02-28|             1|              0|            null|                 null|null|
|Brasil|  null|     null|2020-02-29|             2|              0|            null|                 null|null|
+------+------+---------+----------+--------------+---------------+----------------+--------------------

In [224]:
df_covid_reduced = df_covid_reduced.na.fill(value=0)
df_covid_reduced.show(5)

+------+------+---------+----------+--------------+---------------+----------------+---------------------+---+
|regiao|estado|municipio|      data|casosAcumulado|obitosAcumulado|Recuperadosnovos|emAcompanhamentoNovos|ano|
+------+------+---------+----------+--------------+---------------+----------------+---------------------+---+
|Brasil|  null|     null|2020-02-25|             0|              0|               0|                    0|  0|
|Brasil|  null|     null|2020-02-26|             1|              0|               0|                    0|  0|
|Brasil|  null|     null|2020-02-27|             1|              0|               0|                    0|  0|
|Brasil|  null|     null|2020-02-28|             1|              0|               0|                    0|  0|
|Brasil|  null|     null|2020-02-29|             2|              0|               0|                    0|  0|
+------+------+---------+----------+--------------+---------------+----------------+---------------------+---+
o

In [225]:
from pyspark.sql.functions import *

In [226]:
df_covid_reduced = df_covid_reduced.withColumn('data', from_unixtime(unix_timestamp(col('data'), 'yyyy-MM-dd'), 'dd/MM/yyyy hh:mm'))

+------+------+---------+----------------+--------------+---------------+----------------+---------------------+---+
|regiao|estado|municipio|            data|casosAcumulado|obitosAcumulado|Recuperadosnovos|emAcompanhamentoNovos|ano|
+------+------+---------+----------------+--------------+---------------+----------------+---------------------+---+
|Brasil|  null|     null|25/02/2020 12:00|             0|              0|               0|                    0|  0|
|Brasil|  null|     null|26/02/2020 12:00|             1|              0|               0|                    0|  0|
|Brasil|  null|     null|27/02/2020 12:00|             1|              0|               0|                    0|  0|
|Brasil|  null|     null|28/02/2020 12:00|             1|              0|               0|                    0|  0|
|Brasil|  null|     null|29/02/2020 12:00|             2|              0|               0|                    0|  0|
+------+------+---------+----------------+--------------+-------

In [230]:
df_covid_reduced.show(5)

+------+------+---------+----------------+--------------+---------------+----------------+---------------------+---+
|regiao|estado|municipio|            data|casosAcumulado|obitosAcumulado|Recuperadosnovos|emAcompanhamentoNovos|ano|
+------+------+---------+----------------+--------------+---------------+----------------+---------------------+---+
|Brasil|  null|     null|25/02/2020 12:00|             0|              0|               0|                    0|  0|
|Brasil|  null|     null|26/02/2020 12:00|             1|              0|               0|                    0|  0|
|Brasil|  null|     null|27/02/2020 12:00|             1|              0|               0|                    0|  0|
|Brasil|  null|     null|28/02/2020 12:00|             1|              0|               0|                    0|  0|
|Brasil|  null|     null|29/02/2020 12:00|             2|              0|               0|                    0|  0|
+------+------+---------+----------------+--------------+-------

In [236]:
df_covid_reduced = df_covid_reduced.na.fill(value='')

In [237]:
df_covid_reduced.show(5)

+------+------+---------+----------------+--------------+---------------+----------------+---------------------+---+
|regiao|estado|municipio|            data|casosAcumulado|obitosAcumulado|Recuperadosnovos|emAcompanhamentoNovos|ano|
+------+------+---------+----------------+--------------+---------------+----------------+---------------------+---+
|Brasil|      |         |25/02/2020 12:00|             0|              0|               0|                    0|  0|
|Brasil|      |         |26/02/2020 12:00|             1|              0|               0|                    0|  0|
|Brasil|      |         |27/02/2020 12:00|             1|              0|               0|                    0|  0|
|Brasil|      |         |28/02/2020 12:00|             1|              0|               0|                    0|  0|
|Brasil|      |         |29/02/2020 12:00|             2|              0|               0|                    0|  0|
+------+------+---------+----------------+--------------+-------

In [227]:
df_covid_reduced.groupBy(df_covid_reduced['regiao']).agg({'casosAcumulado':'max'}).show()

+------------+-------------------+
|      regiao|max(casosAcumulado)|
+------------+-------------------+
|    Nordeste|             493400|
|         Sul|             492583|
|     Sudeste|            1462297|
|Centro-Oeste|             308868|
|      Brasil|            7675973|
|       Norte|             293540|
+------------+-------------------+

