In [1]:
%%bash
# Pin the library versions this notebook was written against.
# NOTE(review): the pip output of this cell reports conflicts (e.g. pandas-gbq
# 0.3.0 needs google-cloud-bigquery>=0.28.0 but 0.23.0 is pinned below) --
# confirm these pins are still what you want.
# NOTE(review): tensorflow is not used by the pipeline cell -- confirm it is needed.
pip install --upgrade tensorflow==1.4
pip install --ignore-installed --upgrade pytz==2018.4
# Presumably removes the legacy Dataflow SDK package so it cannot conflict
# with apache-beam -- confirm.
pip uninstall -y google-cloud-dataflow
# Quoted: an unquoted [gcp] is a shell glob and may expand against local files.
pip install --upgrade "apache-beam[gcp]==2.6"
# NOTE(review): `source activate` only affects the remaining commands of this
# %%bash subprocess, so only the next install targets py2env; the installs
# above went to whatever environment `pip` resolved to before activation.
source activate py2env
pip install google-cloud-bigquery==0.23.0


Collecting tensorflow==1.4
  Using cached https://files.pythonhosted.org/packages/99/72/a420e22dc93416d30981e87a2318823ec09a9b18631369df0e7d9d164073/tensorflow-1.4.0-cp27-cp27mu-manylinux1_x86_64.whl
Collecting tensorflow-tensorboard<0.5.0,>=0.4.0rc1 (from tensorflow==1.4)
  Using cached https://files.pythonhosted.org/packages/64/cd/f3d14d441eb1c5228aaf7e12e8e94895ae73e9af50383e481610b34357bd/tensorflow_tensorboard-0.4.0-py2-none-any.whl
Collecting bleach==1.5.0 (from tensorflow-tensorboard<0.5.0,>=0.4.0rc1->tensorflow==1.4)
  Using cached https://files.pythonhosted.org/packages/33/70/86c5fec937ea4964184d4d6c4f0b9551564f821e1c3575907639036d9b90/bleach-1.5.0-py2.py3-none-any.whl
Collecting html5lib==0.9999999 (from tensorflow-tensorboard<0.5.0,>=0.4.0rc1->tensorflow==1.4)
Installing collected packages: html5lib, bleach, tensorflow-tensorboard, tensorflow
  Found existing installation: html5lib 1.1
    Uninstalling html5lib-1.1:
      Successfully uninstalled html5lib-1.1
  Found existin

apache-airflow 1.9.0 has requirement bleach==2.1.2, but you'll have bleach 1.5.0 which is incompatible.
pandas-gbq 0.3.0 has requirement google-cloud-bigquery>=0.28.0, but you'll have google-cloud-bigquery 0.25.0 which is incompatible.
googledatastore 7.0.1 has requirement httplib2<0.10,>=0.9.1, but you'll have httplib2 0.11.3 which is incompatible.
google-cloud-pubsub 0.26.0 has requirement google-cloud-core<0.26dev,>=0.25.0, but you'll have google-cloud-core 0.23.1 which is incompatible.
pandas-gbq 0.3.0 has requirement google-cloud-bigquery>=0.28.0, but you'll have google-cloud-bigquery 0.23.0 which is incompatible.


In [1]:
import apache_beam as beam
import argparse
from apache_beam.options.pipeline_options import PipelineOptions
from sys import argv
import datetime

# GCP project that owns the Sevir BigQuery dataset written by the pipeline.
# NOTE(review): 'project-id' is a placeholder -- set the real project id here.
PROJECT_ID = 'project-id'

def dropcolums_Catalog(data):
  """Remove ``minute_offsets``, which is not part of the catalog BigQuery schema.

  The dict is modified in place and the same object is returned.
  Raises KeyError if ``minute_offsets`` is absent.
  """
  data.pop('minute_offsets')
  return data

def convert_types_Catalog(data):
  """Convert the catalog's ``height_m`` and ``width_m`` values to floats.

  Args:
    data: dict for one catalog row whose values come from the parsed CSV line.

  Returns:
    The same dict, mutated in place. A field that is missing, ``None`` or
    blank (empty / whitespace-only) becomes ``None`` rather than raising, so a
    single empty CSV cell no longer fails the whole Dataflow job.
  """
  # Nested (not module-level) so the helper travels with this function when
  # Beam pickles it for remote workers.
  def _float_or_none(value):
    if value is None:
      return None
    if hasattr(value, 'strip') and not value.strip():
      return None
    return float(value)

  for key in ('height_m', 'width_m'):
    data[key] = _float_or_none(data.get(key))
  return data

def dropcolums_Storm(data):
    """Strip the StormEvents columns that are not loaded into BigQuery.

    Columns are deleted in the listed order, in place, and the same dict is
    returned. If a column is missing, KeyError is raised at that column; the
    columns listed before it have already been removed.
    """
    unwanted_columns = (
        'CZ_TYPE', 'CZ_FIPS', 'WFO', 'BEGIN_DATE_TIME', 'CZ_TIMEZONE',
        'END_DATE_TIME', 'DAMAGE_PROPERTY', 'DAMAGE_CROPS', 'TOR_F_SCALE',
        'TOR_LENGTH', 'TOR_WIDTH', 'TOR_OTHER_WFO', 'TOR_OTHER_CZ_STATE',
        'TOR_OTHER_CZ_FIPS', 'TOR_OTHER_CZ_NAME', 'END_LON',
        'EPISODE_NARRATIVE', 'EVENT_NARRATIVE', 'DATA_SOURCE',
        'BEGIN_RANGE', 'BEGIN_AZIMUTH', 'END_RANGE', 'END_AZIMUTH',
        'BEGIN_LAT', 'BEGIN_LON', 'END_LAT',
    )
    for column in unwanted_columns:
        del data[column]
    return data


def convert_types_Storm(data):
  """Convert ``EVENT_ID`` and ``EPISODE_ID`` of a StormEvents row to ints.

  Args:
    data: dict for one StormEvents row whose values come from the parsed CSV line.

  Returns:
    The same dict, mutated in place. A field that is missing, ``None`` or
    blank (empty / whitespace-only) becomes ``None`` rather than raising, so a
    single empty CSV cell no longer fails the whole Dataflow job.
  """
  # Nested (not module-level) so the helper travels with this function when
  # Beam pickles it for remote workers.
  def _int_or_none(value):
    if value is None:
      return None
    if hasattr(value, 'strip') and not value.strip():
      return None
    return int(value)

  # DEATHS_* columns are deliberately left as strings (STRING in the schema).
  for key in ('EVENT_ID', 'EPISODE_ID'):
    data[key] = _int_or_none(data.get(key))
  return data

query = """
SELECT s.ID,s.FILE_NAME,s.img_type,s.EVENT_ID,s.EVENT_TYPE,s.EPISODE_ID,d.BEGIN_YEARMONTH, 
d.END_YEARMONTH, d.STATE,d.DEATHS_DIRECT,d.FLOOD_CAUSE FROM `Sevir.catalog` s INNER JOIN  
`Sevir.StormEvents_details_2018` d 
ON cast(SUBSTR(s.id,2,15) as numeric) = d.EVENT_ID
    """

# --- Dataflow job configuration ---------------------------------------------
# One timestamp shared by both jobs below so their names can be matched up.
_RUN_STAMP = datetime.datetime.now().strftime('%y%m%d-%H%M%S')


def _dataflow_options(job_prefix):
    """Return DataflowRunner options for a job named ``<job_prefix>-<stamp>``.

    ``flags=[]`` keeps Beam from parsing the Jupyter kernel's own command line
    as pipeline flags; every setting is passed explicitly instead.
    """
    return PipelineOptions(flags=[],
                           runner='DataflowRunner',
                           project=PROJECT_ID,
                           job_name='{0}-{1}'.format(job_prefix, _RUN_STAMP),
                           temp_location='gs://sevir-pipeline/temp',
                           staging_location='gs://sevir-pipeline/tempstg',
                           region='us-central1')


def _parse_csv_line(line):
    """Split one CSV line into fields, honouring double-quoted values.

    ``str.split(',')`` keeps the surrounding quotes and breaks quoted fields
    that contain commas. Imports are local because Dataflow workers do not
    get this notebook's globals unless ``save_main_session`` is enabled.
    """
    import csv
    import sys
    if sys.version_info[0] == 2 and not isinstance(line, str):
        # Python 2's csv module cannot read non-ASCII unicode; round-trip UTF-8.
        encoded_fields = next(csv.reader([line.encode('utf-8')]))
        return [field.decode('utf-8') for field in encoded_fields]
    return next(csv.reader([line]))


def _fields_to_dict(column_names):
    """Return a function that maps a parsed CSV row onto ``column_names``.

    The returned function raises ValueError for rows with fewer fields than
    columns; extra trailing fields are ignored.
    """
    def to_dict(fields):
        if len(fields) < len(column_names):
            raise ValueError('expected {0} fields, got {1}: {2!r}'.format(
                len(column_names), len(fields), fields))
        return dict(zip(column_names, fields))
    return to_dict


# CSV column order of CATALOG.csv (minute_offsets is dropped before writing).
_CATALOG_COLUMNS = (
    'id', 'file_name', 'file_index', 'img_type', 'time_utc', 'minute_offsets',
    'episode_id', 'event_id', 'event_type', 'llcrnrlat', 'llcrnrlon',
    'urcrnrlat', 'urcrnrlon', 'proj', 'size_x', 'size_y', 'height_m',
    'width_m', 'data_min', 'data_max', 'pct_missing')

_CATALOG_SCHEMA = (
    'id:STRING,file_name:STRING,file_index:INTEGER,img_type:STRING,'
    'time_utc:DATETIME,episode_id:STRING,event_id:STRING,event_type:STRING,'
    'llcrnrlat:FLOAT,llcrnrlon:FLOAT,urcrnrlat:FLOAT,urcrnrlon:FLOAT,'
    'proj:STRING,size_x:INTEGER,size_y:INTEGER,height_m:FLOAT,width_m:FLOAT,'
    'data_min:FLOAT,data_max:FLOAT,pct_missing:FLOAT')

# CSV column order of the StormEvents details files (same for 2018 and 2019).
_STORM_COLUMNS = (
    'BEGIN_YEARMONTH', 'BEGIN_DAY', 'BEGIN_TIME', 'END_YEARMONTH', 'END_DAY',
    'END_TIME', 'EPISODE_ID', 'EVENT_ID', 'STATE', 'STATE_FIPS', 'YEAR',
    'MONTH_NAME', 'EVENT_TYPE', 'CZ_TYPE', 'CZ_FIPS', 'CZ_NAME', 'WFO',
    'BEGIN_DATE_TIME', 'CZ_TIMEZONE', 'END_DATE_TIME', 'INJURIES_DIRECT',
    'INJURIES_INDIRECT', 'DEATHS_DIRECT', 'DEATHS_INDIRECT',
    'DAMAGE_PROPERTY', 'DAMAGE_CROPS', 'SOURCE', 'MAGNITUDE',
    'MAGNITUDE_TYPE', 'FLOOD_CAUSE', 'CATEGORY', 'TOR_F_SCALE', 'TOR_LENGTH',
    'TOR_WIDTH', 'TOR_OTHER_WFO', 'TOR_OTHER_CZ_STATE', 'TOR_OTHER_CZ_FIPS',
    'TOR_OTHER_CZ_NAME', 'BEGIN_RANGE', 'BEGIN_AZIMUTH', 'BEGIN_LOCATION',
    'END_RANGE', 'END_AZIMUTH', 'END_LOCATION', 'BEGIN_LAT', 'BEGIN_LON',
    'END_LAT', 'END_LON', 'EPISODE_NARRATIVE', 'EVENT_NARRATIVE',
    'DATA_SOURCE')

_STORM_SCHEMA = (
    'BEGIN_YEARMONTH:INTEGER,BEGIN_DAY:INTEGER,BEGIN_TIME:INTEGER,'
    'END_YEARMONTH:INTEGER,END_DAY:INTEGER,END_TIME:INTEGER,'
    'EPISODE_ID:INTEGER,EVENT_ID:INTEGER,STATE:STRING,STATE_FIPS:INTEGER,'
    'YEAR:INTEGER,MONTH_NAME:STRING,EVENT_TYPE:STRING,CZ_NAME:STRING,'
    'INJURIES_DIRECT:STRING,INJURIES_INDIRECT:STRING,DEATHS_DIRECT:STRING,'
    'DEATHS_INDIRECT:STRING,SOURCE:STRING,MAGNITUDE:STRING,'
    'MAGNITUDE_TYPE:STRING,FLOOD_CAUSE:STRING,CATEGORY:STRING,'
    'BEGIN_LOCATION:STRING,END_LOCATION:STRING')

_COMBINED_SCHEMA = (
    'ID:STRING,FILE_NAME:STRING,img_type:STRING,EVENT_ID:STRING,'
    'EVENT_TYPE:STRING,EPISODE_ID:STRING,BEGIN_YEARMONTH:INTEGER,'
    'END_YEARMONTH:INTEGER,STATE:STRING,DEATHS_DIRECT:STRING,'
    'FLOOD_CAUSE:STRING')

# --- Stage 1: load the raw CSV files into BigQuery ---------------------------
# Every source is its own branch rooted at `p` (previously the reads were
# piped onto the outputs of the preceding WriteToBigQuery steps).
p = beam.Pipeline(options=_dataflow_options('sevir-pipeline'))

# NOTE(review): the object name contains a space ('CATALOG .csv'); confirm it
# is not a typo for 'CATALOG.csv'.
(p
 | 'ReadingFromBucket-Catalog' >> beam.io.ReadFromText(
     'gs://sevir-pipeline/CATALOG .csv', skip_header_lines=1)
 | 'SplittingCSVData-Catalog' >> beam.Map(_parse_csv_line)
 | 'FormattingToCSV-Catalog' >> beam.Map(_fields_to_dict(_CATALOG_COLUMNS))
 | 'PreprocessedData-Catalog' >> beam.Map(convert_types_Catalog)
 | 'DroppingBadColumns-Catalog' >> beam.Map(dropcolums_Catalog)
 | 'WriteToBigQuery-Catalog' >> beam.io.WriteToBigQuery(
     '{0}:Sevir.catalog'.format(PROJECT_ID),
     schema=_CATALOG_SCHEMA,
     write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))

# (step-label suffix, input CSV, destination dataset.table)
_STORM_SOURCES = (
    ('Storm', 'gs://sevir-pipeline/StormEvents_details_2018.csv',
     'Sevir.StormEvents_details_2018'),
    ('Storm2019', 'gs://sevir-pipeline/StormEvents_Details_2019.csv',
     'Sevir.StormEvents_Details_2019'),
)
for _suffix, _csv_path, _table in _STORM_SOURCES:
    (p
     | ('ReadingFromBucket-' + _suffix) >> beam.io.ReadFromText(
         _csv_path, skip_header_lines=1)
     | ('SplittingCSVData-' + _suffix) >> beam.Map(_parse_csv_line)
     | ('FormattingToCSV-' + _suffix) >> beam.Map(_fields_to_dict(_STORM_COLUMNS))
     | ('PreprocessedData-' + _suffix) >> beam.Map(convert_types_Storm)
     | ('DroppingBadColumns-' + _suffix) >> beam.Map(dropcolums_Storm)
     | ('WriteToBigQuery-' + _suffix) >> beam.io.WriteToBigQuery(
         '{0}:{1}'.format(PROJECT_ID, _table),
         schema=_STORM_SCHEMA,
         write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))

# Block until the load job is done: the combine query reads the very tables it
# writes, and independent branches of one pipeline have no ordering guarantee.
# If the load job fails, wait_until_finish() should raise, so the combine job
# is never started.
load_result = p.run()
load_result.wait_until_finish()

# --- Stage 2: join catalog and storm details into Sevir.combined_data --------
combine_pipeline = beam.Pipeline(
    options=_dataflow_options('sevir-pipeline-combine'))
(combine_pipeline
 | 'FetchingSEVIRData' >> beam.io.Read(
     beam.io.BigQuerySource(query=query, use_standard_sql=True))
 | 'WriteCombinedDataToBigQuery' >> beam.io.WriteToBigQuery(
     '{0}:Sevir.combined_data'.format(PROJECT_ID),
     schema=_COMBINED_SCHEMA,
     write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))

result = combine_pipeline.run()


Using this argument will have no effect on the actual scopes for tokens
requested. These scopes are set at VM instance creation time and
can't be overridden in the request.

