## ETL Data pipeline

In [697]:
# import dependencies 
import requests 
import pandas as pd
import numpy as np
import time 
from datetime import datetime
from sqlalchemy import create_engine
# psycopg2 works in tandem with sqlalchemy 
import psycopg2
import config1

### Extraction 

In [723]:
# Socrata endpoint for the Austin crime reports dataset, capped at 1000 rows and
# filtered on occ_date between 2018-01-01 and 2020-12-31 via the $where clause.
# The adjacent literals concatenate into a single URL string.
url = (
    "https://data.austintexas.gov/resource/fdj4-gpfu.json"
    "?$limit=1000"
    "&$where=occ_date between '2018-01-01T00:00:00.000' and '2020-12-31T00:00:00.000'"
)

In [435]:
# `parameters` is optional: additional API query parameters/filters
def extract_data(api_endpoint, parameters=None, timeout=60):
    """Request JSON records from an API endpoint and return them as a DataFrame.

    Parameters
    ----------
    api_endpoint : str
        URL to request; it may already carry a query string.
    parameters : dict, optional
        Extra query parameters/filters forwarded to ``requests.get`` as ``params``.
    timeout : float, optional
        Seconds to wait for the server before giving up (default 60). Without a
        timeout a stalled connection would block the notebook indefinitely.

    Returns
    -------
    pandas.DataFrame
        One row per record in the JSON response.

    Raises
    ------
    requests.HTTPError
        If the server answers with a 4xx/5xx status.
    """
    data_request = requests.get(url=api_endpoint, params=parameters, timeout=timeout)

    # Fail loudly on an error response instead of turning the error payload
    # into a (meaningless) DataFrame.
    data_request.raise_for_status()

    data_df = pd.DataFrame.from_records(data_request.json())

    # Report how many rows came back.
    print(pd.DataFrame({'Rows Retrieved': {0: len(data_df)}}))

    return data_df

# print no of rows retrieved 

### Transform   

In [724]:
# Pull the raw records for the endpoint stored in `url` into a DataFrame.
data_df = extract_data(url)

   Rows Retrieved
0            1000


In [725]:
# Clean the extracted frame and rebind `data_df` to the result.
# NOTE(review): `clean_data` is defined in a cell further down, so that cell
# has to be executed before this one on a fresh kernel.
data_df = clean_data(data_df)

   Rows Dropped
0            28


In [453]:
def clean_data(data_df):
    """Return a cleaned copy of the raw incident frame.

    Steps, in order:
      1. drop the columns that are not loaded downstream;
      2. drop every row that has a missing value in any required column and
         print how many rows were removed;
      3. fill missing clearance fields with placeholder values;
      4. replace the 'T' separator in the three timestamp columns with a space;
      5. rename columns to their target names.

    The input frame is left untouched; a new DataFrame is returned. Surviving
    rows keep their original index labels.

    Parameters
    ----------
    data_df : pandas.DataFrame
        Raw frame containing the columns listed in ``drop_columns`` and
        ``column_checklist`` plus ``clearance_date`` and ``clearance_status``.

    Returns
    -------
    pandas.DataFrame

    Raises
    ------
    KeyError
        If any expected column is absent from ``data_df``.
    """
    # Columns that are not carried into the cleaned frame.
    drop_columns = ['location', 'x_coordinate', 'y_coordinate',
                    'occ_date', 'occ_time', 'rep_date', 'rep_time',
                    'category_description', 'address',
                    'ucr_category', 'census_tract', 'sector', 'pra',
                    'council_district']

    # A row is discarded when any of these columns is null.
    column_checklist = ['incident_report_number', 'crime_type', 'ucr_code',
                        'family_violence', 'occ_date_time', 'rep_date_time',
                        'location_type', 'zip_code', 'latitude', 'longitude',
                        'district']

    rows_before = len(data_df)

    # Work on a new frame so the caller's data is never modified in place.
    cleaned = (
        data_df
        .drop(columns=drop_columns)
        .dropna(subset=column_checklist)
        .copy()
    )

    row_counter = rows_before - len(cleaned)
    print(pd.DataFrame({'Rows Dropped': {0: row_counter}}))

    # Fill clearance data with placeholders. Assign the result back rather than
    # calling fillna(inplace=True) on a column selection: that chained form does
    # not update the parent frame under pandas Copy-on-Write.
    cleaned['clearance_date'] = cleaned['clearance_date'].fillna('0000-00-00T00:00:00.000')
    cleaned['clearance_status'] = cleaned['clearance_status'].fillna('N')

    # Clean the datetime columns: 'YYYY-MM-DDTHH:MM:SS' -> 'YYYY-MM-DD HH:MM:SS'.
    for column in ('occ_date_time', 'rep_date_time', 'clearance_date'):
        cleaned[column] = cleaned[column].apply(lambda x: x.replace('T', ' '))

    # Rename columns to their target names.
    column_names = {'ucr_code': 'offense_code', 'occ_date_time': 'occurred_date',
                    'rep_date_time': 'reported_date', 'crime_type': 'offense_type'}

    return cleaned.rename(columns=column_names)
    


In [726]:
# Split the cleaned frame into the crime, location and offense frames.
# NOTE(review): `create_tables` is defined in a cell further down, so that cell
# has to be executed before this one on a fresh kernel.
crime_test_df, location_test_df, offense_test_df = create_tables(data_df)

In [727]:
# Preview the first rows of each of the three frames.
# NOTE(review): if these lines share a single cell, Jupyter renders only the
# last expression; confirm the intended cell layout.
crime_test_df.head()
location_test_df.head()
offense_test_df.head()

Unnamed: 0,incident_report_number,offense_code,family_violence,occurred_date,reported_date,zip_code,council_district,district,latitude,longitude,clearance_status,clearance_date,location_code
0,201811444,3400,N,2018-01-01 16:58:00.000,2018-01-01 16:58:00.000,78745,5,1,30.19737037,-97.80960579,N,0000-00-00 00:00:00.000,0
1,20195024716,2717,N,2018-01-01 14:00:00.000,2019-06-17 10:20:00.000,78747,5,7,30.14716858,-97.77960983,N,2019-06-18 00:00:00.000,1
2,201811667,300,N,2018-01-01 19:47:00.000,2018-01-01 19:47:00.000,78753,4,1,30.36582905,-97.69453919,C,2018-01-03 00:00:00.000,2
3,201810550,1803,N,2018-01-01 02:51:00.000,2018-01-01 06:49:00.000,78744,2,6,30.19247824,-97.73132061,N,0000-00-00 00:00:00.000,0
4,20205045035,4022,N,2018-01-01 12:00:00.000,2020-11-06 10:37:00.000,78758,4,4,30.38646396,-97.69903136,N,2020-11-06 00:00:00.000,1


Unnamed: 0,location_code,location_type
0,0,STREETS / HWY / ROAD / ALLEY
1,1,RESIDENCE / HOME
2,2,DEPARTMENT / DISCOUNT STORE
7,3,OTHER / UNKNOWN
9,4,PARKING LOTS / GARAGE


Unnamed: 0,offense_code,offense_type
0,3400,FAMILY DISTURBANCE
1,2717,CRUELTY TO ANIMALS
2,300,AGG ROBBERY/DEADLY WEAPON
3,1803,POSSESSION OF MARIJUANA
4,4022,IDENTITY THEFT


In [657]:
def create_tables(cleaned_data_df):
    """Split the cleaned frame into three related frames.

    Parameters
    ----------
    cleaned_data_df : pandas.DataFrame
        Frame with at least the columns ``offense_code``, ``offense_type`` and
        ``location_type``. It is not modified.

    Returns
    -------
    tuple of pandas.DataFrame
        ``(crime_incident_df, location_df, offense_df)`` where

        * ``crime_incident_df`` is the input with ``offense_type`` and
          ``location_type`` removed and an integer ``location_code`` added;
        * ``location_df`` holds one row per distinct ``location_type`` with its
          ``location_code`` (0..n-1 in order of first appearance);
        * ``offense_df`` holds one row per distinct ``offense_code`` with the
          first ``offense_type`` seen for it.
    """
    # Offense lookup: first (code, type) pair encountered for every code.
    offense_df = (
        cleaned_data_df[['offense_code', 'offense_type']]
        .drop_duplicates(subset='offense_code')
    )

    # Location lookup. Selecting with a list of column names yields a
    # one-column DataFrame (not a Series), so drop_duplicates/assign apply.
    distinct_locations = cleaned_data_df[['location_type']].drop_duplicates()
    distinct_locations = distinct_locations.assign(
        location_code=np.arange(len(distinct_locations))
    )

    # location_type -> location_code, used to encode the incident rows.
    location_mapper = (
        distinct_locations
        .set_index('location_type')['location_code']
        .to_dict()
    )

    # Present the lookup with the code column first.
    location_df = distinct_locations[['location_code', 'location_type']]

    # Incident frame: encode the location, then drop the descriptive columns
    # that now live in the two lookup frames.
    crime_incident_df = cleaned_data_df.copy()
    crime_incident_df['location_code'] = (
        crime_incident_df['location_type'].apply(location_mapper.__getitem__)
    )
    crime_incident_df = crime_incident_df.drop(columns=['offense_type', 'location_type'])

    return crime_incident_df, location_df, offense_df


### Load Data 

In [729]:
def load_data_0(crime_test_df, location_test_df, offense_test_df, chunk_size=100):
    """Load the three frames into the austin_crime database, reporting progress.

    The crime frame is written in slices of ``chunk_size`` rows so that progress
    and elapsed time can be printed after every slice. The first slice replaces
    the ``crime_incidents`` table and the remaining slices are appended. The two
    lookup frames are written in one call each, replacing their tables.

    Parameters
    ----------
    crime_test_df : pandas.DataFrame
        Written to table ``crime_incidents``.
    location_test_df : pandas.DataFrame
        Written to table ``incident_location``.
    offense_test_df : pandas.DataFrame
        Written to table ``offense_type``.
    chunk_size : int, optional
        Rows per progress step for the crime table (default 100).

    Raises
    ------
    ValueError
        If ``chunk_size`` is not a positive integer.
    """
    if chunk_size <= 0:
        raise ValueError('chunk_size must be a positive integer')

    # Set up the database connection. Credentials come from the imported
    # `config1` module; SQLAlchemy 1.4+ only accepts the "postgresql" scheme.
    database = f"postgresql://{config1.db_user}:{config1.db_password}@localhost:5432/austin_crime"
    engine = create_engine(database)

    try:
        # Import the large crime table in chunks to track progress.
        # DataFrame.to_sql does not yield chunks, so slice the frame here.
        rows_imported = 0
        start_time = time.time()
        total_rows = len(crime_test_df)

        # max(total_rows, 1) guarantees one pass for an empty frame so the
        # table is still (re)created.
        for offset in range(0, max(total_rows, 1), chunk_size):
            data = crime_test_df.iloc[offset:offset + chunk_size]
            print(f'importing rows {rows_imported} to {rows_imported + len(data)}...', end='')

            data.to_sql(name='crime_incidents', con=engine,
                        if_exists='replace' if offset == 0 else 'append')
            rows_imported += len(data)

            # Add elapsed time to the progress line.
            print(f'Done. {time.time() - start_time} total seconds elapsed')

        location_test_df.to_sql(name='incident_location', con=engine, if_exists='replace')
        offense_test_df.to_sql(name='offense_type', con=engine, if_exists='replace')
    finally:
        # Release pooled connections even if a write fails.
        engine.dispose()
    
    

In [758]:
def load_data(crime_test_df, location_test_df, offense_test_df):
    """Load the three frames into the austin_crime database.

    Every target table is replaced if it already exists. The time taken to
    write the crime table is printed.

    Parameters
    ----------
    crime_test_df : pandas.DataFrame
        Written to table ``crime_incidents``.
    location_test_df : pandas.DataFrame
        Written to table ``incident_location``.
    offense_test_df : pandas.DataFrame
        Written to table ``offense_type``.
    """
    # Set up the database connection. Credentials come from the imported
    # `config1` module; SQLAlchemy 1.4+ only accepts the "postgresql" scheme.
    database = f"postgresql://{config1.db_user}:{config1.db_password}@localhost:5432/austin_crime"
    engine = create_engine(database)

    try:
        print('loading crime_incident table')
        start_time = time.time()
        # chunksize bounds how many rows pandas sends per batch.
        crime_test_df.to_sql(name='crime_incidents', con=engine,
                             if_exists='replace', chunksize=100000)
        print(f'{time.time() - start_time} seconds to load crime table \n')

        print('loading incident_location table \n')
        location_test_df.to_sql(name='incident_location', con=engine, if_exists='replace')

        print('loading offense_type table')
        offense_test_df.to_sql(name='offense_type', con=engine, if_exists='replace')
    finally:
        # Release pooled connections even if a write fails.
        engine.dispose()
    
    

In [759]:
# Write the three frames to the database; each target table is replaced.
load_data(crime_test_df,location_test_df,offense_test_df)

loading crime_incident table
0.3630547523498535 seconds to load crime table 

loading incident_location table 

loading offense_type table
