In [4]:
# Install Apache Beam with the Google Cloud Platform (gcp) extras
!pip install apache-beam[gcp]



In [None]:
# Authenticate this Google Colab session with my Google user account
from google.colab import auth
auth.authenticate_user()

In [None]:
# Set the active Google Cloud project for gcloud commands
!gcloud config set project deft-apparatus-439911-j9

In [None]:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

#conf. pipeline arguments
@classmethod
def _add_argparse_args(cls, parser):
    parser.add_argument('--project_id', required=True, help='ID do projeto do GCP')
    parser.add_argument('--region', required=True, help='Região do Dataflow')
    parser.add_argument('--temp_location', required=True, help='Bucket do GCS para arquivos temporários')
    parser.add_argument('--output_table', required=True, help='Tabela destino no formato PROJECT:DATASET.TABLE')

def run():
    """Merge the three fiber market tables into one BigQuery table.

    Builds a Beam pipeline configured for the Dataflow runner that reads the
    ``UNION ALL`` of ``fiber.market_1``, ``fiber.market_2`` and
    ``fiber.market_3`` with a standard-SQL query and writes the rows to
    ``fiber.customer_call``. The destination table is truncated on write and
    created if it does not exist yet.

    The schema is assembled from a list so it carries no trailing comma. The
    previous literal ended with ``...new_market:STRING,``, which leaves an
    empty entry after splitting on commas; that entry cannot be split into a
    ``name:TYPE`` pair and breaks the write step.
    """
    pipeline_options = PipelineOptions(
        runner='DataflowRunner',
        project='deft-apparatus-439911-j9',
        region='us-central1',
        temp_location='gs://fiber_calls/temp',
        max_num_workers=5,  # limit workers
        autoscaling_algorithm='THROUGHPUT_BASED',
    )

    # Destination table schema, one "name:TYPE" entry per column.
    schema_fields = [
        'date_created:DATE',
        'contacts_n:STRING',
        'contacts_n_1:STRING',
        'contacts_n_2:STRING',
        'contacts_n_3:STRING',
        'contacts_n_4:STRING',
        'contacts_n_5:STRING',
        'contacts_n_6:STRING',
        'contacts_n_7:STRING',
        'new_type:STRING',
        'new_market:STRING',
    ]
    schema_total = ','.join(schema_fields)

    # SQL query: merge the three market tables.
    query_total = """
      SELECT * FROM `deft-apparatus-439911-j9.fiber.market_1`
        UNION ALL
      SELECT * FROM `deft-apparatus-439911-j9.fiber.market_2`
        UNION ALL
      SELECT * FROM `deft-apparatus-439911-j9.fiber.market_3`
    """

    # Create the pipeline.
    with beam.Pipeline(options=pipeline_options) as pipeline:
        # Data flow: read the query result and load it into the new table.
        data_total = (
            pipeline
            | 'ReadFromBigquery' >> beam.io.ReadFromBigQuery(
                query=query_total, use_standard_sql=True)
        )

        data_total | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
            table='deft-apparatus-439911-j9:fiber.customer_call',
            schema=schema_total,
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        )

# Entry point: launch the pipeline only when this module is the main program.
if __name__ == "__main__":
    run()