# NYC Taxi Dataset - DataFrames Analysis from S3

In this notebook, we will walk through an example of using BanyanDataFrames to
analyze a few GB of NYC taxi trip data stored in Amazon S3. The dataset is taken
from the TLC Trip Record Data published on nyc.gov.

Run the following cell once to instantiate the Julia environment and install
the packages required for this notebook.

In [None]:
# Install packages
using Pkg
Pkg.instantiate()

## Configuring

We will use Banyan to perform data analysis on the NYC Taxi dataset. To run this
example, please ensure that you have set up your Banyan account.

Run the first cell below to import `Banyan` and `BanyanDataFrames`.
To configure your AWS credentials, run the second cell below and provide your
AWS credentials when prompted. Banyan does not save your AWS credentials, but
they are needed so that you can run your computation in your AWS account.
Finally, run the third cell below to set your Banyan credentials and configure
Banyan.

You must pass your User ID and API Key to the `configure` function in order
to authenticate. You can find this information on the Account page of the
Banyan Dashboard. After running this cell, your credentials will be saved
in `$HOME/.banyan/banyanconfig.toml` and will be read from that file in the
future. This means that you only need to run this cell once.
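If you are unsure whether you have already run the configuration cell, one option is to check for the config file mentioned above. The sketch below uses Julia's standard `TOML` library. The helper names `banyan_config_path` and `banyan_config_present` are hypothetical (not part of Banyan), and since this notebook does not document the file's keys, the check only confirms that the file exists and parses as TOML.

```julia
using TOML

# Path where, according to this notebook, `configure` saves your credentials
banyan_config_path() = joinpath(homedir(), ".banyan", "banyanconfig.toml")

# Hypothetical helper: true if the file exists and parses as valid TOML.
# It does not check which keys the file contains.
function banyan_config_present(path::AbstractString = banyan_config_path())
    isfile(path) || return false
    try
        TOML.parsefile(path)
        return true
    catch
        return false
    end
end

if banyan_config_present()
    println("Found $(banyan_config_path()); you can likely skip the configuration cell.")
else
    println("No Banyan config file found; run the configuration cell below.")
end
```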

In [1]:
# Import packages
using Banyan
using BanyanDataFrames
using Statistics

In [None]:
# Run this cell to set your AWS credentials as environment variables for this
# session. When prompted, specify the credentials for the AWS account that you
# connected with Banyan. If you have already configured the AWS CLI with the
# credentials for that account, you can skip this step.

println("Enter AWS_ACCESS_KEY_ID: ")
ENV["AWS_ACCESS_KEY_ID"] = readline()
println("Enter AWS_SECRET_ACCESS_KEY: ")
ENV["AWS_SECRET_ACCESS_KEY"] = readline()
println("Enter AWS_DEFAULT_REGION: ")
ENV["AWS_DEFAULT_REGION"] = readline()

println("AWS is now configured.")
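To confirm that the AWS cell set every variable before moving on, you could run a quick check like the following sketch. The helper `missing_aws_vars` is hypothetical and only inspects environment variables; if you configured the AWS CLI instead, your credentials may live in `~/.aws` files and this check will report them as missing.

```julia
# Variables that the AWS credentials cell sets
const REQUIRED_AWS_VARS = ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY", "AWS_DEFAULT_REGION")

# Hypothetical helper: names of required variables that are unset or blank
missing_aws_vars(env = ENV) =
    [name for name in REQUIRED_AWS_VARS if isempty(strip(get(env, name, "")))]

unset_vars = missing_aws_vars()
if isempty(unset_vars)
    println("All AWS environment variables are set.")
else
    println("Not set: ", join(unset_vars, ", "))
end
```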

In [None]:
# Run this cell to configure Banyan. When prompted, provide your user ID and API
# key. You can find these on the Account page of your Banyan dashboard.
# If you have already configured Banyan, you can skip this step.

print("Please enter your User ID: \n")
user_id = readline()
print("Please enter your API Key: \n")
api_key = readline()

# Configures Banyan client library with your Banyan credentials
configure(user_id=user_id, api_key=api_key)
print("Banyan is now configured.")

## Creating a cluster

For this example, you will need a Banyan cluster. You can either use an existing
cluster or create a new one. Run the following code block and, when prompted, enter
either the name of an existing cluster or the name you would like to use for a new cluster.

If you already have a cluster, specify its name when prompted.

If you would instead like to create a new cluster, provide a new name, followed by the name
of the [Amazon EC2 key pair](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-key-pairs.html#having-ec2-create-your-key-pair) that you created during [Banyan setup](https://www.banyancomputing.com/creating-clusters).

In the cell below, you can change `instance_type` to create a cluster with a
different EC2 instance type, for example one with more memory or more vCPUs.
See the documentation [here](https://www.banyancomputing.com/banyan-jl-docs/create-cluster/) for the other parameters for creating a cluster.

In [None]:
print("Cluster name for existing cluster or new cluster: ")
cluster_name = readline()
println(cluster_name)

clusters = get_clusters()
println("You have $(length(clusters)) clusters")
if !(haskey(clusters, cluster_name) && clusters[cluster_name].status == :running)
    println("Creating new cluster $(cluster_name)")
    print("Name of SSH EC2 Key Pair: ")
    ec2_key_pair_name = readline()
    println(ec2_key_pair_name)
    create_cluster(
        name=cluster_name,
        instance_type="t3.2xlarge",
        ec2_key_pair_name=ec2_key_pair_name
    )
else
    println("Using existing cluster $(cluster_name)")
end

## Upload data to S3

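To demonstrate how Banyan can be used with large data stored in Amazon S3, we
will first copy the January 2016 yellow taxi trip data from the public NYC TLC
bucket into your cluster's S3 bucket. This cell only needs to be run once. Note
that it may take a while to run, since the file is large and has to be
transferred into S3.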

In [None]:
using AWSS3

data_path = "https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2016-01.csv"
s3_bucket_name = get_cluster_s3_bucket_name(cluster_name)

download(data_path, S3Path("s3://$s3_bucket_name/nyc_tripdata.csv", config=Banyan.get_aws_config()))

## Create a job

In order to perform any computation, we need to allocate resources on the cluster.
To do this, create a job with a specified number of workers. The number of workers
determines how much parallelism is available: with more workers, more of your
computation can run in parallel, which can potentially make it run faster. For
this example, 2 workers should be sufficient.
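Adding workers can help because many dataframe operations can be split across rows. As an illustration of one common strategy (partial aggregation), the plain-Julia sketch below computes a grouped mean, like the one later in this notebook, by giving each worker a chunk of rows, computing per-key sums and counts within each chunk, and merging the results. Julia tasks stand in for workers and the function names are hypothetical; this is a general technique, not a description of how Banyan actually partitions or schedules work.

```julia
# Hypothetical helper: split rows 1:n into `nworkers` contiguous, nearly equal ranges
function partition_rows(n::Integer, nworkers::Integer)
    base, extra = divrem(n, nworkers)
    ranges = UnitRange{Int}[]
    start = 1
    for w in 1:nworkers
        len = base + (w <= extra ? 1 : 0)
        push!(ranges, start:(start + len - 1))
        start += len
    end
    return ranges
end

# Each "worker" computes a (sum, count) pair per key over its own rows only
function partial_sums(group_keys, vals, rows)
    acc = Dict{eltype(group_keys), Tuple{Float64, Int}}()
    for i in rows
        s, c = get(acc, group_keys[i], (0.0, 0))
        acc[group_keys[i]] = (s + vals[i], c + 1)
    end
    return acc
end

# Run one task per chunk, then merge the partial (sum, count) pairs into means
function grouped_mean_parallel(group_keys, vals, nworkers)
    ranges = partition_rows(length(group_keys), nworkers)
    tasks = [Threads.@spawn partial_sums(group_keys, vals, r) for r in ranges]
    merged = mergewith((a, b) -> (a[1] + b[1], a[2] + b[2]), fetch.(tasks)...)
    return Dict(k => s / c for (k, (s, c)) in merged)
end

# Per-key means: 1 => 1.5, 2 => 3.0, 3 => 5.0
grouped_mean_parallel([1, 2, 1, 3], [1.0, 3.0, 2.0, 5.0], 2)
```

The merge step only needs each chunk's sums and counts, not its rows, which is why this kind of query can scale out across workers.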

In [None]:
# Create a job
job_id = create_job(
    cluster_name = cluster_name,
    nworkers = 2,
    print_logs = true,  # Toggle this if you do not want to view job output printed here in the notebook
    store_logs_in_s3 = true,  # Toggle this if you do not want job logs to be saved
)

## Performing computation on a dataframe

The following cells compute several queries on the dataset.

Note that the API for performing operations on dataframes is the same as that
of the DataFrames.jl library.
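Because the API mirrors DataFrames.jl, you can try these operations on a small local table before running them on the cluster. The sketch below builds a tiny in-memory `DataFrame` with made-up values; only the column names `passenger_count` and `trip_distance` come from this notebook, and the real dataset has many more columns.

```julia
using DataFrames

# Tiny, made-up stand-in for the taxi data (values are illustrative only)
local_df = DataFrame(
    passenger_count = [1, 2, 1, 3, 1, 2],
    trip_distance = [0.5, 2.0, 1.5, 4.0, 3.5, 0.8],
)

# The same property queries that this notebook runs on the Banyan dataframe
println("Number of rows: $(nrow(local_df))")
println("Number of columns: $(ncol(local_df))")
println("Size: $(size(local_df))")
println("Column names: $(names(local_df))")
```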

In [None]:
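# Read the data file (the bucket name is looked up again so this cell can run on its own)
s3_bucket_name = get_cluster_s3_bucket_name(cluster_name)
df = read_csv(
    "s3://$s3_bucket_name/nyc_tripdata.csv",
    source_invalid=true,  # recompute, rather than reuse, cached metadata about the file
    sample_invalid=true   # recompute, rather than reuse, the cached data sample
)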

In [None]:
# Compute properties of the dataframe

println("Number of rows: $(nrow(df))")
println("Number of columns: $(ncol(df))")
println("Size: $(size(df))")
println("Column names: $(names(df))")

The following cell keeps only trips longer than 1.0 mile, groups them by passenger
count, and computes the average trip distance for each group.
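To see what this query returns, here is the same pipeline run with DataFrames.jl on a tiny in-memory table. The values are made up, and the `local_` variable names are chosen only to avoid overwriting the notebook's own variables. By default, `combine` names the output column `trip_distance_mean`; the result is sorted here only to make the row order predictable.

```julia
using DataFrames
using Statistics

# Tiny, made-up stand-in for the taxi data (values are illustrative only)
local_df = DataFrame(
    passenger_count = [1, 2, 1, 3, 1, 2],
    trip_distance = [0.5, 2.0, 1.5, 4.0, 3.5, 0.8],
)

# Keep trips longer than 1.0 mile
local_long = filter(row -> row.trip_distance > 1.0, local_df)
# Group by passenger count and average the trip distance within each group
local_gdf = groupby(local_long, :passenger_count)
local_means = combine(local_gdf, :trip_distance => mean)
sort!(local_means, :passenger_count)
# passenger_count = [1, 2, 3], trip_distance_mean = [2.5, 2.0, 4.0]
```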

In [None]:
long_trips = filter(
    row -> row.trip_distance > 1.0,
    df
)
gdf = groupby(long_trips, :passenger_count)
trip_means = combine(gdf, :trip_distance => mean)

# Compute the result and bring it back to this notebook
trip_means = collect(trip_means)

## Cleanup resources

After running your desired computation, we suggest that you destroy your running
job so that you are not charged for resources when you are not using them. You
may choose to keep your cluster running.

Run the following code block to destroy the job that you created in this example.

In [None]:
# Destroy job
destroy_job(job_id)
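If a cell raises an error partway through, it is easy to forget to run the cleanup cell. One way to guard against this is Julia's `try`/`finally`, wrapped here in a small helper. `with_job` is hypothetical (not part of Banyan); it takes the functions that create and destroy a job as keyword arguments, so it can be exercised without a cluster.

```julia
# Hypothetical helper: create a resource, pass its ID to `f`, and always
# destroy it afterwards, even if `f` throws an error
function with_job(f; create, destroy)
    job_id = create()
    try
        return f(job_id)
    finally
        destroy(job_id)
    end
end
```

With Banyan, you might call it as `with_job(create = () -> create_job(cluster_name = cluster_name, nworkers = 2), destroy = destroy_job) do job_id ... end`, placing your queries inside the `do` block.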