# DuckDB and Polars

So far, we have mostly talked about `pandas`, and we have used databases through SQLite via the built-in `sqlite3` module.
What are their limitations?

Can we do better?

For database-backed data analytics, there is plenty of open-source and commercially available software, such as MySQL, PostgreSQL, and MongoDB. However, there are a couple of recent, open-source, high-performance packages I want to discuss.



## DuckDB

Dr. Hua Zhou, my postdoctoral mentor and the director of the Master of Data Science in Health program, wrote in his BIOSTAT 203B (Intro to Data Science with R) lecture notes:

> BTW, as modern data scientists, we should all start using DuckDB (https://duckdb.org/) instead of SQLite. DuckDB is a modern, embeddable SQL OLAP (Online Analytical Processing) database management system. It is designed to handle analytical workloads (OLAP) on read-only data. It is based on a column-store architecture and is designed to be very fast to query, highly compressible, and run on modern hardware. It is a great alternative to SQLite for analytical workloads.

* OLAP: short for "[online analytical processing](https://en.wikipedia.org/wiki/Online_analytical_processing)", another term for analytical query workloads. Roughly, it covers the operations we have used in class to analyze data: filtering, joining, grouping, and aggregating.

This package does not require you to store your data in a database. In Python, it can read directly from (compressed) CSV files, parquet files, and even pandas DataFrames. Its performance is remarkable: it uses multiple threads both for reading data files and for running queries.

A quick example: 


In [None]:
import duckdb
import sqlite3
import pandas as pd
duckdb.sql("INSTALL sqlite") # install DuckDB's sqlite extension first (a one-time download)
with duckdb.connect("employees.sqlite") as conn:
    joined = conn.sql("SELECT * FROM employees LEFT JOIN phone ON employees.name = phone.name;").df()
joined

We can even run SQL queries directly on pandas DataFrames! First, let's load the two tables into pandas:

In [None]:
with sqlite3.connect("employees.sqlite") as conn:
    employees = pd.read_sql_query("SELECT * FROM employees;", conn)
    phone = pd.read_sql_query("SELECT * FROM phone;", conn)

Note that at this point `employees` and `phone` refer to pandas DataFrames. DuckDB finds them through their Python variable names:

In [None]:
duckdb.sql("SELECT * FROM employees LEFT JOIN phone ON employees.name = phone.name;").df()

In [None]:
%%time 
with duckdb.connect("temps.db") as conn:
    # conn is automatically closed when this block ends
    cmd = \
    """
    SELECT S.NAME, T.Month, ROUND(AVG(T.Temp), 1) "Mean Temperature"
    FROM temperatures T
    LEFT JOIN stations S ON T.ID = S.ID
    WHERE S.LATITUDE>80 OR S.LATITUDE<-80
    GROUP BY S.NAME, T.Month
    ORDER BY S.NAME
    """
    df = conn.sql(cmd).df()
df

Compare this timing with the one in the previous lecture note.

## Can we go even faster?

It depends. If you have the full data and it is unlikely to change, it is worth saving it in a format that stores the data column by column. This is how these tools organize data internally (think of a pandas DataFrame, where each column holds values of a single type). So if the data are already stored this way, DuckDB and polars work faster than on data stored row by row (such as CSV files or SQLite databases). Both tools discussed today take advantage of columnar data storage and vectorized operations.
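
To make "row-wise" versus "columnar" concrete, here is a plain-Python sketch with made-up records. It is only a mental model: real engines such as DuckDB and polars use typed arrays and compression, not Python lists. Computing the mean of one column touches every full record in the row layout, but only one list in the columnar layout.

```python
# Row-wise layout: one tuple per record (like the lines of a CSV file)
rows = [
    ("A", 1901, 1.5),
    ("A", 1902, 2.5),
    ("B", 1901, -3.0),
]

# Columnar layout: one list per column (like parquet, or a DataFrame)
columns = {
    "ID": ["A", "A", "B"],
    "Year": [1901, 1902, 1901],
    "Temp": [1.5, 2.5, -3.0],
}

# Row-wise: every record is visited just to pull out its third field
mean_rowwise = sum(record[2] for record in rows) / len(rows)

# Columnar: only the Temp column is read; ID and Year are never touched
temp = columns["Temp"]
mean_columnar = sum(temp) / len(temp)

print(mean_rowwise, mean_columnar)
```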

We install the parquet support for DuckDB first:

In [None]:
duckdb.sql("INSTALL parquet;")

And this is the command to save the `temperatures` table in the `parquet` format. As you might expect, DuckDB can write data in many different formats. By the way, pandas also supports the `parquet` format once one of its optional dependencies (such as `pyarrow`) is installed.

In [None]:
with duckdb.connect("temps.db") as conn:
    conn.sql("""
    COPY
        (SELECT * FROM temperatures)
        TO 'temps.parquet'
        (FORMAT 'parquet');
    """)

In [None]:
with duckdb.connect("temps.db") as conn:
    stations = conn.sql("SELECT * from stations").df()
stations

In [None]:
cmd = \
"""
SELECT S.NAME, T.Month, ROUND(AVG(T.Temp), 1) "Mean Temperature"
FROM 'temps.parquet' T
LEFT JOIN stations S ON T.ID = S.ID
WHERE S.LATITUDE>80 OR S.LATITUDE<-80
GROUP BY S.NAME, T.Month
ORDER BY S.NAME
"""
df = duckdb.sql(cmd).df()

In [None]:
%timeit df = duckdb.sql(cmd).df()

## Polars

Polars is another tool you should keep your eye on. While DuckDB is optimized for SQL queries, polars is designed around a Python DataFrame API in the spirit of `pandas`, which makes it more flexible for general programming. People with a stronger programming background might prefer it over DuckDB. The backend of polars is written in Rust, a modern language well suited for efficient parallel computing. Let's install the newest version of `polars`:

In [None]:
%pip install polars --upgrade

This type of installation is generally not recommended in this course, but it is the installation method preferred by the polars developers.

In [None]:
import polars as pl

Let's look back at how we processed our data from the beginning:

In [None]:
%%time
intervals = [f"{10 * i + 1}-{10 * (i + 1)}" for i in range(190, 202)]  # quiz! 1901-1910 to 2011-2020.
dfs = []
for interval in intervals:
    filepath = f"datafiles/{interval}.csv"
    df = pd.read_csv(filepath)
    dfs.append(df)
df = (pd.concat(dfs, axis=0, ignore_index=True)
      .melt(
        id_vars = ["ID", "Year"],
        value_vars = [f"VALUE{i}" for i in range(1, 13)],
        var_name = "Month",
        value_name = "Temp")
      .query("~Temp.isnull()") 
      .assign(Month = lambda x : x.Month.str[5:].astype(int))
      .assign(Temp = lambda x : x.Temp / 100)
      .merge(stations, on="ID", how="inner")
      .query("LATITUDE > 80 | LATITUDE < -80")
      .groupby(["NAME", "Month"])
      .agg({"Temp": "mean"})
      .rename(columns={"Temp": "Mean Temp"})
    )

I intentionally used some methods here that may be unfamiliar to you, but you can figure out that the code block above does what we did over the past week. Whew, after reading the files, it is just a single long chain of pandas methods! If you have seen R's `dplyr` syntax and its pipes (`%>%`), this is pretty similar.
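
If `melt` and `assign` are new to you, here is a minimal sketch of the first few steps of that chain on a tiny, made-up "wide" table with only two month columns (`VALUE1`, `VALUE2`); the real files have twelve. For the missing-value step it uses `dropna`, which has the same effect here as the `.query("~Temp.isnull()")` step in the cell above.

```python
import numpy as np
import pandas as pd

# Hypothetical wide table: one row per station-year, one column per month
wide = pd.DataFrame({
    "ID": ["A", "B"],
    "Year": [1901, 1901],
    "VALUE1": [150, np.nan],  # station B has no reading for month 1
    "VALUE2": [250, -300],
})

long = (wide
        .melt(id_vars=["ID", "Year"],            # columns kept as identifiers
              value_vars=["VALUE1", "VALUE2"],   # columns stacked into rows
              var_name="Month", value_name="Temp")
        .dropna(subset=["Temp"])                 # drop missing readings
        .assign(Month=lambda x: x.Month.str[5:].astype(int))  # "VALUE2" -> 2
        .assign(Temp=lambda x: x.Temp / 100))    # divide by 100, as above
print(long)
```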

Polars uses similar syntax, but it is much faster! It can use all the cores on your computer, and it keeps data in the [Arrow](https://arrow.apache.org/docs/format/Columnar.html) columnar format, with specialized internal representations for several data types. The code above in polars is:

In [None]:
%%time
stations_pl = pl.from_pandas(stations) # convert the pandas DataFrame into a polars one
df = (pl.read_csv("datafiles/*.csv")
        .unpivot(index=["ID", "Year"], # polars' counterpart of pandas' melt
              on=[f"VALUE{i}" for i in range(1, 13)], # same month columns as value_vars above
              variable_name="Month",
              value_name="Temp")
        .filter(~pl.col("Temp").is_null()) # removing null values
        .with_columns(pl.col("Month").str.slice(5).cast(pl.Int64)) # "VALUE7" -> 7
        .with_columns((pl.col("Temp") / 100)) # divide the Temp column by 100
        .join(stations_pl, on="ID", how="inner")
        .filter((pl.col("LATITUDE") > 80) | (pl.col("LATITUDE")< -80))
        .group_by(["NAME", "Month"])
        .agg(pl.col("Temp").mean().alias("Mean Temp"))
)

Polars is a DataFrame interface on top of an OLAP query engine implemented in Rust.

Polars, like DuckDB, supports "lazy evaluation": the query is first assembled as a plan without running any computation, and it is executed only later, on request. This gives the software an opportunity to optimize the whole computation before it runs (for example, by reading only the columns the query needs). In polars, this is done through the `LazyFrame` type, which you obtain by using functions whose names start with `scan_` (e.g., `scan_csv`) rather than `read_`.

In [None]:
stations_pl = pl.scan_csv("station-metadata.csv")
df = (pl.scan_csv("datafiles/*.csv")
        .unpivot(index=["ID", "Year"], # polars' counterpart of pandas' melt
              on=[f"VALUE{i}" for i in range(1, 13)], # same month columns as value_vars above
              variable_name="Month",
              value_name="Temp")
        .filter(~pl.col("Temp").is_null()) # removing null values
        .with_columns(pl.col("Month").str.slice(5).cast(pl.Int64))
        .with_columns((pl.col("Temp") / 100)) # divide the Temp column by 100
        .join(stations_pl, on="ID", how="inner")
        .filter((pl.col("LATITUDE") > 80) | (pl.col("LATITUDE")< -80))
        .group_by(["NAME", "Month"])
        .agg(pl.col("Temp").mean().alias("Mean Temp"))
)
df

What you see is not the final result yet, only the query plan: you have to call the `collect()` method to obtain the result. DuckDB works similarly: nothing is computed until you call the `df()` method. You could also call `.pl()` instead of `.df()` to obtain a polars DataFrame rather than a pandas DataFrame.

In [None]:
%time df.collect()

What if we started from the `parquet` file? We download the station metadata file first if it's not there yet:

In [None]:
import os
import urllib.request  # urlretrieve lives in the urllib.request submodule
# download station-metadata.csv if it does not exist
url = "https://raw.githubusercontent.com/PIC16B-ucla/24F/refs/heads/main/datasets/noaa-ghcn/station-metadata.csv"
if not os.path.exists("station-metadata.csv"): 
    urllib.request.urlretrieve(url, "station-metadata.csv")

Then we run this lazy query:

In [None]:
stations_pl = pl.scan_csv("station-metadata.csv")
df = (pl.scan_parquet("temps.parquet")
        .join(stations_pl, on="ID", how="inner")
        .filter((pl.col("LATITUDE") > 80) | (pl.col("LATITUDE")< -80))
        .group_by(["NAME", "Month"])
        .agg(pl.col("Temp").mean().alias("Mean Temp"))
)

In [None]:
%timeit df.collect()

Oh, polars also supports some SQL!

In [None]:
%%timeit
ctx = pl.SQLContext(temperatures=pl.scan_parquet("temps.parquet"), stations=pl.scan_csv("station-metadata.csv"))
cmd = \
"""
SELECT S.NAME, T.Month, ROUND(AVG(T.Temp), 1) "Mean Temperature"
FROM temperatures T
LEFT JOIN stations S ON T.ID = S.ID
WHERE S.LATITUDE>80 OR S.LATITUDE<-80
GROUP BY S.NAME, T.Month
"""
rslt = ctx.execute(cmd)
rslt.collect()

All these timings are remarkably faster than what we saw last week! These two tools can help you work fast as a data scientist.