In [1]:
import re

import json
from typing import List, Dict, Tuple, Any

import numpy as np
import numpy.typing as npt


# Define Constants


In [2]:
######################################################
### Constants                                      ###
######################################################
# Base Paths
# Relative directories used to build the data and model file paths below.
INPUT_PATH = "./data"
MODEL_PATH = "./model"

# Model File names
VANILLA_MODEL_FILENAME = "vanillamodel.txt"
AVERAGED_MODEL_FILENAME = "averagedmodel.txt"

# Class Identifiers
# Label strings; the sample record in this notebook carries "Fake" and "Pos" in its 2nd and 3rd fields.
TRUTHFUL = "True"
DECEPTIVE = "Fake"
POSITIVE = "Pos"
NEGATIVE = "Neg"

# File paths
# Data files, all located under INPUT_PATH.
TRAIN_FILE_PATH = f"{INPUT_PATH}/train-labeled.txt"
CLEANED_DATA_FILE_PATH = f"{INPUT_PATH}/cleaned-data.txt"
PREPROCESSED_DATA_FILE_PATH = f"{INPUT_PATH}/preprocessed-data.txt"

# Model files, located under MODEL_PATH.
VANILLA_MODEL_FILE_PATH = f"{MODEL_PATH}/{VANILLA_MODEL_FILENAME}"
AVERAGED_MODEL_FILE_PATH = f"{MODEL_PATH}/{AVERAGED_MODEL_FILENAME}"

DEV_FILE_PATH = f"{INPUT_PATH}/dev-text.txt"
DEV_TAGGED_FILE_PATH = f"{INPUT_PATH}/dev-key.txt"

# Seed passed to both NumPy random-number initialisations in the next cell.
RANDOM_SEED = 42

# Column indices into the parsed data array (column 0 is the first captured field of a line).
DATA_COL = 3
SENTIMENT_TARGET_COL = 2
TRUTHFULNESS_TARGET_COL = 1


In [3]:
# Seeded Generator instance for code that draws random numbers through `rng`.
rng = np.random.default_rng(seed=RANDOM_SEED)
# Also seed NumPy's legacy global random state (used by the `np.random.*` functions).
np.random.seed(RANDOM_SEED)


# Helper Functions


In [4]:
# Sample record (id, two labels, review text, trailing newline) used below to try out the parsing regex.
line = "07Zfn0z Fake Pos If you're looking for an elegant hotel in downtown Chicago, you have to stay here. The Ambassador East Hotel has very comfortable and beautiful large rooms and is like a home away from home. The perfect place for a business person, and if you have a small pet you can bring them too! I would give this place four stars and would definitely stay here again.\n"


In [5]:
# Four capture groups: three space-separated word fields followed by the rest of the line.
# Raw string: `\w` is not a valid Python string escape, so the non-raw form triggers an
# invalid-escape warning; the regex engine interprets `\n` as a newline either way.
input_regex = re.compile(r"(\w*) (\w*) (\w*) (.*)\n?")


In [6]:
# Apply the compiled pattern to the sample line and display the captured fields.
input_regex.match(line).groups()


('07Zfn0z',
 'Fake',
 'Pos',
 "If you're looking for an elegant hotel in downtown Chicago, you have to stay here. The Ambassador East Hotel has very comfortable and beautiful large rooms and is like a home away from home. The perfect place for a business person, and if you have a small pet you can bring them too! I would give this place four stars and would definitely stay here again.")

In [7]:
# Wrap the parsed sample in a one-element list of records and display it.
data = [input_regex.match(line).groups()]
data


[('07Zfn0z',
  'Fake',
  'Pos',
  "If you're looking for an elegant hotel in downtown Chicago, you have to stay here. The Ambassador East Hotel has very comfortable and beautiful large rooms and is like a home away from home. The perfect place for a business person, and if you have a small pet you can bring them too! I would give this place four stars and would definitely stay here again.")]

In [8]:
# Convert the list of records to an array and read row 0, column 1 (the second captured field).
np.array(data)[0, 1]


'Fake'

In [9]:
# Load Data
def load_data(input_file_path: str, type: str = "TRAIN") -> npt.NDArray:
    """Parse a space-delimited text file into an array of captured string fields.

    Args:
        input_file_path: Path of the file to read; one record per line.
        type: Line layout. "TEST" captures two fields (one word, then the rest
            of the line), "LABELS" captures three word fields, and any other
            value (default "TRAIN") captures three word fields followed by the
            rest of the line.

    Returns:
        Array built from the per-line tuples of captured fields, in file order.

    Raises:
        ValueError: If a line does not match the selected layout.
    """
    # Raw strings: `\w` is not a valid Python string escape, so non-raw patterns
    # trigger an invalid-escape warning on modern interpreters.
    layouts = {
        "TEST": r"(\w*) (.*)\n?",
        "LABELS": r"(\w*) (\w*) (\w*)\n?",
    }
    input_regex = re.compile(layouts.get(type, r"(\w*) (\w*) (\w*) (.*)\n?"))

    input_data = []
    with open(input_file_path, mode="r") as input_file:
        for line_number, line in enumerate(input_file, start=1):
            match = input_regex.match(line)
            if match is None:
                # Previously surfaced as AttributeError on `None.groups()`.
                raise ValueError(
                    f"{input_file_path}:{line_number}: line does not match the {type!r} layout: {line!r}"
                )
            input_data.append(match.groups())
    return np.array(input_data)


In [10]:
# Store Data
def store_data(date_file_path: str, data: npt.NDArray) -> None:
    """Write the first four fields of each row as one space-separated line.

    Args:
        date_file_path: Destination file; opened in write mode, so existing
            content is replaced.
        data: Iterable of rows indexable at positions 0 through 3.
    """
    with open(date_file_path, mode="w") as sink:
        sink.writelines(
            f"{fields[0]} {fields[1]} {fields[2]} {fields[3]}\n" for fields in data
        )


In [11]:
# Store Model
def store_model(model_file_path: str, model_data: Any) -> None:
    """Serialise `model_data` to `model_file_path` as JSON.

    NumPy arrays and NumPy scalars anywhere inside `model_data` are converted to
    plain lists / Python scalars, so the dictionaries returned by the perceptron
    `export()` methods (which hold arrays) can be stored directly.

    Args:
        model_file_path: Destination file; opened in write mode.
        model_data: JSON-serialisable structure, optionally containing NumPy
            arrays or scalars.

    Raises:
        TypeError: If `model_data` contains a value JSON cannot represent.
    """

    def _to_jsonable(value: Any) -> Any:
        # Called by json only for objects it cannot serialise natively.
        if isinstance(value, np.ndarray):
            return value.tolist()
        if isinstance(value, np.generic):
            return value.item()
        raise TypeError(f"Object of type {type(value).__name__} is not JSON serializable")

    # TODO: Need to check model_data
    with open(model_file_path, mode="w") as model_file:
        json.dump(model_data, model_file, ensure_ascii=False, default=_to_jsonable)


In [12]:
# Load Model
def load_model(model_file_path: str) -> npt.NDArray:
    """Read the JSON document stored at `model_file_path`.

    Args:
        model_file_path: File to open in read mode.

    Returns:
        Whatever `json.load` produces for the file's content; no conversion is
        applied here.
    """
    # TODO: Extract data from JSON and return that only
    with open(model_file_path, mode="r") as model_file:
        return json.load(model_file)


In [13]:
# Store Predictions
def store_predictions(output_file_path: str, predictions: List[Tuple[str, str, str]]) -> None:
    """Write each prediction's first three items as one space-separated line.

    Args:
        output_file_path: Destination file; opened in write mode.
        predictions: Records indexable at positions 0 through 2.
    """
    with open(output_file_path, mode="w") as sink:
        sink.writelines(f"{record[0]} {record[1]} {record[2]}\n" for record in predictions)


In [14]:
# Calculate Scores
def calculate_scores(y_true, y_pred, title: str):
    """Print scikit-learn's classification report between two banner lines.

    Args:
        y_true: Reference labels, forwarded to `classification_report`.
        y_pred: Predicted labels, forwarded to `classification_report`.
        title: Text embedded in the opening banner.
    """
    from sklearn.metrics import classification_report

    opening_banner = f"------------------------ {title} ------------------------"
    print(opening_banner)
    report_text = classification_report(y_true, y_pred)
    print(report_text)
    print("---------------------------------------------------------")


# Load Data


In [15]:
# Parse the labelled training file; rebinds `data` (previously the one-record sample list).
data = load_data(TRAIN_FILE_PATH, type="TRAIN")


# Data Cleaning


In [16]:
# Convert all reviews to lower case (optional according to study)
def to_lower(data: npt.NDArray):
    """Return a copy of `data` in which every element has been lower-cased.

    Args:
        data: Array whose elements provide a `lower()` method (e.g. strings).

    Returns:
        A new array; `data` itself is left untouched.
    """
    lowered = data.copy()
    for idx in range(data.shape[0]):
        original_text = lowered[idx]
        lowered[idx] = original_text.lower()
    return lowered


In [17]:
def remove_html_encodings(data: npt.NDArray):
    """Replace numeric HTML character references (e.g. `&#39;`) with a space.

    Args:
        data: Array of strings.

    Returns:
        A new array with each `&#<digits>;` sequence replaced by one space.
    """
    numeric_entity = re.compile(r"&#\d+;")

    decoded = data.copy()
    for idx in range(data.shape[0]):
        decoded[idx] = numeric_entity.sub(" ", decoded[idx])
    return decoded


In [18]:
def remove_html_tags(data: npt.NDArray):
    """Strip attribute-less HTML tags from every string in `data`.

    Matches opening (`<b>`), closing (`</b>`) and self-closing (`<br/>`,
    `<br />`) tags whose name starts with a letter and may contain digits
    (`<h1>`). Closing tags and digit-bearing names were previously left in the
    text. Tags carrying attributes are not matched.

    Args:
        data: Array of strings.

    Returns:
        A new array with the matched tags removed.
    """
    tag = re.compile(r"</?[a-zA-Z][a-zA-Z0-9]*\s?/?>")

    result = data.copy()
    for i in range(data.shape[0]):
        # Removal only shortens the text, so writing back into the copy is safe.
        result[i] = tag.sub("", result[i])
    return result


In [19]:
def remove_url(data: npt.NDArray):
    """Remove http/https URLs that include a path from every string in `data`.

    A URL is matched as `http://` or `https://`, a host of at least two
    characters drawn from word characters, `-` and `.`, a `/`, and then one or
    more path characters (word characters and `- . / = + ?`). URLs without a
    path component are left in place, as before.

    The host part used to be written as a nested quantifier
    (`([...]+){2,}`), which backtracks exponentially when the text contains
    `http://<long host>` with no following `/`. The flat `{2,}` form accepts
    exactly the same strings in linear time.

    Args:
        data: Array of strings.

    Returns:
        A new array with the matched URLs removed.
    """
    url = re.compile(r"https?://[\w\-.]{2,}/[\w\-./=+?]+")

    result = data.copy()
    for i in range(data.shape[0]):
        result[i] = url.sub("", result[i])
    return result


In [20]:
def remove_html_and_url(data):
    """Remove HTML character references, HTML tags and URLs in one pass.

    For every string, in this order:
        1. Numeric HTML encodings (`&#<digits>;`) are deleted.
        2. Attribute-less HTML tags are deleted: opening, closing and
           self-closing. Closing tags such as `</p>` were previously missed.
        3. http/https URLs that include a path are deleted. The host part is
           matched with a flat `{2,}` quantifier; the earlier nested form
           accepted the same strings but backtracked exponentially on a URL
           without a path.

    Args:
        data (npt.NDArray): A Numpy Array of type string

    Returns:
        npt.NDArray: A new array holding the cleaned strings.
    """
    encoding = re.compile(r"&#\d+;")
    tag = re.compile(r"</?[a-zA-Z][a-zA-Z0-9]*\s?/?>")
    url = re.compile(r"https?://[\w\-.]{2,}/[\w\-./=+?]+")

    result = data.copy()
    for i in range(data.shape[0]):
        text = result[i]
        # Every step only deletes characters, so the text never outgrows its cell.
        for pattern in (encoding, tag, url):
            text = pattern.sub("", text)
        result[i] = text

    return result


In [21]:
def replace_digits_with_tag(data: npt.NDArray):
    """Replace every run of digits with the token " NUM " (spaces included).

    The replacement is usually longer than the digits it replaces. Writing it
    back into a fixed-width NumPy string array would silently cut the text off
    at the original width, so the work is done on an object-dtype copy and
    converted back to a string array sized for the results.

    Args:
        data: Array of strings.

    Returns:
        A new array of the tagged strings. Object-dtype input yields
        object-dtype output; other input yields a string array wide enough for
        the longest result.
    """
    result = data.astype(object)  # always a copy; cells hold arbitrary-length str
    for i in range(data.shape[0]):
        result[i] = re.sub(r"\d+", " NUM ", result[i])
    if data.dtype.kind == "O":
        return result
    return result.astype(str)


In [22]:
# Remove non-alphabetical characters
def remove_non_alpha_characters(data: npt.NDArray):
    """Replace non-alphanumeric characters with spaces.

    A run of underscores becomes a single space; a backslash or any other
    character that is not an ASCII letter, a digit or whitespace becomes one
    space each. Digits are kept.

    Args:
        data: Array of strings.

    Returns:
        A new array holding the substituted strings.
    """
    unwanted = re.compile(r"_+|\\|[^a-zA-Z0-9\s]")

    scrubbed = data.copy()
    for idx in range(data.shape[0]):
        scrubbed[idx] = unwanted.sub(" ", scrubbed[idx])
    return scrubbed


In [23]:
# Remove extra spaces
def remove_extra_spaces(data: npt.NDArray):
    """Collapse whitespace runs to one space and trim both ends of each string.

    The previous pattern (`^\\s*|\\s\\s*`) also matched the empty string at the
    start of the text, so a space was prepended to every string, and a
    trailing space was kept. In a fixed-width string array the prepended space
    could push the last character out of a full-width cell.

    Args:
        data: Array of strings.

    Returns:
        A new array with normalised whitespace. Results are never longer than
        the input strings.
    """
    whitespace_run = re.compile(r"\s+")

    result = data.copy()
    for i in range(data.shape[0]):
        result[i] = whitespace_run.sub(" ", result[i]).strip()
    return result


In [24]:
# Expanding contractions
def fix_contractions(data: npt.NDArray):
    """Expand contractions word by word using the `contractions` package.

    Each string is split on whitespace, every word is passed through
    `contractions.fix`, and the words are re-joined with single spaces.

    Expanded text is usually longer than the input. Writing it back into a
    fixed-width NumPy string array would silently cut it off at the original
    width, so the work is done on an object-dtype copy and converted back to a
    string array sized for the results.

    Args:
        data: Array of strings.

    Returns:
        A new array of expanded strings. Object-dtype input yields object-dtype
        output; other input yields a string array wide enough for the longest
        result.
    """
    # TODO: Replace with custom implementation
    import contractions

    def contraction_fixer(txt: str):
        return " ".join([contractions.fix(word) for word in txt.split()])

    result = data.astype(object)  # always a copy; cells hold arbitrary-length str
    for i in range(data.shape[0]):
        result[i] = contraction_fixer(result[i])
    if data.dtype.kind == "O":
        return result
    return result.astype(str)


In [25]:
# A dictionary containing the columns and a list of functions to perform on it in order
data_cleaning_pipeline = {
    DATA_COL: [
        to_lower,
        remove_html_encodings,
        remove_html_tags,
        remove_url,
        fix_contractions,
        remove_non_alpha_characters,
        remove_extra_spaces,
    ]
}

# Work on an object-dtype copy: a fixed-width NumPy string array silently truncates
# any value that becomes longer than the array's item width, and some cleaning steps
# (e.g. contraction expansion) lengthen the text.
cleaned_data = data.astype(object)

# Process all the cleaning instructions
for col, pipeline in data_cleaning_pipeline.items():
    # Get the column to perform cleaning on
    temp_data = cleaned_data[:, col].copy()

    # Perform all the cleaning functions sequentially
    for func in pipeline:
        print(f"Starting: {func.__name__}")
        temp_data = func(temp_data)
        print(f"Ended: {func.__name__}")

    # Replace the old column with cleaned one (slice assignment copies the values).
    cleaned_data[:, col] = temp_data


Starting: to_lower
Ended: to_lower
Starting: remove_html_encodings
Ended: remove_html_encodings
Starting: remove_html_tags
Ended: remove_html_tags
Starting: remove_url
Ended: remove_url
Starting: fix_contractions
Ended: fix_contractions
Starting: remove_non_alpha_characters
Ended: remove_non_alpha_characters
Starting: remove_extra_spaces
Ended: remove_extra_spaces


In [26]:
store_data(CLEANED_DATA_FILE_PATH, cleaned_data)


# Data Preprocessing


In [27]:
# def tokenize(data: pd.Series):
#     from nltk.tokenize import word_tokenize

#     nltk.download("punkt")

#     return data.apply(word_tokenize)


In [28]:
# def remove_stopwords(data: pd.Series):
#     """Remove stop words using the NLTK stopwords dictionary

#     Args:
#         string (str): a document

#     Returns:
#         str: a document with stopwords removed
#     """
#     from nltk.corpus import stopwords

#     nltk.download("stopwords")

#     stopwords = set(stopwords.words())

#     def remover(word_list: List[str], stopwords: Set[str]):
#         return [word for word in word_list if not word in stopwords]

#     return data.apply(lambda word_list: remover(word_list, stopwords))


In [29]:
# def lemmatize(data: pd.Series, consider_pos_tag: bool = True):
#     from nltk.corpus import wordnet
#     from nltk.stem import WordNetLemmatizer

#     nltk.download("omw-1.4")

#     # POS tagging
#     def perform_nltk_pos_tag(data: pd.Series):
#         from nltk import pos_tag

#         nltk.download("averaged_perceptron_tagger")

#         return data.apply(pos_tag)

#     # Convert POS tag to wordnet pos tags
#     def wordnet_pos_tagger(tag: str):
#         if tag.startswith("J"):
#             return wordnet.ADJ
#         elif tag.startswith("V"):
#             return wordnet.VERB
#         elif tag.startswith("N"):
#             return wordnet.NOUN
#         elif tag.startswith("R"):
#             return wordnet.ADV
#         else:
#             return None

#     lemmatizer = WordNetLemmatizer()
#     lemmatized = list()

#     if consider_pos_tag:
#         pos_tagged_data = data.copy()
#         pos_tagged_data = perform_nltk_pos_tag(data)

#         for row in pos_tagged_data:

#             lemmatized_row = list()

#             if consider_pos_tag:
#                 for word, tag in row:
#                     wordnet_pos_tag = wordnet_pos_tagger(tag)

#                     if wordnet_pos_tag is None:
#                         lemmatized_row.append(word)
#                     else:
#                         result = lemmatizer.lemmatize(word, wordnet_pos_tag)
#                         lemmatized_row.append(lemmatizer.lemmatize(word, wordnet_pos_tag))

#             lemmatized.append(lemmatized_row)
#     else:
#         for row in data:
#             lemmatized_row = list()

#             for word in row:
#                 lemmatized_row.append(lemmatizer.lemmatize(word))

#             lemmatized.append(lemmatized_row)

#     return pd.Series(lemmatized)


In [30]:
# Concatenate lemmatized sentences back into one sentence
# def concatenate(data: pd.Series):
#     return data.apply(lambda words: " ".join(words))


In [31]:
# preprocessing_pipeline = {DATA_COL: [tokenize, lemmatize, concatenate]}

# # Run the pipeline
# preprocessed_data = cleaned_data.copy()

# # Process all the cleaning instructions
# for col, pipeline in preprocessing_pipeline.items():
#     # Get the column to perform cleaning on
#     temp_data = preprocessed_data[col].copy()

#     # Perform all the cleaning functions sequentially
#     for func in pipeline:
#         print(f"Starting: {func.__name__}")

#         if func.__name__ == "lemmatize":
#             temp_data = func(temp_data, consider_pos_tag=True)
#         else:
#             temp_data = func(temp_data)

#         print(f"Ended: {func.__name__}")

#     # Replace the old column with cleaned one.
#     preprocessed_data[col] = temp_data.copy()


# Feature Extraction


In [32]:
class TfIdf:
    """Placeholder for a TF-IDF feature extractor; no behaviour is implemented yet."""

    # TODO: Implement
    def __init__(self) -> None:
        """Create an extractor; currently stores no state."""

    def fit(self):
        """Not implemented yet; does nothing and returns None."""

    def transform(self):
        """Not implemented yet; does nothing and returns None."""


In [35]:
# Rebind `cleaned_data` to the rows read back from CLEANED_DATA_FILE_PATH (default "TRAIN" layout).
cleaned_data = load_data(CLEANED_DATA_FILE_PATH)


In [None]:
# TODO: Invoke TF-IDF


# Perceptron Models


In [None]:
class Perceptron:
    """Skeleton of a perceptron model; training and prediction are not implemented yet."""

    # TODO: Implement
    def __init__(self, n_input, n_features) -> None:
        """Allocate zero-initialised parameters.

        Args:
            n_input: Size of the first weight dimension and of the bias vector.
            n_features: Size of the second weight dimension.
        """
        # `np.zeros` is the NumPy constructor; `np.zeroes` does not exist and
        # raised AttributeError as soon as the class was instantiated.
        self.weights = np.zeros(shape=(n_input, n_features))
        self.biases = np.zeros(shape=(n_input,))

    def fit(self, n_iterations: int, tolerance: float, early_stopping: bool = True):
        """Not implemented yet; does nothing and returns None."""
        pass

    def predict(self):
        """Not implemented yet; does nothing and returns None.

        Takes `self` so it can be called on an instance like the other methods.
        """
        pass

    def export(
        self,
    ):
        """Return the current parameters as a dict with keys "weights" and "biases"."""
        return {"weights": self.weights, "biases": self.biases}


In [None]:
class AveragePerceptron:
    """Skeleton of an averaged perceptron; training and prediction are not implemented yet."""

    # TODO: Implement
    def __init__(self, n_input, n_features) -> None:
        """Allocate zero-initialised parameters and a matching zeroed cache.

        Args:
            n_input: Size of the first weight dimension and of the bias vector.
            n_features: Size of the second weight dimension.
        """
        # `np.zeros` is the NumPy constructor; `np.zeroes` does not exist and
        # raised AttributeError as soon as the class was instantiated.
        self.weights = np.zeros(shape=(n_input, n_features))
        self.biases = np.zeros(shape=(n_input,))
        # Second set of arrays with the same shapes as the parameters.
        self.cache = {
            "weights": np.zeros(shape=(n_input, n_features)),
            "biases": np.zeros(shape=(n_input,)),
        }

    def fit(self, n_iterations: int, tolerance: float, early_stopping: bool = True):
        """Not implemented yet; does nothing and returns None."""
        pass

    def predict(self):
        """Not implemented yet; does nothing and returns None.

        Takes `self` so it can be called on an instance like the other methods.
        """
        pass

    def export(
        self,
    ):
        """Return the current parameters as a dict with keys "weights" and "biases"."""
        return {"weights": self.weights, "biases": self.biases}


## Learn


## Classify
