<a href="https://colab.research.google.com/github/calipark1213/3340FinalProject/blob/main/Coin_flip_paper_codes.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [6]:
#@title Import libraries

import numpy as np
import random
import csv
import pandas as pd
import statistics as st

from keras.models import Sequential, Model
from keras.layers import Dense, Input, LeakyReLU, BatchNormalization
import numpy as np
from keras.optimizers import Adam
from sklearn.model_selection import train_test_split

from sklearn.svm import SVC, OneClassSVM
from sklearn import metrics
from sklearn.metrics import precision_score, recall_score, classification_report, roc_auc_score, average_precision_score
from sklearn.decomposition import PCA

# set seed
random.seed(10)
np.random.seed(10)


In [None]:
#@title Sequence truncation
sequences = []
with open('fake.txt', 'r') as file: # fake.txt is a txt file of all the coin flip sequences that were handwritten
  for line in file:
    if line[0] == '0' or line[0] == '1':
      line = line.strip()
      newline = [int(line[i]) for i in range(200)]
      sequences.append(newline)

print(len(sequences))

data = pd.DataFrame(np.array(sequences))

data.to_csv('fake_sequences_handwritten.csv', float_format='{:f}'.format, encoding='utf-8', index = False)

# Tricky algorithm (from Mike's paper)

In [None]:
class Bernoulli:
    @staticmethod
    def pmf(x, p):
        f = p ** x * (1 - p) ** (1 - x)
        return f

    @staticmethod
    def mean(p):
        return p

    @staticmethod
    def var(p):
        return p * (1 - p)

    @staticmethod
    def std(p):
        return Bernoulli.var(p) ** (1 / 2)

    @staticmethod
    def rvs(p, size):
        rvs = np.random.choice([-1, 1], size=size, p=[1 - p, p])
        return rvs


def compute_prob(p, k, sequence, probabilities, i):
    if k == 0:
        return p

    m_values = sequence[i - k:i - 1]
    beta_values = [probabilities[j] for j in range(len(m_values))]
    sum_val = np.sum([m * beta for m, beta in zip(m_values, beta_values)])
    denom = 2 ** (k - 1) * Bernoulli.pmf(1, probabilities[i - k])
    prob = p + sum_val / denom

    # Ensure the conditional probability is within [0, 1]
    prob = max(0, min(1, prob))
    return prob


def generate_sequence(N, k):
    valid_params = False
    max_attempts = 1000
    attempt_count = 0
    sequence = []
    probabilities = []
    while not valid_params and attempt_count < max_attempts:
        sequence = []
        probabilities = []

        for _ in range(N):
            p = random.uniform(0, 1)
            probabilities.append(p)
            if len(sequence) == 0:
                sequence.append(1 if random.uniform(0, 1) <= p else -1)
            else:
                probability = compute_prob(p, min(k, len(sequence)), sequence,
                                           probabilities, len(sequence))
                while not 0 <= probability <= 1:
                    p = random.uniform(0, 1)
                    probabilities[-1] = p
                    probability = compute_prob(p, min(k, len(sequence)),
                                               sequence, probabilities,
                                               len(sequence))

                    # Regenerate beta values
                    beta_values = []
                    for j in range(1, min(k, len(sequence)) + 1):
                        beta = random.uniform(-1, 1)
                        beta_values.append(beta)
                    probabilities[-k:] = beta_values

                sequence.append(1 if random.uniform(0, 1) <= probability else -1)

        sequence = [0 if val == -1 else val for val in sequence]

        p_bar, beta_bar = estimate_params(sequence, probabilities, Nc)
        error = calculate_error(probabilities[-1], beta_bar, p_bar, beta_bar, Nc)
        if error <= 0.5:  # Adjusted threshold for easier generation
            valid_params = True

        attempt_count += 1

    if not valid_params:
        print(f"Warning: Maximum attempts "
              f"({max_attempts}) reached without finding valid parameters.")
        sequence = []
        probabilities = []

    return sequence, probabilities


def estimate_params(sequence, probabilities, Nc):
    p_bar = np.mean(sequence)
    beta_bar = []

    for j in range(1, Nc + 1):
        beta = np.mean([sequence[i] * sequence[i - j] for i in range(j, len(sequence))])
        beta_bar.append(beta)

    return p_bar, beta_bar


def calculate_error(p, beta, p_bar, beta_bar, Nc):
    error = (p - p_bar) ** 2
    for j in range(Nc):
        error += (beta[j] - beta_bar[j]) ** 2
    error /= (Nc + 1)
    return error

# Parameters
Nf = 200  # Number of flips
Nc = 199  # Number of pair-wise covariances

# Generate multiple sequences
num_sequences = 137
all_sequences = []
all_probabilities = []

for _ in range(num_sequences):
    sequence, probabilities = generate_sequence(Nf, Nc)
    all_sequences.append(sequence)
    all_probabilities.append(probabilities)

# Print generated sequences
print("Generated Sequences:")
for i, sequence in enumerate(all_sequences):
    print(sequence)

# Exporting sequences
csv_filename = "fake_sequences_tricky.csv"

# Writing the nested list to the CSV file
with open(csv_filename, mode='w', newline='') as csv_file:
    csv_writer = csv.writer(csv_file)
    for row in all_sequences:
        csv_writer.writerow(row)

print(f"Nested list has been written to '{csv_filename}'")


# GAN

In [None]:
# Length of the binary sequences
seq_length = 200
latent_dim = 100  # Dimension of the generator's input noise vector


# Generator model
def build_generator():
    model = Sequential()
    model.add(Dense(512, input_dim=latent_dim))
    model.add(LeakyReLU(0.2))
    model.add(BatchNormalization(momentum=0.8))
    model.add(Dense(1024))
    model.add(LeakyReLU(0.2))
    model.add(BatchNormalization(momentum=0.8))
    model.add(Dense(seq_length, activation='sigmoid'))
    model.compile(loss='binary_crossentropy', optimizer='adam')
    return model


# Discriminator model
def build_discriminator():
    model = Sequential()
    model.add(Dense(1024, input_dim=seq_length))
    model.add(LeakyReLU(0.2))
    model.add(BatchNormalization(momentum=0.8))
    model.add(Dense(512))
    model.add(LeakyReLU(0.2))
    model.add(BatchNormalization(momentum=0.8))
    model.add(Dense(1, activation='sigmoid'))
    model.compile(loss='binary_crossentropy', optimizer='adam',
                  metrics=['accuracy'])
    return model


# GAN model combining generator and discriminator
def build_gan(generator, discriminator):
    discriminator.trainable = False
    gan_input = Input(shape=(latent_dim,))
    x = generator(gan_input)
    gan_output = discriminator(x)
    gan = Model(gan_input, gan_output)
    gan.compile(loss='binary_crossentropy', optimizer='adam')
    return gan


# Generate sequences using the trained generator
def generate_sequences(generator, num_sequences=64):
    noise = np.random.randn(num_sequences, latent_dim)
    generated_sequences = generator.predict(noise)
    return (generated_sequences > 0.5).astype(int)


# Build and compile the models
generator = build_generator()
discriminator = build_discriminator()
gan = build_gan(generator, discriminator)


# Training loop
def train_gan(epochs=2000, batch_size=64):
    for epoch in range(epochs):
        # Generate real and fake samples
        real_sequences = np.random.randint(0, 2, size=(batch_size, seq_length))
        noise = np.random.randn(batch_size, latent_dim) #latent vector fron N(0,1)
        generated_sequences = generator.predict(noise)

        # Label real and fake samples
        y_real = np.ones((batch_size, 1))
        y_fake = np.zeros((batch_size, 1))

        # Train discriminator
        d_loss_real = discriminator.train_on_batch(real_sequences, y_real)
        d_loss_fake = discriminator.train_on_batch(generated_sequences, y_fake)
        d_loss = 0.5 * np.add(d_loss_real, d_loss_fake)

        # Generate noise for training generator
        noise = np.random.randn(batch_size, latent_dim)
        y_gen = np.ones((batch_size, 1))

        # Train generator
        g_loss = gan.train_on_batch(noise, y_gen)

        # Print progress
        if epoch % 100 == 0:
            print(f"Epoch {epoch} | D loss: {d_loss[0]} | D accuracy: "
                  f"{100 * d_loss[1]} | G loss: {g_loss}")


# Train the GAN
train_gan(epochs=2000, batch_size=64)

# Generate some sequences
generated_sequences = generate_sequences(generator, num_sequences=10)
print(generated_sequences)


generated_sequences = generate_sequences(generator, num_sequences=137).tolist()
for i in generated_sequences:
    print(len(i), i)

for i in range(1, len(generated_sequences)):
    print(generated_sequences[i] == generated_sequences[i-1])

file_name = 'fake_sequences_gan.csv'

# Writing to CSV
with open(file_name, mode='w', newline='') as file:
    writer = csv.writer(file)
    writer.writerows(generated_sequences)


# Loading sequences

# Specify the CSV file path
csv_file_path = 'fake_sequences_handwritten.csv'

# Initialize an empty list to store the data
fake_sequences_handwritten = []

# Read the CSV file and populate the fake_sequences_handwritten
with open(csv_file_path, 'r') as file:
    csv_reader = csv.reader(file)
    for row in csv_reader:
        for item in range(len(row)):
            row[item] = int(row[item])
        fake_sequences_handwritten.append(row)

# Print the resulting list
fake_sequences_handwritten.pop(0)
lst = np.array(fake_sequences_handwritten).tolist()
fake_sequences_handwritten = lst
print(len(fake_sequences_handwritten))


# Specify the CSV file path
csv_file_path = 'fake_sequences_tricky.csv'

# Initialize an empty list to store the data
fake_sequences_tricky = []

# Read the CSV file and populate the fake_sequences_handwritten
with open(csv_file_path, 'r') as file:
    csv_reader = csv.reader(file)
    for row in csv_reader:
        for item in range(len(row)):
            row[item] = int(row[item])
        fake_sequences_tricky.append(row)

# Print the resulting list
fake_sequences_tricky = np.array(fake_sequences_tricky).tolist()
print(len(fake_sequences_tricky))


# Specify the CSV file path
csv_file_path = 'reals.csv'h

# Initialize an empty list to store the data
reals = [[random.randint(0, 1) for i in range(200)] for j in
         range(4 * len(fake_sequences_handwritten))]


# Specify the CSV file path
csv_file_path = 'fake_sequences_dmom.csv'

# Initialize an empty list to store the data
fake_sequences_dmom = []

# Read the CSV file and populate the fake_sequences_dmom
with open(csv_file_path, 'r') as file:
    csv_reader = csv.reader(file)
    for row in csv_reader:
        for item in range(len(row)):
            row[item] = int(row[item])
        fake_sequences_dmom.append(row)

fake_sequences_dmom = np.array(fake_sequences_dmom).tolist()
print(len(fake_sequences_dmom))


# Take 80% of each type of sequence for training and leaving rest for testing
train_ratio = 0.8

reals_training, reals_testing = \
    train_test_split(reals, train_size=train_ratio, random_state=42)
print('No. reals training: ' + str(len(reals_training)))
print('No. reals testing: ' + str(len(reals_testing)) + '\n')

hand_fakes_training, hand_fakes_testing =\
    train_test_split(fake_sequences_handwritten,
                     train_size=train_ratio, random_state=43)
print('No. hand fakes training: ' + str(len(hand_fakes_training)))
print('No. hand fakes testing: ' + str(len(hand_fakes_testing)) + '\n')

tricky_fakes_training, tricky_fakes_testing = \
    train_test_split(fake_sequences_tricky,
                     train_size=train_ratio, random_state=44)
print('No. tricky fakes training: ' + str(len(tricky_fakes_training)))
print('No. tricky fakes testing: ' + str(len(tricky_fakes_testing)) + '\n')

dmom_fakes_training, dmom_fakes_testing = \
    train_test_split(fake_sequences_dmom,
                     train_size=train_ratio, random_state=45)
print('No. dmom fakes training: ' + str(len(dmom_fakes_training)))
print('No. dmom fakes testing: ' + str(len(dmom_fakes_testing)) + '\n')

gan_fakes_training, gan_fakes_testing =\
    train_test_split(generated_sequences,
                     train_size=train_ratio, random_state=46)
print('No. GAN fakes training: ' + str(len(gan_fakes_training)))
print('No. GAN fakes testing: ' + str(len(gan_fakes_testing)) + '\n')


# Training

outside_sequences = np.array(hand_fakes_training + tricky_fakes_training +
                             dmom_fakes_training + gan_fakes_training)
reals = np.array(reals_training)

# Combine real and outside sequences
combined_sequences = np.vstack((reals, outside_sequences))
labels = np.vstack((np.ones((reals.shape[0], 1)),
                    np.zeros((outside_sequences.shape[0], 1))))

# Shuffle the data
indices = np.arange(combined_sequences.shape[0])
np.random.shuffle(indices)
combined_sequences = combined_sequences[indices]
labels = labels[indices]

# Train the discriminator
discriminator.fit(combined_sequences, labels, epochs=1000, batch_size=64)


# Testing

test_sequences = reals_testing + hand_fakes_testing + tricky_fakes_testing +\
                 dmom_fakes_testing + gan_fakes_testing

test_labels = [1] * len(reals_testing) + [0] * len(hand_fakes_testing) + [0] *\
              len(tricky_fakes_testing) + [0] * len(dmom_fakes_testing) + [0] *\
              len(gan_fakes_testing)

# Loop through sequences and print results
# Loop through sequences and print results
for i in range(len(test_sequences)):
    sequence = np.array(test_sequences[i])  # Convert to numpy array
    true_label = test_labels[i]

    # Reshape the sequence if necessary (depending on your model's input shape)
    sequence = np.reshape(sequence, (1, -1))

    # Predict the label for the sequence
    prediction = discriminator.predict(sequence)

    # Convert the prediction to 0 or 1 based on a threshold (e.g., 0.5)
    predicted_label = 1 if prediction > 0.5 else 0

    # Print the results
    print(f"Sequence {i+1}: True Label={true_label},"
          f" Predicted Label={predicted_label}")


# Evaluate the discriminator for overall accuracy
test_loss, test_accuracy = discriminator.evaluate(test_sequences, test_labels)

print(f"\nOverall Test Loss: {test_loss}")
print(f"Overall Test Accuracy: {test_accuracy}")


# DMOM Testing
test_sequences = dmom_fakes_testing

test_labels = [0] * (len(dmom_fakes_testing))

# Loop through sequences and print results
for i in range(len(test_sequences)):
    sequence = test_sequences[i]
    true_label = test_labels[i]

    # Reshape the sequence if necessary (depending on your model's input shape)
    sequence = np.reshape(sequence, (1, -1))

    # Predict the label for the sequence
    prediction = discriminator.predict(sequence)

    # Convert the prediction to 0 or 1 based on a threshold (e.g., 0.5)
    predicted_label = 1 if prediction > 0.5 else 0

    # Print the results
    print(f"Sequence {i+1}: True Label={true_label},"
          f" Predicted Label={predicted_label}")

# Evaluate the discriminator for overall accuracy
test_loss, test_accuracy = discriminator.evaluate(test_sequences, test_labels)

print(f"\nOverall Test Loss: {test_loss}")
print(f"Overall Test Accuracy: {test_accuracy}")


# DMOM

In [None]:
s = 128
print(s)
b = 2
print(b)
N = 200
print(N)

# Canonical model
# test for s = 2
p = []
for i in range(2):
    p_r = np.random.uniform(0, 1, 2)
    p_r /= p_r.sum()
    p.append(p_r)
p = np.array(p)
q0_real0 = [[0.5, 0.5], [0.5, 0.5]]
q1_real0 = [[0.5, 0.5], [0.5, 0.5]]
q = np.array([q0_real0, q1_real0])
mu = np.array([[0.25, 0.25], [0.25, 0.25]])

num_states = b * 2
states = []  # List of the states in the joint MC
for i in range(2):
    for j in range(b):
        states.append((i, j))

states_dict = {}  # Assigns a temp value to each state in the joint MC
for i in range(len(states)):
    states_dict[i] = states[i]

# Creating transition matrix
lst = []
for state in states_dict:
    row = []
    for state2 in states_dict:
        initial = states_dict[state]
        final = states_dict[state2]
        p_val = p[initial[0], final[0]]
        q_val = q[final[0]][initial[1], final[1]]
        row.append(p_val * q_val)
    lst.append(row)

joint_transition_mat = np.array(lst)
print('Joint transition matrix: \n', joint_transition_mat)


# Initialize the sequence by randomly choosing the initial state based off mu
seq = []
lst = [x for x in range(b * 2)]
weights = []
for i in lst:
    weights.append(mu[states_dict[i][0], states_dict[i][1]])
seq.append(np.random.choice(lst, 1, p=weights)[0])

# Randomly choosing the next state based off the probabilities in the joint
# transition matrix
while len(seq) != 201:
    initial = seq[-1]
    seq.append(np.random.choice(lst, 1,  p=[item for item in
                                            joint_transition_mat[initial]])[0])

# Removing the hidden state and keeping only the observations
final_seq = []
for item in seq:
    final_seq.append(states_dict[item][1])
final_seq.pop(0)

print(final_seq, len(final_seq))
one_count = 0
print(final_seq.count(0), final_seq.count(1))


def read_csv_as_array(name):
    data_frame = pd.read_csv(name, header=None)
    data_array = data_frame.values
    return data_array


hand_fakes = read_csv_as_array('fake_sequences_handwritten.csv').tolist()
hand_fakes.pop(0)
print('No. hand fakes: ' + str(len(hand_fakes)))

gan_fakes = read_csv_as_array('fake_sequences_gan.csv').tolist()
print('No. gan fakes: ' + str(len(gan_fakes)))

tricky_fakes = read_csv_as_array('fake_sequences_tricky.csv').tolist()
print('No. tricky fakes: ' + str(len(tricky_fakes)))

random_arrays = [final_seq]
reals = [[random.randint(0, 1) for i in range(200)] for j in
         range(4 * (len(hand_fakes)) - 1)]
random_arrays += reals
print('No. reals: ' + str(len(random_arrays)) + '\n')

# Test-train split
train_ratio = 0.8

reals_training, reals_testing = \
    train_test_split(reals, train_size=train_ratio, random_state=42)
print('No. reals training: ' + str(len(reals_training)))
print('No. reals testing: ' + str(len(reals_testing)) + '\n')

hand_fakes_training, hand_fakes_testing = \
    train_test_split(hand_fakes,
                     train_size=train_ratio, random_state=43)
print('No. hand fakes training: ' + str(len(hand_fakes_training)))
print('No. hand fakes testing: ' + str(len(hand_fakes_testing)) + '\n')

tricky_fakes_training, tricky_fakes_testing = \
    train_test_split(tricky_fakes,
                     train_size=train_ratio, random_state=44)
print('No. tricky fakes training: ' + str(len(tricky_fakes_training)))
print('No. tricky fakes testing: ' + str(len(tricky_fakes_testing)) + '\n')

gan_fakes_training, gan_fakes_testing = \
    train_test_split(gan_fakes,
                     train_size=train_ratio, random_state=46)
print('No. GAN fakes training: ' + str(len(gan_fakes_training)))
print('No. GAN fakes testing: ' + str(len(gan_fakes_testing)) + '\n')


# Real sequences
def ForwardpiZero(mu_matrix):
    pi_zero = mu_matrix
    return pi_zero


def Forwardpi(p, q, mu, N, Y):
    pi = np.zeros((s,N)).reshape(s,N) # Initialization
    rho = np.zeros((s, N))
    a = np.zeros(N)
    for x in range(s):
        rho[x, 0] = np.sum([np.sum(mu[x_0, :]*p[x_0, x]*q[x, :, Y[0]]) for x_0
                            in range(s)])
    a[0] = np.sum(rho[:, 0])
    pi[:, 0] = rho[:, 0]/a[0]

    for n in range(1, N):
        rho[:, n] =\
            q[:, Y[n-1],
                Y[n]]*(np.sum(pi[:, n - 1].reshape(s, 1)*p[:, :], axis=0))
        a[n] = np.sum(rho[:, n])
        pi[:, n] = rho[:, n]/a[n]
    return pi, a, rho


def Backwardchi(p, q, a, N, Y):
    chi = np.zeros((s, N-1)).reshape(s, N-1)
    chi[:, N - 2] = q[:, Y[N-2], Y[N-1]]

    for n in range(N-3, -1, -1):
        chi[:, n] = ((q[:, Y[n],Y[n+1]].reshape(s,1)/a[n+1]) *
                     (np.sum((chi[:, n+1].reshape(1, s) *
                              p[:, :]), axis=1).reshape(s, 1))).reshape(s, )

    return chi


def BackwardchiZero(chi, p, q, a, Y):
    chi_zero = np.zeros((s, b)).reshape(s,b)
    for y in range(b):
        chi_zero[:, y] = ((q[:, y, Y[0]].reshape(s, 1)/a[0]) *
                          (np.sum((chi[:, 0].reshape(1, s)*p[:, :]),
                                  axis=1).reshape(s, 1))).reshape(s, )
    return chi_zero


def MQ(p, pi_zero, pi, chi_zero, chi, N, Y):
    q_n = np.zeros((s, b, b))
    for y in range(b):
        q_n[:, y, Y[0]] = (chi_zero[:, y].reshape(s, 1) *
                           (np.sum(p[:, :] * pi_zero[:, y].reshape(s, 1),
                                   axis=0).reshape(s, 1))).reshape(s, )
    for n in range(N - 1):
        q_n[:, Y[n], Y[n + 1]] = (q_n[:, Y[n], Y[n + 1]].reshape(s, 1) + chi[:, n].reshape(s, 1) * (np.sum(p[:, :] * pi[:, n].reshape(s, 1), axis = 0).reshape(s, 1))).reshape(s, )
    r_sum = q_n.sum(axis=2)
    for r in range(b):
        for x in range(s):
            if r_sum[x, r] != 0:
                q_n[x, r, :] = q_n[x, r, :] / r_sum[x, r]

    return q_n


def MMu(p, mu, chi_zero):
    m_new = np.zeros((s, b))
    for y in range(b):
        m_new[:, y] = (mu[:, y].reshape(s, 1) *
                       (p @ chi_zero[:, y].reshape(s, 1))).reshape(s, )
        total = sum(list(m_new.sum(axis=1)))
    for i in range(s):
        for j in range(b):
            if total != 0:
                m_new[i, j] = m_new[i, j] / total

    return m_new


def MP(p, pi_zero, pi, chi_zero, chi):
    old = p
    new = old * (pi_zero @ chi_zero.T + pi[:, :-1] @ chi.T)
    r_sum = new.sum(axis=1)
    new = new / r_sum[:, np.newaxis]

    return new


def InitialP(k):
    p_v = []
    for i in range(k):
        r = np.random.uniform(0, 1, k)
        r /= r.sum()
        p_v.append(r)
    p = np.array(p_v)
    return p


def InitialQ(N, Y, s: int):
    q = np.zeros((s, b, b))
    for x in range(s):
        for n in range(N-1):
            q[x, Y[n],Y[n+1]] += 1
        r_sum = q.sum(axis=2)
        for r in range(b):
            for c in range(b):
                if r_sum[x, r] > 0 and q[x, r, c] > 0:
                    q[x, r, c] = q[x, r, c] * np.random.uniform(0, 1, 1)
        row_sum_new = q.sum(axis=2)
        for i in range(b):
            for j in range(b):
                if row_sum_new[x, i] != 0:
                    q[x, i, j] = q[x, i, j]/row_sum_new[x, i]
    return q


def Initialmu(q, Y):
    mu = np.random.rand(s, b)
    for x in range(s):
        for r in range(b):
            if q[x, r, Y[0]] > 0:
                continue
            else:
                mu[x, r] = 0

    sum = mu.sum()
    mu = mu/sum
    return mu


def Q_ref(Y, N):
    q_bar = np.zeros((b, b))
    for i in range(1, N):
        q_bar[Y[i-1], Y[i]] += 1
    r_sum = list(q_bar.sum(axis=1))
    for r in range(b):
        if r_sum[r] != 0:
            q_bar[r] = q_bar[r] / r_sum[r]
    return q_bar


def Predictorlist(p, q, mu, q_bar, Y, N):
    predictor_dist = np.zeros((N, s))
    temp = np.zeros(s)
    for x in range(s):
        temp_ = np.array([(q[x, z, Y[0]]/q_bar[z, Y[0]] if q_bar[z, Y[0]] != 0
                           else 0) for z in range(b)])
        temp[x] = np.sum([np.sum(mu[x_0, :]*p[x_0, x]*temp_)
                          for x_0 in range(s)])

    for i in range(N):
        for x in range(s):
            if i == 0:
                predictor_dist[0, x] = temp @ p[:, x]
            else:
                temp_ = q[:, Y[i - 1], Y[i]]/q_bar[Y[i - 1], Y[i]] if q_bar[Y[i - 1], Y[i]] != 0 else 0
                predictor_dist[i, x] = np.sum(temp_ * p[:, x] * predictor_dist[i - 1, :])

    return predictor_dist


def BayesFactor(predictor_rho, q, N, Y):
    B = np.zeros(N)
    for i in range(N):
        B[i] = np.sum(predictor_rho[i, :])
    B[-1] = np.sum([((predictor_rho[N-2, x] * q[x, Y[-2], Y[-1]]/q_bar[Y[-2], Y[-1]]) if q_bar[Y[-2], Y[-1]] != 0 else 0) for x in range(s)])
    return B


def Stop(p_new, mu_new, q_new, p, mu, q, k):
    p_new = p_new
    p = p
    difp = np.subtract(p_new, p)
    sdifp = np.nansum(np.abs(difp))
    q = q
    q_new = q_new
    mu_new = mu_new
    mu = mu
    difmu = np.subtract(mu_new, mu)
    sdifmu = np.nansum(np.abs(difmu))

    np.set_printoptions(suppress=True)

    if sdifp < 5e-3 and sdifmu < 5e-3:
        return 'stop'

    else:
        return 'continue'


def RunModel(p, q, mu, N, Y, stoplimit = 1000):
    K = stoplimit
    for k in range(K):
        pi_zero = ForwardpiZero(mu)
        pi, a, rho = Forwardpi(p,q,mu,N,Y)
        chi = Backwardchi(p,q, a,N,Y)
        chi_zero = BackwardchiZero(chi, p, q, a, Y)
        q_new = MQ(p, pi_zero, pi, chi_zero, chi, N, Y)
        mu_new = MMu(p, mu, chi_zero)
        p_new = MP(p, pi_zero, pi, chi_zero, chi)
        stopcondition = Stop(p_new,mu_new,q_new, p, mu, q, k)

        if stopcondition == "stop":
            # print("Stopping criteria reached at k = ", k)
            break

        else:
            p = p_new
            mu = mu_new
            q = q_new

        return p, q, mu


pqmus_real = {}
generated_sequences = []

for k in range(len(reals_training)):

    Y = random_arrays[k]

    q_bar = Q_ref(Y, N)

    num_particles = 10
    model_initializations = {}
    model_results = {}
    comparison_dict = {}
    for i in range(1, num_particles + 1):
        model_name = f"model{i}"

        model_initializations[model_name] = {}
        model_initializations[model_name]['P'] = InitialP(s)
        model_initializations[model_name]['Q'] = InitialQ(N, Y, s)
        model_initializations[model_name]['mu'] = Initialmu(model_initializations[model_name]['Q'], Y)

        FinalP, FinalQ, Finalmu = RunModel(model_initializations[model_name]['P'], model_initializations[model_name]['Q'], model_initializations[model_name]['mu'], N, Y)
        Predictor = Predictorlist(FinalP, FinalQ, Finalmu, q_bar, Y, N)
        comparison_dict[model_name] = np.log(BayesFactor(Predictor, FinalQ, N, Y)[-1]) # Take log of the Bayes Factor, since it expands exponentially.

        model_results[model_name] = {}
        model_results[model_name]['P'] = FinalP
        model_results[model_name]['Q'] = FinalQ
        model_results[model_name]['mu'] = Finalmu

    pqmus_real[k] = {'p':FinalP, 'q':FinalQ, 'mu':Finalmu}

    if k < len(hand_fakes):
        num_states = b * s
        states = [] # List of the states in the joint MC
        for i in range(s):
            for j in range(b):
                states.append((i, j))

        states_dict = {} # Assigns a temp value to each state in the joint MC
        for i in range(len(states)):
            states_dict[i] = states[i]

        # Creating transition matrix
        lst = []
        for state in states_dict:
            row = []
            for state2 in states_dict:
                initial = states_dict[state]
                final = states_dict[state2]
                p_val = FinalP[initial[0], final[0]]
                q_val = FinalQ[final[0]][initial[1], final[1]]
                row.append(p_val * q_val)
            lst.append(row)

        joint_transition_mat = np.array(lst)
        # print('Joint transition matrix: \n', joint_transition_mat)

        # Initialize the sequence by randomly choosing the initial state
        # based off mu
        seq = []
        lst = [x for x in range(b * s)]
        weights = []
        for i in lst:
            weights.append(Finalmu[states_dict[i][0], states_dict[i][1]])
        seq.append(np.random.choice(lst, 1, p = weights)[0])

        # Randomly choosing the next state based off the probabilities in the
        # joint transition matrix
        while len(seq) != 201:
            initial = seq[-1]
            seq.append(np.random.choice(lst, 1, p = [item for item in joint_transition_mat[initial]])[0])

        # Removing the hidden state and keeping only the observations
        final_seq = []
        for item in seq:
            final_seq.append(states_dict[item][1])
        final_seq.pop(0)

        generated_sequences.append(final_seq)

    print(f'Num reals left: {len(reals_training) - k}')


# Dimension increase
s = s * 2

# Bayes Factor Reals Dimension increase
pqmus_real_higher_dim = {}
for sequence in pqmus_real:
    p_old = pqmus_real[sequence]['p']
    q_old = pqmus_real[sequence]['q']
    mu_old = pqmus_real[sequence]['mu']

    # Increase dimension of p_old
    array = p_old.tolist()
    vals = []
    for i in array:
        for j in i:
            vals.append(j)
    for i in range(s):
        if i % 2 == 0:
            for j in range(s):
                if j % 2 == 1:
                    array[i].insert(j, 0)
        if i % 2 == 1:
            row = []
            for k in range(s):
                row.append(np.random.uniform(0, st.mean(vals)))
            row_sum = sum(row)
            row = np.divide(row, row_sum)
            array.insert(i, row)

    p_new = 0.5 * np.array(array) + (1 / (2 * s)) * np.ones_like(s)

    # Increase dimension of q_old
    q_new_matrices = {}
    for i in range(s):
        if i % 2 == 0:
            q_new_matrices[i] = q_old[int(i / 2)]

        elif i % 2 == 1:
            matrix = []
            for j in range(b):
                row = [np.random.uniform(0, st.mean(q_old[int((i-1) / 2)].tolist()[j])) for _ in range(b)]
                row = np.divide(row, sum(row))
                matrix.append(row)
            q_new_matrices[i] = np.array(matrix)
    array = []
    for i in range(s):
        array.append(q_new_matrices[i])
    q_new = np.array(array)

    # Showing that the sum of each row in each q in q_new is 1
    q_sums = []
    for i in range(s):
        q_sums.append([f'q_new{i}'] + [sum(row) for row in q_new_matrices[i].tolist()])

    # Increase dimension of mu_old
    array = mu_old.tolist()
    vals = []
    for i in array:
        for j in i:
            vals.append(j)
    for i in range(s):
        if i % 2 == 1:
            row = []
            for j in range(b):
                row.append(np.random.uniform(0, st.mean(vals)))
            array.insert(i, row)
    mu_new = np.array(array) * 1 / (np.array(array).sum())

    # Showing that the matrix sum of mu_new is 1

    pqmus_real_higher_dim[sequence] = {'p': p_new, 'q': q_new, 'mu': mu_new}


# Generated sequences
generated_training, generated_testing = \
    train_test_split(generated_sequences, train_size=train_ratio,
                     random_state=50)
print('No. dmom training: ' + str(len(generated_sequences)))
print('No. dmom testing: ' + str(len(generated_testing)) + '\n')


# Fake sequences
random_arrays_fakes = hand_fakes_training + tricky_fakes_training +\
                      gan_fakes_training + generated_training


# Fake sequence p, q, mu
pqmus_fake = {}

for k in range(len(random_arrays_fakes)):
    Y = random_arrays_fakes[k]

    print(Y, len(Y))
    q_bar = Q_ref(Y, N)

    num_particles = 10
    model_initializations = {}
    model_results = {}
    comparison_dict = {}
    for i in range(1, num_particles + 1):
        model_name = f"model{i}"

        model_initializations[model_name] = {}
        model_initializations[model_name]['P'] = InitialP(s)
        model_initializations[model_name]['Q'] = InitialQ(N, Y, s)
        model_initializations[model_name]['mu'] = Initialmu(model_initializations[model_name]['Q'], Y)

        FinalP, FinalQ, Finalmu = RunModel(model_initializations[model_name]['P'], model_initializations[model_name]['Q'], model_initializations[model_name]['mu'], N, Y)
        Predictor = Predictorlist(FinalP, FinalQ, Finalmu, q_bar, Y, N)
        comparison_dict[model_name] = np.log(BayesFactor(Predictor, FinalQ, N, Y)[-1]) # Take log of the Bayes Factor, since it expands exponentially.

        model_results[model_name] = {}
        model_results[model_name]['P'] = FinalP
        model_results[model_name]['Q'] = FinalQ
        model_results[model_name]['mu'] = Finalmu

    pqmus_fake[k] = {'p':FinalP, 'q':FinalQ, 'mu':Finalmu}

    print(f'Num fakes left: {len(random_arrays_fakes) - k}')


# Testing reals
highest_bayes = []
theoretical = [f'R' for i in range(len(reals_testing))]

count = 0
for j in range(len(reals_testing)):
    bayes_factors = {}
    sequence = random_arrays[j]
    q_bar = np.array([[.5, .5], [.5, .5]])
    for i in pqmus_real_higher_dim:
        p = pqmus_real_higher_dim[i]['p']
        q = pqmus_real_higher_dim[i]['q']
        mu = pqmus_real_higher_dim[i]['mu']
        lst1 = BayesFactor(Predictorlist(p, q, mu, q_bar, sequence, N), q, N, sequence)
        bayes_factors['Real' + str(i)] = lst1[-1]
    for i in pqmus_fake:
        p = pqmus_fake[i]['p']
        q = pqmus_fake[i]['q']
        mu = pqmus_fake[i]['mu']
        lst3 = BayesFactor(Predictorlist(p, q, mu, q_bar, sequence, N), q, N, sequence)
        bayes_factors['Fake' + str(i)] = lst3[-1]
    print(max(bayes_factors, key=bayes_factors.get)[:1])
    highest_bayes.append(max(bayes_factors, key=bayes_factors.get)[:1])

print(theoretical)
print(highest_bayes)
print(f"Correct: {highest_bayes.count('R') / len(reals_testing) * 100}%")


# Testing fakes
fakes_testing = hand_fakes_testing + gan_fakes_testing + tricky_fakes_testing + generated_testing

highest_bayes = []
theoretical = [f'F' for i in range(len(fakes_testing))]

count = 0
for j in range(len(fakes_testing)):
    bayes_factors = {}
    sequence = random_arrays[j]
    q_bar = np.array([[.5, .5], [.5, .5]])
    for i in pqmus_real_higher_dim:
        p = pqmus_real_higher_dim[i]['p']
        q = pqmus_real_higher_dim[i]['q']
        mu = pqmus_real_higher_dim[i]['mu']
        lst1 = BayesFactor(Predictorlist(p, q, mu, q_bar, sequence, N), q, N, sequence)
        bayes_factors['Real' + str(i)] = lst1[-1]
    for i in pqmus_fake:
        p = pqmus_fake[i]['p']
        q = pqmus_fake[i]['q']
        mu = pqmus_fake[i]['mu']
        lst3 = BayesFactor(Predictorlist(p, q, mu, q_bar, sequence, N), q, N, sequence)
        bayes_factors['Fake' + str(i)] = lst3[-1]
    print(max(bayes_factors, key=bayes_factors.get)[:1])
    highest_bayes.append(max(bayes_factors, key=bayes_factors.get)[:1])

print(theoretical)
print(highest_bayes)
print(f'All correctly identified? {theoretical == highest_bayes}')
print(f"Correct: {highest_bayes.count('F') / len(fakes_testing) * 100}%")

print('Done')


# SVM

In [3]:
num_sequences = 200
seq_length = 200

# Generate 10 random binary sequences of length 200 for reals and fakes (for now using different random functions)
fakes = []
reals = []
for i in range(num_sequences):
    fake = [random.choice([0, 1]) for j in range(seq_length)]
    fakes.append(fake)
    real = [random.randint(0, 1) for j in range(seq_length)]
    reals.append(real)

fake_labels = [0]*num_sequences
real_labels = [1]*num_sequences

combined_sequences = reals + fakes
combined_labels = real_labels + fake_labels


#for sequence, label in zip(combined_sequences, combined_labels):
#    print(f"{sequence}, Label: {label}")

In [8]:
from sklearn.metrics._plot.confusion_matrix import confusion_matrix
X_train, X_test, y_train, y_test = train_test_split(combined_sequences, combined_labels, test_size=0.2, random_state=1)
#random_state = 0 ensures same split every time
#test_size = 0.2 is 80:20 split

svm_model = SVC(kernel='linear')
svm_model.fit(X_train, y_train)

svm_predictions = svm_model.predict(X_test)

print(classification_report(y_test, svm_predictions))
print(confusion_matrix(y_test, svm_predictions))


              precision    recall  f1-score   support

           0       0.64      0.60      0.62        47
           1       0.47      0.52      0.49        33

    accuracy                           0.56        80
   macro avg       0.55      0.56      0.55        80
weighted avg       0.57      0.56      0.56        80

[[28 19]
 [16 17]]


In [20]:
one_class_svm_model = OneClassSVM(kernel='rbf', nu=0.1)  # You can adjust the hyperparameters as needed
one_class_svm_model.fit(X_train)

ocsvm_predictions = one_class_svm_model.predict(X_test)

#modify 0s to -1's in y_test
y_test_mod = np.where(y_test == 0, -1, y_test)

roc_auc = roc_auc_score(y_test_mod, ocsvm_predictions)
pr_auc = average_precision_score(y_test_mod, ocsvm_predictions)

print("One-Class SVM Evexaluation Metrics:")
print("AUC-ROC: {:.2f}".format(roc_auc))
print("AUC-PR: {:.2f}".format(pr_auc))


One-Class SVM Evexaluation Metrics:
AUC-ROC: 0.50
AUC-PR: 0.41
              precision    recall  f1-score   support

          -1       0.00      0.00      0.00       0.0
           0       0.00      0.00      0.00      47.0
           1       0.00      0.00      0.00      33.0

    accuracy                           0.00      80.0
   macro avg       0.00      0.00      0.00      80.0
weighted avg       0.00      0.00      0.00      80.0

[[ 0  0  0]
 [47  0  0]
 [33  0  0]]


  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
