In [1]:
import math
import os
import ast

import sqlparse
import numpy as np
import pandas as pd
from sqlalchemy import MetaData, Table, literal_column, Column
from sqlalchemy.sql import select, func, or_
from sqlalchemy.dialects.mysql import insert
from sqlalchemy.dialects.mysql.types import VARCHAR, TINYINT, TEXT

from f3_data_builder import (mysql_connection,
region_queries, 
pull_beatdowns, 
pull_aos, 
pull_beatdowns, 
pull_attendance, 
build_users, 
build_aos, 
build_beatdowns, 
build_attendance,
home_region_query
)

In [2]:
from typing import Any, Tuple, Hashable, Iterable
from sqlalchemy.engine import Engine
from pandas._libs.missing import NAType
from _mysql_connector import MySQLInterfaceError

In [3]:
# Connection handle from f3_data_builder.mysql_connection(); it is handed to
# metadata.reflect() and to every pull_*/read_sql call further down.
engine = mysql_connection()

In [4]:
# Reflect everything in the weaselbot schema, then declare paxminer.regions by
# hand so it is registered on the same MetaData under "paxminer.regions".
metadata = MetaData()
metadata.reflect(engine, schema="weaselbot")

_region_columns = [
    Column("region", VARCHAR(length=45), nullable=False, primary_key=True),
    Column("slack_token", VARCHAR(length=90), nullable=False),
    Column("schema_name", VARCHAR(length=45), nullable=True, default=None),
    Column("active", TINYINT(), default=1),
    Column("firstf_channel", VARCHAR(length=45), nullable=True, default=None),
    Column("contact", VARCHAR(length=45), nullable=True, default=None),
    Column("send_pax_charts", TINYINT(), default=0),
    Column("send_ao_leaderboard", TINYINT(), default=0),
    Column("send_q_charts", TINYINT(), default=0),
    Column("send_region_leaderboard", TINYINT(), default=0),
    Column("send_region_uniquepax_chart", TINYINT(), default=0),
    Column("send_region_stats", VARCHAR(length=45), default=0),
    Column("send_mid_month_charts", VARCHAR(length=45), default=0),
    Column("comments", TEXT()),
]

# Trailing semicolon keeps the Table repr out of the cell output.
Table("regions", metadata, *_region_columns, schema="paxminer");

In [28]:
# Nullable pandas dtypes applied to the home-region query result.
dtypes = dict(
    user_name=pd.StringDtype(),
    email=pd.StringDtype(),
    home_region_id=pd.StringDtype(),
    attendance_count=pd.Int64Dtype(),
    rn=pd.Int64Dtype(),
)

# Run the query and cast in one step.
df_home_region = home_region_query(engine, metadata).astype(dtypes)

In [11]:
# Short aliases for the reflected weaselbot tables, unpacked in one pass.
cr, cu, cud, ca, cb, catt = (
    metadata.tables[table_key]
    for table_key in (
        "weaselbot.combined_regions",
        "weaselbot.combined_users",
        "weaselbot.combined_users_dup",
        "weaselbot.combined_aos",
        "weaselbot.combined_beatdowns",
        "weaselbot.combined_attendance",
    )
)

In [12]:
def insert_statement(table, insert_values, update_cols):
    """
    Build a MySQL ``INSERT ... ON DUPLICATE KEY UPDATE`` statement.

    :param table: table the rows are inserted into
    :param insert_values: values handed to ``insert(table).values``
    :param update_cols: names of the inserted columns to overwrite
        when a duplicate key is hit
    :return: the insert statement with the on-duplicate clause attached
    """
    stmt = insert(table).values(insert_values)

    # Map each requested column name to its "inserted" value reference.
    overwrite = {}
    for inserted_col in stmt.inserted:
        if inserted_col.name in update_cols:
            overwrite[inserted_col.name] = inserted_col

    return stmt.on_duplicate_key_update(overwrite)

In [13]:
def region_subquery(metadata):
    """
    Per-region beatdown summary, returned as a subquery aliased ``b``.

    Joins ``weaselbot.combined_beatdowns`` to ``weaselbot.combined_aos`` on
    ``ao_id`` and groups by ``region_id``, exposing ``max_timestamp``,
    ``max_ts_edited`` and ``beatdown_count``.

    :param metadata: MetaData holding the reflected weaselbot tables
    :return: the grouped select wrapped as a subquery
    """
    beatdowns = metadata.tables["weaselbot.combined_beatdowns"]
    aos = metadata.tables["weaselbot.combined_aos"]

    return (
        select(
            aos.c.region_id,
            func.max(beatdowns.c.timestamp).label("max_timestamp"),
            func.max(beatdowns.c.ts_edited).label("max_ts_edited"),
            func.count().label("beatdown_count"),
        )
        .select_from(beatdowns.join(aos, beatdowns.c.ao_id == aos.c.ao_id))
        .group_by(aos.c.region_id)
        .subquery("b")
    )

In [14]:
def paxminer_region_query(metadata):
    """
    Select every ``paxminer.regions`` row with its combined-region summary.

    ``paxminer.regions`` is outer-joined to ``weaselbot.combined_regions`` on
    ``schema_name``, which is in turn outer-joined to the per-region beatdown
    subquery on ``region_id``.

    :param metadata: MetaData holding the paxminer and weaselbot tables
    :return: the select statement (not executed here)
    """
    regions = metadata.tables["paxminer.regions"]
    combined = metadata.tables["weaselbot.combined_regions"]
    stats = region_subquery(metadata)

    joined = regions.outerjoin(combined, regions.c.schema_name == combined.c.schema_name)
    joined = joined.outerjoin(stats, combined.c.region_id == stats.c.region_id)

    return select(
        regions.c.schema_name,
        regions.c.region.label("region_name"),
        stats.c.max_timestamp,
        stats.c.max_ts_edited,
        stats.c.beatdown_count,
        combined.c.region_id,
    ).select_from(joined)

In [15]:
def weaselbot_region_query(metadata):
    """
    Select all ``weaselbot.combined_regions`` columns plus ``beatdown_count``.

    The count comes from the per-region beatdown subquery, outer-joined on
    ``region_id``.

    :param metadata: MetaData holding the reflected weaselbot tables
    :return: the select statement (not executed here)
    """
    combined = metadata.tables["weaselbot.combined_regions"]
    stats = region_subquery(metadata)

    joined = combined.outerjoin(stats, combined.c.region_id == stats.c.region_id)
    return select(combined, stats.c.beatdown_count).select_from(joined)

In [16]:
# Build (but do not execute) the two region-level SELECT statements.
paxminer_region_sql = paxminer_region_query(metadata) # verified
weaselbot_region_sql = weaselbot_region_query(metadata) # verified

In [17]:
### Sample data to move forward with ###

# Intended dtypes for the regions frame (not applied in this cell).
dtypes = {
    "region_id": pd.StringDtype(),
    "region_name": pd.StringDtype(),
    "schema_name": pd.StringDtype(),
    "slack_team_id": pd.StringDtype(),
    "max_timestamp": pd.Float64Dtype(),
    "max_ts_edited": pd.Float64Dtype(),
    "beatdown_count": pd.Int16Dtype(),
}

# Five hand-written regions standing in for the real region query result.
region_data = {
    "slack_team_id": ["", "", "", "", ""],
    "region_id": ["37", "24", "87", "100", "131"],
    "schema_name": "f3denver f3chicago f3naperville f3omaha f3stcharles".split(),
    "region_name": "Denver Chicago Naperville Omaha St_Charles".split(),
    "max_timestamp": [1704815430.281659, 1704808955.413849, None, 1697642801.087519, 1704825215.550419],
    "max_ts_edited": [1704815466, 1704724809, None, None, 1704825356],
    "beatdown_count": [100, 100, 100, 100, 100],
}
df_regions = pd.DataFrame(region_data)

In [18]:
# Let pandas infer nullable extension dtypes for the sample frame
# (the explicit `dtypes` mapping defined with the sample data is not used here).
df_regions = df_regions.convert_dtypes()

In [19]:
# Pick the f3denver record as the working `row`; when several match the last
# one wins, and when none match `row` is left untouched.
_denver_rows = [rec for rec in df_regions.itertuples(index=False) if rec.schema_name == 'f3denver']
if _denver_rows:
    row = _denver_rows[-1]

In [20]:
def pull_users(row: tuple[Any, ...], engine: Engine, metadata: MetaData) -> pd.DataFrame:
    """
    Pull every user from one region's ``users`` table.

    :param row: one record of the regions frame; ``row.schema_name`` and
        ``row.region_id`` are read
    :type row: named tuple
    :param engine: engine used to reflect the table and run the query
    :type engine: sqlalchemy.engine.Engine
    :param metadata: MetaData the reflected ``users`` table is registered on
    :type metadata: sqlalchemy.MetaData
    :rtype: pandas.DataFrame
    :return: frame with ``slack_user_id``, ``user_name``, ``email`` and
        ``region_id`` as nullable string columns; empty (same columns and
        dtypes) when the region's ``users`` table cannot be loaded.
    """
    dtypes = dict(
        slack_user_id=pd.StringDtype(), user_name=pd.StringDtype(), email=pd.StringDtype(), region_id=pd.StringDtype()
    )
    try:
        usr = Table("users", metadata, autoload_with=engine, schema=row.schema_name)
    except Exception as e:
        # Best effort: report the problem and hand back an empty frame. The
        # frame is cast to the declared dtypes so it matches the success path
        # (an untyped frame would carry object columns into later merges).
        print(e)
        return pd.DataFrame(columns=list(dtypes)).astype(dtypes)

    # NOTE(review): region_id is interpolated into the SQL text as a quoted
    # literal. Presumably it only ever comes from our own regions table; if it
    # can come from anywhere less trusted, switch to a bound parameter.
    sql = select(
        usr.c.user_id.label("slack_user_id"),
        usr.c.user_name,
        usr.c.email,
        literal_column(f"'{row.region_id}'").label("region_id"),
    )

    with engine.begin() as cnxn:
        df = pd.read_sql(sql, cnxn, dtype=dtypes)

    return df

In [36]:
# Users for the selected region, via pull_users defined in this notebook.
df_users_dup = pull_users(row, engine, metadata)

In [37]:
# AOs for the selected region, via pull_aos imported from f3_data_builder.
df_aos = pull_aos(row, engine, metadata)

In [38]:
# Attendance for the selected region, via pull_attendance imported from f3_data_builder.
df_attendance = pull_attendance(row, engine, metadata)

In [39]:
# Beatdowns for the selected region, via pull_beatdowns imported from f3_data_builder.
df_beatdowns = pull_beatdowns(row, engine, metadata)

In [40]:
# Swap the literal string "NA" for a real missing value, then cast the column
# to pandas' nullable float dtype.
df_beatdowns["ts_edited"] = (
    df_beatdowns["ts_edited"]
    .replace("NA", pd.NA)
    .astype(pd.Float64Dtype())
)

In [41]:
#### building users ####

cu = metadata.tables["weaselbot.combined_users"]
cud = metadata.tables["weaselbot.combined_users_dup"]

# Emails are the dedup key: normalise case and drop users without one.
df_users_dup["email"] = df_users_dup["email"].str.lower()
df_users_dup = df_users_dup[df_users_dup["email"].notna()]

# Attendance rows per Slack user, used to rank duplicate emails.
df_user_agg = (
    df_attendance.groupby(["slack_user_id"], as_index=False)["bd_date"].count().rename({"bd_date": "count"}, axis=1)
)

# Keep, for each email, the Slack account with the highest attendance count.
df_users = (
    df_users_dup.merge(df_user_agg[["slack_user_id", "count"]], on="slack_user_id", how="left")
    .fillna(0)
    .sort_values(by="count", ascending=False)
    .drop_duplicates(subset=["email"], keep="first")
)

###
# Attach the home-region lookup. The rename is done on a local copy so that
# df_home_region (built in an earlier cell) is not mutated by running this one.
home_lookup = df_home_region.rename(columns={"user_name": "user_name_home"})[
    ["user_name_home", "email", "home_region_id"]
]
df_users = df_users.merge(home_lookup, on="email", how="left")

# No home-region match: fall back to the region the user was pulled from.
mask = df_users["home_region_id"].isna()
df_users.loc[mask, "home_region_id"] = df_users.loc[mask, "region_id"]

# Prefer the user name recorded with the home region when there is one.
has_home_name = df_users["user_name_home"].notna()
df_users.loc[has_home_name, "user_name"] = df_users.loc[has_home_name, "user_name_home"]
###

dtypes = dict(
    user_id=pd.StringDtype(), user_name=pd.StringDtype(), email=pd.StringDtype(), home_region_id=pd.StringDtype()
)

# Insert the home_region_id resolved above. Previously this re-derived the
# column from region_id, which silently discarded the home-region lookup.
insert_values = df_users[["user_name", "email", "home_region_id"]].to_dict("records")
update_cols = ("user_name", "email", "home_region_id")
user_insert_sql = insert_statement(cu, insert_values, update_cols)

# Read combined_users back to pick up user_id and attach it to every
# (possibly duplicated) Slack account by email.
df_users = pd.read_sql(select(cu), engine, dtype=dtypes)
df_users_dup = df_users_dup.merge(df_users[["email", "user_id"]], on="email", how="left")

In [None]:
# Render the user upsert with literal values inlined and pretty-print it.
print(
    sqlparse.format(
        str(user_insert_sql.compile(engine, compile_kwargs={'literal_binds': True})),
        reindent=True,
        keyword_case='upper',
    )
)

In [None]:
#### build AOs ####

ca = metadata.tables["weaselbot.combined_aos"]

# Upsert payload: one record per AO; only ao_name is refreshed on duplicates.
insert_values = df_aos.loc[:, ["slack_channel_id", "ao_name", "region_id"]].to_dict("records")
update_cols = ("ao_name",)
aos_insert_sql = insert_statement(ca, insert_values, update_cols)

dtypes = dict(
    ao_id=pd.StringDtype(),
    slack_channel_id=pd.StringDtype(),
    ao_name=pd.StringDtype(),
    region_id=pd.StringDtype(),
)

# Replace df_aos with the contents of combined_aos, all columns as strings.
df_aos = pd.read_sql(select(ca), engine, dtype=dtypes)

In [None]:
# Render the AO upsert with literal values inlined and pretty-print it.
print(
    sqlparse.format(
        str(aos_insert_sql.compile(engine, compile_kwargs={'literal_binds': True})),
        reindent=True,
        keyword_case='upper',
    )
)

In [None]:
def extract_user_id(slack_user_id) -> NAType | str:
    """
    Process Slack user ID's. Some of these are
    not just simple user ID's. Clean them up
    to standardize across the process.

    :param slack_user_id: User ID from Slack
    :type slack_user_id: str
    :rtype: str | pandas.NA
    :return: cleaned userid string.
    """

    match isinstance(slack_user_id, type(pd.NA)):
        case True:
            return pd.NA
        case _:
            if slack_user_id.startswith("U"):
                return slack_user_id
            elif "team" in slack_user_id:
                return slack_user_id.split("/team/")[1].split("|")[0]

In [None]:
#### Build Beatdowns ####

# Normalise the Q / co-Q Slack IDs before they are used as merge keys.
df_beatdowns["slack_q_user_id"] = df_beatdowns["slack_q_user_id"].apply(extract_user_id).astype(pd.StringDtype())
df_beatdowns["slack_coq_user_id"] = (
    df_beatdowns["slack_coq_user_id"].apply(extract_user_id).astype(pd.StringDtype())
)

cb = metadata.tables["weaselbot.combined_beatdowns"]

# find duplicate slack_user_ids on df_users_dup
# Resolve q_user_id and coq_user_id through df_users_dup, then ao_id through df_aos.
df_beatdowns = (
    df_beatdowns.merge(
        df_users_dup[["slack_user_id", "user_id", "region_id"]],
        left_on=["slack_q_user_id", "region_id"],
        right_on=["slack_user_id", "region_id"],
        how="left",
    )
    .rename({"user_id": "q_user_id"}, axis=1)
    .merge(
        df_users_dup[["slack_user_id", "user_id", "region_id"]],
        left_on=["slack_coq_user_id", "region_id"],
        right_on=["slack_user_id", "region_id"],
        how="left",
    )
    .rename({"user_id": "coq_user_id"}, axis=1)
    .merge(
        df_aos[["slack_channel_id", "ao_id", "region_id"]],
        on=["slack_channel_id", "region_id"],
        how="left",
    )
)
df_beatdowns["fng_count"] = df_beatdowns["fng_count"].fillna(0)

# Only beatdowns that resolved to a known AO can be inserted.
insert_values = df_beatdowns[df_beatdowns["ao_id"].notna()][
    [
        "ao_id",
        "bd_date",
        "q_user_id",
        "coq_user_id",
        "pax_count",
        "fng_count",
        "timestamp",
        "ts_edited",
        "backblast",
        # "json",
    ]
].to_dict("records")

# below columns are INT in their target table. coerce them so they'll load properly
# leaving them as strings in the dataframes for later ease in merges/joins
# NOTE: YHC is unable to test the JSON datatype. Presumably, MySQL will want those
# sent over as proper dictionaries and not string representations of dictionaries.
# This is the role of `ast.literal_eval`. If that's not the case, then just remove
# the `if` statement logic to keep them as strings.
for d in insert_values:
    # pandas missing markers (pd.NA / NaN) cannot be bound or rendered as SQL
    # parameters; send them as None so they become NULL.
    for col, value in d.items():
        if pd.isna(value):
            d[col] = None
    for col in ("ao_id", "q_user_id", "coq_user_id"):
        if d[col] is not None:
            d[col] = int(d[col])
    # if d["json"] is not None:
    #     d["json"] = ast.literal_eval(d["json"])

update_cols = ("coq_user_id", "pax_count", "fng_count", "timestamp", "ts_edited", "backblast")#, "json")

beatdowns_insert_sql = insert_statement(cb, insert_values, update_cols)

In [None]:
# Display the beatdowns newest-first for a visual check; the sorted frame is
# not assigned, so df_beatdowns itself is unchanged.
df_beatdowns.sort_values('bd_date', ascending=False)

In [None]:
# Render the beatdown upsert with literal values inlined and pretty-print it.
print(
    sqlparse.format(
        str(beatdowns_insert_sql.compile(engine, compile_kwargs={'literal_binds': True})),
        reindent=True,
        keyword_case='upper',
    )
)