In [1]:
import time
import grequests
import geopandas as gpd
import pandas as pd
from shapely.geometry import LineString
from geoalchemy2.types import Geometry
import datetime
import psycopg2
from sqlalchemy import create_engine, types
from credentials import secrets

In [2]:
df1, df2 = pd.read_csv('data/kaggle/test.csv'), pd.read_csv('data/kaggle/train.csv')
df2 = df2[df1.columns]
df = pd.concat([df1, df2]).reset_index(drop=True).drop(columns=['id', 'vendor_id', 'store_and_fwd_flag'])
df['pickup_datetime'] = pd.to_datetime(df['pickup_datetime'])

In [3]:
df.head()

Unnamed: 0,pickup_datetime,passenger_count,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude
0,2016-06-30 23:59:58,1,-73.988129,40.732029,-73.990173,40.75668
1,2016-06-30 23:59:53,1,-73.964203,40.679993,-73.959808,40.655403
2,2016-06-30 23:59:47,1,-73.997437,40.737583,-73.98616,40.729523
3,2016-06-30 23:59:41,1,-73.95607,40.7719,-73.986427,40.730469
4,2016-06-30 23:59:33,1,-73.970215,40.761475,-73.96151,40.75589


## Create DB and table to store routes

In [4]:
engine = create_engine("postgresql://{user}:{password}@localhost:5432/nyctaxis".format(
    user=secrets['user'],
    password=secrets['password']
))
conn = psycopg2.connect(
    host="localhost",
    port=5432,
    database="nyctaxis", 
    user=secrets["user"], 
    password=secrets["password"]
)
cur = conn.cursor()

In [None]:
# From terminal we create the db and enable postgis libraries 
# psql -U postgres
# CREATE DATABASE nyctaxis;
# \c nyctaxis;
# CREATE EXTENSION postgis;

In [5]:
sql = """
    DROP TABLE IF EXISTS routes;

    CREATE TABLE routes(
        -- id SERIAL PRIMARY KEY,
        passenger_count INTEGER,
        length REAL,
        pickup_datetime TIMESTAMP,
        startp GEOMETRY(POINT, 0),
        endp GEOMETRY(POINT, 0),
        route GEOMETRY(LINESTRING, 4326)
    );
"""
cur.execute(sql)
conn.commit()
print('Table routes created.')

Table routes created.


In [6]:
url = "http://192.168.0.116:5000/route/v1/driving/{slng},{slat};{elng},{elat}?overview=full&geometries=geojson"
insert_sql = "INSERT INTO routes(pickup_datetime, passenger_count, route, startp, endp, length) VALUES (%s, %s, %s, %s, %s, %s);"

rejected = []
tot_start = time.time()
step = 5000
last = 0
start_date, end_date = df.pickup_datetime.min(), df.pickup_datetime.max() #+ datetime.timedelta(days=7)
period = df[(df.pickup_datetime>= start_date) & (df.pickup_datetime<end_date)].reset_index(drop=True)
print('Total routes to construct:', len(period))
for new in period.index[::step]:
    start = time.time()
    block = period.loc[last:new]
    urls = block.apply(lambda r: url.format(
        slng=r['pickup_longitude'],
        slat=r['pickup_latitude'], 
        elng=r['dropoff_longitude'], 
        elat=r['dropoff_latitude']), axis=1)
    
    rs = (grequests.get(u) for u in urls)
    responses = grequests.map(rs)
    routes = []
    mask = []
    for res in responses:
        try:
            routes.append(LineString(res.json()['routes'][0]['geometry']['coordinates']))
            mask.append(True)
        except:
            rejected.append(res)
            mask.append(False)
    
    valids = block.loc[mask]    
    routes = (gpd.GeoDataFrame(valids, geometry=routes, crs='EPSG:4326')
        .rename(columns={'geometry': 'route'})
        .set_geometry('route')
        .drop(columns=['pickup_longitude', 'pickup_latitude', 'dropoff_longitude', 'dropoff_latitude'])
     )
    routes['startp'] = gpd.points_from_xy(valids.pickup_longitude, valids.pickup_latitude, crs='EPSG:4326')
    routes['endp'] = gpd.points_from_xy(valids.dropoff_longitude, valids.dropoff_latitude, crs='EPSG:4326')
    routes['length'] = routes.to_crs('EPSG:32618').geometry.length

#    cur.executemany(insert_sql, [tuple(r.values) for _, r in routes.iterrows()])
    routes.to_postgis('routes', engine, if_exists='append', index=False,
                      dtype={
                          'route': Geometry(geometry_type='LINESTRING', srid=4326),
                          'startp': Geometry(geometry_type='POINT', srid=4326),
                          'endp': Geometry(geometry_type='POINT', srid=4326),
                      }
     )

    print(f'Requests {last}:{new} took {round(time.time()-start, 1)} secs.')
    last = new

print(f'Calculated {len(period)} in {round((time.time()-tot_start)/3660, 2)} hours.')

Total routes to construct: 2083777
Requests 0:0 took 0.4 secs.
Requests 0:5000 took 50.3 secs.
Requests 5000:10000 took 31.3 secs.
Requests 10000:15000 took 43.4 secs.
Requests 15000:20000 took 31.3 secs.
Requests 20000:25000 took 32.0 secs.
Requests 25000:30000 took 132.6 secs.
Requests 30000:35000 took 39.4 secs.
Requests 35000:40000 took 51.0 secs.
Requests 40000:45000 took 35.7 secs.
Requests 45000:50000 took 30.3 secs.
Requests 50000:55000 took 50.3 secs.
Requests 55000:60000 took 33.2 secs.
Requests 60000:65000 took 37.4 secs.
Requests 65000:70000 took 124.1 secs.
Requests 70000:75000 took 31.7 secs.
Requests 75000:80000 took 32.0 secs.
Requests 80000:85000 took 55.0 secs.
Requests 85000:90000 took 32.9 secs.
Requests 90000:95000 took 32.4 secs.
Requests 95000:100000 took 94.4 secs.
Requests 100000:105000 took 30.9 secs.
Requests 105000:110000 took 55.6 secs.


Traceback (most recent call last):
  File "src/gevent/greenlet.py", line 906, in gevent._gevent_cgreenlet.Greenlet.run
  File "/home/tomas/miniconda3/envs/psql_course/lib/python3.8/site-packages/grequests.py", line 74, in send
    self.response = self.session.request(self.method,
  File "/home/tomas/miniconda3/envs/psql_course/lib/python3.8/site-packages/requests/sessions.py", line 528, in request
    prep = self.prepare_request(req)
  File "/home/tomas/miniconda3/envs/psql_course/lib/python3.8/site-packages/requests/sessions.py", line 453, in prepare_request
    auth = get_netrc_auth(request.url)
  File "/home/tomas/miniconda3/envs/psql_course/lib/python3.8/site-packages/requests/utils.py", line 179, in get_netrc_auth
    from netrc import netrc, NetrcParseError
KeyboardInterrupt
2021-06-28T19:28:10Z <Greenlet at 0x7fb8182367b0: <bound method AsyncRequest.send of <grequests.AsyncRequest object at 0x7fb8311c4cd0>>(stream=False)> failed with KeyboardInterrupt



KeyboardInterrupt: 

In [7]:
cur.execute("ALTER TABLE routes ADD COLUMN id SERIAL PRIMARY KEY;")
conn.commit()

In [8]:
cur.execute("""
    UPDATE routes 
    SET 
        startp = ST_SetSRID(startp, 4326),
        endp = ST_SetSRID(endp, 4326)
""")
conn.commit()