# Ubuntu Dialogue Corpus
Source: [Kaggle](https://www.kaggle.com/datasets/rtatman/ubuntu-dialogue-corpus)

In [None]:
!wget  https://www.kaggle.com/api/v1/datasets/download/rtatman/ubuntu-dialogue-corpus -O ../data/raw/convo/ubuntu-dialogue-corpus.zip

In [None]:
import zipfile

# Unpack the downloaded archive into the raw conversation data directory.
with zipfile.ZipFile(file="../data/raw/convo/ubuntu-dialogue-corpus.zip", mode="r") as archive:
    archive.extractall(path="../data/raw/convo/")

## Load the dataset
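The dataset consists of three CSV files, so we need to load and combine them for easy analysis. Because the files are large, they are streamed in chunks and written out as Parquet parts instead of being loaded into a single DataFrame all at once.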

In [1]:
import pandas as pd
import os
from tqdm import tqdm

# Output root: stream_csv_to_parquet writes one sub-directory of Parquet parts per source CSV here.
out_path = "../data/processed/convo/dialogues_parquet/"
# Input: directory holding the extracted Ubuntu dialogue CSV files.
folder_path = "../data/raw/convo/Ubuntu-dialogue-corpus"

def process_chunk(df: pd.DataFrame) -> pd.DataFrame:
    """Normalize one CSV chunk in place and return the same DataFrame.

    Steps, each applied only when the relevant columns exist:

    - ``date`` is parsed with ``pd.to_datetime``; unparseable values become NaT.
    - ``folder`` and ``sender`` are cast to the ``category`` dtype.
    - If both ``folder`` and ``dialogueID`` are present, ``dialogueID`` is cast
      with ``astype(str)`` and a trailing ``.tsv`` is stripped, then an ``id``
      column of the form ``"<folder>_<dialogueID>"`` is added.

    The input object is mutated; the return value is that same object.
    """
    columns = set(df.columns)

    if "date" in columns:
        df["date"] = pd.to_datetime(df["date"], errors="coerce")

    for name in (col for col in ("folder", "sender") if col in columns):
        df[name] = df[name].astype("category")

    # Without both key columns there is nothing to build an id from.
    if not {"folder", "dialogueID"} <= columns:
        return df

    # Vectorized string operations rather than a row-wise apply.
    df["dialogueID"] = (
        df["dialogueID"]
        .astype(str)
        .str.replace(r"\.tsv$", "", regex=True)
    )
    df["id"] = df["folder"].astype(str) + "_" + df["dialogueID"]
    return df


In [2]:
def merge_consecutive_messages(df: pd.DataFrame, carry: dict | None, text_col: str = "text"):
    """
    Merge consecutive messages from the same speaker, given a carry-over dictionary.

    Returns:
        merged_rows: List of dicts (safe to convert to DataFrame)
        new_carry: Updated carry-over dictionary or None
    """

    merged = []
    for row in df.itertuples(index=False):
        row = row._asdict()
        
        if carry is None:
            carry = row
            continue

        # print(carry.keys())   # debug
        same_dialogue = row["dialogueID"] == carry["dialogueID"]
        same_speaker = row["sender"] == carry["sender"]


        if same_dialogue and same_speaker:
            carry[text_col] += " " + row[text_col]
        else:
            merged.append(carry)
            carry = row


    return merged, carry

In [3]:
# Columns holding text in this corpus. Casting them explicitly when a part is
# written keeps every part on one Parquet schema: otherwise a small part (notably
# the one-row part_final) whose column is entirely missing is inferred as float64
# while other parts store strings. "to" is included on the assumption that the
# CSVs carry a recipient column (TODO confirm); absent columns are simply skipped.
_TEXT_COLUMNS = ("folder", "dialogueID", "sender", "to", "text", "id")


def _rows_to_frame(rows: list[dict]) -> pd.DataFrame:
    """Build a DataFrame from merged rows with the known text columns as pandas ``string``."""
    frame = pd.DataFrame(rows)
    for col in _TEXT_COLUMNS:
        if col in frame.columns:
            frame[col] = frame[col].astype("string")
    return frame


def _remove_stale_parts(directory: str) -> None:
    """Delete ``part_*.parquet`` files left in ``directory`` by a previous run."""
    for name in os.listdir(directory):
        if name.startswith("part_") and name.endswith(".parquet"):
            os.remove(os.path.join(directory, name))


def stream_csv_to_parquet(
        folder_path: str,
        out_path: str,
        chunksize: int = 200_000):
    """Stream CSVs from ``folder_path`` in chunks and write merged messages as Parquet parts.

    Every ``<source_name>.csv`` in ``folder_path`` (in sorted order) is read
    ``chunksize`` rows at a time. Each chunk has ``from`` renamed to ``sender``,
    is normalized with ``process_chunk`` and is passed through
    ``merge_consecutive_messages``. Completed messages are written to
    ``out_path/<source_name>/part_XXXXX.parquet``, where XXXXX is the zero-padded
    chunk index; a chunk that completed no message produces no file. The message
    still pending after the last chunk goes to ``part_final.parquet``. Parts are
    NOT combined into a single file.

    Existing ``part_*.parquet`` files in a source's output directory are removed
    before it is rewritten, so a re-run (e.g. with a different ``chunksize``)
    cannot leave stale parts that duplicate data.

    Files that pandas fails to open are reported with ``print`` and skipped.
    Errors raised while iterating over a file's chunks propagate.

    Args:
        folder_path: Directory containing the input CSV files.
        out_path: Root output directory; created if missing.
        chunksize: Number of CSV rows per chunk.
    """
    os.makedirs(out_path, exist_ok=True)

    for file_name in sorted(os.listdir(folder_path)):
        if not file_name.endswith(".csv"):
            continue

        file_path = os.path.join(folder_path, file_name)
        source_name = os.path.splitext(file_name)[0]
        source_out_path = os.path.join(out_path, source_name)
        os.makedirs(source_out_path, exist_ok=True)

        try:
            reader = pd.read_csv(
                file_path,
                chunksize=chunksize,
                dtype={
                    "folder": "string",
                    "dialogueID": "string",
                    "text": "string",
                },
            )
        except Exception as e:  # best-effort per file: report and move on
            print(f"Error reading {file_path}: {e}")
            continue

        _remove_stale_parts(source_out_path)
        carry = None

        # The context manager closes the underlying file handle even if a later
        # chunk fails to parse.
        with reader:
            for idx, chunk in enumerate(tqdm(reader, desc=f"Processing {file_name}")):
                chunk = chunk.rename(columns={"from": "sender"})
                processed_chunk = process_chunk(chunk)
                merged_rows, carry = merge_consecutive_messages(processed_chunk, carry, text_col="text")
                if merged_rows:
                    out_file = os.path.join(source_out_path, f"part_{idx:05d}.parquet")
                    _rows_to_frame(merged_rows).to_parquet(out_file, index=False)

        # The last message of the file is still held in `carry`; write it out.
        if carry is not None:
            out_file = os.path.join(source_out_path, "part_final.parquet")
            _rows_to_frame([carry]).to_parquet(out_file, index=False)

In [4]:
# Convert every source CSV into per-file Parquet parts, 200,000 rows per chunk.
stream_csv_to_parquet(folder_path, out_path, chunksize=200_000)

Processing dialogueText.csv: 6it [00:11,  1.98s/it]
Processing dialogueText_196.csv: 47it [02:24,  3.07s/it]
Processing dialogueText_301.csv: 83it [04:19,  3.12s/it]
