In [1]:
import torch
import numpy as np
from model import cp_2_k_mask, cp_2_k_onnx
from config import args
from dataset import Enigma_simulate_c_2_p, Enigma_simulate_cp_2_k_limited, Enigma_simulate_cp_2_k
from torch.utils.data import DataLoader
import math
from torchsummary import summary
from tqdm import tqdm

Copying the compiled weight to regular models

In [2]:
# Load trained model's weights and config
ckpt = torch.load('CP2K_RNN_ENC_ckpt.pt')
ckpt_args = ckpt['args']

# Initialize new model by configs
model = cp_2_k_mask(args=ckpt_args, out_channels=26)
model.to('cuda')
model.eval()

# Load weights, calculate the number of parameters
weights = []
num_param = 0
for k, v in ckpt['weights'].items():
    weights.append(v)
    num_param += np.prod(v.shape)
print(f"Parameters: {num_param}")

# Copying weights from compile model to new model
for idx, (k, v) in enumerate(model.state_dict().items()):
    # We have to copying like this, I am so confused
    v *= 0
    v += weights[idx].detach()

# Set the model to evaluate mode
model.eval()

Parameters: 27408974


cp_2_k_mask(
  (networks): ModuleList(
    (0): RNN_encoder(
      (enc): LSTM(512, 512, num_layers=2, dropout=0.2, bidirectional=True)
      (dropout): Dropout(p=0.2, inplace=False)
    )
    (1): TransformerEncoder(
      (layers): ModuleList(
        (0): TransformerEncoderLayer(
          (self_attn): MultiheadAttention(
            (out_proj): NonDynamicallyQuantizableLinear(in_features=1024, out_features=1024, bias=True)
          )
          (linear1): Linear(in_features=1024, out_features=2048, bias=True)
          (dropout): Dropout(p=0.2, inplace=False)
          (linear2): Linear(in_features=2048, out_features=1024, bias=True)
          (norm1): LayerNorm((1024,), eps=1e-05, elementwise_affine=True)
          (norm2): LayerNorm((1024,), eps=1e-05, elementwise_affine=True)
          (dropout1): Dropout(p=0.2, inplace=False)
          (dropout2): Dropout(p=0.2, inplace=False)
        )
        (1): TransformerEncoderLayer(
          (self_attn): MultiheadAttention(
           

Running a mini batch of examples

In [3]:
dataset = Enigma_simulate_cp_2_k_limited(args=args)
dataloader = DataLoader(
        dataset=dataset,
        batch_size=512,
        collate_fn=dataset.collate_fn_padding,
        shuffle=True
    )

data_iter = iter(dataloader)
inputs, targets, masks = next(data_iter)
inputs, targets, masks = inputs.to('cuda'), targets.to('cuda'), masks.to('cuda')

outputs = model(inputs, masks)

print(f"Input shape: {inputs.shape} {'[seq, batch, feats]'}\n"
          f"Target shape: {targets.shape}  {'[rotor, seq, batch]'}\n"
          f"Mask shape: {masks.shape}  {'[batch, seq]'}\n"
          f"Output shape: {outputs.shape}  {'[rotor, seq, batch, feats]'}")

true_positive = 0
samples = 0

outputs_indices = torch.argmax(outputs, dim=-1) # -> [rotor, seq, batch]
for rotor in range(outputs_indices.shape[0]):
    mask = outputs_indices[rotor][~masks.T] == targets[rotor][~masks.T]
    true_positive += mask.sum()
    samples += math.prod(mask.shape)

print(f"Acc: {true_positive / samples}")

Input shape: torch.Size([30, 512, 52]) [seq, batch, feats]
Target shape: torch.Size([3, 30, 512])  [rotor, seq, batch]
Mask shape: torch.Size([512, 30])  [batch, seq]
Output shape: torch.Size([3, 30, 512, 26])  [rotor, seq, batch, feats]
Acc: 0.9992404580116272


In [4]:
# Make prediction based on user's inputs( Results should be "AES")
from dataset import cipher_plain_text_2_tensor
inputs, masks = cipher_plain_text_2_tensor('VMPDTAJYTXDZNEFOSOTPJOYSMO', 'WEARETHECHAMPIONANDTHEBEST') # Predict on usr inputs
inputs, masks = inputs.to('cuda'), masks.to('cuda')


# Make prediction
with torch.no_grad():
    outputs = model(inputs, masks).argmax(dim=-1).squeeze(-1) # -> shape [rotor, seq]

# print the prediction at the first postion, which is the initial states
print(f"Prediction: {chr(ord('A') + outputs[0, 0])}"
      f"{chr(ord('A') + outputs[1, 0])}"
      f"{chr(ord('A') + outputs[2, 0])}")

Prediction: AES


In [5]:
# Transfer to onnx
import onnx, onnxruntime
onnx_model = cp_2_k_onnx(args=ckpt_args, out_channels=26)
onnx_model.to('cuda')
onnx_model.eval()

# Load weights, calculate the number of parameters
weights = []
num_param = 0
for k, v in ckpt['weights'].items():
    weights.append(v)
    num_param += np.prod(v.shape)
print(f"Parameters: {num_param}")

# Copying weights from compile model to new model
for idx, (k, v) in enumerate(onnx_model.state_dict().items()):
    # We have to copying like this, I am so confused
    v *= 0
    v += weights[idx].detach()

# Testing the pre-export model in batch size 1
dataloader = DataLoader(
        dataset=dataset,
        batch_size=1,
        collate_fn=dataset.collate_fn_padding,
        shuffle=True
    )

# Get samples for dataset
data_iter = iter(dataloader)
inputs, targets, masks = next(data_iter)
inputs, targets, masks = inputs.to('cuda'), targets.to('cuda'), masks.to('cuda')

outputs = onnx_model(inputs)

print(f"Input shape: {inputs.shape} {'[seq, batch, feats]'}\n"
      f"Target shape: {targets.shape}  {'[rotor, seq, batch]'}\n"
      f"Mask shape: {masks.shape}  {'[batch, seq]'}\n"
      f"Output shape: {outputs.shape}  {'[rotor, seq, batch, feats]'}")


true_positive = 0
samples = 0

outputs_indices = torch.argmax(outputs, dim=-1) # -> [rotor, seq, batch]
for rotor in range(outputs_indices.shape[0]):
    mask = outputs_indices[rotor][~masks.T] == targets[rotor][~masks.T]
    true_positive += mask.sum()
    samples += math.prod(mask.shape)

print(f"Acc: {true_positive / samples}")

# Export onnx model
torch.onnx.export(
    onnx_model,
    args=inputs,
    f="seq5-45-large.onnx",
    input_names=["input_1"],
    output_names=["output1"]
)

# Load and verify onnx model
ort_session = onnxruntime.InferenceSession("seq5-45-large.onnx")
inputs, _ = cipher_plain_text_2_tensor('WEARETHECHAMPIONANDTHEBESTPLAY', 'VMPDTAJYTXDZNEFOSOTPJOYSMOEBNX')
outputs = ort_session.run(
    None,
    {"input_1": np.array(inputs)},
)

outputs = outputs[0].argmax(axis=-1).reshape(outputs[0].shape[0], outputs[0].shape[1])
print(outputs.shape)
print(f"Onnx Prediction: {chr(ord('A') + outputs[0, 0])}"
      f"{chr(ord('A') + outputs[1, 0])}"
      f"{chr(ord('A') + outputs[2, 0])}")


Parameters: 27408974
Input shape: torch.Size([30, 1, 52]) [seq, batch, feats]
Target shape: torch.Size([3, 30, 1])  [rotor, seq, batch]
Mask shape: torch.Size([1, 30])  [batch, seq]
Output shape: torch.Size([3, 30, 1, 26])  [rotor, seq, batch, feats]
Acc: 1.0




(3, 30)
Onnx Prediction: AES


In [None]:
# Testint accuracy in different length
testing_args = args
results = {}


for length in range(5, 51):
    testing_args['SEQ_LENGTH'] = [length, length]
    dataset = Enigma_simulate_cp_2_k_limited(args=testing_args, mode='test')
    dataloader = DataLoader(
        dataset=dataset,
        batch_size=1024,
        collate_fn=dataset.collate_fn_padding,
        shuffle=False,
        drop_last=False
    )

    # Tracking
    true_positive = 0
    samples = 0

    bar = tqdm(dataloader, leave=True)
    bar.set_description_str(f"Length: {length}")

    for inputs, targets, masks in bar:
        inputs, targets, masks = inputs.to('cuda'), targets.to('cuda'), masks.to('cuda')

        # Making prediction
        # with torch.cuda.amp.autocast():
        with torch.no_grad():
            outputs = model(inputs, masks)

            # Compute accuracy
            outputs_indices = torch.argmax(outputs, dim=-1) # -> [rotor, seq, batch]
            for rotor in range(outputs_indices.shape[0]):
                mask = outputs_indices[rotor][~masks.T] == targets[rotor][~masks.T]
                true_positive += mask.sum()
                samples += math.prod(mask.shape)

        # Set bar's postfix
        bar.set_postfix_str(f"Acc: {(true_positive / samples).item()}")

    # Output and record the result
    results[length] = (true_positive / samples).item()

Length: 5: 100%|██████████| 35/35 [00:12<00:00,  2.87it/s, Acc: 0.12144970148801804]
Length: 6: 100%|██████████| 35/35 [00:13<00:00,  2.57it/s, Acc: 0.1929999142885208] 
Length: 7: 100%|██████████| 35/35 [00:14<00:00,  2.40it/s, Acc: 0.3028290569782257] 
Length: 8: 100%|██████████| 35/35 [00:16<00:00,  2.09it/s, Acc: 0.44813477993011475]
Length: 9: 100%|██████████| 35/35 [00:17<00:00,  2.03it/s, Acc: 0.5972833037376404]
Length: 10: 100%|██████████| 35/35 [00:17<00:00,  1.98it/s, Acc: 0.725593626499176] 
Length: 11: 100%|██████████| 35/35 [00:21<00:00,  1.62it/s, Acc: 0.8123370409011841]
Length: 12: 100%|██████████| 35/35 [00:22<00:00,  1.59it/s, Acc: 0.8774354457855225]
Length: 13: 100%|██████████| 35/35 [00:23<00:00,  1.50it/s, Acc: 0.9182465672492981]
Length: 14: 100%|██████████| 35/35 [00:25<00:00,  1.36it/s, Acc: 0.9436074495315552]
Length: 15: 100%|██████████| 35/35 [00:27<00:00,  1.29it/s, Acc: 0.9645280838012695]
Length: 16: 100%|██████████| 35/35 [00:28<00:00,  1.23it/s, Acc: 0

Ploting the results

In [None]:
import matplotlib.pyplot as plt


x = list(results.keys())
y = list(results.values())

x, y = np.array(x), np.array(y)
print(x, y)

fig, ax = plt.subplots(figsize=(12, 10), dpi=80)
ax.bar(x,height=y)
plt.title('Seq5-45-large', fontsize=15)
ax.set_ylabel('Acc', fontsize=15)
ax.set_xlabel('Length', fontsize=15)
plt.show()
# ax.plot(bins, y, '--')