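Based on the Apache Beam streaming example from the book *Data Science on the Google Cloud Platform* by Valliappa (Lak) Lakshmanan. The original code is here: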

https://github.com/GoogleCloudPlatform/data-science-on-gcp/tree/master/04_streaming

![alt text](https://covers.oreillystatic.com/images/0636920057628/lrg.jpg)


### First, install the Python libraries we need ...




In [0]:
!pip install apache-beam[gcp]
!pip install --upgrade google-cloud-pubsub
!pip install --upgrade google-cloud-storage
!pip install timezonefinder

Collecting apache-beam[gcp]
[?25l  Downloading https://files.pythonhosted.org/packages/e8/5c/4c4302f48686e89468cc036a8171b55a2f509339dd5d1613be2ec5c41c8e/apache_beam-2.7.0-cp27-cp27mu-manylinux1_x86_64.whl (2.3MB)
[K    100% |████████████████████████████████| 2.3MB 7.2MB/s 
Collecting hdfs<3.0.0,>=2.1.0 (from apache-beam[gcp])
  Downloading https://files.pythonhosted.org/packages/bb/ef/35af4764ea6c60bd14195ca78d4e831d154183f35cc4af4a0b3e01aa28ce/hdfs-2.1.0.tar.gz
Collecting pyvcf<0.7.0,>=0.6.8 (from apache-beam[gcp])
  Downloading https://files.pythonhosted.org/packages/20/b6/36bfb1760f6983788d916096193fc14c83cce512c7787c93380e09458c09/PyVCF-0.6.8.tar.gz
Collecting avro<2.0.0,>=1.8.1 (from apache-beam[gcp])
[?25l  Downloading https://files.pythonhosted.org/packages/eb/27/143f124a7498f841317a92ced877150c5cb8d28a4109ec39666485925d00/avro-1.8.2.tar.gz (43kB)
[K    100% |████████████████████████████████| 51kB 20.3MB/s 
Collecting fastavro==0.19.7 (from apache-beam[gcp])
[?25l  Downloa

### ... and import the libraries


In [0]:
import csv
import apache_beam as beam
from google.cloud import storage
import timezonefinder
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions

  from google.cloud.proto.pubsub.v1 import pubsub_pb2


### Set your project here

In [0]:
%env GOOGLE_CLOUD_PROJECT=funwithcloud

env: GOOGLE_CLOUD_PROJECT=funwithcloud


### And authenticate 

In [0]:
from google.colab import auth
auth.authenticate_user()

### Remove all previously generated output files


In [0]:
%%bash 
gsutil ls gs://airflights/output/*
gsutil rm gs://airflights/output/*

CommandException: One or more URLs matched no objects.
CommandException: No URLs matched: gs://airflights/output/*


In [0]:
   # Leaving the `with` block calls pipeline.run() and waits for it to finish,
   # so there is no explicit run() here (calling it inside the block would
   # execute the whole pipeline twice).
   with beam.Pipeline('DirectRunner') as pipeline:

      # Each CSV line becomes (AIRPORT_SEQ_ID, (LATITUDE, LONGITUDE)). The header
      # row flows through too, which is why the output starts with the column
      # names; that header shows columns 21 and 26 are LATITUDE and LONGITUDE.
      airports = (pipeline
         | beam.io.ReadFromText('gs://airflights/airports.csv.gz')
         | beam.Map(lambda line: next(csv.reader([line])))
         | beam.Map(lambda fields: (fields[0], (fields[21], fields[26])))
      )

      # Flatten (airport, (lat, lon)) back into one CSV line. The pair is indexed
      # rather than unpacked in the lambda signature, because tuple parameters
      # are Python-2-only syntax.
      (airports
         | beam.Map(lambda airport_data: '{},{}'.format(airport_data[0], ','.join(airport_data[1])))
         | beam.io.textio.WriteToText('gs://airflights/output/extracted_airports')
      )



### Check output 

In [0]:
%%bash 
gsutil ls gs://airflights/output/*

gs://airflights/output/extracted_airports-00000-of-00001


In [0]:
%%bash 
gsutil cat gs://airflights/output/extracted_airports-00000-of-00001 | head -n20

AIRPORT_SEQ_ID,LATITUDE,LONGITUDE
1000101,58.10944444,-152.90666667
1000301,65.54805556,-161.07166667
1000401,68.08333333,-163.16666667
1000501,67.57000000,-148.18388889
1000601,57.74527778,-152.88277778
1000701,55.55472222,-133.10166667
1000801,59.15694444,-151.82916667
1000901,59.36277778,-153.43055556
1001001,42.28888889,-73.71027778
1001002,42.29138889,-73.71027778
1001101,35.99027778,-113.81638889
1001102,35.98611111,-113.81694444
1001201,40.97111111,-74.99750000
1001301,33.62388889,-101.24083333
1001401,64.36416667,-147.36138889
1001402,64.36361111,-147.36388889
1001501,57.06666667,-153.93777778
1001601,58.45750000,-154.02333333
1001701,57.27722222,-154.34222222


In [0]:
# Lazily created TimezoneFinder shared by all addtimezone() calls, so the
# finder is constructed once instead of once per airport row.
_TZ_FINDER = None


def addtimezone(lat, lon):
   """Return (lat, lon, timezone_name) for one airport row.

   lat, lon: coordinate strings as read from the airports CSV; they are
      returned unchanged.
   Non-numeric coordinates (the CSV header row) yield (lat, lon, 'TIMEZONE'),
   so the header gains a matching column name. Coordinates for which no
   timezone is found fall back to 'UTC'.
   """
   global _TZ_FINDER
   # Only the float parsing decides "this is the header"; errors raised by the
   # timezone lookup itself are not mislabeled as a header.
   try:
      lat_deg, lon_deg = float(lat), float(lon)
   except ValueError:
      return (lat, lon, 'TIMEZONE') # header
   if _TZ_FINDER is None:
      import timezonefinder
      _TZ_FINDER = timezonefinder.TimezoneFinder()
   tz = _TZ_FINDER.timezone_at(lng=lon_deg, lat=lat_deg)
   if tz is None:
      tz = 'UTC'
   return (lat, lon, tz)

if __name__ == '__main__':
   # Leaving the `with` block runs the pipeline and waits for it, so there is
   # no explicit pipeline.run() (that would execute the pipeline twice).
   with beam.Pipeline('DirectRunner') as pipeline:

      # (AIRPORT_SEQ_ID, (lat, lon, timezone)) for every airport row, header included.
      airports = (pipeline
         | beam.io.ReadFromText('gs://airflights/airports.csv.gz')
         | beam.Map(lambda line: next(csv.reader([line])))
         | beam.Map(lambda fields: (fields[0], addtimezone(fields[21], fields[26])))
      )

      # Flatten each (airport, (lat, lon, tz)) pair into one CSV line; indexing
      # replaces the Python-2-only tuple-parameter lambda.
      (airports
         | beam.Map(lambda airport_data: '{},{}'.format(airport_data[0], ','.join(airport_data[1])))
         | beam.io.textio.WriteToText('gs://airflights/output/extracted_airports')
      )



In [0]:
%%bash 
gsutil ls gs://airflights/output/*

gs://airflights/output/extracted_airports-00000-of-00001


In [0]:
%%bash 
gsutil cat gs://airflights/output/extracted_airports-00000-of-00001 | head -n20

AIRPORT_SEQ_ID,LATITUDE,LONGITUDE,TIMEZONE
1000101,58.10944444,-152.90666667,America/Anchorage
1000301,65.54805556,-161.07166667,America/Anchorage
1000401,68.08333333,-163.16666667,America/Nome
1000501,67.57000000,-148.18388889,America/Anchorage
1000601,57.74527778,-152.88277778,America/Anchorage
1000701,55.55472222,-133.10166667,America/Sitka
1000801,59.15694444,-151.82916667,America/Anchorage
1000901,59.36277778,-153.43055556,America/Anchorage
1001001,42.28888889,-73.71027778,America/New_York
1001002,42.29138889,-73.71027778,America/New_York
1001101,35.99027778,-113.81638889,America/Phoenix
1001102,35.98611111,-113.81694444,America/Phoenix
1001201,40.97111111,-74.99750000,America/New_York
1001301,33.62388889,-101.24083333,America/Chicago
1001401,64.36416667,-147.36138889,America/Anchorage
1001402,64.36361111,-147.36388889,America/Anchorage
1001501,57.06666667,-153.93777778,America/Anchorage
1001601,58.45750000,-154.02333333,America/Anchorage
1001701,57.27722222,-154.34222222,America/

In [0]:
# Lazily created TimezoneFinder shared by all addtimezone() calls, so the
# finder is constructed once instead of once per airport row.
_TZ_FINDER = None


def addtimezone(lat, lon):
   """Return (lat, lon, timezone_name) for one airport row.

   lat, lon: coordinate strings as read from the airports CSV; they are
      returned unchanged.
   Non-numeric coordinates (the CSV header row) yield (lat, lon, 'TIMEZONE').
   Coordinates for which no timezone is found fall back to 'UTC'. A None
   timezone must not leak out: as_utc() returns '' for a None zone, which
   would make every time at that airport look like a cancelled flight.
   """
   global _TZ_FINDER
   # Only the float parsing decides "this is the header"; errors raised by the
   # timezone lookup itself are not mislabeled as a header.
   try:
      lat_deg, lon_deg = float(lat), float(lon)
   except ValueError:
      return (lat, lon, 'TIMEZONE') # header
   if _TZ_FINDER is None:
      import timezonefinder
      _TZ_FINDER = timezonefinder.TimezoneFinder()
   tz = _TZ_FINDER.timezone_at(lng=lon_deg, lat=lat_deg)
   if tz is None:
      tz = 'UTC'
   return (lat, lon, tz)

def as_utc(date, hhmm, tzone):
   """Convert a local wall-clock time to a UTC timestamp string.

   date:  'YYYY-MM-DD' string.
   hhmm:  local time as 'hhmm' digits (the data may contain '2400' and the
          like); an empty string means a cancelled flight.
   tzone: timezone name passed to pytz.timezone(), or None.

   Returns 'YYYY-MM-DD HH:MM:SS' in UTC, or '' when hhmm is empty or tzone is
   None. On ValueError the offending inputs are printed and the error re-raised.
   """
   try:
      if len(hhmm) > 0 and tzone is not None:
         import datetime, pytz
         loc_tz = pytz.timezone(tzone)
         # can't just parse hhmm because the data contains 2400 and the like,
         # so add it to midnight as an offset. Do the arithmetic on the naive
         # wall-clock time and localize afterwards: adding a timedelta to an
         # already-localized pytz datetime keeps midnight's UTC offset, which is
         # wrong by an hour for times after a DST switch on that day.
         naive_dt = (datetime.datetime.strptime(date, '%Y-%m-%d')
                     + datetime.timedelta(hours=int(hhmm[:2]), minutes=int(hhmm[2:])))
         loc_dt = loc_tz.localize(naive_dt, is_dst=False)
         utc_dt = loc_dt.astimezone(pytz.utc)
         return utc_dt.strftime('%Y-%m-%d %H:%M:%S')
      else:
         return '' # empty string corresponds to canceled flights
   except ValueError:
      print('{} {} {}'.format(date, hhmm, tzone))
      raise  # bare raise keeps the original traceback

def tz_correct(line, airport_timezones):
   """Yield a flight record with its departure/arrival times converted to UTC.

   line: one comma-separated flight record.
   airport_timezones: dict mapping airport id -> tuple whose element [2] is the
      timezone name (the (lat, lon, tz) tuples built by addtimezone()).

   The header row (first field 'FL_DATE') and rows that do not split into
   exactly 27 fields produce no output. Raises KeyError if an airport id is
   missing from airport_timezones.
   """
   fields = line.split(',')
   if fields[0] == 'FL_DATE' or len(fields) != 27:
      return

   # convert all times to UTC
   dep_airport_id = fields[6]
   arr_airport_id = fields[10]
   dep_timezone = airport_timezones[dep_airport_id][2]
   arr_timezone = airport_timezones[arr_airport_id][2]

   for f in [13, 14, 17]: #crsdeptime, deptime, wheelsoff
      fields[f] = as_utc(fields[0], fields[f], dep_timezone)
   for f in [18, 20, 21]: #wheelson, crsarrtime, arrtime
      fields[f] = as_utc(fields[0], fields[f], arr_timezone)

   yield ','.join(fields)

if __name__ == '__main__':
   # Leaving the `with` block runs the pipeline and waits for it, so there is
   # no explicit pipeline.run() (that would execute the pipeline twice).
   with beam.Pipeline('DirectRunner') as pipeline:
      # (AIRPORT_SEQ_ID, (lat, lon, timezone)) for every airport row.
      airports = (pipeline
         | 'airports:read' >> beam.io.ReadFromText('gs://airflights/airports.csv.gz')
         | 'airports:fields' >> beam.Map(lambda line: next(csv.reader([line])))
         | 'airports:tz' >> beam.Map(lambda fields: (fields[0], addtimezone(fields[21], fields[26])))
      )

      # Flight records with times rewritten to UTC; the airports collection is
      # handed to tz_correct as a dict side input.
      flights = (pipeline
         | 'flights:read' >> beam.io.ReadFromText('gs://airflights/201501_part.csv')
         | 'flights:tzcorr' >> beam.FlatMap(tz_correct, beam.pvalue.AsDict(airports))
      )

      flights | 'flights:write' >> beam.io.textio.WriteToText('gs://airflights/output/all_flights')
      

start
pipeline run




In [0]:
%%bash 
gsutil cat gs://airflights/output/all_flights*| head -n20