In [None]:
"""
This notebook allows rapid analysis of fishing patterns in a cow-based playtonic environment.
Data is extracted from Chatterino logs.
"""

import pathlib
import platform
import re

import matplotlib.pyplot as plt
import pandas as pd

%matplotlib widget

In [None]:
# --- Data selection ---
# Twitch channel whose Chatterino logs are analysed.
CHANNEL_NAME = "crelly"

# --- Time-delta filtering ---
# Gaps between consecutive casts longer than this are discarded before any statistics.
MAX_INTERVAL = pd.Timedelta(hours=2)
# Remaining gaps further than this many standard deviations from the mean are rejected.
MAX_Z_SCORE = 2.0

# --- af-metric weights ---
# af_metric = GAIN * count^a * td_mean^b * td_std^c * |td_kurt + OFFSET|^d * |td_skew|^e
AF_METRIC_GAIN = 1_000.0
AF_METRIC_COUNT_EXPONENT = 1.0  # 0.5
AF_METRIC_TD_MEAN_EXPONENT = -1.0
AF_METRIC_TD_STD_EXPONENT = -1.0
AF_METRIC_TD_KURT_OFFSET = 2.0
AF_METRIC_TD_KURT_EXPONENT = -1.0  # 0.0
AF_METRIC_TD_SKEW_EXPONENT = 1.0  # 2.0

# --- Plotting ---
# Bin specification handed to matplotlib's `hist` for both marginal histograms.
HIST_BINS = "stone"

# --- Analysis window ---
# Set to True to analyse every available log instead of only the last three days.
ANALYZE_FULL_HISTORY = False

# Take a single "now" snapshot so both ends of the window derive from the same instant.
_NOW = pd.Timestamp.now()
END_DATETIME = _NOW
if ANALYZE_FULL_HISTORY:
    # `pd.Timestamp.min` is the earliest nanosecond-resolution timestamp pandas can represent;
    # a literal year-1 date lies outside that range and may raise OutOfBoundsDatetime.
    START_DATETIME = pd.Timestamp.min
    MIN_CASTS = 100
else:
    START_DATETIME = _NOW - pd.Timedelta(days=3)
    MIN_CASTS = 25

In [None]:
# Locate the Chatterino chat logs for the configured channel.
# Chatterino's data directory, relative to the user's home, for each supported OS.
chatterino_data_dirs = {
    "Windows": "appdata/roaming/Chatterino2/",
    "Linux": ".local/share/chatterino/",
    "Darwin": "Library/Application Support/chatterino/",
}
home_dir = pathlib.Path.home()
data_subdir = chatterino_data_dirs.get(platform.system())
if data_subdir is None:
    raise RuntimeError(
        f"Cannot determine platform, or platform not supported: {platform.system()}"
    )
base_dir = home_dir / data_subdir / f"Logs/Twitch/Channels/{CHANNEL_NAME}"
print(f"Log file location: {base_dir}")

# Daily log files are named `{channel name}-YYYY-MM-DD.log`; sorting the paths puts the
# ISO-dated names in chronological order.
file_list = sorted(base_dir.glob(f"{CHANNEL_NAME}-????-??-??.log"))

# Regex for a cast line: `[HH:MM:SS] username: !fish*`, where the time may carry an optional
# `.mmm` millisecond suffix. Capture group 1 is the time of day, group 2 is the username.
# The username group rejects whitespace and colons so that a chat message which merely
# contains ": !fish" (e.g. `alice: bob: !fish`) is not counted as a cast by "alice: bob".
p = re.compile(r"^\[(\d\d:\d\d:\d\d(?:\.\d\d\d)?)\] ([^\s:]+): !fish.*$")

In [None]:
# Utility functions
def timestamps_to_filtered_timedeltas(
    timestamps: pd.DataFrame | pd.Series,
    return_filtered_timestamps: bool = False,
    max_interval: pd.Timedelta = MAX_INTERVAL,
    max_z_score: float = MAX_Z_SCORE,
) -> pd.Series | tuple[pd.Series, pd.Series]:
    """Convert cast timestamps into filtered gaps between consecutive casts, in seconds.

    Two filters are applied in order: gaps longer than ``max_interval`` are dropped, then
    gaps further than ``max_z_score`` standard deviations from the mean of the remaining
    gaps are dropped.

    Args:
        timestamps: Cast timestamps. Gaps are taken between consecutive entries, so the
            caller is expected to pass them in chronological order.
        return_filtered_timestamps: When True, also return the timestamps whose gap
            survived both filters.
        max_interval: Largest gap to keep in the first filter.
        max_z_score: Half-width of the accepted band, in standard deviations.

    Returns:
        The surviving gaps in seconds, or a ``(gaps_in_seconds, timestamps)`` tuple when
        ``return_filtered_timestamps`` is True. Both share the same index.
    """
    # Gap between each cast and the previous one; the first entry has no predecessor (NaT).
    timedeltas = timestamps.diff()

    # Remove large gaps. Comparisons against NaT are False, so the leading NaT entry is
    # dropped here as well, and the same-length mask keeps both series on the same index.
    keep = timedeltas <= max_interval
    timedeltas = timedeltas[keep]
    timestamps = timestamps[keep]

    # Z-score outlier rejection; mean and spread are computed once, bounds are inclusive.
    center = timedeltas.mean()
    spread = max_z_score * timedeltas.std()
    keep = timedeltas.between(center - spread, center + spread)
    timedeltas = timedeltas[keep]
    timestamps = timestamps[keep]

    seconds_since_last_cast = timedeltas.dt.total_seconds()
    if return_filtered_timestamps:
        return seconds_since_last_cast, timestamps
    return seconds_since_last_cast


# Custom timedelta stats functions
def td_mean(timestamps: pd.DataFrame | pd.Series) -> float:
    """Return the mean of the filtered time between casts, in seconds."""
    seconds_between_casts = timestamps_to_filtered_timedeltas(timestamps)
    return seconds_between_casts.mean()


def td_std(timestamps: pd.DataFrame | pd.Series) -> float:
    """Return the standard deviation of the filtered time between casts, in seconds."""
    seconds_between_casts = timestamps_to_filtered_timedeltas(timestamps)
    return seconds_between_casts.std()


def td_kurt(timestamps: pd.DataFrame | pd.Series) -> float:
    """Return the kurtosis (pandas ``kurt``) of the filtered time between casts."""
    seconds_between_casts = timestamps_to_filtered_timedeltas(timestamps)
    return seconds_between_casts.kurt()


def td_skew(timestamps: pd.DataFrame | pd.Series) -> float:
    """Return the skewness (pandas ``skew``) of the filtered time between casts."""
    seconds_between_casts = timestamps_to_filtered_timedeltas(timestamps)
    return seconds_between_casts.skew()

In [None]:
# Walk every daily log and collect one (timestamp, username) record per cast
fishing_data = []
for log_path in file_list:
    # The file stem is `{channel name}-YYYY-MM-DD`; everything after the first dash is the date
    log_date = log_path.stem.split("-", maxsplit=1)[-1]
    with open(log_path, encoding="utf-8") as log_file:
        chat_lines = log_file.readlines()[1:]  # Skip the header line

    for chat_line in chat_lines:
        cast = p.match(chat_line)  # Regex compiled above
        if cast is None:
            continue
        # Group 1 is the time of day, group 2 the username
        fishing_data.append((pd.Timestamp(log_date + " " + cast[1]), cast[2]))

# Build the dataframe and keep only casts strictly between START_DATETIME and END_DATETIME
df = pd.DataFrame(fishing_data, columns=("timestamp", "username"))
in_window = (df["timestamp"] > START_DATETIME) & (df["timestamp"] < END_DATETIME)
df = df[in_window]
df

In [None]:
# Rank users by number of casts in the selected window, most active first.
# `value_counts` on the column itself returns a flat, username-indexed Series that is already
# sorted in descending order, so its index labels are plain usernames that can be compared
# against df["username"] in the loop below.
active_users = df["username"].value_counts()
print(active_users)

# Optional per-user diagnostic plots for the 25 most active users; flip to True to enable.
if False:
    for target_user in active_users.index[:25]:
        timestamps = df[df["username"] == target_user]["timestamp"]
        timedeltas, timestamps = timestamps_to_filtered_timedeltas(
            timestamps, return_filtered_timestamps=True
        )
        fig, axs = plt.subplots(2, figsize=(10, 10))

        # Top panel: gap before each cast, plotted against the time of that cast
        axs[0].scatter(timestamps.values, timedeltas.values, s=1)
        axs[0].grid()
        axs[0].set_xlabel("Timestamp")
        axs[0].set_ylabel("Time between casts [s]")

        # Bottom panel: distribution of the gaps
        timedeltas.hist(bins=100, ax=axs[1])
        axs[1].set_xlabel("Time between casts [s]")
        axs[1].set_ylabel("Count [e.a.]")

        fig.suptitle(f"Fishing Cast Time Deltas and Histogram for: {target_user}")
        fig.tight_layout()

In [None]:
# Per-user cast count plus statistics of the filtered time between casts
cast_stats = df.groupby("username")["timestamp"].agg(["count", td_mean, td_std, td_kurt, td_skew])

# Drop users without enough casts, then users whose kurtosis is undefined
df_fishers = cast_stats.loc[cast_stats["count"] > MIN_CASTS]
df_fishers = df_fishers.dropna(subset=["td_kurt"])

# Calculate the af-metric from statistical analysis of timedeltas, one factor per statistic
count_term = df_fishers["count"] ** AF_METRIC_COUNT_EXPONENT
mean_term = df_fishers["td_mean"] ** AF_METRIC_TD_MEAN_EXPONENT
std_term = df_fishers["td_std"] ** AF_METRIC_TD_STD_EXPONENT
kurt_term = (df_fishers["td_kurt"] + AF_METRIC_TD_KURT_OFFSET).abs() ** AF_METRIC_TD_KURT_EXPONENT
skew_term = df_fishers["td_skew"].abs() ** AF_METRIC_TD_SKEW_EXPONENT
df_fishers["af_metric"] = (
    AF_METRIC_GAIN * count_term * mean_term * std_term * kurt_term * skew_term
)
df_fishers = df_fishers.sort_values("count", ascending=False)

# They are sus... (highest af-metric first)
sus_players = df_fishers.sort_values("af_metric", ascending=False)
sus_players.head(25)

In [None]:
def plot_scatter_and_hist_old(timestamps: pd.Series, timedeltas: pd.Series) -> plt.Figure:
    """Draw a two-row figure: cast gaps over time on top, their histogram below.

    Args:
        timestamps: Values for the x-axis of the scatter plot.
        timedeltas: Time between casts in seconds, aligned with ``timestamps``.

    Returns:
        The created figure.
    """
    fig, (ax_scatter, ax_hist) = plt.subplots(2, figsize=(10, 5))

    # Upper panel: one point per cast
    ax_scatter.scatter(timestamps.values, timedeltas.values, s=1)
    # Optional log-scaled view, currently disabled:
    # ax_scatter.set_ylim(50, 5000)
    # ax_scatter.semilogy()
    ax_scatter.grid()
    ax_scatter.set_xlabel("Timestamp")
    ax_scatter.set_ylabel("Time between casts [s]")

    # Lower panel: 100-bin histogram of the gaps
    timedeltas.hist(bins=100, ax=ax_hist)
    ax_hist.set_xlabel("Time between casts [s]")
    ax_hist.set_ylabel("Count [e.a.]")

    return fig


def plot_scatter_and_hist(x: pd.Series, y: pd.Series, target_user: str) -> plt.Figure:
    """Plot cast gaps against time with marginal histograms and a summary-statistics box.

    The figure is a 2x2 grid: the scatter plot sits bottom-left, the histogram of ``x``
    above it, the histogram of ``y`` to its right, and ``y.describe()`` as text in the
    top-right corner.

    Args:
        x: Cast timestamps (scatter x-axis).
        y: Time between casts in seconds (scatter y-axis), aligned with ``x``.
        target_user: Username shown in the figure title.

    Returns:
        The created figure.
    """
    # Set up gridspec and axes
    fig = plt.figure(figsize=(10, 10))
    gs = fig.add_gridspec(
        2,
        2,
        width_ratios=(7, 2),
        height_ratios=(2, 7),
        left=0.1,
        right=0.95,
        bottom=0.1,
        top=0.95,
        wspace=0.05,
        hspace=0.05,
    )
    ax = fig.add_subplot(gs[1, 0])
    # The marginal histograms share the matching axis with the scatter plot
    ax_histx = fig.add_subplot(gs[0, 0], sharex=ax)
    ax_histy = fig.add_subplot(gs[1, 1], sharey=ax)
    ax_box = fig.add_subplot(gs[0, 1])

    # Scatter plot
    ax.scatter(x, y, s=1)
    ax.tick_params("x", rotation=45)
    ax.set_xlabel("Timestamp [ET]")
    ax.set_ylabel("Time Between Casts [s]")

    # x-axis histogram
    ax_histx.hist(x, bins=HIST_BINS)
    ax_histx.tick_params(axis="x", labelbottom=False)
    ax_histx.set_ylabel("Count [ea.]")

    # y-axis histogram
    ax_histy.hist(y, bins=HIST_BINS, orientation="horizontal")
    ax_histy.tick_params(axis="y", labelleft=False)
    ax_histy.set_xlabel("Count [ea.]")

    # Text box: render the summary statistics explicitly as a string
    ax_box.text(0.0, 0.0, str(y.describe()))
    ax_box.axis("off")

    # Add title and return
    fig.suptitle(f"Fishing Cast Time Deltas and Histogram for: {target_user}")
    return fig

In [None]:
# Plot the 25 users with the highest af-metric (or a hand-picked list, see below)
target_users = sus_players.index[:25]
# target_users = ["oleccy", "lonely_lulu", "mwmwr", "DeadPixel21"]

for target_user in target_users:
    user_casts = df.loc[df["username"] == target_user, "timestamp"]
    gaps_seconds, cast_times = timestamps_to_filtered_timedeltas(
        user_casts, return_filtered_timestamps=True
    )
    fig = plot_scatter_and_hist(cast_times, gaps_seconds, target_user)