In [None]:
#install apache beam
!pip install apache-beam[gcp]

Collecting apache-beam[gcp]
  Downloading apache_beam-2.60.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (7.6 kB)
Collecting crcmod<2.0,>=1.7 (from apache-beam[gcp])
  Downloading crcmod-1.7.tar.gz (89 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m89.7/89.7 kB[0m [31m1.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting dill<0.3.2,>=0.3.1.1 (from apache-beam[gcp])
  Downloading dill-0.3.1.1.tar.gz (151 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m152.0/152.0 kB[0m [31m5.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting cloudpickle~=2.2.1 (from apache-beam[gcp])
  Downloading cloudpickle-2.2.1-py3-none-any.whl.metadata (6.9 kB)
Collecting fastavro<2,>=0.23.6 (from apache-beam[gcp])
  Downloading fastavro-1.9.7-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (5.5 kB)
Collecting fasteners<1.0,>=0.

In [None]:
# Authenticate this Colab session with the current user's Google account.
from google.colab import auth
auth.authenticate_user()

In [None]:
# Set the default gcloud project (the core/project property) for this runtime.
!gcloud config set project halogen-oxide-440319-r7


Updated property [core/project].


In [None]:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Command-line arguments for the pipeline
@classmethod
def _add_argparse_args(cls, parser):
    """Register the pipeline's required command-line flags on ``parser``.

    Adds ``--project_id``, ``--region``, ``--temp_location`` and
    ``--output_table``, each marked as required.

    Args:
        cls: The class the method is bound to (unused here).
        parser: An argparse-style parser exposing ``add_argument``.

    NOTE(review): this classmethod is defined at module scope rather than
    inside a class, and ``run()`` does not reference it. It was presumably
    meant to live on a ``PipelineOptions`` subclass; confirm before relying
    on these flags.
    """
    # (flag, help text) pairs, registered in this order.
    required_flags = (
        ('--project_id', 'ID do projeto do GCP'),
        ('--region', 'Região do Dataflow'),
        ('--temp_location', 'Bucket do GCS para arquivos temporários'),
        ('--output_table', 'Tabela destino no formato PROJECT:DATASET.TABLE'),
    )
    for flag_name, help_text in required_flags:
        parser.add_argument(flag_name, required=True, help=help_text)

# Destination schema ("name:TYPE" pairs) for the aggregated trip-history table.
_TOTAL_SCHEMA = ','.join((
    'usertype:STRING',
    'zip_code_start:STRING',
    'borough_start:STRING',
    'neighborhood_start:STRING',
    'zip_code_end:STRING',
    'borough_end:STRING',
    'neighborhood_end:STRING',
    'start_day:DATE',
    'stop_day:DATE',
    'day_mean_temperature:FLOAT64',
    'day_mean_wind_speed:STRING',
    'day_total_precipitation:FLOAT64',
    'trip_minutes:FLOAT64',
    'trip_count:INT64',
))

# Destination schema ("name:TYPE" pairs) for the per-trip summer table.
_SUMMER_SCHEMA = ','.join((
    'usertype:STRING',
    'start_station_longitude:FLOAT',
    'start_station_latitude:FLOAT',
    'end_station_longitude:FLOAT',
    'end_station_latitude:FLOAT',
    'zip_code_start:STRING',
    'borough_start:STRING',
    'neighborhood_start:STRING',
    'zip_code_end:STRING',
    'borough_end:STRING',
    'neighborhood_end:STRING',
    'start_day:DATE',
    'stop_day:DATE',
    'day_mean_temperature:FLOAT',
    'day_mean_wind_speed:STRING',
    'day_total_precipitation:FLOAT',
    'trip_minutes:FLOAT',
    'bikeid:INTEGER',
))


def _trip_source_sql(lookup_table):
    """Return the FROM/JOIN/WHERE clause shared by both trip queries.

    Joins Citi Bike trips to the public zip-code boundaries (once for the
    start station, once for the end station), to the NOAA GSOD weather rows
    for the trip's start date, and to ``lookup_table`` for borough and
    neighborhood names. The WHERE clause keeps only weather station
    wban '94728'; callers append their own date filter with ``AND``.

    Args:
        lookup_table: ``project.dataset.table`` of the zip-code lookup table.
    """
    return f"""
          FROM
            `bigquery-public-data.new_york_citibike.citibike_trips` AS TRI
          INNER JOIN
            `bigquery-public-data.geo_us_boundaries.zip_codes` ZIPSTART
            ON ST_WITHIN(
              ST_GEOGPOINT(TRI.start_station_longitude, TRI.start_station_latitude), ZIPSTART.zip_code_geom)
          INNER JOIN
            `bigquery-public-data.geo_us_boundaries.zip_codes` ZIPEND
            ON ST_WITHIN(
              ST_GEOGPOINT(TRI.end_station_longitude, TRI.end_station_latitude), ZIPEND.zip_code_geom)
          INNER JOIN
            `bigquery-public-data.noaa_gsod.gsod20*` AS WEA
            ON PARSE_DATE("%Y%m%d", CONCAT(WEA.year, WEA.mo, WEA.da)) = DATE(TRI.starttime)
          INNER JOIN
            `{lookup_table}` AS ZIPSTARTNAME
            ON ZIPSTART.zip_code = CAST(ZIPSTARTNAME.zip AS STRING)
          INNER JOIN
            `{lookup_table}` AS ZIPENDNAME
            ON ZIPEND.zip_code = CAST(ZIPENDNAME.zip AS STRING)
          WHERE
            WEA.wban = '94728' -- NEW YORK CENTRAL PARK
    """.strip()


def _total_history_query(lookup_table):
    """Return the SQL for the aggregated 2014-2015 trip history.

    Trips are grouped by the first 13 selected columns and counted. Start and
    stop dates are shifted forward by five years, and the trip duration is
    converted to minutes rounded to the nearest ten.

    Args:
        lookup_table: ``project.dataset.table`` of the zip-code lookup table.
    """
    return f"""
          SELECT TRI.usertype,
            ZIPSTART.zip_code AS zip_code_start,
            ZIPSTARTNAME.borough borough_start,
            ZIPSTARTNAME.neighborhood AS neighborhood_start,
            ZIPEND.zip_code AS zip_code_end,
            ZIPENDNAME.borough borough_end,
            ZIPENDNAME.neighborhood AS neighborhood_end,
            DATE_ADD(DATE(TRI.starttime), INTERVAL 5 YEAR) AS start_day,
            DATE_ADD(DATE(TRI.stoptime), INTERVAL 5 YEAR) AS stop_day,
            WEA.temp AS day_mean_temperature,
            WEA.wdsp AS day_mean_wind_speed,
            WEA.prcp day_total_precipitation,
            ROUND(CAST(TRI.tripduration / 60 AS INT64), -1) AS trip_minutes,
            COUNT(TRI.bikeid) AS trip_count
          {_trip_source_sql(lookup_table)}
            AND EXTRACT(YEAR FROM DATE(TRI.starttime)) BETWEEN 2014 AND 2015
          GROUP BY
            1,   2,   3,   4,   5,   6,   7,   8,   9,   10,   11,   12,   13;
          """


def _summer_history_query(lookup_table):
    """Return the SQL for the per-trip history of July-September 2015.

    Unlike the total-history query this one is not aggregated: it returns one
    row per trip, including station coordinates and the bike id.

    Args:
        lookup_table: ``project.dataset.table`` of the zip-code lookup table.
    """
    return f"""
          SELECT TRI.usertype,
            TRI.start_station_longitude,
            TRI.start_station_latitude,
            TRI.end_station_longitude,
            TRI.end_station_latitude,
            ZIPSTART.zip_code AS zip_code_start,
            ZIPSTARTNAME.borough borough_start,
            ZIPSTARTNAME.neighborhood AS neighborhood_start,
            ZIPEND.zip_code AS zip_code_end,
            ZIPENDNAME.borough borough_end,
            ZIPENDNAME.neighborhood AS neighborhood_end,
            DATE_ADD(DATE(TRI.starttime), INTERVAL 5 YEAR) AS start_day,
            DATE_ADD(DATE(TRI.stoptime), INTERVAL 5 YEAR) AS stop_day,
            WEA.temp AS day_mean_temperature, -- Mean temp
            WEA.wdsp AS day_mean_wind_speed, -- Mean wind speed
            WEA.prcp day_total_precipitation, -- Total precipitation
            ROUND(CAST(TRI.tripduration / 60 AS INT64), -1) AS trip_minutes,
            TRI.bikeid
          {_trip_source_sql(lookup_table)}
            AND DATE(TRI.starttime) BETWEEN DATE('2015-07-01') AND DATE('2015-09-30') --Use data for three summer months
        """


def _add_query_export(pipeline, label, query, table, schema):
    """Add one "run query, write result to table" branch to ``pipeline``.

    Args:
        pipeline: The ``beam.Pipeline`` to attach the branch to.
        label: Suffix used to keep the two step names unique in the pipeline.
        query: Standard-SQL query whose result is read.
        table: Destination table as ``PROJECT:DATASET.TABLE``.
        schema: Destination schema as comma-separated ``name:TYPE`` pairs.

    Returns:
        The result of applying the write transform.
    """
    rows = pipeline | f'ReadFromBigquery{label}' >> beam.io.ReadFromBigQuery(
        query=query,
        use_standard_sql=True,
    )
    # WRITE_TRUNCATE replaces the table contents on every run, so re-running
    # the pipeline does not append duplicates.
    return rows | f'WriteToBigQuery{label}' >> beam.io.WriteToBigQuery(
        table=table,
        schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
    )


def run(
    project='halogen-oxide-440319-r7',
    region='us-central1',
    temp_location='gs://proj_cyclistic/temp',
    dataset='CyclisticNYC',
    max_num_workers=5,
    flags=None,
):
    """Build and run the Dataflow pipeline that materializes both trip tables.

    Two independent branches are added to a single pipeline:

    * ``historic_user``: aggregated trip counts for 2014-2015.
    * ``historic_user_summer``: one row per trip for July-September 2015.

    Each branch reads a BigQuery query result and writes it to a table in
    ``project:dataset``, truncating any existing contents. The pipeline is
    executed when the ``with`` block exits.

    Args:
        project: GCP project used for the Dataflow job, the zip-code lookup
            table and the destination tables.
        region: Dataflow region.
        temp_location: GCS path for temporary files.
        dataset: BigQuery dataset holding the ``zip_code`` lookup table and
            receiving the two output tables.
        max_num_workers: Upper bound on autoscaled Dataflow workers.
        flags: Passed to ``PipelineOptions`` as its ``flags`` argument. The
            default ``None`` keeps Beam's behaviour of parsing ``sys.argv``;
            pass ``[]`` from a notebook so the kernel's own command line is
            not parsed.

    With no arguments the behaviour matches the previous hard-coded version.
    """
    pipeline_options = PipelineOptions(
        flags=flags,
        runner='DataflowRunner',
        project=project,
        region=region,
        temp_location=temp_location,
        max_num_workers=max_num_workers,  # limit workers
        autoscaling_algorithm='THROUGHPUT_BASED',
    )

    # Queries are built before the pipeline context is opened so the ``with``
    # block contains only graph construction.
    lookup_table = f'{project}.{dataset}.zip_code'
    exports = (
        (
            'Total',
            _total_history_query(lookup_table),
            f'{project}:{dataset}.historic_user',
            _TOTAL_SCHEMA,
        ),
        (
            'Summer',
            _summer_history_query(lookup_table),
            f'{project}:{dataset}.historic_user_summer',
            _SUMMER_SCHEMA,
        ),
    )

    with beam.Pipeline(options=pipeline_options) as pipeline:
        for label, query, table, schema in exports:
            _add_query_export(pipeline, label, query, table, schema)

# Entry point: run the pipeline when this code is executed as the main program.
if __name__ == '__main__':
    run()

