<a href="https://colab.research.google.com/github/caioitalo/soulcode-projetofinal/blob/main/projeto_final_pipeline_apache_beam.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# pip install --upgrade pip
# pip install apache_beam[interactive]
# pip install apache_beam[gcp]
# pip install gcsfs

#Pipeline

> Esse Colab Notebooks é referente a segunda parte do trabalho de conclusão de curso de Engenharia de dados 

> Para a criação do Dataframe de ‘dfcomb_etanol_trat’ os dados foram inseridos, transformados e normalizados por meio de uma PIPELINE com modelo criado em apache beam usando o dataflow para o work e enviados para um DataLake. Nesse dataframe temos uma a junção de dois dataframe anteriormentes tratados “dfbio_trat” e “dfprecos” que estavam do datalake Google Store. 





In [None]:
import apache_beam as beam
import os
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.textio import WriteToText

colunas_bio = ['','regiao','uf','produto','volume_m3','data']
colunas_preco = ['','data','regiao','estado','produto','postos_pesquisados','uni_medida','media_rev','desvio_rev','preco_min_rev','preco_max_rev','margem_rev','coef_var_rev','media_dist','dp_dist','preco_min_dist','preco_max_dist','coef_var_dist']

def lista_dicionario(elemento, colunas):
  return dict(zip(colunas, elemento))

def trata_data(elemento):
  # Recebe um dicionario e cria um novo campo com ANO-MES -  Retorna o mesmo dicionario com novo campo 
  elemento['ano_mes']= '-'.join(elemento['data'].split('-')[:2])
  return elemento

def chave_uf(elemento):
#  Receber um dicionario -   Retorna uma tupla com estado e o elemento(UF, dicionario )
  chave = elemento['uf']
  return (chave, elemento)

def volume(elemento):
  #  Recebe um tupla ('SAO PAULO', [{},{}]) -   Retorna uma tupla ('SAO PAULO', 8.0)

  uf, registros = elemento
  for registros in registros:
    yield (f"{uf}-{registros['ano_mes']}", float(registros['volume_m3']))

def chave_estado(elemento):
  chave = elemento['estado']
  return (chave, elemento)

def media_rev(elemento):
  estado, registros = elemento
  for registros in registros:
    yield (f"{estado}-{registros['ano_mes']}", float(registros['media_rev']))

def arredonda(elemento):
  #Recebe uma tupla e retorna uma tupla com valor arredondado  
  chave, valor = elemento
  return (chave, round(valor,2))

def filtra_campos_vazios(elemento):
  #Remove elementos que tenham chaves vazias] -   Receber uam tupla e retorna a mesma dupla sem campos vazios   
  chave, dados = elemento
  if all([
      dados['volume_m3'],
      dados['valor_media_rev']
      ]):
      return True
  return False

def descompactar_elementos(elemento):
  #Receber uma tupla ('DISTRITO FEDERAL-2015-10', {'volume_m3': [4.0], 'valor_media_rev': [11.67]})   Retorna uma tupla ('DISTRITO FEDERAL', '2015', '10', '4.0', '11.67')  
  chave, dados = elemento
  volume_m3 = dados['volume_m3'][0] #acessando o primeiro elemento dessa lista [0]
  valor_media_rev = dados['valor_media_rev'][0]
  uf, ano, mes = chave.split('-')
  return uf, ano, mes, str(volume_m3), str(valor_media_rev)  #transformar em str para poder usar o join posteriomente

def preparar_csv(elemento, deliminator=','):
  #Recebe uma tupla e retorna uma string delimitada "DISTRITO FEDERAL;2015;10;4.0;11.67"
  return f"{deliminator}".join(elemento)


pipeline_options = {
    'project':'sc-bc26-ed7',
    'runner':'DataflowRunner',
    'region':'southamerica-east1',
    'staging_location':'gs://projeto-final-equipe4/beam/staging/',
    'temp_location':'gs://projeto-final-equipe4/beam/temp/',
    'template_location':'gs://projeto-final-equipe4/beam/models/modelo_batch'
}

serviceAccount = '/content/sc-bc26-ed7-adb0dc2607d9.json'
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = serviceAccount

pipeline_options = PipelineOptions.from_dictionary(pipeline_options)

p1 = beam.Pipeline(options=pipeline_options)

biocombustiveis = (
    p1
    |'Extrair do CSV bio'>> beam.io.ReadFromText('gs://projeto-final-equipe4/arquivos_trat/dfbio_trat', skip_header_lines=1)  
    |'Sep de dados'>> beam.Map(lambda record: record.split(','))
    |'Filt por produto'>> beam.Filter(lambda record: str(record[3])== 'HIDRATADO')
    |'Tranf lista para dic'>>beam.Map(lista_dicionario, colunas_bio)
    |'Criar Campo ano_mes'>>beam.Map(trata_data)
    |'Criar chave pelo uf'>> beam.Map(chave_uf)
    |'Agrupar pelo uf'>>beam.GroupByKey()
    |'Descompactar vol'>>beam.FlatMap(volume)
    |'Media dos vol pela chave'>> beam.CombinePerKey(sum)
    |'Arredondar resultados'>>beam.Map(arredonda)
    #|'Imprimir o resultado'>> beam.Map(print)
)

precos = (
    p1
    |'Extrair do CSV Preços'>> beam.io.ReadFromText('gs://projeto-final-equipe4/arquivos_trat/dfprecos.csv', skip_header_lines=1) 
    |'Sep de dados Preços'>> beam.Map(lambda record: record.split(',')) 
    |'Filt por prod Preços'>> beam.Filter(lambda record: str(record[4])== 'ETANOL HIDRATADO')
    |'Tranformar lista para dic Preços'>>beam.Map(lista_dicionario, colunas_preco)
    |'Criar Campo ano_mes Preços'>>beam.Map(trata_data) 
    |'Criar chave pelo est Preços'>> beam.Map(chave_estado)
    |'Agrupar estado Preços'>>beam.GroupByKey()
    |'Descompactar vol Preços'>>beam.FlatMap(media_rev)
    |'Media preços'>> beam.combiners.Mean.PerKey()
    |'Arredondar preços'>>beam.Map(arredonda)
    #|'Imprimir o resultado Dataset Preços'>> beam.Map(print)
)

resultado = (
    ({'volume_m3':biocombustiveis,'valor_media_rev':precos})
    |'Mesclar pcol'>>beam.CoGroupByKey()
    |'Filtrar dados vazios'>>beam.Filter(filtra_campos_vazios)
    |'Descompactar'>>beam.Map(descompactar_elementos)   
    |'Preparar csv'>>beam.Map(preparar_csv, deliminator=',')
    #|'Imprimir o resultado da união'>> beam.Map(print)
)

# uf, ano, mes, str(volume_m3), str(valor_media_rev)
header = 'UF,ano,volume_m3,valor_media_rev'
resultado |'Criar arquivo CSV'>> beam.io.WriteToText('gs://projeto-final-equipe4/arquivos_trat/dfcomb_etanol_trat', file_name_suffix='.csv', header=header)


p1.run()

<DataflowPipelineResult None at 0x7fddc31fd940>