Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Connecting SQL Ingest to app routes #118

Merged
merged 3 commits into from
Dec 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitattributes
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
*.csv filter=lfs diff=lfs merge=lfs -text
*.tsv filter=lfs diff=lfs merge=lfs -text
7 changes: 7 additions & 0 deletions server/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,17 @@ itsdangerous==1.1.0
Jinja2==2.10.3
MarkupSafe==1.1.1
multidict==4.5.2
numpy==1.17.4
pandas==0.25.3
psycopg2==2.8.4
python-dateutil==2.8.1
pytz==2019.3
requests==2.22.0
requests-async==0.5.0
rfc3986==1.3.2
sanic==19.9.0
six==1.13.0
SQLAlchemy==1.3.11
ujson==1.35
urllib3==1.25.7
uvloop==0.14.0
Expand Down
39 changes: 33 additions & 6 deletions server/src/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,24 @@
from services.time_to_close import time_to_close
from services.frequency import frequency
from services.ingress_service import ingress_service
from configparser import ConfigParser


app = Sanic(__name__)
app.config.from_pyfile(os.path.join(os.getcwd(),'settings.cfg'))

def configure_app():
    """Read settings.cfg from the current working directory and expose the
    parsed ConfigParser on app.config['Settings']."""
    parser = ConfigParser()
    parser.read(os.path.join(os.getcwd(), 'settings.cfg'))
    app.config['Settings'] = parser


@app.route('/')
async def index(request):
    # Liveness check: confirms the API is reachable.
    return json('You hit the index')


@app.route('/timetoclose')
async def timetoclose(request):
ttc_worker = time_to_close()
Expand All @@ -20,6 +30,7 @@ async def timetoclose(request):

return json(return_data)


@app.route('/requestfrequency')
async def requestfrequency(request):
freq_worker = frequency()
Expand All @@ -28,28 +39,44 @@ async def requestfrequency(request):

return json(return_data)


@app.route('/sample-data')
async def sample_route(request):
    """Return a small canned payload plus one key/value pair read from app config."""
    payload = {'cool_key': ['value1', 'value2']}
    payload[app.config['REDACTED']] = app.config['REDACTED']
    return json(payload)

@app.route('/injest')
async def injest(request):
ingress_worker = ingress_service()
return_data = ingress_worker.injest()

@app.route('/ingest', methods=["POST"])
async def ingest(request):
    '''Accept POST requests with a list of datasets to import
    based on the YearMapping. Body parameter format is
    {"sets": ["YearMappingKey","YearMappingKey","YearMappingKey"]}'''

    ingress_worker = ingress_service(config=app.config['Settings'])
    return_data = {'response': 'ingest ok'}

    # request.json is None for an empty or non-JSON body; guard so we don't
    # iterate over None (the original raised TypeError in that case).
    requested_sets = (request.json or {}).get("sets") or []
    year_mapping = app.config["Settings"]["YearMapping"]

    for data_set in requested_sets:
        if data_set not in year_mapping:
            # Report unknown keys instead of letting a KeyError become a 500.
            return_data = {'response': 'unknown dataset: %s' % data_set}
            continue
        return_data = await ingress_worker.ingest(from_dataset=year_mapping[data_set])

    return json(return_data)


@app.route('/update')
async def update(request):
    """Delegate to the ingress service's update operation and return its result."""
    worker = ingress_service()
    return json(worker.update())


@app.route('/delete')
async def delete(request):
    """Delegate to the ingress service's delete operation and return its result."""
    worker = ingress_service()
    return json(worker.delete())



if __name__ == '__main__':
    # Load settings.cfg into app.config['Settings'] before starting the server.
    configure_app()
    server_cfg = app.config['Settings']['Server']
    # ConfigParser values are strings: cast PORT to int and parse DEBUG as a
    # real boolean rather than passing the truthy string 'False'/'True'.
    app.run(host=server_cfg['HOST'],
            port=int(server_cfg['PORT']),
            debug=server_cfg.getboolean('DEBUG'))
22 changes: 18 additions & 4 deletions server/src/services/ingress_service.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
from .sqlIngest import DataHandler

class ingress_service(object):
def __init__(self):
pass
def __init__(self, config=None):
    # config: a ConfigParser instance (or None); forwarded to DataHandler by ingest()
    self.config = config


def injest(self):
return {'response':'injest ok'}
async def ingest(self, from_dataset=None):
    """Load, clean, and ingest the named dataset via DataHandler.

    from_dataset: base file name (no extension) passed to DataHandler.loadData.
    Returns a small status dict on success.
    NOTE(review): the pandas/DB work here is synchronous inside an async def,
    so it blocks the event loop while running — confirm this is acceptable.
    """
    loader = DataHandler(config=self.config)
    loader.loadData(fileName=from_dataset)
    loader.cleanData()
    loader.ingestData()
    return {'response':'ingest ok'}

def update(self):
    # Placeholder: no update logic implemented yet; returns a canned status dict.
    return {'response':'update ok'}
Expand All @@ -13,3 +20,10 @@ def delete(self):

def hello_world(self):
    # Canned response used for basic service-wiring checks.
    return {'response':'hello from frequency service'}

if __name__ == "__main__":
    # Standalone smoke test: read config and run one ingest cycle.
    import asyncio
    from configparser import ConfigParser

    config = ConfigParser()
    config.read('../settings.cfg')
    worker = ingress_service(config=config)
    # ingest() is a coroutine: calling it bare only creates the coroutine
    # object and never executes the body — it must be driven by an event loop.
    # Pass an explicit dataset name as well: from_dataset=None would make
    # DataHandler.loadData build a file name from None and fail.
    asyncio.run(worker.ingest(from_dataset="311data"))
46 changes: 30 additions & 16 deletions server/src/sqlIngest.py → server/src/services/sqlIngest.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import os
from sqlalchemy.types import Integer, Text, String, DateTime, Float
from sqlalchemy import create_engine
import pandas as pd
Expand All @@ -6,26 +7,40 @@
import logging

class DataHandler:
def __init__(self):
def __init__(self, config=None, configFilePath=None, separator=','):
self.data = None
self.config = None
self.dbString = None
self.csvPath = None
self.configFilePath = None
self.config = config
self.dbString = None if not self.config else self.config['Database']['DB_CONNECTION_STRING']
self.filePath = None
self.configFilePath = configFilePath
self.separator = separator


def loadConfig(self, configFilePath):
'''Load and parse config data'''
if self.config:
print('Config already exists at %s. Nothing to load.' % self.configFilePath)
return

print('Loading config file %s' % self.configFilePath)
self.configFilePath = configFilePath
config = ConfigParser()
config.read(configFilePath)
self.config = config
self.dbString = config['Main']['DB_CONNECTION_STRING']
self.csvPath = "%s311data.tsv" % (config['Main']['CSV_DIRECTORY'])
def loadData(self):
self.dbString = config['Database']['DB_CONNECTION_STRING']


def loadData(self, fileName="311data"):
'''Load dataset into pandas object'''
print('Loading dataset %s' % self.csvPath)
self.data = pd.read_table(self.csvPath,
sep='\t',
if self.separator == ',':
dataFile = fileName + ".csv"
else:
dataFile = fileName + ".tsv"

self.filePath = os.path.join(self.config['Database']['DATA_DIRECTORY'], dataFile )
print('Loading dataset %s' % self.filePath)
self.data = pd.read_table(self.filePath,
sep=self.separator,
na_values=['nan'],
dtype={
'SRNumber':str,
Expand Down Expand Up @@ -62,6 +77,7 @@ def loadData(self):
'NCName':str,
'PolicePrecinct':str
})

def cleanData(self):
'''Perform general data filtering'''
print('Cleaning 311 dataset...')
Expand All @@ -80,12 +96,13 @@ def cleanData(self):
data['service_created'] = data.ServiceDate-data.CreatedDate
# drop NA values and reformat closed_created in units of hours
data = data[~data.closed_created.isna()]
# New column: closed_created in units of days
# New column: closed_created in units of days
data['closed_createdD'] = data.closed_created / pd.Timedelta(days=1)
# xFUTURE: Geolocation/time clustering to weed out repeat requests
# xFUTURE: Decide whether ServiceDate or ClosedDate are primary metric
# xFUTURE: Removal of feedback and other categories
self.data = data

def ingestData(self):
'''Set up connection to database'''
print('Inserting data into Postgres instance...')
Expand Down Expand Up @@ -140,10 +157,7 @@ def ingestData(self):

if __name__ == "__main__":
    # Standalone ingestion run: settings.cfg lives one directory above services/,
    # so load it by relative path. (The duplicate loadConfig('settings.cfg')
    # call was stale diff residue pointing at the old location.)
    loader = DataHandler()
    loader.loadConfig('../settings.cfg')
    loader.loadData()
    loader.cleanData()
    loader.ingestData()



13 changes: 11 additions & 2 deletions server/src/settings.example.cfg
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
[Main]
[Server]
DEBUG = True
HOST = 0.0.0.0
PORT = 5000
DB_CONNECTION_STRING = postgres://REDACTED:REDACTED@somehost/postgres

[Database]
DB_CONNECTION_STRING = postgres://REDACTED:REDACTED@localhost:5432/postgres
DATA_DIRECTORY = .

[Api]
REDACTED = REDACTED

[YearMapping]
2018_MINI = 2018_mini
2018_FULL = 311data
3 changes: 3 additions & 0 deletions server/src/static/2018_full.csv
Git LFS file not shown
3 changes: 3 additions & 0 deletions server/src/static/2018_mini.csv
Git LFS file not shown