# Framing DataFrames

### Pyowa - 2025-05-27
### Adam Best

# About Me

- 6 years of experience as a Data Engineer - 10 years with data
- John Deere Engine Works - Quality Engineering
- General Dynamics IT - Healthcare Data Warehouse
- Dwolla - Data Platform/Warehouse + AWS
- John Deere Financial - Database Administration + AWS
- Tractor Zoom - Data Platform + AWS + Databricks

# Goals
- Understanding of when and how to use a DataFrame
- Cover some of the options, and what tools might be best for your use case
- Some deeper dives into Daft as an example library


### Will not cover
- Exact syntax of each library
- How to build a data pipeline
- Machine learning/AI/Agents

# Stirring the data
![AI](https://imgs.xkcd.com/comics/machine_learning.png)

# What is a DataFrame?

### What is a DataFrame?
- Tabular data structure with rows and columns (2D)
- Each column has a consistent data type (homogeneous)
- Columns are aligned and indexed for efficient access
- Supports operations on entire columns (vectorized)
- Provides built-in methods for data manipulation

Emphasize - vectorized operations
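A minimal pandas sketch of the properties above, using made-up data: each column has one dtype, an arithmetic expression applies to a whole column at once, and assignment aligns values by index label rather than by position.

```python
import pandas as pd

# Small illustrative frame (values are made up)
df = pd.DataFrame({"name": ["Ann", "Ben", "Cal"], "age": [30, 25, 41]})

# One dtype per column (homogeneous columns)
print(df.dtypes)

# Vectorized: one expression operates on the entire "age" column
df["age_next_year"] = df["age"] + 1

# Alignment: a Series is matched to rows by index label, not by position;
# row 1 has no matching label, so it gets NaN
bonus = pd.Series([100, 200], index=[2, 0])
df["bonus"] = bonus
print(df)
```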

# Common tasks

- Selecting and filtering
- Aggregations - sum, count
- Joining or merging
- Grouping

# Ways to create a DataFrame
- Python `list` of `dict`s
- Python `dict` of `list`s
- NumPy arrays (pandas)
- Files (csv, json, parquet, excel, etc.)
- SQL Database `pandas.read_sql`

# Essential Tools - Inspecting Data
- `head(n)` and `tail(n)` - first and last n rows
- `info()` - concise summary of the DataFrame, including data types and non-null counts per column
- `describe()` - statistical summary of numeric columns
- `shape` - number of rows and columns
- `columns` - list of column names
- `dtypes` - data types of columns
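The inspection tools above on a small made-up frame with a couple of missing values, so `info()` and `describe()` have something to report.

```python
import pandas as pd

df = pd.DataFrame({
    "city": ["Ames", "Des Moines", "Ames", None],
    "temp_f": [71.5, 75.0, None, 68.0],
})

print(df.head(2))        # first 2 rows
print(df.tail(2))        # last 2 rows
df.info()                # dtypes plus non-null counts per column (prints directly)
print(df.describe())     # count/mean/std/min/quartiles/max for numeric columns
print(df.shape)          # (rows, columns)
print(list(df.columns))  # column names
print(df.dtypes)         # dtype of each column
```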

# Selection and Filtering
- Column selection - `df['col1']`, `df[['col1', 'col2']]`, `df.col1`
- Row and column selection by integer position (pandas) - `df.iloc[0]`, `df.iloc[[0, 1, 2]]`, `df.iloc[:, 0]`
- Boolean indexing (filtering) - `df[df['column_name'] > some_value]`
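A short pandas sketch of the selection patterns above, reusing the John/Jane/Jim/Jill sample names. One detail that trips people up: combined conditions use `&` / `|` rather than `and` / `or`, and each condition needs its own parentheses.

```python
import pandas as pd

df = pd.DataFrame({"name": ["John", "Jane", "Jim", "Jill"], "age": [20, 21, 22, 23]})

ages = df["age"]               # single column -> Series
subset = df[["name", "age"]]   # list of columns -> DataFrame
first_row = df.iloc[0]         # first row by position -> Series
first_col = df.iloc[:, 0]      # first column by position -> Series

# Boolean indexing: combine conditions with & / | and wrap each in parentheses
older = df[(df["age"] > 20) & (df["name"].str.startswith("J"))]

# .loc takes a boolean mask for rows and a label for columns in one step
older_names = df.loc[df["age"] > 21, "name"]
```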

# Missing Values
- `.isnull()` - check for missing values
- `.dropna()` - remove rows or columns with missing values
- `.fillna(value)` - replace missing values with a specific value
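The three missing-value methods in pandas on a made-up frame. `fillna` also accepts a dict, which lets each column get its own replacement value.

```python
import numpy as np
import pandas as pd

df = pd.DataFrame({"a": [1.0, np.nan, 3.0], "b": ["x", None, "z"]})

missing_per_column = df.isnull().sum()           # count of missing values per column
no_missing_rows = df.dropna()                    # drop rows with any missing value
no_missing_cols = df.dropna(axis=1)              # drop columns with any missing value
filled = df.fillna({"a": 0.0, "b": "unknown"})   # per-column replacement values
```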

# Column Operations
- `df['new_column_name'] = df['existing_column'] * 2` - add a new column
- `df['column_name'] = df['column_name'] + 5` - modify an existing column
- `del df['column_name']` or `df.drop(columns=['column_name'])` - remove a column
- `df.rename(columns={'old_name': 'new_name'})` - rename a column
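The column operations above in pandas, with placeholder column names. Note that `drop()` and `rename()` return a new DataFrame by default, so the result has to be reassigned (or the original is left unchanged).

```python
import pandas as pd

df = pd.DataFrame({"existing_column": [1, 2, 3], "old_name": ["a", "b", "c"]})

df["new_column_name"] = df["existing_column"] * 2   # add a derived column
df["existing_column"] = df["existing_column"] + 5   # overwrite an existing column

# drop() and rename() return a new DataFrame; reassign to keep the result
df = df.drop(columns=["new_column_name"])
df = df.rename(columns={"old_name": "new_name"})
```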

# Sorting, Grouping, and Aggregations
- `df.sort_values('column_name')`
    - sort by a column
- `df.groupby('category_column')['value_column'].mean()`
    - average of value_column for each unique value in category_column
- `df.groupby('category_column')['value_column'].agg(['mean', 'sum', 'count'])`
    - multiple aggregations
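Sorting, a single aggregation, and multiple aggregations in pandas, using the placeholder column names from the bullets and made-up values.

```python
import pandas as pd

df = pd.DataFrame({
    "category_column": ["a", "b", "a", "b", "c"],
    "value_column": [10, 20, 30, 40, 50],
})

sorted_df = df.sort_values("value_column", ascending=False)  # largest first
means = df.groupby("category_column")["value_column"].mean()
summary = df.groupby("category_column")["value_column"].agg(["mean", "sum", "count"])
print(summary)
```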

# Combining DataFrames
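- `df1.merge(df2, on='common_column', how='inner')` (pandas) or `df1.join(df2, on='common_column', how='inner')` (Polars, Daft)
    - merge two DataFrames on a common column
- `pd.concat([df1, df2])` (pandas) or `df1.concat(df2)` (Daft)
    - concatenate two DataFrames vertically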
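In pandas specifically, a column-based join is `merge()` (pandas' `join()` matches against the other frame's index), and vertical concatenation is the module-level `pd.concat()`. A sketch with made-up frames:

```python
import pandas as pd

df1 = pd.DataFrame({"common_column": [1, 2, 3], "left_val": ["a", "b", "c"]})
df2 = pd.DataFrame({"common_column": [2, 3, 4], "right_val": ["x", "y", "z"]})

# Inner join on a shared column: only keys present in both frames survive
inner = df1.merge(df2, on="common_column", how="inner")

# Vertical stacking; ignore_index renumbers rows 0..n-1
stacked = pd.concat([df1, df1], ignore_index=True)
```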

In [None]:
# Example DataFrame
from pandas import DataFrame

df = DataFrame({
    'name': ['John', 'Jane', 'Jim', 'Jill'],
    'age': [20, 21, 22, 23],
    'city': ['New York', 'Los Angeles', 'Chicago', 'Houston']
})
df.head()

# What isn't a DataFrame?

In [None]:
# List of Dicts
data = [{"name": "Alice", "age": 30}, {"name": "Bob", "age": 25}]
# No indexing or operations on fields
# e.g. fetching all records with age > 25 requires a manual loop

In [None]:
# Dict of List
data = {"name": ["Alice", "Bob"], "age": [30, 25]}
# Column based, but lacks type and shape enforcement
# Similar to how DataFrames are usually structured conceptually, but lacks vectorized operations

In [None]:
# Named Tuple + Dataclasses (closer)
from collections import namedtuple

Person = namedtuple("Person", ["name", "age"])
people = [Person("Alice", 30), Person("Bob", 25)]
# Again, no vectorized operations on specific fields
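To make the contrast concrete with the same Alice/Bob records: plain records need an explicit comprehension for a filter like `age > 25`, while the same data loaded into a pandas DataFrame (which accepts both lists of dicts and lists of namedtuples) filters with one column expression.

```python
from collections import namedtuple

import pandas as pd

records = [{"name": "Alice", "age": 30}, {"name": "Bob", "age": 25}]
Person = namedtuple("Person", ["name", "age"])
people = [Person("Alice", 30), Person("Bob", 25)]

# Plain Python: filtering means visiting every record yourself
older_records = [r for r in records if r["age"] > 25]
older_people = [p for p in people if p.age > 25]

# DataFrame: both shapes load directly, and the filter is one column expression
df_from_records = pd.DataFrame(records)
df_from_tuples = pd.DataFrame(people)  # namedtuple fields become column names
older_df = df_from_records[df_from_records["age"] > 25]
```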

# SQL Tables
```sql
CREATE TABLE people
(id int,
 name varchar(30),
 age int);

SELECT COUNT(*) FROM people WHERE age > 30;
```
- Very similar functionality to DataFrames
- Declarative; queries are strings, so logic is harder to compose and reuse than Python code
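One way to read the reusability point: the SQL query lives in a string, while the DataFrame version is an ordinary Python function that can take parameters and be reused. This sketch runs the table definition above against an in-memory SQLite database with made-up rows; `count_older_than` is a hypothetical helper for illustration.

```python
import sqlite3

import pandas as pd

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE people (id int, name varchar(30), age int)")
conn.executemany(
    "INSERT INTO people VALUES (?, ?, ?)",
    [(1, "Alice", 34), (2, "Bob", 25), (3, "Cara", 41)],
)

# SQL: the logic is a string; reuse means templating the string or creating views
sql_count = conn.execute("SELECT COUNT(*) FROM people WHERE age > 30").fetchone()[0]

# DataFrame: load once, then express the same logic as a reusable Python function
people = pd.read_sql("SELECT * FROM people", conn)
conn.close()


def count_older_than(df: pd.DataFrame, threshold: int) -> int:
    """Hypothetical helper: count rows whose age exceeds threshold."""
    return int((df["age"] > threshold).sum())


df_count = count_older_than(people, 30)
```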

# A note on Vectorization
![Vectorize it](./resources/vectorize_it.png)

In [2]:
import pandas as pd
from datetime import datetime

df = pd.read_csv('../resources/large_dataset.csv')

before_v = datetime.now()
# ... Vectorized operation:
df["ratio"] = 100 * (df["zip"] / df["id"])
after_v = datetime.now()


In [3]:
# ... Non-vectorized operation: (DON'T DO THIS)
def calc_ratio(row):
    return 100 * (row["zip"] / row["id"])

before_row = datetime.now()
df["ratio2"] = df.apply(calc_ratio, axis=1)
after_row = datetime.now()

In [None]:
print(f"Vectorized time: {(after_v - before_v).total_seconds():.4f} seconds")
print(f"Non-vectorized time: {(after_row - before_row).total_seconds():.4f} seconds")
print(
    f"Speedup: {(after_row - before_row).total_seconds() / (after_v - before_v).total_seconds():.2f}x"
)
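A self-contained variant of the benchmark above, assuming `large_dataset.csv` is not at hand: it generates synthetic `zip` and `id` columns instead. It also times with `time.perf_counter()`, a monotonic clock intended for measuring short intervals, rather than `datetime.now()`. Absolute timings will vary by machine; the point is that both approaches give the same values.

```python
import time

import numpy as np
import pandas as pd

# Synthetic stand-in for large_dataset.csv (assumed numeric "zip" and "id" columns)
rng = np.random.default_rng(0)
n = 50_000
df = pd.DataFrame({"zip": rng.integers(10_000, 99_999, n), "id": np.arange(1, n + 1)})

start = time.perf_counter()
vectorized = 100 * (df["zip"] / df["id"])  # one operation over whole columns
vec_seconds = time.perf_counter() - start

start = time.perf_counter()
row_wise = df.apply(lambda row: 100 * (row["zip"] / row["id"]), axis=1)  # Python call per row
row_seconds = time.perf_counter() - start

print(f"Vectorized: {vec_seconds:.4f}s  Row-wise: {row_seconds:.4f}s")
print(f"Speedup: {row_seconds / vec_seconds:.1f}x")
```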

# Summary
Each of these represents structured or semi-structured data, but only DataFrames combine labeling, tabular structure, and column operations in one unified tool.

### When to use a DataFrame
- Working with hundreds or thousands of rows/columns
- Performance is important
- Need to scale
- Ok to install dependencies

On dependencies: this repo has 117 installed packages

`uv pip freeze | wc -l`
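A rough Python-side equivalent of that shell count, using the standard library's `importlib.metadata`. The number may not match `uv pip freeze` exactly, since the two tools can treat packaging tools and duplicate installs differently.

```python
from importlib.metadata import distributions

# Collect distinct distribution names visible to this interpreter
names = {
    dist.metadata.get("Name").lower()
    for dist in distributions()
    if dist.metadata.get("Name")
}
print(f"{len(names)} installed packages")
```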

### When not to use a DataFrame
- Working with a few rows/columns, not expected to grow
- Unable to install dependencies
- "Excel is fine"

# So you want to use a DataFrame
![Choices](./resources/choices.png)

# The options (some of them)
- Pandas
- PySpark
- Dask
- Polars
- Daft

# Pandas
[Docs](https://pandas.pydata.org/docs/reference/index.html)
- ~2008
- huge userbase
- mature and flexible
- single threaded, memory constrained
- Cython/C based
- Seamless integration with NumPy, Matplotlib/Seaborn (for plotting), and Scikit-learn (for machine learning)

In [5]:
dummy_data = {
    "name": ["John", "Jane", "Jim", "Jill"],
    "age": [20, 21, 22, 23],
    "city": ["New York", "Los Angeles", "Chicago", "Houston"],
}

In [None]:
import pandas as pd

pd_df = pd.DataFrame(dummy_data)

avg_age_by_city_pandas = pd_df.groupby("city")["age"].mean()
avg_age_by_city_pandas.head()

# PySpark
[Docs](https://spark.apache.org/docs/latest/api/python/reference/pyspark.html)
- ~2010
- Massively horizontally scalable
- Excellent SQL syntax support
- high overhead for small datasets
- JVM - Scala based

Emphasis on large number of compute nodes

Data lineage and fault tolerance

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder.appName("DataFrameDemo").getOrCreate()

schema = StructType([
    StructField("name", StringType()),
    StructField("age", IntegerType()),
    StructField("city", StringType())
])

spk_df = spark.createDataFrame(pd_df,  # Note: Pandas DataFrame source
                               schema=schema)

avg_age_by_city_spark = spk_df.groupBy("city").mean("age")
avg_age_by_city_spark.show()


# Dask
[Docs](https://docs.dask.org/en/latest/dataframe-api.html)
- ~2015
- Parallel and lazy evaluation
- Similar to Pandas API
- Handles larger than memory datasets
- Python based
- Delegates to other libraries for execution

A layer on top of Pandas to address the single threaded nature of Pandas

In [None]:
import dask.dataframe as dd

dask_df = dd.from_dict(dummy_data, npartitions=2)
avg_age_by_city_dask = dask_df.groupby("city")["age"].mean()  # lazy: only builds a task graph
result = avg_age_by_city_dask.compute()  # Note: necessary to trigger computation
result.head()

# Polars
[Docs](https://docs.pola.rs/user-guide/getting-started/)
- ~2021
- Fast - parallel and lazy evaluation
- Less flexible for edge cases
- Rust based

In [None]:
import polars as pl

pl_df = pl.DataFrame(dummy_data)

avg_age_by_city_polars = pl_df.group_by("city").agg(pl.col("age").mean())
avg_age_by_city_polars.head()

# Daft
[Docs](https://www.getdaft.io/projects/docs/en/stable/quickstart/)
- ~2023
- Python + Rust + SQL
- Scales Vertically and Horizontally (with Ray)
- Cloud Storage Optimized

Ray - Open source distributed execution engine

Backed by Eventual Inc.

In [None]:
import daft

daft_df = daft.from_pydict(dummy_data)
avg_age_by_city_daft = daft_df.groupby("city").agg(daft.col("age").mean())

avg_age_by_city_daft.show()

In [None]:
# Side by side
pd_df.groupby("city")["age"].mean()  # Pandas
spk_df.groupBy("city").mean("age")  # PySpark
dask_df.groupby("city")["age"].mean()  # Dask
pl_df.group_by("city").agg(pl.col("age").mean())  # Polars
daft_df.groupby("city").agg(daft.col("age").mean())  # Daft

Pandas <> Dask very similar

Polars <> Daft very similar

![Standards](https://imgs.xkcd.com/comics/standards.png)

# Enter Ibis (and others)
[Docs](https://ibis-project.org/tutorials/basics)
> Ibis defines a Python dataframe API that executes on any query engine – the frontend for any backend data platform, with nearly 20 backends today. This allows Ibis to have excellent performance – as good as the backend it is connected to – with a consistent user experience.

# Modin
[Docs](https://modin.readthedocs.io/en/latest/getting_started/why_modin/modin_vs_dask_vs_koalas.html)
> Libraries such as Dask DataFrame (DaskDF for short) and Koalas aim to support the pandas API on top of distributed computing frameworks, Dask and Spark respectively. Instead, Modin aims to preserve the pandas API and behavior as is, while abstracting away the details of the distributed computing framework underneath. Thus, the aims of these libraries are fundamentally different.

Koalas - now included in PySpark (Pandas API on Spark)

```python
import pyspark.pandas as ps
```


# More Modin
> Specifically, Modin enables pandas-like:
> - row and column-parallel operations, unlike DaskDF and Koalas that only support row-parallel operations
> - indexing & ordering semantics, unlike DaskDF and Koalas that deviate from these semantics
> - eager execution, unlike DaskDF and Koalas that provide lazy execution

# Comparing

| Feature/Need             | pandas | Polars | Dask | PySpark | Daft  |
|--------------------------|--------|--------|------|---------|-------|
| Easy local work          | ✅     | ✅     | ⚠️   | ⚠️      | ✅    |
| Huge files (10M+ rows)   | ⚠️     | ✅     | ✅   | ✅      | ✅    |
| Multi-core               | ❌     | ✅     | ✅   | ✅      | ✅    |
| Clustered / distributed  | ❌     | ⚠️     | ✅   | ✅      | ✅    |
| SQL-like syntax          | ⚠️     | ⚠️     | ⚠️   | ✅      | ⚠️    |
| Learning curve           | Easy   | Medium | Medium | Steep  | Medium |

### Another Comparison table (from Daft docs)
![Table](./resources/comparison.png)

# PyArrow vs Parquet
- Parquet - on-disk columnar file format
- PyArrow - Python bindings for Apache Arrow, an in-memory columnar format
    - Multi-language support; very efficient for in-memory analytical (OLAP) workloads

### Quick Benchmarks
![Benchmarks](./resources/benchmarks.png)

Dask - slower than pandas, but uses less memory

Polars - very fast

Daft - not as fast as Polars, higher overhead

PySpark - Possible issue with benchmark methodology

In [None]:
# Let's get our hands dirty
import daft

taxi_data = "../resources/2023_Green_Taxi_Trip_Data_20250514.csv"
df = daft.read_csv(taxi_data).collect()

In [None]:
# sample it
df.show(3)
df.count().collect()

In [None]:
# how many trips had a distance > 10 miles and > 1 passenger?
df = df.with_column(
    "is_long_multi",
    (df["trip_distance"] > 10) & (df["passenger_count"] > 1),
)

percent_count = (
    df.groupby("is_long_multi")
    .agg(daft.col("VendorID").count().alias("count"))
    .collect()
)
percent_count

In [None]:
# Why None?
(df.select(df["is_long_multi"], df["trip_distance"], df["passenger_count"])
    .where(df["is_long_multi"].is_null())
    .show(3))
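The `None` results are consistent with missing values in the source columns. As an analogy (not verified against Daft's exact semantics), pandas' nullable dtypes use three-valued Kleene logic for `&`: `True & NA` stays unknown, while `False & NA` is known to be `False`. A sketch with made-up trips:

```python
import pandas as pd

# Made-up trips; passenger_count has missing values (nullable Int64 dtype)
trip_distance = pd.Series([12.0, 15.0, 3.0])
passenger_count = pd.Series([2, None, None], dtype="Int64")

is_long = (trip_distance > 10).astype("boolean")  # nullable boolean
has_many = passenger_count > 1                    # NA where passenger_count is NA

# Kleene logic: True & NA -> NA (unknown), False & NA -> False (known)
is_long_multi = is_long & has_many
print(is_long_multi.tolist())

# Decide explicitly how unknowns should be counted
is_long_multi_filled = is_long_multi.fillna(False)
```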

In [None]:
# how about nested object records?
d = {
    "timestamp": "2025-01-02T02:03:04.000Z",
    "id": 123486993,
    "obj": {
        "key": "my_key",
        "value": "my_value"
    }
}
# works from_json() as well
df = daft.from_pydict({"log": [d]}).collect()

(df.filter(df["log"]["obj"]["value"] == 'my_value')
 .select(df["log"]["obj"]["key"])).show()
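For comparison, pandas does not keep the nested struct: `pandas.json_normalize` flattens the record into dotted column names, and the same filter-then-select becomes a plain column lookup. Same sample record as above.

```python
import pandas as pd

d = {
    "timestamp": "2025-01-02T02:03:04.000Z",
    "id": 123486993,
    "obj": {"key": "my_key", "value": "my_value"},
}

# Nested dicts become dotted columns: obj.key, obj.value
flat = pd.json_normalize([d])
keys = flat.loc[flat["obj.value"] == "my_value", "obj.key"]
```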

In [None]:
# Example of lazy execution
large_df = daft.read_csv(taxi_data)
result = (
    large_df.filter(large_df["total_amount"] > 500.00)  # Filter first
    .select(large_df["DOLocationID"], large_df["total_amount"])  # Then select only needed columns
    .sort("total_amount", desc=True)  # Order by amount
    .limit(5)  # Finally take just 5 rows
)
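To show what "lazy" means mechanically, here is a toy lazy frame in plain Python: each method only records a step, and the source is read when `collect()` runs. `LazyFrame` and `read_trips` are hypothetical names for illustration; this is not how Daft is implemented, and it has no optimizer and no sort.

```python
import itertools
from dataclasses import dataclass
from typing import Any, Callable, Iterable, Iterator

Row = dict[str, Any]


def _apply(rows: Iterator[Row], kind: str, arg: Any) -> Iterator[Row]:
    # A separate function gives each step its own binding of `arg`
    if kind == "filter":
        return (r for r in rows if arg(r))
    if kind == "select":
        return ({c: r[c] for c in arg} for r in rows)
    if kind == "limit":
        return itertools.islice(rows, arg)  # stops pulling rows once n are produced
    raise ValueError(f"unknown step {kind!r}")


@dataclass(frozen=True)
class LazyFrame:
    source: Callable[[], Iterable[Row]]
    steps: tuple = ()

    def _add(self, kind: str, arg: Any) -> "LazyFrame":
        return LazyFrame(self.source, self.steps + ((kind, arg),))

    def filter(self, predicate: Callable[[Row], bool]) -> "LazyFrame":
        return self._add("filter", predicate)

    def select(self, *cols: str) -> "LazyFrame":
        return self._add("select", cols)

    def limit(self, n: int) -> "LazyFrame":
        return self._add("limit", n)

    def collect(self) -> list[Row]:
        rows: Iterator[Row] = iter(self.source())  # the source is only read here
        for kind, arg in self.steps:
            rows = _apply(rows, kind, arg)
        return list(rows)


reads = []


def read_trips() -> list[Row]:
    reads.append(1)  # track when the "file" is actually read
    return [{"DOLocationID": i % 7, "total_amount": float(i * 50)} for i in range(20)]


plan = (
    LazyFrame(read_trips)
    .filter(lambda r: r["total_amount"] > 500.0)
    .select("DOLocationID", "total_amount")
    .limit(5)
)
print(len(reads))  # 0: building the plan did no work
result = plan.collect()
```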

In [None]:
# only now are results computed
result.show(10)

In [None]:
# Joining datasets
taxi_df = daft.read_csv(taxi_data)
locations_df = daft.from_pydict({"location_id": [145, 265]})

joined_df = taxi_df.join(
    locations_df, left_on="DOLocationID", right_on="location_id", how="inner"
).collect()

joined_df.show(3)
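A pandas sketch of what this inner join does, on made-up trips: when the right-hand keys are unique, an inner join keeps the same rows as an `isin()` filter on the join column, plus the right-hand key column.

```python
import pandas as pd

# Made-up trips; only the join column and one value column
taxi = pd.DataFrame({
    "DOLocationID": [145, 12, 265, 145, 7],
    "fare": [10.0, 8.5, 22.0, 14.0, 5.0],
})
locations = pd.DataFrame({"location_id": [145, 265]})

joined = taxi.merge(locations, left_on="DOLocationID", right_on="location_id", how="inner")

# With unique keys on the right, the same rows as a membership filter
filtered = taxi[taxi["DOLocationID"].isin(locations["location_id"])]
```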

# Daft specific features
- Every library is different, built for different use cases

## Live Query Over Public S3 Parquet File
Read directly from an S3-hosted dataset with no explicit download step.
> The total file size is around 37 gigabytes, even in the efficient Parquet file format.

In [None]:
import daft

s3_df = daft.read_parquet("s3://ursa-labs-taxi-data/2019/*/data.parquet")
filtered = s3_df.where(s3_df["passenger_count"] > 4)
filtered.show(5)

## Lazy Filtering and Optimized Execution Plan
Filters are not executed until necessary and are optimized into a single step.

In [None]:
# With the real data: df = daft.read_csv("../resources/large_dataset.csv")
# For demonstration, we use a small generated DataFrame instead
df = daft.from_pydict({"col1": list(range(100)), "col2": ["NY"] * 50 + ["CA"] * 50})
filtered = df.where(df["col1"] > 10).where(df["col2"] == "NY")
filtered.explain(show_all=True)
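Chained filters and a single combined predicate select the same rows, which is why an optimizer can fuse them. pandas runs eagerly, so this sketch only shows the equivalence (the chained version materializes an intermediate frame), not an optimizer at work.

```python
import pandas as pd

df = pd.DataFrame({"col1": list(range(100)), "col2": ["NY"] * 50 + ["CA"] * 50})

# Two chained filters, each producing an intermediate DataFrame
chained = df[df["col1"] > 10]
chained = chained[chained["col2"] == "NY"]

# One fused predicate: the form chained filters can be rewritten into
fused = df[(df["col1"] > 10) & (df["col2"] == "NY")]
```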

## Visualize Execution DAG
Another example with a smaller dataset.

In [None]:
df = daft.from_pydict({"val": list(range(100)), "category": ["A"] * 50 + ["B"] * 50})

result = df.where(df["val"] > 10).groupby("category").agg(df["val"].mean())

result.explain(show_all=True)

In [None]:
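# S3 scan example explain (`filtered` was reassigned above, so rebuild it from s3_df)
s3_filtered = s3_df.where(s3_df["passenger_count"] > 4)
s3_filtered.explain(show_all=True)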

## Manual Repartitioning for Optimized Joins
Control the physical layout of your data to speed up joins.

In [None]:
df1 = daft.from_pydict({"user_id": list(range(100)), "score": list(range(100))})
df2 = daft.from_pydict({"user_id": list(range(50, 150)), "label": ["active"] * 100})

partitioned_df1 = df1.repartition(None, "user_id")
partitioned_df2 = df2.repartition(None, "user_id")
partitioned_joined = partitioned_df1.join(partitioned_df2, on="user_id")
partitioned_joined.show(5)

Partitioning ensures related data is colocated

This is typically handled by the optimizer, but manual repartitioning can be a useful performance optimization
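Why colocation helps: if both sides are hash-partitioned on the join key, equal keys land in the same partition number, so each partition pair can be joined independently (for example on different workers). A plain-Python toy with hypothetical helper names, not Daft's implementation:

```python
from collections import defaultdict


def hash_partition(rows, key, num_partitions):
    """Assign each row to a partition by hashing its join key."""
    parts = defaultdict(list)
    for row in rows:
        parts[hash(row[key]) % num_partitions].append(row)
    return parts


def partitioned_inner_join(left, right, key, num_partitions=4):
    """Join partition i of left only with partition i of right."""
    left_parts = hash_partition(left, key, num_partitions)
    right_parts = hash_partition(right, key, num_partitions)
    out = []
    for p in range(num_partitions):
        # Build a lookup from this partition's right-side rows only
        lookup = defaultdict(list)
        for right_row in right_parts.get(p, []):
            lookup[right_row[key]].append(right_row)
        for left_row in left_parts.get(p, []):
            for right_row in lookup.get(left_row[key], []):
                out.append({**left_row, **right_row})
    return out


df1 = [{"user_id": i, "score": i} for i in range(100)]
df2 = [{"user_id": i, "label": "active"} for i in range(50, 150)]
joined = partitioned_inner_join(df1, df2, "user_id")
```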

# Write to a Delta table
- Provides:
    - ACID transactions
        - Atomicity, Consistency, Isolation, Durability
    - Scalable metadata
    - Interoperability with other data tools
    - History and recovery (Time Travel)
    - Schema enforcement and evolution

Basis of Unity Catalog

Comparable to Apache Iceberg

In [None]:
# Example (requires Daft's deltalake extra, e.g. pip install 'daft[deltalake]')
import daft

df = daft.read_csv("../resources/large_dataset.csv")
df.write_deltalake("../resources/delta_table", mode="overwrite")
# Read from a Delta table
delta_df = daft.read_deltalake("../resources/delta_table")
delta_df.show(5)

In [None]:
# Try appending a dataset with a different schema to that table:
taxi_df = daft.read_csv("../resources/2023_Green_Taxi_Trip_Data_20250514.csv")
taxi_df.write_deltalake("../resources/delta_table")

Very popular in the data lake space

Watch out for many small files; they can cause performance issues

# Wrapping up
- Every library is different
- Each has tradeoffs
- Makes scalable python data analysis easier
- Daft is a good fit for many use cases
- Disclaimer: I'm not affiliated with Daft, I just think they're cool

# Resource Links/Recommended Reading:
- [Parquet vs Arrow](https://medium.com/@diehardankush/comparing-data-storage-parquet-vs-arrow-aa2231e51c8a)
- [Understanding Vectorization](https://medium.com/analytics-vidhya/understanding-vectorization-in-numpy-and-pandas-188b6ebc5398)
- [This presentation code](https://github.com/adam133/pyowa-dataframes)
- [Daft docs](https://www.getdaft.io/projects/docs/en/stable/)
- [Pandas docs](https://pandas.pydata.org/docs/reference/index.html)
- [Polars docs](https://docs.pola.rs/)
- [Dask docs](https://docs.dask.org/en/stable/)
- [PySpark docs](https://spark.apache.org/docs/latest/api/python/reference/index.html)
- LinkedIn: [Adam Best](https://www.linkedin.com/in/adamrbest)
- Active in the [PyOwa Discord](https://discord.gg/KSsEVdVpGZ)

Thanks to PyOwa and Source Allies for hosting

# Questions?
![End](./resources/end.jpg)