In [31]:
import re
import pandas as pd
import random
import pickle
from pathlib import Path
from team_comm_tools.utils.check_embeddings import load_liwc_dict, sort_words
from team_comm_tools.features.lexical_features_v2 import get_liwc_count
from team_comm_tools import FeatureBuilder

# Functions

In [32]:
# Text emoticons in both orientations.
# NOTE(review): not referenced by any other cell visible in this notebook.
emojis_to_preserve = {
    ":)", ":(", ":/", ";)",
    "(:", "):", "/:", "(;",
}

# Lowercase ASCII letters (only referenced by a disabled code path in fill_wordlist).
alphabet = list("abcdefghijklmnopqrstuvwxyz")

# Neutral padding words inserted between lexicon terms in generated messages.
filler = [
    "Lorem", "ipsum", "dolor",
    "amet", "consectetur", "adipiscing",
    "sed", "euismod", "tempor",
]

# Punctuation glued directly after a token when building a test message.
# The order is kept stable so seeded runs pick the same connectors.
connectors = [
    # sentence punctuation: comma, period, semicolon, colon
    ",", ".", ";", ":",
    # dashes: hyphen, en dash, em dash
    "-", "–", "—",
    # exclamation mark, question mark
    "!", "?",
    # parentheses, brackets, braces
    "(", ")", "[", "]", "{", "}",
    # double quote, single quote (apostrophe), ellipsis
    "\"", "'", "...",
    # slash, backslash, vertical bar (pipe)
    "/", "\\", "|",
    # plus, equals, less than, greater than
    "+", "=", "<", ">",
    # "*" (asterisk) is intentionally left out
    # caret, tilde
    "^", "~",
    # currency: dollar, euro, yen, pound
    "$", "€", "¥", "£",
    # hash, ampersand, percent
    "#", "&", "%",
    # "_" (underscore) is intentionally left out: doesn't work with word boundaries \b
]

def fill_wordlist(selected_words: list, filler_words=None, connector_choices=None, rng=None) -> str:
    """Build a noisy test message around the given lexicon terms.

    Each term is stripped of surrounding whitespace and of a trailing ``*``
    wildcard marker (the bare stem is used; no random suffix is appended).
    One or two filler words are inserted after every term, and every token
    except the last is immediately followed by a random connector and a space.

    Args:
        selected_words: Lexicon terms to embed, optionally ending in ``*``.
        filler_words: Pool of padding words; defaults to the module-level ``filler``.
        connector_choices: Pool of punctuation strings; defaults to the
            module-level ``connectors``.
        rng: Object exposing ``randint``, ``choices`` and ``choice`` (for
            example ``random.Random(seed)``); defaults to the global ``random``
            module. Pass a seeded instance for reproducible output.

    Returns:
        The assembled message, or an empty string when ``selected_words`` is empty.
    """
    if filler_words is None:
        filler_words = filler
    if connector_choices is None:
        connector_choices = connectors
    if rng is None:
        rng = random

    tokens = []
    for word in selected_words:
        word = word.strip()
        # A trailing "*" marks a wildcard stem; keep only the stem itself.
        if word.endswith("*"):
            word = word[:-1]
        tokens.append(word)
        # Pad each term with one or two filler words (randint is drawn before choices).
        tokens.extend(rng.choices(filler_words, k=rng.randint(1, 2)))

    if not tokens:
        return ""

    # Every token but the last gets a connector glued on; tokens are then space-joined.
    pieces = [token + rng.choice(connector_choices) for token in tokens[:-1]]
    pieces.append(tokens[-1])
    return " ".join(pieces)

def extract_words_from_regex(pattern):
    """Recover the list of lexicon terms encoded in an alternation regex.

    Word-boundary markers (``\\b``), the negative lookbehind ``(?<!\\w)`` and
    the negative lookahead ``(?!\\w)`` are removed, the remainder is split on
    ``|``, and each alternative is trimmed, has escaped parentheses unescaped,
    and has ``\\S*`` / ``.*`` wildcards rewritten as a plain ``*``.

    Args:
        pattern: Regex source string made of ``|``-separated alternatives.

    Returns:
        One term per alternative, in the order they appear in ``pattern``.
    """
    # Strip the zero-width anchors so only the literal alternatives remain.
    without_anchors = re.sub(r"\\b|\(\?<!\\w\)|\(\?!\\w\)", "", pattern)

    def _as_term(alternative):
        # Trim, unescape parentheses, then collapse wildcard suffixes to "*".
        term = alternative.strip()
        term = term.replace(r"\(", "(").replace(r"\)", ")")
        return re.sub(r"\\S\*|\.\*", "*", term)

    return [_as_term(alternative) for alternative in without_anchors.split("|")]

# Load LIWC dictionary

In [33]:
# Select which LIWC lexicon source to load: '2007' (pickled dict) or '2015' (.dic file).
version = '2007'
# version = '2015'

# Two levels above the current working directory.
# NOTE(review): assumes the notebook is launched from a folder two levels below the repo root — confirm.
root_dir = Path().resolve().parent.parent
if version == '2015':
    custom_liwc_dictionary_path = root_dir / 'src/team_comm_tools/features/lexicons/liwc_2015.dic'
    # 'utf-8-sig' drops a leading byte-order mark if the file has one.
    with open(custom_liwc_dictionary_path, 'r', encoding='utf-8-sig') as file:
        dicText = file.read()
    # Parse after the file handle is closed; only the text is needed.
    lexicons_dict = load_liwc_dict(dicText)
elif version == '2007':
    lexicon_pkl_file_path = root_dir / "src/team_comm_tools/features/assets/lexicons_dict.pkl"
    # SECURITY: pickle.load can execute arbitrary code; only load this file from a trusted checkout.
    with open(lexicon_pkl_file_path, "rb") as lexicons_pickle_file:
        lexicons_dict = pickle.load(lexicons_pickle_file)
else:
    raise ValueError("Invalid version. Please choose either 2007 or 2015.")

In [None]:
### Regenerate the LIWC lexicons pickle file (intentionally commented out; uncomment to rebuild lexicons_dict.pkl)

# def read_in_lexicons(directory, lexicons_dict):
#     for file in directory.iterdir():
#         if file.is_file() and not file.name.startswith("."):
#             with open(file, encoding = "mac_roman") as lexicons:
#                 clean_name = re.sub('.txt', '', file.name)
#                 lexicons_dict[clean_name] = sort_words(lexicons)

# lexicons_dict = {}
# read_in_lexicons(root_dir / "src/team_comm_tools/features/lexicons/liwc_lexicons/", lexicons_dict) # Reads in LIWC Lexicons
# read_in_lexicons(root_dir / "src/team_comm_tools/features/lexicons/other_lexicons/", lexicons_dict) # Reads in Other Lexicons

# with open("lexicons_dict.pkl", "wb") as lexicons_pickle_file:
#           pickle.dump(lexicons_dict, lexicons_pickle_file)

# Test

## 1. One category at a time

In [53]:
# Single-category check: for every lexicon category, build a message from a random
# sample of that category's terms and compare get_liwc_count against the sample size.
min_words, max_words = 3, 30
num_tests = 10
one_cat_test_lst = []
for i in range(num_tests):
    error = False
    for cat, regex in lexicons_dict.items():
        wordList = extract_words_from_regex(regex)
        # Clamp both bounds to the category size so categories with fewer than
        # min_words terms do not make randint raise ValueError.
        upper_bound = min(max_words, len(wordList))
        lower_bound = min(min_words, upper_bound)
        expected_value = random.randint(lower_bound, upper_bound)
        selected_words = random.sample(wordList, expected_value)
        test_string = fill_wordlist(selected_words)
        count = get_liwc_count(regex, test_string)
        # Blank the progress line before printing the next status.
        print(' ' * 100, end="\r")
        if count != expected_value:
            print(f"{cat} ERROR")
            print(f"{test_string}")
            print(f"expected_value: {expected_value}, count: {count}")
            error = True
            break
        print(f"{cat} SUCCESS", end="\r", flush=True)
        one_cat_test_lst.append({
            "conversation_num": f"{i+1}_{cat}",
            "speaker_nickname": cat,
            "message": test_string,
            "expected_column": f"{cat}_lexical_wordcount",
            "expected_value": expected_value
        })
    if error:
        break
else:
    # Reached only when no round was aborted by a mismatch.
    print(' ' * 100, end="\r")
    print("All tests passed!")

one_cat_test_df = pd.DataFrame(one_cat_test_lst)

All tests passed!                                                                                   


## 2. Mixed category

In [54]:
# Mixed-category check: sample terms across all categories, then verify that every
# category regex finds the same number of matches in the noisy message (fillers and
# connectors added) as in the clean space-joined message.
min_words, max_words = 3, 30
num_tests = 10
mix_cat_test_lst = []

# The pooled term list does not change between rounds, so build it once.
wordList = [
    term
    for lexicon_regex in lexicons_dict.values()
    for term in extract_words_from_regex(lexicon_regex)
]

for i in range(num_tests):
    # Clamp both bounds to the pool size so a tiny lexicon cannot make randint raise.
    upper_bound = min(max_words, len(wordList))
    lower_bound = min(min_words, upper_bound)
    expected_value = random.randint(lower_bound, upper_bound)
    selected_words = random.sample(wordList, expected_value)
    # ground truth: the bare terms (trailing "*" removed) joined by single spaces
    selected_words_truth = []
    for word in selected_words:
        word = word.strip()
        if word.endswith("*"):
            word = word[:-1]
        selected_words_truth.append(word)
    test_string_truth = " ".join(selected_words_truth)
    results = {category: re.findall(pattern, test_string_truth) for category, pattern in lexicons_dict.items()}

    # test string: add filler and connectors
    test_string = fill_wordlist(selected_words)
    test_results = {category: re.findall(pattern, test_string) for category, pattern in lexicons_dict.items()}
    # Compare results
    error = False
    for cat, matches in results.items():
        expected, found = len(matches), len(test_results[cat])
        # Blank the progress line before printing the next status.
        print(' ' * 100, end="\r")
        if expected != found:
            print(f"{cat} ERROR")
            print(f"{test_string}")
            print(f"expected_value: {expected}, count: {found}")
            error = True
            break
        print(f"{cat} SUCCESS", end="\r", flush=True)
        mix_cat_test_lst.append({
            "conversation_num": f"{i+1}_mix",
            "speaker_nickname": cat,
            "message": test_string,
            "expected_column": f"{cat}_lexical_wordcount",
            "expected_value": expected
        })
    if error:
        break
else:
    # Reached only when no round was aborted by a mismatch.
    print(' ' * 100, end="\r")
    print("All tests passed!")

mix_cat_test_df = pd.DataFrame(mix_cat_test_lst)

All tests passed!                                                                                   


## Append to current test

In [60]:
# Stack the single-category and mixed-category cases into one frame with a fresh 0..n-1 index.
liwc_test_df = pd.concat([one_cat_test_df, mix_cat_test_df], ignore_index=True)

In [61]:
# Load the existing chat-level test cases that the generated LIWC cases will be merged into.
test_chat_path = root_dir / 'tests/data/cleaned_data/test_chat_level.csv'
test_chat_df = pd.read_csv(test_chat_path)

In [None]:
# Drop any previously generated lexical word-count cases, then append the fresh ones.
# na=False treats rows with a missing expected_column as "not lexical" so the boolean
# mask never contains NaN (which would make `~` fail); regex=False matches the suffix literally.
is_lexical_case = test_chat_df['expected_column'].str.contains('_lexical_wordcount', regex=False, na=False)
test_chat_df = test_chat_df[~is_lexical_case]
test_chat_df = pd.concat([test_chat_df, liwc_test_df], ignore_index=True)
# Uncomment to overwrite the test CSV on disk with the merged cases.
# test_chat_df.to_csv(test_chat_path, index=False)