In [1]:
# run in prompt -> python -m pip install findspark
import platform
print('Python Version ->', platform.python_version())
import findspark
findspark.find()
findspark.init()
import pyspark
findspark.find()
print('Spark Version ->', pyspark.__version__)

Python Version -> 3.7.13
Spark Version -> 2.4.6


In [2]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
from pyspark import SparkContext, SparkConf, SparkFiles
import urllib.request
from datetime import date

appName = "TCC_covid"

spark = SparkSession \
        .builder \
        .appName(appName) \
        .master('local[*]')\
        .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.11:2.2.1")\
        .config("spark.mongodb.input.uri", "mongodb://localhost:27017/CovidDB") \
        .config("spark.mongodb.output.uri", "mongodb://localhost:27017/CovidDB") \
        .getOrCreate()
spark

# Coleta de Dados Covid-19

In [10]:
#Rodar de segunda a sexta após as 16:00 devido disponibilização dos dados de vacinação.
#Não havendo dados de vacinação não haverá coleta de dados deste. Separar jobs.

# url = "https://www.seade.gov.br/wp-content/uploads/2021/06/Dados-covid-19-estado.csv" #antigo
url = 'https://www.seade.gov.br/wp-content/uploads/coronavirus-files/Dados-covid-19-estado.csv' #novo

dataAtual = date.today()
diaAtual = str(dataAtual)[8:10]
mesAtual = str(dataAtual)[5:7]
anoAtual = str(dataAtual)[0:4]
dataAtualFmt = anoAtual+mesAtual+diaAtual

# Trata Url da evolução da vacinação
urlVac = f"https://www.saopaulo.sp.gov.br/wp-content/uploads/{anoAtual}/{mesAtual}/{dataAtualFmt}_evolucao_aplicacao_doses.csv"

arqVac = urlVac[58::1]

print(url)
print(urlVac)
print(arqVac)

https://www.seade.gov.br/wp-content/uploads/coronavirus-files/Dados-covid-19-estado.csv
https://www.saopaulo.sp.gov.br/wp-content/uploads/2022/09/20220913_evolucao_aplicacao_doses.csv
20220913_evolucao_aplicacao_doses.csv


In [72]:
from pyspark.sql.types import *

spark.sparkContext.addFile(urlVac)

# Create an expected schema
columns = StructType([StructField('Data',
                                  StringType(), True),
                    StructField('Dose',
                                StringType(), True),
                    StructField('Qtde',
                                IntegerType(), True)])

df = spark.read.csv(SparkFiles.get(arqVac), schema=columns, header=True, sep=';')
dfVac = df.filter(f.col("Qtde").cast("int").isNotNull())

print('Realizando Tratamento dos Dados da Vacinacao')
dfVac = dfVac.na.fill(0)
dfVac.printSchema()

dfVac = dfVac.withColumn('Data', f.regexp_replace(dfVac.columns[0], ' de janeiro de ', '/01/'))\
            .withColumn('Data', f.regexp_replace(dfVac.columns[0], ' de fevereiro de ', '/02/'))\
            .withColumn('Data', f.regexp_replace(dfVac.columns[0], ' de março de ', '/03/'))\
            .withColumn('Data', f.regexp_replace(dfVac.columns[0], ' de abril de ', '/04/'))\
            .withColumn('Data', f.regexp_replace(dfVac.columns[0], ' de maio de ', '/05/'))\
            .withColumn('Data', f.regexp_replace(dfVac.columns[0], ' de junho de ', '/06/'))\
            .withColumn('Data', f.regexp_replace(dfVac.columns[0], ' de julho de ', '/07/'))\
            .withColumn('Data', f.regexp_replace(dfVac.columns[0], ' de agosto de ', '/08/'))\
            .withColumn('Data', f.regexp_replace(dfVac.columns[0], ' de setembro de ', '/09/'))\
            .withColumn('Data', f.regexp_replace(dfVac.columns[0], ' de outubro de ', '/10/'))\
            .withColumn('Data', f.regexp_replace(dfVac.columns[0], ' de novembro de ', '/11/'))\
            .withColumn('Data', f.regexp_replace(dfVac.columns[0], ' de dezembro de ', '/12/'))

dfVac = dfVac.withColumn('month', f.split(dfVac.columns[0], '/').getItem(0)) \
             .withColumn('day', f.split(dfVac.columns[0], '/').getItem(1)) \
             .withColumn('year', f.split(dfVac.columns[0], '/').getItem(2))\
             .withColumn('slash', f.lit('/'))


dfVac = dfVac.withColumn('Tp_Dose', f.lit(f.col(dfVac.columns[1])))\
    .withColumn('Qtd_Doses_Apl_Dia', f.lit(f.col(dfVac.columns[2])))\
    .drop(dfVac.columns[1])\
    .drop(dfVac.columns[2])

dfVac = dfVac.withColumn('Data', f.concat(f.lpad(dfVac.day, 2, '0'),  dfVac.slash, f.lpad(dfVac.month, 2, '0'), dfVac.slash, dfVac.year))

dfVac.printSchema()

dfVac = dfVac.withColumn('Data', f.from_unixtime(f.unix_timestamp('Data', 'dd/MM/yyyy')))
dfVac = dfVac.withColumn('Data', f.lit(f.substring(dfVac.Data,1,10)))

dfVac = dfVac.drop(dfVac.columns[1])\
             .drop(dfVac.columns[2])\
             .drop(dfVac.columns[3])\
             .drop(dfVac.columns[4])

dfVac.show(20,False)
print('FIM do Tratamento dos Dados da Vacinacao')

Realizando Tratamento dos Dados da Vacinacao
root
 |-- Data: string (nullable = true)
 |-- Dose: string (nullable = true)
 |-- Qtde: integer (nullable = true)

root
 |-- Data: string (nullable = true)
 |-- month: string (nullable = true)
 |-- day: string (nullable = true)
 |-- year: string (nullable = true)
 |-- slash: string (nullable = false)
 |-- Tp_Dose: string (nullable = true)
 |-- Qtd_Doses_Apl_Dia: integer (nullable = true)

+----------+-----------------+-----------------+
|Data      |Tp_Dose          |Qtd_Doses_Apl_Dia|
+----------+-----------------+-----------------+
|2021-01-17|1º DOSE          |342              |
|2021-01-17|1º DOSE ADICIONAL|31               |
|2021-01-17|2º DOSE          |50               |
|2021-01-17|ÚNICA            |19               |
|2021-01-18|1º DOSE          |1158             |
|2021-01-18|1º DOSE ADICIONAL|49               |
|2021-01-18|2º DOSE          |30               |
|2021-01-18|ÚNICA            |138              |
|2021-01-19|1º DOSE     

In [62]:
spark.sparkContext.addFile(url)
df = spark.read.csv(SparkFiles.get("Dados-covid-19-estado.csv"), inferSchema=True, header=True, sep=';')

df.printSchema()

df = df.na.fill(0)

df = df.withColumn('Data', f.regexp_replace('Data', 'jan', '01'))\
       .withColumn('Data', f.regexp_replace('Data', 'fev', '02'))\
       .withColumn('Data', f.regexp_replace('Data', 'mar', '03'))\
       .withColumn('Data', f.regexp_replace('Data', 'abr', '04'))\
       .withColumn('Data', f.regexp_replace('Data', 'mai', '05'))\
       .withColumn('Data', f.regexp_replace('Data', 'jun', '06'))\
       .withColumn('Data', f.regexp_replace('Data', 'jul', '07'))\
       .withColumn('Data', f.regexp_replace('Data', 'ago', '08'))\
       .withColumn('Data', f.regexp_replace('Data', 'set', '09'))\
       .withColumn('Data', f.regexp_replace('Data', 'out', '10'))\
       .withColumn('Data', f.regexp_replace('Data', 'nov', '11'))\
       .withColumn('Data', f.regexp_replace('Data', 'dez', '12'))


df = df.withColumn('Total_de_Casos', f.lit(f.col(df.columns[1])))\
       .withColumn('Casos_por_Dia', f.lit(f.col(df.columns[2])))\
       .withColumn('Obitos_por_Dia', f.lit(f.col(df.columns[3])))\
       .drop(df.columns[1])\
       .drop(df.columns[2])\
       .drop(df.columns[3])\
       .drop(df.columns[4])\
       .drop(df.columns[5])\
       .drop(df.columns[6])\
       .drop(df.columns[7])\
       .drop(df.columns[8])

df.show(20,False)

df = df.withColumn('Data', f.from_unixtime(f.unix_timestamp('Data', 'dd/MM/yyyy')))
df = df.withColumn('Data', f.lit(f.substring(df.Data,1,10)))

df.show(20,False)

df.printSchema()

root
 |-- Data: string (nullable = true)
 |-- Casos: integer (nullable = true)
 |-- Casos por dia: integer (nullable = true)
 |-- �bitos por dia: integer (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)
 |-- _c8: string (nullable = true)

+----------+--------------+-------------+--------------+
|Data      |Total_de_Casos|Casos_por_Dia|Obitos_por_Dia|
+----------+--------------+-------------+--------------+
|26/02/2020|1             |1            |0             |
|27/02/2020|0             |0            |0             |
|28/02/2020|0             |0            |0             |
|29/02/2020|2             |1            |0             |
|01/03/2020|0             |0            |0             |
|02/03/2020|0             |0            |0             |
|03/03/2020|0             |0            |0             |
|04/03/2020|3             |1            |0             |
|05/03/2020|6            

### Gravação das Bases no MongoDB

In [29]:
#Inicio gravação e recuperação dos dados da covid via mongodb
df.write.format("com.mongodb.spark.sql.DefaultSource")\
        .option("uri", "mongodb://localhost:27017/CovidDB.dados_covid_19_sp")\
        .mode("overwrite")\
        .save()
df.unpersist()
#Fim gravação e recuperação dos dados da covid via mongodb

#Inicio gravação e recuperação dos dados da vacinação contra o covid via mongodb
dfVac.write.format("com.mongodb.spark.sql.DefaultSource")\
        .option("uri", "mongodb://localhost:27017/CovidDB.dados_vacinacao_covid_19_sp")\
        .mode("overwrite")\
        .save()
dfVac.unpersist()
#Fim gravação e recuperação dos dados da vacinação contra o covid via mongodb

DataFrame[Data: string, Tp_Dose: string, Qtd_Doses_Apl_Dia: int]

### Consulta Base MongoDB Covid-19

In [3]:
df2 = spark.read.format("com.mongodb.spark.sql.DefaultSource")\
                .option("uri", "mongodb://localhost:27017/CovidDB.dados_covid_19_sp")\
                .load()

df2.select(['*']).orderBy(f.col('Data').desc()).show(10,False)
df2.select('Data', 'Obitos_por_Dia').orderBy(f.col('Data').desc()).show(10,False)
df2.select(f.format_number(f.sum('Casos_por_Dia'),0).alias('Total_de_Casos'), f.format_number(f.sum('Obitos_por_Dia'),0).alias('Total_de_Mortes')).show()

+-------------+----------+--------------+--------------+--------------------------+
|Casos_por_Dia|Data      |Obitos_por_Dia|Total_de_Casos|_id                       |
+-------------+----------+--------------+--------------+--------------------------+
|595          |2022-09-18|1             |6071578       |[6327c70536b3b908a2f69121]|
|2051         |2022-09-17|31            |6070983       |[6327c70536b3b908a2f69120]|
|3012         |2022-09-16|42            |6068932       |[6327c70536b3b908a2f6911f]|
|2962         |2022-09-15|16            |6065920       |[6327c70536b3b908a2f6911e]|
|3947         |2022-09-14|26            |6062958       |[6327c70536b3b908a2f6911d]|
|2869         |2022-09-13|29            |6059011       |[6327c70536b3b908a2f6911c]|
|434          |2022-09-12|3             |6056142       |[6327c70536b3b908a2f6911b]|
|614          |2022-09-11|1             |6055708       |[6327c70536b3b908a2f6911a]|
|3183         |2022-09-10|19            |6055094       |[6327c70536b3b908a2f

### Consulta Base MongoDB Vacinação

In [4]:
df2Vac = spark.read.format("com.mongodb.spark.sql.DefaultSource")\
                .option("uri", "mongodb://localhost:27017/CovidDB.dados_vacinacao_covid_19_sp")\
                .load()
df2Vac.select(['*']).orderBy(f.col('Data').desc(), f.col('Tp_Dose')).show(10,False)
df2Vac.groupBy('Tp_Dose').agg(f.sum('Qtd_Doses_Apl_Dia').alias('Total_de_Doses_Apl')).show()
df2Vac.select(f.format_number(f.sum('Qtd_Doses_Apl_Dia'),0).alias('Total_de_Doses_Apl')).show()

+----------+-----------------+-----------------+--------------------------+
|Data      |Qtd_Doses_Apl_Dia|Tp_Dose          |_id                       |
+----------+-----------------+-----------------+--------------------------+
|2022-09-18|82               |2º DOSE ADICIONAL|[6327c71336b3b908a2f69d76]|
|2022-09-17|1224             |1º DOSE          |[6327c71336b3b908a2f69d71]|
|2022-09-17|1413             |1º DOSE ADICIONAL|[6327c71336b3b908a2f69d72]|
|2022-09-17|1128             |2º DOSE          |[6327c71336b3b908a2f69d73]|
|2022-09-17|6156             |2º DOSE ADICIONAL|[6327c71336b3b908a2f69d74]|
|2022-09-17|44               |3º DOSE ADICIONAL|[6327c71336b3b908a2f69d75]|
|2022-09-16|4910             |1º DOSE          |[6327c71336b3b908a2f69d6b]|
|2022-09-16|9396             |1º DOSE ADICIONAL|[6327c71336b3b908a2f69d6c]|
|2022-09-16|4319             |2º DOSE          |[6327c71336b3b908a2f69d6d]|
|2022-09-16|30083            |2º DOSE ADICIONAL|[6327c71336b3b908a2f69d6e]|
+----------+

In [5]:
df2Vac.select(f.max(df2Vac.Data)).show()

+----------+
| max(Data)|
+----------+
|2022-09-18|
+----------+



In [22]:
dfjoin = df2.alias('cov').select(f.col('cov.Data'), f.col('cov.Casos_por_Dia'), f.col('cov.Obitos_por_Dia'))\
                         .join(df2Vac.alias('vac').select(f.col('vac.Data').alias('dataVac'),f.col('vac.Qtd_Doses_Apl_Dia'), f.col('vac.Tp_Dose'))\
                               , f.col('cov.Data') == f.col('dataVac'), "left").na.fill(0).na.fill("")
dfjoin = dfjoin.drop(dfjoin.columns[3])
dfD = df2Vac.groupBy('Data').agg(f.sum('Qtd_Doses_Apl_Dia').alias('Total_de_Doses_Apl_Dia'))

dfjoin2 = dfjoin.join(dfD.alias('sjoin').select(f.col('sjoin.Data').alias('sjoinData'), f.col('sjoin.Total_de_Doses_Apl_Dia'))\
                               , dfjoin.Data == f.col('sjoinData'), "left").na.fill(0).na.fill("")

dfjoin.unpersist()
dfD.unpersist()
# dfjoin2.printSchema()
dfjoin2 = dfjoin2.drop(dfjoin2.sjoinData)

dflst = dfjoin2.groupBy('Data').agg(f.collect_list(f.struct('Tp_Dose', 'Qtd_Doses_Apl_Dia')).alias('Lista_Vac'))

dfjoin3 = dfjoin2.join(dflst.alias('join3').select(f.col('join3.Data').alias('join3Dt'), f.col('join3.Lista_Vac'))\
                               , dfjoin2.Data == f.col('join3Dt'), 'left').na.fill(0).na.fill("")

dfcons = dfjoin3.select('Data', 'Casos_por_Dia', 'Obitos_por_Dia', 'Total_de_Doses_Apl_Dia', 'Lista_Vac').orderBy('Data')
dfcons = dfcons.dropDuplicates(['Data'])

dfcons.printSchema()

dfcons.show(30,False)
dfjoin2.unpersist()
dfjoin3.unpersist()
dflst.unpersist()
# dfjoin2.groupBy('Data', 'Tp_Dose', 'Qtd_Doses_Apl_Dia').agg(f.struct('Tp_Dose', 'Qtd_Doses_Apl_Dia').alias('Lista_Vacina')).show(30,False)
# dfjoin2.select(f.struct('Tp_Dose', 'Qtd_Doses_Apl_Dia').alias('Lista_Vacina').collect()).show(30,False)
# dfjoin2.groupBy('Data').agg(f.collect_list(f.struct('Tp_Dose', 'Qtd_Doses_Apl_Dia')).alias('Lista_Vac')).show(50,False)
# dfjoin2.select(['*']).orderBy(f.col('Data').desc(), f.col('Tp_Dose')).show(10,False)
# dfjoin2.groupBy('Data','Total_de_Doses_Apl_Dia').count().orderBy(f.col('Data').desc(), 'Total_de_Doses_Apl_Dia').drop('count').show()
# dfjoin2.select('Data', 'Total_de_Doses_Apl_Dia').distinct().agg(f.sum('Total_de_Doses_Apl_Dia')).alias('Total_de_Doses_Apl').show()

root
 |-- Data: string (nullable = false)
 |-- Casos_por_Dia: integer (nullable = true)
 |-- Obitos_por_Dia: integer (nullable = true)
 |-- Total_de_Doses_Apl_Dia: long (nullable = true)
 |-- Lista_Vac: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- Tp_Dose: string (nullable = false)
 |    |    |-- Qtd_Doses_Apl_Dia: integer (nullable = true)

+----------+-------------+--------------+----------------------+---------+
|Data      |Casos_por_Dia|Obitos_por_Dia|Total_de_Doses_Apl_Dia|Lista_Vac|
+----------+-------------+--------------+----------------------+---------+
|2020-02-26|1            |0             |0                     |[[, 0]]  |
|2020-02-27|0            |0             |0                     |[[, 0]]  |
|2020-02-28|0            |0             |0                     |[[, 0]]  |
|2020-02-29|1            |0             |0                     |[[, 0]]  |
|2020-03-01|0            |0             |0                     |[[, 0]]  |
|2020-03-02|0 

DataFrame[Data: string, Lista_Vac: array<struct<Tp_Dose:string,Qtd_Doses_Apl_Dia:int>>]

In [None]:
dfcons.write.format("com.mongodb.spark.sql.DefaultSource")\
        .option("uri", "mongodb://localhost:27017/CovidDB.dados_consolidados_vac_casos_covid_sp")\
        .mode("overwrite")\
        .save()
dfcons.unpersist()

df3 = spark.read.format("com.mongodb.spark.sql.DefaultSource")\
                .option("uri", "mongodb://localhost:27017/CovidDB.dados_consolidados_vac_casos_covid_sp")\
                .load()


In [24]:
df3.select('Data', 'Casos_por_Dia', 'Obitos_por_Dia', 'Total_de_Doses_Apl_Dia', 'Lista_Vac').orderBy('Data').show(30,False)

+----------+-------------+--------------+----------------------+---------+
|Data      |Casos_por_Dia|Obitos_por_Dia|Total_de_Doses_Apl_Dia|Lista_Vac|
+----------+-------------+--------------+----------------------+---------+
|2020-02-26|1            |0             |0                     |[[, 0]]  |
|2020-02-27|0            |0             |0                     |[[, 0]]  |
|2020-02-28|0            |0             |0                     |[[, 0]]  |
|2020-02-29|1            |0             |0                     |[[, 0]]  |
|2020-03-01|0            |0             |0                     |[[, 0]]  |
|2020-03-02|0            |0             |0                     |[[, 0]]  |
|2020-03-03|0            |0             |0                     |[[, 0]]  |
|2020-03-04|1            |0             |0                     |[[, 0]]  |
|2020-03-05|3            |0             |0                     |[[, 0]]  |
|2020-03-06|4            |0             |0                     |[[, 0]]  |
|2020-03-07|3            