In [None]:
# !pip install hmmlearn
# !pip install scikit-learn

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from hmmlearn import hmm
import os
import glob
import pickle

In [None]:
def getFreqDictPerScore(directory_path):
    """Count token frequencies separately for every score folder.

    Walks ``directory_path`` recursively. Each file is attributed to the
    first path component *below* ``directory_path`` (the "score" folder),
    read as text, split on single spaces, and its tokens are tallied.

    Args:
        directory_path: Root folder whose immediate sub-folders group the
            files by score.

    Returns:
        dict mapping score folder name -> {token: occurrence count}.
        Files located directly in ``directory_path`` belong to no score
        folder and are ignored.
    """
    score_dict = {}
    for root, _dirs, files in os.walk(directory_path):
        if not files:
            continue
        # Resolve the score relative to the walked root. Splitting the raw
        # path on "/" only worked for a single-component relative root on
        # POSIX and returned the wrong folder for anything else.
        rel_root = os.path.relpath(root, directory_path)
        if rel_root == os.curdir:
            continue
        score = rel_root.split(os.sep)[0]
        # Accumulate into the score's existing counts so that nested
        # sub-folders add to the tally instead of overwriting it.
        freq_dict = score_dict.setdefault(score, {})
        for file_name in files:
            with open(os.path.join(root, file_name), 'r') as handle:
                tokens = handle.read().split(" ")
            for token in tokens:
                freq_dict[token] = freq_dict.get(token, 0) + 1
    return score_dict

# Tally tokens per score folder and show each score next to its counts.
score_dict = getFreqDictPerScore("essay_hmm")
for s in score_dict:
    freq = score_dict[s]
    print(s, freq)

In [None]:
def getFreqDict(directory_path):
    """Count how often each token occurs across all files under a folder.

    Every file found by a recursive walk of ``directory_path`` is read as
    text and split on single spaces; the resulting tokens are tallied into
    one dictionary.

    Args:
        directory_path: Root folder to walk.

    Returns:
        dict mapping token -> number of occurrences over all files.
    """
    counts = {}
    for folder, _subdirs, names in os.walk(directory_path):
        for name in names:
            with open(os.path.join(folder, name), 'r') as handle:
                tokens = handle.read().split(" ")
            for token in tokens:
                counts[token] = counts.get(token, 0) + 1
    return counts

# Corpus-wide token counts, then keep only tokens seen at least 10965 times.
freq_dict = getFreqDict("essay_hmm")
filtered_dict = dict(item for item in freq_dict.items() if item[1] >= 10965)
filtered_dict

In [None]:
def setWordDict(directory_path, word_dict):
    """Extend ``word_dict`` with an integer id for every kept token.

    Reads each file under ``directory_path`` (recursively), splits it on
    single spaces and, for every token that is a key of the notebook-level
    ``filtered_dict`` and not yet in ``word_dict``, assigns the next free id
    (the current size of ``word_dict``).

    Args:
        directory_path: Root folder to walk.
        word_dict: token -> int mapping; updated in place.

    Returns:
        The same ``word_dict`` object that was passed in.
    """
    for folder, _subdirs, names in os.walk(directory_path):
        for name in names:
            with open(os.path.join(folder, name), 'r') as handle:
                tokens = handle.read().split(" ")
            for token in tokens:
                if token in word_dict or token not in filtered_dict:
                    continue
                word_dict[token] = len(word_dict)
    return word_dict
# Build the token -> id vocabulary from scratch and show its size and content.
word_dict = setWordDict("essay_hmm", {})
print(len(word_dict),word_dict)

In [None]:
def convertStrToIntEssay(directory_path, word_dict):
    """Encode every essay file as a list of vocabulary ids.

    Each file under ``directory_path`` (recursively) is split on single
    spaces; tokens present in both ``word_dict`` and the notebook-level
    ``filtered_dict`` are mapped to their integer id, all others are dropped.

    Args:
        directory_path: Root folder containing the essay text files.
        word_dict: token -> int mapping used for the encoding.

    Returns:
        Tuple ``(max_seq, max_file, essay_inputs)`` where ``max_seq`` is the
        length of the longest encoded essay, ``max_file`` is the name of the
        file it came from ("" if no essay produced any id) and
        ``essay_inputs`` is a list of ``(essay_id, encoded_sequence)`` tuples.
        ``essay_id`` is the file name up to its first ".".
    """
    max_seq, max_file = 0, ""
    essay_inputs = []
    for root, _dirs, files in os.walk(directory_path):
        for file_name in files:
            essay_id = file_name.split(".")[0]
            # Use a separate name for the handle: the original reused `file`,
            # so `max_file` ended up holding a closed file object instead of
            # the file name.
            with open(os.path.join(root, file_name), 'r') as handle:
                tokens = handle.read().split(" ")
            essay_int = [
                word_dict[token]
                for token in tokens
                if token in word_dict and token in filtered_dict
            ]
            if len(essay_int) > max_seq:
                max_seq, max_file = len(essay_int), file_name
            essay_inputs.append((essay_id, essay_int))
    return max_seq, max_file, essay_inputs

# Encode every essay under "essay_hmm" and keep the longest encoded length.
# Original author's note: "max_13257" - presumably the max_seq observed on
# their data; TODO confirm.
max_seq, max_file, essay_inputs = convertStrToIntEssay("essay_hmm", word_dict)
# essay_inputs is a list of (essay_id, seq) tuples; print the longest length,
# the file it came from, and the length of the first encoded essay.
print(max_seq,max_file, len(essay_inputs[0][1]))


In [None]:
def padding_essay(essay_inputs, max_seq):
    """Pad or truncate every essay sequence to ``max_seq`` items, in place.

    A sequence shorter than ``max_seq`` is repeated cyclically until it is
    long enough and then cut to exactly ``max_seq`` items; a longer sequence
    is truncated.

    Args:
        essay_inputs: list of ``(essay_id, seq)`` tuples; each entry is
            replaced by ``(essay_id, padded_seq)``.
        max_seq: target sequence length.

    Returns:
        None. ``essay_inputs`` is updated in place.

    Note:
        An empty sequence cannot be extended by repetition and is left
        empty (the original doubling loop never terminated on it), so
        callers should verify lengths afterwards. The sequence objects that
        were passed in are no longer extended themselves; each entry gets a
        new list.
    """
    for i, (file_name, seq) in enumerate(essay_inputs):
        if 0 < len(seq) < max_seq:
            # Ceiling division: fewest whole repetitions that reach max_seq.
            repeats = -(-max_seq // len(seq))
            seq = seq * repeats
        essay_inputs[i] = (file_name, seq[0:max_seq])

# Pad/truncate every essay in place, then display whether all encoded
# sequences now have exactly max_seq items.
padding_essay(essay_inputs, max_seq)
all(len(tup[1]) == max_seq for tup in essay_inputs)

In [None]:
def convertStrToIntSet(file_path, word_dict):
    """Encode a "seqKseqK..." text file as lists of vocabulary ids.

    The file content is split on the character "K" into sequences and each
    sequence on single spaces into tokens. Empty tokens and tokens missing
    from ``word_dict`` or from the notebook-level ``filtered_dict`` are
    dropped; the rest are replaced by their integer id.

    Args:
        file_path: Path of the text file to read.
        word_dict: token -> int mapping used for the encoding.

    Returns:
        list with one list of ints per "K"-separated sequence (possibly
        empty lists).
    """
    with open(file_path, 'r') as handle:
        raw = handle.read()
    return [
        [
            word_dict[token]
            for token in chunk.split(" ")
            if len(token) != 0 and token in word_dict and token in filtered_dict
        ]
        for chunk in raw.split("K")
    ]

# score_set collects one (score, train_seqs, test_seqs) tuple per score.
# train_seqs / test_seqs are 2-D lists: one encoded sequence per document.
score_set = []
# for s in ["1.0", "1.5", "2.0", "2.5", "3.0", "4.5", "5.0", "5.5","3.5", "4.0", "0.5", "6.0"]:
for s in ["4.0"]:
    train_seqs = convertStrToIntSet(f"train/{s}.txt", word_dict)
    test_seqs = convertStrToIntSet(f"test/{s}.txt", word_dict)
    score_set.append((s, train_seqs, test_seqs))

In [None]:
# Per score: number of train/test sequences and the length of the first
# sequence in each split (before padding when the cells are run in order).
for s, train_seqs, test_seqs in score_set:
    print(s, len(train_seqs), len(test_seqs), len(train_seqs[0]), len(test_seqs[0]))

In [None]:
def padding_set(set_inputs, max_seq):
    """Pad or truncate every sequence in ``set_inputs`` to ``max_seq`` items.

    A sequence shorter than ``max_seq`` is repeated cyclically until it is
    long enough and then cut to exactly ``max_seq`` items; a longer sequence
    is truncated.

    Args:
        set_inputs: list of sequences (lists of ints); each entry is
            replaced by its padded/truncated version.
        max_seq: target sequence length.

    Returns:
        None. ``set_inputs`` is updated in place.

    Note:
        An empty sequence cannot be extended by repetition and is left
        empty (the original doubling loop never terminated on it), so
        callers should verify lengths afterwards. The sequence objects that
        were passed in are no longer extended themselves; each entry gets a
        new list.
    """
    for i, seq in enumerate(set_inputs):
        if 0 < len(seq) < max_seq:
            # Ceiling division: fewest whole repetitions that reach max_seq.
            repeats = -(-max_seq // len(seq))
            seq = seq * repeats
        set_inputs[i] = seq[0:max_seq]
# def padding_set(set_inputs, max_seq):
#     for i, seq in enumerate(set_inputs):
#         seq = np.pad(seq, (0, max_seq - len(seq)), constant_values=len(word_dict))
#         set_inputs[i] = seq[0:1000]
# Pad both splits of every score to max_seq (in place).
for s, train_seqs, test_seqs in score_set:
    for split_seqs in (train_seqs, test_seqs):
        padding_set(split_seqs, max_seq)

# Sanity check: every sequence in both splits should now have max_seq items.
for s, train_seqs, test_seqs in score_set:
    train_ok = all(len(seq) == max_seq for seq in train_seqs)
    test_ok = all(len(seq) == max_seq for seq in test_seqs)
    print(s, len(train_seqs), len(test_seqs), train_ok, test_ok)


In [None]:
# Per score: split sizes, first-sequence lengths, and the shape numpy infers
# for the whole training split.
for s, train_seqs, test_seqs in score_set:
    print(s, len(train_seqs), len(test_seqs), len(train_seqs[0]), len(test_seqs[0]), np.array(train_seqs).shape)

In [None]:
# Distinct symbol ids that occur in the training split of the first score.
for s, train_seqs, test_seqs in score_set[0:1]:
    print(np.unique(np.array(train_seqs)))

In [None]:
def findBestModel(n_fits, N, train_input, test_input, seed, label=None):
    """Fit ``n_fits`` categorical HMMs and return the best-scoring one.

    Every restart starts from a near-uniform transition matrix perturbed by
    small random noise, is fitted on the pooled sequences and scored on the
    same data; the model with the highest log-likelihood wins.

    Args:
        n_fits: number of random restarts.
        N: number of hidden states.
        train_input: list of integer symbol sequences.
        test_input: list of integer symbol sequences.
        seed: seed for the random generator that drives both the transition
            noise and hmmlearn's parameter initialisation, so a given seed
            reproduces the same run.
        label: tag printed in the progress line. When omitted, falls back to
            the notebook-level variable ``s`` (which the original
            implementation read implicitly), or "?" if it is not defined.

    Returns:
        Tuple ``(best_score, best_model)``; both are None when ``n_fits`` is 0.

    Raises:
        ValueError: if there is no non-empty sequence to fit on.
    """
    # NOTE(review): train and test sequences are pooled for fitting AND for
    # scoring, as in the original code; the score is therefore not a
    # held-out estimate. Confirm this is intended.
    sequences = [seq for seq in list(train_input) + list(test_input) if len(seq) > 0]
    if not sequences:
        raise ValueError("findBestModel needs at least one non-empty sequence")

    # hmmlearn expects all observations stacked into one (n_samples, 1)
    # column plus the per-sequence lengths; a (n_docs, seq_len) matrix is
    # not a valid input for a categorical HMM.
    X = np.concatenate([np.asarray(seq, dtype=int) for seq in sequences]).reshape(-1, 1)
    lengths = [len(seq) for seq in sequences]

    if label is None:
        label = globals().get("s", "?")

    # One local generator: reproducible for a given seed, independent of the
    # global numpy state (and of other threads), yet different per restart.
    rng = np.random.RandomState(seed)

    best_score = best_model = None
    for idx in range(n_fits):
        model = hmm.CategoricalHMM(
            n_components=N, random_state=rng,
            init_params='se')  # don't init transition, set it below

        # Uniform transitions plus small Gaussian noise to break symmetry.
        # Clip before renormalising so every row stays a valid distribution
        # even when 1 / N is of the same order as the noise.
        transmat = np.full((N, N), 1.0 / N)
        transmat += rng.randn(N, N) * 1e-2
        transmat = np.clip(transmat, 1e-6, None)
        transmat /= transmat.sum(axis=1, keepdims=True)
        model.transmat_ = transmat

        model.fit(X, lengths)
        score = model.score(X, lengths)
        print(f'{seed} Model-{label} #{idx}\tScore: {score}')
        if best_score is None or score > best_score:
            best_model = model
            best_score = score
    return best_score, best_model

# Train one model per score and pickle it under model/<score>/.
n_fits = 1
N = 2
for s, train_input, test_input in score_set:
    lowest_best_score, lowest_best_model = findBestModel(n_fits, N, train_input, test_input, N)
    model_dir = "model/" + s
    if not os.path.exists(model_dir):
        os.makedirs(model_dir)
    # File name pattern: <score>-<N>-model<log-likelihood>.pkl
    model_file = model_dir + "/" + s + "-" + str(N) + "-model" + str(lowest_best_score) + ".pkl"
    with open(model_file, "wb") as file:
        pickle.dump(lowest_best_model, file)

In [None]:
# Inspect the learned parameters of the most recently trained model:
# its transition matrix and its emission probabilities.
print(lowest_best_model.transmat_)
print(lowest_best_model.emissionprob_)

In [None]:
import numpy as np
from hmmlearn import hmm

# Toy example: stages of a conversation, where every sentence is "generated"
# by a hidden topic, either "cat" or "dog".
states = ["cat", "dog"]
id2topic = dict(enumerate(states))
# Conversations are more likely to start on cats.
start_probs = np.array([0.6, 0.4])

# Each topic has its own distribution over the vocabulary:
# "cat" favours "mouse", "dog" favours "fetch".
vocabulary = ["tail", "fetch", "mouse", "food"]
emission_probs = np.array([
    [0.25, 0.1, 0.4, 0.25],
    [0.2, 0.5, 0.1, 0.2],
])

# Staying on a topic is more likely than switching to the other one.
trans_mat = np.array([[0.8, 0.2], [0.2, 0.8]])

# Every sentence is pretended to be exactly 5 independently uttered words.
# These are the observed bags of words for 10 sentences.
observations = [
    sentence_text.split()
    for sentence_text in (
        "tail mouse mouse food mouse",
        "food mouse mouse food mouse",
        "tail mouse mouse tail mouse",
        "food mouse food food tail",
        "tail fetch mouse food tail",
        "tail fetch fetch food fetch",
        "fetch fetch fetch food tail",
        "food mouse food food tail",
        "tail mouse mouse tail mouse",
        "fetch fetch fetch fetch fetch",
    )
]

# Word -> column index, used to turn sentences into count vectors.
vocab2id = {word: position for position, word in enumerate(vocabulary)}
def sentence2counts(sentence):
    """Return how often each vocabulary word occurs in ``sentence``.

    Args:
        sentence: list of words.

    Returns:
        list of counts, one per key of the notebook-level ``vocab2id``, in
        that mapping's iteration order.
    """
    return [sentence.count(word) for word in vocab2id]

# One count vector per observed sentence.
X = [sentence2counts(sentence) for sentence in observations]
data = np.array(X, dtype=int)

# Pretend the data is repeated five times so there is more to learn from.
lengths = [len(X)] * 5
sequences = np.tile(data, (5, 1))
print(sequences.shape)

In [None]:
# Two example sequences of one-dimensional samples, stacked row-wise into a
# single (n_samples, 1) array.
X1 = [[0.5], [1.0], [-1.0], [0.42], [0.24]]
X2 = [[2.4], [4.2], [0.5], [-0.24]]
X = np.concatenate((X1, X2), axis=0)
X

In [None]:
import concurrent.futures

# Train `num_workers` differently seeded models per score in parallel and
# keep the one with the highest log-likelihood.
num_workers = 10
n_fits = 1
N = 5
with concurrent.futures.ThreadPoolExecutor(max_workers=num_workers) as executor:
    for s, train_input, test_input in score_set:
        # One task per seed 0..num_workers-1.
        futures = [
            executor.submit(findBestModel, n_fits, N, train_input, test_input, i)
            for i in range(num_workers)
        ]
        concurrent.futures.wait(futures)
        result = [future.result() for future in futures]
        print(result)
        # Scores are log-likelihoods, so higher is better. The original
        # used min(), which kept the worst worker and contradicted the
        # "score > best_score" selection inside findBestModel. The variable
        # names are kept because other cells read `lowest_best_model`.
        best_result = max(result, key=lambda x: x[0])
        lowest_best_score, lowest_best_model = best_result
        model_dir = "model/" + s
        os.makedirs(model_dir, exist_ok=True)
        model_file = model_dir + "/" + s + "-" + str(N) + "-model" + str(lowest_best_score) + ".pkl"
        with open(model_file, "wb") as file:
            pickle.dump(lowest_best_model, file)

In [None]:
def generateHMMScoreOnEssay(model_path, essay_inputs):
    """Score every essay against the pickled HMM of each score level.

    For each of the twelve score labels ("0.5" ... "6.0") one ``*.pkl`` model
    is loaded from ``<model_path>/<score>/``; every essay is then scored
    with each model via ``model.score``.

    Args:
        model_path: folder containing one sub-folder per score label.
        essay_inputs: list of ``(essay_id, encoded_sequence)`` tuples.

    Returns:
        Tuple ``(scores, result)``: ``scores`` is the list of score labels
        and ``result`` holds one row ``[essay_id, score_1, ..., score_12]``
        per essay, with the model scores in the order of ``scores``.

    Raises:
        FileNotFoundError: if a score folder contains no ``*.pkl`` file.
    """
    scores = [ "0.5", "1.0", "1.5", "2.0", "2.5", "3.0","3.5", "4.0","4.5", "5.0", "5.5",  "6.0"]

    model_list = []
    for score in scores:
        model_dir = os.path.join(model_path, score)
        # Sort so the choice is deterministic when several models were saved
        # (the training cells write a new file per run).
        candidates = sorted(glob.glob(os.path.join(model_dir, '*.pkl')))
        if not candidates:
            raise FileNotFoundError(f"no pickled model (*.pkl) found in {model_dir}")
        # SECURITY: pickle.load can execute arbitrary code; only load model
        # files produced by this notebook or another trusted source.
        with open(candidates[0], "rb") as file:
            model_list.append((score, pickle.load(file)))

    result = []
    for i, (essay_id, essay_input) in enumerate(essay_inputs):
        # hmmlearn expects a single sequence as an (n_samples, 1) column,
        # not as a one-row matrix.
        observations = np.asarray(essay_input, dtype=int).reshape(-1, 1)
        hmm_result = [essay_id]
        for s, model in model_list:
            print(f"{i}-Scoring essay {essay_id} against model-{s}")
            hmm_result.append(model.score(observations))
        result.append(hmm_result)
    return scores, result

# Score the first ten essays against every model and tabulate the result:
# one row per essay, an 'id' column followed by one column per score label.
scores, hmm_result = generateHMMScoreOnEssay("model", essay_inputs[:10])
col = ['id'] + scores
hmm_result_df = pd.DataFrame(hmm_result)
hmm_result_df.columns = col

In [None]:
# Preview up to the first ten rows of the per-essay score table.
hmm_result_df.head(10)

In [None]:
# Persist the score table as CSV.
# NOTE(review): the file name reads "reult" - probably a typo for "result";
# left unchanged because other code may read this exact name.
hmm_result_df.to_csv("essay_hmm_reult.csv")

In [None]:
# ##reading the observation sequences from the text file. Only using 4.0 for now

# def readInputFromFile(score):
#     file_path = score
#     with open(file_path, "r") as file:
#         lines = file.readlines()
#         seqs = lines[0].split("K")
#         words = " ".join(seqs).split(" ")
#         string_id = {word: i for i, word in enumerate(set(words))}

#         seqs = [s.split(" ") for s in seqs]
#         ##convert words in seqs to numbers
#         seqs = [[string_id[w] for w in s] for s in seqs]
#         # lines = file.readlines()
#         # print(len(lines))
#         # string_to_id = {string: i for i, string in enumerate(set(lines[0]))}
#         # raw_input = lines[0].split("K")
#         # print(len(raw_input))
#         # raw_string_pre_pad = [s.split(" ") for s in raw_input ]
#         #
#         # max_seq = max(len(s) for s in seqs)
#         # hmm_input = [np.pad(s, (0, max_seq - len(s)), constant_values=9999) for s in seqs]
#         hmm_input = seqs
#     return hmm_input