In [4]:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from formatter_classes import SplitByCommasDoFn, FilterWithDelaysDoFn, CreateKeyValueDoFn, FormatForBigQueryDoFn
import os

# --- Credentials -------------------------------------------------------------
# Google client libraries read the service-account key location from this
# environment variable. The value is a bare file name, so it is resolved
# relative to the process working directory.
SERVICE_ACCOUNT = "curso-apache-beam-gcp.json"
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = SERVICE_ACCOUNT

# --- Cloud Storage locations (all derived from the bucket root) --------------
CLOUD_STORAGE = "gs://curso-apache-beam-gcp-n"
CLOUD_STORAGE_INPUT = f"{CLOUD_STORAGE}/inputs/voos_sample.csv"
CLOUD_STORAGE_TEMP = f"{CLOUD_STORAGE}/temp"
CLOUD_STORAGE_TEMPLATE_FILE = f"{CLOUD_STORAGE}/template/curso-apache-beam-bckt-bq-local-teste"

# --- BigQuery destination, written as "project:dataset.table" ----------------
BIG_QUERY_TABLE = "curso-apache-beam-gcp:cursoapachebeamgcpdataset.cursoapachebeamgcpdataset-flights"

# --- Pipeline options --------------------------------------------------------
# Staging and temp files share the same GCS prefix.
# NOTE(review): 'us' does not look like a Dataflow region name (those are
# usually of the form 'us-central1') — confirm the intended region.
pipelines_options = PipelineOptions.from_dictionary(dict(
    project='curso-apache-beam-gcp',
    runner='DataflowRunner',
    region='us',
    staging_location=CLOUD_STORAGE_TEMP,
    temp_location=CLOUD_STORAGE_TEMP,
    template_location=CLOUD_STORAGE_TEMPLATE_FILE,
    save_main_session=True,
))

# Single pipeline object shared by every cell below.
p = beam.Pipeline(options=pipelines_options)

def process_data(pipeline: beam.Pipeline, type_of_pipeline: str):
    """Attach the shared read / split / filter / key stages to ``pipeline``.

    Every stage label is suffixed with ``type_of_pipeline`` so that this
    function can be applied more than once to the same pipeline without
    producing duplicate step labels.

    Args:
        pipeline: The Beam pipeline the stages are attached to.
        type_of_pipeline: Text appended (in parentheses) to each stage label.

    Returns:
        The PCollection produced by the last stage (``CreateKeyValueDoFn``).
        Presumably (key, value) pairs, since callers apply per-key combiners
        to it — verify against ``formatter_classes``.
    """
    def label(step: str) -> str:
        # Builds e.g. "Importar Dados (Soma Atrasos)".
        return f"{step} ({type_of_pipeline})"

    # Read the CSV from Cloud Storage, skipping its single header line.
    linhas = pipeline | label("Importar Dados") >> beam.io.ReadFromText(
        CLOUD_STORAGE_INPUT, skip_header_lines=1
    )
    campos = linhas | label("Separar por Virgulas") >> beam.ParDo(SplitByCommasDoFn())

    # NOTE(review): beam.Filter and beam.Map take plain callables, while the
    # "*DoFn" names suggest beam.DoFn subclasses (normally applied through
    # beam.ParDo) — confirm these classes are callable in formatter_classes.
    filtrados = campos | label("Voos sem atraso") >> beam.Filter(FilterWithDelaysDoFn())
    return filtrados | label("Criar par") >> beam.Map(CreateKeyValueDoFn())



In [None]:
# GROUP BY + COUNT: number of elements per key in the keyed PCollection
# returned by process_data.
contagem_atrasos = process_data(p, "Contagem Atrasos") | (
    "Contar por key" >> beam.combiners.Count.PerKey()
)

In [None]:
# GROUP BY + SUM: the values of each key are combined with the built-in sum().
soma_atrasos = process_data(p, "Soma Atrasos") | (
    "Somar por key" >> beam.CombinePerKey(sum)
)

In [None]:
# Join both aggregations by key, format the joined records and append them to
# the BigQuery table.
#
# CoGroupByKey over a dict of PCollections groups the values of each input
# under that input's dict key ('contagem_atrasos' / 'soma_atrasos').
#
# NOTE(review): no `schema` is passed although CREATE_IF_NEEDED is requested;
# presumably the destination table already exists — confirm, or supply a schema
# matching what FormatForBigQueryDoFn emits.
tabela_atrasos = (
    dict(contagem_atrasos=contagem_atrasos, soma_atrasos=soma_atrasos)
    | 'Agrupando por chave' >> beam.CoGroupByKey()
    | 'Transformar para formato BigQuery' >> beam.ParDo(FormatForBigQueryDoFn())
    | 'Enviando para o Big Query' >> beam.io.WriteToBigQuery(
        table=BIG_QUERY_TABLE,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        custom_gcs_temp_location=CLOUD_STORAGE_TEMP,
    )
)

In [None]:
# Run the pipeline assembled in the cells above. The call's return value is
# left as the cell's last expression, so the notebook displays it.
# NOTE(review): the options used to build `p` set 'template_location';
# presumably this makes the Dataflow runner write a template to that path
# instead of launching a job right away — confirm.
p.run()