## Required installs/imports


In [1]:
!pip install pure-prng

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pure-prng
  Downloading pure_prng-2.9.0-py3-none-any.whl (32 kB)
Collecting gmpy2>=2.0.8
  Downloading gmpy2-2.1.5-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.7 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.7/1.7 MB[0m [31m76.0 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting randomgen>=1.20.3
  Downloading randomgen-1.23.1-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.2 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.2/3.2 MB[0m [31m95.2 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting pure-nrng>=1.1.0
  Downloading pure_nrng-1.1.0-py3-none-any.whl (21 kB)
Installing collected packages: gmpy2, randomgen, pure-nrng, pure-prng
Successfully installed gmpy2-2.1.5 pure-nrng-1.1.0 pure-prng-2.9.0 randomgen-1.23.1


In [2]:
# Necessary imports
from pure_prng_package import pure_prng
import numpy as np
import tensorflow as tf
from keras.layers import *
from keras.models import Sequential
from sklearn.model_selection import train_test_split
from keras.optimizers import Adam
import matplotlib.pyplot as plt
import math
import time

## Classes to provide data generation/evaluation methods

In [10]:
# Class used to access my implmentations of PRNGs
class PRNGManagement():
  # Initialises object and sets the default seed
  def __init__(self, seed=0):
      self.seed = seed
      self.random_number:int = seed
      self.currentGenerator = self.zero_only_PRNG()

  # Method to seed the PRNG (seeds all PRNGs in class)
  def seed_PRNG(self, seed:int):
    self.seed:int = seed
    self.random_number:int = seed

  def set_generator(self, generatorMethod):
    self.currentGenerator = generatorMethod

  def bit_success(self, model, inputData, trueOutputs, sequence_length):
    """ 
    Method to evaluate the provided model and store the amount of
    successful predictions for each bit of the output
    :param model: keras model - Model used to generate predicitions
    :param trueOutputs: list[list[int]]- List containing the expected y outputs
    :param sequence_length: int - Length of generated binary string being predicited
    :return list[int] - Amount of successful predictions for each bit
    """
    # Set initial amount of successful predictions for each bit to zero
    successfulPredicts = [0]*sequence_length
    # Feeds the input data to the model and stores the predictions made
    predicted = (model.predict(inputData).round())
    # Iterate over all outputed data
    for testIndex in range(0, len(inputData)):
      # Iterate over each bit in output
      for i in range(sequence_length):
        # If the predicted bit matches the true bit value then increment the successful predicts for the current bit
        if predicted[testIndex][i] == trueOutputs[testIndex][i]: successfulPredicts[i] += 1
        # Prediction may be greater than 1 if the prediction is made with high certainity
        elif predicted[testIndex][i] > 1 and trueOutputs[testIndex][i] == 1: self.successfulPredicts[i] += 1

    return successfulPredicts
   

  def zero_only_PRNG(self, length=100):
    """ 
    Returns a binary string containing only 0 of specified length.
    Used to test for major flaws in models
    :param length: int - Length of generated binary string
    :return string - generated binary string
    """
    return "0" * length        


  def alternating_bits_PRNG(self, length=100):
    """ 
    Returns output of a basic PRNG implementation that alernates each bit (010101)
    :param length: int - Length of generated binary string
    :return string - generated binary string
    """
    # Use seed to determine the starting bit of the generated binary string
    self.seed = self.seed%2
    # Utilises efficent method to repeat a string pattern
    if (self.seed == 1):
        output = "10" * int(length/2)
    else:
        output = "01" * int(length/2)

    # Length of generated binary string is odd
    if (length%2 == 1):
      # Add final bit to string
      output += str(self.seed)
      # Set the new seed value
      if (self.seed == 0): self.seed = 1
      else: self.seed = 0
    
    return output


  def alternating_num_PRNG(self):
    """ 
    Returns output of a basic PRNG implementation that alernates between two binary strings
    :return string - generated binary string
    """
    # Use seed to determine the binary string to be returned
    self.seed = (self.seed+1)%2
    if (self.seed == 0):
      # Convert integer to a binary string 
      randomBinary = str(bin(1643712566))[2:]
      # Returns binary string after ensuring a minimum length of 32
      return (32-len(randomBinary))*"0" + randomBinary
    else:
      # Convert integer to a binary string 
      randomBinary = str(bin(2372817037))[2:]
      # Returns binary string after ensuring a minimum length of 32
      return (32-len(randomBinary))*"0" + randomBinary


  def basic_equation_based(self, mult:int, add:int, mod:int, leng:int) -> str:
    """ 
    Returns output of a very weak equation based PRNG implementation
    Expected to be predicted near perfectly
    :return string - generated binary string
    """
    # Generates random number using previous output as seed
    self.random_number = (mult * self.random_number + add) % mod
    # Converts generated number to a binary string
    bits_string = bin(self.random_number)[2:]
    # Returns binary string after using padding to ensure a length of 32
    return bits_string.zfill(leng)


  ## Different implmentations of equation based generators
  def basic_equation_based1(self) -> str:
    return self.basic_equation_based(20, 52, 2**32, 32)

  def basic_equation_based2(self) -> str:
    return self.basic_equation_based(36791, 83247, 2**32, 32)

  # Expects odd starting seed
  # Imeplementation of the PRNG 'RANDU' - Outdated PRNG
  def RANDU(self) -> str:
    return self.basic_equation_based(65539, 0, 2**31, 31)

  # Implementation of the Park Miller PRNG - Outdated PRNG
  def Park_Miller(self) -> str:
    return self.basic_equation_based(16807, 0, 2**31-1, 32)

  # Get stream of binary data form current generator
  def get_stream(self, number_of_blocks:int):
    stream = []
    for _ in range(number_of_blocks):
      # Converts generated binary string to list of ints
      block = [int(bit) for bit in self.currentGenerator()] 
      stream.extend(block)
    return stream

In [11]:
# Class used to manage the PRNGs offered by the PRNG library
class PRNGLibManagement():
  def __init__(self, PRNGType:str, outputLen:int):
    self.change_PRNG(PRNGType, outputLen)

  # Change the type of the current PRNG
  def change_PRNG(self, PRNGType:str, outputLen:int):
    self.outputLen = outputLen
    self.PRNGType = PRNGType
    self.currentGen = pure_prng(int(time.time()), prng_type=PRNGType).source_random_number()

  # Seed the current PRNG
  def seed_current(self, seed=int(time.time())):
    self.currentGen = pure_prng(int(seed), prng_type=self.PRNGType).source_random_number()

  # Generates an output from the curent PRNG
  def output_current(self):
    # Converts generated number to a binary string
    bits_string = bin(next(self.currentGen))[2:]
    # Returns binary string after using padding to ensure a consistent length
    return bits_string.zfill(self.outputLen)

  # Generates output from current PRNG as a list containing the integer bits
  def next_ints(self):
    # Converts generated binary string to list of ints
    return [int(bit) for bit in self.output_current()] 

  # Get stream of binary data form current generator
  def get_stream(self, number_of_blocks:int):
    stream = []
    for _ in range(number_of_blocks):
      block = self.next_ints()
      stream.extend(block)
    return stream

## Data generation function to be used during data production

In [18]:
# Creates object to use the 'Ran64' PRNG
PRNGLibHandler = PRNGLibManagement("Ran64", 64)
#
PRNGHandler = PRNGManagement(1)
PRNGHandler.set_generator(PRNGHandler.Park_Miller)

def generatorOutput(blockNum): return PRNGHandler.get_stream(blockNum)
def seedGenerator(seed): PRNGHandler.seed_PRNG(seed)

## Setting parameters for data generation

In [27]:
### Setting parameters
# Sets paramemters for generating train/test data
num_blocks = 15000
sequence_length = 32

# Number of samples must be a multiple of 100 to prevent an error
num_samples = (num_blocks-1)*sequence_length

print("Number of samples: ", num_samples)

Number of samples:  479968


In [28]:
# Seed generator
seedGenerator(23)
# Get stream data 
streamData = generatorOutput(num_blocks)

# Stores output in variable to allow the sample to be featued in both x and y data
X_data, Y_data = [], []

# Extracts stream data samples
for i in range(num_samples):
  X_data.append(streamData[i:i+sequence_length])
  Y_data.append([streamData[i+sequence_length]])

# Deletes stream data to save memory
del streamData

In [29]:
### Separate data into training/testing sets
# Percentage of data used for testing the created prediction model
testDataPerc = 0.2

x_train, x_test, y_train, y_test = train_test_split(X_data, Y_data, test_size = testDataPerc, random_state = 5)

del X_data
del Y_data

In [21]:
# Sets parameters for model
epochs = 10

# Strategy to utilise GPU 
strategy = tf.distribute.OneDeviceStrategy('/gpu:0')

# Model compilation using GPU
with strategy.scope():
  model = Sequential()
  model.add(Dense(sequence_length, input_shape=(sequence_length, ), activation='relu'))
  model.add(BatchNormalization())
  model.add(Dropout(rate=0.1))
  model.add(BatchNormalization())
  model.add(Dense(sequence_length//2, activation='relu'))
  model.add(BatchNormalization())
  model.add(Dense(1, activation='sigmoid'))

model.compile(loss='binary_crossentropy', optimizer="Adam", metrics=['accuracy'])

model.summary()

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 dense (Dense)               (None, 32)                1056      
                                                                 
 batch_normalization (BatchN  (None, 32)               128       
 ormalization)                                                   
                                                                 
 dropout (Dropout)           (None, 32)                0         
                                                                 
 batch_normalization_1 (Batc  (None, 32)               128       
 hNormalization)                                                 
                                                                 
 dense_1 (Dense)             (None, 16)                528       
                                                                 
 batch_normalization_2 (Batc  (None, 16)               6

In [22]:
batch_size = 100
# Train the model with the x/y train data and validate using the test data after each epoch
history = model.fit(x_train, y_train, validation_data=(x_test, y_test), epochs=epochs, batch_size=batch_size)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


In [None]:
# Sets parameters for model
epochs = 10


# Strategy to utilise GPU 
strategy = tf.distribute.OneDeviceStrategy('/gpu:0')

# Model compilation using GPU
with strategy.scope():
  model = Sequential()
  model.add(LSTM(int(sequence_length), input_shape=(sequence_length, 1), return_sequences=True))
  model.add(LSTM(int(sequence_length/1.5), return_sequences=True))
  model.add(LSTM(int(sequence_length/2)))
  model.add(Dense(1, activation='sigmoid'))


  model.compile(loss='binary_crossentropy', optimizer="Adam", metrics=['accuracy'])

  model.summary()

Model: "sequential_5"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 lstm (LSTM)                 (None, 32, 32)            4352      
                                                                 
 lstm_1 (LSTM)               (None, 32, 21)            4536      
                                                                 
 lstm_2 (LSTM)               (None, 16)                2432      
                                                                 
 dense_15 (Dense)            (None, 1)                 17        
                                                                 
Total params: 11,337
Trainable params: 11,337
Non-trainable params: 0
_________________________________________________________________


In [None]:
batch_size = 100
# Train the model with the x/y train data and validate using the test data after each epoch
history = model.fit(x_train, y_train, validation_data=(x_test, y_test), epochs=epochs, batch_size=batch_size)

In [32]:
from keras.layers import Conv1D, Dense, Flatten
from keras.layers import *

# Sets parameters for produced model
epochs = 10
batch_size = 50

# Strategy to utilise GPU 
strategy = tf.distribute.OneDeviceStrategy('/gpu:0')

# Model compilation using GPU
with strategy.scope():
  model = Sequential()
  model.add(Conv1D(filters=128, kernel_size=3, activation='relu', input_shape=(sequence_length,1)))
  model.add(Flatten())
  model.add(Dense(sequence_length, input_shape=(sequence_length, ), activation='relu'))
  model.add(Dense(1, activation='sigmoid'))

model.compile(loss='binary_crossentropy', optimizer="Adam", metrics=['accuracy'])

model.summary()

Model: "sequential_4"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv1d_3 (Conv1D)           (None, 30, 128)           512       
                                                                 
 flatten_3 (Flatten)         (None, 3840)              0         
                                                                 
 dense_9 (Dense)             (None, 32)                122912    
                                                                 
 dense_10 (Dense)            (None, 1)                 33        
                                                                 
Total params: 123,457
Trainable params: 123,457
Non-trainable params: 0
_________________________________________________________________


In [33]:
batch_size = 100
# Train the model with the x/y train data and validate using the test data after each epoch
history = model.fit(x_train, y_train, validation_data=(x_test, y_test), epochs=epochs, batch_size=batch_size)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


In [30]:
from keras.layers import Conv1D, Dense, Flatten
from keras.layers import *

# Sets parameters for produced model
epochs = 10
batch_size = 50

# Strategy to utilise GPU 
strategy = tf.distribute.OneDeviceStrategy('/gpu:0')

# Model compilation using GPU
with strategy.scope():
  model = Sequential()
  model.add(Conv1D(filters=128, kernel_size=3, activation='relu', input_shape=(sequence_length,1)))
  model.add(Flatten())
  model.add(BatchNormalization())
  model.add(Dropout(rate=0.1))
  model.add(Dense(sequence_length, input_shape=(sequence_length, ), activation='relu'))
  model.add(BatchNormalization())
  model.add(Dense(1, activation='sigmoid'))

model.compile(loss='binary_crossentropy', optimizer="Adam", metrics=['accuracy'])

model.summary()

Model: "sequential_3"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv1d_2 (Conv1D)           (None, 30, 128)           512       
                                                                 
 flatten_2 (Flatten)         (None, 3840)              0         
                                                                 
 batch_normalization_5 (Batc  (None, 3840)             15360     
 hNormalization)                                                 
                                                                 
 dropout_2 (Dropout)         (None, 3840)              0         
                                                                 
 dense_7 (Dense)             (None, 32)                122912    
                                                                 
 batch_normalization_6 (Batc  (None, 32)               128       
 hNormalization)                                      

In [31]:
batch_size = 100
# Train the model with the x/y train data and validate using the test data after each epoch
history = model.fit(x_train, y_train, validation_data=(x_test, y_test), epochs=epochs, batch_size=batch_size)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
