# Introduction to Dask DataFrame

Dask DataFrames coordinate many pandas DataFrames/Series arranged along the index. A Dask DataFrame is partitioned row-wise, grouping rows by index value for efficiency.

![Dask DataFrame](https://docs.dask.org/en/stable/_images/dask-dataframe.svg)

Many of the existing methods from the pandas API are available in Dask DataFrame. Check out [this section](https://docs.dask.org/en/stable/dataframe.html#scope) of the documentation to learn more about them. In general, operations that can be parallelized are implemented in Dask DataFrame.

In this lecture, you will learn to use Dask DataFrame to analyze large tabular climate data.  

## Analyzing Multiple Large CSV files using Dask DataFrame

For this tutorial, we will use the NOAA Global Historical Climatology Network Daily (GHCN-D) data available on AWS S3.
You can read more about the data on the Registry of Open Data on AWS [here](https://registry.opendata.aws/noaa-ghcn/).

More information about the dataset, including the metadata descriptions, is available on [NOAA's website](https://www.ncei.noaa.gov/products/land-based-station/global-historical-climatology-network-daily). 

GHCN-D contains **daily observations** over global land areas, recorded at land-based stations worldwide; about two-thirds of these stations measure precipitation only. Some records are more than *175 years* old.

This dataset is too large to load comfortably into memory with pandas, so we will analyze it with Dask DataFrame.

### Download Data from AWS S3 bucket

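You can download the dataset from the AWS S3 bucket with the following function. The bucket is public, so no AWS account is needed: the client sends unsigned requests (`signature_version=UNSIGNED`), which is the boto3 equivalent of passing the `--no-sign-request` flag to the AWS CLI.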

In [None]:
import boto3
import os
from botocore import UNSIGNED
from botocore.client import Config

def download_s3_objects_no_auth(bucket_name, download_path, prefix, substring, aws_region=None):
    """
    Download all objects from a public S3 bucket (no authentication) whose keys start with a given prefix and contain a given substring.

    :param bucket_name: The name of the S3 bucket.
    :param download_path: Local directory where the files will be downloaded.
    :param prefix: Characters that are required to be at the beginning of the S3 object keys.
    :param substring: The substring to search for in the S3 object keys.
    :param aws_region: AWS region where the S3 bucket is located (optional).
    """
    # Initialize the S3 client with no request signing (public bucket)
    s3_client = boto3.client('s3', config=Config(signature_version=UNSIGNED), region_name=aws_region)

    # Ensure the download path exists
    os.makedirs(download_path, exist_ok=True)

    # list_objects_v2 returns at most 1000 keys per call, so page through all results
    paginator = s3_client.get_paginator('list_objects_v2')
    found_any = False
    for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix):
        for obj in page.get('Contents', []):
            found_any = True
            key = obj['Key']
            # Download only the objects whose key contains the substring
            if substring in key:
                local_filename = os.path.join(download_path, key.split('/')[-1])
                print(f"Downloading {key} to {local_filename}...")
                s3_client.download_file(bucket_name, key, local_filename)
                print(f"Downloaded: {local_filename}")

    if not found_any:
        print(f"No objects with prefix '{prefix}' found in the bucket '{bucket_name}'.")


In [None]:
download_s3_objects_no_auth(bucket_name="noaa-ghcn-pds", download_path=".", prefix="csv/by_year/", substring="202")
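The `prefix` and `substring` arguments act as a simple filter on object keys. The sketch below replays that filter on a hypothetical list of keys, assuming one CSV file per year stored as `csv/by_year/<year>.csv` (consistent with the `2023.csv` file read later). It shows that `substring="202"` keeps every year from 2020 onwards that is present in the listing.

```python
# Hypothetical object keys, assuming one CSV per year under csv/by_year/
keys = [f"csv/by_year/{year}.csv" for year in range(2015, 2026)]

prefix = "csv/by_year/"
substring = "202"

# Same selection logic as the download function: the listing is restricted
# to `prefix`, then only keys containing `substring` are downloaded
selected = [key for key in keys if key.startswith(prefix) and substring in key]
print(selected)
```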


### Import Packages

In [None]:
import dask.dataframe as dd

In [None]:
from dask.distributed import Client, LocalCluster
cluster = LocalCluster()
client = Client(cluster)
client

### Read One CSV file

Let's first load one CSV file and see how Dask DataFrame works.

In [None]:
df = dd.read_csv("2023.csv", dtype={'Q_FLAG': 'object'})

You can check how many partitions Dask creates by default:

In [None]:
df.npartitions

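To change the number of partitions, set the `blocksize` argument of `read_csv`, which is the approximate number of bytes of the file assigned to each partition. Here, `25e6` means about 25 MB per partition: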

In [None]:
df = dd.read_csv("2023.csv", dtype={'Q_FLAG': 'object'}, blocksize=25e6)

In [None]:
df.npartitions

The following cell is not something you should run routinely: `compute()` loads the entire result into memory as a pandas DataFrame.
We run it here only to see how Dask loads the data.

In [None]:
df.compute()

In [None]:
df

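As you can see, `df` still displays only its structure (column names, dtypes, and number of partitions) rather than the data. This is because `df.compute()` returns a new pandas DataFrame and does not store the result back in `df`, which remains a lazy Dask DataFrame. If you need to keep the values, assign the result to a variable, e.g. `pdf = df.compute()` (or `df = df.compute()`, which replaces the Dask DataFrame with the in-memory pandas DataFrame).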

### Read Multiple CSVs

Here, we define a new Dask DataFrame that reads all the downloaded CSV files at once using the glob pattern `*.csv`.

In [None]:
large_df = dd.read_csv("*.csv", dtype={'Q_FLAG': 'object'}, blocksize=25e6)

In [None]:
large_df.npartitions

In [None]:
large_df

In [None]:
# Do not run this: it would load every CSV file into memory at once
# and will likely exhaust the available memory.
# large_df.compute()

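Let's calculate the mean value of each observation type (`ELEMENT`, e.g. `PRCP` or `TMAX`) across the whole dataset. As with `df` above, nothing is computed until `.compute()` is called.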

In [None]:
mean_values = large_df.groupby("ELEMENT")["DATA_VALUE"].mean()

In [None]:
mean_values

In [None]:
mean_values.compute()
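A mean over the whole dataset can be computed without holding all rows in memory because it can be assembled from per-partition sums and counts. The pandas-only sketch below illustrates this map-reduce idea on toy data by treating slices of a DataFrame as partitions; it is a simplified picture, not Dask's exact internal implementation.

```python
import pandas as pd

# Toy observations (made-up values)
pdf = pd.DataFrame({
    "ELEMENT": ["PRCP", "TMAX", "PRCP", "TMAX", "PRCP", "SNOW"],
    "DATA_VALUE": [10, 200, 30, 240, 50, 5],
})

# Pretend each slice is one partition
chunks = [pdf.iloc[0:2], pdf.iloc[2:4], pdf.iloc[4:6]]

# Map step: per-partition sum and count for each ELEMENT
partials = [chunk.groupby("ELEMENT")["DATA_VALUE"].agg(["sum", "count"]) for chunk in chunks]

# Reduce step: add up the partial sums and counts per ELEMENT, then divide
totals = pd.concat(partials).groupby(level=0).sum()
combined_mean = totals["sum"] / totals["count"]

print(combined_mean)
```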

Next, we select a single station in Worcester, MA, and calculate the mean of each observation type.
You can see the list of all stations on NOAA's website [here](https://www.ncei.noaa.gov/pub/data/ghcn/daily/ghcnd-stations.txt). 

In [None]:
worcester_df = large_df[large_df["ID"].isin(["US1MAWR0097"])]

In [None]:
worcester_df

In [None]:
worcester_mean = worcester_df.groupby("ELEMENT")["DATA_VALUE"].mean()

In [None]:
worcester_mean

Now we compute the mean and keep the result in memory by assigning it to a new variable, `worcester_mean_values`:

In [None]:
worcester_mean_values = worcester_mean.compute()

In [None]:
worcester_mean_values

### Task: find the station with the highest number of snow days

In the following, we aim to find the station with the highest number of snow days across the years 2020-2024: