In [1]:
import pandas as pd
import numpy as np

data = pd.read_csv('input/train.csv', nrows = 5_000_000, 
                  usecols = ['key', 'pickup_latitude', 'pickup_longitude', 'dropoff_latitude', 'dropoff_longitude'])

data = data.dropna()

# Remove latitude and longtiude outliers
data = data.loc[data['pickup_latitude'].between(40, 42)]
data = data.loc[data['pickup_longitude'].between(-75, -72)]
data = data.loc[data['dropoff_latitude'].between(40, 42)]
data = data.loc[data['dropoff_longitude'].between(-75, -72)]

In [2]:
data_rounded = data.round(3).groupby(['pickup_latitude', 'pickup_longitude', 'dropoff_latitude', 'dropoff_longitude'])['key'].\
                            count().reset_index().\
                            drop(columns = 'key')

len(data_rounded)

2793390

In [3]:
data_rounded['pickup'] = data_rounded['pickup_latitude'].astype(str) + "," + data_rounded['pickup_longitude'].astype(str)
data_rounded['dropoff'] = data_rounded['dropoff_latitude'].astype(str) + "," + data_rounded['dropoff_longitude'].astype(str)
data_rounded['l'] = data_rounded['pickup'] + "|" + data_rounded['dropoff']
data_rounded.head()

Unnamed: 0,pickup_latitude,pickup_longitude,dropoff_latitude,dropoff_longitude,pickup,dropoff,l
0,40.034,-74.276,40.034,-74.276,"40.034,-74.276","40.034,-74.276","40.034,-74.276|40.034,-74.276"
1,40.053,-74.968,40.041,-74.96,"40.053,-74.968","40.041,-74.96","40.053,-74.968|40.041,-74.96"
2,40.054,-73.017,40.057,-72.987,"40.054,-73.017","40.057,-72.987","40.054,-73.017|40.057,-72.987"
3,40.057,-72.963,40.087,-73.036,"40.057,-72.963","40.087,-73.036","40.057,-72.963|40.087,-73.036"
4,40.061,-73.532,40.065,-73.574,"40.061,-73.532","40.065,-73.574","40.061,-73.532|40.065,-73.574"


In [4]:
import googlemaps

with open('/home/ec2-user/keys/gmap.txt', 'r') as f:
    gmap_key = str(f.read()).strip()
    
# Authenticate with google maps
gmaps = googlemaps.Client(key=gmap_key)

In [5]:
def row_proc(l):
    pickup, dropoff = l.split('|')
    geocode_result = gmaps.distance_matrix(pickup,dropoff)
    #print (geocode_result)
    try:
        distance = float(geocode_result['rows'][0]['elements'][0]['distance']['text'].split()[0])
        duration = geocode_result['rows'][0]['elements'][0]['duration']['text'].split()
        
        if len(duration)==4:
            mins = float(duration[0])*60 + float(duration[2])
        
        elif len(duration) > 4:
            print(geocode_result)
            
        else:
            mins = float(duration[0])
    except:
        mins = np.nan
        distance = np.nan
        
    try:
        origin = str(geocode_result['origin_addresses'][0])
        dest = str(geocode_result['destination_addresses'][0])
    
    except:
        origin = np.nan
        distance = np.nan
        
    return (distance, mins, origin, dest)

In [6]:
pickup, dropoff = data_rounded.loc[1100, 'l'].split('|')
geocode_result = gmaps.distance_matrix(pickup, dropoff)

geocode_result

{'destination_addresses': ['2565 Ocean Pkwy, Brooklyn, NY 11235, USA'],
 'origin_addresses': ['551 Avenue Y, Brooklyn, NY 11235, USA'],
 'rows': [{'elements': [{'distance': {'text': '0.4 km', 'value': 397},
     'duration': {'text': '2 mins', 'value': 107},
     'status': 'OK'}]}],
 'status': 'OK'}

In [9]:
row_proc(data_rounded.loc[15000, 'l'])

(29.8,
 44.0,
 'Terminal 1, 1 J F K Airport, Jamaica, NY 11430, USA',
 '525 E 13th St, New York, NY 10009, USA')

In [10]:
row_proc(data_rounded.loc[2000000, 'l'])

(2.3,
 8.0,
 '680 Madison Ave, New York, NY 10065, USA',
 '105 E 83rd St, New York, NY 10028, USA')

## Time How Long it Takes for 100 records

Use apply method and record 3 trials.

In [11]:
%%timeit -n 3 -r 1
data_rounded.loc[:99, 'l'].apply(lambda x: row_proc(x))

7.25 s ± 0 ns per loop (mean ± std. dev. of 1 run, 3 loops each)


## Multithreading

In [13]:
import threading
from queue import Queue
import requests
import bs4
from timeit import default_timer as timer

### Time Multithreading with 100 threads and 100 records

This should be faster!

In [14]:
%%timeit -n 3 -r 1

print_lock = threading.Lock()

l_list = list(data_rounded.loc[:99, 'l'])

# Create a new queue
l_queue = Queue()

# List to hold results
run_l = []
distances = []
durations = []
origins = []
dests = []

# Function to take an element from the queue and execute task
def process_queue():
    while True:
        # Get next element from the queue
        current_l = l_queue.get()
        # Record the coordinates
        run_l.append(current_l)
        
        # Run the function
        r = row_proc(current_l)
        
        # Record the results
        distances.append(r[0])
        durations.append(r[1])
        origins.append(r[2])
        dests.append(r[3])
        
        # Signal to the queue that the task is done
        l_queue.task_done()

# Start 100 threads
for i in range(100):
    t = threading.Thread(target = process_queue)
    # Set daemon to be true
    t.daemon = True
    t.start()
    
# Put each element of the list on the queue
for current_l in l_list:
    l_queue.put(current_l)

# Execture the queue
l_queue.join()

end = timer()

1.04 s ± 0 ns per loop (mean ± std. dev. of 1 run, 3 loops each)


A seven time speed-up!

In [15]:
len(data_rounded) * (1 / 100) / 3600

7.759416666666667

Estimated number of hours.

#### Multithreading for real

Method for tracking progress.

In [16]:
import time
for i in range(10):
    time.sleep(1)
    print(f'{100 * (i / 10)}% complete', end = '\r')

90.0% complete

## Apply on test data first

We'll start out using the test data since it's smaller.

In [17]:
test = pd.read_csv('input/test.csv', 
                  usecols = ['key', 'pickup_latitude', 'pickup_longitude', 'dropoff_latitude', 
                             'dropoff_longitude'])
test_rounded = test.round(3).groupby(['pickup_latitude', 'pickup_longitude', 'dropoff_latitude', 'dropoff_longitude'])['key'].\
                            count().reset_index().\
                            drop(columns = 'key')

len(test_rounded)

9870

In [18]:
test_rounded['pickup'] = test_rounded['pickup_latitude'].astype(str) + "," + test_rounded['pickup_longitude'].astype(str)
test_rounded['dropoff'] = test_rounded['dropoff_latitude'].astype(str) + "," + test_rounded['dropoff_longitude'].astype(str)
test_rounded['l'] = test_rounded['pickup'] + "|" + test_rounded['dropoff']
test_rounded.head()

Unnamed: 0,pickup_latitude,pickup_longitude,dropoff_latitude,dropoff_longitude,pickup,dropoff,l
0,40.573,-74.221,40.569,-74.217,"40.573,-74.221","40.569,-74.217","40.573,-74.221|40.569,-74.217"
1,40.583,-74.252,40.588,-74.263,"40.583,-74.252","40.588,-74.263","40.583,-74.252|40.588,-74.263"
2,40.588,-73.974,40.593,-73.973,"40.588,-73.974","40.593,-73.973","40.588,-73.974|40.593,-73.973"
3,40.605,-73.98,40.603,-73.973,"40.605,-73.98","40.603,-73.973","40.605,-73.98|40.603,-73.973"
4,40.623,-73.988,40.623,-73.988,"40.623,-73.988","40.623,-73.988","40.623,-73.988|40.623,-73.988"


In [19]:
row_proc(test_rounded.loc[1, 'l'])

(2.0,
 4.0,
 '443 Federal Blvd, Carteret, NJ 07008, USA',
 '1392 Rahway Ave, Avenel, NJ 07001, USA')

In [20]:
row_proc(test_rounded.loc[1230, 'l'])

(3.8,
 12.0,
 '139 E Houston St, New York, NY 10002, USA',
 '308 Grand St, Brooklyn, NY 11211, USA')

In [21]:
start = timer()

print_lock = threading.Lock()

l_list = list(test_rounded['l'])

# Create a new queue
l_queue = Queue()

# List to hold results
run_l = []
distances = []
durations = []
origins = []
dests = []
TRACKER = 0

# Function to take an element from the queue and execute task
def process_queue():
    while True:
        global TRACKER
        TRACKER += 1
        # Get next element from the queue
        current_l = l_queue.get()
        # Record the coordinates
        run_l.append(current_l)
        
        # Run the function
        r = row_proc(current_l)
        
        # Record the results
        distances.append(r[0])
        durations.append(r[1])
        origins.append(r[2])
        dests.append(r[3])
        
        if TRACKER % 1000 == 0:
            time_per_record = (timer() - start) / TRACKER 
            estimated_time = ((len(test_rounded) - TRACKER) * time_per_record) / 3600
            print(f'{round(100 * (TRACKER / len(test_rounded)), 2)}% complete. Estimated time remaining: {round(estimated_time, 2)} hours.', end = '\r')
            
        # Signal to the queue that the task is done
        l_queue.task_done()

# Start 100 threads
for i in range(100):
    t = threading.Thread(target = process_queue)
    # Set daemon to be true
    t.daemon = True
    t.start()
    
# Put each element of the list on the queue
for current_l in l_list:
    l_queue.put(current_l)

# Execture the queue
l_queue.join()

end = timer()

91.19% complete. Estimated time remaining: 0.0 hours..20.26% complete. Estimated time remaining: 0.02 hours.20.26% complete. Estimated time remaining: 0.02 hours.30.4% complete. Estimated time remaining: 0.02 hours.

In [22]:
print(f'{round(end - start, 2)} seconds elapsed.')

103.08 seconds elapsed.


That was only 10000 records. The real dataset has almost 3,000,000.

## Post-Processing

In [23]:
test_results = pd.DataFrame({'l': run_l, 'distance': distances, 'duration': durations,
                             'origin': origins, 'destination': dests})
test_results.head()

Unnamed: 0,l,distance,duration,origin,destination
0,"40.573,-74.221|40.569,-74.217",2.0,4.0,"443 Federal Blvd, Carteret, NJ 07008, USA","1392 Rahway Ave, Avenel, NJ 07001, USA"
1,"40.583,-74.252|40.588,-74.263",1.0,1.0,"1745 59th St, Brooklyn, NY 11204, USA","1745 59th St, Brooklyn, NY 11204, USA"
2,"40.588,-73.974|40.593,-73.973",1.0,4.0,"101 Abbi Rd, Carteret, NJ 07008, USA","Waterfront Fitness Trail, Carteret, NJ 07008, USA"
3,"40.605,-73.98|40.603,-73.973",0.8,4.0,"729 E 2nd St, Brooklyn, NY 11218, USA","1711 44th St, Brooklyn, NY 11204, USA"
4,"40.623,-73.988|40.623,-73.988",1.1,5.0,"1739a W 7th St, Brooklyn, NY 11223, USA","48 Lake St, Brooklyn, NY 11223, USA"


In [25]:
test_rounded = test_rounded.merge(test_results, on = ['l'], how = 'left')

rounded_cols = []
for l in ['pickup_latitude', 'pickup_longitude', 'dropoff_latitude', 'dropoff_longitude']:
    r = f'{l}-rounded'
    rounded_cols.append(r)
    test[r] = test[l].round(3)
    
test_rounded.columns = rounded_cols + list(test_rounded.columns[4:])
test = test.merge(test_rounded, on = rounded_cols, how = 'left')
test.head()

Unnamed: 0,key,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,pickup_latitude-rounded,pickup_longitude-rounded,dropoff_latitude-rounded,dropoff_longitude-rounded,pickup,dropoff,l,distance,duration,origin,destination
0,2015-01-27 13:08:24.0000002,-73.97332,40.763805,-73.98143,40.743835,40.764,-73.973,40.744,-73.981,"40.764,-73.973","40.744,-73.981","40.764,-73.973|40.744,-73.981",4.5,14.0,"786 5th Ave, New York, NY 10019, USA","3855 30th St, Long Island City, NY 11101, USA"
1,2015-01-27 13:08:24.0000003,-73.986862,40.719383,-73.998886,40.739201,40.719,-73.987,40.739,-73.999,"40.719,-73.987","40.739,-73.999","40.719,-73.987|40.739,-73.999",2.4,13.0,"408b Broadway, New York, NY 10013, USA","37 E 7th St, New York, NY 10003, USA"
2,2011-10-08 11:53:44.0000002,-73.982524,40.75126,-73.979654,40.746139,40.751,-73.983,40.746,-73.98,"40.751,-73.983","40.746,-73.98","40.751,-73.983|40.746,-73.98",2.2,9.0,"469 7th Avenue, 469 Seventh Ave, 7th Floor, Ne...","357 W 16th St, New York, NY 10011, USA"
3,2012-12-01 21:12:12.0000002,-73.98116,40.767807,-73.990448,40.751635,40.768,-73.981,40.752,-73.99,"40.768,-73.981","40.752,-73.99","40.768,-73.981|40.752,-73.99",4.8,21.0,"675 Park Ave, Brooklyn, NY 11206, USA","31 W 19th St, New York, NY 10011, USA"
4,2012-12-01 21:12:12.0000003,-73.966046,40.789775,-73.988565,40.744427,40.79,-73.966,40.744,-73.989,"40.79,-73.966","40.744,-73.989","40.79,-73.966|40.744,-73.989",7.4,15.0,"101 W 96th St, New York, NY 10025, USA","659 12th Ave, New York, NY 10011, USA"


In [26]:
features_to_keep = ['key', 'distance', 'duration', 'origin', 'destination']

test[features_to_keep].to_csv('input/test_rounded_distances-3digits.csv', index = False)

# Repeat with Entire Dataset

In [None]:
start = timer()

print_lock = threading.Lock()

l_list = list(data_rounded['l'])

# Create a new queue
l_queue = Queue()

# List to hold results
run_l = []
distances = []
durations = []
origins = []
dests = []
TRACKER = 0

# Function to take an element from the queue and execute task
def process_queue():
    while True:
        global TRACKER
        TRACKER += 1
        # Get next element from the queue
        current_l = l_queue.get()
        # Record the coordinates
        run_l.append(current_l)
        
        # Run the function
        r = row_proc(current_l)
        
        # Record the results
        distances.append(r[0])
        durations.append(r[1])
        origins.append(r[2])
        dests.append(r[3])
        
        if TRACKER % 100000 == 0:
            time_per_record = (timer() - start) / TRACKER 
            estimated_time = ((len(data_rounded) - TRACKER) * time_per_record) / 3600
            print(f'{round(100 * (TRACKER / len(data_rounded)), 2)}% complete. Estimated time remaining: {round(estimated_time, 2)} hours.', end = '\r')
            
        # Signal to the queue that the task is done
        l_queue.task_done()

# Start 100 threads
for i in range(100):
    t = threading.Thread(target = process_queue)
    # Set daemon to be true
    t.daemon = True
    t.start()
    
# Put each element of the list on the queue
for current_l in l_list:
    l_queue.put(current_l)

# Execture the queue
l_queue.join()

end = timer()

In [None]:
print(f'{round(end - start, 2)} seconds elapsed.')

In [None]:
data_results = pd.DataFrame({'l': run_l, 'distance': distances, 'duration': durations,
                             'origin': origins, 'destination': dests})

data_rounded = data_rounded.merge(data_results, on = ['l'], how = 'left')

rounded_cols = []
for l in ['pickup_latitude', 'pickup_longitude', 'dropoff_latitude', 'dropoff_longitude']:
    r = f'{l}-rounded'
    rounded_cols.append(r)
    data[r] = data[l].round(3)
    
data_rounded.columns = rounded_cols + list(data_rounded.columns[4:])
data = data.merge(data_rounded, on = rounded_cols, how = 'left')

In [None]:
features_to_keep = ['key', 'distance', 'duration', 'origin', 'destination']

data[features_to_keep].to_csv('input/rounded_distances-3digits.csv', index = False)