In [30]:
import os, datetime
import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from __future__ import absolute_import
from apache_beam import pvalue
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions

In [None]:
# Google Cloud project id.
PROJECT_ID = 'uber-fair'
# Cloud Storage bucket that every path below is rooted in.
BUCKET = 'gs://uber-fair-beam-bucket'
# Input folder inside the bucket.
DIR_PATH_IN = '{}/input/'.format(BUCKET)
# Output folder inside the bucket; the sub-folder name is the current local
# time at the moment this line runs, so each run gets its own folder.
DIR_PATH = '{}/output/{:%Y_%m_%d_%H_%M_%S}/'.format(BUCKET, datetime.datetime.now())
           
class CastAsDateTime(beam.DoFn):
  """Rewrite a record's epoch ``time_stamp`` as a ``YYYY-MM-DD`` date string."""

  def process(self, element):
    """Return a one-element list holding a converted copy of ``element``.

    Args:
      element: dict-like record whose ``'time_stamp'`` value is a POSIX
        timestamp in seconds (int or float), or ``None``.

    Returns:
      A list with a single new dict. It carries the same keys as ``element``;
      ``'time_stamp'`` is replaced by the UTC calendar date formatted as
      ``'%Y-%m-%d'``. A ``None`` timestamp is passed through unchanged.

    Raises:
      KeyError: if ``element`` has no ``'time_stamp'`` key.
    """
    # Kept as a function-local import, as in the original; presumably so the
    # module is resolvable when the DoFn runs on a remote worker.
    import datetime

    # Beam requires that process() never modifies its input element, so the
    # conversion is applied to a shallow copy instead of the original record.
    record = dict(element)
    epoch_seconds = record['time_stamp']
    if epoch_seconds is not None:
      # Convert in UTC explicitly: fromtimestamp() would use the local time
      # zone of whichever machine executes the bundle, so the resulting date
      # could differ between runners.
      moment = datetime.datetime(1970, 1, 1) + datetime.timedelta(seconds=epoch_seconds)
      record['time_stamp'] = moment.strftime('%Y-%m-%d')
    return [record]
         

# ---------------------------------------------------------------------------
# Dataflow job: read the weather table, turn each record's time_stamp into a
# date string, and write the result to a new BigQuery table.
# ---------------------------------------------------------------------------

# Launch settings for the Dataflow runner.
options = dict(
    runner='DataflowRunner',
    job_name='transform-weather-date-5',
    project=PROJECT_ID,
    temp_location=BUCKET + '/temp',
    staging_location=BUCKET + '/staging',
    # machine types listed here: https://cloud.google.com/compute/docs/machine-types
    machine_type='n1-standard-4',
    num_workers=10,
)
opts = beam.pipeline.PipelineOptions(flags=[], **options)

# Source query and destination table details.
sql = 'SELECT * FROM weather_modeled.weather'
dataset_id = 'weather_modeled'
table_id = 'weather_DF'
schema_id = 'temp:FLOAT,location:STRING, clouds:FLOAT,pressure:FLOAT,rain:FLOAT,time_stamp:DATE,humidity:FLOAT,wind:FLOAT,id:STRING'

p = beam.Pipeline(runner=options['runner'], options=opts)

# Step 1: pull every row of the source table with a standard-SQL query.
bq_source = beam.io.BigQuerySource(query=sql, use_standard_sql=True)
query_results = (
    p
    | 'Read from BigQuery' >> beam.io.Read(bq_source)
)

# Dump the raw rows to a text file under this run's output folder.
(
    query_results
    | 'Write log 1' >> WriteToText(DIR_PATH + 'query_results.txt')
)

# Step 2: apply CastAsDateTime to every record to reformat its time_stamp.
formatted_date_pcoll = (
    query_results
    | 'ParDo for Date' >> beam.ParDo(CastAsDateTime())
)

# Dump the converted rows to a second text file for comparison.
(
    formatted_date_pcoll
    | 'Write log 2' >> WriteToText(DIR_PATH + 'formatted_date_pcoll.txt')
)

# Step 3: write the converted rows to the destination table, creating it if
# it does not exist and replacing its contents if it does.
(
    formatted_date_pcoll
    | 'Write BQ table' >> beam.io.WriteToBigQuery(
        table=table_id,
        dataset=dataset_id,
        project=PROJECT_ID,
        schema=schema_id,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
        batch_size=100,
    )
)

# Submit the job and block until it reaches a terminal state.
result = p.run()
result.wait_until_finish()