In [28]:
import os
import re
import pandas as pd
import numpy as np
import tqdm
from bs4 import BeautifulSoup
from collections import defaultdict
from glob import glob
from nltk.corpus import stopwords
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.model_selection import train_test_split
from sklearn.naive_bayes import MultinomialNB
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.pipeline import make_pipeline


class TextAnalysis:
    """Predict a grade for a prompt from exported conversation HTML files.

    Construction runs the whole pipeline:

    1. Parse every HTML file matched by ``data_path`` into (prompt, answer)
       token pairs and join them with the grades in ``scores_path``.
    2. Train a TF-IDF + multinomial naive Bayes classifier on the labeled
       prompts in ``train_dataset_path`` to predict which question a prompt
       relates to.
    3. Fit one TF-IDF + linear regression pipeline per predicted question that
       maps prompt text to grade.

    Attributes:
        stop_words: Set of NLTK English stop words.
        data_path: Glob pattern for the HTML files.
        scores_path: Path of the CSV with ``code`` and ``grade`` columns.
        train_dataset_path: Path of the tab-separated labeled prompt file.
        code2convos: Maps a file code to its list of ``{"role", "text"}`` turns.
        df: After construction, one row per prompt with the columns
            ``prompt``, ``which_question`` and ``grade`` (the per-file frame
            produced by loading is replaced during model training).
        labeled_data_df: The labeled training frame with lower-cased prompts.
        clf, vectorizer: The fitted question classifier and its TF-IDF vectorizer.
        classifier_accuracy: Accuracy of ``clf`` on its held-out 20% split.
        models: Maps a question number to its fitted regression pipeline.
        evaluation: Maps a question number to ``{'MSE': ..., 'R2': ...}``.
    """

    # ``data-testid`` values that mark a single conversation turn.
    _TURN_PATTERN = re.compile(r"conversation-turn-[0-9]+")
    # Anchored alternation: BeautifulSoup applies regex filters with ``search``,
    # so the former character class "[user|assistant]" accepted any role that
    # merely contained one of those letters (e.g. "system", "tool").
    _ROLE_PATTERN = re.compile(r"^(?:user|assistant)$")

    def __init__(self, data_path, scores_path, train_dataset_path):
        self.stop_words = set(stopwords.words('english'))
        self.data_path = data_path
        self.scores_path = scores_path
        self.train_dataset_path = train_dataset_path
        self.df, self.code2convos = self._load_and_process_data()
        self.labeled_data_df, self.clf, self.vectorizer = self._train_naive_bayes_classifier()
        self.models, self.evaluation = self._train_models_per_question()

    def _remove_stopwords(self, tokens):
        """Return ``tokens`` without stop words (compared case-insensitively)."""
        return [token for token in tokens if token.lower() not in self.stop_words]

    def _extract_turns(self, html_page):
        """Parse one HTML page into a list of ``{"role", "text"}`` dicts.

        Only the first user/assistant message found inside each conversation
        turn is kept; turns without such a message are skipped.
        """
        soup = BeautifulSoup(html_page, "html.parser")
        turns = []
        for turn in soup.find_all("div", attrs={"data-testid": self._TURN_PATTERN}):
            messages = turn.find_all(
                "div", attrs={"data-message-author-role": self._ROLE_PATTERN}
            )
            if messages:
                first = messages[0]
                turns.append({
                    "role": first.get("data-message-author-role"),
                    "text": first.text,
                })
        return turns

    @staticmethod
    def _pair_prompts_with_answers(convos):
        """Pair lower-cased, whitespace-tokenized prompts with answers.

        Returns a list of ``(prompt_tokens, answer_tokens)`` tuples.
        """
        prompts = [turn["text"].lower() for turn in convos if turn["role"] == "user"]
        answers = [turn["text"].lower() for turn in convos if turn["role"] != "user"]
        # NOTE(review): pairing is positional (i-th prompt with i-th answer) and
        # truncates to the shorter list, so a conversation with consecutive turns
        # of the same role can be mis-paired. Kept as-is to preserve results.
        return [(prompt.split(), answer.split()) for prompt, answer in zip(prompts, answers)]

    def _load_and_process_data(self):
        """Load conversations and grades.

        Returns:
            tuple: ``(df, code2convos)`` where ``df`` has the columns ``code``,
            ``prompt_answer_pairs`` and ``grade`` sorted by grade descending,
            and ``code2convos`` maps each file code to its parsed turns.
        """
        code2convos = {}
        for path in tqdm.tqdm(sorted(glob(self.data_path))):
            # The file name up to the first dot is the key joined against scores.
            file_code = os.path.basename(path).split(".")[0]
            # NOTE(review): latin1 never fails to decode but will garble
            # non-ASCII text if the exports are really UTF-8 -- confirm.
            with open(path, "r", encoding="latin1") as fh:
                html_page = fh.read()
            code2convos[file_code] = self._extract_turns(html_page)

        records = []
        for code, convos in code2convos.items():
            pairs = self._pair_prompts_with_answers(convos)
            if pairs:  # files without a complete prompt/answer pair are dropped
                records.append({'code': code, 'prompt_answer_pairs': pairs})

        # Explicit columns keep the merge below valid even when nothing was parsed.
        df = pd.DataFrame(records, columns=['code', 'prompt_answer_pairs'])

        # Honor the configured path instead of a hard-coded "data/scores.csv".
        scores = pd.read_csv(self.scores_path, sep=",")
        scores["code"] = scores["code"].str.strip()
        scores = scores[["code", "grade"]]

        df = df.merge(scores, on="code")
        df = df.sort_values(by=["grade"], ascending=False)

        return df, code2convos

    def _train_naive_bayes_classifier(self):
        """Train the prompt -> question-number classifier.

        Reads the tab-separated file at ``train_dataset_path`` (columns
        ``prompt`` and ``related_question``), fits TF-IDF + MultinomialNB on
        an 80% split and stores the held-out accuracy in
        ``self.classifier_accuracy``.

        Returns:
            tuple: ``(labeled_data_df, clf, vectorizer)``.
        """
        labeled_data_df = pd.read_csv(self.train_dataset_path, sep="\t")
        labeled_data_df['prompt'] = labeled_data_df['prompt'].str.lower()

        X_train, X_test, y_train, y_test = train_test_split(
            labeled_data_df['prompt'],
            labeled_data_df['related_question'],
            test_size=0.2,
            random_state=42
        )

        vectorizer = TfidfVectorizer()
        X_train = vectorizer.fit_transform(X_train)
        X_test = vectorizer.transform(X_test)

        clf = MultinomialNB()
        clf.fit(X_train, y_train)

        # The held-out split was previously computed but never used.
        self.classifier_accuracy = clf.score(X_test, y_test)

        return labeled_data_df, clf, vectorizer

    def _train_models_per_question(self):
        """Fit one grade regressor per predicted question number.

        Replaces ``self.df`` with a prompt-level frame (``prompt``,
        ``which_question``, ``grade``); missing grades are filled with the
        mean grade of that frame.

        Returns:
            tuple: ``(models, evaluation)`` keyed by question number.
        """
        # Collect rows first; growing a DataFrame with .loc per row is quadratic.
        records = []
        for row in self.df.itertuples():
            for prompt_tokens, _answer_tokens in row.prompt_answer_pairs:
                prompt_text = " ".join(self._remove_stopwords(prompt_tokens))
                records.append({
                    "prompt": prompt_text,
                    "which_question": self._predict_question_number(prompt_text),
                    "grade": row.grade,
                })
        new_df = pd.DataFrame(records, columns=["prompt", "which_question", "grade"])

        # Assign the result back: fillna(inplace=True) on a column selection is
        # chained assignment and does not update the frame under Copy-on-Write.
        new_df["grade"] = new_df["grade"].fillna(new_df["grade"].mean())
        self.df = new_df.copy()

        models = {}
        evaluation = {}
        for question_number in new_df['which_question'].unique():
            question_data = new_df[new_df['which_question'] == question_number]
            X = question_data['prompt']
            y = question_data['grade']
            pipeline = make_pipeline(TfidfVectorizer(), LinearRegression())

            if len(question_data) < 2:
                # A single sample cannot be split; fit on it and report no metrics
                # rather than aborting the whole constructor.
                pipeline.fit(X, y)
                models[question_number] = pipeline
                evaluation[question_number] = {'MSE': np.nan, 'R2': np.nan}
                continue

            X_train, X_test, y_train, y_test = train_test_split(
                X, y, test_size=0.2, random_state=42
            )
            pipeline.fit(X_train, y_train)
            y_pred = pipeline.predict(X_test)
            models[question_number] = pipeline
            evaluation[question_number] = {
                'MSE': mean_squared_error(y_test, y_pred),
                'R2': r2_score(y_test, y_pred),
            }

        return models, evaluation

    def _predict_question_number(self, prompt):
        """Return the question number the classifier predicts for ``prompt``."""
        prompt_vect = self.vectorizer.transform([prompt.lower()])
        return self.clf.predict(prompt_vect)[0]

    def predict_with_similarity_adjustment(self, prompt):
        """Predict a grade for ``prompt`` scaled by similarity to known prompts.

        The regressor for the predicted question produces a raw score, which is
        multiplied by the highest cosine similarity between the prompt and the
        stored prompts of that question.

        Args:
            prompt: Raw prompt text.

        Returns:
            tuple: ``(adjusted_score, max_similarity)``.

        Raises:
            ValueError: If no model exists for the predicted question number.
        """
        prompt = " ".join(self._remove_stopwords(prompt.split()))
        question_number = self._predict_question_number(prompt)
        print(f"Predicted question number: {question_number}")

        pipeline = self.models.get(question_number)
        # Explicit None check; truthiness of a Pipeline depends on its length.
        if pipeline is None:
            raise ValueError(f"No model found for question number {question_number}.")

        vectorizer = pipeline.named_steps['tfidfvectorizer']
        model = pipeline.named_steps['linearregression']

        prompt_vector = vectorizer.transform([prompt])
        known_prompts = self.df[self.df['which_question'] == question_number]['prompt']
        train_vectors = vectorizer.transform(known_prompts)

        max_similarity = np.max(cosine_similarity(prompt_vector, train_vectors))
        predicted_score = model.predict(prompt_vector)[0]
        adjusted_score = predicted_score * max_similarity
        return adjusted_score, max_similarity


In [32]:

# Run the full pipeline: the constructor parses the HTML files, trains the
# question classifier and fits one grade regressor per predicted question.
ta = TextAnalysis("data/html/*.html", "data/scores.csv", "data/labeled_data/train_dataset.csv")

100%|██████████| 127/127 [00:05<00:00, 21.63it/s]


In [34]:
# Score an ad-hoc prompt. The returned score is the per-question regressor's
# prediction multiplied by the prompt's highest cosine similarity to the
# stored prompts of the predicted question.
new_prompt = 'Tune Hyperparameters'
predicted_score, max_similarity = ta.predict_with_similarity_adjustment(new_prompt)
print("Predicted Score:", predicted_score, "Max Similarity:", max_similarity)

Predicted question number: 5
Predicted Score: 90.92055817080906 Max Similarity: 1.0000000000000002
