In [1]:
!pip install py-enigma
!pip install torchtext==0.10.0

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [2]:
import math
import torch
import torchtext.legacy
import random
import re
import copy
import time
import spacy
import numpy as np
from torch import nn, Tensor
import matplotlib.pyplot as plt
from torchtext.legacy.data import Field
import torch.nn.functional as F
from torch.nn import TransformerEncoder, TransformerEncoderLayer
from torch.nn import Transformer
from random import randrange, choices
from enigma.machine import *

In [3]:
def generateSample(rotor = False, ring = False, reflector = False, plug = False, start = False, plainText = 'random'):
  rotorSet = ["I", "II", "III", "IV", "V"]
  reflectorSet = ['B', 'C']
  alphabet = ['A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M'
      , 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z']

  rotorSettings = ''
  ringSettings = ''
  reflectorSettings = ''
  plugSettings = ''
  startSettings = ''

  # Get rotor settings
  if(rotor):
    rotorSettings = ' '.join(random.sample(rotorSet, 3))
  else:
    rotorSettings = 'I II III'

  # Get ring settings
  if(ring):
    ringSettings = ' '.join([str(randrange(26) + 1), str(randrange(26) + 1), str(randrange(26) + 1)])
  else:
    ringSettings = '1 1 1'

  # Get reflector settings
  if(reflector):
    reflectorSettings = random.sample(reflectorSet, 1)[0]
  else:
    reflectorSettings = 'B'

  # Get plugboard Settings
  if(plug):
    plugSet = alphabet
    for x in range(10):
      plugSettings += (plugSet.pop(randrange(len(plugSet))) + plugSet.pop(randrange(len(plugSet))) + ' ')
    plugSettings = plugSettings[:-1]
  else:
    plugSettings = 'AB CD EF GH IJ KL MN OP QR ST'

  # Get rotor starting positions
  if(start):
    startSettings = ''.join(random.sample(alphabet, 3))
  else:
    startSettings = ('AAA')

  # Generate random text or use provided text
  if(plainText == 'random'):
    plainText = ''.join(random.choices(alphabet, k=250))
  plainText = re.sub(r'[^a-zA-Z]', '', plainText).upper()

  # Create machine
  machine = EnigmaMachine.from_key_sheet(
                  rotors=rotorSettings,
                  reflector=reflectorSettings,
                  ring_settings=ringSettings,
                  plugboard_settings=plugSettings)
  # Set starting position
  machine.set_display(startSettings)
  cipherText = machine.process_text(plainText, replace_char=None)

  targetParts = (rotorSettings, ringSettings, startSettings, reflectorSettings, plugSettings, plainText)

  target = '-'.join(targetParts)
  target = target + ('_' * (310 - len(target)))
  sample = cipherText + ('_' * (310 - len(cipherText)))

  return sample, target

In [4]:
sample, target = generateSample(rotor=True, ring=True, start=True, reflector=True, plug=True)
print(len(sample))
print(len(target))
sample, target = generateSample(rotor=True, ring=True, start=True, reflector=True, plug=True)
print(sample)
print(target)

310
310
EORXUNYQPISSPMRAUZEMFEDYKSZYRWBLECGXCKKXXMSUYCUYQOYTRDVDHESGRYALUHXWBCCTORRJBGZSHDXWRJPPJGLWHFUBZYAFKFAWHORSNQQGJLUWIMNQTRGSPBQKZRWOWOZFECZJIGKUUIOHERWAQRAMPBLTOBDZYJVIKTLJEQZYNUJMBDJPEDQSVFZRILNSOCPSQWKGDTMZZFOAVFVSCGRLJFSCGOENSUDBCCOYJUXYSPZHKKPLVI____________________________________________________________
IV V III-18 3 16-FXO-B-IS JL MN WD HA RP CU KB TE GV-YFFZOQXYYYXYQOQQZQFFZQFFXOYFOZQOFXZZYOOOZXQYOOYFYZZYXQZZQYOXFFYOXYFQZYXYFOQZOOFQYFZQFYFXZZXXQXFZQXXYZZZQZXFOXXXQZXQZQQFXZQQOQQOZXXXFOQYQZQFOFYZYQXQXYFOQZYXFFQXOFZOFZXQOQOZXFZQOXQOZOQXYXFFYXXFQQZFZQXQOXYXFXXZXFOFQFYFOFZZOYZYZXZQXZYOYQFYQOYZQFOYXOZOQOO_______


In [5]:
class Embedder(nn.Module):
  def __init__(self, vocab_size, d_model):
    super().__init__()
    self.embed = nn.Embedding(vocab_size, d_model)
  def forward(self, x):
    return self.embed(x)
        
class PositionalEncoder(nn.Module):
  def __init__(self, d_model, max_seq_len = 310):
    super().__init__()
    self.d_model = d_model
    
    # create constant 'pe' matrix with values dependant on 
    # pos and i
    pe = torch.zeros(max_seq_len, d_model)
    for pos in range(max_seq_len):
      for i in range(0, d_model, 2):
        pe[pos, i] = \
        math.sin(pos / (10000 ** ((2 * i)/d_model)))
        pe[pos, i + 1] = \
        math.cos(pos / (10000 ** ((2 * (i + 1))/d_model)))
          
    pe = pe.unsqueeze(0)
    self.register_buffer('pe', pe)

  
  def forward(self, x):
    # make embeddings relatively larger
    x = x * math.sqrt(self.d_model)
    #add constant to embedding
    seq_len = x.size(1)
    x = x + self.pe[:,:seq_len]#.cuda()
    return x

class MultiHeadAttention(nn.Module):
  def __init__(self, heads, d_model, dropout = 0.1):
    super().__init__()
    
    self.d_model = d_model
    self.d_k = d_model // heads
    self.h = heads
    
    self.q_linear = nn.Linear(d_model, d_model)
    self.v_linear = nn.Linear(d_model, d_model)
    self.k_linear = nn.Linear(d_model, d_model)
    self.dropout = nn.Dropout(dropout)
    self.out = nn.Linear(d_model, d_model)
  
  def forward(self, q, k, v, mask=None):
    bs = q.size(0)
    
    # perform linear operation and split into h heads
    
    k = self.k_linear(k).view(bs, -1, self.h, self.d_k)
    q = self.q_linear(q).view(bs, -1, self.h, self.d_k)
    v = self.v_linear(v).view(bs, -1, self.h, self.d_k)
    
    # transpose to get dimensions bs * h * sl * d_model
    
    k = k.transpose(1,2)
    q = q.transpose(1,2)
    v = v.transpose(1,2)# calculate attention using function we will define next
    scores = attention(q, k, v, self.d_k, mask, self.dropout)
    
    # concatenate heads and put through final linear layer
    concat = scores.transpose(1,2).contiguous()\
    .view(bs, -1, self.d_model)
    
    output = self.out(concat)

    return output

def attention(q, k, v, d_k, mask=None, dropout=None):
  scores = torch.matmul(q, k.transpose(-2, -1)) /  math.sqrt(d_k)

  if mask is not None:
    mask = mask.unsqueeze(1)
    scores = scores.masked_fill(mask == 0, -1e9)
  scores = F.softmax(scores, dim=-1)
  
  if dropout is not None:
    scores = dropout(scores)
      
  output = torch.matmul(scores, v)
  return output

class FeedForward(nn.Module):
  def __init__(self, d_model, d_ff=2048, dropout = 0.1):
    super().__init__() 
    # We set d_ff as a default to 2048
    self.linear_1 = nn.Linear(d_model, d_ff)
    self.dropout = nn.Dropout(dropout)
    self.linear_2 = nn.Linear(d_ff, d_model)
  def forward(self, x):
    x = self.dropout(F.relu(self.linear_1(x)))
    x = self.linear_2(x)
    return x

class Norm(nn.Module):
  def __init__(self, d_model, eps = 1e-6):
    super().__init__()

    self.size = d_model
    # create two learnable parameters to calibrate normalisation
    self.alpha = nn.Parameter(torch.ones(self.size))
    self.bias = nn.Parameter(torch.zeros(self.size))
    self.eps = eps
  def forward(self, x):
    norm = self.alpha * (x - x.mean(dim=-1, keepdim=True)) \
    / (x.std(dim=-1, keepdim=True) + self.eps) + self.bias
    return norm

class EncoderLayer(nn.Module):
  def __init__(self, d_model, heads, dropout = 0.1):
    super().__init__()
    self.norm_1 = Norm(d_model)
    self.norm_2 = Norm(d_model)
    self.attn = MultiHeadAttention(heads, d_model)
    self.ff = FeedForward(d_model)
    self.dropout_1 = nn.Dropout(dropout)
    self.dropout_2 = nn.Dropout(dropout)
      
  def forward(self, x, mask):
    x2 = self.norm_1(x)
    x = x + self.dropout_1(self.attn(x2,x2,x2,mask))
    x2 = self.norm_2(x)
    x = x + self.dropout_2(self.ff(x2))
    return x
    

class DecoderLayer(nn.Module):
  def __init__(self, d_model, heads, dropout=0.1):
    super().__init__()
    self.norm_1 = Norm(d_model)
    self.norm_2 = Norm(d_model)
    self.norm_3 = Norm(d_model)
    
    self.dropout_1 = nn.Dropout(dropout)
    self.dropout_2 = nn.Dropout(dropout)
    self.dropout_3 = nn.Dropout(dropout)
    
    self.attn_1 = MultiHeadAttention(heads, d_model)
    self.attn_2 = MultiHeadAttention(heads, d_model)
    self.ff = FeedForward(d_model)#.cuda()
  def forward(self, x, e_outputs, src_mask, trg_mask):
    x2 = self.norm_1(x)
    x = x + self.dropout_1(self.attn_1(x2, x2, x2, trg_mask))
    x2 = self.norm_2(x)
    x = x + self.dropout_2(self.attn_2(x2, e_outputs, e_outputs,
    src_mask))
    x2 = self.norm_3(x)
    x = x + self.dropout_3(self.ff(x2))
    return x
def get_clones(module, N):
  return nn.ModuleList([copy.deepcopy(module) for i in range(N)])

class Encoder(nn.Module):
  def __init__(self, vocab_size, d_model, N, heads):
    super().__init__()
    self.N = N
    self.embed = Embedder(vocab_size, d_model)
    self.pe = PositionalEncoder(d_model)
    self.layers = get_clones(EncoderLayer(d_model, heads), N)
    self.norm = Norm(d_model)
  def forward(self, src, mask):
    x = self.embed(src)
    x = self.pe(x)
    for i in range(self.N):
      x = self.layers[i](x, mask)
    return self.norm(x)
    
class Decoder(nn.Module):
  def __init__(self, vocab_size, d_model, N, heads):
    super().__init__()
    self.N = N
    self.embed = Embedder(vocab_size, d_model)
    self.pe = PositionalEncoder(d_model)
    self.layers = get_clones(DecoderLayer(d_model, heads), N)
    self.norm = Norm(d_model)
  def forward(self, trg, e_outputs, src_mask, trg_mask):
    x = self.embed(trg)
    x = self.pe(x)
    for i in range(self.N):
      x = self.layers[i](x, e_outputs, src_mask, trg_mask)
    return self.norm(x)

class Transformer(nn.Module):
  def __init__(self, src_vocab, trg_vocab, d_model, N, heads):
    super().__init__()
    self.encoder = Encoder(src_vocab, d_model, N, heads)
    self.decoder = Decoder(trg_vocab, d_model, N, heads)
    self.out = nn.Linear(d_model, trg_vocab)
  def forward(self, src, trg, src_mask, trg_mask):
    e_outputs = self.encoder(src, src_mask)
    d_output = self.decoder(trg, e_outputs, src_mask, trg_mask)
    output = self.out(d_output)
    return output

In [6]:
def createMasks(sample, target):
  sampleLength = len(sample)
  sampleMask = np.ones((1, sampleLength, sampleLength))
  sampleMask = torch.from_numpy(sampleMask)

  targetLength = len(target)
  targetMask = np.tril(np.ones((1, targetLength, targetLength)), k=0).astype('uint8')
  targetMask = torch.from_numpy(targetMask)

  return sampleMask, targetMask

In [7]:
class tokenize():
  sampleAlphabet = ['A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', '_']
  targetAlphabet = ['A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', ' ', '-', '_']
  def __init__(self):
    en = spacy.load("en_core_web_sm")

    def tokenize_sample(input):
      return [tok.text for tok in tokenize_sample(input)]

    def tokenize_target(input):
      return [tok.text for tok in tokenize_target(input)]

    self.sampleText = Field(tokenize=tokenize_sample)
    self.sampleText.build_vocab(self.sampleAlphabet)
    self.targetText = Field(tokenize=tokenize_target, init_token="<sos>", eos_token="<eos>")
    self.targetText.build_vocab(self.targetAlphabet)
  def makeTokens(self, sample, target):
    sampleOutput = []

    while(len(sample) > 0):
      sampleOutput.append(self.sampleText.vocab.stoi[sample[0]])
      sample = sample[1:]

    targetOutput = []

    while(len(target) > 0):
      targetOutput.append(self.targetText.vocab.stoi[target[0]])
      target = target[1:]

    return torch.tensor(sampleOutput, dtype=torch.int), torch.tensor(targetOutput, dtype=torch.int)


In [8]:
def generateBatch(batchSize):
  i = 0
  iterations = []
  tkn = tokenize()
  while i  < batchSize:
    i += 1
    sample, target = generateSample(rotor=True, ring=True, reflector=False, start=False, plug=False)
    sample, target = tkn.makeTokens(sample, target)  

    sampleMask, targetMask = createMasks(sample, target)
    iterations.append([sample, target, sampleMask, targetMask])
  return iterations

In [9]:
d_model = 512
heads = 32
N = 24

batch_size = 100
training_iters = 5000

src_vocab = 50
trg_vocab = 50

model = Transformer(src_vocab, trg_vocab, d_model, N, heads)

for p in model.parameters():
  if p.dim() > 1:
    nn.init.xavier_uniform_(p)
i = 0
losses = []
optimizer = torch.optim.Adam(model.parameters(), lr=0.0001, betas=(0.9, 0.98), eps=1e-9)
model.train()
while i < training_iters:
  batch = generateBatch(batch_size)
  j = 0
  for sample in batch:
    preds = model(batch[j][0], batch[j][1], batch[j][2], batch[j][3])

    loss = F.cross_entropy(preds.view(-1, preds.size(-1)), batch[j][1].type(torch.LongTensor))
    optimizer.zero_grad()         
    loss.backward()
    optimizer.step()

    losses.append(loss.item())
    j += 1
  np.save('./losses', losses)
  torch.save(model.state_dict(), './model')

KeyboardInterrupt: ignored