In [1]:
import os
import pandas as pd
from google.cloud import bigquery
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "../bigquery_creds.json"
client = bigquery.Client()         # Start the BigQuery Client
# Input your Query Syntax here; You may try it first at https://console.cloud.google.com/bigquery
QUERY = ("""
    SELECT *
    FROM `bigquery-public-data.noaa_historic_severe_storms.storms_*`
    WHERE event_end_time > cast('2020-01-01' as datetime)
    limit 10000000;
""")
query_job = client.query(QUERY)    # Start Query API Request
query_result = query_job.result()  # Get Query Result
weather_df = query_result.to_dataframe()   # Save the Query Resultto Dataframe
# ---------------------------------------------
# ---- Continue Data Analysis with your DF ----
# ---------------------------------------------

In [2]:
weather_df.sample(3).T

Unnamed: 0,98124,92623,47953
episode_id,150231,148199,146388
event_id,918062,894194,879711
state,Alabama,Kansas,New jersey
state_fips_code,1,20,34
event_type,thunderstorm wind,thunderstorm wind,hail
cz_type,C,C,C
cz_fips_code,33,197,25
cz_name,COLBERT,WABAUNSEE,MONMOUTH
wfo,HUN,TOP,PHI
event_begin_time,2020-08-27 15:13:00,2020-05-04 09:01:00,2020-04-21 13:52:00


In [3]:
events = weather_df[['event_begin_time', 'event_end_time']].drop_duplicates()

In [4]:
len(events)

72693

In [5]:
events.event_end_time.max()

Timestamp('2021-08-31 23:59:00')

In [6]:
weather_df.columns

Index(['episode_id', 'event_id', 'state', 'state_fips_code', 'event_type',
       'cz_type', 'cz_fips_code', 'cz_name', 'wfo', 'event_begin_time',
       'event_timezone', 'event_end_time', 'injuries_direct',
       'injuries_indirect', 'deaths_direct', 'deaths_indirect',
       'damage_property', 'damage_crops', 'source', 'magnitude',
       'magnitude_type', 'flood_cause', 'tor_f_scale', 'tor_length',
       'tor_width', 'tor_other_wfo', 'location_index', 'event_range',
       'event_azimuth', 'reference_location', 'event_latitude',
       'event_longitude', 'event_point'],
      dtype='object')

In [7]:
weather_df[['event_type']].drop_duplicates()

Unnamed: 0,event_type
0,hail
5597,heat
6737,flood
8156,sleet
8170,drought
10765,tornado
11738,blizzard
11902,wildfire
12137,avalanche
12161,dense fog


In [8]:
weather_df[['source']].drop_duplicates()

Unnamed: 0,source
0,Public
4,Broadcast Media
5,Social Media
6,Fire Department/Rescue
7,COOP Observer
10,Trained Spotter
14,Emergency Manager
18,CoCoRaHS
37,Storm Chaser
41,Amateur Radio


In [9]:
weather_df[['wfo']].drop_duplicates()

Unnamed: 0,wfo
0,HGX
3,JAN
6,TWC
7,LUB
8,TSA
...,...
12615,EKA
12756,ASO
13991,LOX
14086,BOI


In [10]:
pd.options.mode.chained_assignment = None
# num of severe weather events and their duration by fips
d = weather_df[['event_id', 'event_type', 'event_begin_time', 'event_end_time', 'cz_fips_code', 'state_fips_code', 'event_latitude', 'event_longitude']]
d['event_duration'] = d['event_end_time'] - d['event_begin_time']
d.sample(3).T

Unnamed: 0,50662,74988,65147
event_id,891597,920516,885388
event_type,hail,flash flood,high wind
event_begin_time,2020-06-04 17:00:00,2020-09-10 12:34:00,2020-04-12 10:20:00
event_end_time,2020-06-04 17:00:00,2020-09-10 15:00:00,2020-04-12 15:30:00
cz_fips_code,5,31,47
state_fips_code,56,24,20
event_latitude,44.8,39.0287,
event_longitude,-105.5,-77.0871,
event_duration,0 days 00:00:00,0 days 02:26:00,0 days 05:10:00


In [11]:
d.groupby(['cz_fips_code']).agg({'event_id': 'count', 'event_duration': 'max'})

Unnamed: 0_level_0,event_id,event_duration
cz_fips_code,Unnamed: 1_level_1,Unnamed: 2_level_1
1,1821,30 days 23:59:00
10,488,30 days 23:59:00
100,69,12 days 00:00:00
101,738,30 days 23:59:00
102,117,30 days 23:59:00
...,...,...
95,645,30 days 23:59:00
96,76,30 days 23:59:00
97,653,30 days 23:59:00
98,83,30 days 23:59:00


In [35]:
import geopy
import pandas as pd
from geopy.extra.rate_limiter import RateLimiter
from geopy.exc import GeocoderUnavailable, GeocoderRateLimited
from time import sleep


def get_zipcode(row, lat_lon_field):
    geolocator = geopy.Nominatim(user_agent='cra')
    lat_lon = tuple(row[lat_lon_field].split('_'))
    location = geolocator.reverse(lat_lon)
    raw = location.raw
    if 'address' in raw.keys():
        if 'postcode' in raw['address'].keys():
            return {'postcode': raw['address']['postcode']}
        else:
            return raw['address']
    else:
        return raw


In [13]:
weather_df

Unnamed: 0,episode_id,event_id,state,state_fips_code,event_type,cz_type,cz_fips_code,cz_name,wfo,event_begin_time,...,tor_length,tor_width,tor_other_wfo,location_index,event_range,event_azimuth,reference_location,event_latitude,event_longitude,event_point
0,154212,929077,Texas,48,hail,C,39,BRAZORIA,HGX,2021-01-06 17:31:00,...,,0,,1,1.81,W,MANVEL COYLE ARPT,29.400,-95.4300,POINT(-95.43 29.4)
1,154212,929080,Texas,48,hail,C,167,GALVESTON,HGX,2021-01-06 18:05:00,...,,0,,1,0.69,S,DICKINSON,29.460,-95.0500,POINT(-95.05 29.46)
2,154212,929079,Texas,48,hail,C,167,GALVESTON,HGX,2021-01-06 18:21:00,...,,0,,1,0.69,N,KEMAH,29.540,-95.0200,POINT(-95.02 29.54)
3,154418,930323,Mississippi,28,hail,C,29,COPIAH,JAN,2021-01-25 23:03:00,...,,0,,,,,,,,
4,154418,930324,Mississippi,28,hail,C,101,NEWTON,JAN,2021-01-25 23:45:00,...,,0,,,,,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
107810,155390,937267,E pacific,86,marine thunderstorm wind,Z,655,INNER WATERS FROM POINT MUGU TO SAN MATEO POIN...,LOX,2020-12-28 00:30:00,...,,0,,1,45.11,E,SANTA MONICA BASIN BUOY (46025),33.711,-118.2663,POINT(-118.2663 33.711)
107811,151265,913996,Atlantic south,87,marine tropical depression,Z,630,BISCAYNE BAY,MFL,2020-08-01 01:00:00,...,,0,,,,,,,,
107812,152468,918364,Atlantic south,87,marine tropical depression,Z,671,DEERFIELD BEACH TO OCEAN REEF FL 20 TO 60NM,MFL,2020-09-12 00:00:00,...,,0,,,,,,,,
107813,152468,918365,Atlantic south,87,marine tropical depression,Z,670,JUPITER INLET TO DEERFIELD BEACH FL 20 TO 60NM,MFL,2020-09-12 00:00:00,...,,0,,,,,,,,


## Convert lat long to zip codes

In [14]:
# Coalesce weather df lon lat into one column
weather_df['lat_lon'] = weather_df['event_latitude'].astype(str) + '_' +  weather_df['event_longitude'].astype(str)

In [15]:
nn_weather_df = weather_df[(~weather_df.lat_lon.isna()) & (weather_df.lat_lon != 'nan_nan')][[
    'event_id', 'lat_lon'
]]

In [16]:
nn_weather_df

Unnamed: 0,event_id,lat_lon
0,929077,29.4_-95.43
1,929080,29.46_-95.05
2,929079,29.54_-95.02
7,930415,33.0356_-102.0328
8,931477,36.8881_-95.2697
...,...,...
107806,935268,24.85_-80.62
107807,935269,24.7263_-81.0477
107808,935270,24.7263_-81.0477
107809,937266,34.0088_-118.4999


In [36]:
nn_weather_df.iloc[1]

event_id          929080
lat_lon     29.46_-95.05
Name: 1, dtype: object

In [37]:
get_zipcode(nn_weather_df.iloc[1], 'lat_lon')

{'postcode': '77539'}

In [38]:
len(nn_weather_df)

55616

In [None]:
from tqdm import tqdm
from time import sleep

with tqdm(total=len(nn_weather_df)) as pbar:
    for row in nn_weather_df.iterrows():
        pbar.set_description(f"{row[1]['lat_lon']}")
        zc = get_zipcode(row[1], lat_lon_field='lat_lon')
        sleep(5)
        pbar.set_description(f'{zc}')
        pbar.update()

44.74_-93.13:   0%|          | 37/55616 [03:15<81:37:52,  5.29s/it]                                                                                                                                         

In [24]:
result = nn_weather_df.apply(lambda row: get_zipcode(row, lat_lon_field='lat_lon'), axis=1)

In [25]:
result

In [33]:
# from dask.distributed import Client, LocalCluster
#
# cluster = LocalCluster(
#     n_workers=16,
#     processes=True,
# )
# client = Client(cluster)
# client

Perhaps you already have a cluster running?
Hosting the HTTP server on port 54715 instead


0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:54715/status,

0,1
Dashboard: http://127.0.0.1:54715/status,Workers: 16
Total threads: 16,Total memory: 64.00 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:54716,Workers: 16
Dashboard: http://127.0.0.1:54715/status,Total threads: 16
Started: Just now,Total memory: 64.00 GiB

0,1
Comm: tcp://127.0.0.1:54777,Total threads: 1
Dashboard: http://127.0.0.1:54778/status,Memory: 4.00 GiB
Nanny: tcp://127.0.0.1:54723,
Local directory: /Users/donald/DataspellProjects/cra/notebooks/2021-11-23-severe-weather/dask-worker-space/worker-bf4fsged,Local directory: /Users/donald/DataspellProjects/cra/notebooks/2021-11-23-severe-weather/dask-worker-space/worker-bf4fsged

0,1
Comm: tcp://127.0.0.1:54773,Total threads: 1
Dashboard: http://127.0.0.1:54775/status,Memory: 4.00 GiB
Nanny: tcp://127.0.0.1:54726,
Local directory: /Users/donald/DataspellProjects/cra/notebooks/2021-11-23-severe-weather/dask-worker-space/worker-sv1tbve9,Local directory: /Users/donald/DataspellProjects/cra/notebooks/2021-11-23-severe-weather/dask-worker-space/worker-sv1tbve9

0,1
Comm: tcp://127.0.0.1:54780,Total threads: 1
Dashboard: http://127.0.0.1:54781/status,Memory: 4.00 GiB
Nanny: tcp://127.0.0.1:54729,
Local directory: /Users/donald/DataspellProjects/cra/notebooks/2021-11-23-severe-weather/dask-worker-space/worker-glfhxzsg,Local directory: /Users/donald/DataspellProjects/cra/notebooks/2021-11-23-severe-weather/dask-worker-space/worker-glfhxzsg

0,1
Comm: tcp://127.0.0.1:54796,Total threads: 1
Dashboard: http://127.0.0.1:54798/status,Memory: 4.00 GiB
Nanny: tcp://127.0.0.1:54728,
Local directory: /Users/donald/DataspellProjects/cra/notebooks/2021-11-23-severe-weather/dask-worker-space/worker-9c4sxqlu,Local directory: /Users/donald/DataspellProjects/cra/notebooks/2021-11-23-severe-weather/dask-worker-space/worker-9c4sxqlu

0,1
Comm: tcp://127.0.0.1:54795,Total threads: 1
Dashboard: http://127.0.0.1:54797/status,Memory: 4.00 GiB
Nanny: tcp://127.0.0.1:54734,
Local directory: /Users/donald/DataspellProjects/cra/notebooks/2021-11-23-severe-weather/dask-worker-space/worker-0s5v3sg3,Local directory: /Users/donald/DataspellProjects/cra/notebooks/2021-11-23-severe-weather/dask-worker-space/worker-0s5v3sg3

0,1
Comm: tcp://127.0.0.1:54756,Total threads: 1
Dashboard: http://127.0.0.1:54757/status,Memory: 4.00 GiB
Nanny: tcp://127.0.0.1:54721,
Local directory: /Users/donald/DataspellProjects/cra/notebooks/2021-11-23-severe-weather/dask-worker-space/worker-cvv54f16,Local directory: /Users/donald/DataspellProjects/cra/notebooks/2021-11-23-severe-weather/dask-worker-space/worker-cvv54f16

0,1
Comm: tcp://127.0.0.1:54786,Total threads: 1
Dashboard: http://127.0.0.1:54787/status,Memory: 4.00 GiB
Nanny: tcp://127.0.0.1:54731,
Local directory: /Users/donald/DataspellProjects/cra/notebooks/2021-11-23-severe-weather/dask-worker-space/worker-1zi89pjt,Local directory: /Users/donald/DataspellProjects/cra/notebooks/2021-11-23-severe-weather/dask-worker-space/worker-1zi89pjt

0,1
Comm: tcp://127.0.0.1:54753,Total threads: 1
Dashboard: http://127.0.0.1:54754/status,Memory: 4.00 GiB
Nanny: tcp://127.0.0.1:54719,
Local directory: /Users/donald/DataspellProjects/cra/notebooks/2021-11-23-severe-weather/dask-worker-space/worker-ucdmydqy,Local directory: /Users/donald/DataspellProjects/cra/notebooks/2021-11-23-severe-weather/dask-worker-space/worker-ucdmydqy

0,1
Comm: tcp://127.0.0.1:54768,Total threads: 1
Dashboard: http://127.0.0.1:54769/status,Memory: 4.00 GiB
Nanny: tcp://127.0.0.1:54725,
Local directory: /Users/donald/DataspellProjects/cra/notebooks/2021-11-23-severe-weather/dask-worker-space/worker-163bn4h6,Local directory: /Users/donald/DataspellProjects/cra/notebooks/2021-11-23-severe-weather/dask-worker-space/worker-163bn4h6

0,1
Comm: tcp://127.0.0.1:54790,Total threads: 1
Dashboard: http://127.0.0.1:54793/status,Memory: 4.00 GiB
Nanny: tcp://127.0.0.1:54733,
Local directory: /Users/donald/DataspellProjects/cra/notebooks/2021-11-23-severe-weather/dask-worker-space/worker-apmbuxmv,Local directory: /Users/donald/DataspellProjects/cra/notebooks/2021-11-23-severe-weather/dask-worker-space/worker-apmbuxmv

0,1
Comm: tcp://127.0.0.1:54762,Total threads: 1
Dashboard: http://127.0.0.1:54763/status,Memory: 4.00 GiB
Nanny: tcp://127.0.0.1:54722,
Local directory: /Users/donald/DataspellProjects/cra/notebooks/2021-11-23-severe-weather/dask-worker-space/worker-ncylp2np,Local directory: /Users/donald/DataspellProjects/cra/notebooks/2021-11-23-severe-weather/dask-worker-space/worker-ncylp2np

0,1
Comm: tcp://127.0.0.1:54783,Total threads: 1
Dashboard: http://127.0.0.1:54784/status,Memory: 4.00 GiB
Nanny: tcp://127.0.0.1:54730,
Local directory: /Users/donald/DataspellProjects/cra/notebooks/2021-11-23-severe-weather/dask-worker-space/worker-shmw9u85,Local directory: /Users/donald/DataspellProjects/cra/notebooks/2021-11-23-severe-weather/dask-worker-space/worker-shmw9u85

0,1
Comm: tcp://127.0.0.1:54765,Total threads: 1
Dashboard: http://127.0.0.1:54766/status,Memory: 4.00 GiB
Nanny: tcp://127.0.0.1:54724,
Local directory: /Users/donald/DataspellProjects/cra/notebooks/2021-11-23-severe-weather/dask-worker-space/worker-u7k98qom,Local directory: /Users/donald/DataspellProjects/cra/notebooks/2021-11-23-severe-weather/dask-worker-space/worker-u7k98qom

0,1
Comm: tcp://127.0.0.1:54771,Total threads: 1
Dashboard: http://127.0.0.1:54772/status,Memory: 4.00 GiB
Nanny: tcp://127.0.0.1:54727,
Local directory: /Users/donald/DataspellProjects/cra/notebooks/2021-11-23-severe-weather/dask-worker-space/worker-4zmjdt8k,Local directory: /Users/donald/DataspellProjects/cra/notebooks/2021-11-23-severe-weather/dask-worker-space/worker-4zmjdt8k

0,1
Comm: tcp://127.0.0.1:54789,Total threads: 1
Dashboard: http://127.0.0.1:54791/status,Memory: 4.00 GiB
Nanny: tcp://127.0.0.1:54732,
Local directory: /Users/donald/DataspellProjects/cra/notebooks/2021-11-23-severe-weather/dask-worker-space/worker-tj02uqjw,Local directory: /Users/donald/DataspellProjects/cra/notebooks/2021-11-23-severe-weather/dask-worker-space/worker-tj02uqjw

0,1
Comm: tcp://127.0.0.1:54759,Total threads: 1
Dashboard: http://127.0.0.1:54760/status,Memory: 4.00 GiB
Nanny: tcp://127.0.0.1:54720,
Local directory: /Users/donald/DataspellProjects/cra/notebooks/2021-11-23-severe-weather/dask-worker-space/worker-tn2_6tmm,Local directory: /Users/donald/DataspellProjects/cra/notebooks/2021-11-23-severe-weather/dask-worker-space/worker-tn2_6tmm


In [34]:
# import dask.dataframe as dd

In [35]:
# ddf = dd.from_pandas(nn_weather_df, npartitions=16)

In [36]:
# ddf.head()

Unnamed: 0,event_id,lat_lon
0,929077,29.4_-95.43
1,929080,29.46_-95.05
2,929079,29.54_-95.02
7,930415,33.0356_-102.0328
8,931477,36.8881_-95.2697


In [40]:
# result = ddf.apply(lambda row: get_zipcode(row, lat_lon_field='lat_lon'), axis=1, meta=('lat_lon', 'O'))

In [41]:
# r = result.compute()

Function:  subgraph_callable-4fed667b-67b7-49c0-967f-ed9e9790
args:      (      event_id            lat_lon
69920   929782     37.256_-79.669
69921   926983    19.929_-155.723
69922   926995  19.9445_-155.2556
69923   926996  19.7491_-155.1442
69924   930687   37.2377_-77.2263
...        ...                ...
85664   870833       30.23_-81.71
85665   870729       30.08_-82.45
85666   870837       30.24_-82.14
85667   870738       30.64_-81.59
85668   870747       29.29_-82.11

[3476 rows x 2 columns])
kwargs:    {}
Exception: 'GeocoderUnavailable(\'HTTPSConnectionPool(host=\\\'nominatim.openstreetmap.org\\\', port=443): Max retries exceeded with url: /reverse?lat=37.256&lon=-79.669&format=json&addressdetails=1 (Caused by ReadTimeoutError("HTTPSConnectionPool(host=\\\'nominatim.openstreetmap.org\\\', port=443): Read timed out. (read timeout=1)"))\')'

Function:  subgraph_callable-4fed667b-67b7-49c0-967f-ed9e9790
args:      (      event_id           lat_lon
90216   889928      30.55_-84

GeocoderUnavailable: HTTPSConnectionPool(host='nominatim.openstreetmap.org', port=443): Max retries exceeded with url: /reverse?lat=37.256&lon=-79.669&format=json&addressdetails=1 (Caused by ReadTimeoutError("HTTPSConnectionPool(host='nominatim.openstreetmap.org', port=443): Read timed out. (read timeout=1)"))