## Scenario

In short: read records from multiple JSON and Parquet files with different schemas, explode/unnest and aggregate the data, and write the results to Postgres.

### Data

The data for this scenario is generated with `generate_example_data.py` and lives in the `data/` directory: 25 JSON files and 5 Parquet files. Each JSON file, and each row in a Parquet file, holds one record with the following fields:

| Field | Optional | Data type | Possible values |
| --- | --- | --- | --- |
| id |  | int | [1, 600) |
| timestamp |  | str | ISO format timestamp |
| col1 | x | float | [100, 200) |
| col2 | x | float | [200, 300) |
| col3 | x | float | [300, 400) |
| tags | x | List[str] | "a", "b", "c", "d" |

### Task 1

Find the count and average of values in the fields `col1`, `col2`, and `col3` aggregated by the different tags. Write the result to Postgres as a new table.

### Task 2

Find the most common tag(s) for every month (ignoring the year). Write the result to Postgres as a new table.

### Plan of attack

We'll work through this problem in the following steps:
1. read JSON and Parquet files and combine schemas,
2. explode tags and get Task 1 aggregates,
3. convert str timestamp to actual timestamp and extract month,
4. get tag ranks by month and get top tags as a list by month for Task 2,
5. connect and write results to Postgres.

In [1]:
import duckdb

## 1. Read JSON and Parquet files and combine schemas

DuckDB supports globbing, so reading multiple files is as easy as reading a single file. Because our files can have different schemas, we call the functions `read_json_auto()` and `read_parquet()` explicitly so that we can pass the parameter `union_by_name = true`, which aligns columns by name across files and fills columns missing from a file with NULL.
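
To make the effect of `union_by_name` concrete, here is a plain-Python sketch of the idea (not DuckDB's implementation). Records with different key sets are aligned by field name, and fields missing from a record become `None`, the analogue of SQL NULL. The helper `union_by_name` and the sample records are made up for illustration.

```python
def union_by_name(*batches):
    """Align records from several batches by field name.

    Hypothetical helper that mimics the *idea* of union_by_name: the output
    schema is the union of all field names (in order of first appearance),
    and fields a record does not have are filled with None.
    """
    columns = []
    for batch in batches:
        for record in batch:
            for name in record:
                if name not in columns:
                    columns.append(name)
    rows = [
        {name: record.get(name) for name in columns}
        for batch in batches
        for record in batch
    ]
    return columns, rows


# Illustrative records only; not read from the data/ directory.
parquet_like = [{"id": 201, "timestamp": "2023-02-14T23:43:06", "tags": ["c", "d"]}]
json_like = [{"id": 10, "timestamp": "2023-03-19T15:02:04", "col1": 179.3}]

columns, rows = union_by_name(parquet_like, json_like)
print(columns)
for row in rows:
    print(row)
```

Each output row has the same four keys, which is what lets the two sources be stacked into one table.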

In [2]:
# Read all Parquet files:
duckdb.sql("FROM read_parquet('data/*.parquet', union_by_name = true)").show()

┌───────┬─────────────────────┬──────────────┬────────────────────┬────────────────────┬────────────────────┐
│  id   │      timestamp      │     tags     │        col3        │        col2        │        col1        │
│ int64 │       varchar       │  varchar[]   │       double       │       double       │       double       │
├───────┼─────────────────────┼──────────────┼────────────────────┼────────────────────┼────────────────────┤
│   201 │ 2023-02-14T23:43:06 │ [c, d, b, a] │               NULL │               NULL │               NULL │
│   202 │ 2023-12-10T04:20:09 │ [c, b]       │  344.4384086978107 │ 283.82011872170756 │               NULL │
│   203 │ 2023-04-12T19:11:56 │ [a, d, b]    │  300.2019868412024 │ 250.26658507608727 │               NULL │
│   204 │ 2023-11-29T05:50:49 │ NULL         │               NULL │               NULL │ 160.47502928589932 │
│   205 │ 2023-07-24T18:16:09 │ [a]          │  389.3784272049234 │  240.8342356007753 │ 134.37312613968828 │
│   206 │ 

In [3]:
# Read all JSON files:
duckdb.sql("FROM read_json_auto('data/*.json', union_by_name = true)").show()

┌───────┬─────────────────────┬────────────────────┬────────────────────┬──────────────┬────────────────────┐
│  id   │      timestamp      │        col1        │        col3        │     tags     │        col2        │
│ int64 │       varchar       │       double       │       double       │  varchar[]   │       double       │
├───────┼─────────────────────┼────────────────────┼────────────────────┼──────────────┼────────────────────┤
│    10 │ 2023-03-19T15:02:04 │ 179.27596351197928 │  393.0434391894289 │ [d]          │               NULL │
│     9 │ 2023-06-26T21:56:05 │ 164.56291322757346 │ 331.81802543751616 │ [d]          │               NULL │
│     8 │ 2023-11-16T18:57:25 │               NULL │ 318.59194254112924 │ [b]          │               NULL │
│     7 │ 2023-07-19T03:39:52 │               NULL │  344.1654722833825 │ [d, b]       │ 200.66933318979386 │
│    19 │ 2023-03-27T05:11:16 │               NULL │               NULL │ NULL         │               NULL │
│     6 │ 

In [4]:
# UNION the results and save the query as a view:
query = """
CREATE OR REPLACE VIEW all_records_view AS (
    FROM read_parquet('data/*.parquet', union_by_name = true)
    UNION ALL BY NAME
    FROM read_json_auto('data/*.json', union_by_name = true)
)
"""
duckdb.sql(query)
duckdb.sql("FROM all_records_view ORDER BY id").show()

┌───────┬─────────────────────┬──────────────┬────────────────────┬────────────────────┬────────────────────┐
│  id   │      timestamp      │     tags     │        col3        │        col2        │        col1        │
│ int64 │       varchar       │  varchar[]   │       double       │       double       │       double       │
├───────┼─────────────────────┼──────────────┼────────────────────┼────────────────────┼────────────────────┤
│     1 │ 2023-02-09T06:28:40 │ [b]          │  359.8701591734018 │               NULL │ 116.18588359611803 │
│     2 │ 2023-05-14T23:19:16 │ [a, b]       │               NULL │               NULL │               NULL │
│     3 │ 2023-06-20T22:06:01 │ NULL         │               NULL │               NULL │               NULL │
│     4 │ 2023-02-01T09:24:39 │ [b, a, c]    │               NULL │  264.8946250873081 │ 153.48516545739275 │
│     5 │ 2023-02-10T04:00:55 │ NULL         │               NULL │               NULL │               NULL │
│     6 │ 

## 2. Explode tags and get Task 1 aggregates

To aggregate by tags, we'll explode the tag lists, i.e. we create a row for every tag in a list of tags. In DuckDB this is done with the function `unnest()`.

In [5]:
duckdb.sql("FROM all_records_view SELECT unnest(tags) AS tag, col1, col2, col3").show()

┌─────────┬────────────────────┬────────────────────┬───────────────────┐
│   tag   │        col1        │        col2        │       col3        │
│ varchar │       double       │       double       │      double       │
├─────────┼────────────────────┼────────────────────┼───────────────────┤
│ c       │               NULL │               NULL │              NULL │
│ d       │               NULL │               NULL │              NULL │
│ b       │               NULL │               NULL │              NULL │
│ a       │               NULL │               NULL │              NULL │
│ c       │               NULL │ 283.82011872170756 │ 344.4384086978107 │
│ b       │               NULL │ 283.82011872170756 │ 344.4384086978107 │
│ a       │               NULL │ 250.26658507608727 │ 300.2019868412024 │
│ d       │               NULL │ 250.26658507608727 │ 300.2019868412024 │
│ b       │               NULL │ 250.26658507608727 │ 300.2019868412024 │
│ a       │ 134.37312613968828 │  240.

Note that `unnest()` effectively drops all rows without tags. To get aggregated values for untagged records as well, we can add those rows back manually with a `UNION ALL`, using a placeholder tag.
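
The row-dropping behaviour can be sketched in plain Python. This is only an analogy for what the SQL does, with made-up records and hypothetical helper names (`explode_tags`, `untagged`): exploding a missing tag list yields no rows, so untagged records have to be appended separately.

```python
def explode_tags(records):
    """Yield one (tag, col1) row per tag; records with tags=None yield nothing."""
    for record in records:
        for tag in record["tags"] or []:
            yield (tag, record["col1"])


def untagged(records, placeholder="NO TAGS"):
    """Yield one placeholder row per record whose tags are None."""
    for record in records:
        if record["tags"] is None:
            yield (placeholder, record["col1"])


# Illustrative records only.
records = [
    {"tags": ["a", "b"], "col1": 134.4},
    {"tags": None, "col1": 160.5},
]

exploded = list(explode_tags(records))
combined = exploded + list(untagged(records))  # analogue of UNION ALL
print(exploded)
print(combined)
```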

In [6]:
query = """
FROM all_records_view SELECT unnest(tags) AS tag, col1, col2, col3
UNION ALL
FROM all_records_view SELECT 'NO TAGS' AS tag, col1, col2, col3
WHERE tags IS NULL
"""
duckdb.sql(query).show()

┌─────────┬────────────────────┬────────────────────┬────────────────────┐
│   tag   │        col1        │        col2        │        col3        │
│ varchar │       double       │       double       │       double       │
├─────────┼────────────────────┼────────────────────┼────────────────────┤
│ c       │               NULL │               NULL │               NULL │
│ d       │               NULL │               NULL │               NULL │
│ b       │               NULL │               NULL │               NULL │
│ a       │               NULL │               NULL │               NULL │
│ c       │               NULL │ 283.82011872170756 │  344.4384086978107 │
│ b       │               NULL │ 283.82011872170756 │  344.4384086978107 │
│ a       │               NULL │ 250.26658507608727 │  300.2019868412024 │
│ d       │               NULL │ 250.26658507608727 │  300.2019868412024 │
│ b       │               NULL │ 250.26658507608727 │  300.2019868412024 │
│ a       │ 134.373126139

Now all we are missing are the aggregates. Let's also save the result as a view.

In [7]:
query = """
CREATE OR REPLACE VIEW task1 AS (
    WITH cte AS (
        FROM all_records_view SELECT unnest(tags) AS tag, col1, col2, col3
        UNION ALL
        FROM all_records_view SELECT 'NO TAGS' AS tag, col1, col2, col3
        WHERE tags IS NULL
    )
    FROM cte SELECT tag, COUNT(col1), MEAN(col1), COUNT(col2), MEAN(col2), COUNT(col3), MEAN(col3)
    GROUP BY tag
)
"""
duckdb.sql(query)
duckdb.sql("FROM task1").show()

┌─────────┬─────────────┬────────────────────┬─────────────┬────────────────────┬─────────────┬────────────────────┐
│   tag   │ count(col1) │     mean(col1)     │ count(col2) │     mean(col2)     │ count(col3) │     mean(col3)     │
│ varchar │    int64    │       double       │    int64    │       double       │    int64    │       double       │
├─────────┼─────────────┼────────────────────┼─────────────┼────────────────────┼─────────────┼────────────────────┤
│ a       │          26 │ 147.36241567052772 │          22 │ 244.83179086145267 │          29 │  349.9492771150753 │
│ d       │          20 │  150.4693243437674 │          17 │ 246.87694825760414 │          19 │ 351.87354047508893 │
│ NO TAGS │          17 │ 148.85170712520215 │          16 │ 238.82129301737254 │          13 │  360.7105108776255 │
│ b       │          31 │ 141.89237957017806 │          30 │ 243.45421426701134 │          35 │  350.7843464869098 │
│ c       │          19 │  148.0404355046262 │          20 │  24
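
The counts differ from column to column because `COUNT(col)` and the mean only consider non-NULL values. A minimal plain-Python sketch of that behaviour, using a hypothetical helper `count_and_mean` and made-up values:

```python
from statistics import mean


def count_and_mean(values):
    """Count and average the non-None values, mirroring NULL-skipping aggregates."""
    present = [v for v in values if v is not None]
    return len(present), (mean(present) if present else None)


# Illustrative column with two missing values.
col1 = [116.0, None, 154.0, None]
result = count_and_mean(col1)
print(result)
```

A column consisting only of missing values gives a count of 0 and no mean at all, rather than a mean of 0.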

## 3. Convert str timestamp to actual timestamp and extract month

It would actually be quicker to just extract the month number with string slicing, but we'll convert the text to timestamps and extract the month from those anyway, to see how to work with timestamps in DuckDB.
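
For comparison, here is what the two approaches look like in plain Python. This is only a sketch of the idea: Python's `datetime` uses its own format codes, so the format string is spelled out here and differs from the one passed to DuckDB.

```python
from datetime import datetime

ts = "2023-02-14T23:43:06"  # same shape as the timestamps in the data

# Option 1: slice out the two month digits of the fixed-width YYYY-MM-DD prefix.
month_sliced = int(ts[5:7])

# Option 2: parse to a real datetime first, then read the month attribute.
month_parsed = datetime.strptime(ts, "%Y-%m-%dT%H:%M:%S").month

print(month_sliced, month_parsed)
```

Slicing only works because every timestamp has the same fixed-width layout; parsing also validates the value along the way.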

Converting strings to timestamps is done with the function `strptime()`. Then, we can extract the month with the `extract()` function.

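First, let's see how to use `strptime()`. In the format string, `%x` stands for the ISO date and `%X` for the ISO time, with a literal `T` between them.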

In [8]:
duckdb.sql("FROM all_records_view SELECT id, strptime(timestamp, '%xT%X')")

┌───────┬────────────────────────────────┐
│  id   │ strptime("timestamp", '%xT%X') │
│ int64 │           timestamp            │
├───────┼────────────────────────────────┤
│   201 │ 2023-02-14 23:43:06            │
│   202 │ 2023-12-10 04:20:09            │
│   203 │ 2023-04-12 19:11:56            │
│   204 │ 2023-11-29 05:50:49            │
│   205 │ 2023-07-24 18:16:09            │
│   206 │ 2023-09-04 10:29:01            │
│   207 │ 2023-06-29 14:47:20            │
│   208 │ 2023-03-20 09:52:24            │
│   209 │ 2023-02-10 11:53:49            │
│   210 │ 2023-12-23 10:54:51            │
│     · │          ·                     │
│     · │          ·                     │
│     · │          ·                     │
│    17 │ 2023-08-01 22:57:43            │
│     4 │ 2023-02-01 09:24:39            │
│    18 │ 2023-01-06 14:51:50            │
│    12 │ 2023-11-04 07:11:02            │
│    25 │ 2023-09-18 01:12:56            │
│    23 │ 2023-01-06 16:46:58            │
│    24 │ 2

Then, we extract the month.

In [9]:
duckdb.sql("FROM all_records_view SELECT id, extract(month FROM strptime(timestamp, '%xT%X'))")

┌───────┬─────────────────────────────────────────────────────────┐
│  id   │ main.date_part('month', strptime("timestamp", '%xT%X')) │
│ int64 │                          int64                          │
├───────┼─────────────────────────────────────────────────────────┤
│   201 │                                                       2 │
│   202 │                                                      12 │
│   203 │                                                       4 │
│   204 │                                                      11 │
│   205 │                                                       7 │
│   206 │                                                       9 │
│   207 │                                                       6 │
│   208 │                                                       3 │
│   209 │                                                       2 │
│   210 │                                                      12 │
│     · │                                       

And finally we add the exploding of tags. Note that since we are only interested in the count of each tag per month, we only need to select the month and tag columns.

In [10]:
duckdb.sql("FROM all_records_view SELECT extract(month FROM strptime(timestamp, '%xT%X')) AS month, unnest(tags) AS tag")

┌───────┬─────────┐
│ month │   tag   │
│ int64 │ varchar │
├───────┼─────────┤
│     2 │ c       │
│     2 │ d       │
│     2 │ b       │
│     2 │ a       │
│    12 │ c       │
│    12 │ b       │
│     4 │ a       │
│     4 │ d       │
│     4 │ b       │
│     7 │ a       │
│     · │ ·       │
│     · │ ·       │
│     · │ ·       │
│     9 │ d       │
│     9 │ b       │
│     1 │ a       │
│     1 │ d       │
│    12 │ c       │
│    12 │ b       │
│     9 │ d       │
│     9 │ b       │
│     9 │ c       │
│     9 │ a       │
├───────┴─────────┤
│    227 rows     │
│   (20 shown)    │
└─────────────────┘

## 4. Get tag ranks by month and get top tags as a list by month for Task 2

Now, we need to get the rank of the count of each tag per month. First, we'll aggregate tag counts per month and save the counts as a view.

In [11]:
query = """
CREATE OR REPLACE VIEW month_tag_counts AS (
    WITH month_tags AS (
        FROM all_records_view SELECT extract(month FROM strptime(timestamp, '%xT%X')) AS month, unnest(tags) AS tag
    )
    FROM month_tags
    SELECT month, tag, COUNT(1) as count
    GROUP BY ALL
)
"""
duckdb.sql(query)
duckdb.sql("FROM month_tag_counts ORDER BY month, tag")

┌───────┬─────────┬───────┐
│ month │   tag   │ count │
│ int64 │ varchar │ int64 │
├───────┼─────────┼───────┤
│     1 │ a       │     6 │
│     1 │ b       │     5 │
│     1 │ c       │     3 │
│     1 │ d       │     4 │
│     2 │ a       │     7 │
│     2 │ b       │     8 │
│     2 │ c       │     6 │
│     2 │ d       │     5 │
│     3 │ a       │     6 │
│     3 │ b       │     7 │
│     · │ ·       │     · │
│     · │ ·       │     · │
│     · │ ·       │     · │
│    10 │ c       │     2 │
│    10 │ d       │     4 │
│    11 │ a       │     2 │
│    11 │ b       │     3 │
│    11 │ c       │     2 │
│    11 │ d       │     2 │
│    12 │ a       │     6 │
│    12 │ b       │     8 │
│    12 │ c       │     5 │
│    12 │ d       │     3 │
├───────┴─────────┴───────┤
│   48 rows (20 shown)    │
└─────────────────────────┘

Now, we get the rank of each tag per month. We can do this in at least two ways:
- windowing and DENSE_RANK,
- filtering rows by selecting max count by month.

We'll do both just to see how to do windows and joins in DuckDB.
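
Before the SQL versions, here is a plain-Python sketch of why the two approaches agree. It is an illustration only: the rows are made up and the helper names `top_by_dense_rank` and `top_by_max_filter` are hypothetical.

```python
from collections import defaultdict

# Illustrative (month, tag, count) rows; not the real counts.
rows = [(1, "a", 6), (1, "b", 5), (3, "b", 7), (3, "c", 7), (3, "d", 4)]


def top_by_dense_rank(rows):
    """Keep rows whose count has dense rank 1 within its month."""
    by_month = defaultdict(list)
    for month, tag, count in rows:
        by_month[month].append((tag, count))
    result = []
    for month, tagged in by_month.items():
        # Dense rank: distinct counts sorted descending get ranks 1, 2, 3, ...
        distinct = sorted({count for _, count in tagged}, reverse=True)
        rank = {count: i + 1 for i, count in enumerate(distinct)}
        result += [(month, tag, count) for tag, count in tagged if rank[count] == 1]
    return sorted(result)


def top_by_max_filter(rows):
    """Keep rows whose count equals the maximum count of its month."""
    max_count = {}
    for month, _, count in rows:
        max_count[month] = max(count, max_count.get(month, count))
    return sorted(row for row in rows if row[2] == max_count[row[0]])


print(top_by_dense_rank(rows))
print(top_by_max_filter(rows))
```

Both functions keep tied tags, so a month can contribute more than one row.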

### DENSE_RANK

In [12]:
duckdb.sql("FROM month_tag_counts SELECT *, DENSE_RANK() OVER(PARTITION BY month ORDER BY count DESC) AS rank ORDER BY month, tag")

┌───────┬─────────┬───────┬───────┐
│ month │   tag   │ count │ rank  │
│ int64 │ varchar │ int64 │ int64 │
├───────┼─────────┼───────┼───────┤
│     1 │ a       │     6 │     1 │
│     1 │ b       │     5 │     2 │
│     1 │ c       │     3 │     4 │
│     1 │ d       │     4 │     3 │
│     2 │ a       │     7 │     2 │
│     2 │ b       │     8 │     1 │
│     2 │ c       │     6 │     3 │
│     2 │ d       │     5 │     4 │
│     3 │ a       │     6 │     2 │
│     3 │ b       │     7 │     1 │
│     · │ ·       │     · │     · │
│     · │ ·       │     · │     · │
│     · │ ·       │     · │     · │
│    10 │ c       │     2 │     2 │
│    10 │ d       │     4 │     1 │
│    11 │ a       │     2 │     2 │
│    11 │ b       │     3 │     1 │
│    11 │ c       │     2 │     2 │
│    11 │ d       │     2 │     2 │
│    12 │ a       │     6 │     2 │
│    12 │ b       │     8 │     1 │
│    12 │ c       │     5 │     3 │
│    12 │ d       │     3 │     4 │
├───────┴─────────┴───────┴─

We only need the most common tags, i.e. rank = 1, so let's filter the rest out.

In [13]:
query = """
WITH ranks AS (
    FROM month_tag_counts SELECT *, DENSE_RANK() OVER(PARTITION BY month ORDER BY count DESC) AS rank
)
FROM ranks
SELECT * EXCLUDE(rank)
WHERE rank = 1
ORDER BY month, tag
"""
duckdb.sql(query).show()

┌───────┬─────────┬───────┐
│ month │   tag   │ count │
│ int64 │ varchar │ int64 │
├───────┼─────────┼───────┤
│     1 │ a       │     6 │
│     2 │ b       │     8 │
│     3 │ b       │     7 │
│     3 │ c       │     7 │
│     3 │ d       │     7 │
│     4 │ b       │     6 │
│     5 │ a       │     6 │
│     6 │ b       │     4 │
│     7 │ b       │     8 │
│     8 │ a       │     5 │
│     8 │ b       │     5 │
│     9 │ a       │     8 │
│     9 │ b       │     8 │
│     9 │ c       │     8 │
│    10 │ a       │     4 │
│    10 │ b       │     4 │
│    10 │ d       │     4 │
│    11 │ b       │     3 │
│    12 │ b       │     8 │
├───────┴─────────┴───────┤
│ 19 rows       3 columns │
└─────────────────────────┘



### Self join

In [14]:
query = """
WITH max_counts AS (
    FROM month_tag_counts
    SELECT month, MAX(count) AS count
    GROUP BY month
)
FROM month_tag_counts
JOIN max_counts
ON month_tag_counts.month = max_counts.month AND month_tag_counts.count = max_counts.count
SELECT month_tag_counts.month, month_tag_counts.tag, month_tag_counts.count
ORDER BY month_tag_counts.month, month_tag_counts.tag
"""
duckdb.sql(query).show()

┌───────┬─────────┬───────┐
│ month │   tag   │ count │
│ int64 │ varchar │ int64 │
├───────┼─────────┼───────┤
│     1 │ a       │     6 │
│     2 │ b       │     8 │
│     3 │ b       │     7 │
│     3 │ c       │     7 │
│     3 │ d       │     7 │
│     4 │ b       │     6 │
│     5 │ a       │     6 │
│     6 │ b       │     4 │
│     7 │ b       │     8 │
│     8 │ a       │     5 │
│     8 │ b       │     5 │
│     9 │ a       │     8 │
│     9 │ b       │     8 │
│     9 │ c       │     8 │
│    10 │ a       │     4 │
│    10 │ b       │     4 │
│    10 │ d       │     4 │
│    11 │ b       │     3 │
│    12 │ b       │     8 │
├───────┴─────────┴───────┤
│ 19 rows       3 columns │
└─────────────────────────┘



Let's save this as a view to make the next step simpler.

In [15]:
query = """
CREATE OR REPLACE VIEW most_common_tags_by_month AS (
    WITH max_counts AS (
        FROM month_tag_counts
        SELECT month, MAX(count) AS count
        GROUP BY month
    )
    FROM month_tag_counts
    JOIN max_counts
    ON month_tag_counts.month = max_counts.month AND month_tag_counts.count = max_counts.count
    SELECT month_tag_counts.month, month_tag_counts.tag, month_tag_counts.count
    ORDER BY month_tag_counts.month, month_tag_counts.tag
)
"""
duckdb.sql(query)

### Aggregate list

We'd like exactly one row per month, holding a list of that month's most common tags. We can accomplish this with the `list()` aggregate function.

In [16]:
query = """
CREATE OR REPLACE VIEW task2 AS (
    FROM most_common_tags_by_month
    SELECT month, list(tag ORDER BY tag) AS most_common_tags, count
    GROUP BY ALL
    ORDER BY month
)
"""
duckdb.sql(query)
duckdb.sql("FROM task2")

┌───────┬──────────────────┬───────┐
│ month │ most_common_tags │ count │
│ int64 │    varchar[]     │ int64 │
├───────┼──────────────────┼───────┤
│     1 │ [a]              │     6 │
│     2 │ [b]              │     8 │
│     3 │ [b, c, d]        │     7 │
│     4 │ [b]              │     6 │
│     5 │ [a]              │     6 │
│     6 │ [b]              │     4 │
│     7 │ [b]              │     8 │
│     8 │ [a, b]           │     5 │
│     9 │ [a, b, c]        │     8 │
│    10 │ [a, b, d]        │     4 │
│    11 │ [b]              │     3 │
│    12 │ [b]              │     8 │
├───────┴──────────────────┴───────┤
│ 12 rows                3 columns │
└──────────────────────────────────┘

Note the `ORDER BY` inside the aggregate call `list(tag ORDER BY tag)`. DuckDB lets you apply an ordering like this inside aggregations. Most common aggregate functions are order-insensitive, but `list()` is an order-*sensitive* aggregate function, so this lets us make sure that the tags appear in sorted order in the list.
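
The difference is easy to see in a plain-Python analogue. This is a sketch with made-up rows, not DuckDB code: collecting values per group keeps whatever order the rows arrive in, unless we sort explicitly.

```python
from collections import defaultdict

# Illustrative (month, tag) rows in arbitrary order.
rows = [(3, "d"), (1, "a"), (3, "b"), (3, "c")]

groups = defaultdict(list)
for month, tag in rows:
    groups[month].append(tag)

unordered = dict(groups)  # list contents depend on the row order
ordered = {month: sorted(tags) for month, tags in groups.items()}

print(unordered)
print(ordered)
```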

## 5. Connect and write results to Postgres

All that remains is to write the results to Postgres. We attach to a running Postgres instance with a URI or a libpq connection string, after which we can operate directly on the Postgres database with DuckDB commands.

For this example, I have Postgres running locally with user `postgres` having password `postgres`. We'll write the results to a new schema in the default database of the user `postgres`.

In [17]:
# Install and load the postgres extension for DuckDB
duckdb.sql("INSTALL postgres")
duckdb.sql("LOAD postgres")

# Connect to the local Postgres with a Postgres URI
# URI format: postgresql://username:password@host:port/database
duckdb.sql("ATTACH 'postgresql://postgres:postgres@localhost:5432/postgres' AS pg (TYPE postgres)")

# Create a schema for the results
duckdb.sql("CREATE SCHEMA pg.duckdbscenario")

# Create and load new tables for the results
duckdb.sql("CREATE TABLE pg.duckdbscenario.task1 AS FROM task1")
duckdb.sql("CREATE TABLE pg.duckdbscenario.task2 AS FROM task2")
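
The `ATTACH` call above uses the URI form. As a rough sketch, the two connection-string forms mentioned earlier can be built as follows. The helper names `postgres_uri` and `libpq_keywords` are hypothetical. The sketch assumes the values in the keyword form contain no spaces or quotes, since those would need libpq's quoting rules, which are not handled here.

```python
from urllib.parse import quote


def postgres_uri(user, password, host, port, database):
    """Build a postgresql:// URI, percent-encoding the user and password."""
    return (
        f"postgresql://{quote(user, safe='')}:{quote(password, safe='')}"
        f"@{host}:{port}/{database}"
    )


def libpq_keywords(user, password, host, port, database):
    """Build the equivalent libpq keyword/value connection string."""
    return f"dbname={database} user={user} password={password} host={host} port={port}"


# Same credentials as in the local setup described above.
args = ("postgres", "postgres", "localhost", 5432, "postgres")
print(postgres_uri(*args))
print(libpq_keywords(*args))
```

Percent-encoding matters once a password contains characters such as `@` or `:`, which would otherwise be read as part of the URI structure.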

To verify that the tables were actually created in the new schema, we can make the new schema the default database and schema with `USE` and then run `SHOW TABLES`.

In [18]:
# The tables and views in the in-memory DuckDB database
duckdb.sql("SHOW TABLES").show()

# Switch to the new schema in the Postgres database; SHOW TABLES now lists only the two new tables
duckdb.sql("USE pg.duckdbscenario")
duckdb.sql("SHOW TABLES").show()

┌───────────────────────────┐
│           name            │
│          varchar          │
├───────────────────────────┤
│ all_records_view          │
│ month_tag_counts          │
│ most_common_tags_by_month │
│ task1                     │
│ task2                     │
└───────────────────────────┘

┌─────────┐
│  name   │
│ varchar │
├─────────┤
│ task1   │
│ task2   │
└─────────┘



In [19]:
# Final SELECT * statements from the new tables to make sure they were loaded
duckdb.sql("FROM pg.duckdbscenario.task1 ORDER BY tag").show()

┌─────────┬─────────────┬────────────────────┬─────────────┬────────────────────┬─────────────┬───────────────────┐
│   tag   │ count(col1) │     mean(col1)     │ count(col2) │     mean(col2)     │ count(col3) │    mean(col3)     │
│ varchar │    int64    │       double       │    int64    │       double       │    int64    │      double       │
├─────────┼─────────────┼────────────────────┼─────────────┼────────────────────┼─────────────┼───────────────────┤
│ NO TAGS │          17 │  148.8517071252022 │          16 │ 238.82129301737257 │          13 │ 360.7105108776255 │
│ a       │          26 │ 147.36241567052775 │          22 │ 244.83179086145273 │          29 │ 349.9492771150753 │
│ b       │          31 │   141.892379570178 │          30 │ 243.45421426701134 │          35 │ 350.7843464869097 │
│ c       │          19 │ 148.04043550462617 │          20 │  244.4782917085122 │          20 │ 339.3808263120647 │
│ d       │          20 │  150.4693243437674 │          17 │ 246.8769482

In [20]:
duckdb.sql("FROM pg.duckdbscenario.task2").show()

┌───────┬──────────────────┬───────┐
│ month │ most_common_tags │ count │
│ int64 │    varchar[]     │ int64 │
├───────┼──────────────────┼───────┤
│     1 │ [a]              │     6 │
│     2 │ [b]              │     8 │
│     3 │ [b, c, d]        │     7 │
│     4 │ [b]              │     6 │
│     5 │ [a]              │     6 │
│     6 │ [b]              │     4 │
│     7 │ [b]              │     8 │
│     8 │ [a, b]           │     5 │
│     9 │ [a, b, c]        │     8 │
│    10 │ [a, b, d]        │     4 │
│    11 │ [b]              │     3 │
│    12 │ [b]              │     8 │
├───────┴──────────────────┴───────┤
│ 12 rows                3 columns │
└──────────────────────────────────┘

