<img src="./images/DLI_Header.png" width=400/>

# Lab: Introduction to the Polars GPU Engine #

## Table of Contents ##
<br>
This lab notebook briefly introduces DataFrame manipulation using the Polars library and its GPU engine. It covers the following sections:

1. [Introduction to the Polars Library](#Introduction-to-the-Polars-Library)
    * [Creating a DataFrame](#Creating-a-DataFrame)
    * [Running Basic Operations](#Running-Basic-Operations)
2. [Basic Polars Operations](#Basic-Polars-Operations)
    * [Polars Eager Execution API Reference](#Polars-Eager-Execution-API-Reference)
    * [Exercise #1 - Load Data](#Exercise-#1---Load-Data)
    * [Exercise #2 - Calculate Average Age of Population](#Exercise-#2---Calculate-Average-Age-of-Population)
    * [Exercise #3 - Group By and Aggregation](#Exercise-#3---Group-By-and-Aggregation)
    * [Exercise #4 - Gender Distribution](#Exercise-#4---Gender-Distribution)
3. [Lazy Execution](#Lazy-Execution)
    * [Polars Lazy Execution API Reference](#Polars-Lazy-Execution-API-Reference)
    * [Execution Graph](#Execution-Graph)
    * [Exercise #5 - Creating a Lazy Dataframe](#Exercise-#5---Creating-a-Lazy-Dataframe)
    * [Exercise #6 - Query Creation](#Exercise-#6---Query-Creation)
4. [Polars GPU Engine](#Polars-GPU-engine)
    * [Accelerate Previous Code](#Accelerate-Previous-Code)
    * [Verify Results Across Engines](#Verify-Results-Across-Engines)
    * [Fallback](#Fallback)
    * [Exercise #7 - Enable GPU Engine](#Exercise-#7---Enable-GPU-Engine)
5. [Conclusion](#Conclusion)

## Introduction to the Polars Library ##
Polars is a data analysis and manipulation library designed for processing large datasets (10-100 GB) on a single machine, and it is known for its speed and memory efficiency. While pandas uses eager execution, Polars also supports lazy execution through its built-in query optimizer and uses zero-copy techniques. Because of these design choices, Polars delivers significant acceleration out of the box compared to other CPU-only data manipulation tools.

cuDF is part of the NVIDIA RAPIDS suite of CUDA-X libraries. It’s a GPU-accelerated DataFrame library that harnesses the massive parallelism of GPUs to significantly enhance data processing performance.

Bringing the speed of cuDF on NVIDIA GPUs to the efficiency of Polars adds an additional performance boost, allowing you to maintain an interactive experience as your datasets grow to hundreds of millions or even billions of rows.

### Creating a DataFrame ###
Let's start by installing Polars. The `polars[gpu]` extra also installs the GPU engine.

In [None]:
%pip install "polars[gpu]" --extra-index-url=https://pypi.nvidia.com

Next, let's download the dataset that we'll be using today. Please note that by downloading this dataset, you agree to the [UK National Archives licensing terms](https://www.nationalarchives.gov.uk/doc/open-government-licence/version/3/).

In [None]:
# Download and unzip files if they do not exist
!if [ ! -f "./uk_pop.zip" ]; then curl "https://data.rapids.ai/teaching-kit/uk_pop.zip" -o ./uk_pop.zip; else echo "Population dataset already downloaded"; fi
!if [ ! -f "./uk_pop.csv" ]; then unzip -d ./ ./uk_pop.zip ; else echo "Population dataset found and ready"; fi

Now let's see how the syntax looks! We will load the CSV file into a Polars DataFrame.

In [None]:
import polars as pl
import time

start_time = time.time()

# pl.read_csv already returns a DataFrame, so no extra pl.DataFrame(...) wrapper is needed
data = pl.read_csv('./uk_pop.csv')

polars_df = data
polars_time = time.time() - start_time

print(f"Time Taken: {polars_time:.4f} seconds")

In [None]:
data.head()

### Running Basic Operations ###
That was simple. Now let's try running a few operations on the dataset!

In [None]:
start_time = time.time()

# Load data
polars_df = pl.read_csv('./uk_pop.csv')

# Filter for ages above 0
filtered_df = polars_df.filter(pl.col('age') > 0.0)

# Sort by name in descending order
sorted_df = filtered_df.sort('name', descending=True)

print(sorted_df.head())
polars_time = time.time() - start_time
print(f"Time Taken: {polars_time:.4f} seconds")

## Basic Polars Operations ##
Please refer to the following API reference guide to complete the exercises below.

1. Load data
2. Calculate average age of population
3. Group By and Aggregation
4. Gender Distribution

### Polars Eager Execution API Reference ###

**DataFrame**

The main data structure for eager execution in Polars.

- `pl.DataFrame(data)`: Create a DataFrame from data
- `pl.read_csv(file)`: Read CSV file into DataFrame
- `pl.read_parquet(file)`: Read Parquet file into DataFrame

**Key Methods**

- `filter(mask)`: Filter rows based on a boolean mask
- `select(columns)`: Select specific columns
- `with_columns(expressions)`: Add or modify columns
- `group_by(columns)`: Group by specified columns
- `agg(aggregations)`: Perform aggregations on grouped data
- `sort(columns)`: Sort the data by specified columns
- `join(other, on)`: Join with another DataFrame

**Expressions**

Used to define operations on columns:

- `pl.col("column")`: Reference a column
- `pl.lit(value)`: Create a literal value
- `pl.when(predicate).then(value).otherwise(other)`: Conditional expression

**Series Operations**

- `series.sum()`: Calculate sum of series
- `series.mean()`: Calculate mean of series
- `series.max()`: Find maximum value in series
- `series.min()`: Find minimum value in series
- `series.sort()`: Sort series values

**Data Types**

- `pl.Int64`: 64-bit integer
- `pl.Float64`: 64-bit float
- `pl.String` (alias `pl.Utf8`): String
- `pl.Boolean`: Boolean
- `pl.Date`: Date

**Utilities**

- `pl.concat([df1, df2])`: Concatenate DataFrames
- `df.describe()`: Generate summary statistics
- `df.write_csv(file)`: Write DataFrame to CSV
- `df.write_parquet(file)`: Write DataFrame to Parquet

The eager API executes operations immediately, providing direct access to results. It's suitable for interactive data exploration and smaller datasets.

### Exercise #1 - Load Data ###
Load the CSV file into a DataFrame using Polars.

### Exercise #2 - Calculate Average Age of Population ###
Now, filter for individuals aged 65 and above, and sort by ascending age.

### Exercise #3 - Group By and Aggregation ###
Next, group by county and calculate the total population and average age.

### Exercise #4 - Gender Distribution ###
Lastly, let's calculate the percentage of males and females in the sample data.

## Lazy Execution ##
Polars can also perform operations using a technique called lazy execution. Unlike eager execution, where operations are performed immediately, lazy execution records operations in a computational graph (a query plan) that isn't executed until explicitly requested. This allows Polars to optimize the sequence of operations to minimize computation overhead by applying filters early (predicate pushdown), selecting only the necessary columns (projection pushdown), and executing operations in parallel. To make use of lazy execution in Polars, a `LazyFrame` data structure is used.

Now, let's run the same operations with lazy execution and visualize the graph!

### Polars Lazy Execution API Reference ###

**LazyFrame**

The main entry point for lazy execution in Polars. Created from a DataFrame or data source.

- `pl.LazyFrame(data)`: Create a LazyFrame from data.
- `df.lazy()`: Convert a DataFrame to LazyFrame.
- `pl.scan_csv(file)`: Lazily scan a CSV file into a LazyFrame.

**Key Methods**

- `filter(predicate)`: Filter rows based on a condition.
- `select(columns)`: Select specific columns.
- `with_columns(expressions)`: Add or modify columns.
- `group_by(columns)`: Group by specified columns.
- `agg(aggregations)`: Perform aggregations on grouped data.
- `sort(columns)`: Sort the data by specified columns.
- `join(other, on)`: Join with another LazyFrame.
- `collect()`: Execute the lazy query and return a DataFrame.

**Expressions**

Used to define operations on columns:

- `pl.col("column")`: Reference a column.
- `pl.lit(value)`: Create a literal value.
- `pl.when(predicate).then(value).otherwise(other)`: Define a conditional expression.

**Execution**

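- `collect()`: Execute and return a DataFrame.
- `head(n).collect()`: Execute and return the first n rows (the older `fetch(n)` is deprecated in recent Polars releases).
- `explain(optimized=False)`: Show the unoptimized query plan (replaces the older `describe_plan()`).
- `explain()`: Show the optimized query plan as text.

**Optimization**

- `cache()`: Cache intermediate results for faster access.
- Query optimizations such as predicate and projection pushdown are applied automatically when the query is collected; no separate call is required.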

The lazy API allows building complex queries that are optimized before execution, enabling better performance for large datasets.

In [None]:
import polars as pl
import time

start_time = time.time()

# Create a LazyFrame
lazy_df = pl.scan_csv('./uk_pop.csv')

# Define the lazy operations
lazy_result = (
    lazy_df
    .filter(pl.col('age') > 0.0)
    .sort('name', descending=True)
)

# Execute the lazy query and collect the results
result = lazy_result.collect()

print(result.head())
polars_time = time.time() - start_time
print(f"Time Taken: {polars_time:.4f} seconds")

### Execution Graph ###

Let's see how the unoptimized execution graph looks.

In [None]:
# Show unoptimized Graph
lazy_result.show_graph(optimized=False)

In [None]:
# Show optimized Graph
lazy_result.show_graph(optimized=True)

As we can see, in the optimized graph Polars pushed the age filter down into the CSV scan (predicate pushdown), so rows are filtered as the file is read rather than in a separate step afterwards. These types of optimizations are part of the reason why Polars is such a powerful data science tool.

### Exercise #5 - Creating a Lazy Dataframe ###
First, let's load the CSV as a LazyFrame.

### Exercise #6 - Query Creation ###
Now, let's create a query to find the 5 most common names for individuals under 30.

## Polars GPU engine ##
The Polars GPU engine is built directly into the Polars Lazy API. The only requirement is to pass `engine="gpu"` to the `collect` operation. Polars also allows defining an instance of the GPU engine for greater customization!

In [None]:
# collect() returns an eager DataFrame computed by the GPU engine
gpu_df = pl.scan_csv('./uk_pop.csv').collect(engine="gpu")

Now let's try defining our own engine object!

In [None]:
import polars as pl
import time

gpu_engine = pl.GPUEngine(
    device=0, # This is the default
    raise_on_fail=True, # Fail loudly if we can't run on the GPU.
)

In [None]:
# collect() returns an eager DataFrame computed by our custom GPU engine
gpu_df = pl.scan_csv('./uk_pop.csv').collect(engine=gpu_engine)

Now that the GPU is warmed up, let's try accelerating the same code as before! Notice that we added an engine parameter to the collect call.

### Accelerate Previous Code ###

In [None]:
start_time = time.time()

# Create a LazyFrame
lazy_df = pl.scan_csv('./uk_pop.csv')

# Define the lazy operations
lazy_result = (
    lazy_df
    .filter(pl.col('age') > 0.0)
    .sort('name', descending=True)
)

# Switch to gpu_engine
result = lazy_result.collect(engine=gpu_engine)

print(result.head())
polars_time = time.time() - start_time
print(f"Time Taken: {polars_time:.4f} seconds")

### Verify Results Across Engines ###
How do we know the results are the same with both the CPU and GPU engines? Luckily, with Polars we can execute the same query on both and compare the results using the built-in testing module!

In [None]:
from polars.testing import assert_frame_equal

# Run on the CPU
result_cpu = lazy_result.collect()

# Run on the GPU
result_gpu = lazy_result.collect(engine="gpu")

# Assert both results are equal; raises an AssertionError if they differ
assert_frame_equal(result_gpu, result_cpu)
print("The test frames are equal")

### Fallback ###
What happens when an operation isn't supported?

In [None]:
result = (
    lazy_df
    .with_columns(pl.col('age').rolling_mean(window_size=7).alias('age_rolling_mean'))
    .filter(pl.col('age') > 0.0)
    .collect(engine=gpu_engine)
)
print(result[::7])

We initially constructed the GPU engine with `raise_on_fail=True` to ensure all operations ran on the GPU. But as we can see, the rolling mean operation is not currently supported, so the query raises an error instead of executing. To enable fallback to the CPU, we can simply set the `raise_on_fail` parameter to `False`.

In [None]:
gpu_engine_with_fallback = pl.GPUEngine(
    device=0, # This is the default
    raise_on_fail=False, # Fallback to CPU if we can't run on the GPU (this is the default)
)

Now let's try this query again.

In [None]:
result = (
    lazy_df
    .with_columns(pl.col('age').rolling_mean(window_size=7).alias('age_rolling_mean'))
    .filter(pl.col('age') > 0.0)
    .collect(engine=gpu_engine_with_fallback)
)
print(result[::7])

### Exercise #7 - Enable GPU Engine ###
The code below calculates the average latitude and longitude for each county. Let's try enabling the GPU engine for this query!

In [None]:
# Create the lazy query with column pruning
lazy_query = (
    lazy_df
    .select(["county", "lat", "long"])  # Column pruning: select only necessary columns
    .group_by("county")
    .agg([
        pl.col("lat").mean().alias("avg_latitude"),
        pl.col("long").mean().alias("avg_longitude")
    ])
    .sort("county")
)

# Execute the query
result = lazy_query.collect()

print("\nAverage latitude and longitude for each county:")
print(result.head())  # Display first few rows

In [None]:
import IPython
app = IPython.Application.instance()
app.kernel.do_shutdown(True)

## Conclusion ##
**Well Done!**
In this lab, you have learned the basic usage of the Polars library with both the CPU and GPU engines, including:
1. [Creating a DataFrame](#Creating-a-DataFrame)
2. Running [basic Polars operations](#Running-Basic-Operations) using:
    * [Eager execution](#Polars-Eager-Execution-API-Reference)
    * [Lazy execution](#Lazy-Execution)
3. [Understanding the execution graph](#Execution-Graph)
4. [Turning on GPU acceleration in Polars](#Polars-GPU-engine)

Continue your Polars journey by visiting the [Polars user guide](https://docs.pola.rs/user-guide/gpu-support/). Continue your GPU-accelerated data science journey with the examples at https://github.com/rapidsai-community/showcase/tree/main/accelerated_data_processing_examples