In [None]:
import os
os.environ["CUDA_VISIBLE_DEVICES"]="4"
from keras.models import Sequential
from keras.layers import Dense, Activation, Dropout, MaxPooling1D, Flatten
from keras.layers import LSTM,Convolution1D
from keras.optimizers import RMSprop
from keras.callbacks import EarlyStopping, ModelCheckpoint
from keras.utils.data_utils import get_file
import numpy as np
import matplotlib.mlab as mlab
import matplotlib.pyplot as plt
import random
import sys
import os
import glob

In [None]:
'''
Generate pseudo-random numbers with congruential random number generator.
Only execute once since the data will be saved to hard disk.
'''
# Exponent of the generator's modulus: rng() below defaults to m = 2**m_.
m_=24 # try with m_=16 first to see if the network is learning

def rngint(nbit=8):
    """Draw the next generator output and quantize it to an integer.

    The float returned by ``rng()`` is multiplied by ``2**nbit`` and then
    truncated with ``int``.
    """
    sample = rng()
    scale = 2**nbit
    return int(sample * scale)

def rng(m=2**m_, a=1103515245, c=12345):
    """Advance the congruential generator by one step and return a float.

    The state is kept in the function attribute ``rng.current``, which must
    be assigned (seeded) before the first call. It is updated to
    ``(a * current + c) % m`` and the new state divided by ``m`` is returned.
    """
    state = (a * rng.current + c) % m
    rng.current = state
    return float(state) / m

# setting the seed
rng.current = 10

# Write the samples as explicit little-endian int64. The notebook reads this
# file back with dtype='<i8', but NumPy's default integer type depends on the
# platform, so relying on it can make the raw bytes written by tofile()
# disagree with the reader. np.fromiter also avoids building a temporary
# 10M-element Python list.
data = np.fromiter((rngint() for _ in range(10000000)), dtype='<i8')
data.tofile('CRNG_10M_M24.bin')
# print (data[:10])

In [None]:
'''
Generate another sequence for testing
'''
# setting the seed
rng.current = 139 # maybe use another seed

# Write the samples as explicit little-endian int64. The notebook reads this
# file back with dtype='<i8', but NumPy's default integer type depends on the
# platform, so relying on it can make the raw bytes written by tofile()
# disagree with the reader. np.fromiter also avoids building a temporary
# 10M-element Python list.
data = np.fromiter((rngint() for _ in range(10000000)), dtype='<i8')
data.tofile('CRNG_10M_M24_test.bin')
# print (data[:10])

In [None]:
# Read the training sequence back as little-endian 64-bit integers.
data = np.fromfile('CRNG_10M_M24.bin', dtype='<i8')
# String form of every value, in sequence order.
text = [str(value) for value in data]
print(data.shape)
print(data[:10])

In [None]:
# Treating each number as a "word". Creating a dictionary.
# The distinct values are collected on the integer array and only those are
# converted to strings. This avoids the `np.str` alias (removed in NumPy 1.24)
# and avoids materialising a unicode copy of the whole sequence.
# The vocabulary is sorted as strings (lexicographically), not numerically.
chars = sorted(str(value) for value in np.unique(data))
print(chars)
del data
print('Total words:', len(chars))
# word -> class index, and the inverse mapping.
char_indices = {c: i for i, c in enumerate(chars)}
indices_char = {i: c for i, c in enumerate(chars)}

In [None]:
# Length of input. Treating each input that consists of `maxlen` "words" as a "sentence".
maxlen = 10
# Distance between 2 consecutive "sentences"
step = 5

In [None]:
# Slide a window of `maxlen` words over the sequence, advancing by `step`;
# the word immediately after each window is its prediction target.
window_starts = range(0, len(text) - maxlen, step)
sentences = [text[i: (i + maxlen)] for i in window_starts]
next_chars = [text[(i + maxlen)] for i in window_starts]
print('Number of sentences:', len(sentences))


print('Start vectorization...')
# One-hot encode inputs (sentence, position, word) and targets (sentence, word).
# The builtin `bool` is used as dtype because the `np.bool` alias was removed
# in NumPy 1.24.
X = np.zeros((len(sentences), maxlen, len(chars)), dtype=bool)
y = np.zeros((len(sentences), len(chars)), dtype=bool)
for i, sentence in enumerate(sentences):
    for t, char in enumerate(sentence):
        X[i, t, char_indices[char]] = 1
    y[i, char_indices[next_chars[i]]] = 1
print('Done vectorization!')

In [None]:
# build the RCNN model
print('Build model...')
model = Sequential()
# Two convolution + max-pooling stages over the one-hot encoded sentence.
model.add(Convolution1D(filters=64, kernel_size=9, padding='same', activation='relu', input_shape=(maxlen, len(chars))))
model.add(MaxPooling1D(pool_size=2))
model.add(Convolution1D(filters=128, kernel_size=3, padding='same', activation='relu'))
model.add(MaxPooling1D(pool_size=2))
# Recurrent layer that returns only its final output, followed by dropout
# and a softmax over the vocabulary.
model.add(LSTM(128, return_sequences=False))
model.add(Dropout(0.2))
model.add(Dense(len(chars)))
model.add(Activation('softmax'))

optimizer = RMSprop(lr=0.0005)
model.compile(loss='categorical_crossentropy', optimizer=optimizer, metrics=['accuracy'])
# summary() prints the table itself and returns None, so wrapping it in
# print() only added a stray "None" line to the output.
model.summary()
# Stop when val_loss has not improved for 4 epochs; keep only the best weights on disk.
early_stopping = EarlyStopping(monitor='val_loss', patience=4, verbose=1)
monitoring = ModelCheckpoint('weights_a20_125M.hdf5', monitor='val_loss', verbose=1, save_best_only=True)


In [None]:
# Train with 20% of the samples held out for validation; the callbacks stop
# training early and checkpoint the weights based on val_loss.
model.fit(
    X,
    y,
    batch_size=128,
    epochs=100,
    validation_split=0.2,
    callbacks=[early_stopping, monitoring],
    verbose=1,
)


In [None]:
# Restore the weights from the file the ModelCheckpoint callback writes
# (configured with save_best_only=True on val_loss).
model.load_weights('weights_a20_125M.hdf5')

In [None]:
# Load test data
data = np.fromfile('CRNG_10M_M24_test.bin', dtype='<i8')
test_len = int(0.2 * data.shape[0])  # 5 test sets

# The first four sets hold exactly test_len values each; the fifth takes
# everything that remains. Every set ends up as a list of strings.
test1, test2, test3, test4, test5 = (
    [str(value) for value in data[begin:end]]
    for begin, end in (
        (0, test_len),
        (test_len, test_len*2),
        (test_len*2, test_len*3),
        (test_len*3, test_len*4),
        (test_len*4, None),
    )
)

In [None]:
# Gather the five test sets into one list, then drop the individual names;
# the lists themselves remain reachable through `tests`.
tests = [test1, test2, test3, test4, test5]
del test1, test2, test3, test4, test5

In [None]:
# Evaluate the trained model on every test set: build sliding windows,
# one-hot encode them, predict the next word batch by batch, and report the
# mean squared error and the accuracy of the predictions.
#
# The window length is the global `maxlen` the model was built with. The test
# stride gets its own name so the training `step` is not overwritten when
# this cell runs.
test_step = 10 # increase to downsample test set
batch_size = 1000
# Integer value of the word behind each class index (used for the MSE).
index_values = np.array([int(indices_char[k]) for k in range(len(chars))], dtype=np.int64)

for test in tests:
    window_starts = range(0, len(test) - maxlen, test_step)
    sentences = [test[i: (i + maxlen)] for i in window_starts]
    next_chars = [test[(i + maxlen)] for i in window_starts]
    if not sentences:
        # Nothing to predict; also avoids dividing by zero further down.
        print ('Test set is too short to form a sentence, skipped.')
        continue

    # Builtin `bool` instead of the `np.bool` alias removed in NumPy 1.24.
    Xt = np.zeros((len(sentences), maxlen, len(chars)), dtype=bool)
    yt = np.zeros((len(sentences), len(chars)), dtype=bool)
    for i, sentence in enumerate(sentences):
        for t, char in enumerate(sentence):
            Xt[i, t, char_indices[char]] = 1
        yt[i, char_indices[next_chars[i]]] = 1

    n_true = 0
    pred_batches = []
    true_batches = []
    # Start at offset 0 and include the final partial batch so that every
    # sample counted in the accuracy denominator is actually predicted.
    for batch_no, start in enumerate(range(0, Xt.shape[0], batch_size)):
        if batch_no and batch_no % 100 == 0:
            # `start` equals the number of samples handled so far.
            print ("Processed %d predictions, %d correct!" % (start, n_true))
        stop = start + batch_size
        preds = model.predict(Xt[start:stop], verbose=0)
        pred_idx = np.argmax(preds, axis=1)
        true_idx = np.argmax(yt[start:stop], axis=1)
        # Class indices map one-to-one to words, so comparing indices is
        # equivalent to comparing the predicted and true words.
        n_true += int(np.sum(pred_idx == true_idx))
        pred_batches.append(pred_idx)
        true_batches.append(true_idx)

    y_true = index_values[np.concatenate(true_batches)]
    y_pred = index_values[np.concatenate(pred_batches)]
    mse = np.mean(np.square(y_true-y_pred))
    print ('mse',mse)
    print ('Total {0:d} predictions, {1:d} correct, accuracy is {2:.2f}%.'.format(yt.shape[0],n_true,float(n_true)/yt.shape[0]*100.0))