## Overview

In this tutorial, we will look at how to launch and connect to an Arkouda server and what that means. We will then explore some of the Arkouda API by analyzing the NYC green taxi cab trip data.

1. Launching and connecting to an Arkouda server
2. Read Parquet taxi cab data
3. Data exploration
    - descriptive statistics
    - histogram
    - logical indexing
4. Create a lookup table and DataFrame
    - GroupBy
    - Broadcast
5. Working with NumPy/Pandas

## Download data
- Green taxi cab trip data (Parquet): https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2024-01.parquet
- Taxi zone lookup table (CSV): https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv

## Installing Arkouda

Installation instructions can be found at:

Docker images are available at: https://github.com/Bears-R-Us/arkouda-contrib/tree/main/arkouda-docker

## Connect to the Arkouda server

When using Arkouda, a Python client connects to an Arkouda server running in a separate process. The Arkouda server is a parallel, distributed Chapel application that stores arrays, executes commands, and communicates human-readable responses back to the client.

To connect to an Arkouda server:
1. Launch an Arkouda server: `./arkouda_server -nl <number-of-locales>`
2. Import Arkouda in your Python client: `import arkouda as ak`
3. Connect to the server, passing its hostname and port number: `ak.connect(<hostname>, <port>)`
    - This server can be running on your local machine, a supercomputer, the cloud, or anywhere! 

In [None]:
import arkouda as ak

# Use the hostname and port of your running Arkouda server
ak.connect('localhost', 5555)

In [None]:
import pandas as pd
import numpy as np
import math
import matplotlib.pyplot as plt
import gc

## Read Parquet Taxi Cab Data

Arkouda currently supports three staple file formats:
- Parquet: data science columnar file format
- HDF5: HPC file format designed for large data sets
- CSV: Standard file format for small data sets

The NYC Taxi cab data is given in Parquet format, a file format readable by Arkouda.

When possible, use Parquet or HDF5 for I/O in Arkouda, since they perform much better than CSV.

When writing files in Arkouda, one file per locale is written to disk. For example, if you are writing a large dataset that is distributed across a 16-locale server, 16 files will be written, 1 per locale, each named with the pattern `<filename-prefix>_LOCALE####`.
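As a rough illustration of the naming pattern, the helper below lists the file names one might expect when writing with a given prefix on an N-locale server. The function `expected_locale_files` is hypothetical (not part of Arkouda), and the four-digit zero-padded locale index is an assumption read off the `LOCALE####` pattern; any file extension the writer may add is not modeled.

```python
def expected_locale_files(prefix: str, num_locales: int) -> list:
    """Hypothetical helper: one file name per locale, following the
    <filename-prefix>_LOCALE#### pattern (assumes 4-digit zero padding)."""
    if num_locales < 1:
        raise ValueError("a server needs at least one locale")
    return [f"{prefix}_LOCALE{i:04d}" for i in range(num_locales)]

files = expected_locale_files("taxi", 16)
print(len(files))   # 16
print(files[0])     # taxi_LOCALE0000
print(files[-1])    # taxi_LOCALE0015
```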

There are currently efforts to add support for additional file formats, such as Zarr and NetCDF, as well as efforts to improve the performance of existing formats.

In [None]:
columns = ['VendorID',
           'lpep_pickup_datetime',
           'lpep_dropoff_datetime',
           'fare_amount',
           'PULocationID',
           'DOLocationID']
data = ak.read('./green_tripdata_2024-01.parquet', columns)

In [None]:
data

In [None]:
data = ak.DataFrame(data)
data

In [None]:
rows = data['VendorID'].size
numbytes = sum(v.size*v.itemsize for v in data.values())

In [None]:
print(f'{rows:,} rows\n{numbytes:,} bytes')

## Data Exploration

Arrays in Arkouda are called `pdarray`s. When you create a `pdarray`, your Python client stores only a reference to data that actually lives in the parallel, distributed object store of the Arkouda server. Operations on a `pdarray` are sent to the Arkouda server, executed there in parallel, and their results are returned in a human-readable form, so the full data never has to be stored in the memory of the machine running the Python client.
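To make the reference model concrete, here is a toy, single-process sketch in plain Python. `ToyServer` and `ToyArray` are hypothetical classes, not Arkouda's API: the "server" owns the values, while the client-side object keeps only a name and forwards operations as commands. The real system sends such commands over the network to a parallel Chapel process; this sketch only mirrors the division of responsibilities.

```python
class ToyServer:
    """Stand-in for the server's object store: it holds the actual data."""
    def __init__(self):
        self._store = {}
        self._next_id = 0

    def create(self, values):
        name = f"id_{self._next_id}"
        self._next_id += 1
        self._store[name] = list(values)
        return name

    def execute(self, op, name):
        data = self._store[name]
        if op == "sum":
            return sum(data)
        if op == "max":
            return max(data)
        raise ValueError(f"unsupported op: {op}")


class ToyArray:
    """Stand-in for a client-side pdarray: it holds only a name and a server handle."""
    def __init__(self, server, values):
        self.server = server
        self.name = server.create(values)  # data is shipped to the server once

    def sum(self):
        return self.server.execute("sum", self.name)

    def max(self):
        return self.server.execute("max", self.name)


server = ToyServer()
fares = ToyArray(server, [12.5, 7.0, 30.25])
print(fares.name)   # id_0
print(fares.sum())  # 49.75
```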

Arkouda's interface is based on the NumPy and Pandas APIs, so data scientists who are used to traditional single-locale tools do not face a steep learning curve.

Today, Arkouda supports a subset of the NumPy/Pandas operations as well as some specific functions that come in handy when working with distributed data.

Some key supported features:
- `groupby`
- `argsort`
- `DataFrame`
- `Random`
- set operations
- ...

More information on the Arkouda API can be found at: https://bears-r-us.github.io/arkouda/

### Descriptive statistics

Arkouda supports standard NumPy-style descriptive statistics such as `mean`, `std`, `min`, and `max`.

These statistics functions are methods on the Arkouda `pdarray` type, so each call sends a command to the Arkouda server, which performs the operation in a parallel, distributed fashion.
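For intuition about what the four statistics compute, the sketch below reproduces them locally with Python's `statistics` module. It uses the population standard deviation (`pstdev`), which matches NumPy's default `ddof=0`; whether your Arkouda version uses the same default is an assumption worth checking in its documentation. The function name `describe_local` and the input values are illustrative only.

```python
import statistics

def describe_local(values):
    """Local (single-process) analogue of the describe() helper in this notebook."""
    return {
        "mean": statistics.fmean(values),
        "std": statistics.pstdev(values),  # population std, like NumPy's ddof=0
        "min": min(values),
        "max": max(values),
    }

stats = describe_local([2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0])
for key, value in stats.items():
    print(f"{key}: {value:.2f}")
```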

In [None]:
def describe(x):
    fmt = 'mean: {}\nstd: {}\nmin: {}\nmax: {}'
    if x.dtype == ak.float64:
        fmt = fmt.format(*['{:.2f}' for _ in range(4)])
    print(fmt.format(x.mean(), x.std(), x.min(), x.max()))

In [None]:
describe(data['fare_amount'])

#### Histogram

Arkouda can transfer distributed arrays back to the client over ZMQ (a messaging layer, similar to TCP sockets) and convert them to NumPy ndarrays.

This is useful for pulling a small portion of the data to the client and interoperating with existing Python tools. In this example, `ak.histogram` computes the bin counts on the server, and only the counts and bin edges are converted to NumPy arrays for plotting with Matplotlib.
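Conceptually, an equal-width histogram splits the range [min, max] into `bins` intervals and counts the values in each, so only `bins` counts and `bins + 1` edges need to reach the client. Below is a plain-Python sketch of that computation, assuming the NumPy convention that the last bin includes its right edge; the function `equal_width_histogram` is hypothetical and is not Arkouda's implementation.

```python
def equal_width_histogram(values, bins):
    """Count values into `bins` equal-width bins spanning [min, max].

    Returns (counts, edges) with len(counts) == bins and len(edges) == bins + 1.
    """
    if bins <= 0:
        raise ValueError("bins must be positive")
    lo, hi = min(values), max(values)
    width = (hi - lo) / bins if hi > lo else 1.0
    edges = [lo + i * width for i in range(bins + 1)]
    counts = [0] * bins
    for x in values:
        # The maximum value would land at index `bins`; clamp it into the last bin
        idx = min(int((x - lo) / width), bins - 1)
        counts[idx] += 1
    return counts, edges

counts, edges = equal_width_histogram([0.0, 1.0, 1.5, 2.0, 9.0, 10.0], 5)
print(counts)  # [3, 1, 0, 0, 2]
print(edges)   # [0.0, 2.0, 4.0, 6.0, 8.0, 10.0]
```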

In [None]:
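import numpy as np
from matplotlib import pyplot as plt

def hist(x, bins, log=True):
    """Plot a histogram of pdarray `x` using `bins` equal-width bins."""
    assert bins > 0
    # Counts and edges are computed server-side; only these small arrays are transferred
    counts, edges = ak.histogram(x, bins)
    edges = edges.to_ndarray()
    # align='edge' places each bar's left side on its left bin edge
    plt.bar(edges[:-1], counts.to_ndarray(), width=edges[1] - edges[0], align='edge')
    if log:
        plt.yscale('log')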

In [None]:
hist(data['fare_amount'], 100)

#### Logical Indexing (Filters)

The filter variable `non_neg` is itself a boolean `pdarray`, so its data stays in the distributed object store of the Arkouda server rather than in the Python client. Remember, the Python client only holds references to `pdarray`s, not the actual data.

With the Arkouda paradigm, code using pdarrays looks like any other Python code but leverages the parallel, distributed server. This lets scientists who are unfamiliar with parallel and distributed concepts write code that scales and performs well.
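The filtering pattern used in the cells that follow (build a boolean mask, then use it to select elements) can be sketched in plain Python with lists standing in for pdarrays. The helper `apply_mask` is hypothetical; with pdarrays, both the comparison and the selection run on the server and produce new server-side arrays. The fare values are illustrative.

```python
def apply_mask(values, mask):
    """Keep values[i] wherever mask[i] is True (like pda[mask])."""
    if len(values) != len(mask):
        raise ValueError("mask must have the same length as values")
    return [v for v, keep in zip(values, mask) if keep]

fares = [12.0, -3.5, 0.0, 45.0, -1.0]
non_neg = [f >= 0 for f in fares]          # like data['fare_amount'] >= 0
fraction = sum(non_neg) / len(non_neg)     # True counts as 1
print(f'{fraction:.1%} of fares are non-negative')  # 60.0% of fares are non-negative
print(apply_mask(fares, non_neg))                    # [12.0, 0.0, 45.0]
```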

In [None]:
data[data['fare_amount'] > 1000]

In [None]:
non_neg = data['fare_amount'] >= 0
print(f'{non_neg.sum() / non_neg.size:.1%} of fares are non-negative')

In [None]:
describe(data['fare_amount'][non_neg])

In [None]:
data = ak.DataFrame({k:v[non_neg] for k, v in data.items()})

In [None]:
data

In [None]:
data['lpep_dropoff_datetime'] - data['lpep_pickup_datetime']

In [None]:
describe(data['lpep_dropoff_datetime'] - data['lpep_pickup_datetime'])

## Taxi Zone Lookup Table

### CSV -> Pandas -> Arkouda
Arkouda interoperates with Pandas DataFrames: small Pandas tables can be used to augment large Arkouda pdarrays or DataFrames, or converted to Arkouda pdarrays for better performance.

In [None]:
import pandas as pd

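def cvt_to_string(v):
    # Replace empty or unconvertible cells with a placeholder string
    try:
        return 'N/A' if v == '' else str(v)
    except Exception:
        return 'N/A'

# Converter keys must match the CSV header names exactly
cvt = {'Borough': cvt_to_string, 'Zone': cvt_to_string, 'service_zone': cvt_to_string}
tzlut = pd.read_csv('./taxi_zone_lookup.csv', converters=cvt)

# LocationIDs start at 1; prepend a placeholder row with LocationID 0 so that
# each row's position equals its LocationID (zero-up indexing)
top_row = pd.DataFrame({'LocationID': [0], 'Borough': ['N/A'], 'Zone': ['N/A'], 'service_zone': ['N/A']})
tzlut = pd.concat([top_row, tzlut]).reset_index(drop=True)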

In [None]:
tzlut

#### Convert Pandas DF to Arkouda DF

In [None]:
def ak_create_from_df(df):
    akdict = {}
    for cname in df.keys():
        if df[cname].dtype.name == 'object':
            akdict[cname] = ak.from_series(df[cname],dtype=str)
        else:
            akdict[cname] = ak.from_series(df[cname])
    return ak.DataFrame(akdict)

In [None]:
aktzlut = ak_create_from_df(tzlut)

In [None]:
aktzlut

#### Apply Lookup Table

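After confirming that the lookup table is zero-up indexed (each row's position equals its `LocationID`), we can use the pickup and dropoff location IDs as indices into the lookup columns to add borough and zone names to every ride.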
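Using an array of IDs as indices into a lookup column is a gather: for each ride, fetch the entry whose position equals its location ID. Here is a plain-Python sketch, assuming (as the `LocationID` check in this section verifies for the real table) that position `i` of the lookup column holds the entry for `LocationID == i`. The `gather` helper and the borough names are hypothetical stand-ins for `aktzlut['Borough']`.

```python
def gather(lookup, indices):
    """Return [lookup[i] for i in indices], like lookup_pdarray[index_pdarray]."""
    return [lookup[i] for i in indices]

# Hypothetical zero-up lookup column: position 0 is the 'N/A' placeholder row
borough = ['N/A', 'Borough A', 'Borough B', 'Borough C']
pu_location_ids = [2, 2, 1, 3, 0]

pu_borough = gather(borough, pu_location_ids)
print(pu_borough)  # ['Borough B', 'Borough B', 'Borough A', 'Borough C', 'N/A']

# The zero-up property, checked on a hypothetical LocationID column
location_id = [0, 1, 2, 3]
assert location_id == list(range(len(location_id)))
```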

In [None]:
(aktzlut['LocationID'] == ak.arange(aktzlut['LocationID'].size)).all()

In [None]:
data['PUBorough'] = aktzlut['Borough'][data['PULocationID']]
data['DOBorough'] = aktzlut['Borough'][data['DOLocationID']]

In [None]:
data['PUZone'] = aktzlut['Zone'][data['PULocationID']]
data['DOZone'] = aktzlut['Zone'][data['DOLocationID']]

In [None]:
data

In [None]:
data[data['fare_amount'] > 1000]

#### GroupBy: Construct a Graph
Define graph from PULocationID -> DOLocationID

The bread and butter of what makes Arkouda valuable is its sort (and hence GroupBy). If large sorts are a bottleneck, Arkouda is likely a great option!

The GroupBy functionality in Arkouda has spawned another project from NJIT, Arachne, which can be thought of as the "NetworkX of supercomputers". Arachne provides interactive, large-scale graph algorithms through the Python frontend provided by Arkouda.
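Since GroupBy builds on sorting, here is a plain-Python sketch of the idea: sort the (pickup, dropoff) key pairs so equal keys become adjacent, then scan for the positions where the key changes to get each group's unique key and size. The function `group_sizes` is hypothetical, and Arkouda's distributed implementation differs in detail; the output has the same shape as the `(u, v), w` edge list in this section.

```python
def group_sizes(pickups, dropoffs):
    """Return (unique_keys, counts) for (pickup, dropoff) pairs via sort-then-scan."""
    keys = sorted(zip(pickups, dropoffs))   # equal keys become adjacent
    unique_keys, counts = [], []
    for key in keys:
        if unique_keys and unique_keys[-1] == key:
            counts[-1] += 1                 # same group as the previous key
        else:
            unique_keys.append(key)         # segment boundary: a new group starts
            counts.append(1)
    return unique_keys, counts

pu = [7, 3, 7, 3, 7]
do = [1, 5, 1, 2, 1]
edges, weights = group_sizes(pu, do)
print(edges)    # [(3, 2), (3, 5), (7, 1)]
print(weights)  # [1, 1, 3]
```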

In [None]:
byloc = ak.GroupBy([data['PULocationID'], data['DOLocationID']])

In [None]:
byloc.unique_keys

The edge weight is the number of rides in each group (pickup location to dropoff location).

In [None]:
(u, v), w = byloc.size()

In [None]:
u, v, w

#### Broadcast: Find Rides with Anomalous Fares
Compute mean and stddev of fare by (pickup, dropoff)
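The per-group statistics follow from two grouped sums. For a group $g$ with $n_g$ rides and fares $x_i$, the mean, the (population) standard deviation, and the z-score used later in this section are:

$$
\mu_g = \frac{1}{n_g}\sum_{i \in g} x_i, \qquad
\sigma_g = \sqrt{\frac{1}{n_g}\sum_{i \in g} x_i^2 - \mu_g^2}, \qquad
z_i = \frac{x_i - \mu_{g(i)}}{\sigma_{g(i)} + 1}
$$

The $+1$ in the denominator keeps $z_i$ finite when a group has a single ride ($\sigma_g = 0$).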

In [None]:
_, mf = byloc.mean(data['fare_amount'])

In [None]:
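# Variance per group = E[x^2] - E[x]^2; clamp round-off negatives to 0, then take the square root
var = (byloc.sum(data['fare_amount']**2)[1] / w) - mf**2
sf = ak.sqrt(ak.where(var > 0, var, 0.0))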

Broadcast the per-group values back to the ride DataFrame to compute each ride's z-score.
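Broadcasting maps one value per group back onto every row of that group. The plain-Python sketch below uses hypothetical names: `group_of_row[i]` is the index of row `i`'s group, standing in for the bookkeeping a GroupBy keeps so that `broadcast(..., permute=True)` returns values in the original row order. It also computes z-scores from grouped sums with the same `+1` denominator guard as the notebook, assuming every group has at least one row (true for groups produced by a GroupBy).

```python
def broadcast(group_values, group_of_row):
    """Give each row the value computed for its group."""
    return [group_values[g] for g in group_of_row]

def z_scores(values, group_of_row, num_groups):
    """Per-group z-scores from sums and sums of squares (population std, +1 guard)."""
    n = [0] * num_groups
    s1 = [0.0] * num_groups
    s2 = [0.0] * num_groups
    for x, g in zip(values, group_of_row):
        n[g] += 1
        s1[g] += x
        s2[g] += x * x
    mean = [s1[g] / n[g] for g in range(num_groups)]
    # Clamp tiny negative variances caused by floating-point round-off
    std = [max(s2[g] / n[g] - mean[g] ** 2, 0.0) ** 0.5 for g in range(num_groups)]
    row_mean = broadcast(mean, group_of_row)
    row_std = broadcast(std, group_of_row)
    return [(x - m) / (s + 1) for x, m, s in zip(values, row_mean, row_std)]

fares = [10.0, 14.0, 50.0]
groups = [0, 0, 1]  # rows 0 and 1 share a (pickup, dropoff) pair
print(broadcast([12.0, 50.0], groups))  # [12.0, 12.0, 50.0]
print(z_scores(fares, groups, 2))
```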

In [None]:
data['fare_mean'] = byloc.broadcast(mf, permute=True)
data['fare_std'] = byloc.broadcast(sf, permute=True)

In [None]:
data

In [None]:
# The +1 in the denominator keeps z finite for single-ride groups (std of 0)
data['fare_z'] = (data['fare_amount'] - data['fare_mean']) / (data['fare_std'] + 1)

In [None]:
hist(data['fare_z'], 100)

#### Bring Small Result Set Back to Pandas
Now that we've explored the data, we know what to look at: rides that are extraordinarily expensive!

To work with those rides as you would in any other Python code, transfer the filtered pdarrays from the server to the client as NumPy ndarrays and assemble them into a Pandas DataFrame.

In [None]:
exorbitant = (data['fare_z'] > 2)
exdf = pd.DataFrame({k: v[exorbitant].to_ndarray() for k, v in data.items()})

In [None]:
exdf.head(5)

In [None]:
worst = data['fare_z'].argmax()
{k:v[worst] for k, v in data.items()}

### Disconnect From or Shut Down the Server

In [None]:
# Disconnect this client but leave the server running
# ak.disconnect()

# Stop the Arkouda server entirely
# ak.shutdown()