Introdução ao Apache Beam!


In [2]:
#INSTALANDO O BEAM
#!pip install apache-beam[interactive]

#INSTALANDO OS RECURSOS PARA QUE O BEAM POSSA INTERAGIR COM O CLOUD
!pip install apache-beam[gcp]




Construindo um Pipeline

* skip_header_lines=1: Informa ao pipeline quantas linhas devem ser saltadas, em caso de erroneamente usar uma linha de dados como cabeçalho.

* ReadFromText: Lê um arquivo de um determinado tipo. Pode ser texto, avro, etc. Como o documento usado é CSV, usa-se Text.

* WriteToText: Salva um arquivo em um determinado tipo, que deve especificado.

* Map(lambda record: record.split(','))
  * Map percorre todo o arquivo CSV
  * record.split() modifica o separador, dependendo do tipo que for definido.

In [None]:
#IMPORTANDO O BEAM
import apache_beam as beam
"""
#CRIANDO UM PIPELINE
p1 = beam.Pipeline()

#DEFININDO OS PASSOS DO PIPELINE
voos = (
    p1 #p1 está recebendo esses passos
    |'Extrair os Dados' >> beam.io.ReadFromText('/content/drive/MyDrive/voos_sample.csv', skip_header_lines=1) #Rótulo do passo que está sendo executado - Passo executa funcao
    |'Separador do Dataframe' >> beam.Map(lambda record: record.split(',')) #Informando o tipo do separador
  # |'Saída de Dados' >> beam.Map(print) #Mostra o Dataframe
    |'Gravar Resultado' >> beam.io.WriteToText('resultado.txt') #Salva o resultado em um documento de texto
)

#EXECUTANDO O PIPELINE
p1.run() 
"""

"\n#CRIANDO UM PIPELINE\np1 = beam.Pipeline()\n\n#DEFININDO OS PASSOS DO PIPELINE\nvoos = (\n    p1 #p1 está recebendo esses passos\n    |'Extrair os Dados' >> beam.io.ReadFromText('/content/drive/MyDrive/voos_sample.csv', skip_header_lines=1) #Rótulo do passo que está sendo executado - Passo executa funcao\n    |'Separador do Dataframe' >> beam.Map(lambda record: record.split(',')) #Informando o tipo do separador\n  # |'Saída de Dados' >> beam.Map(print) #Mostra o Dataframe\n    |'Gravar Resultado' >> beam.io.WriteToText('resultado.txt') #Salva o resultado em um documento de texto\n)\n\n#EXECUTANDO O PIPELINE\np1.run() \n"

Forma alternativa de construir um Pipeline

* A função de filtragem substitui a função anônima que poderia ser usada da mesma forma.

* FlatMap cria linhas ao inves de colunas

In [None]:
#CRIANDO UMA FUNCAO DE FILTRAGEM
palavras=['quatro','que']

def rastrearPalavras(i): #Recebe cada elemento rastreado no Map
  if i in palavras: #Confere o valor de cada elemento
    return True #Caso atenda a condicao, diz ao beam que o valor deve ser filtrado

p2 = beam.Pipeline()

poema = (
    p2
    |'Extrair os Dados' >> beam.io.ReadFromText('/content/drive/MyDrive/poema.txt')
    |'Separador do Dataframe' >> beam.FlatMap(lambda record: record.split(' ')) 
    |'Encontrar Palavras' >> beam.Filter(rastrearPalavras) #Chamando a funcao
    |'Saída de Dados' >> beam.FlatMap(print)
   #|'Gravar Resultado' >> beam.io.WriteToText('resultado.txt')
)
p2.run()



quatro
que
quatro
quatro
que
quatro


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f67e8bd6e90>

Inserindo passos para tratamento de dados

In [None]:
#CRIANDO UM PIPELINE
p1 = beam.Pipeline()

#DEFININDO OS PASSOS DO PIPELINE
voos = (
    p1 #p1 está recebendo esses passos
    |'Extrair os Dados' >> beam.io.ReadFromText('/content/drive/MyDrive/voos_sample.csv', skip_header_lines=1) #Rótulo do passo que está sendo executado - Passo executa funcao
    |'Separador do Dataframe' >> beam.Map(lambda record: record.split(',')) #Informando o tipo do separador
    |'Filtrar por JFK' >> beam.Filter(lambda record: record[3] == 'JFK')
    |'Saída de Dados' >> beam.Map(print) #Mostra o Dataframe
   #|'Gravar Resultado' >> beam.io.WriteToText('resultado.txt') #Salva o resultado em um documento de texto
)

#EXECUTANDO O PIPELINE
p1.run()



['2019-04-27', '19805', '3', 'JFK', 'LAX', '1224', '-6', '1614', '39', '371', '2475', '3']
['2019-04-27', '19805', '9', 'JFK', 'LAX', '648', '-7', '1029', '19', '365', '2475', '9']
['2019-04-27', '19805', '19', 'JFK', 'LAX', '1024', '-6', '1353', '18', '359', '2475', '17']
['2019-04-27', '19805', '21', 'JFK', 'LAX', '1906', '-4', '2246', '16', '359', '2475', '19']


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f67eaf52650>

Combinando dados

* CombinePerKey entende a primeira coluna como chave e a segunda coluna como valor.

In [None]:
#IMPORTANDO O BEAM
import apache_beam as beam

#CRIANDO UM PIPELINE
p3 = beam.Pipeline()

#DEFININDO OS PASSOS DO PIPELINE
voosComAtrasoTempo = (
    p3 #p3 está recebendo esses passos
    |'Extrair os Dados' >> beam.io.ReadFromText('/content/drive/MyDrive/voos_sample.csv', skip_header_lines=0) #Rótulo do passo que está sendo executado - Passo executa funcao
    |'Separador do Dataframe' >> beam.Map(lambda record: record.split(',')) #Informando o tipo do separador
    |'Aeroportos Atrasados' >> beam.Filter(lambda record: int(record[8]) > 0) #Se maior que zero, houve atraso no aeroporto
    |'Agregacao de Colunas' >> beam.Map(lambda record: (record[3],int(record[8]))) #Apresentará apenas duas colunas
    |'Combinacao de Dados' >> beam.CombinePerKey(sum) #Soma os valores de colunas iguais que foram retornados
   #|'Saída de Dados' >> beam.Map(print) #Mostra o Dataframe
   #|'Gravar Resultado' >> beam.io.WriteToText('resultado.txt') #Salva o resultado em um documento de texto
)

voosComAtrasoQuantidade = (
    p3 #p3 está recebendo esses passos - quando usado novamente, ele fica armazenado em outro lugar.
    |'1Extrair os Dados' >> beam.io.ReadFromText('/content/drive/MyDrive/voos_sample.csv', skip_header_lines=0) #Rótulo do passo que está sendo executado - Passo executa funcao
    |'1Separador do Dataframe' >> beam.Map(lambda record: record.split(',')) #Informando o tipo do separador
    |'1Aeroportos Atrasados' >> beam.Filter(lambda record: int(record[8]) > 0) #Se maior que zero, houve atraso no aeroporto
    |'1Agregacao de Colunas' >> beam.Map(lambda record: (record[3],int(record[8]))) #Apresentará apenas duas colunas
    |'1Quantidade de Atrasos' >> beam.combiners.Count.PerKey() #Conta quantas vezes tal coluna retornou um valor
   #|'1Saída de Dados' >> beam.Map(print) #Mostra o Dataframe
   #|'1Gravar Resultado' >> beam.io.WriteToText('resultado.txt') #Salva o resultado em um documento de texto
)

#RESULTADO FINAL
tabelaFinal = (
    {'atrasoTempo':voosComAtrasoTempo, 'atrasoQuantidade':voosComAtrasoQuantidade} #Passa os pipes como dicionarios
    |'Agrupamento dos Pipelines' >> beam.CoGroupByKey() #Agrupa os resultados das pipelines
    |'Imprimir Tabela Final' >> beam.Map(print)  #Mostra o Dataframe
    |'Salvar Tabela Final' >> beam.io.WriteToText('resultado.csv') #Salva o resultado em um documento de texto
)

#EXECUTANDO O PIPELINE
p3.run()



('JFK', {'atrasoTempo': [94], 'atrasoQuantidade': [5]})
('DFW', {'atrasoTempo': [153], 'atrasoQuantidade': [2]})
('OGG', {'atrasoTempo': [95], 'atrasoQuantidade': [1]})
('LAX', {'atrasoTempo': [12], 'atrasoQuantidade': [2]})
('SFO', {'atrasoTempo': [208], 'atrasoQuantidade': [2]})


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f67e7796e90>

Salvando o Pipeline em Cloud


In [5]:
import apache_beam as beam
import os #Importando o sistema operacional

#ATRIBUINDO A CHAVE DE SERVIÇO DO GOOGLE CLOUD À UMA VARIÁVEL
serviceAccount = '/content/drive/MyDrive/ingest-ana-304af4810579.json' 

#DANDO AUTORIZACAO AO SISTEMA OPERACIONAL
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = serviceAccount

#CRIANDO UM PIPELINE
p4 = beam.Pipeline()

#DEFININDO OS PASSOS DO PIPELINE
voosComAtrasoTempo = (
    p4 #p3 está recebendo esses passos
    |'Extrair os Dados' >> beam.io.ReadFromText('/content/drive/MyDrive/voos_sample.csv', skip_header_lines=0) #Rótulo do passo que está sendo executado - Passo executa funcao
    |'Separador do Dataframe' >> beam.Map(lambda record: record.split(',')) #Informando o tipo do separador
    |'Aeroportos Atrasados' >> beam.Filter(lambda record: int(record[8]) > 0) #Se maior que zero, houve atraso no aeroporto
    |'Agregacao de Colunas' >> beam.Map(lambda record: (record[3],int(record[8]))) #Apresentará apenas duas colunas
    |'Combinacao de Dados' >> beam.CombinePerKey(sum) #Soma os valores de colunas iguais que foram retornados
)

voosComAtrasoQuantidade = (
    p4 #p3 está recebendo esses passos - quando usado novamente, ele fica armazenado em outro lugar.
    |'1Extrair os Dados' >> beam.io.ReadFromText('/content/drive/MyDrive/voos_sample.csv', skip_header_lines=0) #Rótulo do passo que está sendo executado - Passo executa funcao
    |'1Separador do Dataframe' >> beam.Map(lambda record: record.split(',')) #Informando o tipo do separador
    |'1Aeroportos Atrasados' >> beam.Filter(lambda record: int(record[8]) > 0) #Se maior que zero, houve atraso no aeroporto
    |'1Agregacao de Colunas' >> beam.Map(lambda record: (record[3],int(record[8]))) #Apresentará apenas duas colunas
    |'1Quantidade de Atrasos' >> beam.combiners.Count.PerKey() #Conta quantas vezes tal coluna retornou um valor
)

#RESULTADO FINAL
tabelaFinal = (
    {'atrasoTempo':voosComAtrasoTempo, 'atrasoQuantidade':voosComAtrasoQuantidade} #Passa os pipes como dicionarios
   #|'Agrupamento dos Pipelines' >> beam.CoGroupByKey() #Agrupa os resultados das pipelines
   #|'Salvando no Stoage' >> beam.io.WriteToText('gs://data-lake-ana/entrada/teste.csv') #Enviando o resultado para o bucket no GCP
)

#EXECUTANDO O PIPELINE
p4.run()



<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f93393a2e50>