In [None]:
from datetime import date, datetime
from json import dumps, loads
from pathlib import Path
from re import DOTALL, finditer, search
from shlex import join, split
from subprocess import run
from typing import Annotated as Ann
from typing import Literal, TypeAlias, TypedDict, overload

from devtools import pprint
from dotenv import load_dotenv
from gjob_dev.notebooks import disp_named
from gjob_pipeline.stages.convert import Convert as Params
from more_itertools import first
from pandas import DataFrame
from pydantic import BaseModel, Field
from seaborn import catplot


def just(*args: str):
    """Run `./j.ps1` with the given arguments in PowerShell and return the result.

    Args:
        *args: Arguments appended to `./j.ps1 --color never` after being quoted
            and joined into a single string by `shlex.join`.

    Returns:
        The completed process. Output is captured and decoded as UTF-8, so
        `stdout` and `stderr` are strings.

    Raises:
        subprocess.CalledProcessError: If the process exits with a non-zero
            status (`check=True`).
    """
    # NOTE(review): `shlex.join` applies POSIX shell quoting, but the resulting
    # string is parsed by PowerShell — confirm arguments with special characters.
    command = f"./j.ps1 --color never {join(args)}"
    pwsh = ["pwsh", "-NonInteractive", "-NoProfile", "-CommandWithArgs"]
    return run(
        args=[*pwsh, command],
        check=True,
        capture_output=True,
        encoding="utf-8",
    )


def prettier(*args: str):
    """Run `prettier --no-color --write` on the given arguments through `just`.

    Args:
        *args: Arguments placed after `--write`, each passed on as-is.

    Returns:
        Whatever `just` returns for the `run prettier ...` invocation.
    """
    # Hand the tokens over directly; quoting is applied once, inside `just`.
    return just("run", "prettier", "--no-color", "--write", *args)


# Load environment variables from the file whose path is the stripped stdout of
# the `sync-contrib-env-file` invocation.
load_dotenv(Path(just("sync-contrib-env-file").stdout.strip()))
# `None` by default; when it is a string, the next cell validates it as JSON
# parameters, otherwise default parameters are used.
PARAMS = None
"""Notebook stage parameters."""
# NOTE(review): presumably controls how the parameters are shown in notebook
# output — confirm against `Convert.hide`.
Params.hide()

In [None]:
# Use the injected JSON parameters when present, otherwise the stage defaults.
if isinstance(PARAMS, str):  # pyright: ignore[reportUnnecessaryIsInstance]
    params = Params.model_validate_json(PARAMS)
else:
    params = Params()

# Input mailbox, and the JSON file (named after the mailbox stem) it converts to.
mbox = params.deps.mboxes / params.mbox_name
path = params.outs.reqs / f"{mbox.stem}.json"

pprint(params)

In [None]:
# Convert the mailbox to JSON and show the recipe's output.
# Each path is passed as its own argument rather than being interpolated into a
# string and re-tokenised, so a path containing whitespace or quote characters
# is not broken into several arguments (or rejected) before reaching `just`.
print(
    just(
        "proj::convert-mbox-to-json", mbox.as_posix(), path.as_posix()
    ).stdout.strip()
)

In [None]:
class Message(BaseModel):
    """An email message as loaded from the JSON file converted from the mailbox."""

    # Sender, populated from the "from" key.
    sender: Ann[str, Field(alias="from")]
    # Recipient, populated from the "to" key.
    recipient: Ann[str, Field(alias="to")]
    # Subject line.
    subject: str
    # Timestamp, populated from the "date" key and validated as a datetime.
    received: Ann[datetime, Field(alias="date")]
    # Message body text; parsed further by `get_alert`.
    body: str


# Keep only the messages sent by the Google job alerts address.
messages = [
    Message(**message)
    for message in loads(path.read_text(encoding="utf-8"))
    if message["from"] == "Job Alerts from Google <notify-noreply@google.com>"
]

disp_named(
    ("Messages", [message.model_dump() for message in messages]),
    (
        "First message body",
        # Preview only: bodies longer than 100 characters are cut to their first
        # 100 characters before the ellipsis is appended.
        (
            f"{messages[0].body[:100]}..."
            if len(messages[0].body) > 100
            else messages[0].body
        ),
    ),
)

In [None]:
class Alert(BaseModel):
    """A job alert parsed from the body of one message by `get_alert`."""

    # Quoted search query from the first line of the body.
    query: str
    # Text of the line following the query line.
    search_location: str
    # Timestamp copied from the message the alert was parsed from.
    received: datetime
    # Raw, stripped text of each job entry; parsed further by `get_job`.
    jobs: list[str]
    # Stripped text of the trailing footer sections.
    footer: str


def get_newline(text: str) -> str:
    """Return the newline sequence to use for `text`.

    Args:
        text: Text to inspect.

    Returns:
        `"\\r\\n"` if that sequence occurs anywhere in `text`, otherwise `"\\n"`.
    """
    crlf = "\r\n"
    if crlf in text:
        return crlf
    return "\n"


def get_alert(message: Message) -> Alert:
    """Parse the body of an alert message into an `Alert`.

    The body must consist of a line starting with the quoted query, a search
    location line, a jobs section, five newlines, and finally three footer
    sections that each end with five newlines.

    Args:
        message: Message whose `body` is parsed and whose `received` is copied.

    Returns:
        The parsed alert, with `jobs` holding the stripped text of each job.

    Raises:
        ValueError: If the body does not match the expected layout.
    """
    newline = get_newline(message.body)
    separator = newline * 5
    footer_pattern = (".+?" + separator) * 3
    pattern = "".join([
        r"^",
        rf'"(?P<query>.+?)" in .+?{newline}',
        rf"(?P<search_location>.+?){newline}",
        rf"(?P<jobs>.+){separator}",
        rf"(?P<footer>{footer_pattern})",
        r"$",
    ])
    parsed = search(pattern, message.body, flags=DOTALL)
    if parsed is None:
        raise ValueError("Could not parse alert")
    # Every job must be followed by the five-newline separator to be found.
    # NOTE(review): text after the last separator in the jobs section is not
    # yielded by this pattern — confirm that it is never a job.
    job_texts = [
        found["job"].strip()
        for found in finditer(
            rf"(?P<job>.+?){separator}", parsed["jobs"].strip(), flags=DOTALL
        )
    ]
    return Alert(
        query=parsed["query"].strip(),
        search_location=parsed["search_location"].strip(),
        received=message.received,
        jobs=job_texts,
        footer=parsed["footer"].strip(),
    )


# Parse every message, then show all alerts plus a closer look at the first one.
alerts = [get_alert(message) for message in messages]
first_alert = alerts[0]
disp_named(
    ("Alerts", [alert.model_dump() for alert in alerts]),
    (
        f'Jobs for first alert for "{first_alert.query}" in {first_alert.search_location}',
        first_alert.model_dump(include={"jobs"}),
    ),
    (
        f'First job for first alert for "{first_alert.query}" in {first_alert.search_location}',
        first(first_alert.jobs),
    ),
)

In [None]:
class Job(BaseModel):
    """A single job posting parsed from the text of an alert by `get_job`."""

    # Company name.
    company: str
    # Location text as it appears in the alert.
    location: str
    # Job title.
    title: str
    # Posting date parsed from the text after "Time icon".
    posted: date
    # Source text with a leading "via" removed.
    source: str
    # Whether the job is full-time; `get_job` sets this to True when the alert
    # does not specify it.
    full_time: bool
    # Text matched at the start of the job entry, before the title.
    logo: str


def get_job(text: str, year: int | None = None) -> Job:
    """Parse the text of one job entry into a `Job`.

    The text must consist of the logo text, three newlines, the title, one
    newline, the company, two newlines, the location, two newlines, the source,
    two newlines, and `Time icon <posted>`, optionally followed by whitespace
    and `Work icon <full_time>`.

    Args:
        text: Text of a single job entry.
        year: Year to combine with the posted month and day, which carry no
            year of their own. Defaults to the current year. Pass the year in
            which the alert was received so that the result does not depend on
            when the notebook runs, and so that a posting dated `Feb 29` is not
            rejected when parsed during a non-leap year.

    Returns:
        The parsed job. `full_time` is True when the entry does not specify it.

    Raises:
        ValueError: If the text does not match the expected layout, or if the
            posted date is not a valid `%b %d` date in `year`.
    """
    newline = get_newline(text)
    match = search(
        "".join([
            r"^",
            rf"(?P<logo>.+?){newline * 3}",
            rf"(?P<title>.+?){newline * 1}",
            rf"(?P<company>.+?){newline * 2}",
            rf"(?P<location>.+?){newline * 2}",
            rf"(?P<source>.+?){newline * 2}",
            r"Time icon (?P<posted>.+?)",
            r"(?P<full_time_specified>\sWork icon (?P<full_time>.+?))?",
            r"$",
        ]),
        text,
        flags=DOTALL,
    )
    if not match:
        raise ValueError("Could not parse job")
    if year is None:
        year = datetime.now().year
    return Job(
        title=match["title"],
        company=match["company"],
        location=match["location"],
        source=match["source"].removeprefix("via").strip(),
        # The year is parsed together with the month and day so that leap days
        # are validated against the intended year.
        posted=datetime.strptime(
            f"{match['posted']} {year:04d}", "%b %d %Y"
        ).date(),
        full_time=(
            match["full_time"].casefold() == "full-time"
            if match["full_time_specified"]
            else True
        ),
        logo=match["logo"],
    )


# Jobs grouped by query, then by search location.
Jobs: TypeAlias = dict[str, dict[str, list[Job]]]

jobs: Jobs = {}
for alert in alerts:
    # Posted dates are resolved against the year the alert was received.
    # NOTE(review): a job posted late in December and alerted early in January
    # is still assigned the alert's year — confirm whether that matters here.
    jobs.setdefault(alert.query, {}).setdefault(alert.search_location, []).extend([
        get_job(job, year=alert.received.year) for job in alert.jobs
    ])


class DictJob(TypedDict):
    """Dictionary form of a `Job`, with one key per `Job` field."""

    # Job title.
    title: str
    # Company name.
    company: str
    # Location text.
    location: str
    # Source text.
    source: str
    # Posting date.
    posted: date
    # Whether the job is full-time.
    full_time: bool
    # Logo text.
    logo: str


# Jobs dumped in "python" mode, grouped by query and then by search location.
DumpedJobs: TypeAlias = dict[str, dict[str, list[DictJob]]]
# Jobs dumped in "json" mode, grouped the same way.
SerializedJobs: TypeAlias = dict[str, dict[str, list[dict[str, str]]]]


@overload
def dump_jobs(jobs: Jobs) -> DumpedJobs: ...
@overload
def dump_jobs(jobs: Jobs, mode: Literal["python"]) -> DumpedJobs: ...
@overload
def dump_jobs(jobs: Jobs, mode: Literal["json"]) -> SerializedJobs: ...
def dump_jobs(
    jobs: Jobs, mode: Literal["python", "json"] = "python"
) -> DumpedJobs | SerializedJobs:
    """Dump every job to a dictionary while keeping the nested grouping.

    Args:
        jobs: Jobs grouped by query, then by search location.
        mode: Mode passed to each job's `model_dump`.

    Returns:
        A mapping with the same queries and search locations as `jobs`, where
        each job is replaced by the result of its `model_dump`.
    """
    return {
        query: {
            search_location: [job.model_dump(mode=mode) for job in found]
            for search_location, found in by_location.items()
        }
        for query, by_location in jobs.items()
    }


# Write the serialized jobs next to the converted mailbox, then format the file.
reqs_path = params.outs.reqs / f"{mbox.stem}-reqs.json"
serialized_jobs = dump_jobs(jobs, mode="json")
reqs_path.write_text(dumps(serialized_jobs), encoding="utf-8")

# Jobs of the first query, keyed by search location.
jobs_for_first_query = first(jobs.values())
disp_named(
    ("Jobs", serialized_jobs),
    (
        f'First job for query "{first(jobs)}" in search location "{first(jobs_for_first_query)}"',
        first(first(jobs_for_first_query.values())).model_dump(),
    ),
    ("Prettier", prettier(reqs_path.as_posix()).stdout),
)

In [None]:
# Build one table of all jobs, derive city and state/province columns from the
# location text, sort it, and write it to CSV.
sp = " "
# Replacement for street addresses that are normalized to the city below.
nyny = "New York, NY, United States"
# Both newline variants, so that the address replacements match either one.
newlines = ["\r\n", "\n"]
# Column name -> whether that column is sorted ascending. The keys also set the
# column order of the table. Every column sorts ascending except "posted".
columns = {
    **dict.fromkeys(
        [
            "query",
            "country",
            "state_or_province",
            "city",
            "company",
            "title",
            "full_time",
            "posted",
            "source",
            "location",
        ],
        True,
    ),
    "posted": False,
}
# Columns left out of the CSV file (but kept in `df`).
drop = ["location"]
df = (
    # One row per job. The alert's search location is stored as "country".
    DataFrame(
        data=(
            {"query": query, "country": search_location, **job}
            for query, jobs_for_query in dump_jobs(jobs).items()
            for search_location, jobs_for_search_location in jobs_for_query.items()
            for job in jobs_for_search_location
        ),
        columns=list(columns.keys()),
    )
    # Replace two street addresses, which contain a line break, with `nyny`.
    .replace(
        to_replace={
            "location": {
                **dict.fromkeys(
                    [
                        f"The DE Shaw Group, Two Manhattan West, 375 9th Ave 52nd Floor, New York,  {newline}NY, United States"
                        for newline in newlines
                    ],
                    nyny,
                ),
                **dict.fromkeys(
                    [
                        f"IBM Thomas J. Watson Research Center, 1101 Kitchawan Rd, Yorktown Heights,  {newline}NY, United States"
                        for newline in newlines
                    ],
                    nyny,
                ),
            }
        }
    )
    .assign(**{
        # Second-to-last ", "-separated part, for locations with at least one comma.
        "state_or_province": lambda df: df["location"]
        .where(df["location"].str.count(",") > 0)
        .str.split(f",{sp}")
        .str[-2],
        # First ", "-separated part, for locations with at least two commas.
        "city": lambda df: df["location"]
        .where(df["location"].str.count(",") > 1)
        .str.split(f",{sp}")
        .str[0],
    })
    .sort_values(ascending=list(columns.values()), by=list(columns.keys()))
    # NOTE(review): the Canadian postal abbreviation for Quebec is "QC" —
    # confirm that "QB" is intended.
    .replace(to_replace={"state_or_province": {"Quebec": "QB"}})
)
df.drop(axis="columns", labels=drop).to_csv(params.outs.reqs / f"{mbox.stem}-reqs.csv")
df

In [None]:
# Bar charts of job counts for the "Research software engineer" query.
#
# Counts are computed by callables passed to `assign`, so they are taken over
# the rows selected by `query`. Counting on the full `df` would also count jobs
# from other queries, and would merge same-named cities from other states.
# Only values whose count exceeds `threshold` times the largest count are shown.

# Cities in Washington.
query = "state_or_province == 'WA' & query == 'Research software engineer'"
aspect = 1.0
threshold = 0.00
g = catplot(
    aspect=aspect,
    errorbar=None,
    hue="city",
    kind="bar",
    x="city",
    y="count",
    data=(
        df.query(query)
        .assign(**{
            "count": lambda rows: rows["city"].map(rows["city"].value_counts())
        })
        .pipe(lambda rows: rows[rows["count"] > threshold * rows["count"].max()])
        .sort_values("count", ascending=False)
    ),
)

# States and provinces, all locations.
query = "query == 'Research software engineer'"
aspect = 5.0
threshold = 0.07
catplot(
    errorbar=None,
    hue="state_or_province",
    kind="bar",
    x="state_or_province",
    y="count",
    data=(
        df.query(query)
        .assign(**{
            "count": lambda rows: rows["state_or_province"].map(
                rows["state_or_province"].value_counts()
            )
        })
        .pipe(lambda rows: rows[rows["count"] > threshold * rows["count"].max()])
        .sort_values("count", ascending=False)
    ),
)

# Cities, all locations.
threshold = 0.05
catplot(
    aspect=aspect,
    errorbar=None,
    hue="city",
    kind="bar",
    x="city",
    y="count",
    data=(
        df.query(query)
        .assign(**{
            "count": lambda rows: rows["city"].map(rows["city"].value_counts())
        })
        .pipe(lambda rows: rows[rows["count"] > threshold * rows["count"].max()])
        .sort_values("count", ascending=False)
    ),
);

In [None]:
# Bar charts of job counts for the "Thermal" query.
#
# Counts are computed by callables passed to `assign`, so they are taken over
# the rows selected by `query`. Counting on the full `df` would also count jobs
# from other queries.
# Only values whose count exceeds `threshold` times the largest count are shown.

# States and provinces.
query = "query == 'Thermal'"
aspect = 6.0
threshold = 0.07
catplot(
    errorbar=None,
    hue="state_or_province",
    kind="bar",
    x="state_or_province",
    y="count",
    data=(
        df.query(query)
        .assign(**{
            "count": lambda rows: rows["state_or_province"].map(
                rows["state_or_province"].value_counts()
            )
        })
        .pipe(lambda rows: rows[rows["count"] > threshold * rows["count"].max()])
        .sort_values("count", ascending=False)
    ),
)

# Cities.
threshold = 0.05
catplot(
    aspect=aspect,
    errorbar=None,
    hue="city",
    kind="bar",
    x="city",
    y="count",
    data=(
        df.query(query)
        .assign(**{
            "count": lambda rows: rows["city"].map(rows["city"].value_counts())
        })
        .pipe(lambda rows: rows[rows["count"] > threshold * rows["count"].max()])
        .sort_values("count", ascending=False)
    ),
);