# IMPORTANT! 
## Rationale for approach...

The data discovery and exploration phase of the capstone 
illuminated the complexity of applying multiple operations 
to a small dataset. 

The raw Atlantic Hurricane Database 2 (HURDAT2) is a multiline 
text file with ~55K records. By Spark standards, this is small.

As discovered, transforming the storm data in Spark took 
disproportionately longer than it did for our other datasets. 
For a dataset this small, the overhead of distributing the work 
across the cluster's cores and executors outweighs the work itself.

Upon consultation with real-world data engineers, I was reminded:

> **Good reasons to use Spark:**  
> You actually have big data.  
> You think your data might be big in the future, and need to be ready.  
> You have medium data where the Spark startup time is worth it when running locally because you will then use multiple cores.  

They also advised switching to Pandas for the complex transformations.

So I pivoted. Because the final fact table will combine a large dataset with
the storms data, I optimize the storms dataset in Pandas BEFORE loading it into Spark. 

Storms data optimization using Pandas: 
- set headers  
- drop unneeded columns 
- data comes in as a multiline txt...
    - level 0 = storm id, storm name and count of the storm metadata records that follow  
    - level 1 = storm metadata records  
- 'flatten' the dataset (match each storm id and storm name to its n metadata records)  
- filter the dataset to named storms  
- parse the storm date and decode a subset of fields (record identifier and storm type)  
- convert time from HHMM to a time of day  
- using the Saffir-Simpson Wind Scale as reference, assign a numeric category to hurricane records
- convert coordinates to signed decimal degrees by stripping the N/S/E/W character
- where a named storm made landfall, map its coordinates to a state code using geopy and an open source map API
- write csv to: '/home/workspace/source_weather/storms_data.csv'

In [1]:
import pandas as pd
import zipfile
from zipfile import ZipFile
import json
import os
import glob
import numpy as np

# Storm Data

## sourced: 
HURDAT2 description: https://www.aoml.noaa.gov/hrd/hurdat/Data_Storm.html \
HURDAT2 legend: https://www.aoml.noaa.gov/hrd/hurdat/hurdat2-format.pdf \
HURDAT2 dataset: https://www.nhc.noaa.gov/data/hurdat/hurdat2-1851-2021-041922.txt

In [2]:
## one time read of the raw source data into Udacity workspace

# import urllib.request 
# urllib.request.urlretrieve('https://www.nhc.noaa.gov/data/hurdat/hurdat2-1851-2021-041922.txt', '/home/workspace/source_weather/hurdat2-1851-2021-041922.txt')

In [3]:
# read file ('https://www.nhc.noaa.gov/data/hurdat/hurdat2-1851-2021-041922.txt')
# there is no header
# name columns

import pandas as pd
HURDAT2 = pd.read_csv (r'/home/workspace/source_weather/hurdat2-1851-2021-041922.txt', header=None,
                  names=['storm_id',
                        'storm_name',
                        'associated_records',
                        'storm_date',
                        'storm_time',
                        'rec_identifier',
                        'storm_type',
                        'latitude',
                        'longitude',
                        'max_sustained_wind(kt)',
                        'minimum_pressure(mbar)',
                        "34_kt_wind_radii_maximum_extent_ne_quadrant_(in_nautical_miles)",
                        "34_kt_wind_radii_maximum_extent_se_quadrant_(in_nautical_miles)",
                        "34_kt_wind_radii_maximum_extent_sw_quadrant_(in_nautical_miles)",
                        "34_kt_wind_radii_maximum_extent_nw_quadrant_(in_nautical_miles)",
                        "50_kt_wind_radii_maximum_extent_ne_quadrant_(in_nautical_miles)",
                        "50_kt_wind_radii_maximum_extent_se_quadrant_(in_nautical_miles)",
                        "50_kt_wind_radii_maximum_extent_sw_quadrant_(in_nautical_miles)",
                        "50_kt_wind_radii_maximum_extent_nw_quadrant_(in_nautical_miles)",
                        "64_kt_wind_radii_maximum_extent_ne_quadrant_(in_nautical_miles)",
                        "64_kt_wind_radii_maximum_extent_se_quadrant_(in_nautical_miles)",
                        "64_kt_wind_radii_maximum_extent_sw_quadrant_(in_nautical_miles)",
                        "64_kt_wind_radii_maximum_extent_nw_quadrant_(in_nautical_miles)"
                        ])

In [4]:
print(HURDAT2.shape)
# HURDAT2.head(18)

(55437, 23)


## strip leading and trailing whitespace from string values

In [5]:
HURDAT2_obj = HURDAT2.select_dtypes(['object'])
HURDAT2[HURDAT2_obj.columns] = HURDAT2_obj.apply(lambda x: x.str.strip())

## remove unneeded columns

In [6]:
# check headers

HURDAT2.columns

Index(['storm_id', 'storm_name', 'associated_records', 'storm_date',
       'storm_time', 'rec_identifier', 'storm_type', 'latitude', 'longitude',
       'max_sustained_wind(kt)', 'minimum_pressure(mbar)',
       '34_kt_wind_radii_maximum_extent_ne_quadrant_(in_nautical_miles)',
       '34_kt_wind_radii_maximum_extent_se_quadrant_(in_nautical_miles)',
       '34_kt_wind_radii_maximum_extent_sw_quadrant_(in_nautical_miles)',
       '34_kt_wind_radii_maximum_extent_nw_quadrant_(in_nautical_miles)',
       '50_kt_wind_radii_maximum_extent_ne_quadrant_(in_nautical_miles)',
       '50_kt_wind_radii_maximum_extent_se_quadrant_(in_nautical_miles)',
       '50_kt_wind_radii_maximum_extent_sw_quadrant_(in_nautical_miles)',
       '50_kt_wind_radii_maximum_extent_nw_quadrant_(in_nautical_miles)',
       '64_kt_wind_radii_maximum_extent_ne_quadrant_(in_nautical_miles)',
       '64_kt_wind_radii_maximum_extent_se_quadrant_(in_nautical_miles)',
       '64_kt_wind_radii_maximum_extent_sw_quadrant_(i

In [7]:
"""
The raw data is multiline... 
level0 data aligns under columns 0-2.
level1 data aligns under columns 3-10.

For this project, not all columns are needed.
Drop the columns that don't apply being 
mindful of the headers. 
"""

HURDAT2.drop(HURDAT2.columns[8:23], axis=1, inplace=True)

In [8]:
# change to lowercase

HURDAT2['storm_name'] = HURDAT2['storm_name'].str.lower()

In [9]:
# check df shape

HURDAT2.shape

(55437, 8)

## storm identifiers level 0

In [10]:
"""
- filter for records where the storm_id = AL (atlantic basin)

raw data is multiline...
- focus here is on level0 data
- storm_id is the unqiue identifier
- storm_name is the name assigned to the storm
- associated_records are metadata records (level1)
that belong to the unique storm_id
"""

storm_level0_df = HURDAT2[HURDAT2['storm_id'].str.contains('AL*')]
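One caveat when filtering level0 rows: `str.contains` treats its pattern as a regular expression by default, so `'AL*'` means "an `A` followed by zero or more `L`s" and matches any value that contains an `A`. It only behaves like a prefix filter as long as no other first-column value contains an `A`. A minimal sketch on made-up values contrasting it with a literal prefix check:

```python
import pandas as pd

# hypothetical first-column values: a storm id, a date from a data row,
# an eastern Pacific id and a made-up value that merely contains an 'A'
first_col = pd.Series(['AL011851', '18510625', 'EP011949', 'A9'])

# regex: 'A' followed by zero or more 'L's, i.e. "contains an A"
regex_match = first_col.str.contains('AL*')

# literal two-character prefix check
prefix_match = first_col.str.startswith('AL')

print(regex_match.tolist())   # [True, False, False, True]
print(prefix_match.tolist())  # [True, False, False, False]
```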

In [11]:
storm_level0_df = storm_level0_df[['storm_id', 'storm_name', 'associated_records']]

In [12]:
storm_level0_df.head(2)

| | storm_id | storm_name | associated_records |
|---|---|---|---|
| 0 | AL011851 | unnamed | 14 |
| 15 | AL021851 | unnamed | 1 |


# flatten storm dataframe

### create a list of dfs containing metadata records per storm_id

In [13]:
# n number of records following the storm_id belong to that id
# must convert str to int

storm_num_ar = storm_level0_df['associated_records'].astype(int) 

In [14]:
"""
under each storm_id in level0 
is n number of metadata level1 records 
(associated_records).

using a loop... 
create a list of dataframes using the associated  \
record count for each storm identifier matched on \
index to the HURDAT2 data.
"""

list_dfs = []
for item in storm_num_ar.index:
    # using the index, gets the number of records \
    # associated with each storm
    associated_row_cnt = storm_num_ar.loc[item]  
    
    # using the index, selects the records associated \
    # which each storm and transforms those rows into \
    # new dfs
    new_df = pd.DataFrame(HURDAT2.loc[item+1:item + associated_row_cnt])
    
    # the headers of each new df are renamed
    new_df = new_df.rename(columns={
            'storm_id': 'storm_date',
            'storm_name': 'storm_time',
            'associated_records': 'rec_identifier',
            'storm_date': 'storm_type',
            'storm_time': 'latitude',
            'rec_identifier': 'longitude',
            'storm_type': 'max_sustained_wind(kt)',
            'latitude': 'minimum_pressure(mbar)',
    })
    
#     print(new_df)
    
    # add new columns, assign values from HURDAT2 data
    new_df['storm_id'] = HURDAT2['storm_id'][item]
    new_df['storm_name'] = HURDAT2['storm_name'][item]
    new_df['associated_records'] = HURDAT2['associated_records'][item]   

    # reorder columns
    new_df = new_df[['storm_id',
                    'storm_name',
                    'associated_records',
                    'storm_date',
                    'storm_time',
                    'rec_identifier',
                    'storm_type',
                    'latitude',
                    'longitude',
                    'max_sustained_wind(kt)',
                    'minimum_pressure(mbar)'
    ]]
    
    

    # output a list of dfs
    list_dfs.append(new_df)
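The loop slices HURDAT2 once per storm. A vectorized alternative (a swapped-in technique, not the notebook's method) forward-fills the level0 fields onto the level1 rows instead. The sketch below runs on a hypothetical five-row frame with made-up column names `c0` to `c2` and assumes every level0 row's first column starts with `AL`:

```python
import pandas as pd

# hypothetical miniature frame in the raw layout: level0 (header) rows carry an
# 'AL...' id in the first column, level1 (data) rows carry a YYYYMMDD date there
raw = pd.DataFrame({
    'c0': ['AL011950', '19500812', '19500812', 'AL021950', '19500818'],
    'c1': ['ABLE', '0000', '0600', 'BAKER', '1200'],
    'c2': ['2', '', '', '1', 'L'],
})

is_header = raw['c0'].str.startswith('AL')

# keep the level0 values on header rows only, then forward-fill them down
# so every level1 row inherits the id, name and count of the storm above it
raw['storm_id'] = raw['c0'].where(is_header).ffill()
raw['storm_name'] = raw['c1'].where(is_header).ffill()
raw['associated_records'] = raw['c2'].where(is_header).ffill()

# drop the header rows and name the positional columns after their content
flat = (raw.loc[~is_header]
           .rename(columns={'c0': 'storm_date', 'c1': 'storm_time', 'c2': 'rec_identifier'})
           [['storm_id', 'storm_name', 'associated_records',
             'storm_date', 'storm_time', 'rec_identifier']])

# sanity check: each storm has exactly as many level1 rows as its header says
expected = raw.loc[is_header].set_index('storm_id')['associated_records'].astype('int64')
counts_ok = flat.groupby('storm_id').size().to_dict() == expected.to_dict()

print(flat)
print('counts match:', counts_ok)
```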

In [15]:
# total rows in the raw file (level0 + level1); the flattened df
# should hold only the level1 rows, i.e. this total minus one header row per storm
print(len(HURDAT2))

55437


### concatenate all dfs into a single df

In [16]:
# concat the list of dfs into a single df;
# each df keeps its original HURDAT2 index labels,
# so rows can still be traced back to the raw file.
# this 'flattens' the dataset

flattenStorms_df = pd.concat(list_dfs)
print(flattenStorms_df.shape)
flattenStorms_df.tail(5)

(53501, 11)


| | storm_id | storm_name | associated_records | storm_date | storm_time | rec_identifier | storm_type | latitude | longitude | max_sustained_wind(kt) | minimum_pressure(mbar) |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 55432 | AL212021 | wanda | 54 | 20211107 | 0 | | TS | 37.4N | 37.4W | 35.0 | 1003.0 |
| 55433 | AL212021 | wanda | 54 | 20211107 | 600 | | TS | 38.1N | 36.4W | 35.0 | 1004.0 |
| 55434 | AL212021 | wanda | 54 | 20211107 | 1200 | | LO | 39.2N | 34.9W | 35.0 | 1006.0 |
| 55435 | AL212021 | wanda | 54 | 20211107 | 1800 | | LO | 40.9N | 32.8W | 40.0 | 1006.0 |
| 55436 | AL212021 | wanda | 54 | 20211108 | 0 | | LO | 43.2N | 29.7W | 40.0 | 1006.0 |


### filter for: named storms

In [18]:
"""
since we're looking for a possible 
relationship with baby name trends, 
storm data must be filtered by named storms
"""

namedStorms_df = flattenStorms_df[flattenStorms_df['storm_name']!= 'unnamed']

In [19]:
namedStorms_df['storm_name'].drop_duplicates().describe()

count       311
unique      311
top       irene
freq          1
Name: storm_name, dtype: object

### cleanup: time

In [35]:
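def convertTime(df, item):
    """
    converts HHMM time values (e.g. '0600')
    to datetime.time objects in a series
    """
    # zfill pads values that lost leading zeros back to four digits
    df[item] = pd.to_datetime(df[item].astype(str).str.zfill(4), format='%H%M').dt.time
    return df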
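Parsing HHMM values can be done for the whole column at once rather than row by row. A sketch on hypothetical values, assuming some times may have lost their leading zeros (a bare `'0'` has no minutes left to match `%M`, so it may fail to parse without padding):

```python
import pandas as pd

# hypothetical HHMM strings; some have lost their leading zeros
raw_times = pd.Series(['0', '600', '1200', '1830'])

# pad to four digits, parse with an explicit format, keep only the time of day
times = pd.to_datetime(raw_times.str.zfill(4), format='%H%M').dt.time
print([t.strftime('%H:%M') for t in times])  # ['00:00', '06:00', '12:00', '18:30']
```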

In [36]:
stormsCSV = namedStorms_df.copy()
convertTime(stormsCSV, 'storm_time')

| | storm_id | storm_name | associated_records | storm_date | storm_time | rec_identifier | storm_type | latitude | longitude | max_sustained_wind(kt) | minimum_pressure(mbar) |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 22686 | AL011950 | able | 51 | 19500812 | 00:00:00 | | TS | 17.1N | 55.5W | 35.0 | -999.0 |
| 22687 | AL011950 | able | 51 | 19500812 | 06:00:00 | | TS | 17.7N | 56.3W | 40.0 | -999.0 |
| 22688 | AL011950 | able | 51 | 19500812 | 12:00:00 | | TS | 18.2N | 57.4W | 45.0 | -999.0 |
| 22689 | AL011950 | able | 51 | 19500812 | 18:00:00 | | TS | 19.0N | 58.6W | 50.0 | -999.0 |
| 22690 | AL011950 | able | 51 | 19500813 | 00:00:00 | | TS | 20.0N | 60.0W | 50.0 | -999.0 |
| 22691 | AL011950 | able | 51 | 19500813 | 06:00:00 | | TS | 20.7N | 61.1W | 50.0 | -999.0 |
| 22692 | AL011950 | able | 51 | 19500813 | 12:00:00 | | TS | 21.3N | 62.2W | 55.0 | -999.0 |
| 22693 | AL011950 | able | 51 | 19500813 | 18:00:00 | | TS | 22.0N | 63.2W | 55.0 | 997.0 |
| 22694 | AL011950 | able | 51 | 19500814 | 00:00:00 | | TS | 22.7N | 63.8W | 60.0 | 995.0 |
| 22695 | AL011950 | able | 51 | 19500814 | 06:00:00 | | TS | 23.1N | 64.6W | 60.0 | -999.0 |


### parse: date

In [37]:
def parseDate(df, date):
    """
    parses the storm_date (YYYYMMDD) into a datetime series
    (day, month and year can be split out, too, if needed)
    """
    df[date] = pd.to_datetime(df[date].astype(str), format='%Y%m%d')
#     df['day'] = df[date].dt.day
#     df['month'] = df[date].dt.month
#     df['year'] = df[date].dt.year
    return df
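A similar vectorized sketch for dates, assuming YYYYMMDD integers as in the raw file. It also shows the optional day, month and year split that the docstring mentions:

```python
import pandas as pd

raw_dates = pd.Series([19500812, 19870922])  # hypothetical YYYYMMDD integers

# astype(str) gives '19500812'; the explicit format avoids per-row inference
dates = pd.to_datetime(raw_dates.astype(str), format='%Y%m%d')

parts = pd.DataFrame({'year': dates.dt.year,
                      'month': dates.dt.month,
                      'day': dates.dt.day})
print(parts)
```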

In [38]:
parseDate(stormsCSV, 'storm_date')

| | storm_id | storm_name | associated_records | storm_date | storm_time | rec_identifier | storm_type | latitude | longitude | max_sustained_wind(kt) | minimum_pressure(mbar) |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 22686 | AL011950 | able | 51 | 1950-08-12 | 00:00:00 | | TS | 17.1N | 55.5W | 35.0 | -999.0 |
| 22687 | AL011950 | able | 51 | 1950-08-12 | 06:00:00 | | TS | 17.7N | 56.3W | 40.0 | -999.0 |
| 22688 | AL011950 | able | 51 | 1950-08-12 | 12:00:00 | | TS | 18.2N | 57.4W | 45.0 | -999.0 |
| 22689 | AL011950 | able | 51 | 1950-08-12 | 18:00:00 | | TS | 19.0N | 58.6W | 50.0 | -999.0 |
| 22690 | AL011950 | able | 51 | 1950-08-13 | 00:00:00 | | TS | 20.0N | 60.0W | 50.0 | -999.0 |
| 22691 | AL011950 | able | 51 | 1950-08-13 | 06:00:00 | | TS | 20.7N | 61.1W | 50.0 | -999.0 |
| 22692 | AL011950 | able | 51 | 1950-08-13 | 12:00:00 | | TS | 21.3N | 62.2W | 55.0 | -999.0 |
| 22693 | AL011950 | able | 51 | 1950-08-13 | 18:00:00 | | TS | 22.0N | 63.2W | 55.0 | 997.0 |
| 22694 | AL011950 | able | 51 | 1950-08-14 | 00:00:00 | | TS | 22.7N | 63.8W | 60.0 | 995.0 |
| 22695 | AL011950 | able | 51 | 1950-08-14 | 06:00:00 | | TS | 23.1N | 64.6W | 60.0 | -999.0 |


### decode: storm_type

In [39]:
def decodeFromDict(abrvs, decode, df, item):
    """
    using a list of abbreviations and a list
    of decodes, zips together the two lists
    into a lookup dict, then maps the 
    decoded values onto the series.
    both lists must be in the same order.
    """
    lookup = dict(zip(abrvs, decode))

    # a single map call replaces every abbreviation in the series
    df[item] = df[item].map(lookup)
    return df

In [40]:
# look to ensure the abbrv order matches the decode str

storm_type_abrv = stormsCSV["storm_type"].drop_duplicates().tolist()
print(storm_type_abrv)

decode = ['tropical storm',
          'hurricane',
          'extratropical storm', 
          'tropical depression', 
          'disturbance(any)',
          'subtropical depression', 
          'subtropical storm',  
          'low - not tropical-subtropical-or-extratropical',
          'tropical wave']

decodeFromDict(storm_type_abrv, decode, stormsCSV, 'storm_type')

['TS', 'HU', 'EX', 'TD', 'DB', 'SD', 'SS', 'LO', 'WV']


| | storm_id | storm_name | associated_records | storm_date | storm_time | rec_identifier | storm_type | latitude | longitude | max_sustained_wind(kt) | minimum_pressure(mbar) |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 22686 | AL011950 | able | 51 | 1950-08-12 | 00:00:00 | | tropical storm | 17.1N | 55.5W | 35.0 | -999.0 |
| 22687 | AL011950 | able | 51 | 1950-08-12 | 06:00:00 | | tropical storm | 17.7N | 56.3W | 40.0 | -999.0 |
| 22688 | AL011950 | able | 51 | 1950-08-12 | 12:00:00 | | tropical storm | 18.2N | 57.4W | 45.0 | -999.0 |
| 22689 | AL011950 | able | 51 | 1950-08-12 | 18:00:00 | | tropical storm | 19.0N | 58.6W | 50.0 | -999.0 |
| 22690 | AL011950 | able | 51 | 1950-08-13 | 00:00:00 | | tropical storm | 20.0N | 60.0W | 50.0 | -999.0 |
| 22691 | AL011950 | able | 51 | 1950-08-13 | 06:00:00 | | tropical storm | 20.7N | 61.1W | 50.0 | -999.0 |
| 22692 | AL011950 | able | 51 | 1950-08-13 | 12:00:00 | | tropical storm | 21.3N | 62.2W | 55.0 | -999.0 |
| 22693 | AL011950 | able | 51 | 1950-08-13 | 18:00:00 | | tropical storm | 22.0N | 63.2W | 55.0 | 997.0 |
| 22694 | AL011950 | able | 51 | 1950-08-14 | 00:00:00 | | tropical storm | 22.7N | 63.8W | 60.0 | 995.0 |
| 22695 | AL011950 | able | 51 | 1950-08-14 | 06:00:00 | | tropical storm | 23.1N | 64.6W | 60.0 | -999.0 |
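Because `decodeFromDict` pairs codes and decodes by position, any change in the order that `drop_duplicates()` returns would silently mislabel records. A sketch of an explicit mapping that also surfaces unexpected codes, using the same code/decode pairs listed above (`STORM_TYPE_DECODE` is a hypothetical name):

```python
import pandas as pd

# storm_type codes paired with this notebook's decodes, keyed explicitly
# instead of relying on the order drop_duplicates() happens to return
STORM_TYPE_DECODE = {
    'TS': 'tropical storm',
    'HU': 'hurricane',
    'EX': 'extratropical storm',
    'TD': 'tropical depression',
    'DB': 'disturbance(any)',
    'SD': 'subtropical depression',
    'SS': 'subtropical storm',
    'LO': 'low - not tropical-subtropical-or-extratropical',
    'WV': 'tropical wave',
}

codes = pd.Series(['TS', 'HU', 'WV', 'XX'])  # 'XX' is a made-up unknown code
decoded = codes.map(STORM_TYPE_DECODE)

# codes missing from the mapping become NaN, which makes them easy to catch
unknown = codes[decoded.isna()].unique().tolist()
print(decoded.tolist())
print('unknown codes:', unknown)
```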


In [41]:
# Emily is the canary in the coal mine for decoding the storm_type:
# if the order of the abbreviations and decodes is off, this test fails
# if the date conversion doesn't work, this test fails
# if the result is blank, there is an issue

stormsCSV[(stormsCSV['storm_name'] == 'emily')
          & (stormsCSV['storm_type'] == 'hurricane')
          # storm_date is a datetime; pandas compares it to the date string
          & (stormsCSV['storm_date'] == '1987-09-22')
          & (stormsCSV['max_sustained_wind(kt)'] > 100)
         ]

| | storm_id | storm_name | associated_records | storm_date | storm_time | rec_identifier | storm_type | latitude | longitude | max_sustained_wind(kt) | minimum_pressure(mbar) |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 38220 | AL121987 | emily | 30 | 1987-09-22 | 18:00:00 | | hurricane | 16.7N | 69.1W | 110.0 | 958.0 |


### decode: rec_identifier

In [42]:
namedStorms_df['rec_identifier'].unique()

array(['', 'L', 'R', 'I', 'P', 'T', 'W', 'C', 'S', 'G'], dtype=object)

In [43]:
# look to see if the array order matches the decode str

storm_rec_identifier_abrv = stormsCSV["rec_identifier"].drop_duplicates().tolist()

decode = ['',
          'landfall',
          'additional detail on the intensity when rapid changes are underway', 
          'intensity peak', 
          'minimum in central pressure',
          'additional detail on the position',
          'maximum sustained wind speed',
          'closest approach to a coast',  
          'status change in system',
          'genesis'
         ]

decodeFromDict(storm_rec_identifier_abrv, decode, stormsCSV, 'rec_identifier')

| | storm_id | storm_name | associated_records | storm_date | storm_time | rec_identifier | storm_type | latitude | longitude | max_sustained_wind(kt) | minimum_pressure(mbar) |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 22686 | AL011950 | able | 51 | 1950-08-12 | 00:00:00 | | tropical storm | 17.1N | 55.5W | 35.0 | -999.0 |
| 22687 | AL011950 | able | 51 | 1950-08-12 | 06:00:00 | | tropical storm | 17.7N | 56.3W | 40.0 | -999.0 |
| 22688 | AL011950 | able | 51 | 1950-08-12 | 12:00:00 | | tropical storm | 18.2N | 57.4W | 45.0 | -999.0 |
| 22689 | AL011950 | able | 51 | 1950-08-12 | 18:00:00 | | tropical storm | 19.0N | 58.6W | 50.0 | -999.0 |
| 22690 | AL011950 | able | 51 | 1950-08-13 | 00:00:00 | | tropical storm | 20.0N | 60.0W | 50.0 | -999.0 |
| 22691 | AL011950 | able | 51 | 1950-08-13 | 06:00:00 | | tropical storm | 20.7N | 61.1W | 50.0 | -999.0 |
| 22692 | AL011950 | able | 51 | 1950-08-13 | 12:00:00 | | tropical storm | 21.3N | 62.2W | 55.0 | -999.0 |
| 22693 | AL011950 | able | 51 | 1950-08-13 | 18:00:00 | | tropical storm | 22.0N | 63.2W | 55.0 | 997.0 |
| 22694 | AL011950 | able | 51 | 1950-08-14 | 00:00:00 | | tropical storm | 22.7N | 63.8W | 60.0 | 995.0 |
| 22695 | AL011950 | able | 51 | 1950-08-14 | 06:00:00 | | tropical storm | 23.1N | 64.6W | 60.0 | -999.0 |


## reference: saffir-simpson wind scale

In [44]:
"""
NURDAT2 data contains no reference to
category. In order to identify category, 
the Saffir_Simpson Wind Scale must be 
referenced. The max_sustained_wind(kt)
can be measured against the min-max of 
each category's range to set the 
category, where the storm_type is 
hurricane
"""

data = [{'category': 1, 
         'sustained_wind(kt)': '64-82', 
         'max_sustained_wind(kt)': 82, 
         'min_sustained_wind(kt)': 64,
         'sustained_wind(mph)': '74-95', 
         'brief_damage_description': \
         'Power outages that could last a few to several days.'},
        
       {'category': 2, 
        'sustained_wind(kt)': '83-95', 
        'max_sustained_wind(kt)': 95, 
        'min_sustained_wind(kt)': 83,
        'sustained_wind(mph)': '96-110', 
        'brief_damage_description': \
        'Near-total power loss is expected '
        'with outages that could last from several days to weeks.'},
        
       {'category': 3, 
        'sustained_wind(kt)': '96-112', 
        'max_sustained_wind(kt)': 112, 
        'min_sustained_wind(kt)': 96,
        'sustained_wind(mph)': '111-129', 
        'brief_damage_description': \
        'Electricity and water will be '
        'unavailable for several days to weeks after the storm passes.'},
        
       {'category': 4,
        'sustained_wind(kt)': '113-136', 
        'max_sustained_wind(kt)': 136, 
        'min_sustained_wind(kt)': 113,
        'sustained_wind(mph)': '130-156', 
        'brief_damage_description': \
        'Catastrophic damage will occur; most of '
        'the area will be uninhabitable for weeks or months.'},
        
       {'category': 5,
        'sustained_wind(kt)': '137+', 
        'min_sustained_wind(kt)': 137, 
        'sustained_wind(mph)': '157+',
        'brief_damage_description': \
        'Catastrophic damage will occur; most of the '
        'area will be uninhabitable for weeks or months.'}]


saffir_simpson_scale = pd.DataFrame(data, 
                                    columns=['category', 
                                             'min_sustained_wind(kt)',
                                             'max_sustained_wind(kt)',  
                                             'sustained_wind(kt)',
                                             'brief_damage_description',]) \
                                             .fillna(0)

# category 5 has no upper bound, so fillna(0) set its max to 0
saffir_simpson_scale['max_sustained_wind(kt)'] = saffir_simpson_scale['max_sustained_wind(kt)'].astype(int)
# saffir_simpson_scale

### category assignment

In [45]:
def categoryAssignment_HU(df, storm_type_item, max_kts_item):
    """
    determines whether the max_sustained_wind(kt) 
    falls into a saffir-simpson wind scale 
    category and assigns the numeric category if the
    record is a hurricane. non-hurricane records keep
    their storm_type; hurricanes below category 1
    are 'not categorized'.
    """
    ss_min = saffir_simpson_scale['min_sustained_wind(kt)']
    ss_max = saffir_simpson_scale['max_sustained_wind(kt)']
    is_hu = df[storm_type_item] == 'hurricane'
    kts = df[max_kts_item]

    # category ranges are inclusive at both ends (e.g. category 1 = 64-82 kt),
    # so the lower bound uses >= rather than >
    conditions = [
        ~is_hu,
        is_hu & (kts < ss_min[0]),
        is_hu & (kts >= ss_min[0]) & (kts <= ss_max[0]),
        is_hu & (kts >= ss_min[1]) & (kts <= ss_max[1]),
        is_hu & (kts >= ss_min[2]) & (kts <= ss_max[2]),
        is_hu & (kts >= ss_min[3]) & (kts <= ss_max[3]),
        is_hu & (kts >= ss_min[4]),
    ]

    choices = [df[storm_type_item],
               'not categorized',
               saffir_simpson_scale['category'][0],
               saffir_simpson_scale['category'][1],
               saffir_simpson_scale['category'][2],
               saffir_simpson_scale['category'][3],
               saffir_simpson_scale['category'][4]]

    df['category'] = np.select(conditions, choices)
    return df
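An alternative sketch (a swapped-in technique, not the notebook's method) bins wind speeds with `pd.cut`, using the Saffir-Simpson lower bounds in knots from the reference table. With `right=False` each bin includes its lower bound, so 64 kt is category 1 and 83 kt is category 2. `SS_BINS_KT` and `saffir_simpson_category` are hypothetical names:

```python
import numpy as np
import pandas as pd

# lower bounds (kt) of categories 1-5; each bin runs up to, not including,
# the next bound, and category 5 is open-ended
SS_BINS_KT = [64, 83, 96, 113, 137, np.inf]

def saffir_simpson_category(wind_kt):
    """Return categories 1-5 for each wind speed (kt); NaN below 64 kt."""
    # labels=False returns the 0-based bin number; +1 turns it into a category
    return pd.cut(wind_kt, bins=SS_BINS_KT, right=False, labels=False) + 1

winds = pd.Series([60, 64, 82, 83, 95, 96, 112, 113, 136, 137, 160], dtype=float)
categories = saffir_simpson_category(winds)
print(categories.tolist())
```

Half-open bins also cover fractional speeds between the published integer ranges (e.g. 82.5 kt) without leaving gaps.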

In [46]:
stormsCSV = stormsCSV.copy()
categoryAssignment_HU(stormsCSV, 'storm_type', 'max_sustained_wind(kt)')

| | storm_id | storm_name | associated_records | storm_date | storm_time | rec_identifier | storm_type | latitude | longitude | max_sustained_wind(kt) | minimum_pressure(mbar) | category |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 22686 | AL011950 | able | 51 | 1950-08-12 | 00:00:00 | | tropical storm | 17.1N | 55.5W | 35.0 | -999.0 | tropical storm |
| 22687 | AL011950 | able | 51 | 1950-08-12 | 06:00:00 | | tropical storm | 17.7N | 56.3W | 40.0 | -999.0 | tropical storm |
| 22688 | AL011950 | able | 51 | 1950-08-12 | 12:00:00 | | tropical storm | 18.2N | 57.4W | 45.0 | -999.0 | tropical storm |
| 22689 | AL011950 | able | 51 | 1950-08-12 | 18:00:00 | | tropical storm | 19.0N | 58.6W | 50.0 | -999.0 | tropical storm |
| 22690 | AL011950 | able | 51 | 1950-08-13 | 00:00:00 | | tropical storm | 20.0N | 60.0W | 50.0 | -999.0 | tropical storm |
| 22691 | AL011950 | able | 51 | 1950-08-13 | 06:00:00 | | tropical storm | 20.7N | 61.1W | 50.0 | -999.0 | tropical storm |
| 22692 | AL011950 | able | 51 | 1950-08-13 | 12:00:00 | | tropical storm | 21.3N | 62.2W | 55.0 | -999.0 | tropical storm |
| 22693 | AL011950 | able | 51 | 1950-08-13 | 18:00:00 | | tropical storm | 22.0N | 63.2W | 55.0 | 997.0 | tropical storm |
| 22694 | AL011950 | able | 51 | 1950-08-14 | 00:00:00 | | tropical storm | 22.7N | 63.8W | 60.0 | 995.0 | tropical storm |
| 22695 | AL011950 | able | 51 | 1950-08-14 | 06:00:00 | | tropical storm | 23.1N | 64.6W | 60.0 | -999.0 | tropical storm |


In [58]:
# check unique values

stormsCSV['category'].drop_duplicates()

22686                                     tropical storm
22697                                                  1
22705                                                  2
22708                                                  3
22727                                extratropical storm
22738                                tropical depression
22865                                                  4
24115                                                  5
25992                                   disturbance(any)
26116                                    not categorized
28851                             subtropical depression
28854                                  subtropical storm
29315    low - not tropical-subtropical-or-extratropical
36124                                      tropical wave
Name: category, dtype: object

In [60]:
# check emily as a use case

stormsCSV[(stormsCSV['storm_type'] == 'hurricane')
          & (stormsCSV['max_sustained_wind(kt)'] > 64)
          & (stormsCSV['storm_name'] == 'emily')
         ]

| | storm_id | storm_name | associated_records | storm_date | storm_time | rec_identifier | storm_type | latitude | longitude | max_sustained_wind(kt) | minimum_pressure(mbar) | category |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 36209 | AL131981 | emily | 47 | 1981-09-04 | 00:00:00 | | hurricane | 34.6N | 63.6W | 65.0 | 976.0 | 1 |
| 36210 | AL131981 | emily | 47 | 1981-09-04 | 06:00:00 | | hurricane | 35.3N | 62.7W | 70.0 | 974.0 | 1 |
| 36211 | AL131981 | emily | 47 | 1981-09-04 | 12:00:00 | | hurricane | 36.2N | 61.9W | 70.0 | 972.0 | 1 |
| 36212 | AL131981 | emily | 47 | 1981-09-04 | 18:00:00 | | hurricane | 37.1N | 61.2W | 70.0 | 971.0 | 1 |
| 36213 | AL131981 | emily | 47 | 1981-09-05 | 00:00:00 | | hurricane | 38.2N | 60.9W | 75.0 | 970.0 | 1 |
| 36214 | AL131981 | emily | 47 | 1981-09-05 | 06:00:00 | | hurricane | 38.6N | 60.8W | 75.0 | 968.0 | 1 |
| 36215 | AL131981 | emily | 47 | 1981-09-05 | 12:00:00 | | hurricane | 39.0N | 60.8W | 75.0 | 967.0 | 1 |
| 36216 | AL131981 | emily | 47 | 1981-09-05 | 18:00:00 | | hurricane | 39.4N | 59.9W | 80.0 | 966.0 | 1 |
| 36217 | AL131981 | emily | 47 | 1981-09-06 | 00:00:00 | | hurricane | 39.9N | 59.0W | 80.0 | 967.0 | 1 |
| 36218 | AL131981 | emily | 47 | 1981-09-06 | 06:00:00 | | hurricane | 40.3N | 58.4W | 80.0 | 968.0 | 1 |


### cleanup: coordinates

In [61]:
def transformCoordinates(df, lat_item, long_item):
    """
    removes the hemisphere char from lat and long,
    converts lat and long to floats,
    converts S and W to negative values,
    zips transformed lat and long into
    a list of coordinate tuples
    """
    lat_val = pd.to_numeric(df[lat_item].str[:-1], errors='coerce')
    long_val = pd.to_numeric(df[long_item].str[:-1], errors='coerce')

    # compare element-wise so each row keeps or flips its own sign
    # (wrapping the comparison in len() would apply one branch to every row)
    df[lat_item] = np.where(df[lat_item].str[-1] == 'S', -lat_val, lat_val)
    df[long_item] = np.where(df[long_item].str[-1] == 'W', -long_val, long_val)
    df['coordinates'] = list(zip(df[lat_item], df[long_item]))
    return df
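`np.where` needs the element-wise boolean Series itself. Wrapping the comparison in `len()` turns it into a single integer (the row count), which is always truthy, so every row would take the same branch and eastern or southern coordinates would get the wrong sign. A standalone sketch of the per-row sign handling on made-up values (`to_signed_degrees` is a hypothetical helper):

```python
import numpy as np
import pandas as pd

def to_signed_degrees(values):
    """'17.1N' -> 17.1, '5.0S' -> -5.0, '55.5W' -> -55.5, '10.2E' -> 10.2"""
    number = pd.to_numeric(values.str[:-1], errors='coerce')
    hemisphere = values.str[-1]
    # the boolean Series drives np.where row by row
    signed = np.where(hemisphere.isin(['S', 'W']), -number, number)
    return pd.Series(signed, index=values.index)

lat = to_signed_degrees(pd.Series(['17.1N', '5.0S']))
lon = to_signed_degrees(pd.Series(['55.5W', '10.2E']))
print(list(zip(lat.tolist(), lon.tolist())))  # [(17.1, -55.5), (-5.0, 10.2)]
```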

In [62]:
transformCoordinates(stormsCSV, 'latitude', 'longitude')

| | storm_id | storm_name | associated_records | storm_date | storm_time | rec_identifier | storm_type | latitude | longitude | max_sustained_wind(kt) | minimum_pressure(mbar) | category | coordinates |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 22686 | AL011950 | able | 51 | 1950-08-12 | 00:00:00 | | tropical storm | 17.1 | -55.5 | 35.0 | -999.0 | tropical storm | (17.1, -55.5) |
| 22687 | AL011950 | able | 51 | 1950-08-12 | 06:00:00 | | tropical storm | 17.7 | -56.3 | 40.0 | -999.0 | tropical storm | (17.7, -56.3) |
| 22688 | AL011950 | able | 51 | 1950-08-12 | 12:00:00 | | tropical storm | 18.2 | -57.4 | 45.0 | -999.0 | tropical storm | (18.2, -57.4) |
| 22689 | AL011950 | able | 51 | 1950-08-12 | 18:00:00 | | tropical storm | 19.0 | -58.6 | 50.0 | -999.0 | tropical storm | (19.0, -58.6) |
| 22690 | AL011950 | able | 51 | 1950-08-13 | 00:00:00 | | tropical storm | 20.0 | -60.0 | 50.0 | -999.0 | tropical storm | (20.0, -60.0) |
| 22691 | AL011950 | able | 51 | 1950-08-13 | 06:00:00 | | tropical storm | 20.7 | -61.1 | 50.0 | -999.0 | tropical storm | (20.7, -61.1) |
| 22692 | AL011950 | able | 51 | 1950-08-13 | 12:00:00 | | tropical storm | 21.3 | -62.2 | 55.0 | -999.0 | tropical storm | (21.3, -62.2) |
| 22693 | AL011950 | able | 51 | 1950-08-13 | 18:00:00 | | tropical storm | 22.0 | -63.2 | 55.0 | 997.0 | tropical storm | (22.0, -63.2) |
| 22694 | AL011950 | able | 51 | 1950-08-14 | 00:00:00 | | tropical storm | 22.7 | -63.8 | 60.0 | 995.0 | tropical storm | (22.7, -63.8) |
| 22695 | AL011950 | able | 51 | 1950-08-14 | 06:00:00 | | tropical storm | 23.1 | -64.6 | 60.0 | -999.0 | tropical storm | (23.1, -64.6) |


## reference: US states

51 records...  

JSON file that contains the state name, 
state abbreviation and the state code for all
50 US states  

Used as a reference table to map coordinates

In [63]:
# https://worldpopulationreview.com/states/state-abbreviations

"""
reads in json of state abbreviations for reference
drops unneeded columns
converts to a df
"""

with open('/home/workspace/source_states/states.json') as json_file:
    states_json_data = json.load(json_file)
    for item in states_json_data:
        del item['Abbrev']

state_ref = pd.DataFrame(states_json_data).rename(str.lower, axis='columns').reset_index(drop = True)

state_ref['state_id'] = state_ref.index

state_ref = state_ref[['state_id',
                      'code',
                      'state']]

state_ref.head(5)

| | state_id | code | state |
|---|---|---|---|
| 0 | 0 | AL | Alabama |
| 1 | 1 | AK | Alaska |
| 2 | 2 | AZ | Arizona |
| 3 | 3 | AR | Arkansas |
| 4 | 4 | CA | California |


## map coordinates to state

In [66]:
"""
this transformation was made possible 
through collaboration with:
Jasmine Cawley and Jason Petruno.

relies on geopy and an open source map API, to get
the state code from an address by doing a reverse lookup 
iterating through tuples of storm coordinates where
the storm made landfall, then maps that state code
to the list of 'states we care about' from a dict of
states made from the state reference table, then inserts
a new series where the records match on index. 

expect 5-10 minutes to run. 
"""

! pip install pygeocoder
! pip install geopy

def geolocCoordinates_toState(df):
    """
    reverse-geocodes the coordinates of each landfall
    record and writes the matching US state code to a
    new 'storm_state_code' series ('' when no US state)
    """
    import certifi
    import ssl
    import geopy
    from geopy.geocoders import Nominatim
    from geopy.extra.rate_limiter import RateLimiter

    ctx = ssl.create_default_context(cafile=certifi.where())
    geopy.geocoders.options.default_ssl_context = ctx
    geolocator = Nominatim(user_agent = 'http', timeout=None)

    # Nominatim's usage policy allows at most 1 request per second
    reverse = RateLimiter(geolocator.reverse, min_delay_seconds=1)

    # state name -> state code, from the state reference table
    state_withCode = dict(zip(state_ref['state'], state_ref['code']))
    states_we_care_about = state_withCode.keys()

    no_state = 0
    df['storm_state_code'] = ""

    # only records that indicate landfall need a lookup
    landfall_df = df[df['rec_identifier'] == 'landfall']
    for index, row in landfall_df.iterrows():

        # get a location object for this row's (lat, lng)
        location = reverse(row['coordinates'])

        # defensive coding! open water may return no location or address
        if location is None:
            no_state += 1
            continue
        storm_state = location.raw.get('address', {}).get('state')

        # make sure we're in the 50 states so we can get the state's code,
        # otherwise keep going
        if storm_state not in states_we_care_about:
            no_state += 1
            continue

        # now we can add the code back into the data frame
        df.at[index, 'storm_state_code'] = state_withCode[storm_state]

    print(f'{no_state} landfall records did not map to a US state')
    return df

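The reverse-geocoding step is hard to test because every call hits the network. One option, sketched below with hypothetical names, is to pull the address-to-code lookup into a pure function that can be checked offline. It assumes the payload layout the geocoding function relies on (`raw['address']['state']`):

```python
from typing import Optional

def state_code_from_raw(raw: Optional[dict], state_to_code: dict) -> str:
    """Return the state code for a reverse-geocoding payload, or '' if none.

    Assumes a Nominatim-style payload: raw['address']['state'].
    """
    address = (raw or {}).get('address') or {}
    # .get(None, '') also covers payloads that have no 'state' key
    return state_to_code.get(address.get('state'), '')

# hypothetical reference subset and payloads
state_to_code = {'Florida': 'FL', 'Texas': 'TX'}
print(state_code_from_raw({'address': {'state': 'Florida'}}, state_to_code))  # FL
print(state_code_from_raw({'address': {'country': 'Cuba'}}, state_to_code))  # ''
print(state_code_from_raw(None, state_to_code))                              # ''
```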

In [67]:
# # takes a bit but it works...
# # stormsCSV is the dataset being written to csv for use in Spark

geolocCoordinates_toState(stormsCSV)



In [68]:
# check

stormsCSV.shape

(26634, 14)

In [69]:
# write the semi-transformed storm data to csv
# ONLY NEEDS TO BE RUN ONCE.
# once it is in the workspace, it is uploaded to S3 with the other sources

stormsCSV.to_csv('source_weather/storms_data.csv', encoding='utf-8', index=False)
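Because this CSV feeds Spark, it may be worth checking what survives the round trip. The sketch below writes a hypothetical two-row frame to a temporary file. The numeric latitude/longitude columns come back as floats. A tuple column such as `coordinates`, however, is written with `str()` and comes back as text, so downstream code would need to parse it or rely on the separate numeric columns:

```python
import os
import tempfile

import pandas as pd

# hypothetical two-row slice with a tuple column like 'coordinates'
df = pd.DataFrame({'latitude': [17.1, 17.7], 'longitude': [-55.5, -56.3]})
df['coordinates'] = list(zip(df['latitude'].tolist(), df['longitude'].tolist()))

path = os.path.join(tempfile.mkdtemp(), 'storms_sample.csv')
df.to_csv(path, encoding='utf-8', index=False)
back = pd.read_csv(path)

# numeric columns survive; the tuple column comes back as plain text
print(back.dtypes)
print(repr(back.loc[0, 'coordinates']))  # '(17.1, -55.5)'
```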