# Analyzing a large dataset on the IP2I SLURM Farm

## The New York City taxi dataset

In [104]:
import pprint
import pathlib

taxis = pathlib.Path("/gridgroup/cms/cbernet/test_data/nyc_taxis")

t2011 = sorted(list((taxis / "2011").glob("*.csv")))
pprint.pprint(t2011)

[PosixPath('/gridgroup/cms/cbernet/test_data/nyc_taxis/2011/yellow_tripdata_2011-01.csv'),
 PosixPath('/gridgroup/cms/cbernet/test_data/nyc_taxis/2011/yellow_tripdata_2011-02.csv'),
 PosixPath('/gridgroup/cms/cbernet/test_data/nyc_taxis/2011/yellow_tripdata_2011-03.csv'),
 PosixPath('/gridgroup/cms/cbernet/test_data/nyc_taxis/2011/yellow_tripdata_2011-04.csv'),
 PosixPath('/gridgroup/cms/cbernet/test_data/nyc_taxis/2011/yellow_tripdata_2011-05.csv'),
 PosixPath('/gridgroup/cms/cbernet/test_data/nyc_taxis/2011/yellow_tripdata_2011-06.csv'),
 PosixPath('/gridgroup/cms/cbernet/test_data/nyc_taxis/2011/yellow_tripdata_2011-07.csv'),
 PosixPath('/gridgroup/cms/cbernet/test_data/nyc_taxis/2011/yellow_tripdata_2011-08.csv'),
 PosixPath('/gridgroup/cms/cbernet/test_data/nyc_taxis/2011/yellow_tripdata_2011-09.csv'),
 PosixPath('/gridgroup/cms/cbernet/test_data/nyc_taxis/2011/yellow_tripdata_2011-10.csv'),
 PosixPath('/gridgroup/cms/cbernet/test_data/nyc_taxis/2011/yellow_tripdata_2011-11.csv'),

In [99]:
[f.stat().st_size / 1024**3 for f in t2011]

[2.3046080265194178,
 2.4255120931193233,
 2.746128797531128,
 2.5176544673740864,
 2.6636097356677055,
 2.5855863811448216,
 2.522386613301933,
 2.2658982882276177,
 2.497904699295759,
 2.6866614799946547,
 2.4851463064551353,
 2.5523328203707933]
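To get a feeling for the total volume, we can add up the file sizes. The snippet below is a minimal sketch: `total_size_gib` is a hypothetical helper (not part of the original notebook), and `sizes_2011` simply repeats the sizes printed above, rounded to two decimals. The total comes out at roughly 30 GiB of CSV for 2011.

```python
import pathlib


def total_size_gib(files):
    """Return the combined size of `files` in GiB (1 GiB = 1024**3 bytes)."""
    return sum(pathlib.Path(f).stat().st_size for f in files) / 1024**3


# Sizes printed above, in GiB, rounded to two decimals
sizes_2011 = [2.30, 2.43, 2.75, 2.52, 2.66, 2.59,
              2.52, 2.27, 2.50, 2.69, 2.49, 2.55]
print(f"{sum(sizes_2011):.1f} GiB")
```

On the farm, `total_size_gib(t2011)` would give the same number directly from the files.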

## Set up the dask cluster

In [9]:
from dask_jobqueue import SLURMCluster

cluster = SLURMCluster(
    # queue='normal',
    project="cms",
    cores=8,
    memory="64 GB", 
    walltime="02:00:00",
)


In [10]:
cluster.scale(jobs=8) 

In [11]:
from dask.distributed import Client
client = Client(cluster)

In [21]:
cluster.dashboard_link

'http://134.158.83.177:8787/status'

We now have a cluster made of 8 SLURM jobs with 8 cores and 64 GB of RAM each, i.e. 8x8=64 cores and 512 GB of RAM in total. This is enough to hold the entire taxi dataset in the cluster memory.
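The totals quoted above follow directly from the `SLURMCluster` settings and the `scale` call. As a quick sanity check, here is the arithmetic written out. The CSV size of about 32.5 GB is an assumption derived from the file sizes listed earlier, and the in-memory footprint of a dataframe can differ from the size on disk.

```python
# Settings taken from the SLURMCluster(...) and cluster.scale(...) calls above
n_jobs = 8
cores_per_job = 8
memory_per_job_gb = 64

total_cores = n_jobs * cores_per_job          # cores available to the workers
total_memory_gb = n_jobs * memory_per_job_gb  # RAM available to the workers

# Assumption: about 32.5 GB of CSV on disk (sum of the 12 file sizes)
csv_size_gb = 32.5
headroom = total_memory_gb / csv_size_gb      # cluster RAM / dataset size on disk

print(total_cores, total_memory_gb, round(headroom, 1))
```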

## Dask dataframe: a first look at the data

In [16]:
import dask
import dask.dataframe as dd

In [100]:
df = dd.read_csv(
    taxis / "2011/*.csv", 
    dtype={"tip_amount": "float64", "tolls_amount": "float64"}   # data not clean, need to cast type
)

Let's have a look at the first rows of the dataframe. This is a fast operation: by default, `head` only reads the first partition, so there is no need to process the full dataframe to get these rows:

In [101]:
df.head()

| | vendor_id | pickup_datetime | dropoff_datetime | passenger_count | trip_distance | pickup_longitude | pickup_latitude | rate_code | store_and_fwd_flag | dropoff_longitude | dropoff_latitude | payment_type | fare_amount | surcharge | mta_tax | tip_amount | tolls_amount | total_amount |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | CMT | 2011-01-29 02:38:35 | 2011-01-29 02:47:07 | 1 | 1.2 | -74.005254 | 40.729084 | 1 | N | -73.988697 | 40.727127 | CSH | 6.1 | 0.5 | 0.5 | 0.0 | 0.0 | 7.1 |
| 1 | CMT | 2011-01-28 10:38:19 | 2011-01-28 10:42:18 | 1 | 0.4 | -73.968585 | 40.759171 | 1 | N | -73.964336 | 40.764665 | CSH | 4.1 | 0.0 | 0.5 | 0.0 | 0.0 | 4.6 |
| 2 | CMT | 2011-01-28 23:49:58 | 2011-01-28 23:57:44 | 3 | 1.2 | -73.98071 | 40.74239 | 1 | N | -73.987028 | 40.729532 | CSH | 6.1 | 0.5 | 0.5 | 0.0 | 0.0 | 7.1 |
| 3 | CMT | 2011-01-28 23:52:09 | 2011-01-28 23:59:21 | 3 | 0.8 | -73.993773 | 40.747329 | 1 | N | -73.991378 | 40.75005 | CSH | 5.3 | 0.5 | 0.5 | 0.0 | 0.0 | 6.3 |
| 4 | CMT | 2011-01-28 10:34:39 | 2011-01-28 11:25:50 | 1 | 5.3 | -73.991475 | 40.749936 | 1 | N | -73.950237 | 40.775626 | CSH | 25.3 | 0.0 | 0.5 | 0.0 | 0.0 | 25.8 |
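Why is reading the first rows cheap? The idea can be illustrated with the standard library alone. The sketch below is an analogy, not the way dask implements `head`: the hypothetical `head` function pulls only the header and the first `n` records from a stream of CSV lines and never touches the rest.

```python
import csv
import io
import itertools


def head(lines, n=5):
    """Return the header and the first n data rows, reading only n + 1 records."""
    reader = csv.reader(lines)
    header = next(reader)
    rows = list(itertools.islice(reader, n))  # stops after n records
    return header, rows


# Toy data standing in for a very large CSV file
text = "vendor_id,tip_amount\nCMT,0.0\nCMT,1.5\nVTS,2.0\n"
header, rows = head(io.StringIO(text), n=2)
print(header)
print(rows)
```

Because `lines` is consumed lazily, the cost depends on `n` and not on the length of the file.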


Now, we check the dataframe shape:

In [102]:
df.shape

(Delayed('int-ea9ff329-eb17-4ddd-bd53-205b61e5ffe6'), 18)

The last element is the number of columns.

The first element is the number of rows. It is still a `Delayed` object, because we need to scan the entire dataframe to get it. So let's compute this number on the cluster (execute the following cell and go check your dashboard :)

In [105]:
n_trips = df.shape[0].compute()
n_trips 

176897199

There are about 177 million taxi trips in our dataset!

This is a large text dataset that cannot easily be processed on a single machine. That's why we use a cluster.
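Counting rows is a good example of a computation that splits naturally across workers: each worker counts the rows of its own partitions, and the partial counts are added at the end. The sketch below mimics this with a thread pool and in-memory lists. The names `count_rows` and `total_rows` are hypothetical, and this is only an illustration of the pattern, not dask's actual scheduling.

```python
from concurrent.futures import ThreadPoolExecutor


def count_rows(partition):
    """Count the rows of one partition (here, simply a list)."""
    return len(partition)


def total_rows(partitions, max_workers=4):
    """Count each partition in parallel, then add up the partial counts."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return sum(pool.map(count_rows, partitions))


# Three toy partitions of different sizes
partitions = [list(range(1000)), list(range(250)), list(range(4))]
print(total_rows(partitions))
```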

For first tests, a good practice is to select a sample of this dataset that fits in the memory of the local machine. Here, we sample a fraction `1e-5` of the rows, which should give about 1800 taxi trips (176,897,199 × 1e-5 ≈ 1769; check your dashboard :)

In [106]:
sample = df.sample(frac=1e-5)
type(sample)

dask.dataframe.core.DataFrame

Before `compute`, `sample` is still a dask dataframe, even if it is a small one. We compute it to turn it into a pandas dataframe:

In [107]:
sample = sample.compute()
print(type(sample))
print(sample.shape)

<class 'pandas.core.frame.DataFrame'>
(1553, 18)
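The sample has 1553 rows, somewhat fewer than the 176,897,199 × 1e-5 ≈ 1769 one might expect. One possible explanation, stated here as an assumption about the internals and not something this notebook verifies, is that each partition is sampled separately and contributes a whole number of rows, close to `frac` times its size. With only a few rows expected per partition, the rounding adds up. The partition sizes below are hypothetical.

```python
def sampled_rows(partition_sizes, frac):
    """Rows kept if each partition contributes round(frac * size) rows."""
    return sum(round(frac * size) for size in partition_sizes)


# Hypothetical partitioning (NOT taken from the notebook):
# 500 partitions of 340,000 rows each
partition_sizes = [340_000] * 500
frac = 1e-5

expected = frac * sum(partition_sizes)        # naive expectation
actual = sampled_rows(partition_sizes, frac)  # 3.4 rows per partition rounds to 3

print(expected, actual)
```

In this toy setup the naive expectation is about 1700 rows, while the per-partition rounding yields 1500.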


Now, we can use our small sample directly, e.g.: 

In [109]:
sample["tip_amount"].describe()

count    1553.000000
mean        1.053786
std         2.092455
min         0.000000
25%         0.000000
50%         0.000000
75%         1.690000
max        35.000000
Name: tip_amount, dtype: float64

In [110]:
sample.describe()

| | passenger_count | trip_distance | pickup_longitude | pickup_latitude | rate_code | dropoff_longitude | dropoff_latitude | fare_amount | surcharge | mta_tax | tip_amount | tolls_amount | total_amount |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| count | 1553.0 | 1553.0 | 1553.0 | 1553.0 | 1553.0 | 1553.0 | 1553.0 | 1553.0 | 1553.0 | 1553.0 | 1553.0 | 1553.0 | 1553.0 |
| mean | 1.660013 | 2.900006 | -72.069105 | 39.700621 | 1.037991 | -72.164291 | 39.729615 | 10.461687 | 0.313587 | 0.498712 | 1.053786 | 0.210109 | 12.537882 |
| std | 1.269814 | 3.432024 | 11.722058 | 6.457388 | 0.292473 | 11.43285 | 6.360309 | 8.335672 | 0.354266 | 0.025351 | 2.092455 | 1.03726 | 10.206442 |
| min | 0.0 | 0.0 | -74.041432 | -0.017883 | 1.0 | -75.366285 | -0.017875 | 2.5 | 0.0 | 0.0 | 0.0 | 0.0 | 3.0 |
| 25% | 1.0 | 1.06 | -73.992072 | 40.733898 | 1.0 | -73.991528 | 40.733035 | 5.7 | 0.0 | 0.5 | 0.0 | 0.0 | 7.1 |
| 50% | 1.0 | 1.78 | -73.981368 | 40.752512 | 1.0 | -73.9807 | 40.752124 | 8.1 | 0.0 | 0.5 | 0.0 | 0.0 | 9.5 |
| 75% | 2.0 | 3.3 | -73.96611 | 40.76672 | 1.0 | -73.96313 | 40.767195 | 11.7 | 0.5 | 0.5 | 1.69 | 0.0 | 13.6 |
| max | 6.0 | 31.99 | 0.014427 | 40.940988 | 5.0 | 0.014425 | 40.934107 | 83.0 | 1.0 | 0.5 | 35.0 | 12.8 | 116.8 |


## Parallel analysis: probability of getting a tip

Let's compute the probability of getting a tip.

First, we design our analysis on our sample. It's easy:

In [111]:
(sample["tip_amount"]>0).sum() / sample.shape[0]

0.42949130714745654

We can also write a small function to do this: 

In [112]:
def tip_prob(df): 
    n_tips = (df["tip_amount"]>0).sum()
    n_trips = df.shape[0]
    return n_tips/n_trips

In [113]:
tip_prob(sample)

0.42949130714745654

To run the computation on the whole dataset, we just pass the full dask dataframe to the function. The result is a lazy `Delayed` object: nothing is computed until we call `compute`:

In [114]:
result = tip_prob(df)
result

Delayed('_inner-fbea2e584cc52b1243d6c8cbf5cc07ce')
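The same function works on the full dataset because both ingredients, the number of tipped trips and the number of trips, are sums that can be accumulated partition by partition. The sketch below spells out this pattern with plain Python lists. `partial_counts` and `tip_prob_from_partitions` are hypothetical names, and this is an illustration of the idea, not dask's implementation. Note that the partial counts are combined before dividing: averaging per-partition probabilities would give a different (and wrong) answer when partitions have different sizes.

```python
def partial_counts(tips):
    """Return (number of trips with a tip, number of trips) for one partition."""
    return sum(1 for t in tips if t > 0), len(tips)


def tip_prob_from_partitions(partitions):
    """Combine per-partition counts, then divide once at the end."""
    counts = [partial_counts(p) for p in partitions]
    n_tips = sum(c[0] for c in counts)
    n_trips = sum(c[1] for c in counts)
    return n_tips / n_trips


# Toy tip amounts split into three partitions of unequal size
partitions = [[0.0, 1.5, 2.0, 0.0], [3.0], [0.0, 0.0, 0.0]]
print(tip_prob_from_partitions(partitions))
```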

In [116]:
result.compute()

0.41981193269204903
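Is the estimate from the sample (about 0.429) compatible with the result on the full dataset (about 0.420)? A rough check, assuming the 1553 sampled trips behave like independent random draws, is to compare the difference with the binomial standard error sqrt(p(1-p)/n):

```python
import math

p_sample = 0.42949130714745654  # tip probability measured on the sample
n_sample = 1553                 # number of trips in the sample
p_full = 0.41981193269204903    # tip probability on the full dataset

# Binomial standard error of the sample estimate
std_err = math.sqrt(p_sample * (1 - p_sample) / n_sample)

# Difference between sample and full result, in units of the standard error
z = (p_sample - p_full) / std_err

print(f"standard error: {std_err:.4f}, deviation: {z:.2f} sigma")
```

The standard error is about 0.013, so the two values differ by less than one standard error, which is what we expect from a sample of this size.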

Finally, terminate your cluster to release the SLURM jobs:

In [124]:
cluster.close()

tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <zmq.eventloop.ioloop.ZMQIOLoop object at 0x7fda2cdddf70>>, <Task finished name='Task-4711952' coro=<Cluster._sync_cluster_info() done, defined at /home/cms/cbernet/miniconda3/envs/dask/lib/python3.9/site-packages/distributed/deploy/cluster.py:104> exception=OSError('Timed out trying to connect to tcp://134.158.83.177:35087 after 30 s')>)
Traceback (most recent call last):
  File "/home/cms/cbernet/miniconda3/envs/dask/lib/python3.9/site-packages/distributed/comm/tcp.py", line 398, in connect
    stream = await self.client.connect(
  File "/home/cms/cbernet/miniconda3/envs/dask/lib/python3.9/site-packages/tornado/tcpclient.py", line 275, in connect
    af, addr, stream = await connector.start(connect_timeout=timeout)
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/cms

## Conclusion and outlook

At IP2I, we're probably not going to analyze text files very often.

But this can happen. For example, Denis could use Dask to analyze disk space usage on gridgroup or on the storage element. Dask is also well suited to the analysis of JSON data.

And Dask is not limited to text data. For example, dask dataframes can be used to analyze columnar binary data, and dask arrays to analyze images.

A few inspirational links: 

* [Dask-ML](https://ml.dask.org/): Dask-ML provides scalable machine learning in Python using Dask alongside popular machine learning libraries like Scikit-Learn, XGBoost, and others.
* [Dask-Image](https://examples.dask.org/applications/image-processing.html)
* [Xarray](https://docs.xarray.dev/en/stable/gallery.html): wraps Dask Array, offering the same scalability, but with axis labels that add convenience when dealing with complex datasets. A good way to deal with very large images.
* [Datashader](https://datashader.org/): plots datasets that are too large to fit in memory. Can be used in conjunction with Dask.
* [The Dask ecosystem](https://docs.dask.org/en/latest/ecosystem.html): an overview of the projects built on or around Dask.


