Notebook: Igor R. Amorim

# Apache Beam



- É uma linguagem de modelagem Open-Source, a qual você pode gerar modelos de pipeline em diversas linguagens SDK's diferentes, tambem open-source.

- A modelagem em Beam permite uma divisão perfeitamente paralela entre as operações (pernas) de um processamento de dados. Ela é adequada para o uso de uma computação em cluster.

- Essas tarefas são úteis para mover dados entre diferentes locais de armazenamento e/ou fontes de dados, transformar, filtrar e limpar dados ou carregar dados em um novo sistema.

- As pipelines servem principalmente para facilitar trabalhos recorrentes.

- Todas as vezes que for utilizar pipelines com GCP é preciso ativar o GCP, exceto quando usar Big Query e API internamente.

# Bibliotecas

In [None]:
# No Colab, é necessário fazer a atualização do pip, para sincronizar as versões. 

!pip install --upgrade pip

Reinicie a maquina antes de dar continuidade, mesmo que não seja solicitado, a fim de evitar erros.

In [None]:
# instala primeiro o interactive antes do gcp

#!pip install apache-beam[interactive]
!pip install apache_beam[interactive]

Reinicie novamente a maquina antes de dar continuidade, a fim de evitar erros.

In [None]:
# não use o comando forçado com -u ou algo do tipo,
# pois isso pode retirar funções essenciais da local-machine do Colab

!pip install apache_beam[gcp]

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting google-cloud-videointelligence<2,>=1.8.0
  Downloading google_cloud_videointelligence-1.16.3-py2.py3-none-any.whl (183 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m183.9/183.9 kB[0m [31m8.4 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting grpcio-gcp<1,>=0.2.2
  Downloading grpcio_gcp-0.2.2-py2.py3-none-any.whl (9.4 kB)
Collecting google-apitools<0.5.32,>=0.5.31
  Downloading google-apitools-0.5.31.tar.gz (173 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m173.5/173.5 kB[0m [31m17.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting google-cloud-bigtable<2,>=0.31.1
  Downloading google_cloud_bigtable-1.7.2-py2.py3-none-any.whl (267 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m267.7/267.7 kB[0m [31m18.7 MB/s[0m eta [36m0:00:00[0m
Collecting google-cloud-vision<2,>

Obedeça o aviso e reinie o runtime mais uma vez

In [None]:
import apache_beam as beam

In [None]:
# Conectando o Colab com o GoogleDrive que é onde mantenho minha chave json
# para acessar os os buckets dentro da gcp e onde mantenho anguns dataset's
# mais leves para estudo.

from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


# Operações com o Modelo em Apache Beam

## Ler e exibir um Arquivo

In [None]:
# Para Beam, todos os arquivos são lidos os caracteres como bit, portanto 
# independente da extenção vamos usar o metodo .ReadFromText().

# A Posição da lista começa em 0 (zero).

p1 = beam.Pipeline()

voos = (
    p1
    |'Passo1: Extrair do .csv'>> beam.io.ReadFromText('/content/drive/MyDrive/Datasets/voos.csv', skip_header_lines=1) # serve para qlqr arquivo isolado csv, xml, xls etc
    |'Passo2: Separador do .csv'>> beam.Map(lambda record: record.split(','))
    |'Passo3: Exibir o resultado'>> beam.Map(print)
)

p1.run()

['1', '1', '2015', 'AS', 'ANC', 'SEA', '5', '2354.0', '-11.0', '205.0', '430', '-22.0']
['1', '1', '2015', 'AA', 'LAX', 'PBI', '10', '2.0', '-8.0', '280.0', '750', '-9.0']
['1', '1', '2015', 'US', 'SFO', 'CLT', '20', '18.0', '-2.0', '286.0', '806', '5.0']
['1', '1', '2015', 'AA', 'LAX', 'MIA', '20', '15.0', '-5.0', '285.0', '805', '-9.0']
['1', '1', '2015', 'AS', 'SEA', 'ANC', '25', '24.0', '-1.0', '235.0', '320', '-21.0']
['1', '1', '2015', 'DL', 'SFO', 'MSP', '25', '20.0', '-5.0', '217.0', '602', '8.0']
['1', '1', '2015', 'NK', 'LAS', 'MSP', '25', '19.0', '-6.0', '181.0', '526', '-17.0']
['1', '1', '2015', 'US', 'LAX', 'CLT', '30', '44.0', '14.0', '273.0', '803', '-10.0']
['1', '1', '2015', 'AA', 'SFO', 'DFW', '30', '19.0', '-11.0', '195.0', '545', '-13.0']
['1', '1', '2015', 'DL', 'LAS', 'ATL', '30', '33.0', '3.0', '221.0', '711', '-15.0']
['1', '1', '2015', 'DL', 'DEN', 'ATL', '30', '24.0', '-6.0', '173.0', '523', '-30.0']
['1', '1', '2015', 'AA', 'LAS', 'MIA', '35', '27.0', '-8.0'

<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f9c84055450>

In [None]:
# Criando Arquivo a partir de dados brutos

p1 = beam.Pipeline()

voos = (
    p1
    |'Passo1: Extrair do .csv'>> beam.io.ReadFromText('/content/drive/MyDrive/Datasets/voos.csv', skip_header_lines=1) 
    # .ReadFromText SERVE PARA QUALQUER ARQUIVO ISOLADO csv, xml, xls, ETC. PARA O BEAM, TODOS SÃO ARQUIVOS DE TEXTO.
    |'Passo2: Separador do .csv'>> beam.Map(lambda record: record.split(','))
    #|'Passo3: Exibir o resultado'>> beam.Map(print) 
    # SE TENTAR GRAVAR DEPOIS DE PRINTAR, A MEMORIA VAI ESTAR VAZIA POIS O LOCAL APONTADO ONDE ESTAVA OS DADOS JÁ FOI DESCARREGADO NO PRINT
    |'Passo3: Gravar o resultado'>> beam.io.WriteToText('Voos_Brutos.txt')
    # NÃO DA PARA USAR O PRINT E O WRITE NA MESMA BUNDLE DEVIDO A VOLATILIDADE.
)

p1.run()

<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f37d356fbd0>

## Filtros

### Filtros sobre strings

In [None]:
# Filtrando por Aeroporto de origem

# Ao rodar, ele vai trazer todos os SFO, já que passamos essa posição da coluna.
# Skip_header zero é para não pular as nomenclaturas da tabela.
# O número em record[ ] define a coluna.

p1 = beam.Pipeline()

voos = (
    p1
    |'Passo1: Extrair do .csv'>> beam.io.ReadFromText('/content/drive/MyDrive/Datasets/voos.csv', skip_header_lines=0)
    # .ReadFromText SERVE PARA QUALQUER ARQUIVO ISOLADO csv, xml, xls, ETC. PARA O BEAM, TODOS SÃO ARQUIVOS DE TEXTO.
    |'Passo2: Separador do .csv'>> beam.Map(lambda record: record.split(','))
    |'Passo3: Filtrar aeroporto'>> beam.Filter(lambda record: record[4] == 'SFO')
    |'Passo4: Exibir o resultado'>> beam.Map(print) 
   #|'Passo4: Gravar o resultado'>> beam.io.WriteToText('Voos_Filtrados.txt')
    # NÃO DA PARA USAR O PRINT E O WRITE NA MESMA BUNDLE DEVIDO A VOLATILIDADE.
)

p1.run()

In [None]:
# Filtrando por companhia aerea

p1 = beam.Pipeline()

voos = (
    p1
    |'Passo1: Extrair do .csv'>> beam.io.ReadFromText('/content/drive/MyDrive/Datasets/voos.csv', skip_header_lines=0)
    # .ReadFromText SERVE PARA QUALQUER ARQUIVO ISOLADO csv, xml, xls, ETC. PARA O BEAM, TODOS SÃO ARQUIVOS DE TEXTO.
    |'Passo2: Separador do .csv'>> beam.Map(lambda record: record.split(','))
    |'Passo3: Filtrar aeroporto'>> beam.Filter(lambda record: record[3] == 'DL')
    |'Passo4: Exibir o resultado'>> beam.Map(print)
   #|'Passo4: Gravar o resultado'>> beam.io.WriteToText('Voos_Filtrados.txt')
    # NÃO DA PARA USAR O PRINT E O WRITE NA MESMA BUNDLE DEVIDO A VOLATILIDADE.
)

p1.run()

### Filtros Numéricos (float ou integer)

In [None]:
# Filtrando por atraso na partida (caso numérico)
# O convencional é filtrar por atraso na chegada e não na saída do avião.

p1 = beam.Pipeline()

voos = (
    p1
    |'Passo1: Extrair do .csv'>> beam.io.ReadFromText('/content/drive/MyDrive/Datasets/voos.csv', skip_header_lines=1)
    # .ReadFromText SERVE PARA QUALQUER ARQUIVO ISOLADO csv, xml, xls, ETC. PARA O BEAM, TODOS SÃO ARQUIVOS DE TEXTO.
    |'Passo2: Separador do .csv'>> beam.Map(lambda record: record.split(','))
    |'Passo3: Filtrar aeroporto'>> beam.Filter(lambda record: float(record[8]) > 0.0)
    # Se não pular o header, ou seja, pular a primeira linha, haverá um erro, pois a primeira linha são os atributos do DataFrame.
    # como o registro é lido bit a bit no pipeline antes de qualquer comparação numerica é necessario convertê-lo
    |'Passo4: Exibir o resultado'>> beam.Map(print)
   #|'Passo4: Gravar o resultado'>> beam.io.WriteToText('Voos_Filtrados.txt')
    # NÃO DA PARA USAR O PRINT E O WRITE NA MESMA BUNDLE DEVIDO A VOLATILIDADE.
)

p1.run()

## Agrupamentos


In [None]:
# Filtrando por atraso na partida e somando o total de minutos por 
# aeroporto de origem

p1 = beam.Pipeline()

qtd_tempo_de_atraso = (
    p1
    |'Passo1: Extrair do .csv'>> beam.io.ReadFromText('/content/drive/MyDrive/Datasets/voos.csv', skip_header_lines=1)
    # .ReadFromText SERVE PARA QUALQUER ARQUIVO ISOLADO csv, xml, xls, ETC. PARA O BEAM, TODOS SÃO ARQUIVOS DE TEXTO.
    |'Passo2: Separador do .csv'>> beam.Map(lambda record: record.split(','))
    |'Passo3: Filtrar aeroporto'>> beam.Filter(lambda record: float(record[8]) > 0.0)
    |'Passo4: Agregar as colunas'>> beam.Map(lambda record: (record[4],float(record[8])))
    |'Passo5: Construir uma nova tabela'>> beam.CombinePerKey(sum)
    # Neste caso, o Beam agrupa automaticamente por resultados iguais de mesma chave de filtro.
    |'Passo6: Exibir o resultado'>> beam.Map(print)
   #|'Passo6: Gravar o resultado'>> beam.io.WriteToText('Voos_Filtrados.txt')
    # NÃO DA PARA USAR O PRINT E O WRITE NA MESMA BUNDLE DEVIDO A VOLATILIDADE.
)

p1.run()

('LAX', 281.0)
('LAS', 40.0)
('MSP', 67.0)
('DEN', 528.0)
('PHX', 1087.0)
('SJU', 387.0)
('BQN', 5.0)
('ONT', 335.0)
('ORD', 902.0)
('DFW', 1176.0)
('MKE', 48.0)
('PDX', 85.0)
('IAH', 205.0)
('BOI', 280.0)
('PHL', 175.0)
('SFO', 465.0)
('IAD', 96.0)
('JFK', 348.0)
('SMF', 97.0)
('MCI', 14.0)
('ATL', 122.0)
('DCA', 189.0)
('BOS', 206.0)
('CLE', 217.0)
('BFL', 6.0)
('SAT', 89.0)
('OMA', 27.0)
('MCO', 414.0)
('BUF', 276.0)
('EWR', 134.0)
('PIT', 73.0)
('ABQ', 38.0)
('PVD', 20.0)
('SEA', 144.0)
('MIA', 339.0)
('SLC', 243.0)
('BWI', 145.0)
('LGA', 274.0)
('OKC', 36.0)
('ATW', 1.0)
('MEM', 23.0)
('FLL', 152.0)
('SAV', 87.0)
('FAR', 8.0)
('ERI', 16.0)
('ACY', 102.0)
('TUS', 15.0)
('SPI', 4.0)
('CLD', 63.0)
('PSC', 1.0)
('CPR', 4.0)
('DAL', 52.0)
('CID', 10.0)
('PSP', 513.0)
('CAE', 3.0)
('PBI', 28.0)
('RDU', 58.0)
('GUC', 7.0)
('MLU', 32.0)
('KOA', 6.0)
('CMH', 160.0)
('LAW', 364.0)
('CHS', 8.0)
('DRO', 16.0)
('ROC', 3.0)
('FSD', 12.0)
('RSW', 67.0)
('BDL', 134.0)
('AUS', 173.0)
('HNL', 83.0)

<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f9c82a0bad0>

In [None]:
# Filtrando por saída atrasada com total de vezes por aeroporto de origem

p1 = beam.Pipeline()

qtd_atrasos = (
    p1
    |'Passo1: Extrair do .csv'>> beam.io.ReadFromText('/content/drive/MyDrive/Datasets/voos.csv', skip_header_lines=1)
    # .ReadFromText SERVE PARA QUALQUER ARQUIVO ISOLADO csv, xml, xls, ETC. PARA O BEAM, TODOS SÃO ARQUIVOS DE TEXTO.
    |'Passo2: Separador do .csv'>> beam.Map(lambda record: record.split(','))
    |'Passo3: Filtrar aeroporto'>> beam.Filter(lambda record: float(record[8]) > 0.0)
    |'Passo4: Agregar as colunas'>> beam.Map(lambda record: (record[4],float(record[8])))
    |'Passo5: Construir uma nova tabela'>> beam.combiners.Count.PerKey()
    |'Passo6: Exibir o resultado'>> beam.Map(print)
   #|'Passo6: Gravar o resultado'>> beam.io.WriteToText('Voos_Filtrados.txt')
    # NÃO DA PARA USAR O PRINT E O WRITE NA MESMA BUNDLE DEVIDO A VOLATILIDADE.
)

p1.run()



('LAX', 12)
('LAS', 6)
('MSP', 7)
('DEN', 25)
('PHX', 25)
('SJU', 8)
('BQN', 2)
('ONT', 6)
('ORD', 23)
('DFW', 30)
('MKE', 5)
('PDX', 5)
('IAH', 9)
('BOI', 5)
('PHL', 3)
('SFO', 16)
('IAD', 10)
('JFK', 13)
('SMF', 5)
('MCI', 3)
('ATL', 10)
('DCA', 6)
('BOS', 13)
('CLE', 6)
('BFL', 1)
('SAT', 5)
('OMA', 2)
('MCO', 7)
('BUF', 5)
('EWR', 15)
('PIT', 3)
('ABQ', 3)
('PVD', 5)
('SEA', 17)
('MIA', 12)
('SLC', 17)
('BWI', 9)
('LGA', 9)
('OKC', 5)
('ATW', 1)
('MEM', 2)
('FLL', 14)
('SAV', 2)
('FAR', 2)
('ERI', 1)
('ACY', 1)
('TUS', 4)
('SPI', 1)
('CLD', 2)
('PSC', 1)
('CPR', 1)
('DAL', 7)
('CID', 1)
('PSP', 7)
('CAE', 2)
('PBI', 2)
('RDU', 2)
('GUC', 1)
('MLU', 2)
('KOA', 1)
('CMH', 5)
('LAW', 1)
('CHS', 1)
('DRO', 1)
('ROC', 1)
('FSD', 2)
('RSW', 4)
('BDL', 4)
('AUS', 8)
('HNL', 8)
('COS', 2)
('TPA', 2)
('JAX', 1)
('SHV', 3)
('DTW', 9)
('MSN', 1)
('SNA', 7)
('LIT', 2)
('LNK', 1)
('AVL', 1)
('HSV', 2)
('XNA', 1)
('SAN', 3)
('TTN', 3)
('ALB', 1)
('ISP', 2)
('MDW', 3)
('FAT', 2)
('BUR', 3)
('ORF'

<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f9c83a38550>

## Co GroupBy Key: Agrupamento de elementos por cada chave distinta

In [None]:
# Criar nova tabela com dados agregados de duas pipes
p1 = beam.Pipeline()

qtd_atrasos = (
    p1
    |'Passo1: Extrair do .csv'>> beam.io.ReadFromText('/content/drive/MyDrive/Datasets/voos.csv', skip_header_lines=1)
    |'Passo2: Separador do .csv'>> beam.Map(lambda record: record.split(','))
    |'Passo3: Filtrar aeroporto'>> beam.Filter(lambda record: float(record[8]) > 0.0)
    |'Passo4: Agregar as colunas'>> beam.Map(lambda record: (record[4],float(record[8])))
    |'Passo5: Construir uma nova tabela'>> beam.combiners.Count.PerKey()
)


soma_tempo_de_atraso = (
    p1
    |'Passo7: Extrair do .csv'>> beam.io.ReadFromText('/content/drive/MyDrive/Datasets/voos.csv', skip_header_lines=1)
    |'Passo8: Separador do .csv'>> beam.Map(lambda record: record.split(','))
    |'Passo9: Filtrar aeroporto'>> beam.Filter(lambda record: float(record[8]) > 0.0)
    |'Passo10: Agregar as colunas'>> beam.Map(lambda record: (record[4],float(record[8])))
    |'Passo11: Construir uma nova tabela'>> beam.CombinePerKey(sum)
)



tabela = (
    {'Quantidade_minutos':soma_tempo_de_atraso, 'Numero_de_atrasos':qtd_atrasos}
    |'Passo1: Agrupar as penas' >> beam.CoGroupByKey()
    |'Passo2: Resultado' >> beam.Map(print)
)

p1.run()

('LAX', {'Quantidade_minutos': [281.0], 'Numero_de_atrasos': [12]})
('LAS', {'Quantidade_minutos': [40.0], 'Numero_de_atrasos': [6]})
('MSP', {'Quantidade_minutos': [67.0], 'Numero_de_atrasos': [7]})
('DEN', {'Quantidade_minutos': [528.0], 'Numero_de_atrasos': [25]})
('PHX', {'Quantidade_minutos': [1087.0], 'Numero_de_atrasos': [25]})
('SJU', {'Quantidade_minutos': [387.0], 'Numero_de_atrasos': [8]})
('BQN', {'Quantidade_minutos': [5.0], 'Numero_de_atrasos': [2]})
('ONT', {'Quantidade_minutos': [335.0], 'Numero_de_atrasos': [6]})
('ORD', {'Quantidade_minutos': [902.0], 'Numero_de_atrasos': [23]})
('DFW', {'Quantidade_minutos': [1176.0], 'Numero_de_atrasos': [30]})
('MKE', {'Quantidade_minutos': [48.0], 'Numero_de_atrasos': [5]})
('PDX', {'Quantidade_minutos': [85.0], 'Numero_de_atrasos': [5]})
('IAH', {'Quantidade_minutos': [205.0], 'Numero_de_atrasos': [9]})
('BOI', {'Quantidade_minutos': [280.0], 'Numero_de_atrasos': [5]})
('PHL', {'Quantidade_minutos': [175.0], 'Numero_de_atrasos': 

<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f9c7ea8fbd0>