<a href="https://colab.research.google.com/github/pablohenrique93/projeto_final_saude_covid19/blob/main/projeto_final_treat_spark_health_covid.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# INSTALAÇÃO E IMPORTAÇÃO DE BIBLIOTECAS NECESSÁRIAS 

Instalando o PySpark

In [None]:
#Foi realizado a instalação  da biblioteca pyspark
!pip install pyspark

In [None]:
#Realizado a instalação do gcsfs
!pip install gcsfs

Importando bibliotecas 

In [None]:
#Importando pyspark
import pyspark
# Importando a SparkSession
from pyspark.sql import SparkSession
#Importando pyspark sql funções
import pyspark.sql.functions as F
from pyspark.sql.functions import regexp_replace, split
from pyspark.sql.functions import col,isnan, when, count
from pyspark.sql.functions import row_number, asc, desc
#Importando pyspark window
from pyspark.sql.window import Window
#Importando pyspark sql types
from pyspark.sql.types import StructField
from pyspark.sql.types import StructType, StringType, FloatType, LongType, DateType, TimestampType
#from pyspark.sql import *
#Importando o pandas 
import pandas as pd

In [None]:
#Importando os e google cloud
import os
from google.cloud import storage
import google.auth
from google.auth.transport.requests import Request

In [None]:
#Importando pymongo
import pymongo
from pymongo import MongoClient

# CONFIGURANDO E VERIFICANDO A SPARK SESSION E CRIANDO O DATA FRAME 

Configurando ambiente da Spark Session

In [None]:
#Configurando a variável do ambiente da sessão spark
spark = (
    SparkSession.builder
                .master('local')
                .appName('dataset')
                .config('spark.ui.port','4050')
                .getOrCreate()
)

Verificando a Spark Session

In [None]:
#Verificando a variavel spark
spark

Criando o data frame spark

In [None]:
# Criando dataframe 
df = (
    spark.createDataFrame(pd.read_json('/content/Europa-Covid-Test.json' ))
         
)

# MANIPULANDO O SCHEMA

In [None]:
#Visualizando o data frame
df.show(truncate = True)
#Na pré visualização decidir separar a coluna year_week

In [None]:
#Verificando os tipos da coluna
df.dtypes

Criando duas colunas novas year e week

In [None]:
#Criando uma variavel para receber a função split
split_cols = pyspark.sql.functions.split(df['year_week'], '-')

In [None]:
#Criando um novo df com as colunas novas
df1 = df.withColumn('year', split_cols.getItem(0)) \
        .withColumn('week', split_cols.getItem(1))

In [None]:
#Removendo a coluna year_week
df2 = df1.drop('year_week')

In [None]:
#Visualizando remoção da coluna
df2.show()

Manipulando o esquema

In [None]:
#Visualizando as colunas e o tipo delas, usando o schema
df2.printSchema()

In [None]:
#Criando um esquema novo, só foi utilizado para mudar o tipo das colunas
esquema = (
    StructType([
        StructField('country', StringType()),
        StructField('country_code', StringType()),
        StructField('level', StringType()),
        StructField('region', StringType()),
        StructField('region_name', StringType()),
        StructField('population', LongType()),
        StructField('new_cases', FloatType()),
        StructField('tests_done', FloatType()),
        StructField('testing_rate', FloatType()),
        StructField('positivity_rate', FloatType()),
        StructField('testing_data_source', StringType()),
        StructField('year', DateType()),
        StructField('week', StringType()),
    ])
)

In [None]:
#Convertendo data frame em arquivo csv
df2.write.format("csv").mode("overwrite").save("/content/projeto_f.csv")

In [None]:
#Criando um data frame com  as mudanças feita no tipo das colunas
df3 = (
    spark.read.format('csv')
              .option('header', 'true')
              .option('inferschema', 'false')
              .option('delimiter', ',')
              .load('/content/projeto_f.csv', schema = esquema)
)

In [None]:
#Visualizando o data frame 
df3.show()

In [None]:
#Verificando  o novo esquema com as modificações da coluna
df3.printSchema()

# MANIPULANDO AS COLUNAS

Renomeando colunas e valores da coluna pais

In [None]:
# Renomeando as colunas 
df4 = (df3.withColumnRenamed("country","pais")
          .withColumnRenamed("country_code","cod_pais")
          .withColumnRenamed("level","nivel")
          .withColumnRenamed("region","sigla")
          .withColumnRenamed("region_name","nome_regiao")
          .withColumnRenamed("population","populacao")
          .withColumnRenamed("new_cases","novos_casos")
          .withColumnRenamed("tests_done","testes_feitos")
          .withColumnRenamed("testing_rate","taxa_teste")
          .withColumnRenamed("positivity_rate","taxa_positiva")
          .withColumnRenamed("testing_data_source","fonte_dados")
          .withColumnRenamed("year","ano")
          .withColumnRenamed("week","semana")
)

In [None]:
# Renomeando valores da coluna pais
df5 = df4.withColumn("pais", regexp_replace("pais","Sweden","Suécia")) \
                 .withColumn("pais", regexp_replace("pais","Germany","Alemanha")) \
                 .withColumn("pais", regexp_replace("pais","France","França")) \
                 .withColumn("pais", regexp_replace("pais","Greece","Grécia")) \
                 .withColumn("pais", regexp_replace("pais","Slovakia","Eslováquia")) \
                 .withColumn("pais", regexp_replace("pais","Belgium","Bélgica")) \
                 .withColumn("pais", regexp_replace("pais","Finland","Finlândia")) \
                 .withColumn("pais", regexp_replace("pais","Malta","Malta")) \
                 .withColumn("pais", regexp_replace("pais","Croatia","Croácia")) \
                 .withColumn("pais", regexp_replace("pais","Italy","Itália")) \
                 .withColumn("pais", regexp_replace("pais","Lithuania","Lituânia")) \
                 .withColumn("pais", regexp_replace("pais","Norway","Noruega")) \
                 .withColumn("pais", regexp_replace("pais","Spain","Espanha")) \
                 .withColumn("pais", regexp_replace("pais","Czechia","Tchéquia")) \
                 .withColumn("pais", regexp_replace("pais","Denmark","Dinamarca")) \
                 .withColumn("pais", regexp_replace("pais","Ireland","Irlanda")) \
                 .withColumn("pais", regexp_replace("pais","Liechtenstein","Liechtenstein")) \
                 .withColumn("pais", regexp_replace("pais","Iceland","Islândia")) \
                 .withColumn("pais", regexp_replace("pais","Cyprus","Chipre")) \
                 .withColumn("pais", regexp_replace("pais","Estonia","Estônia")) \
                 .withColumn("pais", regexp_replace("pais","Bulgaria","Bulgária")) \
                 .withColumn("pais", regexp_replace("pais","Austria","Áustria")) \
                 .withColumn("pais", regexp_replace("pais","Luxembourg","Luxemburgo")) \
                 .withColumn("pais", regexp_replace("pais","Netherlands","Países Baixos")) \
                 .withColumn("pais", regexp_replace("pais","Portugal","Portugal")) \
                 .withColumn("pais", regexp_replace("pais","Latvia","Letônia")) \
                 .withColumn("pais", regexp_replace("pais","Slovenia","Eslovênia")) \
                 .withColumn("pais", regexp_replace("pais","Poland","Polônia")) \
                 .withColumn("pais", regexp_replace("pais","Irlanda","Irlanda")) \
                 .withColumn("pais", regexp_replace("pais","Romania","Romênia")) \
                 .withColumn("pais", regexp_replace("pais","Hungary","Hungria")) 


In [None]:
#Visualizando tradução feita nas colunas
df5.show()

# VERIFICANDO E REMOVENDO AS INCONSISTÊNCIAS


In [None]:
#Verificando a quantidade de linhas
df5.count()
# total de  4649 linhas

In [None]:
#Verificando se tem linhas duplicadas 
df6 = df5.dropDuplicates()
df6.count()
#Não existe linhas duplicadas

Verificando todas as colunas e analisando possiveis mudanças

In [None]:
#Verificando todas as colunas do data frame
df6.select('pais').distinct().show(100,truncate = False)
#Dropar nivel, só tem um unico valor 'national'
#Coluna fonte de dados, tem valores nulos

Realizando o drop nas colunas e linhas

In [None]:
#Realizando drop nas colunas
df7 = df6.drop('nome_regiao','sigla','nivel')
df7.show()

In [None]:
#Verificando as linhas referente aos valores nulos da coluna "fonte_dados"
df8 = df7.where(~ F.col("fonte_dados").isNull())
df8.show()

In [None]:
#Verificando se sobrou valores nulos nas outras colunas
df_teste = df8.select([count(when
                   (isnan(c) | col(c).isNull(), c)).alias(c) 
for c in df8.columns]
   ).show()
#Obs: Depois que foi alterado o tipo da coluna ano, comecou a da um erro, 
#porém esse código serviu apenas para verificar os NaN restantes

In [None]:
#Substituindo os NAN por 0, afim de facilitar operações no insights
df9 = df8.fillna({'taxa_positiva': 0})
df10 = df9.fillna({'novos_casos': 0})

In [None]:
#Verificando novamente se os NAN foram substituidos 
df_limpo = df10.select([count(when
                   (isnan(c) | col(c).isNull(), c)).alias(c) 
for c in df9.columns]
   ).show()
#Obs: Depois que foi alterado o tipo da coluna ano, comecou a da um erro, 
#porém esse código serviu apenas para verificar os NaN restantes

In [None]:
#Verificando a quantidade de linhas
df10.count()
#Foram removidas  441 linhas no drop de linhas 

In [None]:
#Dropando a coluna fonte de dados
df11 = df10.drop('fonte_dados')

# POSSIVEIS ANÁLISES 

In [None]:
#Visualizando o data frame para extrair possiveis insght's 
df11.show()

Utilizando GroupBy, filtros e algumas funções

In [None]:
#Visualizando a quantidade de registro de cada pais
df11.groupBy('pais').count().orderBy(F.col('count').desc()).show(100)

In [None]:
#Verificando a quantidade de novos casos 
df11.groupBy('pais').sum('novos_casos').show(truncate = False)

In [None]:
#Verificando a quantidade de taxa de testes 
df11.groupBy('pais').sum('testes_feitos').show(truncate = False)

In [None]:
#Verificando a quantidade de casos em cada ano
df11.groupBy('ano').count().orderBy(F.col('count').desc()).show()

In [None]:
#Foi realizado um grupo by para ver os valores minimos e maximos da coluna taxa positiva
df11.groupBy('pais').agg(
    F.round(F.sum('taxa_positiva'),2).alias('taxa_positiva'), 
    F.min('taxa_positiva').alias('valor_min'),
    F.max('taxa_positiva').alias('valor_max')
    ).show()

In [None]:
#Realizando um filtro na Alemanha para  novos casos menores que 3000
df11.where((F.col("pais")=='Alemanha')).filter('novos_casos < 3000').show(20)

In [None]:
#Realizando um filtro na Alemanha para novos casos maiores que 3000
df11.where((F.col("pais")=='Alemanha')).filter('novos_casos > 3000').show(20)

Criando uma partição pela coluna teste_ok, trazendo a quantidade  de linhas que se repetem ordenando pelos novos casos do maior para menor.
1) Obersvamos que a quantidade de linhas com valores zeros é de 16. 
Após isso a nova contagem é realizada trazendo a quantidade de linhas com valores 1.0 sendo igual a 6 linhas
Ao especificar no show(100)temos uma melhor visualização

In [None]:
num_linha = Window.partitionBy('testes_feitos').orderBy(desc('novos_casos'))
df11.withColumn('numero_linha',row_number().over(num_linha)).show(100)

# CRIANDO UM CSV COM OS DADOS TRATADOS E ENVIANDO O CSV PARA A BUCKET 


In [None]:
#Convertendo data frame em arquivo csv
df11.write.format("parquet").mode("overwrite").save("/content/eu_covid_tratado.csv")


In [None]:
df_pd_1 = pd.read_parquet("/content/eu_covid_tratado.csv/part-00000-130d5b74-a971-4efc-ab08-892f83119a71-c000.snappy.parquet")

In [None]:
df_pd_1

In [None]:
df_pd_1.to_csv("/content/eu_covid_tratado2.csv", index = False)

In [None]:
df_teste = pd.read_csv("/content/eu_covid_tratado2.csv")
df_teste

In [None]:
# #Enviando data frame tradado em formato csv
df_pd_1.to_csv('gs://projfinal/tratados/df_pd_1_europa-covid-test.csv',storage_options={'token':'token:gcp'})


# ENVIANDO O ARQUIVO SPARK PARA O MONGODB


In [None]:
# Conectando com o MongoDB

uri = "uri_mongo"
client = MongoClient(uri,tls=True,tlsCertificateKeyFile='key_mongo')

In [None]:
# Criando coleções para enviar para o mongoDB

db = client['projfinal']
colecaotratado1 = db['tratado1']

In [None]:
#Enviando os Datasets tratados para o mongoDB

df_pd_1.reset_index(drop=True)
df01 = df_pd_1.to_dict("records")
colecaotratado1.insert_many(df01)

In [None]:
#Verificando se o arquivo foi enviado
colecaotratado1.count_documents({})