In [11]:
# run in engineering environment
import os
import pandas as pd
import numpy as np
import datetime
from google.cloud import bigquery
import geopandas
from shapely import wkt

## Testing pipeline

Import data for California only

In [2]:
os.environ["GOOGLE_APPLICATION_CREDENTIALS"]="/Users/amykim/Documents/Metis/Course_3_Engineering/Project/BigQuery/data-engineering-345807-a02ba30a47dd.json"
client = bigquery.Client()

QUERY = ('''

    SELECT * FROM `bigquery-public-data.utility_us.zipcode_area` z,`bigquery-public-data.noaa_lightning.lightning_*` l 
        WHERE ST_CONTAINS(ST_GeogFromText(z.zipcode_geom) , l.center_point_geom) and state_name = 'California'

    ''')

query_job = client.query(QUERY)  # API request
query_result = query_job.result() 
df= query_result.to_dataframe()

#1m 0.3s

Write to pickle

In [3]:
df.to_pickle('pickles/lightning_CA')

Write to csv

In [5]:
df=pd.read_pickle('pickles/lightning_CA')
df.shape
df.to_csv('lightning_CA.csv')

#1m 15.6s

### Import to SQLite using SQL Alchemy
Pipeline starts from here?

In [2]:
from sqlalchemy import create_engine

In [None]:
df=pd.read_csv('lightning_CA.csv')

engine = create_engine('sqlite:///lightning.db', echo=True)
sqlite_connection = engine.connect()
sqlite_table = "lightning_CA"
df.to_sql(sqlite_table,sqlite_connection,if_exists='replace')

#

In [3]:
from sqlalchemy import create_engine
import pandas as pd

def create_df():
    engine= create_engine('sqlite:///lightning.db')
    df=pd.read_sql(
    '''
    SELECT date, 
    zipcode, 
    city, 
    county,
    state_name as state, 
    number_of_strikes as count_lightning, 
    center_point_geom as center_point
    FROM lightning_CA
    '''
    ,engine)

    return df


In [4]:
df=create_df()
#1m 39.1s

Clean data

In [5]:
def clean_dates(df):    
    df['date']=pd.to_datetime(df['date'])
    df['year']=pd.DatetimeIndex(df['date']).year
    df['month']=pd.DatetimeIndex(df['date']).month
    return df


In [6]:
df=clean_dates(df)

In [8]:
df.city.unique()
df.groupby(["center_point","year",]).sum("count_lightning").reset_index()

Unnamed: 0,center_point,year,zipcode,count_lightning,month
0,POINT(-114.2 34.3),1987,369068,10,38
1,POINT(-114.2 34.3),1988,922670,28,73
2,POINT(-114.2 34.3),1989,276801,4,23
3,POINT(-114.2 34.3),1990,1568539,53,123
4,POINT(-114.2 34.3),1991,1107204,40,97
...,...,...,...,...,...
75050,POINT(-124.3 40.6),2014,95536,9,9
75051,POINT(-124.3 40.6),2015,191072,2,24
75052,POINT(-124.3 40.6),2016,95536,3,4
75053,POINT(-124.3 40.6),2017,382144,11,13


In [9]:
def clean_geo(df):
    df['center_point'] = geopandas.GeoSeries.from_wkt(df['center_point'])
    gdf = geopandas.GeoDataFrame(df,geometry='center_point')
    gdf['lon'] = gdf['center_point'].x
    gdf['lat'] = gdf['center_point'].y
    return gdf

In [12]:
gdf= clean_geo(df)
#24s

In [11]:
gdf

Unnamed: 0,date,zipcode,city,county,state,count_lightning,center_point,year,month,lon,lat
0,2020-03-08,96021,"Corning city, Rancho Tehama Reserve CDP, Richf...",Tehama County,California,8,POINT (-122.20000 39.90000),2020,3,-122.2,39.9
1,2020-08-11,96119,Madeline,Lassen,California,8,POINT (-120.60000 41.00000),2020,8,-120.6,41.0
2,2020-08-16,94552,"Union City city, Hayward city, Sunol CDP, Fair...",Alameda County,California,8,POINT (-122.10000 37.80000),2020,8,-122.1,37.8
3,2020-08-16,94560,Newark city,Alameda County,California,8,POINT (-122.00000 37.50000),2020,8,-122.0,37.5
4,2020-08-17,96025,Dunsmuir city,Siskiyou County,California,8,POINT (-122.40000 41.20000),2020,8,-122.4,41.2
...,...,...,...,...,...,...,...,...,...,...,...
283460,1998-07-26,96132,Termo,Lassen,California,4,POINT (-120.00000 41.00000),1998,7,-120.0,41.0
283461,2003-09-05,96132,Termo,Lassen,California,8,POINT (-120.00000 40.90000),2003,9,-120.0,40.9
283462,2003-07-24,96132,Termo,Lassen,California,14,POINT (-120.40000 40.90000),2003,7,-120.4,40.9
283463,1992-08-13,96132,Termo,Lassen,California,5,POINT (-120.70000 41.00000),1992,8,-120.7,41.0


In [87]:
def zipcode_summary(gdf):
    summary=gdf.groupby(['zipcode','city','county','year'])['count_lightning'].sum().reset_index()
    return summary

In [89]:
summary=zipcode_summary(gdf)
summary

Unnamed: 0,zipcode,city,county,year,count_lightning
0,90027,Los Angeles city,Los Angeles County,1987,6
1,90027,Los Angeles city,Los Angeles County,1988,7
2,90027,Los Angeles city,Los Angeles County,1990,6
3,90027,Los Angeles city,Los Angeles County,1991,4
4,90027,Los Angeles city,Los Angeles County,1992,3
...,...,...,...,...,...
25027,96161,"Kingvale CDP, Truckee town","Nevada County, Placer County, Nevada County",2015,384
25028,96161,"Kingvale CDP, Truckee town","Nevada County, Placer County, Nevada County",2016,46
25029,96161,"Kingvale CDP, Truckee town","Nevada County, Placer County, Nevada County",2017,207
25030,96161,"Kingvale CDP, Truckee town","Nevada County, Placer County, Nevada County",2018,93


In [90]:
summary[summary['zipcode']==92364]

Unnamed: 0,zipcode,city,county,year,count_lightning
4366,92364,Nipton,San Bernardino,1987,283
4367,92364,Nipton,San Bernardino,1988,3287
4368,92364,Nipton,San Bernardino,1989,324
4369,92364,Nipton,San Bernardino,1990,1045
4370,92364,Nipton,San Bernardino,1991,844
4371,92364,Nipton,San Bernardino,1992,846
4372,92364,Nipton,San Bernardino,1993,485
4373,92364,Nipton,San Bernardino,1994,1076
4374,92364,Nipton,San Bernardino,1995,391
4375,92364,Nipton,San Bernardino,1996,1230


In [75]:
gdf[(gdf['zipcode']==92364) & (gdf['year']==1988)]['count_lightning'].sum()

3287

In [67]:
stats1 =(summary2
        .groupby(['zipcode','city','county'])['count_lightning']
        .agg(['mean','std','min','max'])
        .sort_values(['mean'], ascending = False)
        .reset_index())
stats1['rank'] =stats1['mean'].rank(ascending=False).astype(int)
stats1=stats1.round({"mean":2, "std":2})
stats1

Unnamed: 0,zipcode,city,county,mean,std,min,max,rank
0,92364,Nipton,San Bernardino,1510.12,939.62,40,4193,1
1,92332,Essex,San Bernardino,1389.27,807.88,60,3996,2
2,93634,Big Creek CDP,Fresno County,1213.27,600.28,144,2748,3
3,92277,Twentynine Palms city,San Bernardino County,1178.32,864.35,1,4025,4
4,93514,"Mesa CDP, Round Valley CDP, Bishop city, Swall...","Inyo County, Mono County",782.82,389.70,149,1742,5
...,...,...,...,...,...,...,...,...
889,95564,Samoa CDP,Humboldt County,2.24,1.48,1,7,890
890,95125,"San Jose city, Campbell city",Santa Clara County,2.12,1.45,1,6,891
891,95129,"San Jose city, Saratoga city",Santa Clara County,2.11,1.76,1,6,892
892,95120,San Jose city,Santa Clara County,2.05,1.15,1,4,893


In [132]:
def zipcode_summary_time(gdf):
    graph_data=gdf.groupby(["zipcode","date"])['count_lightning'].sum().reset_index()
    graph_data['zipcode'] = graph_data['zipcode'].astype(str)
    
    totals=pd.DataFrame(graph_data.groupby(['date'])['count_lightning'].sum().reset_index())
    totals.insert(0,'zipcode','Total')
    
    graph_data=graph_data.append(totals,ignore_index=True)
    return graph_data

In [133]:
graph_data=zipcode_summary_time(gdf)

  graph_data=graph_data.append(totals,ignore_index=True)


In [134]:
graph_data

Unnamed: 0,zipcode,date,count_lightning
0,90027,1987-09-24,1
1,90027,1987-11-04,2
2,90027,1987-11-05,2
3,90027,1987-12-29,1
4,90027,1988-04-21,5
...,...,...,...
133412,Total,2020-11-19,21
133413,Total,2020-12-13,1
133414,Total,2020-12-26,13
133415,Total,2020-12-28,89


In [94]:
def yearly_stats(summary): #yearly stats
    stats1=(summary
        .groupby(['zipcode','city','county'])['count_lightning']
        .agg(['mean','std','min','max'])
        .sort_values(['mean'], ascending = False)
        .reset_index())
    stats1['rank'] =stats1['mean'].rank(ascending=False).astype(int)
    return stats1

In [96]:
stats1=yearly_stats(summary)
stats1

Unnamed: 0,zipcode,city,county,mean,std,min,max,rank
0,92364,Nipton,San Bernardino,1510.121212,939.616669,40,4193,1
1,92332,Essex,San Bernardino,1389.272727,807.882350,60,3996,2
2,93634,Big Creek CDP,Fresno County,1213.272727,600.282135,144,2748,3
3,92277,Twentynine Palms city,San Bernardino County,1178.323529,864.349211,1,4025,4
4,93514,"Mesa CDP, Round Valley CDP, Bishop city, Swall...","Inyo County, Mono County",782.818182,389.698478,149,1742,5
...,...,...,...,...,...,...,...,...
889,95564,Samoa CDP,Humboldt County,2.235294,1.480262,1,7,890
890,95125,"San Jose city, Campbell city",Santa Clara County,2.117647,1.452685,1,6,891
891,95129,"San Jose city, Saratoga city",Santa Clara County,2.105263,1.760516,1,6,892
892,95120,San Jose city,Santa Clara County,2.050000,1.145931,1,4,893


In [102]:
def overall_stats(summary):
    stats2=pd.DataFrame((summary['count_lightning']
            .agg(['mean','std','min','max'])).reset_index())
    return stats2

In [103]:
stats2=pd.DataFrame((summary['count_lightning']
            .agg(['mean','std','min','max'])).reset_index())

In [104]:
stats2=overall_stats(summary)
stats2

Unnamed: 0,index,count_lightning
0,mean,51.306608
1,std,154.942108
2,min,1.0
3,max,4193.0


In [110]:
stats2['count_lightning'][3]

51.306607542345795

In [25]:
def write_to_csv(summary):
    summary.to_csv('lightning].csv', index=False)
    return

## Testing Streamlit 

In [31]:
import streamlit as st

In [48]:
def load_data(path):
    df = pd.read_csv(path)
    return df

stats=load_data('stats.csv')

In [42]:
rank_to_filter = st.slider('rank', 1,100,(1,100)) 
rank_to_filter
rank_to_filter[1]


100

In [61]:
filtered_data = stats1[stats1['rank'].between(rank_to_filter[0],rank_to_filter[1])]
filtered_data[['rank','zipcode','county','mean','std','min','max']]
write=st.write(filtered_data[['rank','zipcode','county','mean','std','min','max']]
)

In [None]:
st.subheader(f'Top {rank_to_filter}')

st.map(filtered_data)

## Testing unit tests

In [145]:
from unittest import TestCase
import unittest
import datetime
import pandas as pd
from summaries import zipcode_summary, zipcode_summary_time, yearly_stats, overall_stats


In [184]:
class TestSummaries (unittest.TestCase):

    def test_zipcode_summary(self):
        input_df=pd.DataFrame([
        {'zipcode':12345,'city':'ABC','county':'County1','year':'1234','count_lightning':3},
        {'zipcode':12345,'city':'LAC','county':'County2','year':'1234','count_lightning':3},
        {'zipcode':12345,'city':'LAC','county':'County2','year':'1234','count_lightning':5},
        {'zipcode':12345,'city':'DEF','county':'County3','year':'1234','count_lightning':3}
        ])

        expected_output_df = pd.DataFrame([
        {'zipcode':12345,'city':'ABC','county':'County1','year':'1234','count_lightning':3},
        {'zipcode':12345,'city':'DEF','county':'County3','year':'1234','count_lightning':3},
        {'zipcode':12345,'city':'LAC','county':'County2','year':'1234','count_lightning':8}
        ])

        actual_output_df= zipcode_summary(input_df)
        self.assertEqual(actual_output_df.shape,expected_output_df.shape,msg='Equal')

    def test_zipcode_summary_time(self):
        input_df=pd.DataFrame([
        {'zipcode':12345,'date':datetime.datetime(1,2,3),'count_lightning':3},
        {'zipcode':23456,'date':datetime.datetime(1,2,3),'count_lightning':3},
        {'zipcode':12345,'date':datetime.datetime(1,2,3),'count_lightning':4}
        ])
        expected_output_df = pd.DataFrame([
        {'zipcode':12345,'date':datetime.datetime(1,2,3),'count_lightning':7},
        {'zipcode':23456,'date':datetime.datetime(1,2,3),'count_lightning':3},
        {'zipcode':'Total','date':datetime.datetime(1,2,3),'count_lightning':10}
        ])
        actual_output_df= zipcode_summary_time(input_df)
        self.assertEqual(actual_output_df.shape,expected_output_df.shape,msg='Equal')


In [185]:
unittest.main(argv=['first-is-ignored'],exit =False)

  graph_data=graph_data.append(totals,ignore_index=True)
.
----------------------------------------------------------------------
Ran 2 tests in 0.069s

OK


<unittest.main.TestProgram at 0x7fe8de127910>

In [160]:
input_df=pd.DataFrame([
{'zipcode':12345,'city':'ABC','county':'County1','year':'1234','count_lightning':3},
{'zipcode':12345,'city':'LAC','county':'County2','year':'1234','count_lightning':3},
{'zipcode':12345,'city':'LAC','county':'County2','year':'1234','count_lightning':5},
{'zipcode':12345,'city':'DEF','county':'County3','year':'1234','count_lightning':3}
])
expected_output_df = pd.DataFrame([
{'zipcode':12345,'city':'ABC','county':'County1','year':'1234','count_lightning':3},
{'zipcode':12345,'city':'DEF','county':'County3','year':'1234','count_lightning':3},
{'zipcode':12345,'city':'LAC','county':'County2','year':'1234','count_lightning':8}

])


In [161]:
input_df

Unnamed: 0,zipcode,city,county,year,count_lightning
0,12345,ABC,County1,1234,3
1,12345,LAC,County2,1234,3
2,12345,LAC,County2,1234,5
3,12345,DEF,County3,1234,3


In [162]:
expected_output_df

Unnamed: 0,zipcode,city,county,year,count_lightning
0,12345,ABC,County1,1234,3
1,12345,DEF,County3,1234,3
2,12345,LAC,County2,1234,8


In [163]:
actual_output_df= zipcode_summary(input_df)
actual_output_df

Unnamed: 0,zipcode,city,county,year,count_lightning
0,12345,ABC,County1,1234,3
1,12345,DEF,County3,1234,3
2,12345,LAC,County2,1234,8


In [None]:
actual_output_df= zipcode_summary(input_df)
#assert actual_output_df.shape==expected_output_df.shape


In [165]:
self.assertEqual(actual_output_df,expected_output_df,msg='Equal')

NameError: name 'self' is not defined

In [178]:
input_df=pd.DataFrame([
{'zipcode':12345,'date':datetime.datetime(1,2,3),'count_lightning':3},
{'zipcode':23456,'date':datetime.datetime(1,2,3),'count_lightning':3},
{'zipcode':12345,'date':datetime.datetime(1,2,3),'count_lightning':4}
])
expected_output_df = pd.DataFrame([
{'zipcode':12345,'date':datetime.datetime(1,2,3),'count_lightning':7},
{'zipcode':23456,'date':datetime.datetime(1,2,3),'count_lightning':3},
])

In [177]:
datetime.datetime(1,2,3)

datetime.datetime(1, 2, 3, 0, 0)

In [183]:
zipcode_summary_time(input_df)

  graph_data=graph_data.append(totals,ignore_index=True)


Unnamed: 0,zipcode,date,count_lightning
0,12345,0001-02-03 00:00:00,7
1,23456,0001-02-03 00:00:00,3
2,Total,0001-02-03 00:00:00,10
