# Create descriptive statistics for the NYC Yellow Cab dataset

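In this notebook, we use Dask on a Coiled cluster to compute descriptive statistics (means, a standard deviation, and row counts) for the 2019 NYC Yellow Cab trip records, first for all trips and then for trips with a tip.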

## Launch a cluster

The first step is to spin up a Dask cluster. In Coiled, you do this by creating a `coiled.Cluster` instance, and [several keyword arguments](https://docs.coiled.io/user_guide/api.html#coiled.Cluster) let you specify the details of your cluster further. See the [cluster creation documentation](https://docs.coiled.io/user_guide/cluster_creation.html) to learn more.

Note that we give this cluster a name. If you don't specify the `name` keyword argument, the cluster is given a unique, randomly generated name.

In [1]:
import coiled

# Create a named cluster with 10 workers
cluster = coiled.Cluster(name="taxi-analysis", n_workers=10)

Found software environment build

Once a cluster has been created (you can see the status on your [Coiled dashboard](https://cloud.coiled.io/)), you can connect Dask to the cluster by creating a `distributed.Client` instance.

In [2]:
from dask.distributed import Client

client = Client(cluster)
client


+---------+---------------+---------------+---------------+
| Package | client        | scheduler     | workers       |
+---------+---------------+---------------+---------------+
| blosc   | None          | 1.10.2        | 1.10.2        |
| lz4     | None          | 3.1.3         | 3.1.3         |
| python  | 3.9.0.final.0 | 3.9.4.final.0 | 3.9.4.final.0 |
+---------+---------------+---------------+---------------+


Client
Scheduler: tls://ec2-3-237-186-89.compute-1.amazonaws.com:8786
Dashboard: http://ec2-3-237-186-89.compute-1.amazonaws.com:8787

Cluster
Workers: 9
Cores: 18
Memory: 72.00 GiB


## Analyze data in the cloud

Now that our cluster is running and Dask is connected to it, let's run a computation. This example reads the 2019 Yellow Cab trip records (about 84 million rows) from a public S3 bucket and computes summary statistics on them.

In [7]:
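import dask.dataframe as dd

# Read all 2019 monthly files; anonymous access works because the bucket is public
taxi_full = dd.read_csv(
    "s3://nyc-tlc/trip data/yellow_tripdata_2019-*.csv",
    # Nullable "UInt8" dtypes keep these small integer columns compact
    # while still allowing missing values
    dtype={
        "payment_type": "UInt8",
        "VendorID": "UInt8",
        "passenger_count": "UInt8",
        "RatecodeID": "UInt8",
    },
    storage_options={"anon": True},
    blocksize="16 MiB",  # split the files into partitions of about 16 MiB
).persist()  # keep the loaded data in cluster memory for the queries below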

In [13]:
print(taxi_full.tip_amount.mean().compute())
print(taxi_full.trip_distance.mean().compute())
print(taxi_full.fare_amount.mean().compute())
print(len(taxi_full))

2.195064126989438
3.0009277213281353
13.3439912007745
84399019
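
Each of these reductions runs over the many partitions created by `blocksize="16 MiB"`. Conceptually, every partition is reduced to a small summary (row count, sum, sum of squares) and the summaries are then merged; the same idea covers `.std()`, which is used further below. The pure-Python sketch below illustrates that pattern on made-up partitions. It is a simplified model of a partition-wise reduction, not Dask's actual implementation.

```python
import math
import statistics

# Hypothetical partitions of one numeric column; the values are made up.
partitions = [
    [12.5, 7.0, 22.0],
    [9.5, 15.0],
    [30.0, 6.5, 11.0, 8.0],
]


def summarize(values):
    """Reduce one partition to (row count, sum, sum of squares)."""
    return len(values), sum(values), sum(v * v for v in values)


def combine(summaries):
    """Merge partition summaries into the global count, mean and sample std."""
    n = sum(s[0] for s in summaries)
    total = sum(s[1] for s in summaries)
    total_sq = sum(s[2] for s in summaries)
    mean = total / n
    # Sample variance (ddof=1), the default for pandas and Dask .std()
    var = (total_sq - n * mean * mean) / (n - 1)
    return n, mean, math.sqrt(var)


n, mean, std = combine([summarize(p) for p in partitions])

# Cross-check against computing directly on all rows at once
flat = [v for p in partitions for v in p]
print(n, mean, round(std, 6))
print(statistics.mean(flat), round(statistics.stdev(flat), 6))
```

Merging (count, sum, sum of squares) keeps each partition's summary tiny, but the sum-of-squares formula can lose precision when the mean is large relative to the spread; a production implementation would typically use a more numerically stable update.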


In [14]:
# Keep only trips with a positive tip
taxi_full_wtips = taxi_full[taxi_full["tip_amount"] > 0]

In [15]:
# Share of trips with a positive tip
len(taxi_full_wtips) / len(taxi_full)

0.6899768941627153
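
The ratio above is the fraction of trips with a positive tip. The same number is the mean of a 0/1 indicator for `tip_amount > 0`, so in Dask it could presumably also be computed in a single pass as `(taxi_full.tip_amount > 0).mean().compute()` (an untested alternative, not part of the original notebook). The sketch below uses hypothetical tip values to show that the two formulations agree.

```python
# Hypothetical tip amounts for eight trips (not taken from the dataset)
tips = [0.0, 2.5, 1.0, 0.0, 3.75, 0.0, 5.0, 1.2]

# As in the notebook: filter to tipped trips, then compare row counts
tipped = [t for t in tips if t > 0]
share_by_filter = len(tipped) / len(tips)

# Alternative: average a 0/1 indicator for "tip > 0" (True counts as 1)
share_by_indicator = sum(t > 0 for t in tips) / len(tips)

print(share_by_filter, share_by_indicator)  # 0.625 0.625
```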

In [17]:
print(taxi_full_wtips.tip_amount.mean().compute(), taxi_full_wtips.tip_amount.std().compute())
print(taxi_full_wtips.trip_distance.mean().compute())
print(taxi_full_wtips.fare_amount.mean().compute())
print(len(taxi_full_wtips))

3.1814971796327165 18.76555968483491
2.9988582169197033
13.294814358769841
58233373
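
The figures above are consistent with each other. Writing `p` for the share of tipped trips, the overall mean tip splits as `mean_all = p * mean_tipped + (1 - p) * mean_rest`, where `mean_rest` is the mean tip over trips with `tip_amount <= 0`. The check below plugs in the values printed by the notebook. It assumes `tip_amount` has no missing values, so that `len()` and `.mean()` count the same rows.

```python
# Values printed by the notebook cells above
n_all = 84_399_019
n_tipped = 58_233_373
mean_tip_all = 2.195064126989438
mean_tip_tipped = 3.1814971796327165

# Share of trips with tip_amount > 0 (same as the ratio computed earlier)
p = n_tipped / n_all

# Law of total expectation:
#   mean_all = p * mean_tipped + (1 - p) * mean_rest
# Solve for mean_rest, the mean tip over trips with tip_amount <= 0.
# Assumes tip_amount has no missing values.
mean_tip_rest = (mean_tip_all - p * mean_tip_tipped) / (1 - p)

print(round(p, 4))                    # 0.69
print(round(p * mean_tip_tipped, 4))  # 2.1952
print(round(mean_tip_rest, 4))        # -0.0003
```

The implied mean over the remaining trips is essentially zero. Under the no-missing-values assumption, its slightly negative value would point to a few rows with negative tips; the notebook does not inspect those rows.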

In [None]:
# Close the client before tearing down the cluster it is connected to
client.close()
coiled.delete_cluster(name="taxi-analysis")