In [1]:
import pandas as pd
import boto3
import numpy as np
import os
import matplotlib.pyplot as plt
from datetime import datetime   
from dateutil import relativedelta
from sklearn.ensemble import RandomForestRegressor
from sklearn.linear_model import LinearRegression
from sklearn.cluster import KMeans
from shapely.wkt import loads
import geopandas as gpd
from sklearn.neighbors import NearestNeighbors, KNeighborsRegressor
from sklearn.neighbors.regression import check_array, _get_weights

def get_matching_s3_keys(bucket, prefix='', suffix='', contain=''):
    """
    Yield the keys of an S3 bucket that pass the given filters.

    :param bucket: Name of the S3 bucket.
    :param prefix: Only yield keys that start with this prefix (optional).
        May also be a tuple of strings; a tuple is applied client-side only.
    :param suffix: Only yield keys that end with this suffix (optional).
    :param contain: Only yield keys containing this substring (optional).
    :raises KeyError: when a listing page carries no 'Contents' entry
        (an empty listing); callers may use this to detect "nothing there".
    """
    client = boto3.client('s3')
    request = {'Bucket': bucket}

    # A plain string prefix can be pushed down to the S3 API; a tuple of
    # prefixes is only enforced by the str.startswith check below.
    if isinstance(prefix, str):
        request['Prefix'] = prefix

    while True:
        page = client.list_objects_v2(**request)

        # Subscript (not .get) on purpose: an empty listing raises KeyError.
        for entry in page['Contents']:
            key = entry['Key']
            wanted = key.startswith(prefix) and key.endswith(suffix) and contain in key
            if wanted:
                yield key

        # Listings are paginated (up to 1000 keys per page); the continuation
        # token is absent on the final page.
        if 'NextContinuationToken' not in page:
            break
        request['ContinuationToken'] = page['NextContinuationToken']


class MedianKNNRegressor(KNeighborsRegressor):
    """k-nearest-neighbours regressor that predicts the median of the neighbours' targets.

    Behaves like ``KNeighborsRegressor`` except that ``predict`` reduces the
    neighbours' targets with ``np.median`` rather than a (weighted) mean.
    Only unweighted neighbours are supported: whenever ``_get_weights``
    returns a weight array, ``predict`` raises ``NotImplementedError``.

    NOTE(review): ``check_array`` and ``_get_weights`` come from the private
    module ``sklearn.neighbors.regression``; confirm the pinned scikit-learn
    version still provides that path.
    """

    def predict(self, X):
        """Predict each query row of ``X`` as the median target of its neighbours.

        :param X: query samples (array-like, or CSR sparse matrix).
        :returns: 1-D predictions when the model was fit on a 1-D target,
            otherwise an array of shape (n_queries, n_outputs).
        :raises NotImplementedError: if neighbour weights are in use.
        """
        X = check_array(X, accept_sparse='csr')
        neigh_dist, neigh_ind = self.kneighbors(X)

        weights = _get_weights(neigh_dist, self.weights)
        if weights is not None:
            # y_pred = weighted_median(_y[neigh_ind], weights, axis=1)
            raise NotImplementedError("weighted median")

        targets = self._y
        single_output = targets.ndim == 1
        if single_output:
            targets = targets.reshape((-1, 1))

        # targets[neigh_ind] stacks each query's neighbour targets along
        # axis 1; the median is taken across those neighbours.
        y_pred = np.median(targets[neigh_ind], axis=1)
        return y_pred.ravel() if single_output else y_pred
    

In [2]:
# Check whether the output for this part file already exists in S3.
fd = 'shapes_development/patch_osm_ordered_v2/state=al/part-00113-3b31d96d-445d-4d0c-84b5-47f1f7f29c08.c000.csv'

# Map a delivery-path key onto its patch_osm_ordered_v2 counterpart.
# NOTE(review): `fd` already points at patch_osm_ordered_v2, so this replace is a no-op here.
ofile = fd.replace('patch_osm_ordered_delivery_v2', 'patch_osm_ordered_v2')

try:
    # Stop at the first matching key instead of listing (and printing) every match.
    completed = any(True for _ in get_matching_s3_keys('inrixprod-volumes', prefix=ofile))
except KeyError:
    # get_matching_s3_keys raises KeyError when the S3 listing has no 'Contents'.
    completed = False

if completed:
    print("Already completed: {}".format(ofile))
else:
    print('not completed')

Already completed: shapes_development/patch_osm_ordered_v2/state=al/part-00113-3b31d96d-445d-4d0c-84b5-47f1f7f29c08.c000.csv


In [8]:
# Per-state ratio model: estimate cross_counts / hpms_volume with a median-KNN
# regressor over (lat, lon, crossing count) and write observed / predicted
# volumes for every state partition back to S3.
BUCKET = 'inrixprod-volumes'
SRC_PREFIX = 'shapes_development/patch_aadt_by_state/'
DEST_PREFIX = 'shapes_development/patch_aadt_complete/'
OBSERVE_NAME = 'simple_observe.parquet.snappy'
PREDICT_NAME = 'simple_predict.parquet.snappy'
N_NEIGHBORS = 300                  # upper bound; capped at the training-set size below
MIN_RATIO, MAX_RATIO = 0.01, 10.0  # ratio range accepted as training data (exclusive)

src = 'data_resources_temp'
dest = 'outdata_temp'
model = True
frc_specific_ratios = True
num = 'cross_counts'

pred_list = ['latitude', 'longitude', 'lat_norm', 'lon_norm', 'frc_norm', 'crossing_norm']
pred = ['lat_norm', 'lon_norm', 'crossing_norm']  # features actually fed to the model
id_cols = ['segid', 'roadname', 'frc', 'geometry', 'hpms_volume', 'cross_counts', 'minute_counts', 'prob', 'ratio']
cls = ['segid', 'roadname', 'frc', 'geometry', 'hpms_volume', 'l360v', 'cross_counts', 'raw_error', 'abs_error',
       'perc_error', 'perc_abs_error', 'ratio_prd']
out_names = {'l360v': 'predicted_volume', 'ratio_prd': 'ratio'}


def _zscore(values):
    """Standardise a Series to zero mean and unit sample standard deviation."""
    return (values - values.mean()) / values.std()


def _prob(minute_counts, max_count):
    """Return -log(1 - sqrt(count / max_count)); infinite where count == max_count."""
    with np.errstate(divide='ignore'):
        return -np.log(1 - np.sqrt(minute_counts * 1.0 / max_count))


# Keys already written under the output prefix. get_matching_s3_keys raises
# KeyError on an empty listing (e.g. the very first run), so treat that as "none".
try:
    ignore_list = list(get_matching_s3_keys(BUCKET, prefix=DEST_PREFIX, suffix='.parquet.snappy'))
except KeyError:
    ignore_list = []
completed_keys = set(ignore_list)

s3 = boto3.client('s3')
for folder in (src, dest):
    if not os.path.exists(folder):
        os.mkdir(folder)

pth_trip = os.path.join(src, 'trip_paths_byminute.csv')
observe_path = os.path.join(dest, OBSERVE_NAME)
predict_path = os.path.join(dest, PREDICT_NAME)

for fg in get_matching_s3_keys(BUCKET, prefix=SRC_PREFIX, suffix='.csv'):
    # Output folder = DEST_PREFIX + first path segment below SRC_PREFIX
    # (e.g. 'state=al/'), independent of the partition name's length.
    dest_s3 = DEST_PREFIX + fg[len(SRC_PREFIX):].split('/', 1)[0] + '/'

    # completed_keys holds full object keys, so compare against the exact output keys.
    if dest_s3 + OBSERVE_NAME in completed_keys and dest_s3 + PREDICT_NAME in completed_keys:
        continue

    s3.download_file(BUCKET, fg, pth_trip)
    df_trip = pd.read_csv(pth_trip, escapechar='\\')
    mxc = df_trip['minute_counts'].max()

    # NOTE(review): hpms_volume is scaled by 1/7 — presumably a unit conversion; confirm intent.
    df_trip['hpms_volume'] = df_trip['hpms_volume'] * 1.0 / 7

    df_trip['crossing_norm'] = _zscore(df_trip['cross_counts'])
    df_trip['frc_norm'] = _zscore(df_trip['frc'])

    # Parse each WKT geometry once and reuse its centroid for both coordinates.
    centroids = [loads(wkt).centroid for wkt in df_trip['geometry']]
    df_trip['latitude'] = [pt.y for pt in centroids]
    df_trip['longitude'] = [pt.x for pt in centroids]
    df_trip['lat_norm'] = _zscore(df_trip['latitude'])
    df_trip['lon_norm'] = _zscore(df_trip['longitude'])

    # Split into rows with an observed volume (train/test) and rows to predict.
    observed = df_trip['hpms_volume'].notna()
    df_miss = df_trip[~observed].copy()
    df_trip = df_trip[observed].copy()

    df_trip['prob'] = _prob(df_trip['minute_counts'], mxc)
    df_miss['prob'] = _prob(df_miss['minute_counts'], mxc)

    # NOTE(review): prob is not a model feature; this filter only drops rows whose
    # minute_counts equals the maximum (prob == inf) from the observed output.
    df_trip = df_trip[~np.isinf(df_trip['prob'])].copy()

    df_trip['ratio'] = df_trip['cross_counts'] / df_trip['hpms_volume']
    df_miss['ratio'] = df_miss['cross_counts'] / df_miss['hpms_volume']

    # Per-frc 10th/90th percentile of the ratio. mins/maxs are not used further,
    # but the (inner) merge keeps only rows whose frc appears in trip_gr.
    trip_gr = df_trip[['ratio', 'frc']].groupby('frc').quantile([0.1, 0.9]).reset_index()
    trip_gr = trip_gr.pivot(index='frc', columns='level_1', values='ratio').reset_index()
    trip_gr.columns = ['frc', 'mins', 'maxs']
    df_trip = df_trip.merge(trip_gr, on='frc')

    df_trip = df_trip[pred_list + id_cols + ['mins', 'maxs']]
    df_miss = df_miss[pred_list + id_cols]

    # NaN ratios fail both comparisons, so they are excluded from training too.
    in_range = (df_trip['ratio'] > MIN_RATIO) & (df_trip['ratio'] < MAX_RATIO)
    df_train = df_trip[in_range].copy()
    if df_train.empty:
        print('{} skipped: no training rows'.format(dest_s3))
        continue

    df_test = df_trip.copy()
    df_misst = df_miss.copy()

    # The KNN search cannot request more neighbours than there are training rows.
    mod = MedianKNNRegressor(n_neighbors=min(N_NEIGHBORS, len(df_train)), n_jobs=8)
    mod.fit(df_train[pred], df_train['ratio'])

    # NOTE(review): df_test contains the training rows, so the observed errors are in-sample.
    df_test['ratio_prd'] = mod.predict(df_test[pred])
    # predict() rejects an empty query set; states with no missing volumes get an empty column.
    df_misst['ratio_prd'] = mod.predict(df_misst[pred]) if len(df_misst) else np.nan

    df_test['l360v'] = df_test['cross_counts'] / df_test['ratio_prd']
    df_misst['l360v'] = df_misst['cross_counts'] / df_misst['ratio_prd']

    df_test['raw_error'] = df_test['hpms_volume'] - df_test['l360v']
    df_test['abs_error'] = df_test['raw_error'].abs()
    # Percentage error relative to the mean of observed and predicted volume.
    df_test['perc_error'] = df_test['raw_error'] / ((df_test['hpms_volume'] + df_test['l360v']) * 0.5)
    df_test['perc_abs_error'] = df_test['perc_error'].abs()

    # No observed volume to compare against: fill error columns with the sentinel -1.
    for col in ['raw_error', 'abs_error', 'perc_error', 'perc_abs_error']:
        df_misst[col] = -1

    df_test = df_test[cls].rename(columns=out_names)
    df_misst = df_misst[cls].rename(columns=out_names)

    df_test.to_parquet(observe_path, index=False)
    df_misst.to_parquet(predict_path, index=False)

    s3.upload_file(observe_path, BUCKET, dest_s3 + OBSERVE_NAME)
    s3.upload_file(predict_path, BUCKET, dest_s3 + PREDICT_NAME)

    print('{} Complete'.format(dest_s3))

shapes_development/patch_aadt_complete/state=al/
shapes_development/patch_aadt_complete/state=ar/
shapes_development/patch_aadt_complete/state=az/
shapes_development/patch_aadt_complete/state=ca/
shapes_development/patch_aadt_complete/state=co/
shapes_development/patch_aadt_complete/state=ct/
shapes_development/patch_aadt_complete/state=de/
shapes_development/patch_aadt_complete/state=fl/
shapes_development/patch_aadt_complete/state=ga/
shapes_development/patch_aadt_complete/state=ia/
shapes_development/patch_aadt_complete/state=id/
shapes_development/patch_aadt_complete/state=il/
shapes_development/patch_aadt_complete/state=in/
shapes_development/patch_aadt_complete/state=ks/
shapes_development/patch_aadt_complete/state=ky/
shapes_development/patch_aadt_complete/state=la/
shapes_development/patch_aadt_complete/state=ma/
shapes_development/patch_aadt_complete/state=md/
shapes_development/patch_aadt_complete/state=me/
shapes_development/patch_aadt_complete/state=mi/
shapes_development/p