<a href="https://colab.research.google.com/github/felipecampelo/PipelineApacheBeam/blob/main/ApacheBeam.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### ❗ Apache Beam ❗

`Passo a passo para instalação do Apache Beam no Colab:`

`1)` pip install --upgrade pip

`2)` pip install apache-beam[interactive]

`3)` Reiniciar ambiente de execução

`4)` pip install apache-beam[gcp]

`5)` Reiniciar ambiente de execução

`6)` import apache_beam as beam

`OBS: Nunca usar funções de exibição e gravação na mesma pipeline`

In [None]:
# Necessário atualizar o pip para instalação do Beam
pip install --upgrade pip

In [None]:
# Instalando o apache-beam[interactive]
pip install apache-beam[interactive]

In [None]:
# Instalando o apache-beam[gcp]
pip install apache-beam[gcp]

In [2]:
import apache_beam as beam

###⚡ Ler e Exibir a Leitura ⚡

In [None]:
# Criando uma variável com Pipeline
p1 = beam.Pipeline()

voos = (
    p1
    |'Extrair do CSV' >> beam.io.ReadFromText('/content/drive/MyDrive/Datasets/voos.csv', skip_header_lines = 0)
    |'Separador do CSV' >> beam.Map(lambda record: record.split(','))
    |'Exibir o resultado' >> beam.Map(print)
)

# Executando a Pipeline
p1.run()

###⚡ Ler e Salvar a Leitura ⚡

In [None]:
# Criando uma variável com Pipeline
p1 = beam.Pipeline()

voos = (
    p1
    |'Extrair do CSV' >> beam.io.ReadFromText('/content/drive/MyDrive/Datasets/voos.csv', skip_header_lines = 0)
    |'Separador do CSV' >> beam.Map(lambda record: record.split(','))
    |'Gravar o resultado' >> beam.io.WriteToText('saida.txt')
)

# Executando a Pipeline
p1.run()

###⚡ Filtros ⚡

In [None]:
# Criando uma variável com Pipeline
p1 = beam.Pipeline()

voos = (
    p1
    |'Extrair do CSV' >> beam.io.ReadFromText('/content/drive/MyDrive/Datasets/voos.csv', skip_header_lines = 1)
    |'Separador do CSV' >> beam.Map(lambda record: record.split(','))
    |'Filtrar aero' >> beam.Filter(lambda record: record[4] == 'SFO')
    |'Filtrar atrasos' >> beam.Filter(lambda record: float(record[8]) > 0)
    |'Exibir o resultado' >> beam.Map(print)
)

# Executando a Pipeline
p1.run()

###⚡ Agrupamentos ⚡

In [None]:
# Criando uma variável com Pipeline
p1 = beam.Pipeline()

voos = (
    p1
    |'Extrair do CSV' >> beam.io.ReadFromText('/content/drive/MyDrive/Datasets/voos.csv', skip_header_lines = 1)
    |'Separador do CSV' >> beam.Map(lambda record: record.split(','))
    |'Filtrar atrasos' >> beam.Filter(lambda record: float(record[8]) > 0) # Pegando apenas registros com atraso
    |'Agregar as colunas' >> beam.Map(lambda record: (record[4], float(record[8]))) # Pegando apenas as colunas 4 e 8
    |'Construir uma nova tabela por minuto' >> beam.CombinePerKey(sum) # Agrupando por chave e fazendo a soma
    |'Exibir o resultado' >> beam.Map(print)
)

# Executando a Pipeline
p1.run()

In [None]:
# Criando uma variável com Pipeline
p1 = beam.Pipeline()

voos = (
    p1
    |'Extrair do CSV' >> beam.io.ReadFromText('/content/drive/MyDrive/Datasets/voos.csv', skip_header_lines = 1)
    |'Separador do CSV' >> beam.Map(lambda record: record.split(','))
    |'Filtrar atrasos' >> beam.Filter(lambda record: float(record[8]) > 0) # Pegando apenas registros com atraso
    |'Agregar as colunas' >> beam.Map(lambda record: (record[4], float(record[8]))) # Pegando apenas as colunas 4 e 8
    |'Construir uma nova tabela por contagem' >> beam.combiners.Count.PerKey() # Fazendo uma contagem de ocorrências
    |'Exibir o resultado' >> beam.Map(print)
)

# Executando a Pipeline
p1.run()

###⚡ Co Group By Key ⚡

In [None]:
# Criar nova tabela com dados agregados de duas Pipelines
p1 = beam.Pipeline()

tempoAtraso = (
    p1
    |'Extrair do CSV' >> beam.io.ReadFromText('/content/drive/MyDrive/Datasets/voos.csv', skip_header_lines = 1)
    |'Separador do CSV' >> beam.Map(lambda record: record.split(','))
    |'Filtrar atrasos' >> beam.Filter(lambda record: float(record[8]) > 0) # Pegando apenas registros com atraso
    |'Agregar as colunas' >> beam.Map(lambda record: (record[4], float(record[8]))) # Pegando apenas as colunas 4 e 8
    |'Construir uma nova tabela por minuto' >> beam.CombinePerKey(sum) # Agrupando por chave e fazendo a soma
#   |'Exibir o resultado' >> beam.Map(print)
)

qtdAtraso = (
    p1
    |'1Extrair do CSV' >> beam.io.ReadFromText('/content/drive/MyDrive/Datasets/voos.csv', skip_header_lines = 1)
    |'1Separador do CSV' >> beam.Map(lambda record: record.split(','))
    |'1Filtrar atrasos' >> beam.Filter(lambda record: float(record[8]) > 0) # Pegando apenas registros com atraso
    |'1Agregar as colunas' >> beam.Map(lambda record: (record[4], float(record[8]))) # Pegando apenas as colunas 4 e 8
    |'1Construir uma nova tabela por contagem' >> beam.combiners.Count.PerKey() # Fazendo uma contagem de ocorrências
#   |'1Exibir o resultado' >> beam.Map(print)
)

# A última pipeline não precisa do p1 (pernas) já que estamos juntando elas
tabela = (
    {'Quantidade_minutos': tempoAtraso, 'Numero_de_atrasos': qtdAtraso}
    |'Agrupar as pernas' >> beam.CoGroupByKey()
    |'Resultado' >> beam.Map(print)
)

# Executando as Pipelines
p1.run()