Merge pull request #179 from jeffbrennan/add_doc_content
Add doc content
jeffbrennan committed Jan 28, 2024
2 parents ff044cd + 8480604 commit 8b43774
Showing 13 changed files with 740 additions and 12,964 deletions.
89 changes: 73 additions & 16 deletions benchmarks/visualize_benchmarks.py
@@ -1,12 +1,16 @@
 from __future__ import annotations
 
+from datetime import datetime as dt
+from pathlib import Path
+
 import pandas as pd
 import plotly.express as px
 import pyspark.sql.functions as F  # noqa: N812
+import pytz
 from pyspark.sql import SparkSession
 
 
-def parse_results(spark: SparkSession) -> tuple[pd.DataFrame]:
+def parse_results(spark: SparkSession) -> tuple[pd.DataFrame, pd.DataFrame, str]:
     """Parse benchmark results into a Pandas DataFrame."""
     result_df = (
         spark.read.json("benchmarks/results/*.json", multiLine=True)
@@ -33,6 +37,9 @@ def parse_results(spark: SparkSession) -> tuple[pd.DataFrame]:
         .toPandas()
     )
 
+    if not isinstance(result_df, pd.DataFrame):
+        raise TypeError
+
     result_df["dataset_name"] = pd.Categorical(
         result_df["dataset_name"],
         ["xsmall", "small", "medium", "large"],
@@ -45,14 +52,14 @@ def parse_results(spark: SparkSession) -> tuple[pd.DataFrame]:
         .reset_index()
     )
 
-    return result_df, average_df
+    benchmark_date = get_benchmark_date(benchmark_path="benchmarks/results/")
+    return result_df, average_df, benchmark_date
 
 
-def show_boxplot(df: pd.DataFrame) -> None:
+def save_boxplot(df: pd.DataFrame, benchmark_date: str) -> None:
     """Displays faceted boxplot of benchmark results."""
-    today_str = pd.Timestamp.today().strftime("%Y-%m-%d")
     machine_config = "Python 3.12.0, Spark 3.5, Pandas 2.1.3, M1 Macbook Pro 32GB RAM"
-    subtitle = f"<sup>{today_str} | {machine_config}</sup>"
+    subtitle = f"<sup>{benchmark_date} | {machine_config}</sup>"
 
     fig = px.box(
         df,
@@ -63,21 +70,41 @@ def show_boxplot(df: pd.DataFrame) -> None:
         points="all",
         title=f"Column to List Benchmark Results<br>{subtitle}</br>",
         labels={"runtime": "Runtime (seconds)"},
-        category_orders={"dataset_name": ["xsmall", "small", "medium", "large"]},
+        category_orders={
+            "dataset_name": ["xsmall", "small", "medium", "large"],
+            "test_name": [
+                "localIterator",
+                "collectlist",
+                "map",
+                "flatmap",
+                "toPandas",
+            ],
+        },
+        color_discrete_map={
+            "collectlist": "#636EFA",
+            "localIterator": "#EF553B",
+            "toPandas": "#00CC96",
+            "map": "#AB63FA",
+            "flatmap": "#FFA15A",
+        },
     )
     fig.update_yaxes(matches=None)
     fig.update_yaxes({"tickfont": {"size": 9}})
+    fig.for_each_yaxis(lambda yaxis: yaxis.update(showticklabels=True))
     fig.update_xaxes(matches=None, title=None)
     fig.update_layout(legend_title_text="")
 
-    fig.show()
+    fig.write_image(
+        "benchmarks/images/column_to_list_boxplot.svg",
+        width=1000,
+        height=700,
+    )
 
 
-def show_line_plot(df: pd.DataFrame) -> None:
+def save_line_plot(df: pd.DataFrame, benchmark_date: str) -> None:
     """Displays line plot of average benchmark results."""
-    today_str = pd.Timestamp.today().strftime("%Y-%m-%d")
     machine_config = "Python 3.12.0, Spark 3.5, Pandas 2.1.3, M1 Macbook Pro 32GB RAM"
-    subtitle = f"<sup>{today_str} | {machine_config}</sup>"
+    subtitle = f"<sup>{benchmark_date} | {machine_config}</sup>"
     fig = px.line(
         df,
         x="dataset_size",
@@ -86,23 +113,53 @@ def show_line_plot(df: pd.DataFrame) -> None:
         color="test_name",
         title=f"Column to List Benchmark Results<br>{subtitle}</br>",
         labels={"runtime": "Runtime (seconds)", "dataset_size": "Number of Rows"},
+        category_orders={
+            "test_name": [
+                "localIterator",
+                "collectlist",
+                "map",
+                "flatmap",
+                "toPandas",
+            ],
+        },
+        color_discrete_map={
+            "collectlist": "#636EFA",
+            "localIterator": "#EF553B",
+            "toPandas": "#00CC96",
+            "map": "#AB63FA",
+            "flatmap": "#FFA15A",
+        },
     )
     fig.update_traces(mode="markers+lines")
-    fig.update_traces(marker={"size": 15})
+    fig.update_traces(marker={"size": 12})
     fig.update_layout(legend_title_text="")
 
-    fig.show()
+    fig.write_image(
+        "benchmarks/images/column_to_list_line_plot.svg",
+        width=900,
+        height=450,
+    )
+
+
+def get_benchmark_date(benchmark_path: str) -> str:
+    """Returns the date of the benchmark results."""
+    path = Path(benchmark_path)
+    benchmark_ts = path.stat().st_mtime
+    return dt.fromtimestamp(
+        benchmark_ts,
+        tz=pytz.timezone("US/Eastern"),
+    ).strftime("%Y-%m-%d")
 
 
 if __name__ == "__main__":
     spark = (
-        SparkSession.builder.appName("MyApp")
+        SparkSession.builder.appName("MyApp")  # type: ignore
         .config("spark.executor.memory", "10G")
         .config("spark.driver.memory", "25G")
         .config("spark.sql.shuffle.partitions", "2")
         .getOrCreate()
     )
 
-    result_df, average_df = parse_results(spark)
-    show_boxplot(result_df)
-    show_line_plot(average_df)
+    result_df, average_df, benchmark_date = parse_results(spark)
+    save_boxplot(result_df, benchmark_date)
+    save_line_plot(average_df, benchmark_date)
5 changes: 5 additions & 0 deletions docs/examples/index.md
@@ -0,0 +1,5 @@
# Examples

Example Quinn code snippets:

- [Schema as Code](../notebooks/schema_as_code.ipynb)
1 change: 1 addition & 0 deletions docs/images/column_to_list_boxplot.svg
1 change: 1 addition & 0 deletions docs/images/column_to_list_line_plot.svg
Binary file added docs/images/quinn.png
24 changes: 22 additions & 2 deletions docs/index.md
@@ -1,10 +1,30 @@
 # Quinn
 
-Quinn containts PySpark helper methods that will make you more productive.
+![quinn logo](images/quinn.png)
+
+Quinn contains PySpark helper methods that will make you more productive.
 
 Quinn is also a great way to learn about PySpark best practices like how to organize and unit test your code.
 
-We have a solid group of maintainers, chat on contributor meetings reguarly, and eagerly accept contributions from other members.
+## Contributing
+
+We have a solid group of maintainers, chat on contributor meetings regularly, and eagerly accept contributions from other members.
 
 We want to help the world write beautiful PySpark and give them a wonderful developer experience.
 
+### Code Style
+
+We are using [PySpark code-style](https://github.com/MrPowers/spark-style-guide/blob/main/PYSPARK_STYLE_GUIDE.md) and the `sphinx` docstring format. For more details about the `sphinx` format, see [this tutorial](https://sphinx-rtd-tutorial.readthedocs.io/en/latest/docstrings.html). A short example of a `sphinx`-formatted docstring is shown below:
+
+```python
+"""[Summary]
+:param [ParamName]: [ParamDescription], defaults to [DefaultParamVal]
+:type [ParamName]: [ParamType](, optional)
+...
+:raises [ErrorType]: [ErrorDescription]
+...
+:return: [ReturnDescription]
+:rtype: [ReturnType]
+"""
+```
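
For instance, a docstring following this template might look like the one below (the function is hypothetical, purely for illustration):

```python
import pyspark.sql.functions as F  # noqa: N812
from pyspark.sql import DataFrame


def multiply_column(df: DataFrame, col_name: str, factor: float = 1.0) -> DataFrame:
    """Multiplies a column by a constant factor.

    :param df: input DataFrame
    :type df: DataFrame
    :param col_name: name of the column to multiply
    :type col_name: str
    :param factor: multiplier, defaults to 1.0
    :type factor: float, optional
    :raises ValueError: if col_name is not present in df
    :return: DataFrame with the column multiplied
    :rtype: DataFrame
    """
    if col_name not in df.columns:
        raise ValueError(f"Column {col_name} not found in DataFrame")
    return df.withColumn(col_name, F.col(col_name) * factor)
```
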
136 changes: 136 additions & 0 deletions docs/learn_more/column_to_list.md
@@ -0,0 +1,136 @@
# Column to list performance

In PySpark, there are many approaches to accomplish the same task. Given a test DataFrame with two columns, `mvv` and `count`, here are five ways to produce an identical list of `mvv` values using base PySpark functionality.

---

## Setup

```python
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
```

```python
spark = SparkSession.builder.getOrCreate()
vals = [(0, 5), (1, 10), (2, 4), (3, 2), (4, 1)]
df = spark.createDataFrame(vals, schema="mvv int, count int")
```

---

## Approaches

### 1. toPandas()

```python
list(df.select("mvv").toPandas()["mvv"])
# [0, 1, 2, 3, 4]
```

### 2. flatMap

```python
df.select("mvv").rdd.flatMap(lambda x: x).collect()
# [0, 1, 2, 3, 4]
```

### 3. map

```python
df.select("mvv").rdd.map(lambda row: row[0]).collect()
# [0, 1, 2, 3, 4]
```

### 4. collect list comprehension

```python
[row[0] for row in df.select("mvv").collect()]
# [0, 1, 2, 3, 4]
```

### 5. toLocalIterator() list comprehension

```python
[row[0] for row in df.select("mvv").toLocalIterator()]
# [0, 1, 2, 3, 4]
```

---

## Benchmark Results

Substantial runtime differences were observed on the medium and large datasets:

![box plot](../images/column_to_list_boxplot.svg)

![line plot](../images/column_to_list_line_plot.svg)

All approaches have similar performance at 1K and 100K rows. `toPandas()` is consistently the fastest method across the tested dataset sizes and exhibits the least variance in runtime. However, `pyarrow` and `pandas` are not required dependencies of Quinn, so this method only works when those packages are available. For typical Spark workloads, the `flatMap` approach is the next best default.
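
When `pyarrow` is installed, `toPandas()` can additionally use Arrow-optimized columnar transfer. A minimal sketch that enables the standard Spark config explicitly before collecting (assuming `pyarrow` is present):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Arrow-based columnar transfer speeds up toPandas(); it is only used
# when pyarrow is installed and this config is enabled.
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

df = spark.createDataFrame([(0, 5), (1, 10)], schema="mvv int, count int")
mvv_list = list(df.select("mvv").toPandas()["mvv"])
```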

---

## Quinn Implementation

[:material-api: `quinn.column_to_list`](https://mrpowers.github.io/quinn/reference/quinn/dataframe_helpers)

To address these performance results, we updated `quinn.column_to_list()` to check the runtime environment and use the fastest method: if `pandas` and `pyarrow` are available, `toPandas()` is used; otherwise, `flatMap` is used.
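
A minimal sketch of this dispatch logic (illustrative only; see the API reference above for the actual implementation):

```python
from pyspark.sql import DataFrame


def column_to_list(df: DataFrame, col_name: str) -> list:
    """Collects one column as a Python list using the fastest available method."""
    try:
        import pandas  # noqa: F401
        import pyarrow  # noqa: F401
    except ImportError:
        # Fall back to the RDD-based flatMap approach.
        return df.select(col_name).rdd.flatMap(lambda x: x).collect()
    return df.select(col_name).toPandas()[col_name].tolist()
```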

---

## More Information

### Datasets

Four datasets were used for this benchmark. Each dataset contains two columns, `mvv` and `count`. The `mvv` column is a monotonically increasing integer and the `count` column is a random integer between 1 and 10. The datasets were created using the `create_benchmark_df.py` script in `quinn/benchmarks`; a sketch of the generation logic appears after the table.

| Dataset name | Number of rows | Number of files | Size on disk (MB) |
| ------------ | -------------- | --------------- | ----------------- |
| mvv_xsmall | 1,000 | 1 | 0.005 |
| mvv_small | 100,000 | 1 | 0.45 |
| mvv_medium | 10,000,000 | 1 | 45 |
| mvv_large | 100,000,000 | 4 | 566 |
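
The generation logic in `create_benchmark_df.py` can be approximated with a sketch like the following (the output path and seed are illustrative assumptions, not the script's actual values):

```python
import pyspark.sql.functions as F  # noqa: N812
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

num_rows = 1_000  # mvv_xsmall; scale up for the larger datasets
df = spark.range(num_rows).select(
    F.col("id").alias("mvv"),  # monotonically increasing integer
    (F.floor(F.rand(seed=42) * 10) + 1).cast("int").alias("count"),  # random 1..10
)
df.write.mode("overwrite").parquet("benchmarks/data/mvv_xsmall")
```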

---

### Validation

The code and results from this test are available in the `/benchmarks` directory of Quinn. To run this benchmark yourself:

#### 1. Install the required dependencies

```bash
poetry install --with docs
```

#### 2. Create the datasets

```bash
poetry run python benchmarks/create_benchmark_df.py
```

#### 3. Run the benchmark

```bash
poetry run python benchmarks/benchmark_column_performance.py
```

Results will be stored in the `benchmarks/results` directory.
By default, each implementation runs for the following durations:

| Dataset name | Duration (seconds) |
| ------------ | ------------------ |
| mvv_xsmall | 20 |
| mvv_small | 20 |
| mvv_medium | 360 |
| mvv_large | 1200 |

These durations can be adjusted in `benchmarks/benchmark_column_performance.py` if shorter or longer runs are desired.

#### 4. Visualize the results

```bash
poetry run python benchmarks/visualize_benchmarks.py
```

The `.svg` files will be saved in the `benchmarks/images` directory.
5 changes: 5 additions & 0 deletions docs/learn_more/index.md
@@ -0,0 +1,5 @@
# Learn More

Deeper explanations of design decisions and use cases for Quinn

- [Convert PySpark DataFrame Columns to a Python List](column_to_list.md)