<a href="https://colab.research.google.com/github/Vince7778/real-vs-fake-words/blob/main/real_vs_fake_words.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [120]:
import re
import os
import random
import numpy as np
import json
import csv
import time

from google.colab import drive
drive.mount('/content/drive/')

Drive already mounted at /content/drive/; to attempt to forcibly remount, call drive.mount("/content/drive/", force_remount=True).


In [121]:
# options

dict_path = '/content/drive/MyDrive/CS 229 Project/data/cmudict-0.7b'
popular_path = '/content/drive/MyDrive/CS 229 Project/data/words_by_popularity.txt'
symbols_path = '/content/drive/MyDrive/CS 229 Project/data/cmudict-0.7b.symbols'
model_save_path = '/content/drive/MyDrive/CS 229 Project/model-save-50000-noemph-pop-new-adam/'
pseudoword_path = '/content/drive/MyDrive/CS 229 Project/data/pseudowords.csv'
pseudoword_output = '/content/drive/MyDrive/CS 229 Project/pseudoword-output-new.json'
dict_output = '/content/drive/MyDrive/CS 229 Project/clean-dict-output-new.json'
popular_words_output = '/content/drive/MyDrive/CS 229 Project/popular-word-output-new.json'

N = 50000 # training sample count
n_epochs = 40
train_test_split = [0.8, 0.1, 0.1]

# whether saved model should be used
load_model_from_file = True

# whether emphasis should be ignored
ignore_emphasis = True

# use top n most common english words as training data
use_popularity = True

# which dictionaries to predict on (pseudoword, real, popular)
prediction_todos = []

do_evaluation = False

# made some changes (non backwards compatible)
fixed_algo = True

In [122]:
max_length = 15
min_length = 3

bad_chars = "[^A-Z]"
start_seq = "START"
end_seq = "END"

random.seed(42)

In [123]:
def clean_dictionary():

    def alternate_spelling(word):
        return word[-1] == ')' and word[-3] == '(' and word[-2].isdigit()

    def skip(word):
        if not word[0].isalpha() or word[-1] == '.' or re.search(bad_chars, word) or len(word) > max_length or len(word) < min_length:
            return True
        else:
          return False

    clean_dict = {}

    with open(dict_path, encoding="ISO-8859-1") as raw_dict:
        for line in raw_dict:

            # Skip commented lines
            if line[0:3] == ';;;':
                continue

            word, phonetic = line.strip().split('  ')

            if alternate_spelling(word):
                continue

            if skip(word):
                continue

            phonetic = start_seq + " " + phonetic + " " + end_seq
            if word not in clean_dict:
              phonemes = phonetic.split(' ')
              if ignore_emphasis:
                for i in range(len(phonemes)):
                  if phonemes[i][-1].isnumeric():
                    phonemes[i] = phonemes[i][:-1]
              clean_dict[word] = phonemes

    return clean_dict

# Words to Phonemes Model

In [124]:
clean_dict = clean_dictionary()
print(len(clean_dict))

# take n training samples
def get_training_data(N):
  sample_dict = dict()
  if use_popularity:
    with open(popular_path) as popular_file:
      for line in popular_file:
        word = line.split("\t")[0].upper()
        if word in clean_dict:
          sample_dict[word] = clean_dict[word]
          if len(sample_dict) >= N:
            break
    if len(sample_dict) < N:
      print("Only found " + str(len(sample_dict)) + " words")
  else:
    sample_dict = dict(random.sample(list(clean_dict.items()), N))
  return sample_dict

def split_data(words):
  words = list(words.items())
  random.shuffle(words)
  n = len(words)
  inds = [0] + [int(n * sum(train_test_split[:i+1])) for i in range(len(train_test_split))]
  x_out, y_out = [], []
  for i in range(len(train_test_split)):
    words_split = words[inds[i] : inds[i+1]]
    x_data = [v[0] for v in words_split]
    y_data = [v[1] for v in words_split]
    x_out.append(x_data)
    y_out.append(y_data)
  return x_out, y_out

all_data = get_training_data(N)
[input_words, valid_words, test_words], [input_phonetics, valid_phonetics, test_phonetics] = split_data(all_data)

# returns a list of the alphabet characters
def get_alphabet(input_list):
  alphabet = set()
  for v in input_list:
    for c in v:
      alphabet.add(c)
  return sorted(list(alphabet))

characters_alphabet = get_alphabet(input_words)
phonetics_alphabet = get_alphabet(input_phonetics)

num_characters = len(characters_alphabet)
num_phonetics = len(phonetics_alphabet)

word_max_len = max([len(word) for word in input_words])
phonetics_max_len = max([len(phon) for phon in input_phonetics])
if not fixed_algo:
  phonetics_max_len -= 1

characters_index = dict([(char, i) for i, char in enumerate(characters_alphabet)])
phonetics_index = dict([(phon, i) for i, phon in enumerate(phonetics_alphabet)])

116047


In [125]:
def char_to_1_hot(ch):
  vec = np.zeros((num_characters))
  vec[characters_index[ch]] = 1.
  return vec

def phone_to_1_hot(ph):
  vec = np.zeros((num_phonetics))
  vec[phonetics_index[ph]] = 1.
  return vec

encoder_input_tensor = np.zeros((len(input_words), word_max_len, num_characters), dtype='float32')
decoder_input_tensor = np.zeros((len(input_words), phonetics_max_len, num_phonetics), dtype='float32')
decoder_target_tensor = np.zeros((len(input_words), phonetics_max_len, num_phonetics), dtype='float32')

for i, (input_word, input_phonetic) in enumerate(zip(input_words, input_phonetics)):
  for t, char in enumerate(input_word):
    encoder_input_tensor[i, t, :] = char_to_1_hot(char)
  for t, phon in enumerate(input_phonetic):
    if fixed_algo:
      decoder_input_tensor[i, t, :] = phone_to_1_hot(phon)
    else:
      if t < len(input_phonetic) - 1:
        decoder_input_tensor[i, t, phonetics_index[phon]] = 1.
      if t > 0:
        decoder_target_tensor[i, t-1, phonetics_index[phon]] = 1.

if fixed_algo:
  decoder_target_tensor = np.pad(decoder_input_tensor, ((0, 0), (0, 1), (0, 0)), mode="constant")[:, 1:, :]

In [126]:
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, LSTM, Dense
from tensorflow.keras.callbacks import EarlyStopping

latent_dim = 256

encoder_inputs =Input(shape=(None, num_characters))
encoder = LSTM(latent_dim, return_state=True)
_, state_h, state_c = encoder(encoder_inputs)
encoder_states = [state_h, state_c]

decoder_inputs = Input(shape=(None, num_phonetics))
decoder_lstm = LSTM(latent_dim, return_sequences=True, return_state=True)
decoder_outputs = decoder_lstm(decoder_inputs, initial_state=encoder_states)[0]
decoder_dense = Dense(num_phonetics, activation='softmax')
decoder_outputs = decoder_dense(decoder_outputs)

seq2seq = Model([encoder_inputs, decoder_inputs], decoder_outputs)

In [127]:
def save_model(models):
  if not os.path.exists(model_save_path):
    os.makedirs(model_save_path)
  for (i, model) in enumerate(models):
    save_path = model_save_path + "weights-" + str(i)
    model.save_weights(save_path)

def save_history(history):
  if not os.path.exists(model_save_path):
    os.makedirs(model_save_path)
  import json
  data = history.history
  json.dump(data, open(model_save_path + "history.json", "w"))

def load_model(models):
  for (i, model) in enumerate(models):
    save_path = model_save_path + "weights-" + str(i)
    model.load_weights(save_path)

In [128]:
seq_optimizer = 'adam' if fixed_algo else 'rmsprop'
seq2seq.compile(optimizer=seq_optimizer, loss='categorical_crossentropy')

encoder_model = Model(encoder_inputs, encoder_states)

decoder_state_input_h = Input(shape=(latent_dim,))
decoder_state_input_c = Input(shape=(latent_dim,))
decoder_states_inputs = [decoder_state_input_h, decoder_state_input_c]
decoder_outputs, state_h, state_c = decoder_lstm(
    decoder_inputs, initial_state=decoder_states_inputs)
decoder_states = [state_h, state_c]
decoder_outputs = decoder_dense(decoder_outputs)
decoder_model = Model(
    [decoder_inputs] + decoder_states_inputs,
    [decoder_outputs] + decoder_states)

stopper = None
if fixed_algo:
  stopper = EarlyStopping(monitor="val_loss", patience=3)

if not load_model_from_file:
  history = seq2seq.fit([encoder_input_tensor, decoder_input_tensor],
                        decoder_target_tensor,
                        batch_size=64,
                        epochs=n_epochs,
                        validation_split=0.2,
                        callbacks=[stopper] if fixed_algo else None)
  save_model([seq2seq, encoder_model, decoder_model])
  save_history(history)
else:
  load_model([seq2seq, encoder_model, decoder_model])



# Words to Phonemes Prediction

In [129]:
def decode_sequence(input_word):
    input_word = input_word.upper()
    input_seq = np.zeros((1, word_max_len, num_characters))
    for t, char in enumerate(input_word):
      input_seq[0, t, characters_index[char]] = 1.

    states_value = encoder_model(input_seq)

    target_seq = np.zeros((1, 1, num_phonetics))
    target_seq[0, 0, phonetics_index[start_seq]] = 1.

    stop_condition = False
    decoded_sentence = ''
    while not stop_condition:
        output_tokens, h, c = decoder_model([target_seq] + states_value)

        # Sample a token
        sampled_token_index = np.argmax(output_tokens[0, -1, :])
        sampled_char = phonetics_alphabet[sampled_token_index]

        if sampled_char != end_seq:
          decoded_sentence += sampled_char + " "

        # Exit condition: either hit max length
        # or find stop character.
        if (sampled_char == end_seq or
           len(decoded_sentence) > phonetics_max_len):
            stop_condition = True

        # Update the target sequence (of length 1).
        target_seq = np.zeros((1, 1, num_phonetics))
        target_seq[0, 0, sampled_token_index] = 1.

        # Update states
        states_value = [h, c]

    return decoded_sentence.strip()


In [130]:
# Accuracy evaluation

# Using BLEU score
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction

# guessed should be a string of the phonemes
def get_bleu_score(y, guessed):
  smoothing = SmoothingFunction().method1
  res = sentence_bleu([y], guessed.split(), smoothing_function=smoothing)
  #print(y, guessed.split(), res)
  return res

def evaluate(x_test, y_test, decoder):
  bleu_scores = []
  print(f"Evaluating on {len(x_test)} examples")
  for i in range(len(x_test)):
    decoded = start_seq + " " + decode_sequence(x_test[i]) + " " + end_seq
    score = get_bleu_score(y_test[i], decoded)
    bleu_scores.append(score)
    if i % 500 == 499:
      print(f"Finished {i+1} evaluations")
  return np.mean(bleu_scores)

In [131]:
# evaluate model
if do_evaluation:
  score = evaluate(test_words, test_phonetics, decode_sequence)
  print(f"Average BLEU score: {score}")

In [132]:
import csv
import json
import time

def save_to_file(path, output_dict):
  json_output = json.dumps(output_dict)

  with open(path, "w") as output_file:
    output_file.write(json_output)

def load_pseudowords():
  output = []
  with open(pseudoword_path) as pseudoword_file:
    reader = csv.reader(pseudoword_file)
    output = [row[0].upper() for row in reader]
  return output

def load_popular_words():
  output = []
  with open(popular_path) as popular_file:
    for line in popular_file:
      word = line.split("\t")[0].upper()
      output.append(word.upper())
  return output[:30000]

def run_predictions(output_path, words, limit=None):
  random.shuffle(words)
  start_time = time.time()
  output_dict = {}
  word_count = 0
  for word in words:
    if len(word) > max_length or len(word) < min_length:
      continue
    if re.search(bad_chars, word):
      continue
    res = decode_sequence(word).split(' ')
    output_dict[word] = res
    word_count += 1
    if word_count % 500 == 0:
      elapsed_time = time.time() - start_time
      print(f"Done {word_count} in {elapsed_time} seconds")
      save_to_file(output_path, output_dict)
    if limit is not None and word_count >= limit:
      break

if "pseudoword" in prediction_todos:
  pseudowords = load_pseudowords()
  run_predictions(pseudoword_output, pseudowords)

if "real" in prediction_todos:
  words = list(clean_dict.keys())
  run_predictions(dict_output, words, 13000)

if "popular" in prediction_todos:
  popular_words = load_popular_words()
  run_predictions(popular_words_output, popular_words, 13000)


# Phonemes to Real/Fake Model

In [133]:
# options

pseudoword_input = '/content/drive/MyDrive/CS 229 Project/pseudoword-output-new.json'
dict_input = '/content/drive/MyDrive/CS 229 Project/clean-dict-output-new.json'
model_save_path = '/content/drive/MyDrive/CS 229 Project/models-rnn/model-save-2023-12-07-full/'

padding_amount = 17
pad_str = "PAD"
train_test_split = [0.8, 0.1, 0.1]

embedding_dim = 32
num_epochs = 10

load_model_from_file = False

In [134]:
def load_json(file):
  with open(file) as json_file:
    return json.load(json_file)

clean_dict = load_json(dict_input)
pseudoword_dict = load_json(pseudoword_input)

print(len(clean_dict.items()))

def pad_phonemes(p):
  while len(p) < padding_amount:
    p.append(pad_str)

def pad_dict(d):
  for (k, v) in d.items():
    pad_phonemes(v)

pad_dict(clean_dict)
pad_dict(pseudoword_dict)

13000


In [135]:
phoneme_alphabet = get_alphabet(list(clean_dict.values()) + list(pseudoword_dict.values()))
print(phoneme_alphabet)
phoneme_indices = dict([(p, i) for (i, p) in enumerate(phoneme_alphabet)])

['AA', 'AE', 'AH', 'AO', 'AW', 'AY', 'B', 'CH', 'D', 'EH', 'ER', 'EY', 'F', 'G', 'HH', 'IH', 'IY', 'JH', 'K', 'L', 'M', 'N', 'NG', 'OW', 'OY', 'P', 'PAD', 'R', 'S', 'SH', 'T', 'TH', 'UH', 'UW', 'V', 'W', 'Y', 'Z', 'ZH']


In [136]:
# create input data

# shuffle order of words
def get_combined_words():
  # appends a 0 to pseudowords and a 1 to real words
  app_pseudo = [x + "0" for x in pseudoword_dict.keys()]
  app_real = [x + "1" for x in clean_dict.keys()]
  random.shuffle(app_pseudo)
  random.shuffle(app_real)

  min_len = min([len(app_pseudo), len(app_real)])
  app_pseudo = app_pseudo[:min_len]
  app_real = app_real[:min_len]

  combined = app_pseudo + app_real
  random.shuffle(combined)
  return combined

combined_words = get_combined_words()
print(combined_words[:30])
print(len(combined_words))

def phonemes_to_indices(phonemes):
  return np.asarray([phoneme_indices[p] for p in phonemes])

def create_training_data(words):
  n = len(words)
  x = np.zeros((n, padding_amount))
  y = np.zeros((n, ))
  for (i, w) in enumerate(words):
    phonemes = clean_dict[w[:-1]] if w[-1] == "1" else pseudoword_dict[w[:-1]]
    x[i, :] = phonemes_to_indices(phonemes)
    y[i] = int(w[-1])
  return x, y

def split_data(words):
  n = len(words)
  inds = [0] + [int(n * sum(train_test_split[:i+1])) for i in range(len(train_test_split))]
  x_out, y_out = [], []
  for i in range(len(train_test_split)):
    words_split = words[inds[i] : inds[i+1]]
    x_data, y_data = create_training_data(words_split)
    x_out.append(x_data)
    y_out.append(y_data)
  return x_out, y_out

[x_train, x_valid, x_test], [y_train, y_valid, y_test] = split_data(combined_words)

['FLESENSITERVE0', 'KNIZE0', 'ADVEMONY0', 'DETAY0', 'TOWEAK0', 'DIVED0', 'SHOCIPE0', 'ACCUPPORY0', 'PINNOW1', 'BURNHAM1', 'GROWDER0', 'PINCERS1', 'DEGAETANO1', 'BUICLAMERSION0', 'CAMOVESS0', 'JOUCAN0', 'USEBT0', 'SOCOVERT0', 'COMBES1', 'ARGERSULD0', 'INTRO1', 'CITIZEN1', 'LAWSUE0', 'FEAPT0', 'EFFERITH0', 'RAPTUE0', 'FAMAN0', 'HOAGIE1', 'VIRUCT0', 'WEILE0']
26000


In [137]:
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Embedding, LSTM, Dense

model = Sequential()

model.add(Embedding(input_dim=len(phoneme_alphabet), output_dim=embedding_dim, input_length=padding_amount))
model.add(LSTM(64, return_sequences=True))
model.add(LSTM(64, return_sequences=False))
model.add(Dense(1, activation='sigmoid'))

model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

model.summary()

Model: "sequential_1"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 embedding_1 (Embedding)     (None, 17, 32)            1248      
                                                                 
 lstm_18 (LSTM)              (None, 17, 64)            24832     
                                                                 
 lstm_19 (LSTM)              (None, 64)                33024     
                                                                 
 dense_9 (Dense)             (None, 1)                 65        
                                                                 
Total params: 59169 (231.13 KB)
Trainable params: 59169 (231.13 KB)
Non-trainable params: 0 (0.00 Byte)
_________________________________________________________________


In [138]:
if not load_model_from_file:
  history = model.fit(x_train, y_train, epochs=num_epochs, validation_data=(x_valid, y_valid))
  save_history(history)
  save_model([model])
else:
  load_model([model])

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


In [139]:
test_loss, test_acc = model.evaluate(x_test, y_test)
print(f"Test Accuracy: {test_acc}")

Test Accuracy: 0.629230797290802


# Combine models

In [None]:
# convert to phoneme then to real/fake prediction
def run_whole_pipeline(word):
  phonemes = decode_sequence(word).split(" ")
  pad_phonemes(phonemes)
  indices = np.zeros((1, padding_amount))
  indices[0, :] = np.asarray(phonemes_to_indices(phonemes))
  output = model(indices)
  return float(output[0])

In [None]:
run_whole_pipeline("fjkhdkfjhsdf")

0.35639292001724243

# Feeding text directly in
(instead of phonemes)

In [None]:
pad_char = "[" # Z + 1

def get_combined_words():
  # appends a 0 to pseudowords and a 1 to real words
  app_pseudo = [x.upper() + "0" for x in pseudoword_dict.keys() if len(x) >= min_length and len(x) <= max_length]
  app_real = [x.upper() + "1" for x in clean_dict.keys() if len(x) >= min_length and len(x) <= max_length]
  random.shuffle(app_pseudo)
  random.shuffle(app_real)

  min_len = min([len(app_pseudo), len(app_real)])
  app_pseudo = app_pseudo[:min_len]
  app_real = app_real[:min_len]

  combined = app_pseudo + app_real
  random.shuffle(combined)
  return combined

combined_words = get_combined_words()

def letters_to_indices(word):
  return [ord(c) - ord("A") for c in word]

def pad_word(word):
  while len(word) < max_length:
    word += pad_char
  return word

def create_training_data(words):
  n = len(words)
  x = np.zeros((n, max_length))
  y = np.zeros((n, ))
  for (i, w) in enumerate(words):
    padded_word = pad_word(w[:-1])
    x[i, :] = letters_to_indices(padded_word)
    y[i] = int(w[-1])
  return x, y

[x_train, x_valid, x_test], [y_train, y_valid, y_test] = split_data(combined_words)

In [None]:
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Embedding, LSTM, Dense

model = Sequential()

model.add(Embedding(input_dim=27, output_dim=embedding_dim, input_length=max_length))
model.add(LSTM(64, return_sequences=True))
model.add(LSTM(64, return_sequences=False))
model.add(Dense(1, activation='sigmoid'))

model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

model.summary()

Model: "sequential_2"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 embedding_2 (Embedding)     (None, 15, 32)            864       
                                                                 
 lstm_10 (LSTM)              (None, 15, 64)            24832     
                                                                 
 lstm_11 (LSTM)              (None, 64)                33024     
                                                                 
 dense_5 (Dense)             (None, 1)                 65        
                                                                 
Total params: 58785 (229.63 KB)
Trainable params: 58785 (229.63 KB)
Non-trainable params: 0 (0.00 Byte)
_________________________________________________________________


In [None]:
history = model.fit(x_train, y_train, epochs=num_epochs, validation_data=(x_valid, y_valid))

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


In [None]:
test_loss, test_acc = model.evaluate(x_test, y_test)
print(f"Test Accuracy: {test_acc}")

Test Accuracy: 0.7538461685180664
