# Data rendering

In the previous steps of our analysis, we populated a database with some typical open data and enriched it with a home-made clustering analysis. Now it is time to prepare access to our handsome database!

For each rendered dataset, the objective is to produce a `json` version of the information that will be sent downstream to API clients.

## Introduction

In [None]:
from datetime import datetime, date, timedelta
from dateutil.parser import parse
from itertools import groupby
import json
import os

In [None]:
import pandas as pd
from sqlalchemy import create_engine

## Configuration

In [None]:
# Relative path of the project data directory
DATADIR = "../data"

In [None]:
# PostgreSQL connection settings; get_engine() assembles them into the database URL
HOST = "localhost"
PORT = 5432
USER = "osboxes"
DBNAME = "jitenshea"

## Utilities

In [None]:
def get_engine():
    """Build a SQLAlchemy engine for the PostgreSQL database

    The connection URL is assembled from the module-level ``USER``, ``HOST``,
    ``PORT`` and ``DBNAME`` settings; no password is included in the URL.

    Returns
    -------
    sqlalchemy engine
        Result of ``create_engine`` for the assembled URL
    """
    settings = {"user": USER, "host": HOST, "port": PORT, "dbname": DBNAME}
    return create_engine("postgresql://{user}@{host}:{port}/{dbname}".format(**settings))


# Shared engine used by every query in this notebook
engine = get_engine()

In [None]:
def station_geojson(stations, feature_list):
    """Convert station records into a GeoJSON feature collection

    Parameters
    ----------
    stations : iterable of dicts
        Station records; each one must provide the ``x`` and ``y`` keys as
        well as every key named in `feature_list`
    feature_list : list of str
        Keys copied from each record into the feature ``properties``

    Returns
    -------
    dict
        GeoJSON ``FeatureCollection`` with one ``Point`` feature per station
    """
    def as_feature(station):
        # One GeoJSON point feature per station record
        point = {"type": "Point", "coordinates": [station['x'], station['y']]}
        properties = {key: station[key] for key in feature_list}
        return {"type": "Feature", "geometry": point, "properties": properties}

    return {
        "type": "FeatureCollection",
        "features": [as_feature(station) for station in stations],
    }

In [None]:
def clustered_station_geojson(stations):
    """Convert clustered station records into a GeoJSON feature collection

    Parameters
    ----------
    stations : list of dicts
        Clustered stations; each record must provide the ``x``, ``y``,
        ``id``, ``cluster_id``, ``name``, ``start`` and ``stop`` keys

    Returns
    -------
    dict
        Clustered stations formatted as a GeoJSON ``FeatureCollection``
    """
    property_keys = ("id", "cluster_id", "name", "start", "stop")
    features = [
        {
            "type": "Feature",
            "geometry": {
                "type": "Point",
                "coordinates": [station['x'], station['y']],
            },
            "properties": {key: station[key] for key in property_keys},
        }
        for station in stations
    ]
    return {"type": "FeatureCollection", "features": features}

In [None]:
def parse_timestamp(str_timestamp):
    """Convert a timestamp string into a datetime

    The value is handed to ``dateutil.parser.parse``; the expected inputs
    follow the ISO 8601 format, i.e.
      - YYYY-MM-DD
      - YYYY-MM-DDThh
      - YYYY-MM-DDThhmm

    If parsing fails for any reason, ``api.abort`` is called with the HTTP
    status code 422 and a message embedding the underlying error.
    """
    try:
        parsed = parse(str_timestamp)
    except Exception as error:
        message = "date from the request cannot be parsed: {}".format(error)
        api.abort(422, message)
    return parsed

## Render the station list

*Use case:* A user wants information about two stations, assuming that they know the station IDs.

In [None]:
# Example station IDs queried in the following cells
ids = [10001, 10002]

One simply has to define a `select` query to retrieve the station data...

In [None]:
# Station description plus point coordinates, filtered on a bound tuple of IDs
station_query = """
SELECT id, name, address, city, nb_stands, ST_X(geom) as x, ST_Y(geom) as y
FROM lyon.station
WHERE id IN %(id_list)s
"""

In [None]:
# Run the query; the IDs are bound to %(id_list)s as a tuple of strings
rset = engine.execute(station_query, id_list=tuple(str(x) for x in ids)).fetchall()

One gets a list of rows that correspond to the requested IDs.

In [None]:
# Display the fetched rows
rset

The only remaining work is to tune the data format so as to get a `json`-compatible version of the data:

In [None]:
# Turn each row into a dict keyed by column name, wrapped under a "data" key
{"data": [dict(zip(x.keys(), x)) for x in rset]}

In a nutshell, we design a function to reproduce the results within the API:

In [None]:
def render_station_list(ids):
    """Render the description of the requested stations

    Parameters
    ----------
    ids : iterable
        Station IDs; each one is converted to ``str`` before being bound to
        the query

    Returns
    -------
    dict
        ``{"data": [...]}`` with one dict per matching station, keyed by the
        selected columns (``id``, ``name``, ``address``, ``city``,
        ``nb_stands``, ``x``, ``y``); the list is empty when `ids` is empty
    """
    id_list = tuple(str(x) for x in ids)
    if not id_list:
        # An empty tuple would be rendered as "IN ()", which is not valid SQL:
        # no ID requested simply means no station returned.
        return {"data": []}
    station_query = """
SELECT id, name, address, city, nb_stands, ST_X(geom) as x, ST_Y(geom) as y
FROM lyon.station
WHERE id IN %(id_list)s
"""
    rset = engine.execute(station_query, id_list=id_list).fetchall()
    return {"data": [dict(zip(x.keys(), x)) for x in rset]}

## Render the freshest bike availability status 

*Use case:* A user wants to get the up-to-date availability data in the bike-sharing network.

In [None]:
# For each station, rank the availability records newer than min_date by
# decreasing timestamp and keep the first-ranked one, joined with the station
# description; the output is ordered by station id and capped by "limit"
freshest_avl_query = """
WITH latest AS (
SELECT id, timestamp, available_bikes as nb_bikes, rank() over (partition by id order by timestamp desc) AS rank
FROM lyon.timeseries
WHERE timestamp >= %(min_date)s
)
SELECT
P.id, P.timestamp, P.nb_bikes, S.name, S.nb_stands, ST_X(S.geom) AS x, ST_Y(S.geom) AS y
FROM latest AS P
JOIN lyon.station AS S USING(id)
WHERE P.rank=1
ORDER BY id
LIMIT %(limit)s
"""

In [None]:
# Look two days back and keep at most three rows for this example
min_date = datetime.now() - timedelta(2)
limit = 3
rset = engine.execute(freshest_avl_query, min_date=min_date, limit=limit)
# Turn each row into a plain dict keyed by column name
freshest_avl_result = [dict(zip(row.keys(), row)) for row in rset]
# Most recent timestamp among the returned rows
# (max() raises ValueError if the query returned nothing)
latest_date = max(x["timestamp"] for x in freshest_avl_result)

In [None]:
# Payload made of the latest timestamp and the per-station availability records
{"date": latest_date, "data": freshest_avl_result}

Instead of rendering `x` and `y` coordinates as classic features, one can use `geojson` format in order to provide a geotool-compatible format:

In [None]:
# Same records rendered as a GeoJSON FeatureCollection
station_geojson(freshest_avl_result, feature_list=['id', 'name', 'timestamp', 'nb_bikes', 'nb_stands'])

In a nutshell, we design another function to reproduce the results within the API:

In [None]:
def render_availability(limit, n_day_before):
    """Render the freshest bike availability of each station as GeoJSON

    For every station, the first-ranked record by decreasing timestamp is
    kept among those not older than `n_day_before` days; the output is
    ordered by station id and capped by `limit`.

    Parameters
    ----------
    limit : int
        Maximum number of rows returned by the query
    n_day_before : int
        Number of days to look back from now

    Returns
    -------
    dict
        GeoJSON ``FeatureCollection`` whose feature properties are ``id``,
        ``name``, ``timestamp``, ``nb_bikes`` and ``nb_stands``; it contains
        no feature when no record matches
    """
    freshest_avl_query = """
WITH latest AS (
SELECT id, timestamp, available_bikes as nb_bikes, rank() over (partition by id order by timestamp desc) AS rank
FROM lyon.timeseries
WHERE timestamp >= %(min_date)s
)
SELECT
P.id, P.timestamp, P.nb_bikes, S.name, S.nb_stands, ST_X(S.geom) AS x, ST_Y(S.geom) AS y
FROM latest AS P
JOIN lyon.station AS S USING(id)
WHERE P.rank=1
ORDER BY id
LIMIT %(limit)s
"""
    min_date = datetime.now() - timedelta(n_day_before)
    rset = engine.execute(freshest_avl_query, min_date=min_date, limit=limit)
    result = [dict(zip(row.keys(), row)) for row in rset]
    # The latest timestamp is deliberately not computed here: it was unused by
    # the GeoJSON output and max() failed on an empty result set.
    return station_geojson(result, feature_list=['id', 'name', 'timestamp', 'nb_bikes', 'nb_stands'])

## Render a bike availability timeseries

*Use case:* A user wants to get the history of bike availability on a given station, between two dates.

In [None]:
# Example request: station 1001 over the hour preceding 2019-08-12 10:00
station_ids = [1001]
stop = datetime(2019, 8, 12, 10, 0)
start = stop - timedelta(hours=1)
# Display the resulting time window
start, stop

In [None]:
# Availability records of the requested stations within [start, stop), enriched
# with the station name and number of stands, ordered by station then timestamp
timeseries_query = """
SELECT T.*, S.name AS name, S.nb_stands AS nb_stands
FROM lyon.timeseries AS T
LEFT JOIN lyon.station AS S USING(id)
WHERE id IN %(id_list)s AND timestamp >= %(start)s AND timestamp < %(stop)s
ORDER BY id, timestamp
"""    

In [None]:
rset = engine.execute(timeseries_query, id_list=tuple(str(x) for x in station_ids), start=start, stop=stop)
# Turn each row into a plain dict keyed by column name
data = [dict(zip(x.keys(), x)) for x in rset]
values = []
# The query orders rows by id, so groupby yields a single group per station
for k, group in groupby(data, lambda x: x['id']):
    # Materialize the group: it is read several times below
    group = list(group)
    values.append({
        "id": k,
        "name": group[0]['name'],
        "nb_stands": group[0]['nb_stands'],
        "ts": [x['timestamp'] for x in group],
        "available_bikes": [x['available_bikes'] for x in group]
    })
# Display the resulting payload
{"data": values}

In a nutshell, the rendering function:

In [None]:
def render_timeseries(station_ids, start, stop):
    """Render the bike availability timeseries of the requested stations

    Parameters
    ----------
    station_ids : iterable
        Station IDs; each one is converted to ``str`` before being bound to
        the query
    start : datetime
        Lower bound of the period (inclusive)
    stop : datetime
        Upper bound of the period (exclusive)

    Returns
    -------
    dict
        ``{"data": [...]}`` with one item per station holding its ``id``,
        ``name``, ``nb_stands``, the list of timestamps (``ts``) and the
        matching ``available_bikes`` values; the list is empty when
        `station_ids` is empty
    """
    id_list = tuple(str(x) for x in station_ids)
    if not id_list:
        # An empty tuple would be rendered as "IN ()", which is not valid SQL:
        # no station requested simply means no timeseries returned.
        return {"data": []}
    timeseries_query = """
SELECT T.*, S.name AS name, S.nb_stands AS nb_stands
FROM lyon.timeseries AS T
LEFT JOIN lyon.station AS S USING(id)
WHERE id IN %(id_list)s AND timestamp >= %(start)s AND timestamp < %(stop)s
ORDER BY id, timestamp
"""
    rset = engine.execute(timeseries_query, id_list=id_list, start=start, stop=stop)
    data = [dict(zip(x.keys(), x)) for x in rset]
    values = []
    # The query orders rows by id, so groupby yields a single group per station
    for k, group in groupby(data, lambda x: x['id']):
        # Materialize the group: it is read several times below
        group = list(group)
        values.append({
            "id": k,
            "name": group[0]['name'],
            "nb_stands": group[0]['nb_stands'],
            "ts": [x['timestamp'] for x in group],
            "available_bikes": [x['available_bikes'] for x in group]
        })
    return {"data": values}

## Render the station clusters

*Use case:* A user wants to recover the classification of shared-bike stations in Lyon.

In [None]:
# Stations joined with their cluster assignment; rows are ranked by decreasing
# "stop" date and only the first-ranked ones (most recent "stop") are kept
cluster_query = """
WITH ranked_clusters AS (
SELECT
cs.station_id AS id,
cs.cluster_id,
cs.start AS start,
cs.stop AS stop,
st.name AS name,
st.geom AS geom,
rank() OVER (ORDER BY stop DESC) AS rank
FROM lyon.cluster AS cs
JOIN lyon.station AS st
ON st.id = cs.station_id)
SELECT id, cluster_id, start, stop, name,
st_x(geom) as x,
st_y(geom) as y
FROM ranked_clusters
WHERE rank=1
"""

In [None]:
rset = engine.execute(cluster_query)
# Turn each row into a dict keyed by column name, wrapped under a "data" key
data = {"data": [dict(zip(rset.keys(), row)) for row in rset]}

Similarly to the availability data, we get some `(x,y)` points, hence we can propose a `geojson` version of the data.

In [None]:
# Clustered stations rendered as a GeoJSON FeatureCollection
clustered_station_geojson(data["data"])

As previously, we propose a rendering function:

In [None]:
def render_clusters():
    """Render the clustered stations as GeoJSON

    Only the rows ranked first by decreasing ``stop`` date are kept, i.e.
    those sharing the most recent ``stop`` value.

    Returns
    -------
    dict
        GeoJSON ``FeatureCollection`` built by `clustered_station_geojson`
    """
    cluster_query = """
WITH ranked_clusters AS (
SELECT
cs.station_id AS id,
cs.cluster_id,
cs.start AS start,
cs.stop AS stop,
st.name AS name,
st.geom AS geom,
rank() OVER (ORDER BY stop DESC) AS rank
FROM lyon.cluster AS cs
JOIN lyon.station AS st
ON st.id = cs.station_id)
SELECT id, cluster_id, start, stop, name,
st_x(geom) as x,
st_y(geom) as y
FROM ranked_clusters
WHERE rank=1
"""
    rset = engine.execute(cluster_query)
    # One plain dict per row, keyed by the result column names
    stations = [dict(zip(rset.keys(), record)) for record in rset]
    return clustered_station_geojson(stations)

## Render the station cluster centroids

*Use case:* A user would like to extract the typical week day profile of clustered stations in Lyon.

As several clusterings may have been computed, we tune the query in order to get the output of the freshest period.

In [None]:
# Hourly profile (h00..h23) of each cluster centroid; rows are ranked by
# decreasing "stop" date and only the first-ranked ones are kept
centroid_query = """
WITH ranked_centroids AS (
SELECT *, rank() OVER (ORDER BY stop DESC) AS rank
FROM lyon.centroid
)
SELECT cluster_id,
h00, h01, h02, h03, h04, h05, h06, h07, h08, h09, h10, h11,
h12, h13, h14, h15, h16, h17, h18, h19, h20, h21, h22, h23,
start, stop
FROM ranked_centroids
WHERE rank=1
"""

In [None]:
# Load the query result into a DataFrame
centroids = pd.io.sql.read_sql_query(centroid_query, engine)

In [None]:
# Index the centroids by cluster ID (modifies the DataFrame in place)
centroids.set_index("cluster_id", inplace=True)

In [None]:
centroid_result = []
# One output item per cluster (one DataFrame row each)
for cluster_id, cluster in centroids.iterrows():
    centroid_result.append({
        "cluster_id": cluster_id,
        "start": cluster["start"],
        "stop": cluster["stop"],
        "hour": list(range(24)),
        # columns[:-2] drops the trailing "start" and "stop" columns,
        # leaving the hourly columns selected by the query (h00..h23)
        "values": [cluster[h] for h in centroids.columns[:-2]]
    })

In [None]:
# Display the centroid payload
{"data": centroid_result}

In [None]:
def render_centroids():
    """Render the hourly profile of the cluster centroids

    Only the rows ranked first by decreasing ``stop`` date are kept, i.e.
    those sharing the most recent ``stop`` value.

    Returns
    -------
    dict
        ``{"data": [...]}`` with one item per cluster holding its
        ``cluster_id``, the ``start`` and ``stop`` dates, the ``hour`` list
        (0 to 23) and the matching centroid ``values``
    """
    centroid_query = """
WITH ranked_centroids AS (
SELECT *, rank() OVER (ORDER BY stop DESC) AS rank
FROM lyon.centroid
)
SELECT cluster_id,
h00, h01, h02, h03, h04, h05, h06, h07, h08, h09, h10, h11,
h12, h13, h14, h15, h16, h17, h18, h19, h20, h21, h22, h23,
start, stop
FROM ranked_centroids
WHERE rank=1
"""
    centroids = pd.io.sql.read_sql_query(centroid_query, engine).set_index("cluster_id")
    # Every column except the trailing "start" and "stop" ones, i.e. h00..h23
    hourly_columns = centroids.columns[:-2]
    profiles = [
        {
            "cluster_id": cluster_id,
            "start": profile["start"],
            "stop": profile["stop"],
            "hour": list(range(24)),
            "values": [profile[column] for column in hourly_columns],
        }
        for cluster_id, profile in centroids.iterrows()
    ]
    return {"data": profiles}

## Build the API

In [None]:
from flask import jsonify, Flask, render_template
from flask_restplus import inputs, Resource, Api
from werkzeug.routing import BaseConverter

In [None]:
from jitenshop.webapp import CustomJSONEncoder, ListConverter

In [None]:
# flask-restplus API object: the resources registered with @api.route are
# declared relative to the "/api" prefix
api = Api(
    title="jitenshop: small bike-sharing data analysis",
    prefix="/api",
    doc=False,
    version="0.1",
    description="get some simple bike-sharing data from Lyon Open Data portal"
)

In [None]:
# Query-string arguments accepted by the availability resource
availability_parser = api.parser()
# "limit": optional integer, defaults to 1000
availability_parser.add_argument(
    "limit", required=False, type=int, default=1000, dest="limit", location="args",
    help="Number of station to query"
)
# "n_days_before": optional integer, defaults to 365
availability_parser.add_argument(
    "n_days_before", required=False, type=int, default=365, dest="n_days_before", location="args",
    help="Number of days to query before today"
)
@api.route("/lyon/availability")
class CityStationList(Resource):
    # Latest bike availability at the stations, returned as a JSON response
    @api.doc(parser=availability_parser, description="Latest bike availability at stations")
    def get(self):
        # Both values come from the query string, with the parser defaults
        query_args = availability_parser.parse_args()
        payload = render_availability(query_args["limit"], query_args["n_days_before"])
        return jsonify(payload)

In [None]:
@api.route("/lyon/stations/<list:ids>")
class CityStation(Resource):
    # Description of the stations whose IDs are given in the URL
    @api.doc(description="Shared-bike stations")
    def get(self, ids):
        payload = render_station_list(ids)
        return jsonify(payload)

In [None]:
# Query-string arguments accepted by the timeseries resource; both are
# mandatory and received as raw strings (no "type" is set here)
timeseries_parser = api.parser()
timeseries_parser.add_argument(
    "start", required=True, dest="start", location="args",
    help="Start date YYYY-MM-DDThhmm"
)
timeseries_parser.add_argument(
    "stop", required=True, dest="stop", location="args",
    help="Stop date YYYY-MM-DDThhmm"
)
@api.route("/lyon/timeseries/<list:ids>")
class TimeseriesStation(Resource):
    """Render the bike availability timeseries in Lyon between two dates of
    interest, for the stations whose IDs are given in the URL
    """
    @api.doc(parser=timeseries_parser,
             description="Bike availability timeseries")
    def get(self, ids):
        query_args = timeseries_parser.parse_args()
        # Parse the period bounds in order: "start" first, then "stop"
        period = [parse_timestamp(query_args[bound]) for bound in ("start", "stop")]
        return jsonify(render_timeseries(ids, *period))

In [None]:
@api.route("/lyon/clusters")
class CityClusteredStation(Resource):
    # Clustered stations, returned as a JSON response
    @api.doc(description="Clustered stations according to k-means algorithm")
    def get(self):
        payload = render_clusters()
        return jsonify(payload)

@api.route("/lyon/centroids")
class CityClusterCentroids(Resource):
    # Cluster centroid profiles, returned as a JSON response
    @api.doc(description="Centroids of k-means clusters")
    def get(self):
        payload = render_centroids()
        return jsonify(payload)

## Build the web application with Flask

In [None]:
# Flask application; HTML templates are read from the jitenshop webapp folder
app = Flask(
    __name__,
    template_folder="../jitenshop/webapp/templates",
)
# Register the "list" URL converter used by the "<list:ids>" API routes
app.url_map.converters['list'] = ListConverter
# Use the project JSON encoder for the application
# (presumably to serialize dates and similar types -- confirm in jitenshop.webapp)
app.json_encoder = CustomJSONEncoder

In [None]:
@app.route("/")
def index():
    """Render the ``index.html`` template."""
    return render_template("index.html")
@app.route("/doc")
def swagger_ui():
    """Render the ``swagger-ui.html`` template."""
    return render_template("swagger-ui.html")
@app.route("/lyon")
def city_view():
    """Render the ``city.html`` template."""
    return render_template("city.html")
@app.route("/lyon/<int:station_id>")
def station_view(station_id):
    """Render the ``station.html`` template for the station given in the URL.

    The integer ``station_id`` is passed to the template under the same name.
    """
    return render_template("station.html", station_id=station_id)

In [None]:
# Attach the API to the Flask application, then start the server on port 7997
api.init_app(app)
app.run(port=7997)