In [4]:
!hdfs dfs -mkdir /user/alex/proj

In [18]:
!hdfs dfs -ls /user/alex/proj/data

Found 4 items
-rw-r--r--   3 root supergroup   62492959 2022-04-24 23:45 /user/alex/proj/data/HIST_PAINEL_COVIDBR_2020_Parte1_06jul2021.csv
-rw-r--r--   3 root supergroup   76520681 2022-04-24 23:45 /user/alex/proj/data/HIST_PAINEL_COVIDBR_2020_Parte2_06jul2021.csv
-rw-r--r--   3 root supergroup   91120916 2022-04-24 23:45 /user/alex/proj/data/HIST_PAINEL_COVIDBR_2021_Parte1_06jul2021.csv
-rw-r--r--   3 root supergroup    3046774 2022-04-24 23:45 /user/alex/proj/data/HIST_PAINEL_COVIDBR_2021_Parte2_06jul2021.csv


In [21]:
!hdfs dfs -cat /user/alex/proj/data/HIST_PAINEL_COVIDBR_2020_Parte1_06jul2021.csv\ |head -n 3

regiao;estado;municipio;coduf;codmun;codRegiaoSaude;nomeRegiaoSaude;data;semanaEpi;populacaoTCU2019;casosAcumulado;casosNovos;obitosAcumulado;obitosNovos;Recuperadosnovos;emAcompanhamentoNovos;interior/metropolitana
Brasil;;;76;;;;2020-02-25;9;210147125;0;0;0;0;;;
Brasil;;;76;;;;2020-02-26;9;210147125;1;1;0;0;;;
cat: Unable to write to output stream.


In [47]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [2]:
schema = StructType()\
    .add("regiao", StringType())\
    .add("estado", StringType())\
    .add("municipio", StringType())\
    .add("coduf", IntegerType())\
    .add("codmun", IntegerType())\
    .add("codRegiaoSaude", IntegerType())\
    .add("nomeRegiaoSaude", StringType())\
    .add("data", DateType())\
    .add("semanaEpi", IntegerType())\
    .add("populacaoTCU2019", IntegerType())\
    .add("casosAcumulado", IntegerType())\
    .add("casosNovos", IntegerType())\
    .add("obitosAcumulado", IntegerType())\
    .add("obitosNovos", IntegerType())\
    .add("Recuperadosnovos", IntegerType())\
    .add("emAcompanhamentoNovos", IntegerType())\
    .add("interior/metropolitana", BooleanType())

In [3]:
print(schema)

StructType(List(StructField(regiao,StringType,true),StructField(estado,StringType,true),StructField(municipio,StringType,true),StructField(coduf,IntegerType,true),StructField(codmun,IntegerType,true),StructField(codRegiaoSaude,IntegerType,true),StructField(nomeRegiaoSaude,StringType,true),StructField(data,DateType,true),StructField(semanaEpi,IntegerType,true),StructField(populacaoTCU2019,IntegerType,true),StructField(casosAcumulado,IntegerType,true),StructField(casosNovos,IntegerType,true),StructField(obitosAcumulado,IntegerType,true),StructField(obitosNovos,IntegerType,true),StructField(Recuperadosnovos,IntegerType,true),StructField(emAcompanhamentoNovos,IntegerType,true),StructField(interior/metropolitana,BooleanType,true)))


In [4]:
covidbr = spark.read.csv("/user/alex/proj/data/", schema=schema, sep=";")

In [5]:
covidbr.printSchema()

root
 |-- regiao: string (nullable = true)
 |-- estado: string (nullable = true)
 |-- municipio: string (nullable = true)
 |-- coduf: integer (nullable = true)
 |-- codmun: integer (nullable = true)
 |-- codRegiaoSaude: integer (nullable = true)
 |-- nomeRegiaoSaude: string (nullable = true)
 |-- data: date (nullable = true)
 |-- semanaEpi: integer (nullable = true)
 |-- populacaoTCU2019: integer (nullable = true)
 |-- casosAcumulado: integer (nullable = true)
 |-- casosNovos: integer (nullable = true)
 |-- obitosAcumulado: integer (nullable = true)
 |-- obitosNovos: integer (nullable = true)
 |-- Recuperadosnovos: integer (nullable = true)
 |-- emAcompanhamentoNovos: integer (nullable = true)
 |-- interior/metropolitana: boolean (nullable = true)



In [70]:
covidbr.write.parquet("/user/alex/proj/parquet/", partitionBy="municipio")

In [71]:
!hdfs dfs -ls /user/alex/proj/parquet/

Found 2 items
-rw-r--r--   2 root supergroup          0 2022-04-25 00:42 /user/alex/proj/parquet/_SUCCESS
drwxr-xr-x   - root supergroup          0 2022-04-25 00:42 /user/alex/proj/parquet/municipio=__HIVE_DEFAULT_PARTITION__


In [6]:
covidbr = covidbr.withColumn("ano", year("data"))\
    .withColumn("mes", month("data"))

In [7]:
covidbr.sort(desc("municipio")).show(5)

+------+------+---------+-----+------+--------------+---------------+----+---------+----------------+--------------+----------+---------------+-----------+----------------+---------------------+----------------------+----+----+
|regiao|estado|municipio|coduf|codmun|codRegiaoSaude|nomeRegiaoSaude|data|semanaEpi|populacaoTCU2019|casosAcumulado|casosNovos|obitosAcumulado|obitosNovos|Recuperadosnovos|emAcompanhamentoNovos|interior/metropolitana| ano| mes|
+------+------+---------+-----+------+--------------+---------------+----+---------+----------------+--------------+----------+---------------+-----------+----------------+---------------------+----------------------+----+----+
|  null|  null|     null| null|  null|          null|           null|null|     null|            null|          null|      null|           null|       null|            null|                 null|                  null|null|null|
|  null|  null|     null| null|  null|          null|           null|null|     null|    

In [8]:
covid_mun = \
covidbr.groupBy(["ano", "mes", "municipio"])\
    .agg(sum("casosNovos").alias("Casos_mes"),
        sum("obitosNovos").alias("Obitos_mes"),
        max("populacaoTCU2019").alias("Pop"))

In [9]:
covid_uf = \
covidbr.groupBy(["ano", "mes", "estado"])\
    .agg(sum("casosNovos").alias("Casos_mes"),
        sum("obitosNovos").alias("Obitos_mes"),
        max("populacaoTCU2019").alias("Pop"))

In [10]:
covid_br = \
covidbr.groupBy(["ano", "mes"])\
    .agg(sum("casosNovos").alias("Casos_mes"),
        sum("obitosNovos").alias("Obitos_mes"),
        max("populacaoTCU2019").alias("Pop"))\
    .withColumn("Obitos_por_mil_habitantes", (1000*col("Obitos_mes")/col("Pop")))\
    .sort(["ano","mes"])

In [113]:
covid_mun.write.saveAsTable("covid_por_municipio",
                            partitionBy="mes",
                            mode="overwrite")

AnalysisException: "Can not create the managed table('`covid_por_municipio`'). The associated location('hdfs://namenode:8020/user/hive/warehouse/covid_por_municipio') already exists.;"

In [11]:
!hdfs dfs -ls /user/hive/warehouse/covid_por_municipio

Found 201 items
-rw-r--r--   2 root supergroup          0 2022-04-25 01:32 /user/hive/warehouse/covid_por_municipio/_SUCCESS
-rw-r--r--   2 root supergroup      10520 2022-04-25 01:32 /user/hive/warehouse/covid_por_municipio/part-00000-9f251a84-5d84-4191-b28a-18da84c058dd-c000.snappy.parquet
-rw-r--r--   2 root supergroup      10542 2022-04-25 01:32 /user/hive/warehouse/covid_por_municipio/part-00001-9f251a84-5d84-4191-b28a-18da84c058dd-c000.snappy.parquet
-rw-r--r--   2 root supergroup      10400 2022-04-25 01:32 /user/hive/warehouse/covid_por_municipio/part-00002-9f251a84-5d84-4191-b28a-18da84c058dd-c000.snappy.parquet
-rw-r--r--   2 root supergroup      10485 2022-04-25 01:32 /user/hive/warehouse/covid_por_municipio/part-00003-9f251a84-5d84-4191-b28a-18da84c058dd-c000.snappy.parquet
-rw-r--r--   2 root supergroup      10274 2022-04-25 01:32 /user/hive/warehouse/covid_por_municipio/part-00004-9f251a84-5d84-4191-b28a-18da84c058dd-c000.snappy.parquet
-rw-r--r--   2 root supergro

In [114]:
covid_uf.write.parquet("/user/alex/proj/uf/")

In [87]:
covid_br.write.csv("/user/alex/proj/br/")

In [88]:
!hdfs dfs -ls -R /user/alex/proj/br/

-rw-r--r--   2 root supergroup          0 2022-04-27 23:25 /user/alex/proj/br/_SUCCESS
-rw-r--r--   2 root supergroup         18 2022-04-27 23:25 /user/alex/proj/br/part-00000-4153b9c5-b55a-49f0-91ee-c80cd3b216e4-c000.csv
-rw-r--r--   2 root supergroup         25 2022-04-27 23:25 /user/alex/proj/br/part-00001-4153b9c5-b55a-49f0-91ee-c80cd3b216e4-c000.csv
-rw-r--r--   2 root supergroup         49 2022-04-27 23:25 /user/alex/proj/br/part-00002-4153b9c5-b55a-49f0-91ee-c80cd3b216e4-c000.csv
-rw-r--r--   2 root supergroup         50 2022-04-27 23:25 /user/alex/proj/br/part-00003-4153b9c5-b55a-49f0-91ee-c80cd3b216e4-c000.csv
-rw-r--r--   2 root supergroup         51 2022-04-27 23:25 /user/alex/proj/br/part-00004-4153b9c5-b55a-49f0-91ee-c80cd3b216e4-c000.csv
-rw-r--r--   2 root supergroup         51 2022-04-27 23:25 /user/alex/proj/br/part-00005-4153b9c5-b55a-49f0-91ee-c80cd3b216e4-c000.csv
-rw-r--r--   2 root supergroup         51 2022-04-27 23:25 /user/alex/proj/br/part-00006-4153b9c

In [89]:
!hdfs dfs -cat /user/alex/proj/br/part-00017-4153b9c5-b55a-49f0-91ee-c80cd3b216e4-c000.csv |head -n 10

2021,6,6034761,165825,210147125,0.7890900244293135


In [90]:
schema_agg = StructType()\
    .add("ano", IntegerType())\
    .add("mes", IntegerType())\
    .add("Casos_mes", IntegerType())\
    .add("Obitos_mes", IntegerType())\
    .add("Pop", IntegerType())\
    .add("Obitos_por_mil_habitantes", FloatType())

In [94]:
covid_stream = spark.readStream.csv("/user/alex/proj/br/", schema=schema_agg)

In [95]:
covid_stream.printSchema()

root
 |-- ano: integer (nullable = true)
 |-- mes: integer (nullable = true)
 |-- Casos_mes: integer (nullable = true)
 |-- Obitos_mes: integer (nullable = true)
 |-- Pop: integer (nullable = true)
 |-- Obitos_por_mil_habitantes: float (nullable = true)



In [96]:
topic_covid_br = covid_stream.writeStream\
    .format("kafka")\
    .option("kafka.bootstrap.servers", "kafka:9092")\
    .option("topic", "topic-covid-br")\
    .option("checkpointLocation", "/user/alex/proj/checkpoint")\
    .start()