In [None]:
from loguru import logger
import psycopg
from psycopg import Connection
from psycopg.sql import SQL
import tomli
from typing import Dict, Optional, Generator, List, TypedDict, TypeVar, Iterable, Callable, Any, Sequence
from pydantic import BaseModel
from pathlib import Path
import polars as pl
import plotly.express as px
import plotly.graph_objects as go
import plotly.subplots as sp
from IPython.display import display
import os

CONFIG_PATH = "../database/config.toml"

class DatabaseConfig(BaseModel):
    dbname: str
    user: str
    password: Optional[str]

class Config(BaseModel):
    database: DatabaseConfig

In [None]:
def to_kv_str(d: Dict[str, str]) -> str:
    """Convert dictionary to key-value string"""
    return " ".join(f"{k}={v}" for k, v in d.items())

def postgres_env_password() -> Optional[str]:
    """Get password from environment variable"""
    return os.environ.get("PGPASSWORD")

In [None]:
config_dict = {}
with open(Path(CONFIG_PATH), "rb") as f:
    config_dict = tomli.load(f)
config_obj = Config(**config_dict)
if not config_obj.database.password:
    config_obj.database.password = postgres_env_password()
# https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNSTRING
conn_info = to_kv_str(config_obj.database.model_dump())

In [None]:
artist_post_count_query = """--sql
SELECT t.name    as tag_name,
       post_count
FROM booru.artists
         INNER JOIN booru.artist_tags_assoc ata on artists.id = ata.artist_id
         INNER JOIN booru.tags t on ata.tag_id = t.id
         -- INNER JOIN booru.tag_post_counts tpc on t.id = tpc.tag_id
         INNER JOIN booru.view_artist_tag_no_comic tpc on t.id = tpc.tag_id
GROUP BY t.id, artist_id, t.name, post_count;
"""

In [None]:
def get_df_by_sql(sql: str) -> pl.DataFrame:
    """Get dataframe by SQL"""
    with psycopg.connect(conninfo=conn_info) as conn:
        with conn.cursor() as cur:
            cur.execute(sql)
            rows = cur.fetchall()
            assert cur.description is not None
            column_names = [desc[0] for desc in cur.description]
            return pl.DataFrame(rows, schema=column_names)

In [None]:
artist_post_df = get_df_by_sql(artist_post_count_query)
artist_post_df.describe()

In [None]:
post_count_less_n = artist_post_df.filter(
  pl.col("post_count") < 50)
fig = px.histogram(post_count_less_n, x="post_count")
fig.update_layout(
    title="Post Count Distribution (Post Count < 100)",
    xaxis_title="Post Count",
    yaxis_title="Number of Artists",
)
fig.show()

In [None]:
post_count_less_n = artist_post_df.filter(
  pl.col("post_count") > 50).filter(pl.col("post_count") < 1000)
fig = px.histogram(post_count_less_n, x="post_count")
fig.update_layout(
    title="Post Count Distribution (Post Count < 100)",
    xaxis_title="Post Count",
    yaxis_title="Number of Artists",
)
fig.show()

In [None]:
post_count_less_n = artist_post_df.filter(
  pl.col("post_count") > 1000).filter(pl.col("post_count") < 6000)
fig = px.histogram(post_count_less_n, x="post_count")
fig.update_layout(
    title="Post Count Distribution (Post Count < 100)",
    xaxis_title="Post Count",
    yaxis_title="Number of Artists",
)
fig.show()

In [None]:
aspect_ratio_bucket_query = """--sql
SELECT aspect_ratio_bucket, count(*)
FROM booru.view_post_aspect_ratio
GROUP BY aspect_ratio_bucket;
"""

ar_buckets_ranges = [
    "(0, 9/21]",
    "(9/21, 9/16]",
    "(9/16, 3/4]",
    "(3/4, 1)",
    "1",
    "(1, 4/3]",
    "(4/3, 16/9]",
    "(16/9, 21/9]",
    "(21/9, inf)",
]

aspect_ratio_bucket_df = get_df_by_sql(aspect_ratio_bucket_query)

In [None]:
names = aspect_ratio_bucket_df["aspect_ratio_bucket"].to_list()
values = aspect_ratio_bucket_df["count"].to_list()
ar_dict = dict(zip(names, values))

In [None]:
def to_vhs_ar(ar_dict: Dict[str, int]) -> Dict[str, int]:
    return {
      "vertical": ar_dict["(0, 9/21]"] + ar_dict["(9/21, 9/16]"] + ar_dict["(9/16, 3/4]"] + ar_dict["(3/4, 1)"],
      "horizontal": ar_dict["(1, 4/3]"] + ar_dict["(4/3, 16/9]"] + ar_dict["(16/9, 21/9]"] + ar_dict["(21/9, inf)"],
      "square": ar_dict["1"]
    }

v_h_s_ar = to_vhs_ar(ar_dict)
trace_vhs = go.Pie(labels=list(v_h_s_ar.keys()), values=list(v_h_s_ar.values()))
trace_ar = go.Pie(labels=names, values=values)

fig = sp.make_subplots(rows=1, cols=2, specs=[[{"type": "domain"}, {"type": "domain"}]])
fig.add_trace(trace_vhs, 1, 1)
fig.add_trace(trace_ar, 1, 2)
fig.update_layout(
    title="Aspect Ratio Distribution",
)

fig.show()

In [None]:
artists_name = [
    "hiten_(hitenkei)",
    "hagi_(ame_hagi)",
    "shion_(mirudakemann)",
    "as109",
    "toosaka_asagi",
    "kantoku",
    "niliu_chahui",
    "atdan",
    "himitsu_(hi_mi_tsu_2)",
    "lm7_(op-center)",
    "terada_tera",
]

artist_name = artists_name[-1]
artist_posts_query = f"""--sql
WITH a_tag AS (SELECT booru.get_tag_id_by_artist_name('{artist_name}') AS id)
SELECT p.id as post_id, p.created_at, p.fav_count, p.score, p.width, p.height, ppa.aspect_ratio_bucket
FROM booru.posts_tags_assoc pta
         INNER JOIN booru.posts p ON pta.post_id = p.id
         INNER JOIN a_tag ON pta.tag_id = a_tag.id
         INNER JOIN booru.view_post_aspect_ratio ppa ON p.id = ppa.id;
"""

artist_posts_df = get_df_by_sql(artist_posts_query)

In [None]:
artist_posts_df.describe()

In [None]:
artist_df_with_year = artist_posts_df.with_columns([pl.col("created_at").dt.year().alias("year")])
fig = px.histogram(artist_df_with_year, x="year")
fig.update_layout(
    title=f"{artist_name} Post Count by Year",
    xaxis_title="Year",
    yaxis_title="Number of Posts",
)
fig.show()

In [None]:
artist_ar_count_df = artist_posts_df.groupby(pl.col("aspect_ratio_bucket")).agg(pl.col("post_id").count().alias("count"))
artist_vhs_ar = to_vhs_ar(dict(zip(artist_ar_count_df["aspect_ratio_bucket"].to_list(), artist_ar_count_df["count"].to_list())))

fig = sp.make_subplots(rows=1, cols=2, specs=[[{'type':'pie'}, {'type':'pie'}]])
fig.add_trace(go.Pie(labels=list(artist_vhs_ar.keys()), values=list(artist_vhs_ar.values())), row=1, col=1)
fig.add_trace(go.Pie(labels=artist_ar_count_df["aspect_ratio_bucket"].to_list(), values=artist_ar_count_df["count"].to_list()), row=1, col=2)
fig.update_layout(
    title=f"{artist_name} Post Count by Aspect Ratio",
)

fig.show()

In [None]:
artist_df_with_year_score = artist_df_with_year.groupby("year").agg([
    pl.col("score").mean().alias("avg_score"),
    pl.col("score").median().alias("median_score"),
    pl.col("post_id").count().alias("post_count"),
    pl.col("fav_count").mean().alias("avg_fav_count"),
    pl.col("fav_count").median().alias("median_fav_count"),
])

# display(artist_df_with_year_score)

fig = go.Figure()

fig.add_trace(
    go.Bar(
        x=artist_df_with_year_score["year"], 
        y=artist_df_with_year_score["avg_score"], 
        name='Average Score'
    )
)

fig.add_trace(
    go.Bar(
        x=artist_df_with_year_score["year"], 
        y=artist_df_with_year_score["median_score"], 
        name='Median Score',
        visible="legendonly",
    )
)

fig.add_trace(
    go.Bar(
        x=artist_df_with_year_score["year"], 
        y=artist_df_with_year_score["avg_fav_count"],
        name='Average Fav Count',
    )
)

fig.add_trace(
    go.Bar(
        x=artist_df_with_year_score["year"], 
        y=artist_df_with_year_score["median_fav_count"], 
        name='Median Fav Count',
        visible="legendonly",
    )
)

fig.add_trace(
    go.Bar(
        x=artist_df_with_year_score["year"], 
        y=artist_df_with_year_score["post_count"], 
        name='Post Count',
        yaxis='y2',
        opacity=0.5
    )
)

fig.update_layout(
    title=f"{artist_name} Average and Median Score by Year",
    xaxis_title="Year",
    yaxis_title="Score",
    # This makes the bars grouped instead of stacked
    barmode='group',
    yaxis2=dict(
        title="Post Count",
        overlaying="y",
        side='right'
    )
)

# Show the plot
fig.show()