In [None]:
!pip install pandas gensim openpyxl regex ftfy scikit-learn



In [None]:
from google.colab import drive

In [None]:
# Mount Google Drive into the Colab filesystem so the two folders below resolve.
drive.mount('/content/drive')

# Folder holding the raw input workbooks. The trailing "/" matters: later code
# builds input file paths from this value by plain string interpolation.
drive_path_raw = "/content/drive/MyDrive/Colab Notebooks/nlp/Excels/"
# Folder that receives the cleaned two-column workbooks and the corpus text file.
drive_path_cleaned = "/content/drive/MyDrive/Colab Notebooks/nlp/Cleaned/"

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


**START POINT**

In [None]:
import re, html, unicodedata
import pandas as pd
from pathlib import Path

# ftfy is treated as optional: if importing it fails for any reason, fix_text
# degrades to an identity function so the pipeline still runs, just without
# ftfy's text repairs.
try:
    from ftfy import fix_text
except Exception:
    def fix_text(s): return s


In [None]:
def lower_az(s: str) -> str:
    """Lowercase ``s`` using Azerbaijani casing rules.

    ``str.lower()`` alone maps "I" to "i" and "İ" to "i" + U+0307; Azerbaijani
    needs "I" -> "ı" and "İ" -> "i", so both capitals are replaced by hand before
    the generic lowercasing.

    Args:
        s: Text to lowercase.

    Returns:
        The lowercased, NFC-normalised text, or ``""`` when ``s`` is not a str.
    """
    if not isinstance(s, str):
        return ""
    # Compose first: a decomposed dotted capital ("I" + U+0307) must become the
    # single code point "İ" before the replacements, otherwise its base letter
    # would be turned into dotless "ı" with a stray combining dot.
    s = unicodedata.normalize("NFC", s)
    s = s.replace('İ', 'i').replace('I', 'ı')
    s = s.lower()
    # Re-compose in case lowercasing produced any decomposed sequence.
    return unicodedata.normalize("NFC", s)

# Anything between "<" and ">" (HTML/XML tags).
HTML_TAG_RE = re.compile(r"<[^>]+>")
# http(s):// or www. followed by any run of non-whitespace characters.
URL_RE = re.compile(r"(https?://\S+|www\.\S+)", re.IGNORECASE)
# local-part@domain.tld made of word characters, dots and dashes.
EMAIL_RE = re.compile(r"\b[\w\.-]+@[\w\.-]+\.\w+\b", re.IGNORECASE)
# Optional "+", a digit, at least six digits/dashes/spaces/parentheses, then a
# digit - i.e. a phone-like run of eight or more characters.
PHONE_RE = re.compile(r"\+?\d[\d\-\s\(\)]{6,}\d")
# "@" followed by word characters (user mentions).
USER_RE = re.compile(r"@\w+")
# The same punctuation mark repeated two or more times; group 1 is the mark.
MULTI_PUNCT = re.compile(r"([!?.,;:])\1{1,}")
# Any run of whitespace.
MULTI_SPACE = re.compile(r"\s+")
# Any character repeated three or more times in a row; group 1 is the character.
REPEAT_CHARS= re.compile(r"(.)\1{2,}", flags=re.UNICODE)

# Token grammar used after cleaning. Alternatives are tried in order:
# placeholder tokens (EMO_POS/EMO_NEG, <UPPER_CASE> tags, URL/EMAIL/PHONE/USER),
# then digit runs, then words made of the listed Latin/Azerbaijani letters with
# at most one internal apostrophe. Everything else is skipped by findall().
TOKEN_RE = re.compile(
    r"EMO_(?:POS|NEG)|<[A-Z0-9_]+>|<NUM>|URL|EMAIL|PHONE|USER"
    r"|\d+"
    r"|[A-Za-zƏəĞğIıİiÖöÜüÇçŞşXxQq]+(?:'[A-Za-zƏəĞğIıİiÖöÜüÇçŞşXxQq]+)?"
)


# Emoji -> polarity placeholder. normalize_text_az replaces each key with its
# value (padded with spaces) so the emoji survives as a regular token.
EMO_MAP = {
    "🙂":"EMO_POS","😀":"EMO_POS","😍":"EMO_POS","😊":"EMO_POS","👍":"EMO_POS",
    "😃":"EMO_POS","😄":"EMO_POS","😁":"EMO_POS","😆":"EMO_POS","😅":"EMO_POS",
    "🤣":"EMO_POS","😂":"EMO_POS","🙃":"EMO_POS","😉":"EMO_POS",
    "☹":"EMO_NEG","🙁":"EMO_NEG","😠":"EMO_NEG","😡":"EMO_NEG","👎":"EMO_NEG",
    "😒":"EMO_NEG","😓":"EMO_NEG","😔":"EMO_NEG","😖":"EMO_NEG","😞":"EMO_NEG",
    "😟":"EMO_NEG","😤":"EMO_NEG","😢":"EMO_NEG","😭":"EMO_NEG","😦":"EMO_NEG",
}

# Whole-token replacements for chat abbreviations and spellings typed without
# Azerbaijani diacritics; looked up per token in normalize_text_az.
SLANG_MAP = {
    "slm":"salam","tmm":"tamam","sagol":"sağol","cox":"çox","yaxsi":"yaxşı",
    }

# Negation words that follow what they negate: when one is seen,
# normalize_text_az appends "_NEG" to the nearest preceding eligible token.
NEGATE_PREV = {
    "deyil","deyildi","deyilim","deyilsən","deyildir",
    "deyilik","deyilsiniz","deyildilər","yox","yoxdur","yoxdu"
}

# Negation words that precede what they negate: when one is seen,
# normalize_text_az appends "_NEG" to up to the next three eligible tokens.
NEGATE_NEXT = {"heç","qətiyyən"}


In [None]:
# Keyword hints used by detect_domain, which checks them in the order
# news -> social -> reviews.
# News: names of news agencies/outlets as whole words, case-insensitive.
NEWS_HINTS = re.compile(r"\b(apa|trend|azertac|reuters|bloomberg|dha|aa)\b", re.I)
# Social: "rt" as a word, any "@" or "#", or one of the listed emoji.
# NOTE(review): this pattern is case-sensitive and detect_domain lowercases its
# input first, so the upper-case "USER" alternative cannot match there.
SOCIAL_HINTS = re.compile(r"\b(rt)\b|@|#|(?:😂|😍|😊|👍|👎|😡|🙂|USER)")
# Reviews: currency/price/purchase/rating vocabulary, case-insensitive.
REV_HINTS = re.compile(r"\b(azn|manat|qiymət|aldım|ulduz|çox yaxşı|çox pis|STARS|RATING_POS|RATING_NEG)\b", re.I)

# A number (digits or the <NUM> placeholder) followed by a currency word/sign.
# NOTE(review): "₼" is not a word character, so the trailing \b only matches
# when a word character follows the sign - confirm that is intended.
PRICE_RE = re.compile(r"(?:\b\d+\b|<NUM>)\s*(?:azn|manat|₼)\b", re.I)
# A single digit 1-5 (captured as group 1) or <NUM>, followed by "ulduz".
STARS_RE = re.compile(r"(?:(?<!<)\b([1-5])\b|<NUM>)\s*[- ]*ulduz\b", re.I)
# Fixed positive / negative rating phrases (case-sensitive).
POS_RATE = re.compile(r"\bçox yaxşı\b")
NEG_RATE = re.compile(r"\bçox pis\b")



In [None]:
def detect_domain(text: str) -> str:
    """Guess the domain of ``text`` from keyword hints.

    The text is lowercased with ``str.lower()`` and tested against the hint
    patterns in priority order; the first pattern that matches wins.

    Args:
        text: Raw text to classify.

    Returns:
        ``"news"``, ``"social"`` or ``"reviews"`` for the first matching hint
        pattern, otherwise ``"general"``.
    """
    lowered = text.lower()
    # Order matters: a text with both news and social hints is labelled "news".
    ordered_hints = (
        ("news", NEWS_HINTS),
        ("social", SOCIAL_HINTS),
        ("reviews", REV_HINTS),
    )
    for label, pattern in ordered_hints:
        if pattern.search(lowered):
            return label
    return "general"

def _stars_sub(m):
    """Replacement callback for STARS_RE: emit a star-rating placeholder.

    Returns `` <STARS_n> `` when group 1 of the match captured a digit, and the
    generic `` <STARS> `` otherwise.
    """
    digit = m.group(1)
    if digit:
        return f" <STARS_{digit}> "
    return " <STARS> "

def domain_specific_normalize(cleaned: str, domain: str) -> str:
    """Apply review-specific placeholders to already-cleaned text.

    Args:
        cleaned: Text to rewrite.
        domain: Domain label; only ``"reviews"`` triggers any rewriting.

    Returns:
        For ``"reviews"``: the text with prices, the fixed positive/negative
        rating phrases and star counts replaced by placeholders and whitespace
        collapsed to single spaces. For any other domain: ``cleaned`` unchanged.
    """
    if domain != "reviews":
        return cleaned

    # Applied in this order; each step sees the output of the previous one.
    steps = (
        (PRICE_RE, " <PRICE> "),
        (POS_RATE, " <RATING_POS> "),
        (NEG_RATE, " <RATING_NEG> "),
        (STARS_RE, _stars_sub),
    )
    tagged = cleaned
    for pattern, replacement in steps:
        tagged = pattern.sub(replacement, tagged)
    return " ".join(tagged.split())

def add_domain_tag(line: str, domain: str) -> str:
    """Prefix ``line`` with its domain marker.

    The marker is the literal ``dom`` immediately followed by the domain name
    and a single space, e.g. ``"domnews "``.
    """
    marker = "dom{} ".format(domain)
    return marker + line


In [None]:
def normalize_text_az(s: str, numbers_to_token=True, keep_sentence_punct=False) -> str:
    """Normalise one raw text into a space-separated token string.

    Steps, in order: ``fix_text`` repair, HTML entity unescaping, tag removal,
    ASCII hashtag splitting (camelCase boundaries become spaces), Azerbaijani
    lowercasing via ``lower_az``, emoji -> EMO_POS/EMO_NEG, URL/EMAIL/PHONE/USER
    placeholders, collapsing of repeated punctuation, optional ``<NUM>``
    replacement, removal of remaining symbols, tokenisation, and per-token
    clean-up (character runs of 3+ squeezed to 2, slang expansion, negation
    tagging with a ``_NEG`` suffix).

    Args:
        s: Raw text.
        numbers_to_token: When true, digit runs are replaced by ``<NUM>``,
            except a number directly followed by "ulduz", which is left for
            later star-rating detection.
        keep_sentence_punct: When true, ``.``, ``!`` and ``?`` are kept and
            returned as stand-alone tokens so callers can split the result into
            sentences. Negation tagging does not cross these marks.

    Returns:
        The normalised tokens joined by single spaces; ``""`` when ``s`` is not
        a str. One-character tokens are dropped, except digits, "o", "e" and
        (with ``keep_sentence_punct``) the sentence marks.
    """
    if not isinstance(s, str): return ""

    s = fix_text(s)
    s = html.unescape(s)
    s = HTML_TAG_RE.sub(" ", s)
    s = re.sub(r"#([A-Za-z0-9_]+)", lambda m: " " + re.sub('([a-z])([A-Z])', r'\1 \2', m.group(1)).lower() + " ", s)

    s = lower_az(s)

    for emo, tag in EMO_MAP.items():
        s = s.replace(emo, f" {tag} ")

    s = URL_RE.sub(" URL ", s)
    s = EMAIL_RE.sub(" EMAIL ", s)
    s = PHONE_RE.sub(" PHONE ", s)
    s = USER_RE.sub(" USER ", s)
    s = MULTI_PUNCT.sub(r"\1", s)
    if numbers_to_token:
        # The (?<!\d) / (?!\d) guards force whole-number matches. Without them
        # the regex backtracks inside "15 ulduz" and replaces only the "1",
        # leaving a bogus "<NUM> 5 ulduz".
        s = re.sub(r"(?i)(?<![<\d])\d+(?!\d)(?!\s*[- ]*ulduz\b)", " <NUM> ", s)
    if keep_sentence_punct:
        s = re.sub(r"[^\w\s<>'_əğıöşüçƏĞIİÖŞÜÇxqXQ.!?]", " ", s)
    else:
        s = re.sub(r"[^\w\s<>'_əğıöşüçƏĞIİÖŞÜÇxqXQ]", " ", s)
    s = MULTI_SPACE.sub(" ", s).strip()

    if keep_sentence_punct:
        # TOKEN_RE has no alternative for punctuation, so findall() would drop
        # the marks kept above; extend the grammar for this mode only.
        # re caches compiled patterns, so rebuilding it per call is cheap.
        token_re = re.compile(TOKEN_RE.pattern + r"|[.!?]")
        sent_punct = frozenset(".!?")
    else:
        token_re = TOKEN_RE
        sent_punct = frozenset()

    neg_exempt = {"URL", "EMAIL", "PHONE", "USER"}  # placeholders never get _NEG
    norm = []
    mark_neg = 0  # upcoming tokens still to be tagged after a NEGATE_NEXT word

    for t in token_re.findall(s):
        if t in sent_punct:
            # Sentence boundary: keep the mark and end any forward negation scope.
            norm.append(t)
            mark_neg = 0
            continue

        # Squeeze elongated words ("yaxşııı" -> "yaxşıı"), but never numbers:
        # "1000" must not become "100".
        if not t.isdigit():
            t = REPEAT_CHARS.sub(r"\1\1", t)
        t = SLANG_MAP.get(t, t)

        if mark_neg > 0 and t not in neg_exempt and not t.startswith("<"):
            norm.append(t + "_NEG")
            mark_neg -= 1
            continue

        if t in NEGATE_PREV:
            # Tag the nearest earlier token that is a plain, not-yet-negated
            # word, without looking past a sentence boundary.
            for i in range(len(norm) - 1, -1, -1):
                prev = norm[i]
                if prev in sent_punct:
                    break
                if not prev.endswith("_NEG") and prev not in neg_exempt and not prev.startswith("<"):
                    norm[i] = prev + "_NEG"
                    break
            norm.append(t)
            continue

        if t in NEGATE_NEXT:
            norm.append(t)
            mark_neg = 3
            continue

        norm.append(t)

    norm = [t for t in norm if len(t) > 1 or t.isdigit() or t in {"o", "e"} or t in sent_punct]
    return " ".join(norm).strip()

In [None]:
def map_sentiment_value(v, scheme: str):
    """Map a raw label to a sentiment score in {0.0, 0.5, 1.0}.

    Args:
        v: Raw label value (number or string).
        scheme: ``"binary"`` interprets ``v`` as an integer, where 1 is positive
            and any other integer is negative. Any other scheme name uses the
            three-way vocabulary below.

    Returns:
        1.0 (positive), 0.5 (neutral) or 0.0 (negative), or ``None`` when the
        label cannot be interpreted.
    """
    if scheme == "binary":
        try:
            return 1.0 if int(v) == 1 else 0.0
        except (TypeError, ValueError, OverflowError):
            # Non-numeric text, NaN/None or infinity: label is unusable.
            return None

    # pandas delivers integer labels as floats when the column has missing
    # cells; without this, 1.0 would stringify to "1.0" and match nothing.
    if isinstance(v, float) and v.is_integer():
        v = int(v)

    s = str(v).strip().lower()
    if s in {"pos","positive","1","müsbət","good","pozitiv"}: return 1.0
    if s in {"neu","neutral","2","neytral"}: return 0.5
    if s in {"neg","negative","0","mənfi","bad","neqativ"}: return 0.0
    return None


In [None]:
def process_file(in_path, text_col, label_col, scheme, out_two_col_path, remove_stopwords=False):
    """Clean one labelled workbook and save it as a two-column workbook.

    Reads ``in_path`` with ``pd.read_excel``, drops rows with missing, blank or
    duplicate text, normalises the text, applies domain-specific placeholders,
    maps the labels to numeric scores and writes the columns ``cleaned_text``
    and ``sentiment_value`` to ``out_two_col_path``.

    Args:
        in_path: Path of the input Excel file.
        text_col: Name of the column holding the raw text.
        label_col: Name of the column holding the raw label.
        scheme: Label scheme passed to ``map_sentiment_value``.
        out_two_col_path: Path of the Excel file to write; its parent folder is
            created if needed.
        remove_stopwords: When true, a fixed Azerbaijani stop-word list is
            removed from the cleaned text (negation words are always kept).

    Raises:
        AssertionError: If ``text_col`` or ``label_col`` is not in the workbook.
    """
    df = pd.read_excel(in_path)
    # Drop index artefacts, but never a column the caller explicitly asked for.
    for c in ["Unnamed: 0","index"]:
        if c in df.columns and c not in (text_col, label_col):
            df = df.drop(columns=[c])
    assert text_col in df.columns and label_col in df.columns, f"Missing columns in {in_path}"

    df = df.dropna(subset=[text_col])
    df = df[df[text_col].astype(str).str.strip().str.len() > 0]
    # .copy() detaches the result from the filtered frames above so the column
    # assignments below act on an independent frame.
    df = df.drop_duplicates(subset=[text_col]).copy()

    raw_text = df[text_col].astype(str)
    # Digits are deliberately kept here (numbers_to_token=False): PRICE_RE and
    # STARS_RE in domain_specific_normalize need to see them. Whatever digits
    # remain afterwards are turned into <NUM> below.
    df["cleaned_text"] = raw_text.apply(lambda s: normalize_text_az(s, numbers_to_token=False))
    df["__domain__"] = raw_text.apply(detect_domain)
    # Plain zip instead of a row-wise DataFrame.apply: same result, faster, and
    # it also works when no rows are left.
    df["cleaned_text"] = [
        domain_specific_normalize(text, dom)
        for text, dom in zip(df["cleaned_text"], df["__domain__"])
    ]

    leftover_digits = re.compile(r"(?<!<STARS_)\b\d+\b")
    df["cleaned_text"] = df["cleaned_text"].apply(
        lambda s: MULTI_SPACE.sub(" ", leftover_digits.sub(" <NUM> ", s)).strip()
    )

    if remove_stopwords:
        sw = set(["və","ilə","amma","ancaq","lakin","ya","həm","ki","bu","bir","o","biz","siz","mən","sən",
                  "orada","burada","bütün","hər","artıq","çox","az","ən","də","da","üçün", "necə", "şey", "isə",
                  "hələ", "nə", "niyə", "kimi", "belə", "indi", "qədər"])
        # Negation words carry sentiment, so they are never treated as stop words.
        sw -= NEGATE_PREV | NEGATE_NEXT
        df["cleaned_text"] = df["cleaned_text"].apply(lambda s: " ".join([t for t in s.split() if t not in sw]))

    # Cleaning can leave nothing (e.g. text that was only symbols). Such rows
    # would be written as empty cells and read back from Excel as NaN.
    df = df[df["cleaned_text"] != ""].copy()

    df["sentiment_value"] = df[label_col].apply(lambda v: map_sentiment_value(v, scheme))
    df = df.dropna(subset=["sentiment_value"])

    out_df = (
        df[["cleaned_text","sentiment_value"]]
        .astype({"sentiment_value": float})
        .reset_index(drop=True)
    )
    Path(out_two_col_path).parent.mkdir(parents=True, exist_ok=True)
    out_df.to_excel(out_two_col_path, index=False)
    print(f"Saved: {out_two_col_path} (rows={len(out_df)})")


In [None]:
def build_corpus_txt(input_files, text_cols, out_txt="corpus_all.txt"):
    """Create a domain-tagged, lowercase, punctuation-free corpus file.

    Every non-null text is normalised with ``normalize_text_az`` (numbers become
    ``<NUM>``), split on runs of ``.``, ``!`` and ``?``, stripped of remaining
    symbols, lowercased and written as one line per piece, prefixed with the
    domain tag detected on the raw text.

    NOTE(review): the split only has an effect if
    ``normalize_text_az(..., keep_sentence_punct=True)`` actually leaves ``.!?``
    in its return value - verify, since its tokenizer decides what survives.

    Args:
        input_files: Paths of the Excel files to read.
        text_cols: Text column name for each file, in the same order.
        out_txt: Path of the UTF-8 text file to write; its parent folder is
            created if needed.

    Raises:
        ValueError: If ``input_files`` and ``text_cols`` differ in length.
    """
    input_files = list(input_files)
    text_cols = list(text_cols)
    if len(input_files) != len(text_cols):
        # zip() would silently ignore the surplus entries.
        raise ValueError(
            f"input_files ({len(input_files)}) and text_cols ({len(text_cols)}) must have the same length"
        )

    lines = []
    for (f, text_col) in zip(input_files, text_cols):
        df = pd.read_excel(f)
        for raw in df[text_col].dropna().astype(str):
            dom = detect_domain(raw)
            s = normalize_text_az(raw, keep_sentence_punct=True)
            for p in re.split(r"[.!?]+", s):
                p = p.strip()
                if not p: continue
                # Also strips "<", ">" and "'" from placeholders/words, so e.g.
                # "<NUM>" ends up as the plain word "num" after lowercasing.
                p = re.sub(r"[^\w\səğıöşüçƏĞIİÖŞÜÇxqXQ]", " ", p)
                p = " ".join(p.split()).lower()
                if p:
                    lines.append(add_domain_tag(p, dom))

    # Same courtesy as process_file: make sure the target folder exists.
    Path(out_txt).parent.mkdir(parents=True, exist_ok=True)
    with open(out_txt, "w", encoding="utf-8") as w:
        w.writelines(ln + "\n" for ln in lines)
    print(f"Wrote {out_txt} with {len(lines)} lines")


In [None]:
# Driver: clean every configured workbook, then build the combined corpus file.
if __name__ == "__main__":
    # (input workbook, text column, label column, label scheme)
    CFG = [
        (f"{drive_path_raw}labeled-sentiment.xlsx", "text", "sentiment", "tri"),
        (f"{drive_path_raw}test__1_.xlsx", "text", "label", "binary"),
        (f"{drive_path_raw}train__3_.xlsx", "text", "label", "binary"),
        (f"{drive_path_raw}train-00000-of-00001.xlsx", "text", "labels", "tri"),
        (f"{drive_path_raw}merged_dataset_CSV__1_.xlsx", "text", "labels", "binary"),
    ]

    # drive_path_cleaned already ends with "/"; joining through Path avoids the
    # doubled separator ("Cleaned//...") that plain string formatting produced.
    cleaned_dir = Path(drive_path_cleaned)

    for fname, tcol, lcol, scheme in CFG:
        out_filename = f"{Path(fname).stem}_2col.xlsx"

        full_out_path = str(cleaned_dir / out_filename)

        process_file(fname, tcol, lcol, scheme, full_out_path, remove_stopwords=False)

    corpus_output_path = str(cleaned_dir / "corpus_all.txt")
    build_corpus_txt([c[0] for c in CFG], [c[1] for c in CFG], out_txt=corpus_output_path)
    print(drive_path_cleaned)

Saved: /content/drive/MyDrive/Colab Notebooks/nlp/Cleaned//labeled-sentiment_2col.xlsx (rows=2955)
Saved: /content/drive/MyDrive/Colab Notebooks/nlp/Cleaned//test__1__2col.xlsx (rows=4198)
Saved: /content/drive/MyDrive/Colab Notebooks/nlp/Cleaned//train__3__2col.xlsx (rows=19557)
Saved: /content/drive/MyDrive/Colab Notebooks/nlp/Cleaned//train-00000-of-00001_2col.xlsx (rows=41756)
Saved: /content/drive/MyDrive/Colab Notebooks/nlp/Cleaned//merged_dataset_CSV__1__2col.xlsx (rows=55662)
Wrote /content/drive/MyDrive/Colab Notebooks/nlp/Cleaned//corpus_all.txt with 124353 lines
/content/drive/MyDrive/Colab Notebooks/nlp/Cleaned/
