In [20]:
!pip install scikit-multilearn



In [21]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import TfidfVectorizer
from skmultilearn.adapt import MLkNN
from sklearn.metrics import hamming_loss, accuracy_score
from sklearn.neighbors import NearestNeighbors
from sklearn.neighbors import KNeighborsClassifier

In [22]:
# Utility: parse the tag id string from works CSV into a list of tag id strings
import pandas as _pd

def parse_tag_string(s):
    """Split a '+'-separated tag id string into a list of id strings.

    A missing value (as judged by pandas) yields an empty list; empty
    fragments produced by leading, trailing or doubled '+' are discarded.
    """
    if _pd.isna(s):
        return []
    # filter(None, ...) keeps only the non-empty fragments
    return list(filter(None, str(s).split('+')))

# Optional helper to inspect NearestNeighbors (debugging shadowing issues)
def debug_nearestneighbors():
    """Print library versions and the NearestNeighbors object currently in scope.

    Intended as a manual diagnostic for name shadowing or incompatible
    library versions. Only prints; returns None.
    """
    # Imported locally: the notebook's import cell brings in neither of these
    # modules, so referencing them at module level raised NameError.
    import inspect
    from importlib import metadata

    def _installed_version(distribution):
        # Read the version from package metadata instead of relying on a
        # module-level __version__ attribute being present.
        try:
            return metadata.version(distribution)
        except metadata.PackageNotFoundError:
            return 'not installed'

    print('scikit-learn version:', _installed_version('scikit-learn'))
    print('scikit-multilearn version:', _installed_version('scikit-multilearn'))
    print('NearestNeighbors object:', NearestNeighbors)
    try:
        print('NearestNeighbors.__init__ signature:', inspect.signature(NearestNeighbors.__init__))
    except Exception as e:
        print('Could not get signature:', e)

In [23]:
def train_mknn(X, Y, k=5):
    """Fit a cosine-distance nearest-neighbour index over the feature rows.

    X: np.ndarray (num_works, num_features)
    Y: np.ndarray (num_works, num_tags), each row is 0/1 per tag

    Returns the fitted NearestNeighbors object together with Y, which is
    passed through unchanged so the caller can look up neighbour labels.
    """
    neighbor_index = NearestNeighbors(metric="cosine", n_neighbors=k)
    neighbor_index.fit(X)
    return neighbor_index, Y


def predict_tags_mknn(knn, Y, word_count_vector, tags_work, threshold=0.2):
    """Predict tags for one work from the labels of its nearest neighbours.

    knn: fitted NearestNeighbors model
    Y: label matrix from training (num_works, num_tags)
    word_count_vector: np.ndarray (num_features,)
    tags_work: list of strings, tag names (indexed like the columns of Y)
    threshold: minimum neighbour frequency for a tag to be predicted

    Returns (predicted, tag_scores): the names of the tags whose score is
    at least `threshold`, and the per-tag score vector itself.
    """
    # Only the neighbour indices are needed; the distances are discarded.
    _, neighbor_idx = knn.kneighbors([word_count_vector])

    # Fraction of the neighbours that carry each tag
    tag_scores = Y[neighbor_idx[0]].mean(axis=0)

    predicted = []
    for position, score in enumerate(tag_scores):
        if score >= threshold:
            predicted.append(tags_work[position])

    return predicted, tag_scores

In [24]:
# Project goal: suggest plausible tags for AO3 fanfiction works.

# --- Tag dictionary (tag id -> tag name) ---
aspects_df = pd.read_csv('tags-20210226.csv')
aspects_df['name'] = aspects_df['name'].astype(str)
# NOTE(review): `id` shadows the builtin id() for the rest of the notebook;
# the name is kept here so any cell that relies on it keeps working.
id = aspects_df["id"]
tags_name = aspects_df["name"]

# --- Works: parse each '+'-separated tag id string into a list of ids ---
aspects_df2 = pd.read_csv('works-20210226.csv')
aspects_df2["tag_id_list"] = aspects_df2["tags"].apply(parse_tag_string)
tag_id_to_name = dict(zip(id.astype(str), tags_name))


def ids_to_names(id_list):
    """Translate tag ids to tag names, dropping ids absent from the tag dictionary."""
    names = []
    for tag_id in id_list:
        if tag_id in tag_id_to_name:
            names.append(tag_id_to_name[tag_id])
    return names


# Map ids to names, then flatten every list into one comma-joined string.
aspects_df2["tag_name_list"] = (
    aspects_df2["tag_id_list"]
    .apply(ids_to_names)
    .apply(lambda names: ",".join(names))
)
tags_work = aspects_df2["tag_name_list"]
word_count = aspects_df2["word_count"]
print(tags_work)

1      Explicit,Dubious Consent,Rimming,Dealfic,M/M,N...
2      Explicit,Star Trek,Star Trek: The Original Ser...
3      Avatar: The Last Airbender,Alternate Universe,...
4      Teen And Up Audiences,F/M,Gen,Graphic Depictio...
                             ...                        
994    Teen And Up Audiences,Hurt/Comfort,M/M,Choose ...
995    Dream,Gen,Graphic Depictions Of Violence,Teen ...
996    Teen And Up Audiences,Dream,M/M,Choose Not To ...
997    Teen And Up Audiences,Dream,Gen,Graphic Depict...
998    Mature,Hurt/Comfort,Romance,M/M,No Archive War...
Name: tag_name_list, Length: 999, dtype: object


In [25]:
from sklearn.preprocessing import MultiLabelBinarizer

# Turn each work's comma-joined tag string back into a list of tag names.
# A work without tags holds the empty string, and "".split(",") yields [""],
# which would register a bogus empty-string label; blank entries are dropped.
tag_lists = [[tag for tag in tags.split(",") if tag] for tags in tags_work]

# One 0/1 indicator column per distinct tag name
mlb = MultiLabelBinarizer()
y_tags = mlb.fit_transform(tag_lists)

# word_count is the only feature for now; column_stack keeps the matrix 2-D
# so further feature columns can be appended to the list later.
X_features = np.column_stack([word_count.values])

# Hold out 30% of the works for evaluation (fixed seed for reproducibility)
x_train, x_test, y_train, y_test = train_test_split(X_features, y_tags, test_size=0.30, random_state=42)


In [26]:
# Uncomment to check for shadowed names or incompatible library versions
#debug_nearestneighbors()

# Fit a k-nearest-neighbours classifier (k=5) on the multi-label indicator targets.
# Despite the variable name, this is scikit-learn's KNeighborsClassifier,
# not skmultilearn's MLkNN.
mlknn_classifier = KNeighborsClassifier(n_neighbors=5).fit(x_train, y_train)

# Score the held-out split with Hamming loss
y_pred = mlknn_classifier.predict(x_test)
print('Hamming loss:', hamming_loss(y_true=y_test, y_pred=y_pred))

Hamming loss: 0.016994871794871795


In [27]:
pip install pandas scikit-learn gradio



In [28]:
import io
import pandas as pd
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.preprocessing import MultiLabelBinarizer
from sklearn.multiclass import OneVsRestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score, precision_score, recall_score
import gradio as gr

# ---------------------------
# Utility / Preprocessing
# ---------------------------
def parse_tag_string(tag_string):
    """Turn an AO3-style id string such as '123+45+9001' into ['123', '45', '9001'].

    Missing values and the empty string give an empty list. Fragments are
    whitespace-stripped and kept only when they consist purely of digits.
    """
    if pd.isna(tag_string) or tag_string == "":
        return []
    tag_ids = []
    for fragment in str(tag_string).split("+"):
        token = fragment.strip()
        # An empty or non-numeric token is not a tag id
        if token.isdigit():
            tag_ids.append(token)
    return tag_ids

def build_tag_mapping(tags_df, id_col=None, name_col=None):
    """Build a {str(id): name} mapping from a tags dataframe.

    Parameters:
        tags_df: dataframe holding one row per tag.
        id_col: column with the tag ids; auto-detected when None.
        name_col: column with the tag names; auto-detected when None.

    Auto-detection prefers a column named exactly 'id' / 'name'
    (case-insensitive) and otherwise falls back to the first column whose
    name contains 'id' (for ids) or 'name' / 'tag' (for names). The id column
    is never chosen as the name column, so a frame with columns such as
    'tag_id' and 'tag_name' resolves correctly.

    Returns:
        (mapping, id_col, name_col)

    Raises:
        ValueError: if either column cannot be determined.
    """
    lowered = [(col, str(col).lower()) for col in tags_df.columns]

    def _detect(exact, substrings, skip=None):
        # First pass: exact name match; second pass: substring match.
        for col, low in lowered:
            if col != skip and low == exact:
                return col
        for col, low in lowered:
            if col != skip and any(part in low for part in substrings):
                return col
        return None

    if id_col is None:
        id_col = _detect("id", ("id",))
    if name_col is None:
        name_col = _detect("name", ("name", "tag"), skip=id_col)
    if id_col is None or name_col is None:
        raise ValueError("Couldn't auto-detect id/name columns in tags file. Please ensure columns contain 'id' and 'name' words.")

    # Both sides are stringified so lookups with parsed id strings succeed
    mapping = dict(zip(tags_df[id_col].astype(str), tags_df[name_col].astype(str)))
    return mapping, id_col, name_col

def ids_to_names(id_list, mapping):
    """Look up each id in `mapping`, keeping input order and skipping unknown ids."""
    names = []
    for tag_id in id_list:
        if tag_id in mapping:
            names.append(mapping[tag_id])
    return names

def names_list_to_csv_string(name_list):
    """Join tag names into one comma-separated string.

    Each entry is converted to str and whitespace-stripped; an empty or
    missing list gives the empty string.
    """
    if name_list:
        return ",".join(str(entry).strip() for entry in name_list)
    return ""

# ---------------------------
# Model helper: lightweight baseline
# ---------------------------
class TagPredictorBaseline:
    """
    TF-IDF on a concatenated metadata text + One-vs-Rest Logistic Regression.
    This is a fast baseline suitable for the notebook UI/prototype.
    """

    # Columns that must never reach the text features: the raw and parsed
    # labels, the comma-joined label string that the mapping step adds to the
    # works frame, and columns produced by this model or the prediction step.
    # Leaving 'tags_comma_string' in would leak the ground-truth tags into the
    # TF-IDF input and inflate every validation metric.
    _NON_FEATURE_COLUMNS = frozenset({
        "tags",
        "tag_id_list",
        "tag_name_list",
        "tags_comma_string",
        "text_input",
        "predicted_tag_list",
        "predicted_tags_csv",
    })

    def __init__(self):
        self.mlb = None                # MultiLabelBinarizer, set by fit()
        self.pipeline = None           # TF-IDF + classifier pipeline, set by fit()
        self.tag_index_to_name = None  # label column index -> tag name, set by fit()

    def build_input_text(self, df):
        """Summarise every non-label column of `df` into one text string per row.

        Each row becomes space-separated 'column:value' tokens, with missing
        values rendered as the empty string. Returns a Series aligned with
        `df.index`; if `df` has no feature columns, every row is ''.
        """
        cols = [c for c in df.columns if str(c).lower() not in self._NON_FEATURE_COLUMNS]
        if not cols:
            # A row-wise apply over zero columns would not give a text Series
            return pd.Series("", index=df.index, dtype=object)
        as_text = df[cols].fillna("").astype(str)
        return as_text.apply(lambda row: " ".join(f"{col}:{row[col]}" for col in cols), axis=1)

    def fit(self, works_df, tag_vocab):
        """
        works_df must contain a 'tag_name_list' column (list of valid tag names).
        tag_vocab is the list of allowed tag names (strings).
        """
        df = works_df.copy()
        df["text_input"] = self.build_input_text(df)

        # MultiLabelBinarizer on allowed tags (ensures we never predict unknown tags)
        self.mlb = MultiLabelBinarizer(classes=tag_vocab)
        Y = self.mlb.fit_transform(df["tag_name_list"])

        # Simple TF-IDF + OneVsRest Logistic Regression
        self.pipeline = Pipeline([
            ("tfidf", TfidfVectorizer(max_features=20000, ngram_range=(1,2))),
            ("clf", OneVsRestClassifier(LogisticRegression(max_iter=1000)))
        ])
        self.pipeline.fit(df["text_input"], Y)
        self.tag_index_to_name = {i: t for i, t in enumerate(self.mlb.classes_)}

    def predict(self, works_df, threshold=0.5, top_k=None):
        """Return one list of predicted tag names per row of `works_df`.

        With `top_k` set, the `top_k` most probable tags are taken (those
        with zero probability are dropped) and `threshold` is ignored.
        Otherwise every tag whose probability is at least `threshold` is kept.
        """
        df = works_df.copy()
        df["text_input"] = self.build_input_text(df)
        probs = self.pipeline.predict_proba(df["text_input"])  # shape: (n_samples, n_labels)
        predictions = []
        for row_probs in probs:
            if top_k is not None:
                # Indices of the top_k labels, most probable first
                top_idx = np.argsort(row_probs)[::-1][:top_k]
                picked = [self.tag_index_to_name[idx] for idx in top_idx if row_probs[idx] > 0]
            else:
                picked = [self.tag_index_to_name[idx] for idx, p in enumerate(row_probs) if p >= threshold]
            predictions.append(picked)
        return predictions

    def evaluate(self, works_df, threshold=0.5):
        """Score thresholded predictions against the 'tag_name_list' column.

        Returns a dict with micro-averaged 'f1', 'precision' and 'recall'.
        """
        df = works_df.copy()
        df["text_input"] = self.build_input_text(df)
        Y_true = self.mlb.transform(df["tag_name_list"])
        probs = self.pipeline.predict_proba(df["text_input"])
        Y_pred = (probs >= threshold).astype(int)
        micro_f1 = f1_score(Y_true, Y_pred, average="micro", zero_division=0)
        micro_precision = precision_score(Y_true, Y_pred, average="micro", zero_division=0)
        micro_recall = recall_score(Y_true, Y_pred, average="micro", zero_division=0)
        return {"f1": micro_f1, "precision": micro_precision, "recall": micro_recall}

# ---------------------------
# Gradio app functions
# ---------------------------
# Mutable module-level session store shared by the Gradio callbacks below.
state = dict(
    works_df=None,                    # works dataframe, set by upload_works_file
    tags_df=None,                     # tags dataframe, set by upload_tags_file
    tag_mapping=None,                 # str(tag id) -> tag name
    baseline=TagPredictorBaseline(),  # unfitted until quick_train_demo replaces it
    tag_vocab=None,                   # sorted tag names found in the works
)

def upload_tags_file(file_obj):
    """Load the tags CSV, remember it in `state` and derive the id -> name mapping.

    Returns (status message, CSV text of the first rows). The preview is None
    when the file cannot be read at all; if only the column detection fails,
    the frame is still stored and previewed.
    """
    try:
        tags_frame = pd.read_csv(file_obj)
    except Exception as read_error:
        return f"Failed to read tags CSV: {read_error}", None

    state["tags_df"] = tags_frame
    head_csv = tags_frame.head().to_csv(index=False)

    # Column names are auto-detected by build_tag_mapping
    try:
        mapping, id_col, name_col = build_tag_mapping(tags_frame)
    except Exception as mapping_error:
        return f"Tags file loaded but mapping failed: {mapping_error}", head_csv

    state["tag_mapping"] = mapping
    status = f"Tags file loaded. Detected id column '{id_col}', name column '{name_col}'. {len(mapping)} tags loaded."
    return status, head_csv

def upload_works_file(file_obj):
    """Load the works CSV, attach a parsed 'tag_id_list' column and store it in `state`.

    Returns (status message, CSV text of the first rows); the preview is None
    when the file cannot be read.
    """
    try:
        works = pd.read_csv(file_obj)
    except Exception as read_error:
        return f"Failed to read works CSV: {read_error}", None

    if "tags" not in works.columns:
        # No tag column at all: every work gets its own empty id list
        works["tag_id_list"] = [[] for _ in range(len(works))]
    else:
        works["tag_id_list"] = works["tags"].apply(parse_tag_string)

    state["works_df"] = works
    row_count = len(works)
    return f"Works file loaded with {row_count} rows.", works.head().to_csv(index=False)

def map_ids_to_names_preview():
    """Translate every work's tag ids to names and record the resulting vocabulary.

    Adds 'tag_name_list' (list of names) and 'tags_comma_string' (the same
    names comma-joined) to a copy of the works frame, stores that copy and
    the sorted unique tag names in `state`, and returns
    (status message, CSV text of the first rows).
    """
    if state["works_df"] is None or state["tag_mapping"] is None:
        return "Upload both works and tags files first.", ""

    mapping = state["tag_mapping"]

    def _to_names(id_list):
        return ids_to_names(id_list, mapping)

    works = state["works_df"].copy()
    works["tag_name_list"] = works["tag_id_list"].apply(_to_names)
    works["tags_comma_string"] = works["tag_name_list"].apply(names_list_to_csv_string)
    state["works_df"] = works

    # Vocabulary: every distinct name that survived the id -> name lookup
    distinct_names = set()
    for names in works["tag_name_list"].tolist():
        distinct_names.update(names)
    tag_vocab = sorted(distinct_names)
    state["tag_vocab"] = tag_vocab

    status = f"Mapped tags. Found {len(tag_vocab)} unique tag names in works (that exist in tags file)."
    return status, works.head().to_csv(index=False)

def quick_train_demo(test_size=0.2, random_state=42):
    """Train a fresh TagPredictorBaseline on the stored works and report hold-out metrics.

    The works are split into train/validation parts with `test_size` and
    `random_state`; the fitted model replaces state["baseline"]. Returns
    (status message, metrics text), or an instruction message with an empty
    metrics text when the id -> name mapping has not been run yet.
    """
    if state["works_df"] is None or state["tag_vocab"] is None:
        return "Map IDs to names first (press 'Map IDs → Names')", ""

    works = state["works_df"].copy()
    if "tag_name_list" not in works.columns:
        return "Map IDs to names column missing.", ""

    # All works are used, including those without any tags
    train_df, val_df = train_test_split(works, test_size=test_size, random_state=random_state)

    baseline = TagPredictorBaseline()
    baseline.fit(train_df, state["tag_vocab"])
    state["baseline"] = baseline

    scores = baseline.evaluate(val_df)
    metrics_text = (
        f"Validation micro-F1: {scores['f1']:.4f}, "
        f"Precision: {scores['precision']:.4f}, "
        f"Recall: {scores['recall']:.4f}"
    )
    return "Quick baseline trained.", metrics_text

def predict_for_uploaded(threshold=0.5, top_k=None):
    """Predict tags for every stored work with the trained baseline.

    Parameters:
        threshold: minimum probability for a tag to be kept; used when no
            Top-K cap is active.
        top_k: keep only the `top_k` most probable tags. None, zero or a
            negative value all mean "no cap, use the threshold".

    Returns:
        (status message, CSV text of the 'predicted_tags_csv' column for the
        first 50 works). The second element is '' when nothing was uploaded
        or the model has not been trained.
    """
    if state["works_df"] is None:
        return "No works uploaded.", ""
    baseline = state.get("baseline", None)
    if baseline is None or baseline.pipeline is None:
        return "Model not trained yet. Use 'Quick Train Demo' first.", ""

    # The None check must come first: `None > 0` raises TypeError in Python 3,
    # which made a call with the default top_k crash.
    effective_top_k = top_k if top_k is not None and top_k > 0 else None

    df = state["works_df"].copy()
    preds = baseline.predict(df, threshold=threshold, top_k=effective_top_k)
    df["predicted_tag_list"] = preds
    df["predicted_tags_csv"] = df["predicted_tag_list"].apply(names_list_to_csv_string)
    # Only the predicted column of the first 50 works is previewed
    return f"Predicted tags for {len(df)} works (preview below).", df[["predicted_tags_csv"]].head(50).to_csv(index=False)

# ---------------------------
# Gradio interface layout
# ---------------------------
# Layout: two upload columns, then the mapping step, quick training and prediction.
with gr.Blocks() as demo:
    gr.Markdown("## AO3 Tagging — Gradio prototype (Notebook)\nUpload your `tags` CSV and `works` CSV (AO3-style tag IDs). This UI will parse, map IDs→names (ignoring unknown IDs), allow a quick baseline training, and produce predicted tag **names** as comma-separated strings.")
    with gr.Row():
        with gr.Column(scale=1):
            tags_file = gr.File(label="Upload tags CSV (tags-20210226.csv)", file_types=['.csv'])
            tags_status = gr.Textbox(label="Tags status", interactive=False)
            tags_preview = gr.Textbox(label="Tags preview (CSV head)", interactive=False)
            upload_tags_btn = gr.Button("Load tags file")
        with gr.Column(scale=1):
            works_file = gr.File(label="Upload works CSV (works-20210226.csv)", file_types=['.csv'])
            works_status = gr.Textbox(label="Works status", interactive=False)
            works_preview = gr.Textbox(label="Works preview (CSV head)", interactive=False)
            upload_works_btn = gr.Button("Load works file")
    gr.Markdown("---")
    map_btn = gr.Button("Map IDs → Names & Convert to comma-separated string")
    map_status = gr.Textbox(label="Mapping status", interactive=False)
    map_preview = gr.Textbox(label="Works with mapped tags preview", interactive=False)
    gr.Markdown("### Quick baseline training (fast demo)")
    with gr.Row():
        train_btn = gr.Button("Quick Train Demo (TF-IDF + One-vs-Rest LR)")
        train_status = gr.Textbox(label="Training status", interactive=False)
        train_metrics = gr.Textbox(label="Validation metrics", interactive=False)
    gr.Markdown("### Predict")
    with gr.Row():
        threshold_slider = gr.Slider(0.0, 1.0, value=0.5, step=0.01, label="Prediction threshold (sigmoid)")
        topk_input = gr.Number(value=0, label="Top-K (0 = use threshold)", precision=0)
        predict_btn = gr.Button("Predict tags for uploaded works")
        predict_status = gr.Textbox(label="Prediction status", interactive=False)
        predict_preview = gr.Textbox(label="Prediction preview (predicted_tags_csv column)", interactive=False)

    # Bind buttons to functions.
    # getattr(f, "name", f) accepts both a file wrapper exposing `.name` and a
    # plain path string, whichever the File component hands over.
    upload_tags_btn.click(fn=lambda f: upload_tags_file(getattr(f, "name", f)) if f is not None else ("No file provided.", ""), inputs=[tags_file], outputs=[tags_status, tags_preview])
    upload_works_btn.click(fn=lambda f: upload_works_file(getattr(f, "name", f)) if f is not None else ("No file provided.", ""), inputs=[works_file], outputs=[works_status, works_preview])
    map_btn.click(fn=map_ids_to_names_preview, inputs=[], outputs=[map_status, map_preview])
    train_btn.click(fn=quick_train_demo, inputs=[], outputs=[train_status, train_metrics])
    # An emptied Number field arrives as None; treat it like 0 ("use threshold")
    # instead of letting int(None) raise TypeError.
    predict_btn.click(fn=lambda th, k: predict_for_uploaded(threshold=th, top_k=int(k) if k else 0), inputs=[threshold_slider, topk_input], outputs=[predict_status, predict_preview])

# Launch the Gradio interface in notebook (visible inline)
demo.launch(share=False, inbrowser=False)

Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
Note: opening Chrome Inspector may crash demo inside Colab notebooks.
* To create a public link, set `share=True` in `launch()`.


<IPython.core.display.Javascript object>

