<a href="https://colab.research.google.com/github/felipecampelo/ApacheBeam-DataflowGCP/blob/main/Pandas_Beam_Dataflow.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### ❗ Habilitando APIs necessárias no GCP ❗

`Link`: [Enable APIs](https://console.cloud.google.com/flows/enableapi?apiid=dataflow,compute_component,logging,storage_component,storage_api,bigquery,pubsub,datastore.googleapis.com,cloudresourcemanager.googleapis.com&_ga=2.189426155.1983962144.1622827940-955197899.1622719217&_gac=1.221249130.1622828282.Cj0KCQjwnueFBhChARIsAPu3YkSUzoSQn7RE14ObAE51mBY111345EEps_OAYCDz3smm2mLh2zjZ__8aAq9dEALw_wcB&wdLOR=cFAD8FEBF-3295-4A46-A7CC-2BD1857695D0)

▶ *Dataflow API*, *Compute Engine API*, *Cloud Logging API*, *Cloud Storage*, *Google Cloud Storage JSON API*, *BigQuery API*, *Cloud Pub/Sub API*, *Cloud Datastore API*, *Cloud Resource Manager API*

### ❗ Apache Beam ❗

`Passo a passo para instalação do Apache Beam no Colab:`

`1)` pip install --upgrade pip

`2)` pip install apache-beam[interactive]

`3)` Reiniciar ambiente de execução

`4)` pip install apache-beam[gcp]

`5)` Reiniciar ambiente de execução

`6)` import apache_beam as beam

`OBS: Nunca usar funções de exibição e gravação na mesma pipeline`

In [None]:
# Necessário atualizar o pip para instalação do Beam
!pip install --upgrade pip

In [None]:
# Instalando o apache-beam[interactive]
!pip install apache-beam[interactive]

In [None]:
# Instalando o apache-beam[gcp]
!pip install apache-beam[gcp]

###⚡ Criação do Modelo de Pipeline no Bucket do GCP ⚡

🔷 Dropar colunas que não forem necessárias

🔷 Realizar o tratamento dos dados

🔷 Criar a pipeline com os filtros desejados

🔷 Criar um job no Dataflow

In [1]:
# Importando as bibliotecas necessárias
import pandas as pd
import numpy as np

import apache_beam as beam
import os
from apache_beam.options.pipeline_options import PipelineOptions

###⚡ Leitura do arquivo CSV ⚡


In [2]:
# Lendo o arquivo CSV
df = pd.read_csv('https://storage.googleapis.com/bc23-soulcode/DADOS/flights.csv')

# Setando o visualizador do DataFrame para 100 colunas
pd.set_option('display.max_columns', 100)
df.head()

  exec(code_obj, self.user_global_ns, self.user_ns)


Unnamed: 0,YEAR,MONTH,DAY,DAY_OF_WEEK,AIRLINE,FLIGHT_NUMBER,TAIL_NUMBER,ORIGIN_AIRPORT,DESTINATION_AIRPORT,SCHEDULED_DEPARTURE,DEPARTURE_TIME,DEPARTURE_DELAY,TAXI_OUT,WHEELS_OFF,SCHEDULED_TIME,ELAPSED_TIME,AIR_TIME,DISTANCE,WHEELS_ON,TAXI_IN,SCHEDULED_ARRIVAL,ARRIVAL_TIME,ARRIVAL_DELAY,DIVERTED,CANCELLED,CANCELLATION_REASON,AIR_SYSTEM_DELAY,SECURITY_DELAY,AIRLINE_DELAY,LATE_AIRCRAFT_DELAY,WEATHER_DELAY
0,2015,1,1,4,AS,98,N407AS,ANC,SEA,5,2354.0,-11.0,21.0,15.0,205.0,194.0,169.0,1448,404.0,4.0,430,408.0,-22.0,0,0,,,,,,
1,2015,1,1,4,AA,2336,N3KUAA,LAX,PBI,10,2.0,-8.0,12.0,14.0,280.0,279.0,263.0,2330,737.0,4.0,750,741.0,-9.0,0,0,,,,,,
2,2015,1,1,4,US,840,N171US,SFO,CLT,20,18.0,-2.0,16.0,34.0,286.0,293.0,266.0,2296,800.0,11.0,806,811.0,5.0,0,0,,,,,,
3,2015,1,1,4,AA,258,N3HYAA,LAX,MIA,20,15.0,-5.0,15.0,30.0,285.0,281.0,258.0,2342,748.0,8.0,805,756.0,-9.0,0,0,,,,,,
4,2015,1,1,4,AS,135,N527AS,SEA,ANC,25,24.0,-1.0,11.0,35.0,235.0,215.0,199.0,1448,254.0,5.0,320,259.0,-21.0,0,0,,,,,,


###⚡ Análise e Tratamento dos Dados ⚡


In [3]:
# Dropando colunas desnecessárias para o problema em questão
df.drop(['YEAR', 'MONTH', 'DAY', 'DAY_OF_WEEK', 'AIRLINE', 'FLIGHT_NUMBER', 'TAIL_NUMBER', 'DESTINATION_AIRPORT', 
         'SCHEDULED_DEPARTURE', 'DEPARTURE_TIME', 'TAXI_OUT',	'WHEELS_OFF', 'SCHEDULED_TIME', 'ELAPSED_TIME', 'AIR_TIME', 
         'DISTANCE', 'WHEELS_ON', 'TAXI_IN', 'SCHEDULED_ARRIVAL', 'ARRIVAL_TIME', 'ARRIVAL_DELAY', 'DIVERTED', 'CANCELLATION_REASON', 
         'AIR_SYSTEM_DELAY', 'SECURITY_DELAY', 'AIRLINE_DELAY', 'LATE_AIRCRAFT_DELAY', 'WEATHER_DELAY'], axis = 1, inplace = True)

df.head()

Unnamed: 0,ORIGIN_AIRPORT,DEPARTURE_DELAY,CANCELLED
0,ANC,-11.0,0
1,LAX,-8.0,0
2,SFO,-2.0,0
3,LAX,-5.0,0
4,SEA,-1.0,0


In [4]:
# Removendo voos cancelados do DataFrame
filtro = df.CANCELLED == 0
df = df.loc[filtro]

# Removendo a coluna CANCELLED do DataFrame
df.drop(['CANCELLED'], axis = 1, inplace = True)

A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  errors=errors,


In [None]:
# Verificando inconsistências no DataFrame resultante
print(df.groupby(['ORIGIN_AIRPORT'], dropna = False).size().sort_values(ascending = False)) # Inconsistências detectadas

print('\nMáximo e mínimo da coluna DEPARTURE_DELAY:\n')
print(df.DEPARTURE_DELAY.max()) # Coerente
print(df.DEPARTURE_DELAY.min()) # Coerente

# Removendo ORIGIN_AIRPORT com valor numérico
filtro = df.ORIGIN_AIRPORT.str.match('[A-Z]') == True # Excluindo valores que contém números
filtro2 = df.DEPARTURE_DELAY > 0
df = df.loc[filtro & filtro2]

###⚡ Carregando o DataFrame tratado para CSV ⚡


In [9]:
df.to_csv("df_tratado.csv", index = False)

###⚡ Criação da Pipeline ⚡


In [None]:
df.head()

Unnamed: 0,ORIGIN_AIRPORT,DEPARTURE_DELAY
7,LAX,14.0
9,LAS,3.0
20,LAS,25.0
27,MSP,12.0
29,DEN,21.0


In [10]:
# Conectando com o service account do GCP
serviceAccount = '/content/drive/MyDrive/KeysGCP/aulas-soulcode-felipe-1ab7e143ccf1.json'
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = serviceAccount

# Criando as configurações da Pipeline para conexão com o GCP
pipeline_options = {
    'project': 'aulas-soulcode-felipe', # ID do projeto do GCP
    'runner': 'DataflowRunner', # Aplicação que irá rodar (Dataflow)
    'region': 'southamerica-east1', # Região de preferência
    'staging_location': 'gs://atividade-beam-dataflow/staging/', # Localização para arquivos em staging
    'temp_location': 'gs://atividade-beam-dataflow/temp/', # Localização para arquivos temporários
    'template_location': 'gs://atividade-beam-dataflow/models/modelo_batch' # Localização do modelo de Pipeline
}

# Transformando o dicionário para o tipo de PipelineOption
pipeline_options = PipelineOptions.from_dictionary(pipeline_options)

# Criando a Pipeline com as opções estabelecidas
p1 = beam.Pipeline(options = pipeline_options)

tempoAtraso = (
    p1
    |'Extrair do CSV' >> beam.io.ReadFromText('gs://atividade-beam-dataflow/DADOS/df_tratado.csv', skip_header_lines = 1) # Lendo o CSV que está no bucket
    |'Separador do CSV' >> beam.Map(lambda record: record.split(',')) # Definindo o separador do arquivo
    |'Agregar as colunas' >> beam.Map(lambda record: (record[0], float(record[1]))) # Pegando apenas as colunas 4 e 8
    |'Construir uma nova tabela por minuto' >> beam.CombinePerKey(sum) # Agrupando por chave e fazendo a soma
)

qtdAtraso = (
    p1
    |'1Extrair do CSV' >> beam.io.ReadFromText('gs://atividade-beam-dataflow/DADOS/df_tratado.csv', skip_header_lines = 1) # Lendo o CSV que está no bucket
    |'1Separador do CSV' >> beam.Map(lambda record: record.split(',')) # Definindo o separador do arquivo
    |'1Agregar as colunas' >> beam.Map(lambda record: (record[0], float(record[1]))) # Pegando apenas as colunas 4 e 8
    |'1Construir uma nova tabela por contagem' >> beam.combiners.Count.PerKey() # Fazendo uma contagem de ocorrências
)

# A última pipeline não precisa do p1 (pernas) já que estamos juntando elas
tabela = (
    {'Quantidade_minutos': tempoAtraso, 'Numero_de_atrasos': qtdAtraso}
    |'Agrupar as pernas' >> beam.CoGroupByKey()
    |'Gravar o resultado' >> beam.io.WriteToText('gs://atividade-beam-dataflow/DADOS/Tratado/agregado', file_name_suffix = '.csv') # "agregado" é o nome dado ao arquivo com sufixo ".csv"
)

# Executando as Pipelines
p1.run()

# OBS: O Pipeline não está sendo executado, o modelo está sendo armazenado no bucket do GCP.
# Ao executar o modelo através do Dataflow no GCP, será gerado o arquivo tratado.

<DataflowPipelineResult None at 0x7f52db270850>