# Combinando as pcollections

Agora nos resta agrupar os dois datasets! Isto é, unir os dois pcollections chuvas e dengue.

Devemos fazer alguns tratamentos pois existem datas que não batem nas pcollections. Como você deve ter percebido o tempo de leitura dos arquivos é muito grande, imagine agora que teremos que ler dois datasets juntos! Para facilitar nossa analise, vamos tirar uma amostra desses dois arquivos e criar outros dois arquivos com essa amostra com um volume menor de dados. Abra seu arquivo e selecione, eu irei fazer uma amostra com 2000 entradas. Se nosso tratamento funciona para um conjunto menor de dados, logo ele funciona para todo o conjunto.


# Refazendo as importações 

In [None]:
!pip install apache_beam
import re
import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io.textio import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
pipeline_options = PipelineOptions(argv = None)
pipeline = beam.Pipeline(options = pipeline_options)

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


# Trazendo os métodos de tratamento

In [None]:
def TextoParaLista(elemento,delimitador = '|'):
  """
  Recebe um texto e um limitador
  Retorna uma lista de elementos pelo delimitador
  """
  return elemento.split(delimitador)

In [None]:
colunas_dengue = [ #todas as colunas que vem do dataset
'id',
'data_iniSE',
'casos',
'ibge_code',
'cidade',
'uf',
'cep',
'latitude',
'longitude']

In [None]:
def ListaParaDicionario(elemento, colunas):
  """
  Recebe duas listas e retorna um dicionário
  """
  return dict(zip(colunas, elemento))

In [None]:
def TrataDatas(elemento):
  """
  Recebe um dicionário e cria um novo campo com ano-mês
  """
  elemento['ano-mes'] = '-'.join(elemento['data_iniSE'].split('-')[:2])
  #Supondo que temos a data 2022-05-18 o split retornará uma lista com ['2022','05','18']
  #Então fariaremos a lista usando um [:2] no final pegando as informações até o segundo elemento
  #Aplicamos também o join com o parâmetro '-' que nos retornará uma STRING no formato 2022-05
  return elemento

In [None]:
def ChaveUf(elemento):
  """
  Receber um dicionário
  Retornar uma tupla com o estado(uf) e o elemento
  """
  chave = elemento['uf']
  return (chave, elemento)

In [None]:
def CasosDengue(elemento):
  """
  Recebe uma tupla (RS, [{},{}])
  Retorna uma tupla ('RS-2014-12, 8')
  """
  uf, registros = elemento
  for registro in registros:
    if bool(re.search(r'\d', registro['casos'])):
      yield (f"{uf}-{registro['ano-mes']}", float(registro['casos'])) #Diferente do return o yield vai executar até retornar todos os elementos
    else:
      yield (f"{uf}-{registro['ano-mes']}", 0.0)

In [None]:
def ChaveUfMes(elemento):
  """
  Receber uma lista de elementos
  Retorna uma tupla ('UF-ANO-MES', casos)
  """
  data, mm, uf = elemento
  ano_mes = '-'.join(data.split('-')[:2])
  chave = f'{uf}-{ano_mes}'
  if float(mm) < 0:
    mm = 0.0
  else:
    mm = float(mm)
  return chave, mm

In [None]:
def arredonda(elemento):
  """
  Recebe um elemento 
  retorna o float com uma ou duas casas decimais.
  """
  chave, mm = elemento
  return (chave, round(mm,1))

# Novos Métodos

Temos que evitar que existam espaços vazios no arquivo. Podemos ter varios casos de dengue num determinado mês e nenhuma chuva e então quando colocarmos o arquivo total, teremos vários buracos. Teremos que aplicar um filtro para remover esses casos

In [None]:
def FiltraCamposVazios(elemento):
  """
  Remove elementos que contenam chaves vazias
  """
  chave, dados = elemento
  if all([dados['chuvas'], dados['dengue']]):
      return True
  return False

In [None]:
def DescompactarElementos(elem):
  """
  Recebe uma tupla ('CE-2015-01', {'chuvas': [85.8], 'dengue': [175.0]})
  retorna uma tupla ('CE', '2015', '11', '0.4', '21.0')
  """
  chave, dados = elem
  chuva = dados['chuvas'][0]
  dengue = dados['dengue'][0]
  uf, ano, mes = chave.split('-')
  return uf, ano, mes, str(chuva), str(dengue)

In [None]:
def PrepararCSV(elem, delimitador = ';'):
  """
  Recebe uma tupla ('CE', 2015, 01, 85.8, 175.0)
  Retornar uma string delimitada "CE;2015;01;85.8;175.0"
  """
  return f"{delimitador}".join(elem)

# Pcollections

In [None]:
from apache_beam.io.textio import WriteToText

dengue = (
    pipeline
    | "Leitura do dataset de dengue" >>
      ReadFromText('sample_casos_dengue.txt', skip_header_lines=1) #pular a primeira linha com o skip_header
    | "De texto para lista"  >> beam.Map(TextoParaLista)
    | "Lista para dicionário" >> beam.Map(ListaParaDicionario, colunas_dengue)
    | "Criar campo ano-mês" >> beam.Map(TrataDatas)
    | "Criar chave pelo estado" >> beam.Map(ChaveUf)
    | "Agrupor por estado" >> beam.GroupByKey()
    | "Descompactar casos de dengue" >> beam.FlatMap(CasosDengue)
    | "Soma dos casos pela chave" >> beam.CombinePerKey(sum) #lembrar de converter os dados para ponto flutuante
    #| "Mostrar resultados" >> beam.Map(print) 

)


chuvas = (
    pipeline
    | "Leitura do dataset de chuvas" >>
      ReadFromText('sample_chuvas.csv', skip_header_lines=1)
    | "De texto para lista (chuvas)"  >> beam.Map(TextoParaLista,delimitador = ',') #reutilizando o método anterior só mudando separador
    | "Criando chave UF-ANO-MES" >> beam.Map(ChaveUfMes)
    | "Combinando o total de chuvas pela chave" >> beam.CombinePerKey(sum)
    | "Arredondar resultados de chuvas" >> beam.Map(arredonda)
    #| "Mostrar resultados das chuvas" >> beam.Map(print) 
)


# Juntaremos as Pcollections para a mesma chave.
# Somente com o tratamento abaixo:
#    | "Mesclar pcols" >> beam.CoGroupByKey()
#    | "Filtrar dados vazios" >> beam.Filter(FiltraCamposVazios)
# Tereiamos somente um resultado com ('CE-2015-01', {'chuvas': [85.8], 'dengue': [175.0]})
# Precisamos descompactar esses dados na forma ('CE', 2015, 11, 0.4, 21.0)
#    | "Descompactar elementos" >> beam.Map(DescompactarElementos)
# Com esse acréscimo temos uma tupla com os elementos tratados
# Agora precisamos retornar esse resultado em um arquivo de texto separado por vírgulas
# Faremos a seguinte transformações
# ('CE', 2015, 1, 85.8, 175.0) => "CE,2015,01,85.8,175.0"
# Para esse tratamento e salvar num novo CSV usaremos a seguinte linha de comando: 
#     | "Preparar csv" >> beam.Map(PrepararCSV)
resultado = (
    
    ({'chuvas': chuvas,'dengue': dengue})
    | "Mesclar pcols" >> beam.CoGroupByKey()
    | "Filtrar dados vazios" >> beam.Filter(FiltraCamposVazios)
    | "Descompactar elementos" >> beam.Map(DescompactarElementos)
    | "Preparar csv" >> beam.Map(PrepararCSV)
    #| "Mostrar resultados da União" >> beam.Map(print)
)

header = 'UF;ANO;MES;CHUVA;DENGUE'

resultado | 'Criar arquivo CSV' >> WriteToText('resultado', file_name_suffix='.csv', header = header)

pipeline.run()




<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7fe671c87610>