## SQL Ingestion Pipeline Documentation

Data is loaded into the database with the `sqlIngest` package. `sqlIngest` provides a `DataHandler` class with methods for initializing, importing, and updating data in the SQL database.

### DataHandler

In [1]:
import sys

import pandas as pd

# Add the services directory to the Python path for a one-off import
sys.path.append('../server/src/services/')
from sqlIngest import DataHandler

The `DataHandler` object is the central tool in `sqlIngest`. It can be used to load data from Socrata or CSV sources, perform formatting, and write that data back to CSV or into a database instance. It has the following dependencies:

- `pandas` for data handling and manipulation
- `SQLAlchemy` for database operations
- `sodapy` for Socrata operations

In [2]:
# Initialize instance of DataHandler
loader = DataHandler()

Configuration must be loaded from `settings.cfg` before anything else. This tutorial points at `settings.example.cfg`; for real use, copy that file to `settings.cfg` and fill in the database connection string for your machine and the Socrata user token for your account.

In [3]:
# Load configuration file
loader.loadConfig(configFilePath='../server/src/settings.example.cfg')
# This initializes several values
print('file path:\t\t%s' % loader.configFilePath)
print('database string:\t%s' % loader.dbString)
print('socrata token:\t\t%s' % loader.token)

Loading config file ../server/src/settings.example.cfg
file path:		../server/src/settings.example.cfg
database string:	postgres://REDACTED:REDACTED@localhost:5432/postgres
socrata token:		None
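
As a rough illustration of what a config loader of this kind does, the sketch below parses config text with Python's standard `configparser`. The section names (`Database`, `Socrata`), the key names, and the `read_settings` helper are assumptions made for this example; the real layout of `settings.cfg` and the internals of `loadConfig` are not shown in this tutorial.

```python
import configparser

# Hypothetical file contents: the real section and key names used by
# settings.cfg are not shown in this tutorial.
EXAMPLE_CFG = """
[Database]
DB_CONNECTION_STRING = postgres://user:password@localhost:5432/postgres

[Socrata]
TOKEN = None
"""


def read_settings(text):
    """Parse config text and return (db_string, token)."""
    parser = configparser.ConfigParser()
    parser.read_string(text)
    db_string = parser.get('Database', 'DB_CONNECTION_STRING')
    token = parser.get('Socrata', 'TOKEN', fallback=None)
    # Treat an empty value or the literal string "None" as "no token"
    if token in (None, '', 'None'):
        token = None
    return db_string, token


db_string, token = read_settings(EXAMPLE_CFG)
print('database string:\t%s' % db_string)
print('socrata token:\t\t%s' % token)
```

Normalizing a placeholder token to `None` keeps the "no API key" case explicit, which matches the `None` printed in the output above.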


We can fetch data from the City of Los Angeles Socrata data stores by specifying the year of interest. Data can be fetched either in small increments, by paging over the dataset in multiple queries, or as a full-year chunk. The recommended page size for Socrata is 1000 entries, but larger page sizes are allowed. Fetch time depends on both the page size and the total query size.

__NOTE:__ For unknown reasons, 2015 data struggles with timeouts during paging.
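
The relationship between query size and page size can be sketched as a simple offset loop. This is a minimal illustration, not the `fetchSocrata` implementation: `fetch_in_pages`, `fetch_page`, and `fake_fetch` are hypothetical names, and the in-memory list stands in for the remote dataset. With `sodapy`, `fetch_page` could plausibly wrap `client.get(dataset_id, limit=..., offset=...)`.

```python
def fetch_in_pages(fetch_page, query_size, page_size):
    """Collect up to query_size records, requesting page_size at a time.

    fetch_page(limit, offset) is a hypothetical callable that returns
    a list of records for one page.
    """
    records = []
    offset = 0
    while offset < query_size:
        # The last page may be smaller than page_size
        limit = min(page_size, query_size - offset)
        page = fetch_page(limit=limit, offset=offset)
        records.extend(page)
        if len(page) < limit:
            break  # the source has no more rows
        offset += limit
    return records


# Stand-in for a remote dataset with 2500 rows
FAKE_ROWS = [{'srnumber': '1-%d' % i} for i in range(2500)]
calls = []


def fake_fetch(limit, offset):
    calls.append((limit, offset))
    return FAKE_ROWS[offset:offset + limit]


rows = fetch_in_pages(fake_fetch, query_size=2000, page_size=750)
print(len(rows), 'records in', len(calls), 'requests')
```

Smaller pages mean more round trips for the same query size, while larger pages mean fewer but heavier requests.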

In [4]:
# Fetch partial dataset from Socrata
# (Need Socrata API key for significant number of queries)
loader.fetchSocrata(year=2019, querySize=1000, pageSize=1000)
loader.data.head()



Retrieving partial Socrata query...
1000 records retrieved in 0.02 minutes


|   | location | zipcode | suffix | srnumber | updateddate | closeddate | cd | address | createdbyuserorganization | createddate | ... | status | tbmrow | direction | anonymous | addressverified | actiontaken | streetname | ncname | apc | tbmpage |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| 0 | {'latitude': '34.0141771575', 'needs_recoding'... | 90023 | ST | 1-1515100608 | 2020-01-25T13:44:49.000 | 2020-01-25T13:40:56.000 | 14 | 3656 E NOAKES ST, 90023 | Self Service | 2019-12-17T12:59:11.000 | ... | Closed | 2 | E | Y | Y | SR Created | NOAKES | BOYLE HEIGHTS NC | East Los Angeles APC | 675 |
| 1 | {'latitude': '34.1580125971', 'needs_recoding'... | 91403 | BLVD | 1-1515695853 | 2020-01-25T13:03:42.000 | 2020-01-25T12:59:48.000 | 4 | 4812 N SEPULVEDA BLVD, 91403 | Self Service | 2019-12-18T08:59:09.000 | ... | Closed | 3 | N | N | Y | SR Created | SEPULVEDA | SHERMAN OAKS NC | South Valley APC | 561 |
| 2 | {'latitude': '34.1522072867', 'needs_recoding'... | 91602 | AVE | 1-1522232521 | 2020-01-25T13:02:42.000 | 2020-01-25T13:00:03.000 | 4 | 4500 N PLACIDIA AVE, 91602 | ITA | 2019-12-30T10:21:52.000 | ... | Closed | 4 | N | N | Y | SR Created | PLACIDIA | GREATER TOLUCA LAKE NC | South Valley APC | 563 |
| 3 | {'latitude': '34.151094412', 'needs_recoding':... | 91423 | AVE | 1-1516064971 | 2020-01-25T13:02:30.000 | 2020-01-25T12:59:55.000 | 4 | 4427 N MURIETTA AVE, 91423 | BOS | 2019-12-18T13:47:54.000 | ... | Closed | 4 | N | N | Y | SR Created | MURIETTA | SHERMAN OAKS NC | South Valley APC | 562 |
| 4 | {'latitude': '34.1677902364', 'needs_recoding'... | 91401 | AVE | 1-1516920141 | 2020-01-25T13:01:02.000 | 2020-01-25T12:57:41.000 | 4 | 5340 N WOODMAN AVE, 91401 | BOS | 2019-12-19T15:12:01.000 | ... | Closed | 2 | N | N | Y | SR Created | WOODMAN | SHERMAN OAKS NC | South Valley APC | 562 |


In [5]:
# Fetch full dataset from Socrata (slow)
loader.fetchSocrataFull(year=2015)
loader.data.head()



Downloading 2015 data from Socrata data source...
	Download Complete: 0.7 minutes


|   | location | zipcode | suffix | :@computed_region_qz3q_ghft | :@computed_region_k96s_3jcv | srnumber | :@computed_region_kqwf_mjcx | updateddate | closeddate | cd | ... | direction | anonymous | addressverified | actiontaken | streetname | ncname | apc | tbmpage | servicedate | mobileos |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| 0 | {'latitude': '34.2478180903', 'needs_recoding'... | 91042 | BLVD | 3222 | 4 | 1-88226601 | 1 | 2016-01-02T14:29:52.000 | 2016-01-02T14:29:52.000 | 7 | ... | W | N | Y | SR Created | FOOTHILL | SUNLAND-TUJUNGA NC | North Valley APC | 504 |  |  |
| 1 | {'latitude': '33.9789717768', 'needs_recoding'... | 90003 | ST | 22352 | 787 | 1-88226481 | 13 | 2015-12-31T20:21:03.000 |  | 9 | ... | W | N | Y | SR Created | 67TH | COMMUNITY AND NEIGHBORS FOR NINTH DISTRICT UNI... | South Los Angeles APC | 674 |  |  |
| 2 | {'latitude': '34.1509935991', 'needs_recoding'... | 91423 | ST | 19736 | 333 | 1-88226431 | 7 | 2016-01-06T09:38:51.000 | 2016-01-06T09:38:51.000 | 4 | ... | W | N | Y | SR Created | MOORPARK | SHERMAN OAKS NC | South Valley APC | 562 | 2016-01-06T00:00:00.000 |  |
| 3 | {'latitude': '34.0459242166', 'needs_recoding'... | 90035 |  | 23666 | 875 | 1-88214281 | 6 | 2016-01-02T09:04:07.000 | 2016-01-02T09:04:07.000 | 5 | ... | W | N | Y | SR Created | CRESTA | SOUTH ROBERTSON NC | West Los Angeles APC | 632 | 2016-01-02T00:00:00.000 |  |
| 4 | {'latitude': '34.0459242166', 'needs_recoding'... | 90035 | DR | 23666 | 875 | 1-88213271 | 6 | 2016-01-02T09:02:10.000 | 2016-01-02T09:02:11.000 | 5 | ... | W | N | Y | SR Created | CRESTA | SOUTH ROBERTSON NC | West Los Angeles APC | 632 | 2016-01-02T00:00:00.000 |  |


Once we have imported the data, we need to perform a cleaning step in order to standardize it for input into the database. This removes columns that we are not interested in tracking and makes sure that data types are correctly formatted for the SQL import.

In [6]:
# Cleaning data for consistency before SQL import
loader.cleanData()
loader.data.head()

Cleaning data...
	column createdbyuserorganization missing - substituting NaN values
	column :@computed_region_qz3q_ghft not in defined set - dropping column
	column :@computed_region_k96s_3jcv not in defined set - dropping column
	column :@computed_region_kqwf_mjcx not in defined set - dropping column
	column :@computed_region_2dna_qi2s not in defined set - dropping column
	column :@computed_region_tatf_ua23 not in defined set - dropping column
	column :@computed_region_ur2y_g4cx not in defined set - dropping column
	Cleaning Complete: 0.0 minutes


|   | srnumber | createddate | updateddate | actiontaken | owner | requesttype | status | requestsource | createdbyuserorganization | mobileos | ... | location | tbmpage | tbmcolumn | tbmrow | apc | cd | cdmember | nc | ncname | policeprecinct |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| 0 | 1-88226601 | 2015-12-31 21:59:26 | 2016-01-02T14:29:52.000 | SR Created | BOS | Dead Animal Removal | Closed | Call |  |  | ... | {'latitude': '34.2478180903', 'needs_recoding'... | 504 | B | 5 | North Valley APC | 7 | Felipe Fuentes | 10.0 | SUNLAND-TUJUNGA NC | FOOTHILL |
| 1 | 1-88226481 | 2015-12-31 20:15:44 | 2015-12-31T20:21:03.000 | SR Created | BOS | Dead Animal Removal | Cancelled | Call |  |  | ... | {'latitude': '33.9789717768', 'needs_recoding'... | 674 | C | 7 | South Los Angeles APC | 9 | Curren D. Price Jr. | 86.0 | COMMUNITY AND NEIGHBORS FOR NINTH DISTRICT UNI... | NEWTON |
| 2 | 1-88226431 | 2015-12-31 20:02:50 | 2016-01-06T09:38:51.000 | SR Created | BOS | Bulky Items | Closed | Call |  |  | ... | {'latitude': '34.1509935991', 'needs_recoding'... | 562 | A | 4 | South Valley APC | 4 | David Ryu | 26.0 | SHERMAN OAKS NC | VAN NUYS |
| 3 | 1-88214281 | 2015-12-31 18:59:28 | 2016-01-02T09:04:07.000 | SR Created | BOS | Bulky Items | Closed | Email |  |  | ... | {'latitude': '34.0459242166', 'needs_recoding'... | 632 | H | 5 | West Los Angeles APC | 5 |  |  | SOUTH ROBERTSON NC |  |
| 4 | 1-88213271 | 2015-12-31 18:58:38 | 2016-01-02T09:02:10.000 | SR Created | BOS | Bulky Items | Closed | Email |  |  | ... | {'latitude': '34.0459242166', 'needs_recoding'... | 632 | H | 5 | West Los Angeles APC | 5 | Paul Koretz | 61.0 | SOUTH ROBERTSON NC | WEST LOS ANGELES |
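
The behaviour reported in the cleaning log (dropping columns outside a defined set, filling missing columns with NaN, and normalizing types) can be approximated in plain pandas. This is only a sketch under assumptions: `KEEP_COLUMNS`, `DATE_COLUMNS`, and `clean` are hypothetical names, and the real column set and type rules used by `cleanData` are not shown here.

```python
import pandas as pd

# Hypothetical subset of the columns the database tracks
KEEP_COLUMNS = ['srnumber', 'createddate', 'status', 'createdbyuserorganization']
DATE_COLUMNS = ['createddate']


def clean(df):
    """Return a copy of df restricted to KEEP_COLUMNS, with parsed dates."""
    # reindex drops columns outside KEEP_COLUMNS and adds missing ones as NaN
    cleaned = df.reindex(columns=KEEP_COLUMNS)
    for col in DATE_COLUMNS:
        # Unparseable values become NaT instead of raising an error
        cleaned[col] = pd.to_datetime(cleaned[col], errors='coerce')
    return cleaned


raw = pd.DataFrame({
    'srnumber': ['1-88226601'],
    'createddate': ['2015-12-31T21:59:26.000'],
    'status': ['Closed'],
    ':@computed_region_qz3q_ghft': ['3222'],
})
cleaned = clean(raw)
print(cleaned.dtypes)
```

Using `reindex` handles both the "missing" and "not in defined set" cases in one step, and leaves the input frame untouched.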


After cleaning, the data can be written out as a CSV file if desired. Because `loader.data` is a pandas DataFrame, any of its own writer methods, such as `.to_csv`, can be used as well.

In [7]:
# Write data out as CSV
loader.saveCsvFile('../../testFile.csv')
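
For the plain pandas route, a minimal sketch looks like the following. The small frame and the in-memory buffer are stand-ins chosen for the example; with real data you would call `loader.data.to_csv(...)` with a file path instead.

```python
import io

import pandas as pd

df = pd.DataFrame({'srnumber': ['1-88226601', '1-88226481'],
                   'status': ['Closed', 'Cancelled']})

buffer = io.StringIO()          # stands in for a file path
df.to_csv(buffer, index=False)  # index=False omits the row-number column
buffer.seek(0)

restored = pd.read_csv(buffer)
print(restored)
```

Passing `index=False` avoids an extra unnamed index column when the file is read back.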

Once the data has been cleaned, it is ready for import into the database. By default, the `ingestData` method uses `ingestMethod='replace'`, which __overwrites the existing staging table in the database__. If you do not want this behavior, you can specify `ingestMethod='append'`, but be aware that appending can produce duplicate rows and associated errors if used incorrectly.

In [8]:
# Ingest data into database
loader.ingestData(ingestMethod='replace')

Inserting data into Postgres instance...
	Ingest Complete: 1.0 minutes
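
The difference between the two ingest methods can be seen with pandas' own `DataFrame.to_sql`, whose `if_exists` argument accepts the same `'replace'` and `'append'` values. Whether `ingestData` is built on `to_sql` is an assumption here; the table name `staging` and the in-memory SQLite database (standing in for Postgres) are likewise chosen only for illustration.

```python
import sqlite3

import pandas as pd

conn = sqlite3.connect(':memory:')  # in-memory stand-in for Postgres
batch = pd.DataFrame({'srnumber': ['1-1', '1-2'],
                      'status': ['Open', 'Closed']})


def row_count():
    """Number of rows currently in the hypothetical staging table."""
    return conn.execute('SELECT COUNT(*) FROM staging').fetchone()[0]


# 'replace' drops any existing table and writes the batch fresh
batch.to_sql('staging', conn, if_exists='replace', index=False)
after_replace = row_count()

# 'append' adds the same rows again, producing duplicates
batch.to_sql('staging', conn, if_exists='append', index=False)
after_append = row_count()

# A second 'replace' discards everything written so far
batch.to_sql('staging', conn, if_exists='replace', index=False)
after_second_replace = row_count()

print(after_replace, after_append, after_second_replace)  # 2 4 2
```

Appending the same batch twice doubles the row count, which is the duplicate-row hazard mentioned above.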


We can also run the full process with the `populateFullDatabase` method. You must still run the initialization and config steps before calling it. The `yearRange` parameter expects a Python `range` object. Because `range` excludes its stop value, add 1 to the last year you want: `range(2015, 2016)` covers only 2015.

In [9]:
# Run full ingestion pipeline
loader.populateFullDatabase(yearRange=range(2015,2016))



Performing fresh Postgres population from Socrata data sources
Downloading 2015 data from Socrata data source...
	Download Complete: 0.6 minutes
Cleaning data...
	column createdbyuserorganization missing - substituting NaN values
	column :@computed_region_qz3q_ghft not in defined set - dropping column
	column :@computed_region_k96s_3jcv not in defined set - dropping column
	column :@computed_region_kqwf_mjcx not in defined set - dropping column
	column :@computed_region_2dna_qi2s not in defined set - dropping column
	column :@computed_region_tatf_ua23 not in defined set - dropping column
	column :@computed_region_ur2y_g4cx not in defined set - dropping column
	Cleaning Complete: 0.0 minutes
Inserting data into Postgres instance...
	Ingest Complete: 1.3 minutes
All Operations Complete: 1.9 minutes
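
Since the `range` stop value is exclusive, a small wrapper can make the intended years explicit. `year_range` below is a hypothetical convenience helper and not part of `sqlIngest`.

```python
def year_range(first_year, last_year):
    """Return a range covering first_year through last_year inclusive."""
    if last_year < first_year:
        raise ValueError('last_year must not precede first_year')
    return range(first_year, last_year + 1)


print(list(range(2015, 2016)))       # [2015]
print(list(year_range(2015, 2019)))  # [2015, 2016, 2017, 2018, 2019]
```

The result could then be passed as `yearRange=year_range(2015, 2019)`.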


`sqlIngest` can also run partial updates using paged requests. The fetched records are checked against the database by `srnumber`: records not yet present are inserted, and where the same `srnumber` already exists, the more recently queried record replaces the stored one.

In [10]:
# Fetch a partial dataset, clean it, then apply it as an update
loader.fetchSocrata(year=2019, querySize=2000, pageSize=2000)
loader.cleanData()
loader.updateDatabase()



Retrieving partial Socrata query...
2000 records retrieved in 0.03 minutes
Cleaning data...
	Cleaning Complete: 0.0 minutes
Updating database with new records...
Operation Complete: 2000 inserts, 0 updates in 1.54 minutes
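
The insert-or-update check keyed on `srnumber` can be sketched with the standard `sqlite3` module. This is an illustration of the idea only: the `requests` table, its two columns, and the `update_database` function are assumptions for the example and do not reflect how `updateDatabase` is actually implemented.

```python
import sqlite3

conn = sqlite3.connect(':memory:')  # in-memory stand-in for Postgres
conn.execute('CREATE TABLE requests (srnumber TEXT PRIMARY KEY, status TEXT)')
conn.execute("INSERT INTO requests VALUES ('1-1', 'Open')")


def update_database(conn, records):
    """Insert new srnumbers and overwrite existing ones; return both counts."""
    inserts = updates = 0
    for srnumber, status in records:
        exists = conn.execute(
            'SELECT 1 FROM requests WHERE srnumber = ?', (srnumber,)
        ).fetchone()
        if exists:
            # Same srnumber already stored: keep the newer record
            conn.execute(
                'UPDATE requests SET status = ? WHERE srnumber = ?',
                (status, srnumber),
            )
            updates += 1
        else:
            conn.execute(
                'INSERT INTO requests (srnumber, status) VALUES (?, ?)',
                (srnumber, status),
            )
            inserts += 1
    conn.commit()
    return inserts, updates


inserts, updates = update_database(conn, [('1-1', 'Closed'), ('1-2', 'Open')])
print('%d inserts, %d updates' % (inserts, updates))  # 1 inserts, 1 updates
```

Counting the two cases separately gives the same kind of summary as the "inserts, updates" line in the log above.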
