# ETL Pipeline: Transform

## Imports

In [None]:
import sys

sys.path.insert(1, '../..')

import datetime
import logging

import matplotlib.pyplot as plt
import pandas as pd

from src.config.config import Config
from src.database.chroma_manager import ChromaManager
from src.database.database import Database
from src.load.data_loader import DataLoader
from src.transform.email_summary import summarize_messages
from src.transform.llm_invoker import LLMInvoker
from src.transform.message_classification import classify_categories
from src.transform.product_classification import classify_products
from src.transform.ner import extract_entities_from_messages
from src.transform.spam_classification import (
    classify_spam_messages_with_llm,
    zero_shot_classify_spam_messages,
)
from src.transform.topic_modelling import TopicModellor
from src.utils.checkpoint import DataFrameCheckpointer

# Emit INFO-level log records so progress from the pipeline modules is visible.
logging.basicConfig(level=logging.INFO)

# Project settings come from the JSON config two directories above this notebook.
config = Config.from_json("../../config.json")

# LLM client passed to the LLM-backed transform steps further down.
llm_invoker = LLMInvoker(
    model_name="phi3:3.8b-mini-4k-instruct-fp16",
    use_ollama=config.use_ollama,
)

# Database connection built from the configured credentials, and the loader
# that wraps it for the final load step.
database = Database.from_credentials(
    username=config.db_user,
    password=config.db_password,
    host=config.db_host,
    database=config.db_name,
)
loader = DataLoader(database)

DATA_DIR = "../../data"
PST_DIR = config.pst_directory
# Run date (local clock, YYYY-MM-DD); used to stamp the exported CSV names.
DATE = datetime.datetime.now().strftime("%Y-%m-%d")

# Intermediate dataframes are checkpointed under DATA_DIR/checkpoints.
checkpointer = DataFrameCheckpointer(f"{DATA_DIR}/checkpoints")

## Transformations

### Retrieve from Checkpoint

In [None]:
# Load the preprocessed messages from the interim data directory
# (presumably written by the preceding preprocessing stage -- confirm).
# NOTE(review): any list-valued column comes back from CSV as a plain string,
# while the explode() calls later in this notebook only expand real list-likes.
df = pd.read_csv(f"{DATA_DIR}/interim/preprocessed_messages.csv")

### Retrieve Quarterly, Monthly, and Weekly Sets of Messages from DB

### Filter Emails

Most feature engineering tasks don't need to be run on all emails. The tasks below are intended for customer-oriented emails, so we can safely disregard internal emails and outgoing emails (those sent from `info@qib.com.qa`).

In [None]:
# Keep only rows that are not flagged internal and were not sent from the
# info@qib.com.qa address (i.e. drop internal and outgoing mail).
message_df = df.loc[
    df["is_internal"].eq(False)
    & df["from_address"].ne("info@qib.com.qa")
]

#### Spam Classification

Further filter by removing spam emails.

In [None]:
# Alternative LLM-based spam classifier, currently disabled:
# spam_df = classify_spam_messages_with_llm(message_df, llm_invoker)
# Active path: zero-shot spam classification of the filtered messages.
# The result is later merged on message_id and filtered on its is_spam column.
spam_df = zero_shot_classify_spam_messages(message_df)

In [None]:
# Checkpoint the spam verdicts on their own, before they are merged into message_df.
checkpointer.save("spam_classification", spam_df)

In [None]:
# Attach the spam verdicts (merge defaults to an inner join on message_id, so
# messages without a verdict are dropped) and keep only the non-spam rows.
message_df = message_df.merge(spam_df, on="message_id").loc[
    lambda merged: merged["is_spam"].eq(False)
]

In [None]:
# Checkpoint the messages that survived the spam filter.
checkpointer.save("spam_classified_messages", message_df)

### Vectorization of Emails

Setup Sentence Transformer and ChromaDB

In [None]:
# Embedding store manager using the embedding model named in the config.
# "message_embeddings" is presumably the Chroma collection name -- confirm in ChromaManager.
chroma = ChromaManager("message_embeddings", model_name=config.embedding_model_name)

Get or Create Sentence Embeddings

In [None]:
# Replace message_df with the frame returned by populate_embeddings.
# Per the cell title this presumably reuses stored embeddings and computes the
# missing ones -- confirm in ChromaManager.
message_df = chroma.populate_embeddings(message_df)

In [None]:
# Checkpoint the messages as returned by the embedding step.
checkpointer.save("message_embeddings", message_df)

### Feature Engineering and Modelling

#### Intent Analysis 

In [None]:
# Build the topic modeller from the embedded messages and the LLM client.
topic_modellor = TopicModellor(message_df, llm_invoker)
# topic_df is read below for its message_id and topic_id columns, i.e. it is
# used as the per-message topic assignment.
topic_df = topic_modellor.topic_df

In [None]:
# Rows worth describing: skip topic_id -1 (presumably the outlier/noise
# cluster -- confirm against TopicModellor) and keep only topics that have at
# least 5 rows.
topics_to_describe = (
    topic_df.loc[topic_df["topic_id"].ne(-1)]
    .groupby("topic_id")
    .filter(lambda members: len(members) >= 5)
)

In [None]:
# Ask the topic modeller (passing the LLM client) to describe the selected
# topics, and keep only the topic_id / description pair from its result.
topic_descriptions = topic_modellor.get_topic_descriptions(topics_to_describe, llm_invoker)[["topic_id", "description"]]

In [None]:
# Checkpoint the topic descriptions before they are joined to the assignments.
checkpointer.save("topic_descriptions", topic_descriptions)

In [None]:
# Attach each message's topic assignment to its row. merge() defaults to an
# inner join, so only messages present in topic_df are kept.
message_df = topic_df[["message_id", "topic_id"]].merge(message_df, on="message_id")

# topic_df carries one row per message, so joining it to the descriptions
# repeats every described topic once per member message. Collapse the result to
# one row per topic so the exported/loaded topics table has unique topic_ids.
topics_df = (
    topic_df.merge(topic_descriptions, on="topic_id")[["topic_id", "description"]]
    .drop_duplicates(subset="topic_id")
    .reset_index(drop=True)
)

In [None]:
# Per-topic word frequencies from the topic modeller; only the topic_id, word
# and frequency columns of its result are kept.
word_frequencies = topic_modellor.get_topic_word_frequencies(topic_df)[["topic_id", "word", "frequency"]]

In [None]:
# Checkpoint the topic-modelling outputs.
# NOTE(review): the "topics" checkpoint stores topic_df (the topic assignments),
# whereas the CSV export and DB load named "topics" use topics_df (the
# descriptions) -- confirm the naming is intentional.
checkpointer.save("topics", topic_df)
checkpointer.save("word_frequencies", word_frequencies)
checkpointer.save("topic_messages", message_df)

Preview of the first 10 rows of `topics_df` (topic IDs and their descriptions)

In [None]:
# Display the first 10 rows of topics_df (columns: topic_id, description).
topics_df.head(10)

#### Message Classification

In [None]:
# Classify the filtered messages into categories, then checkpoint the result.
class_df = classify_categories(message_df)
checkpointer.save("classification", class_df)

#### Product Classification

In [None]:
# Classify which products the filtered messages relate to, then checkpoint the result.
product_df = classify_products(message_df)
checkpointer.save("products", product_df)

#### Named Entity Recognition

In [None]:
# Extract named entities from the messages using the LLM client, then
# checkpoint the result.
entities_df = extract_entities_from_messages(message_df, llm_invoker)
checkpointer.save("entities", entities_df)

#### Email Summarization

In [None]:
# Summarize the messages using the LLM client, then checkpoint the result.
summary_df = summarize_messages(message_df, llm_invoker)
checkpointer.save("summaries", summary_df)

### Final DataFrames

Separate list-like columns into new dataframes

In [None]:
def create_address_df(df: pd.DataFrame) -> pd.DataFrame:
    """Build a long-format table with one row per (message, address type, address).

    Args:
        df: Messages with ``message_id``, ``from_address``, ``to_address``,
            ``cc_address`` and ``bcc_address`` columns. Recipient cells may be
            comma-separated strings or already list-like values.

    Returns:
        A dataframe with columns ``message_id``, ``address_type`` (``"from"``,
        ``"to"``, ``"cc"`` or ``"bcc"``) and ``address``, with a fresh
        0..n-1 index. A message with no recipients of a given type keeps one
        row for that type whose ``address`` is missing.
    """
    def split_addresses(addresses):
        # Comma-separated string: split, trim whitespace, drop empty fragments.
        if isinstance(addresses, str):
            return [part.strip() for part in addresses.split(",") if part.strip()]
        # Already list-like (list, tuple, array, ...): use the items as they are.
        if pd.api.types.is_list_like(addresses):
            return list(addresses)
        # None / NaN / other scalars: no addresses. explode() turns the empty
        # list into a single missing value, so the message keeps its row.
        return []

    def explode_recipients(column: str, address_type: str) -> pd.DataFrame:
        # Normalise every cell to a list first; explode() leaves plain strings
        # untouched, which previously kept "a@x, b@y" as a single address.
        recipients = pd.DataFrame({
            "message_id": df["message_id"],
            "address_type": address_type,
            "address": df[column].map(split_addresses),
        })
        return recipients.explode("address")

    # The sender column is used as-is: one "from" row per message.
    from_df = pd.DataFrame({
        "message_id": df["message_id"],
        "address_type": "from",
        "address": df["from_address"]
    })

    to_df = explode_recipients("to_address", "to")
    cc_df = explode_recipients("cc_address", "cc")
    bcc_df = explode_recipients("bcc_address", "bcc")

    # Combine all address types into a single dataframe; ignore_index discards
    # the duplicated index labels that explode() leaves behind.
    address_df = pd.concat([from_df, to_df, cc_df, bcc_df], ignore_index=True)

    return address_df

In [None]:
def create_reference_df(df: pd.DataFrame) -> pd.DataFrame:
    """Return one row per (message_id, referenced message id) pair.

    Only the ``message_id`` and ``references`` columns of ``df`` are used.
    List-like ``references`` cells are expanded to one row per element (an
    empty list becomes a single missing value); the column is then renamed to
    ``reference_message_id``. The original index labels are kept, so they
    repeat for messages with several references.
    """
    id_and_refs = df.loc[:, ["message_id", "references"]]
    one_per_reference = id_and_refs.explode("references")
    return one_per_reference.rename(columns={"references": "reference_message_id"})

In [None]:
def create_domain_df(df: pd.DataFrame) -> pd.DataFrame:
    """Return one row per (message_id, domain) pair.

    Only the ``message_id`` and ``domain`` columns of ``df`` are used.
    List-like ``domain`` cells are expanded to one row per element (an empty
    list becomes a single missing value). The original index labels are kept,
    so they repeat for messages with several domains.
    """
    id_and_domains = df.loc[:, ["message_id", "domain"]]
    return id_and_domains.explode("domain")

In [None]:
# Derive the long-format address, reference and domain tables from message_df.
address_df = create_address_df(message_df)
reference_df = create_reference_df(message_df)
domain_df = create_domain_df(message_df)

# Checkpoint each derived table.
checkpointer.save("addresses", address_df)
checkpointer.save("references", reference_df)
checkpointer.save("domains", domain_df)

### Exporting Dataframes

In [None]:
# Write every final dataframe to the configured output directory as
# <name>_<DATE>.csv, without the index column, in the order listed.
for _export_name, _export_frame in (
    ("messages", message_df),
    ("addresses", address_df),
    ("references", reference_df),
    ("domains", domain_df),
    ("word_frequencies", word_frequencies),
    ("topics", topics_df),
    ("classification", class_df),
    ("products", product_df),
    ("entities", entities_df),
    ("summaries", summary_df),
):
    _export_frame.to_csv(config.output_directory + f"/{_export_name}_{DATE}.csv", index=False)

### Load

In [None]:
# Hand every final dataframe to the loader under its target table name, in the
# order listed.
for _table_name, _table_frame in (
    ("messages", message_df),
    ("addresses", address_df),
    ("references", reference_df),
    ("domains", domain_df),
    ("word_frequencies", word_frequencies),
    ("topics", topics_df),
    ("classifications", class_df),
    ("products", product_df),
    ("entities", entities_df),
    ("summaries", summary_df),
):
    loader.load_dataframe(_table_frame, _table_name)

#### message_df:
    - message_id
    - topic_id
    - is_spam
    - subject
    - subject_prefix
    - submit_time
    - delivery_time
    - html_body
    - plain_text_body
    - from_name
    - previous_message_id
    - first_in_thread
    - num_previous_messages
    - thread_id
    - sender_domain
    - is_internal
    - clean_text
    - response_time
    - language

#### address_df:
    - message_id
    - address_type
    - address

#### reference_df:
    - message_id
    - reference_message_id

#### domain_df:
    - message_id
    - domain
    
#### word_frequencies
    - topic_id
    - word
    - frequency

#### topics_df
    - topic_id
    - description

#### class_df
    - message_id
    - category

#### product_df
    - message_id
    - product

#### entities_df
    - message_id
    - entity_type
    - entity_value

#### summary_df
    - message_id
    - summary