# Exploring the New York City Taxi Data with Arkouda + Pandas/NumPy

This notebook shows examples of interoperating between Pandas and Arkouda at a small scale, on a workstation with a few GB of memory. The same notebook would also run against a multi-node Arkouda instance on an HPC system holding terabytes of data.

Arkouda is not trying to replace Pandas. Its goal is to allow Pandas-style operations at a much larger scale. In our experience, Pandas can handle dataframes of up to about **500 million rows** on a sufficiently capable compute server before performance becomes a real issue. Arkouda breaks out of the shared-memory paradigm and scales its operations to distributed dataframes with **hundreds of billions of rows**, possibly even a trillion. In practice, we have run Arkouda server operations on columns of one trillion elements across 512 compute nodes, which yielded a **>20TB dataframe** in Arkouda.

**Outline**
- Data Preparation
  - Get Data
  - Convert Data
  - Load Data
- Data Exploration
  - Summarization
  - Histograms
  - Logical Indexing/Filtering
  - Time Data
  - Lookup Tables
  - GroupBy-Aggregate
  - Broadcast
  - Integrate with Pandas

# Data Preparation

## Download New York City Taxi Data
[Yellow Trips Data Dictionary](https://www1.nyc.gov/assets/tlc/downloads/pdf/data_dictionary_trip_records_yellow.pdf)

[NYC Yellow Taxi Trip Records Jan 2020](https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2020-01.csv)

[Green Trips Data Dictionary](https://www1.nyc.gov/assets/tlc/downloads/pdf/data_dictionary_trip_records_green.pdf)

[NYC Green Taxi Trip Records Jan 2020](https://s3.amazonaws.com/nyc-tlc/trip+data/green_tripdata_2020-01.csv)

[NYC Taxi Zone Lookup Table](https://s3.amazonaws.com/nyc-tlc/misc/taxi+_zone_lookup.csv)

[NYC Taxi Zone Shapefile](https://s3.amazonaws.com/nyc-tlc/misc/taxi_zones.zip)

## Convert and Load Data

There are two methods:
* Load in Python (e.g., with Pandas) and transfer to Arkouda
  * Good for prototyping with small data
* Convert to HDF5 (in a separate process) and read directly with Arkouda
  * Best for large datasets
  
#### Dtypes Supported in HDF5
Arkouda can only read these HDF5 dtypes:
* int (any width) -> int64
* float (any width) -> float64
* custom string format -> ak.Strings

#### Additional Dtypes in Arkouda
You can cast or convert to these after loading the raw data:
* bool
* Datetime (from int64)
* Timedelta (from int64)

#### Prefer Integers!
They are fast and versatile (usable with GroupBy, Datetime, Timedelta, bit ops, etc.)
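
As a small illustration of reducing raw columns to integers (pure NumPy, no Arkouda server needed), the sketch below turns timestamps into int64 nanoseconds since the Unix epoch and a Y/N flag into 0/1. It assumes nanosecond units, which we believe is the default unit `ak.Datetime` uses when built from an int64 array; the sample values are made up.

```python
import numpy as np

# Made-up sample values, not taken from the real taxi files
pickup = np.array(["2020-01-01T00:15:00", "2020-01-01T00:45:30"], dtype="datetime64[ns]")
flags = np.array(["N", "y"])

# Timestamp -> int64 nanoseconds since the Unix epoch
pickup_ns = pickup.astype(np.int64)

# Y/N flag -> 0/1 integer (case-insensitive)
flag_int = (np.char.upper(flags) == "Y").astype(np.int64)

# The integer form converts back to timestamps without loss
restored = pickup_ns.astype("datetime64[ns]")

print(pickup_ns, flag_int)
```

Because both columns are now plain int64, they can be grouped, compared, and bit-manipulated like any other integer column.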

### Describe Data Format

In [None]:
!head /home/reusters/data/green_tripdata_2020-01.csv

In [None]:
%%file NYCTaxi_format.py

import numpy as np

OPTIONS = {}

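def YNint(yn):
    # Map 'Y'/'YES' (any case) to 1 and everything else, including blanks, to 0.
    # (A substring test like `yn.upper() in 'YES'` would wrongly map '' to 1.)
    return int(yn.strip().upper() in ('Y', 'YES'))

def nullint(x):
    # Parse an integer column, using -1 as the sentinel for missing or unparseable values
    try:
        return np.int64(x)
    except (ValueError, TypeError, OverflowError):
        return np.int64(-1)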

yellow_format = {'sep': ',',
                 'header': 0,
                 'parse_dates':['tpep_dropoff_datetime', 'tpep_pickup_datetime'],
                 'infer_datetime_format': True,
                 'converters': {'store_and_fwd_flag': YNint,
                                'VendorID': nullint,
                                'RatecodeID': nullint,
                                'PULocationID': nullint,
                                'DOLocationID': nullint,
                                'passenger_count': nullint,
                                'payment_type': nullint,
                                'trip_type': nullint}}

OPTIONS['yellow'] = yellow_format

green_format = yellow_format.copy()
green_format['parse_dates'] = ['lpep_dropoff_datetime', 'lpep_pickup_datetime']
OPTIONS['green'] = green_format

### Method 1: CSV --> Pandas --> Arkouda

In [None]:
import pandas as pd
import NYCTaxi_format as taxi
import arkouda as ak
ak.connect(connect_url="tcp://localhost:5555")

In [None]:
pdgreen = pd.read_csv('/home/reusters/data/green_tripdata_2020-01.csv', **taxi.OPTIONS['green'])

In [None]:
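# Transfer the columns of a DataFrame to Arkouda, returning a dict of column name -> Arkouda array
def ak_create_akdict_from_df(df):
    akdict = {}
    for cname in df.columns:
        if df[cname].dtype.name == 'object':
            # Object columns hold Python strings; send them as ak.Strings.
            # np.str was removed in NumPy 1.24, so use the builtin str instead.
            akdict[cname] = ak.from_series(df[cname], dtype=str)
        else:
            akdict[cname] = ak.from_series(df[cname])

    return akdict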

In [None]:
green_from_pandas = ak_create_akdict_from_df(pdgreen)

### Method 2: CSV --> HDF5 --> Arkouda

The Arkouda repository includes a CSV-to-HDF5 converter, `converter/csv2hdf.py`. It uses the `NYCTaxi_format.py` file defined above.

In [None]:
!python3 /home/reusters/arkouda/converter/csv2hdf.py \
--formats-file=/home/reusters/ArkoudaNotebooks/NYCTaxi_format.py \
--format=green \
--outdir=/home/reusters/data/ \
/home/reusters/data/green_tripdata_2020-01.csv

In [None]:
# Skip this cell if you already connected above
import arkouda as ak
ak.connect(connect_url="tcp://localhost:5555")

In [None]:
green_from_HDF5 = ak.read_all('/home/reusters/data/green_tripdata_2020-01.hdf')

### Same Result from Both Methods

In [None]:
def frames_are_equal(a, b):
    # Ensure same columns
    if a.keys() != b.keys():
        return False
    # Ensure same column dtypes
    if not all(a[k].dtype == b[k].dtype for k in a):
        return False
    # Compare column values
    for k in a:
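        # Workaround until ak.isna() is implemented, because nan != nan.
        # Casting to int64 makes NaNs compare consistently in practice, but it also
        # truncates, so values that differ only after the decimal point compare equal.
        if a[k].dtype == ak.float64:
            cmp = ak.cast(a[k], 'int64') == ak.cast(b[k], 'int64')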
        else:
            cmp = a[k] == b[k]
        if not cmp.all():
            return False
    return True

In [None]:
frames_are_equal(green_from_HDF5, green_from_pandas)
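
The int64-cast workaround above exists because NaN never equals itself. For comparison, here is the same problem on the NumPy side with made-up arrays: `np.array_equal(..., equal_nan=True)` (available in NumPy 1.19 and later) treats NaNs in matching positions as equal, while an integer cast can also hide differences in the fractional part.

```python
import numpy as np

a = np.array([1.25, np.nan, 3.0])
b = np.array([1.25, np.nan, 3.0])
c = np.array([1.75, np.nan, 3.0])

# Plain elementwise comparison fails because NaN != NaN
plain_equal = bool((a == b).all())

# NaN-aware comparison: NaNs in the same position count as equal
nan_aware_equal = np.array_equal(a, b, equal_nan=True)

# Truncating to int64 hides the 1.25 vs 1.75 difference.
# NaN positions are skipped here because casting NaN to int is undefined.
mask = ~np.isnan(a)
truncated_equal = bool((a[mask].astype(np.int64) == c[mask].astype(np.int64)).all())

# The NaN-aware comparison does catch that difference
nan_aware_differs = not np.array_equal(a, c, equal_nan=True)
```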

In [None]:
# Proceed with just one, since they are equivalent
data = green_from_HDF5

### Convert Columns to Specialized Dtypes

In [None]:
data['lpep_pickup_datetime'] = ak.Datetime(data['lpep_pickup_datetime'])
data['lpep_dropoff_datetime'] = ak.Datetime(data['lpep_dropoff_datetime'])
data['store_and_fwd_flag'] = (data['store_and_fwd_flag'] == 1)

In [None]:
data

In [None]:
rows = data['VendorID'].size
numbytes = sum(v.size*v.itemsize for v in data.values())

In [None]:
print(f'{rows:,} rows\n{numbytes:,} bytes')

# Exploration

## Descriptive Statistics

In [None]:
def describe(x):
    fmt = 'mean: {}\nstd : {}\nmin : {}\nmax : {}'
    if x.dtype == ak.float64:
        fmt = fmt.format(*['{:.2f}' for _ in range(4)])
    print(fmt.format(x.mean(), x.std(), x.min(), x.max()))

In [None]:
describe(data['fare_amount'])

## Histograms

In [None]:
import numpy as np
from matplotlib import pyplot as plt

def hist(x, bins, log=True):
    # At least two bins are needed to compute a bar width
    assert bins > 1
    # Compute histogram counts in arkouda
    h = ak.histogram(x, bins)
    # Compute the left edge of each bin in numpy: bins+1 evenly spaced edges, drop the last
    if isinstance(x, ak.Datetime):
        # Matplotlib has trouble plotting np.datetime64 and np.timedelta64, so plot int64 nanoseconds
        bins = ak.date_range(x.min(), x.max(), periods=bins+1).to_ndarray().astype('int')[:-1]
    elif isinstance(x, ak.Timedelta):
        bins = ak.timedelta_range(x.min(), x.max(), periods=bins+1).to_ndarray().astype('int')[:-1]
    else:
        bins = np.linspace(x.min(), x.max(), bins+1)[:-1]
    # Bring h over to numpy for plotting; align='edge' starts each bar at its bin's left edge
    plt.bar(bins, h.to_ndarray(), width=bins[1]-bins[0], align='edge')
    if log:
        plt.yscale('log')

In [None]:
hist(data['fare_amount'], 100)
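
To see how the bar positions in `hist` relate to the counts, here is the same binning done entirely in NumPy on synthetic fares. `np.histogram` with an integer bin count splits `[min, max]` into equal-width bins, and its `bin_edges[:-1]` match the left edges that `np.linspace(min, max, bins+1)[:-1]` produces. We assume `ak.histogram` uses the same equal-width binning over `[min, max]`.

```python
import numpy as np

rng = np.random.default_rng(0)
# Synthetic "fares" for illustration only
fares = rng.exponential(scale=12.0, size=10_000)

nbins = 100
counts, edges = np.histogram(fares, bins=nbins)

# Left edges computed the same way as in `hist`
left = np.linspace(fares.min(), fares.max(), nbins + 1)[:-1]
width = left[1] - left[0]

# plt.bar(left, counts, width=width, align='edge') draws each bar starting at
# its bin's left edge; with the default align='center' every bar would be
# shifted left by half a bin.
```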

## Logical Indexing (Filters)
Find non-negative fares

In [None]:
nonneg = data['fare_amount'] >= 0
print(f'{nonneg.sum() / nonneg.size :.1%} of fares are non-negative')

Select only non-negative fares for computation

In [None]:
describe(data['fare_amount'][nonneg])

Make a new data dict containing only the rides with non-negative fares

In [None]:
data_nonneg = {k:v[nonneg] for k, v in data.items()}

In [None]:
data_nonneg

## Time Data

In [None]:
data['ride_duration'] = data['lpep_dropoff_datetime'] - data['lpep_pickup_datetime']

In [None]:
data['ride_duration']

In [None]:
data['ride_duration'].min(), data['ride_duration'].max()

In [None]:
hist(data['ride_duration'], 100)
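
Subtracting two datetime columns yields a duration. As a NumPy-side sketch of the same arithmetic with made-up timestamps, `datetime64[ns]` minus `datetime64[ns]` gives `timedelta64[ns]`, whose underlying int64 value is the duration in nanoseconds. That integer view is what makes durations easy to bin and compare; a negative value would indicate a dropoff recorded before its pickup.

```python
import numpy as np

# Made-up pickup/dropoff pairs
pickup = np.array(["2020-01-01T08:00:00", "2020-01-01T09:30:00"], dtype="datetime64[ns]")
dropoff = np.array(["2020-01-01T08:12:30", "2020-01-01T09:29:00"], dtype="datetime64[ns]")

duration = dropoff - pickup              # timedelta64[ns]
duration_ns = duration.astype(np.int64)  # the same values as plain integers
duration_min = duration_ns / 60e9        # nanoseconds -> minutes

# A negative duration means the dropoff was recorded before the pickup
suspicious = duration_ns < 0
```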

## Taxi Zone Lookup Table

### Use Method 1: CSV --> Pandas --> Arkouda

In [None]:
def cvt_to_string(v):
    try:
        if v == '':
            return 'N/A'
        else:
            return str(v)
    except:
        return 'N/A'

# read the taxi-zone-lookup-table
cvt = {'Borough':cvt_to_string, 'Zone':cvt_to_string, 'service_zone':cvt_to_string}
tzlut = pd.read_csv("/home/reusters/data/taxi+_zone_lookup.csv",converters=cvt)

# LocationID is 1-based, but array indices are 0-based.
# Prepend a placeholder row 0 so that row i holds LocationID i,
# which lets the LocationID columns index the table directly.
top_row = pd.DataFrame({'LocationID': [0], 'Borough': ['N/A'], 'Zone': ['N/A'], 'service_zone': ['N/A']})
tzlut = pd.concat([top_row, tzlut]).reset_index(drop=True)

In [None]:
tzlut

### Convert dataframe to dictionary of Arkouda arrays

In [None]:
# convert data frame with strings and int64 data
aktzlut = ak_create_akdict_from_df(tzlut)

In [None]:
aktzlut

### Apply Lookup Table

In [None]:
(aktzlut['LocationID'] == ak.arange(aktzlut['LocationID'].size)).all()

In [None]:
data['PUBorough'] = aktzlut['Borough'][data['PULocationID']]
data['DOBorough'] = aktzlut['Borough'][data['DOLocationID']]

In [None]:
data['PUZone'] = aktzlut['Zone'][data['PULocationID']]
data['DOZone'] = aktzlut['Zone'][data['DOLocationID']]
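
Applying the lookup table is a gather: indexing the `Borough` array with the `PULocationID` array returns one borough per ride. This only works because the placeholder row 0 makes row *i* hold `LocationID` *i*. A NumPy sketch of the same idea, using a hypothetical three-zone table and made-up ride IDs:

```python
import numpy as np

# Hypothetical lookup table; row 0 is the 'N/A' placeholder so that
# row i corresponds to LocationID i
location_id = np.array([0, 1, 2, 3])
borough = np.array(["N/A", "EWR", "Queens", "Bronx"])

# Check the alignment, as the notebook does with ak.arange
assert (location_id == np.arange(location_id.size)).all()

# Per-ride pickup location IDs (made up)
pu_location = np.array([3, 1, 3, 2, 0])

# Gather: one borough string per ride
pu_borough = borough[pu_location]
```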

In [None]:
data

## GroupBy: Construct a Graph

Directed graph from PULocationID --> DOLocationID

In [None]:
byloc = ak.GroupBy([data['PULocationID'], data['DOLocationID']])

In [None]:
byloc.unique_keys

The edge weight is the number of rides

Aggregation methods of `GroupBy` return a tuple of (unique_keys, aggregate_values)

In [None]:
(u, v), w = byloc.count()

In [None]:
u, v, w
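
Conceptually, `byloc.count()` finds each distinct (pickup, dropoff) pair and how many rides share it, which is a weighted edge list. A rough NumPy equivalent is `np.unique` over the stacked key columns with `return_counts=True`. The rides below are made up, and NumPy returns the pairs in lexicographically sorted order, which may differ from the order Arkouda produces.

```python
import numpy as np

# Made-up rides: pickup and dropoff location IDs
pu = np.array([7, 7, 3, 7, 3, 5])
do = np.array([3, 3, 7, 5, 7, 5])

# Each row of `pairs` is one (pickup, dropoff) edge
pairs = np.stack([pu, do], axis=1)
edges, weights = np.unique(pairs, axis=0, return_counts=True)
u, v = edges[:, 0], edges[:, 1]
```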

## Broadcast: Find Rides with Anomalous Fares

Compute the mean and standard deviation of the fare for each (pickup, dropoff) pair
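
For reference, the per-group mean and (population) standard deviation follow from two per-group sums. Here $n_g$ is the number of rides in group $g$ (the counts `w` above), $x_i$ are the fares in that group, and the z-score includes the $+1$ that the notebook adds to the denominator:

$$
\mu_g = \frac{1}{n_g}\sum_{i \in g} x_i, \qquad
\sigma_g^2 = \frac{1}{n_g}\sum_{i \in g} x_i^2 - \mu_g^2, \qquad
\sigma_g = \sqrt{\sigma_g^2}, \qquad
z_i = \frac{x_i - \mu_g}{\sigma_g + 1}
$$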

In [None]:
_, mf = byloc.mean(data['fare_amount'])

In [None]:
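# Per-group variance via E[x^2] - E[x]^2
_, sumsq = byloc.sum(data['fare_amount']**2)
var = (sumsq / w) - mf**2
# Clip tiny negative values from floating-point round-off, then take the square root to get the std
sf = ak.where(var > 0, var, 0.0) ** 0.5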

Broadcast the per-group values back to the individual rides so that each ride's fare z-score can be computed

In [None]:
data['fare_mean'] = byloc.broadcast(mf, permute=True)
data['fare_std'] = byloc.broadcast(sf, permute=True)

In [None]:
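# Adding 1 to the denominator avoids dividing by zero when a group's fares have no spread (e.g. a single ride)
data['fare_z'] = (data['fare_amount'] - data['fare_mean']) / (data['fare_std'] + 1)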
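
As we understand it, `byloc.broadcast(..., permute=True)` copies each group's value back to every ride in that group, in the rides' original order. In NumPy the same effect comes from the `return_inverse` output of `np.unique`, which maps each ride to its group index. The sketch below uses made-up fares and a single integer key standing in for the (pickup, dropoff) pair; it computes per-group mean and standard deviation from sums, broadcasts them, and forms the z-score with the same `+ 1` in the denominator.

```python
import numpy as np

# Made-up rides: a group key per ride (standing in for a (pickup, dropoff) pair) and a fare
key = np.array([0, 1, 0, 1, 0, 2])
fare = np.array([10.0, 20.0, 14.0, 22.0, 12.0, 50.0])

# Group index per ride; `inverse` plays the role of the broadcast permutation
groups, inverse = np.unique(key, return_inverse=True)
n = np.bincount(inverse)
mean = np.bincount(inverse, weights=fare) / n
var = np.bincount(inverse, weights=fare**2) / n - mean**2
std = np.sqrt(np.clip(var, 0.0, None))  # clip round-off below zero

# Broadcast the group statistics back to each ride, then compute z-scores
fare_mean = mean[inverse]
fare_std = std[inverse]
fare_z = (fare - fare_mean) / (fare_std + 1)
```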

In [None]:
hist(data['fare_z'], 100)

## Bring Small Result Set Back to Pandas

In [None]:
exorbitant = (data['fare_z'] > 5)
exdf = pd.DataFrame({k: v[exorbitant].to_ndarray() for k, v in data.items()})

In [None]:
exdf

In [None]:
worst = data['fare_z'].argmax()
{k:v[worst] for k, v in data.items()}

## Disconnect from or Shut Down the Server

In [None]:
# Uncomment one: disconnect this client, or shut down the server entirely
#ak.disconnect()
#ak.shutdown()