# Capturando dados do Covid

In [None]:
# Importando bibliotecas
import json
import os
from datetime import datetime

import pandas as pd
import requests
from pyspark.sql.functions import substring, to_date, year , month, col , sum , desc , asc
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType , DoubleType

In [None]:
# Request the COVID-19 report from the Brazil API endpoint.
url = 'https://covid19-brazil-api.now.sh/api/report/v1'
headers = {}

# A timeout keeps a stalled connection from hanging the notebook indefinitely,
# and raise_for_status() surfaces HTTP errors (4xx/5xx) right here instead of
# as a confusing JSON-decoding or KeyError failure in a later cell.
response = requests.get(url, headers=headers, timeout=30)
response.raise_for_status()

In [None]:
# Decode the HTTP response body from JSON into a Python object.
dados_covid = response.json()

In [None]:
# Declare the expected column layout of one report record.
# Every field is nullable; names and types are listed once and turned into
# StructFields by the comprehension.
schema = StructType([
    StructField(nome, tipo, True)
    for nome, tipo in (
        ("cases", IntegerType()),
        ("datetime", StringType()),
        ("deaths", IntegerType()),
        ("refuses", IntegerType()),
        ("state", StringType()),
        ("suspects", IntegerType()),
        ("uf", StringType()),
        ("uid", IntegerType()),
    )
])

In [None]:
# Build the Spark DataFrame from the records under the 'data' key.
# The explicit schema declared above is passed in so column types are fixed
# by the notebook rather than inferred from the Python dicts (the schema was
# previously defined but never used).
df = spark.createDataFrame(dados_covid['data'], schema=schema)

In [None]:
# Render df with the notebook's display helper to inspect the loaded rows.
display(df)

cases,datetime,deaths,refuses,state,suspects,uf,uid
5411545,2022-05-05T20:37:22.288Z,168311,596,São Paulo,5334,SP,35
3360000,2022-05-05T20:37:22.288Z,61353,104,Minas Gerais,925,MG,31
2465347,2022-05-05T20:37:22.288Z,43124,119,Paraná,400,PR,41
2347290,2022-05-05T20:37:22.288Z,39323,330,Rio Grande do Sul,416,RS,43
2149139,2022-05-05T20:37:22.288Z,73480,148,Rio de Janeiro,1254,RJ,33
1707242,2022-05-05T20:37:22.288Z,21783,47,Santa Catarina,346,SC,42
1544000,2022-05-05T20:37:22.288Z,29863,36,Bahia,573,BA,29
1335144,2022-05-05T20:37:22.288Z,26508,55,Goiás,353,GO,52
1244705,2022-05-05T20:37:22.288Z,26928,89,Ceará,493,CE,23
1047222,2022-05-05T20:37:22.288Z,14396,21,Espírito Santo,71,ES,32


In [None]:
# Keep only the first 19 characters of the timestamp text
# ("yyyy-MM-ddTHH:mm:ss"), dropping the fractional seconds and the "Z" suffix.
# Spark's substring() positions are 1-based, so the start position is 1.
df = df.withColumn("datetime", substring("datetime", 1, 19))

In [None]:
# Parse the truncated timestamp text into a date using the pattern below.
formato_datetime = "yyyy-MM-dd'T'HH:mm:ss"
df = df.withColumn("datetime", to_date(col("datetime"), formato_datetime))

In [None]:
# Print df as a plain-text table (show() prints the first 20 rows by default).
df.show()

+-------+----------+------+-------+-------------------+--------+---+---+
|  cases|  datetime|deaths|refuses|              state|suspects| uf|uid|
+-------+----------+------+-------+-------------------+--------+---+---+
|5411545|2022-05-05|168311|    596|          São Paulo|    5334| SP| 35|
|3360000|2022-05-05| 61353|    104|       Minas Gerais|     925| MG| 31|
|2465347|2022-05-05| 43124|    119|             Paraná|     400| PR| 41|
|2347290|2022-05-05| 39323|    330|  Rio Grande do Sul|     416| RS| 43|
|2149139|2022-05-05| 73480|    148|     Rio de Janeiro|    1254| RJ| 33|
|1707242|2022-05-05| 21783|     47|     Santa Catarina|     346| SC| 42|
|1544000|2022-05-05| 29863|     36|              Bahia|     573| BA| 29|
|1335144|2022-05-05| 26508|     55|              Goiás|     353| GO| 52|
|1244705|2022-05-05| 26928|     89|              Ceará|     493| CE| 23|
|1047222|2022-05-05| 14396|     21|     Espírito Santo|      71| ES| 32|
| 925885|2022-05-05| 21629|     23|         Pernamb

In [None]:
# Derive the calendar month and year from the "datetime" column and store
# them as two new columns, "Month" and "Year".
df = (
    df
    .withColumn("Month", month(col("datetime")))
    .withColumn("Year", year(col("datetime")))
)

### Realizando a conexão com o PostgreSQL via PySpark

In [None]:

# Read the pg_catalog.pg_tables catalog view over JDBC.
#
# Credentials are taken from the environment (PG_USER / PG_PASSWORD) so that no
# secret is stored in the notebook, and they are supplied once, as options,
# instead of being duplicated in the JDBC URL. A missing variable raises
# KeyError before any connection is attempted.
# NOTE(review): the original passed two different user values
# ("henrysilva@pgservidor" in the URL, "henrysilva" as an option) - set PG_USER
# to whichever form the server expects.
pgDF = (
    spark.read.format("jdbc")
    .option("driver", "org.postgresql.Driver")
    .option("url", "jdbc:postgresql://pgservidor.postgres.database.azure.com:5432/postgres?sslmode=require")
    .option("dbtable", "pg_catalog.pg_tables")
    .option("user", os.environ["PG_USER"])
    .option("password", os.environ["PG_PASSWORD"])
    .load()
)
      

In [None]:
# Show the first five rows read from pg_catalog.pg_tables.
display(pgDF.head(5))

schemaname,tablename,tableowner,tablespace,hasindexes,hasrules,hastriggers,rowsecurity
pg_catalog,pg_statistic,azure_superuser,,True,False,False,False
pg_catalog,pg_foreign_table,azure_superuser,,True,False,False,False
pg_catalog,pg_authid,azure_superuser,pg_global,True,False,False,False
pg_catalog,pg_user_mapping,azure_superuser,,True,False,False,False
pg_catalog,pg_subscription,azure_superuser,pg_global,True,False,False,False


In [None]:
# Write DataFrame df to the PostgreSQL table "dados_covid" over JDBC.
# mode("overwrite") replaces any existing contents of that table.
#
# Credentials are taken from the environment (PG_USER / PG_PASSWORD) so that no
# secret is stored in the notebook, and they are supplied once, as options,
# instead of being duplicated in the JDBC URL. The driver class is set
# explicitly, matching the JDBC read cells.
# NOTE(review): the original passed two different user values
# ("henrysilva@pgservidor" in the URL, "henrysilva" as an option) - set PG_USER
# to whichever form the server expects.
(
    df.write.mode("overwrite")
    .format("jdbc")
    .option("driver", "org.postgresql.Driver")
    .option("url", "jdbc:postgresql://pgservidor.postgres.database.azure.com:5432/postgres?sslmode=require")
    .option("dbtable", "dados_covid")
    .option("user", os.environ["PG_USER"])
    .option("password", os.environ["PG_PASSWORD"])
    .save()
)

In [None]:
# SQL text selecting cases, datetime, deaths and state (with Portuguese column
# aliases) from the dados_covid table.
# NOTE(review): despite the "_ceara" name, the WHERE clause keeps two states,
# 'Ceará' and 'Rio de Janeiro' - confirm which is intended.
query_sql_ceara = """  
  SELECT CASES AS CASOS,
         datetime as Datetime,
         deaths as Mortes,
         state as Estado 
         FROM dados_covid
         
         where state in ('Ceará', 'Rio de Janeiro')

"""

In [None]:
# Run query_sql_ceara against PostgreSQL over JDBC and load the result as a
# DataFrame. The query filters on state in ('Ceará', 'Rio de Janeiro'), so the
# result is not limited to Ceará despite the variable name.
#
# Credentials are taken from the environment (PG_USER / PG_PASSWORD) so that no
# secret is stored in the notebook, and they are supplied once, as options,
# instead of being duplicated in the JDBC URL.
# NOTE(review): the original passed two different user values
# ("henrysilva@pgservidor" in the URL, "henrysilva" as an option) - set PG_USER
# to whichever form the server expects.
dados_covid_ceara = (
    spark.read.format("jdbc")
    .option("driver", "org.postgresql.Driver")
    .option("url", "jdbc:postgresql://pgservidor.postgres.database.azure.com:5432/postgres?sslmode=require")
    .option("query", query_sql_ceara)
    .option("user", os.environ["PG_USER"])
    .option("password", os.environ["PG_PASSWORD"])
    .load()
)
      

In [None]:
# show() prints the table itself and returns None, so wrapping it in display()
# only handed None to the display helper. Call show() directly.
dados_covid_ceara.show()

+-------+----------+------+--------------+
|  casos|  datetime|mortes|        estado|
+-------+----------+------+--------------+
|1244705|2022-05-05| 26928|         Ceará|
|2149139|2022-05-05| 73480|Rio de Janeiro|
+-------+----------+------+--------------+

