<a href="https://colab.research.google.com/github/MiraHatoum/-AAI614_Hatoum/blob/main/notebook7_1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# AAI614: Data Science & its Applications

*Notebook 7.1: Introducing Dask*

<a href="https://colab.research.google.com/github/harmanani/AAI614/blob/main/Week%207/Notebook7.1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Source: NVIDIA

# Dask

Dask is not faster than pandas for a single file or for small datasets. It excels when working with many files because it uses lazy computation and can process them in parallel. In this lab, we will learn how to use Dask to speed up computation under the right conditions.

First, let's get these libraries loaded.

In [None]:
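# The `dataframe` extra installs the dependencies that dask.dataframe needs;
# a bare `pip install dask` produces the dask-expr warning shown below.
!pip install "dask[dataframe]"

import dask.dataframe as dd
import glob
import pandas as pd
import time
import urllib.request
import ssl

# Disable HTTPS certificate verification for downloads.
# This is insecure; only use it with sources you trust.
ssl._create_default_https_context = ssl._create_unverified_context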



Dask dataframe query planning is disabled because dask-expr is not installed.

You can install it with `pip install dask[dataframe]` or `conda install dask`.
This will raise in a future version.



## Using Dask versus Pandas

Neither pandas nor cuDF can read multiple CSV files with a single [read_csv](https://pandas.pydata.org/docs/reference/api/pandas.read_csv.html) call. To read multiple files into one DataFrame, we need to loop through the files and concatenate the results.

To see this, let's pull a couple more files from the [Water Level Website](https://tidesandcurrents.noaa.gov/stations.html?type=Water+Levels). This time, we will request each file as a CSV and save it with [urllib.request](https://docs.python.org/3/library/urllib.request.html).
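As a minimal sketch of that download step, the helper below fetches one URL and writes the response to disk. The name `download_csv` and its arguments are our own for illustration; they are not part of the original lab, and the exact request URL for a given station is not shown here.

```python
import urllib.request
from pathlib import Path


def download_csv(url: str, dest: Path) -> Path:
    """Fetch `url` and save the raw response body to `dest` (hypothetical helper)."""
    # Create the target folder (for example `data`) if it does not exist yet.
    dest.parent.mkdir(parents=True, exist_ok=True)
    with urllib.request.urlopen(url) as response:
        dest.write_bytes(response.read())
    return dest
```

A call would look like `download_csv(station_url, Path("data/station_1.csv"))`, where `station_url` is a placeholder for the CSV link copied from the station's page.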

We should now have a few `.csv` files in the `data` folder. We could type out the path of each file individually, but instead we will use the [glob](https://docs.python.org/3/library/glob.html) library to do this programmatically. We can use `*` as a wildcard to match every file that fits our pattern, like so:

In [None]:
import os
import glob
import pandas as pd

# Define the folder path
data_folder = r"C:\Users\MiraHatoum\Downloads\data"

# Step 1: Verify folder existence and list files
if os.path.isdir(data_folder):
    print("Directory exists.")
    files = os.listdir(data_folder)
    print("Files in the directory:", files)
else:
    print(f"Directory does not exist: {data_folder}")
    files = []  # Nothing to rename; avoids a FileNotFoundError below

# Step 2: Add missing .csv extensions (regular files only, not subfolders)
for file in files:
    file_path = os.path.join(data_folder, file)
    if os.path.isfile(file_path) and not file.endswith(".csv"):
        new_file_path = file_path + ".csv"
        os.rename(file_path, new_file_path)
        print(f"Renamed: {file_path} to {new_file_path}")

# Step 3: Match .csv files using glob
file_pattern = os.path.join(data_folder, "*.csv")
csv_files = glob.glob(file_pattern)

if csv_files:
    print(f"Found {len(csv_files)} CSV files:")
    for file in csv_files:
        print(file)

    # Step 4: Combine all .csv files into a single DataFrame
    combined_df = pd.concat([pd.read_csv(file) for file in csv_files], ignore_index=True)
    print("\nCombined DataFrame:")
    print(combined_df.head())
else:
    print("No matching .csv files found.")


Directory exists.
Files in the directory: []
No matching .csv files found.


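Each path starts with the `data` folder and ends with `.csv`, and the `*` matches anything in between. Let's set up a for loop to see how long it takes to read all of these files. The `read_all` helper takes the library as an argument, so it works with pandas here and would also work with cuDF on a GPU machine. With cuDF, run the block **twice** to see how much faster it is after it has been initialized.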

In [None]:
import glob
import os
import time

import pandas as pd

# Define the folder path and file pattern
data_folder = r"C:\Users\MiraHatoum\Downloads\data"
file_pattern = os.path.join(data_folder, "*.csv")  # Portable path separator

# Use glob to find all .csv files
file_paths = glob.glob(file_pattern)

if not file_paths:
    print("No matching .csv files found.")
else:
    print(f"Found {len(file_paths)} CSV files:")
    for file in file_paths:
        print(file)

    # Define the function to read and combine CSV files
    def read_all(library, file_paths):
        df_list = []
        for file in file_paths:
            try:
                df = library.read_csv(
                    file,
                    index_col=None,
                    header=None,  # Change to `0` if files have headers
                    usecols=[0, 1, 2, 4, 5],  # Select specific columns
                    skiprows=1  # Skip the first row if it's not data
                )
                df_list.append(df)
            except Exception as e:
                print(f"Error reading {file}: {e}")
        return library.concat(df_list, axis=0, ignore_index=True)

    # Use pandas to read files, timing the whole loop
    start = time.perf_counter()
    df_cpu = read_all(pd, file_paths)
    print(f"Read {len(file_paths)} files in {time.perf_counter() - start:.2f} s")

    # Display the combined DataFrame
    print("\nCombined DataFrame:")
    print(df_cpu.head())


No matching .csv files found.


In [None]:
import os
import glob
import pandas as pd

# Step 1: Define the directory path
data_folder = r"C:\Users\MiraHatoum\Downloads\data"

# Step 2: Check if the directory exists
if not os.path.exists(data_folder):
    print(f"Directory does not exist: {data_folder}")
else:
    print(f"Directory exists: {data_folder}")

    # Step 3: List all files in the directory
    files = os.listdir(data_folder)
    print("Files in the directory:", files)

    # Step 4: Add missing .csv extensions if needed
    for file in files:
        file_path = os.path.join(data_folder, file)
        if not file.endswith(".csv") and os.path.isfile(file_path):
            new_file_path = file_path + ".csv"
            os.rename(file_path, new_file_path)
            print(f"Renamed: {file_path} to {new_file_path}")

    # Step 5: Use glob to find all .csv files
    file_pattern = os.path.join(data_folder, "*.csv")
    csv_files = glob.glob(file_pattern)

    if not csv_files:
        print("No matching .csv files found.")
    else:
        print(f"Found {len(csv_files)} CSV files:")
        for file in csv_files:
            print(file)

        # Step 6: Define the function to read and combine all CSV files
        def read_all(library, file_paths):
            df_list = []
            for file in file_paths:
                try:
                    # Read each CSV file
                    df = library.read_csv(
                        file,
                        index_col=None,
                        header=0,  # Adjust to None if there's no header
                        usecols=[0, 1, 2, 4, 5],  # Adjust column selection as needed
                        skiprows=0  # Adjust if you need to skip rows
                    )
                    df_list.append(df)
                except Exception as e:
                    print(f"Error reading {file}: {e}")
            return library.concat(df_list, axis=0, ignore_index=True)

        # Step 7: Combine CSV files using pandas
        try:
            df_cpu = read_all(pd, csv_files)
            print("\nCombined DataFrame:")
            print(df_cpu.head())
        except Exception as e:
            print(f"Error combining CSV files: {e}")


Directory exists: C:\Users\MiraHatoum\Downloads\data
Files in the directory: []
No matching .csv files found.
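Because the `data` folder above was empty, the cells never reached the read loop. The sketch below runs the same glob-and-concatenate pattern on three small synthetic files written to a temporary folder. The column names and values are made up for illustration and are not the water level data.

```python
import glob
import os
import tempfile
import time

import pandas as pd

# Build a throwaway `data` folder with three small CSV files (synthetic values).
data_folder = os.path.join(tempfile.mkdtemp(), "data")
os.makedirs(data_folder)
for i in range(3):
    frame = pd.DataFrame({
        "station": [i] * 4,
        "hour": [0, 1, 2, 3],
        "level": [0.5 * i + h for h in range(4)],
    })
    frame.to_csv(os.path.join(data_folder, f"station_{i}.csv"), index=False)

# `*` matches any file name, so this picks up every .csv file in the folder.
csv_files = sorted(glob.glob(os.path.join(data_folder, "*.csv")))

# Read the files one by one, then concatenate them into a single DataFrame.
start = time.perf_counter()
df_cpu = pd.concat([pd.read_csv(path) for path in csv_files], ignore_index=True)
elapsed = time.perf_counter() - start

print(f"Read {len(csv_files)} files ({len(df_cpu)} rows) in {elapsed:.4f} s")
print(df_cpu.head())
```

With files this small the timing is dominated by overhead, so it only shows where the measurement goes, not a realistic benchmark.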


Since Dask is made to be parallel, we do not need a for loop. It can read multiple files natively.

The code below shows how to read data in parallel. Calling `dd.read_csv` only sets up the process to read the files. To actually load the data, we need to force Dask to *compute*.

In [None]:
import os
import glob
import dask.dataframe as dd

# Step 1: Define the folder path
data_folder = r"C:\Users\MiraHatoum\Downloads\data"

# Step 2: Verify the directory and list files
if not os.path.exists(data_folder):
    print(f"Directory does not exist: {data_folder}")
else:
    print(f"Directory exists: {data_folder}")
    files = os.listdir(data_folder)
    print("Files in the directory:", files)

    # Step 3: Use glob to find all .csv files
    file_pattern = os.path.join(data_folder, "*.csv")
    file_paths = glob.glob(file_pattern)

    if not file_paths:
        print("No matching .csv files found.")
    else:
        print(f"Found {len(file_paths)} CSV files:")
        for file in file_paths:
            print(file)

        # Step 4: Read files using Dask
        try:
            ddf_cpu = dd.read_csv(file_paths, usecols=[0, 1, 2, 4, 5], header=0, skipinitialspace=True)
            print("Dask DataFrame loaded.")

            # Step 5: Compute the result
            result = ddf_cpu.compute()
            print("\nCombined DataFrame:")
            print(result.head())
        except Exception as e:
            print(f"Error reading files with Dask: {e}")


Directory exists: C:\Users\MiraHatoum\Downloads\data
Files in the directory: []
No matching .csv files found.


Let's sample our data to confirm it has been read correctly. This time, we will only be working with the first three columns of data.

In [None]:
import os
import glob
import dask.dataframe as dd

# Define the folder path
data_folder = r"C:\Users\MiraHatoum\Downloads\data"

# Check if directory exists
if not os.path.exists(data_folder):
    print(f"Directory does not exist: {data_folder}")
else:
    print(f"Directory exists: {data_folder}")
    files = os.listdir(data_folder)
    print("Files in the directory:", files)

    # Use glob to find .csv files
    file_pattern = os.path.join(data_folder, "*.csv")
    file_paths = glob.glob(file_pattern)

    if not file_paths:
        print("No matching .csv files found.")
    else:
        print(f"Found {len(file_paths)} CSV files:")
        for file in file_paths:
            print(file)

        # Create Dask DataFrame
        try:
            ddf_cpu = dd.read_csv(file_paths, usecols=[0, 1, 2], header=0, skipinitialspace=True)
            print("Dask DataFrame created successfully.")

            # Sample data
            print("Sampling data from Dask DataFrame:")
            print(ddf_cpu.head())

            # Visualize DAG
            print("Visualizing DAG...")
            ddf_cpu.visualize(filename='dag.png')
            print("DAG saved as 'dag.png'.")

        except Exception as e:
            print(f"Error creating Dask DataFrame: {e}")


Directory exists: C:\Users\MiraHatoum\Downloads\data
Files in the directory: []
No matching .csv files found.


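How can Dask do this faster than regular pandas or cuDF? Under the hood, Dask builds a task graph: a directed acyclic graph (DAG) of operations that is only executed when a result is requested, which lets independent steps such as reading separate files run in parallel. We can view this DAG with the [visualize](https://docs.dask.org/en/latest/graphviz.html) method, as the previous cell does when it saves `dag.png`.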
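The idea of recording tasks first and running them later can be sketched without any files by using `dask.delayed`. The functions `load` and `combine` below are stand-ins we made up for illustration; they are not part of the lab's data pipeline.

```python
import dask


@dask.delayed
def load(part):
    # Stand-in for reading one file: returns a small list of numbers.
    return [part, part + 1]


@dask.delayed
def combine(parts):
    # Stand-in for concatenating and reducing the loaded pieces.
    return sum(sum(p) for p in parts)


# Calling the decorated functions only records tasks; nothing runs yet.
total = combine([load(i) for i in range(3)])

# The recorded tasks form the graph that Dask will execute later.
graph = dict(total.__dask_graph__())
print(f"{len(graph)} tasks recorded")

# compute() walks the graph; the three load tasks are independent of each other.
print(total.compute())
```

Calling `total.visualize()` would draw the same graph as an image, provided Graphviz is installed.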