<a href="https://colab.research.google.com/github/felipecampelo/Beam-BucketGCP-Dataflow/blob/main/Beam_BucketGCP_Dataflow.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### ❗ Habilitando APIs necessárias no GCP ❗

`Link`: [Enable APIs](https://console.cloud.google.com/flows/enableapi?apiid=dataflow,compute_component,logging,storage_component,storage_api,bigquery,pubsub,datastore.googleapis.com,cloudresourcemanager.googleapis.com&_ga=2.189426155.1983962144.1622827940-955197899.1622719217&_gac=1.221249130.1622828282.Cj0KCQjwnueFBhChARIsAPu3YkSUzoSQn7RE14ObAE51mBY111345EEps_OAYCDz3smm2mLh2zjZ__8aAq9dEALw_wcB&wdLOR=cFAD8FEBF-3295-4A46-A7CC-2BD1857695D0)

▶ *Dataflow API*, *Compute Engine API*, *Cloud Logging API*, *Cloud Storage*, *Google Cloud Storage JSON API*, *BigQuery API*, *Cloud Pub/Sub API*, *Cloud Datastore API*, *Cloud Resource Manager API*

### ❗ Apache Beam ❗

`Passo a passo para instalação do Apache Beam no Colab:`

`1)` pip install --upgrade pip

`2)` pip install apache-beam[interactive]

`3)` Reiniciar ambiente de execução

`4)` pip install apache-beam[gcp]

`5)` Reiniciar ambiente de execução

`6)` import apache_beam as beam

`OBS: Nunca usar funções de exibição e gravação na mesma pipeline`

In [None]:
# Necessário atualizar o pip para instalação do Beam
!pip install --upgrade pip

In [None]:
# Instalando o apache-beam[interactive]
!pip install apache-beam[interactive]

In [None]:
# Instalando o apache-beam[gcp]
!pip install apache-beam[gcp]

###⚡ Criação do Modelo de Pipeline no Bucket do GCP ⚡

In [3]:
# Importação de bibliotecas necessárias
import apache_beam as beam
import os
from apache_beam.options.pipeline_options import PipelineOptions

# Conectando com o service account do GCP
serviceAccount = '/content/drive/MyDrive/KeysGCP/aulas-soulcode-felipe-1ab7e143ccf1.json'
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = serviceAccount

# Criando as configurações da Pipeline para conexão com o GCP
pipeline_options = {
    'project': 'aulas-soulcode-felipe', # ID do projeto do GCP
    'runner': 'DataflowRunner', # Aplicação que irá rodar (Dataflow)
    'region': 'southamerica-east1', # Região de preferência
    'staging_location': 'gs://beam-soulcode/staging/', # Localização para arquivos em staging
    'temp_location': 'gs://beam-soulcode/temp/', # Localização para arquivos temporários
    'template_location': 'gs://beam-soulcode/models/modelo_batch' # Localização do modelo de Pipeline
}

# Transformando o dicionário para o tipo de PipelineOption
pipeline_options = PipelineOptions.from_dictionary(pipeline_options)

# Criando a Pipeline com as opções estabelecidas
p1 = beam.Pipeline(options = pipeline_options)

tempoAtraso = (
    p1
    |'Extrair do CSV' >> beam.io.ReadFromText('gs://beam-soulcode/DADOS/voos.csv', skip_header_lines = 1) # Lendo o CSV que está no bucket
    |'Separador do CSV' >> beam.Map(lambda record: record.split(',')) # Definindo o separador do arquivo
    |'Filtrar atrasos' >> beam.Filter(lambda record: float(record[8]) > 0) # Pegando apenas registros com atraso
    |'Agregar as colunas' >> beam.Map(lambda record: (record[4], float(record[8]))) # Pegando apenas as colunas 4 e 8
    |'Construir uma nova tabela por minuto' >> beam.CombinePerKey(sum) # Agrupando por chave e fazendo a soma
)

qtdAtraso = (
    p1
    |'1Extrair do CSV' >> beam.io.ReadFromText('gs://beam-soulcode/DADOS/voos.csv', skip_header_lines = 1) # Lendo o CSV que está no bucket
    |'1Separador do CSV' >> beam.Map(lambda record: record.split(',')) # Definindo o separador do arquivo
    |'1Filtrar atrasos' >> beam.Filter(lambda record: float(record[8]) > 0) # Pegando apenas registros com atraso
    |'1Agregar as colunas' >> beam.Map(lambda record: (record[4], float(record[8]))) # Pegando apenas as colunas 4 e 8
    |'1Construir uma nova tabela por contagem' >> beam.combiners.Count.PerKey() # Fazendo uma contagem de ocorrências
)

# A última pipeline não precisa do p1 (pernas) já que estamos juntando elas
tabela = (
    {'Quantidade_minutos': tempoAtraso, 'Numero_de_atrasos': qtdAtraso}
    |'Agrupar as pernas' >> beam.CoGroupByKey()
    |'Gravar o resultado' >> beam.io.WriteToText('gs://beam-soulcode/DADOS/Tratado/agregado', file_name_suffix = '.csv') # "agregado" é o nome dado ao arquivo com sufixo ".csv"
)

# Executando as Pipelines
p1.run()

# OBS: O Pipeline não está sendo executado, o modelo está sendo armazenado no bucket do GCP.
# Ao executar o modelo através do Dataflow no GCP, será gerado o arquivo tratado.

<DataflowPipelineResult None at 0x7f1160851490>