Install the following packages before running the notebook

In [None]:
%%bash
# Install the Python packages this notebook depends on.
pip install --upgrade tensorflow==1.4
pip install --ignore-installed --upgrade pytz==2018.4
# Uninstall google-cloud-dataflow before installing apache-beam
# (presumably to avoid a clash between the two distributions -- TODO confirm).
pip uninstall -y google-cloud-dataflow
# Quote the requirement: unquoted square brackets are a shell glob pattern and
# could be expanded against file names in the working directory.
pip install --upgrade 'apache-beam[gcp]==2.12.0'
pip install google-cloud-bigquery --upgrade

Create the dataset on BigQuery

In [None]:
%%bash
# Create the BigQuery dataset that holds the tables used by this notebook.
# --force makes the cell safe to re-run: bq exits successfully when the
# dataset already exists instead of failing the cell (it never overwrites).
bq mk --force --dataset sevir-306302:sevirdataset

Create the tables on BigQuery

In [None]:
%%bash
# Create the two destination tables with an explicit name:TYPE schema.
# The table IDs are project-qualified so they do not depend on the default
# project configured for the bq tool, matching the dataset-creation command
# and the table references used by the Beam pipeline.
# --force makes the cell safe to re-run: an already existing table is left
# untouched and the command still exits successfully.
bq mk --force --table sevir-306302:sevirdataset.catalogcsv \
id:STRING,file_name:STRING,file_index:INTEGER,img_type:STRING,time_utc:DATETIME,episode_id:STRING,event_id:STRING,event_type:STRING,llcrnrlat:FLOAT,llcrnrlon:FLOAT,urcrnrlat:FLOAT,urcrnrlon:FLOAT,proj:STRING,size_x:INTEGER,size_y:INTEGER,height_m:FLOAT,width_m:FLOAT,data_min:FLOAT,data_max:FLOAT,pct_missing:FLOAT
bq mk --force --table sevir-306302:sevirdataset.storm_details \
BEGIN_YEARMONTH:INTEGER,BEGIN_DAY:INTEGER,BEGIN_TIME:INTEGER,END_YEARMONTH:INTEGER,END_DAY:INTEGER,END_TIME:INTEGER,EPISODE_ID:STRING,EVENT_ID:INTEGER,STATE:STRING,STATE_FIPS:INTEGER,YEAR:INTEGER,MONTH_NAME:STRING,EVENT_TYPE:STRING,CZ_TYPE:STRING,CZ_FIPS:INTEGER,CZ_NAME:STRING,WFO:STRING,BEGIN_DATE_TIME:STRING,CZ_TIMEZONE:STRING,END_DATE_TIME:STRING,INJURIES_DIRECT:STRING,INJURIES_INDIRECT:STRING,DEATHS_DIRECT:STRING,DEATHS_INDIRECT:STRING,DAMAGE_PROPERTY:STRING,DAMAGE_CROPS:STRING,SOURCE:STRING,MAGNITUDE:STRING,MAGNITUDE_TYPE:STRING,FLOOD_CAUSE:STRING,CATEGORY:STRING,BEGIN_RANGE:STRING,BEGIN_AZIMUTH:STRING,BEGIN_LOCATION:STRING,END_RANGE:STRING,END_AZIMUTH:STRING,END_LOCATION:STRING,BEGIN_LAT:STRING,BEGIN_LON:STRING,END_LAT:STRING,END_LON:STRING,EPISODE_NARRATIVE:STRING,EVENT_NARRATIVE:STRING,DATA_SOURCE:STRING

Pipeline to move data from Cloud Storage to BigQuery

In [None]:
import apache_beam as beam
import argparse
from apache_beam.options.pipeline_options import PipelineOptions
from sys import argv
# Google Cloud project that owns the BigQuery tables written below.
PROJECT_ID = 'sevir-306302'

# BigQuery schema for the catalog table, as a comma-separated list of
# name:TYPE pairs. Adjacent string literals are concatenated by Python, so
# the value is one single-line string.
SCHEMACATALOG = (
    'id:STRING,file_name:STRING,file_index:INTEGER,img_type:STRING,'
    'time_utc:DATETIME,episode_id:STRING,event_id:STRING,event_type:STRING,'
    'llcrnrlat:FLOAT,llcrnrlon:FLOAT,urcrnrlat:FLOAT,urcrnrlon:FLOAT,'
    'proj:STRING,size_x:INTEGER,size_y:INTEGER,'
    'height_m:FLOAT,width_m:FLOAT,'
    'data_min:FLOAT,data_max:FLOAT,pct_missing:FLOAT'
)

# BigQuery schema for the storm-details table, in the same name:TYPE format.
SCHEMASTORM = (
    'BEGIN_YEARMONTH:INTEGER,BEGIN_DAY:INTEGER,BEGIN_TIME:INTEGER,'
    'END_YEARMONTH:INTEGER,END_DAY:INTEGER,END_TIME:INTEGER,'
    'EPISODE_ID:STRING,EVENT_ID:INTEGER,'
    'STATE:STRING,STATE_FIPS:INTEGER,YEAR:INTEGER,MONTH_NAME:STRING,'
    'EVENT_TYPE:STRING,CZ_TYPE:STRING,CZ_FIPS:INTEGER,CZ_NAME:STRING,'
    'WFO:STRING,BEGIN_DATE_TIME:STRING,CZ_TIMEZONE:STRING,END_DATE_TIME:STRING,'
    'INJURIES_DIRECT:STRING,INJURIES_INDIRECT:STRING,'
    'DEATHS_DIRECT:STRING,DEATHS_INDIRECT:STRING,'
    'DAMAGE_PROPERTY:STRING,DAMAGE_CROPS:STRING,'
    'SOURCE:STRING,MAGNITUDE:STRING,MAGNITUDE_TYPE:STRING,'
    'FLOOD_CAUSE:STRING,CATEGORY:STRING,'
    'BEGIN_RANGE:STRING,BEGIN_AZIMUTH:STRING,BEGIN_LOCATION:STRING,'
    'END_RANGE:STRING,END_AZIMUTH:STRING,END_LOCATION:STRING,'
    'BEGIN_LAT:STRING,BEGIN_LON:STRING,END_LAT:STRING,END_LON:STRING,'
    'EPISODE_NARRATIVE:STRING,EVENT_NARRATIVE:STRING,DATA_SOURCE:STRING'
)


def dropcolums(data):
    """Return a copy of a catalog row without the ``minute_offsets`` entry.

    The input dict is left untouched: Beam transforms must not modify the
    elements they receive, so a new dict is built instead of deleting the key
    in place. A row that has no ``minute_offsets`` key is returned as an
    unchanged copy rather than raising ``KeyError``.

    Args:
        data: dict mapping column names to values.

    Returns:
        A new dict with every entry of ``data`` except ``minute_offsets``.
    """
    return {key: value for key, value in data.items() if key != 'minute_offsets'}

def convert_types(data):
    """Return a copy of a row with ``height_m`` and ``width_m`` as floats.

    Values read from CSV arrive as strings. A missing key, ``None``, or a
    blank/whitespace-only string becomes ``None`` instead of raising
    ``ValueError`` from ``float('')``. Any other value is passed to
    ``float()``, which still raises ``ValueError`` for non-numeric text.

    The input dict is not modified; Beam transforms must not mutate the
    elements they receive.

    Args:
        data: dict mapping column names to values.

    Returns:
        A new dict with the same entries as ``data`` where ``height_m`` and
        ``width_m`` are each a ``float`` or ``None``.
    """
    def _to_float(raw):
        # Strip text so whitespace-only cells count as blank.
        if hasattr(raw, 'strip'):
            raw = raw.strip()
        if raw is None or raw == '':
            return None
        return float(raw)

    converted = dict(data)
    for field in ('height_m', 'width_m'):
        converted[field] = _to_float(converted.get(field))
    return converted

def dropcolumsstorm(data):
    """Return a copy of a storm row without the ``TOR_*`` entries.

    The input dict is left untouched: Beam transforms must not modify the
    elements they receive, so a new dict is built instead of deleting keys in
    place. Keys that are already absent are ignored rather than raising
    ``KeyError``.

    Args:
        data: dict mapping column names to values.

    Returns:
        A new dict with every entry of ``data`` except the seven ``TOR_*``
        columns listed below.
    """
    unwanted = frozenset((
        'TOR_F_SCALE',
        'TOR_LENGTH',
        'TOR_WIDTH',
        'TOR_OTHER_WFO',
        'TOR_OTHER_CZ_STATE',
        'TOR_OTHER_CZ_FIPS',
        'TOR_OTHER_CZ_NAME',
    ))
    return {key: value for key, value in data.items() if key not in unwanted}

  
# Column names of the catalog CSV, in file order (index 0..20).
CATALOG_COLUMNS = (
    'id', 'file_name', 'file_index', 'img_type', 'time_utc', 'minute_offsets',
    'episode_id', 'event_id', 'event_type',
    'llcrnrlat', 'llcrnrlon', 'urcrnrlat', 'urcrnrlon',
    'proj', 'size_x', 'size_y', 'height_m', 'width_m',
    'data_min', 'data_max', 'pct_missing',
)

# Column names of the storm-details CSV, in file order (index 0..50).
STORM_COLUMNS = (
    'BEGIN_YEARMONTH', 'BEGIN_DAY', 'BEGIN_TIME',
    'END_YEARMONTH', 'END_DAY', 'END_TIME',
    'EPISODE_ID', 'EVENT_ID', 'STATE', 'STATE_FIPS', 'YEAR', 'MONTH_NAME',
    'EVENT_TYPE', 'CZ_TYPE', 'CZ_FIPS', 'CZ_NAME', 'WFO',
    'BEGIN_DATE_TIME', 'CZ_TIMEZONE', 'END_DATE_TIME',
    'INJURIES_DIRECT', 'INJURIES_INDIRECT', 'DEATHS_DIRECT', 'DEATHS_INDIRECT',
    'DAMAGE_PROPERTY', 'DAMAGE_CROPS', 'SOURCE', 'MAGNITUDE', 'MAGNITUDE_TYPE',
    'FLOOD_CAUSE', 'CATEGORY',
    'TOR_F_SCALE', 'TOR_LENGTH', 'TOR_WIDTH', 'TOR_OTHER_WFO',
    'TOR_OTHER_CZ_STATE', 'TOR_OTHER_CZ_FIPS', 'TOR_OTHER_CZ_NAME',
    'BEGIN_RANGE', 'BEGIN_AZIMUTH', 'BEGIN_LOCATION',
    'END_RANGE', 'END_AZIMUTH', 'END_LOCATION',
    'BEGIN_LAT', 'BEGIN_LON', 'END_LAT', 'END_LON',
    'EPISODE_NARRATIVE', 'EVENT_NARRATIVE', 'DATA_SOURCE',
)


def _split_csv_line(line):
    """Parse one line of CSV text into a list of field strings.

    The ``csv`` module is used instead of ``line.split(',')`` so that quoted
    fields containing commas stay in one piece; a plain split would shift
    every following column into the wrong position.
    """
    # Imported inside the function so the name resolves wherever the runner
    # executes it, without relying on the notebook's global namespace.
    import csv
    return next(csv.reader([line]))


def _fields_to_row(fields, columns):
    """Pair positional CSV fields with column names and return a dict.

    Raises ``ValueError`` with the offending record when there are fewer
    fields than columns. Extra trailing fields are ignored.
    """
    if len(fields) < len(columns):
        raise ValueError(
            'Expected at least {0} CSV fields but got {1}: {2!r}'.format(
                len(columns), len(fields), fields))
    return dict(zip(columns, fields))


p = beam.Pipeline(options=PipelineOptions(
    flags=argv,
    runner='DataflowRunner',
    project=PROJECT_ID,
    job_name='catalog',
    temp_location='gs://sevir-306302/temp',
    staging_location='gs://sevir-306302/tempstg',
    region='us-central1'))

# Branch 1: catalog CSV -> sevirdataset.catalogcsv
(p
 | 'ReadDataCatalog' >> beam.io.ReadFromText(
     'gs://sevir-306302/Catalog/CATALOG.csv', skip_header_lines=1)
 | 'SplitCatalog' >> beam.Map(_split_csv_line)
 | 'format to dictCatalog' >> beam.Map(_fields_to_row, CATALOG_COLUMNS)
 | 'ConvertypesCatalog' >> beam.Map(convert_types)
 | 'DelUnwantedDataCatalog' >> beam.Map(dropcolums)
 | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
     '{0}:sevirdataset.catalogcsv'.format(PROJECT_ID),
     schema=SCHEMACATALOG,
     write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))

# Branch 2: storm-details CSV -> sevirdataset.storm_details
(p
 | 'ReadDataStorm' >> beam.io.ReadFromText(
     'gs://sevir-306302/Storm/StormEvents_details_new.csv', skip_header_lines=1)
 | 'SplitStormData' >> beam.Map(_split_csv_line)
 | 'format to dict Storm' >> beam.Map(_fields_to_row, STORM_COLUMNS)
 | 'DelUnwantedDataStorm' >> beam.Map(dropcolumsstorm)
 | 'WriteToBigQueryStorm' >> beam.io.WriteToBigQuery(
     '{0}:sevirdataset.storm_details'.format(PROJECT_ID),
     schema=SCHEMASTORM,
     write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))

# run() returns a result handle for the submitted pipeline; call
# result.wait_until_finish() if the notebook should block until it completes.
result = p.run()