In [1]:
% load_ext memory_magics
# imports
import os
import random
from pathlib import Path

import numpy as np

In [2]:
# consts
# Directory holding the benchmark CSVs, assembled from its components using the
# platform's path separator.
_data_root = Path(
    os.sep.join(
        (
            "/",
            "Users",
            "toby.devlin",
            "dev",
            "projects",
            "data-processing",
            "data",
        )
    )
)

# Resolved locations of the two input files used by every benchmark.
crime_path = _data_root.joinpath("Crime_Data_from_2020_to_Present.csv").resolve()
reddit_path = _data_root.joinpath("reddit_account_data.csv").resolve()
print(crime_path, reddit_path, sep="\n")

# Seed both global generators (stdlib `random` and numpy's legacy global state).
random.seed(0)
np.random.seed(0)

/Users/toby.devlin/dev/projects/data-processing/data/Crime_Data_from_2020_to_Present.csv
/Users/toby.devlin/dev/projects/data-processing/data/reddit_account_data.csv


# What we are doing
We have 2 files, as printed above.
These have various schemas, and we want to run some processing over them in various tools to compare performance. The tests themselves make no sense, but the data is large enough and "real" enough to be somewhat reflective of a real-world scenario, without needing to fall back on randomly generated data.

## Data Sourcing
- [Crime_Data_from_2020_to_Present.csv](Crime_Data_from_2020_to_Present.csv) -> https://catalog.data.gov/dataset/crime-data-from-2020-to-present
- [reddit_account_data.csv](reddit_account_data.csv) -> https://files.pushshift.io/reddit/

## The Test:
1. Read in `Crime_Data_from_2020_to_Present.csv` to a frame, converting all datetime to native datetime format: `Date Rptd`, `DATE OCC`
2. Filter this frame such that:
   1. Column `Vict Age` > 15
   2. Use only records in the quantiles 0.05 -> 0.95 for `LAT` and `LON`
3. Read in the `reddit_account_data.csv` to a frame, converting all datetime to native datetime format: `created_utc`, `updated_on`
4. Filter this frame such that:
   1. None of the account names end with the number 7
   2. None of the account names contain the string `abc`
5. Create a column in the `account_data` frame, and assign each of the users a `group` value, uniformly, from the `crime_data.DR_NO` column
6. Join these two frames together on `account_data.group == crime_data.DR_NO`
7. Select only those who are NOT part of the following groups:
    1. `CC`
    2. `AA`
8. Group these by the lower-cased first letter of their name and calculate:
   1. The average time since reporting the crime
   2. The number of members in the group
   3. The user with the highest & lowest Karma scores
   4. The average time since creating an account

## What we are monitoring
- Time taken, using the timeit module
- Memory usage, using [rusmux's ipython-memory-magics](https://github.com/rusmux/ipython-memory-magics), which should just look at the cell's memory usage.

# Before we start
The files themselves should be looked at first: their size, shape and location on disk. TODO: also stream from S3.

| file                                |                       size |    records | columns |
|-------------------------------------|---------------------------:|-----------:|:--------|
| Crime_Data_from_2020_to_Present.csv |   168,553,714 bytes (161M) |    659,640 | 28      |
| reddit_account_data.csv             | 3,298,086,717 bytes (3.1G) | 69,382,538 | 6       |

Both of these frames are relatively small, but we are hoping to show that there is some extra on-disk processing we can do to speed things up; for example, polars will push down predicates and scan filters on disk.

# The Code

In [None]:
import logging

import polars as pl

log_polars = logging.getLogger(__name__)


def do_polars_test():
    """Run the benchmark pipeline with polars and return the aggregated frame.

    Both CSVs are scanned lazily. The reddit accounts are filtered, each is
    given a randomly chosen ``DR_NO`` from the crime data, and the two frames
    are joined on ``DR_NO``. The joined rows are then filtered by ``Status``,
    grouped by the lower-cased first letter of the account name, and
    aggregated.

    Returns:
        The collected result of the lazy query, one row per ``grp_by`` value,
        with the columns ``mean_time_since_reports``, ``count_members``,
        ``highest_karma``, ``lowest_karma`` and
        ``mean_time_since_account_open``.
    """
    # Imported locally so the function does not depend on another cell or
    # module having brought these names into scope.
    import random
    from datetime import datetime

    log_polars.info(f"using file crime_path: {crime_path}")
    log_polars.info(f"using file reddit_path: {reddit_path}")
    # 1. Read in `Crime_Data_from_2020_to_Present.csv` to a frame, converting all datetime to native datetime format: `Date Rptd`, `DATE OCC`
    # NOTE(review): this relies on `try_parse_dates` recognising the date format used in the file — confirm.
    df_crime = pl.scan_csv(crime_path, try_parse_dates=True)
    df_reddit = pl.scan_csv(reddit_path, try_parse_dates=True)

    # Pool of crime ids to draw from. It is materialised eagerly because the
    # per-row draw below happens in Python.
    distinct_ids = df_crime.select([pl.col("DR_NO").unique()]).collect().to_series()
    id_pool = distinct_ids.to_list()

    # One seeded generator shared by every row: reproducible across runs, yet
    # each row gets its own draw. Re-seeding per draw (as `sample(seed=0)` did)
    # hands the same id to every account.
    rng = random.Random(0)

    # Single reference instant so both "time since" aggregations agree.
    now = datetime.now()

    q0 = (
        # 3. Read in the `reddit_account_data.csv` to a frame, converting all datetime to native datetime format: `created_utc`, `updated_on`
        df_reddit
        # 4. Filter this frame such that:
        #    1. None of the account names end with the number 7
        #    2. None of the account names have the string abc in them
        .filter(~pl.col("name").str.contains("abc")).filter(~pl.col("name").str.ends_with("7"))
        # 5. Create a column in the `account_data` frame, and assign each of the users a `group` value, uniformly, from the `crime_data.DR_NO` column
        #    (the group column is named `DR_NO` here so the join below can use a single key name)
        .with_columns(
            [
                pl.col("name").apply(lambda _: rng.choice(id_pool)).alias("DR_NO"),
                pl.from_epoch(pl.col("created_utc"), unit="s").alias("created_utc"),
            ]
        )
    )

    q1 = (
        df_crime
        # 2. Filter this frame such that:
        #    1. Column `Vict Age` > 15
        .filter(pl.col("Vict Age") > 15)
        #    2. Use only records in the quantiles 0.05 -> 0.95 for `LAT` and `LON`
        .filter(pl.col("LAT").quantile(0.05) < pl.col("LAT"))
        .filter(pl.col("LAT").quantile(0.95) > pl.col("LAT"))
        .filter(pl.col("LON").quantile(0.05) < pl.col("LON"))
        .filter(pl.col("LON").quantile(0.95) > pl.col("LON"))
        # 6. Join these two frames together on `account_data.DR_NO == crime_data.DR_NO`
        .join(q0, on="DR_NO")
        # 7. Select only those who are NOT part of the following Statuses:
        #     1. `CC`
        #     2. `AA`
        .filter(~pl.col("Status").is_in(("CC", "AA")))
        # 8. Group these by the lower case first letter of their name and calculate:
        .with_columns(
            [
                pl.col("name").apply(lambda name: name[0].lower()).alias("grp_by"),
                (pl.col("comment_karma") + pl.col("link_karma")).alias("karma_sum"),
            ]
        )
        .groupby("grp_by")
        .agg(
            [
                #    1. The average time since reporting the crime as `mean_time_since_reports`
                #       (computed as `value - now`, so past timestamps give negative durations)
                pl.col("Date Rptd")
                .drop_nulls()
                .apply(lambda x: x - now)
                .mean()
                .alias("mean_time_since_reports"),
                #    2. The number of members in the group as `count_members`
                pl.col("name").count().alias("count_members"),
                #    3. The user with the highest & lowest Karma scores as `highest_karma` and `lowest_karma`
                pl.col("karma_sum").max().alias("highest_karma"),
                pl.col("karma_sum").min().alias("lowest_karma"),
                #    4. The average time since creating an account as `mean_time_since_account_open`
                pl.col("created_utc")
                .drop_nulls()
                .apply(lambda x: x - now)
                .mean()
                .alias("mean_time_since_account_open"),
            ]
        )
    )
    return q1.collect()


In [None]:
import logging

import numpy as np
import pandas as pd

from data_processing.const import crime_path, reddit_path

log_pandas = logging.getLogger(__name__)


def do_pandas_test():
    """Run the benchmark pipeline with pandas and return the aggregated frame.

    The crime CSV is read and filtered. The reddit CSV is read and filtered,
    and each account is given a randomly chosen ``DR_NO`` from the filtered
    crime frame. The two frames are inner-joined, filtered by ``Status``,
    grouped by the lower-cased first letter of the account name, and
    aggregated.

    Returns:
        A DataFrame indexed by ``grp_by`` with the columns
        ``mean_time_since_reports``, ``count_members``, ``highest_karma``,
        ``lowest_karma`` and ``mean_time_since_account_open``.
    """
    log_pandas.info(f"using file crime_path: {crime_path}")
    log_pandas.info(f"using file reddit_path: {reddit_path}")
    # Pandas
    # 1. Read in `Crime_Data_from_2020_to_Present.csv` to a frame, converting all datetime to native datetime format: `Date Rptd`, `DATE OCC`
    df_crime = pd.read_csv(
        crime_path, parse_dates=["Date Rptd", "DATE OCC"], dtype={"Vict Age": int, "LAT": float, "LON": float}
    )

    # 2. Filter this frame such that:
    #    1. Column `Vict Age` > 15
    #    2. Use only records in the quantiles 0.05 -> 0.95 for `LAT` and `LON`
    # NOTE: each quantile is computed on the frame as narrowed by the previous
    # filters, not on the original frame.
    df_crime = df_crime[df_crime["Vict Age"] > 15]
    df_crime = df_crime[df_crime["LAT"].quantile(0.05) < df_crime["LAT"]]
    df_crime = df_crime[df_crime["LAT"] < df_crime["LAT"].quantile(0.95)]
    df_crime = df_crime[df_crime["LON"].quantile(0.05) < df_crime["LON"]]
    df_crime = df_crime[df_crime["LON"] < df_crime["LON"].quantile(0.95)]

    # 3. Read in the `reddit_account_data.csv` to a frame, converting all datetime to native datetime format: `created_utc`, `updated_on`
    df_reddit = pd.read_csv(reddit_path)
    df_reddit["created_utc"] = pd.to_datetime(df_reddit["created_utc"], unit="s")
    df_reddit["updated_on"] = pd.to_datetime(df_reddit["updated_on"], unit="s")

    # 4. Filter this frame such that:
    #    1. None of the account names end in the number 7
    #    2. None of the account names have the string abc in them
    # `na=True` marks missing names for removal, matching the previous
    # `== False` comparison, which also dropped them.
    names = df_reddit["name"]
    drop_mask = names.str.contains("abc", na=True) | names.str.endswith("7", na=True)
    df_reddit = df_reddit[~drop_mask]

    # 5. Create a column in the `account_data` frame, and assign each of the users a `group` value, uniformly, from the `crime_data.DR_NO` column
    # Seeded explicitly: `default_rng()` does not use the global
    # `np.random.seed` state, so an unseeded generator gives a different
    # assignment on every run.
    rng = np.random.default_rng(0)
    unique = df_crime["DR_NO"].unique()
    df_reddit["group"] = rng.choice(unique, len(df_reddit))

    # 6. Join these two frames together on `account_data.group == crime_data.DR_NO`
    df_merge = df_reddit.merge(df_crime, how="inner", right_on="DR_NO", left_on="group")

    # 7. Select only those who are NOT part of the following Statuses:
    #     1. `CC`
    #     2. `AA`
    df_merge = df_merge[~df_merge["Status"].isin(("CC", "AA"))]

    # Single reference instant so both "time since" columns use the same clock reading.
    now = pd.Timestamp.now()
    df_merge["grp_by"] = [s[0].lower() for s in df_merge["name"]]
    df_merge["karma_sum"] = df_merge["comment_karma"] + df_merge["link_karma"]
    # Computed as `value - now`, so past timestamps give negative durations.
    df_merge["since_rptd"] = df_merge["Date Rptd"] - now
    df_merge["since_open"] = df_merge["created_utc"] - now
    groups = df_merge.groupby("grp_by")

    # 8. Group these by the lower case first letter of their name and calculate:
    res = groups.agg(
        #    1. The average time since reporting the crime as `mean_time_since_reports`
        mean_time_since_reports=pd.NamedAgg(column="since_rptd", aggfunc="mean"),
        #    2. The number of members in the group as `count_members`
        count_members=pd.NamedAgg(column="id", aggfunc="count"),
        #    3. The user with the highest & lowest Karma scores as `highest_karma` and `lowest_karma`
        highest_karma=pd.NamedAgg(column="karma_sum", aggfunc="max"),
        lowest_karma=pd.NamedAgg(column="karma_sum", aggfunc="min"),
        #    4. The average time since creating an account as `mean_time_since_account_open`
        mean_time_since_account_open=pd.NamedAgg(column="since_open", aggfunc="mean"),
    )
    return res


The code is pretty well explained, so next up is how we run it. Unfortunately, I was [having trouble](https://github.com/actions/setup-python/issues/350) running the code on a GitHub runner. Below is a very simple benchmark harness that I will try using.

In [None]:
import json
import logging
import time
from datetime import datetime
from typing import Callable

from data_processing.const import out_path
from data_processing.pandas import do_pandas_test
from data_processing.polars import do_polars_test

logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)8s - %(message)s")

log = logging.getLogger(__name__)


def do_test(func: Callable, n: int = 5, out_dir=None):
    """Time ``n`` consecutive calls of ``func`` and write the timings to a JSON file.

    Args:
        func: Zero-argument callable to benchmark. Its ``__name__`` is used in
            the log messages and in the output file name.
        n: Number of rounds to run. Defaults to 5.
        out_dir: Directory that receives the result file. When ``None``, the
            module-level ``out_path`` is used.

    The output file is named ``<func name>.<ISO timestamp>.json`` and maps each
    round index to ``{"ns taken": <elapsed nanoseconds>}``.
    """
    # Imported locally so the function does not rely on `Path` being in scope.
    from pathlib import Path

    # `out_path` is only read when no explicit directory is supplied.
    target_dir = Path(out_path if out_dir is None else out_dir)

    run_times = {}
    log.info(f"start {func.__name__}")
    for i in range(n):
        counter_start = time.perf_counter_ns()
        func()
        elapsed_ns = time.perf_counter_ns() - counter_start
        log.info(f"<<< Ending {func.__name__} round {i} after {elapsed_ns}ns")
        # Keyed by the round index so every round is kept. The elapsed time is
        # stored as measured, without subtracting the start counter again.
        run_times[i] = {"ns taken": elapsed_ns}

    with open(target_dir / f"{func.__name__}.{datetime.now().isoformat()}.json", "w") as f:
        json.dump(run_times, f)


# Benchmark each implementation in turn: polars first, then pandas.
for _benchmark in (do_polars_test, do_pandas_test):
    do_test(_benchmark)

This spat out the following results:

```bash
```