In [None]:
%load_ext autoreload
#%reload_ext autoreload

%autoreload 2

import sys
import logging

sys.path.append("../model")

import os
import re
from pathlib import Path

import mlflow as mlf
import pandas as pd
from tqdm.auto import tqdm

tqdm.pandas()

from pysentimiento.preprocessing import preprocess_tweet
from analyzer_blstm import (
    AnalyzerForSequenceClassification,
    create_analyzer_blstm,
)

# Turn on DEBUG-level output for the logger named "Utils".
# NOTE(review): presumably that logger belongs to the project code under ../model — confirm.
logging.getLogger("Utils").setLevel(logging.DEBUG)


# MLflow run id that the run-lookup cell searches for to locate the model artifacts.
ROBERTUITO_RUN_ID = "52089d0757e64bf588f2c75e439ae4e0"  # FINAL

# NOTE(review): not referenced by any visible cell of this notebook — confirm it is still needed.
ROBERTUITO_BLSTM_RUN_ID = "631eeac0d2d84a24a6b41a7ff6cc3ba4"  # FINAL

# Length threshold (in characters) the analysis functions compare texts against.
MIN_TEXT_LENGTH = 4

# Reference date (YYYY-MM-DD) that str_to_date_tiktok uses by default to resolve relative date labels.
PIVOT_DATE = "2025-01-10"

In [None]:
# Quick check that the year-first date regex matches a sample YYYY-MM-DD string.
re.match(r"\d{4}[-/]\d{1,2}[-/]\d{1,2}", "2023-11-23")

In [96]:
def str_to_date_tiktok(text: str, pivot_date=PIVOT_DATE) -> pd.Timestamp:
    """Convert a TikTok date label into a ``pd.Timestamp``.

    Supported labels (``-`` or ``/`` as separator):

    * ``YYYY-M-D``: full date, year first. Anything after the date (for
      example a time part) is ignored.
    * ``D-M-YYYY``: full date, year last (day before month).
    * ``M-D``: month and day without a year. It resolves to the most recent
      occurrence of that month/day that is not later than ``pivot_date``.
    * ``<n>d ago`` / ``<n>h ago`` / ``<n>w ago`` / ``<n>m ago``: that many
      days / hours / weeks / minutes before ``pivot_date``. Any other label
      containing ``ago`` resolves to ``pivot_date`` itself.

    Args:
        text: Raw label; it is passed through ``str()`` and stripped first.
        pivot_date: Reference date in ``YYYY-MM-DD`` form.
            NOTE(review): presumably the day the data was scraped — confirm.

    Returns:
        The parsed timestamp, or ``pd.NaT`` when the label is not recognised
        or does not describe a valid calendar date.

    Raises:
        ValueError: If ``pivot_date`` is needed (year-less or relative label)
            and is not a valid ``YYYY-MM-DD`` date.
    """
    text = str(text).strip()

    def _build(year, month, day) -> pd.Timestamp:
        # pd.Timestamp rejects impossible dates (month 13, Feb 30, ...) with
        # ValueError; report those as "unparseable" instead of crashing.
        try:
            return pd.Timestamp(year=int(year), month=int(month), day=int(day))
        except ValueError:
            return pd.NaT

    def _pivot() -> pd.Timestamp:
        return pd.to_datetime(pivot_date, errors="raise", format="%Y-%m-%d")

    # (?!\d) keeps the last numeric field from being cut in the middle.
    year_first = re.match(r"(\d{4})[-/](\d{1,2})[-/](\d{1,2})(?!\d)", text)
    if year_first:
        return _build(*year_first.groups())

    year_last = re.match(r"(\d{1,2})[-/](\d{1,2})[-/](\d{4})(?!\d)", text)
    if year_last:
        day, month, year = year_last.groups()
        return _build(year, month, day)

    month_day = re.fullmatch(r"(\d{1,2})[-/](\d{1,2})", text)
    if month_day:
        month, day = month_day.groups()
        pivot = _pivot()
        # Walk back from the pivot year until the date exists and is not in
        # the future; the extra years only matter for Feb 29.
        for year in range(pivot.year, pivot.year - 9, -1):
            candidate = _build(year, month, day)
            if not pd.isna(candidate) and candidate <= pivot:
                return candidate
        return pd.NaT

    if "ago" in text:
        pivot = _pivot()
        relative = re.match(r"(\d+)\s*([dhwm])\s*ago", text)
        if relative is None:
            # e.g. seconds or wording without a number: treat as "now".
            return pivot
        amount, unit = relative.groups()
        # NOTE(review): "m" is treated as minutes. The previous code passed
        # ``month=`` to pd.Timedelta, which is not a supported keyword and
        # raised on every "<n>m ago" label. If these labels can mean months,
        # use ``pd.DateOffset(months=...)`` instead — confirm with the data.
        keyword = {"d": "days", "h": "hours", "w": "weeks", "m": "minutes"}[unit]
        return pivot - pd.Timedelta(**{keyword: int(amount)})

    return pd.NaT


def str_to_date_twitter(text: str) -> pd.Timestamp:
    """Convert a tweet timestamp cell into a ``pd.Timestamp``.

    Two input shapes are recognised:

    * Text containing ``Z``: parsed with the format
      ``%Y-%m-%dT%H:%M:%S.%fZ``.
    * An Excel serial day number (4 to 6 digits, optionally followed by a
      fractional part that encodes the time of day).

    Args:
        text: Raw cell value; it is passed through ``str()`` and stripped.

    Returns:
        The parsed timestamp, or ``pd.NaT`` when the value matches neither
        shape. Serial numbers are returned as real timestamps (not formatted
        strings) so the resulting column has a single type.

    Raises:
        ValueError: If the text contains ``Z`` but does not match the
            expected timestamp format.
    """
    text = str(text).strip()

    if "Z" in text:
        return pd.to_datetime(text, format="%Y-%m-%dT%H:%M:%S.%fZ", errors="raise")

    # fullmatch: a value that merely starts with digits (e.g. "2023-11-23")
    # is not a serial number and must not reach the numeric conversion.
    if re.fullmatch(r"\d{4,6}(?:\.\d+)?", text):
        # Base date is 1 January 1900; subtracting 2 makes serial 2 map to it
        # (i.e. serial 0 is 1899-12-30, the usual Excel epoch).
        return pd.Timestamp("1900-01-01") + pd.Timedelta(days=float(text) - 2)

    return pd.NaT

In [None]:
# Point MLflow at the local run store that lives next to the model code.
runs_path = os.path.abspath(os.path.join("..", "model", "mlruns"))
# Path.as_uri() yields a well-formed file URI on every platform
# (file:///C:/... on Windows, file:///home/... on POSIX).
mlf.set_tracking_uri(Path(runs_path).as_uri())

# Columns kept from the MLflow runs table.
columns = [
    "run_id",
    "status",
    "params.lstm_hidden_dim",
    "params.lstm_num_layers",
    "metrics.train_runtime",
    "metrics.eval_macro_f1",
    "metrics.train_loss",
    "metrics.eval_macro_recall",
    "metrics.eval_macro_precision",
    "artifact_uri",
]

# Finished runs only, best macro-F1 first.
runs = mlf.search_runs(
    filter_string='status="FINISHED"', order_by=["metrics.eval_macro_f1 DESC"]
)[columns]

# Locate the selected run and fail with a readable message if it is missing.
matching_runs = runs[runs["run_id"] == ROBERTUITO_RUN_ID]
if matching_runs.empty:
    raise LookupError(
        f"MLflow run {ROBERTUITO_RUN_ID!r} not found among FINISHED runs in {runs_path}"
    )
run_torch = matching_runs.iloc[0]

# Turn the artifact URI into a filesystem path. On Windows the drive letter
# follows "file:///"; on POSIX the third slash is the root of the path and
# must be kept.
model_uri = f'{run_torch["artifact_uri"]}/model'
uri_prefix = "file:///" if os.name == "nt" else "file://"
roubertuito = (
    model_uri[len(uri_prefix):] if model_uri.startswith(uri_prefix) else model_uri
)
roubertuito

In [None]:
# Build the analyzer from the model directory resolved from the MLflow run's artifact URI.
model_analyzer = create_analyzer_blstm(model_path=roubertuito)

## Tweet analysis


In [97]:
def process_analysis_x(
    row: pd.Series, model_analyzer: AnalyzerForSequenceClassification
):
    """Classify the text of one tweet row.

    Args:
        row: Row providing a ``"text"`` entry.
        model_analyzer: Analyzer whose ``predict(text).probas`` is a mapping
            from label to probability.

    Returns:
        ``{"text": ..., "analysis": ...}`` where ``analysis`` is ``None`` when
        the text is empty, shorter than ``MIN_TEXT_LENGTH``, or the model
        returns no probabilities; otherwise a dict with ``"max_proba"`` (the
        label with the highest probability) and ``"probas"`` (the full
        mapping).
    """
    text = row["text"]

    if not text or len(text) < MIN_TEXT_LENGTH:
        return {"text": text, "analysis": None}

    probas = model_analyzer.predict(text).probas

    # max() raises on an empty mapping; report "no analysis" instead, the
    # same way the TikTok analysis does.
    if len(probas) == 0:
        return {"text": text, "analysis": None}

    max_proba = max(probas.items(), key=lambda item: item[1])[0]

    return {
        "text": text,
        "analysis": {
            "max_proba": max_proba,
            "probas": probas,
        },
    }

In [None]:
# Load the tweets workbook.
df_x = pd.read_excel("../data/append_tweets.xlsx")
# Force the text column to str before it is passed to the analysis.
df_x["text"] = df_x["text"].astype(str)
# Parse every raw timestamp cell; the parser is passed directly, no wrapper needed.
df_x["date_parsed"] = df_x["timestamp"].progress_apply(str_to_date_twitter)

In [None]:
# Show the raw timestamp next to date_parsed for the rows where date_parsed is missing.
df_x[df_x["date_parsed"].isna()][["timestamp", "date_parsed"]]

In [100]:
# Run the analyzer row by row (axis=1) with a progress bar; each row yields the
# dict returned by process_analysis_x.
analysis = df_x.progress_apply(
    lambda row: process_analysis_x(row, model_analyzer), axis=1
)

# Store the per-row result alongside the original columns.
df_x["analysis"] = analysis

  0%|          | 0/1646 [00:00<?, ?it/s]

In [102]:
# Write the tweets together with the added columns to a new workbook, without the index.
df_x.to_excel("../data/append_tweets_analysis_v2.xlsx", index=False)

## TikTok analysis


In [39]:
def process_analysis_tiktok(
    row: pd.Series, model_analyzer: AnalyzerForSequenceClassification
):
    """Classify the description and the comments of one TikTok row.

    Args:
        row: Row providing ``"description"`` (str) and ``"comments_text"``
            (str with the individual comments separated by ``|``).
        model_analyzer: Analyzer whose ``predict(text).probas`` is a mapping
            from label to probability.

    Returns:
        A dict with two entries:

        * ``"description"``: ``{"text": ..., "analysis": ...}``. ``analysis``
          is ``None`` when the description is empty, shorter than
          ``MIN_TEXT_LENGTH``, or the model returns no probabilities.
        * ``"comments"``: one ``{"text": ..., "analysis": ...}`` per comment
          of at least ``MIN_TEXT_LENGTH`` characters (shorter comments are
          dropped). ``analysis`` is ``None`` when the model returns no
          probabilities.

        A non-``None`` ``analysis`` holds ``"sentiment"`` (the label with the
        highest probability) and ``"probas"`` (the full mapping).
    """

    def _classify(text: str):
        # One prediction -> analysis dict, or None if the model gave nothing back.
        probas = model_analyzer.predict(text).probas
        if len(probas) == 0:
            return None
        sentiment = max(probas.items(), key=lambda item: item[1])[0]
        return {"sentiment": sentiment, "probas": probas}

    description_text = row["description"]

    # Only descriptions that are long enough are analysed. The previous
    # condition was inverted and analysed only the too-short ones.
    if description_text and len(description_text) >= MIN_TEXT_LENGTH:
        description_analysis = _classify(description_text)
    else:
        description_analysis = None

    # Too-short comments are removed up front, so no per-comment length check
    # is needed inside the loop.
    comments_text = [
        comment
        for comment in row["comments_text"].split("|")
        if len(comment) >= MIN_TEXT_LENGTH
    ]

    comments_analysis = [
        {"text": comment_text, "analysis": _classify(comment_text)}
        for comment_text in tqdm(
            comments_text, desc="Analyzing comments", leave=False, unit="comment"
        )
    ]

    return {
        "description": {
            "text": description_text,
            "analysis": description_analysis,
        },
        "comments": comments_analysis,
    }

In [None]:
# Load the TikTok workbook.
df_tiktok = pd.read_excel("../data/append_tik_tok.xlsx")
# Force both text columns to str before they are passed to the analysis.
for text_column in ("description", "comments_text"):
    df_tiktok[text_column] = df_tiktok[text_column].astype(str)
# Parse every raw date label; the parser is passed directly and keeps its default pivot date.
df_tiktok["date_parsed"] = df_tiktok["date"].progress_apply(str_to_date_tiktok)

In [None]:
# Run the analyzer row by row (axis=1) with a progress bar; each row yields the
# dict returned by process_analysis_tiktok.
analysis = df_tiktok.progress_apply(
    lambda row: process_analysis_tiktok(row, model_analyzer), axis=1
)

# Store the per-row result and write everything to a new workbook, without the index.
df_tiktok["analysis"] = analysis
df_tiktok.to_excel("../data/append_tik_tok_analysis_v2.xlsx", index=False)