# Cracking the Enigma Using Apache Spark

The Enigma machine was a configurable device used by the Germans during World War II to encrypt confidential
information. Although Nazi Germany placed great faith in the capabilities of the machine, it suffered from a few design
flaws that the Allies managed to exploit, enabling them to read secret information. The story of deciphering the Enigma
has been depicted in the film "The Imitation Game."

In this post, I will demonstrate how I cracked the Enigma machine using Apache Spark. This framework is widely used in
big data analysis because it allows for distributed calculations and storage of data. This way, one can execute a
massive amount of calculations on different machines, which can dramatically speed up the overall execution time. The
process of cracking the Enigma involves several number crunching steps that can be executed in parallel. Because of
this, Spark is an ideal tool to speed up the process.

## The Overall Approach for Cracking the Enigma

In this post, I won't go into detail about the underlying workings of the Enigma machine or the algorithms that can be
used to crack it. In short, the Enigma machine is a device that scrambles input letters to produce different output
letters. With each button press, the mapping changes. This, combined with the fact that the Enigma machine could be
configured in 158,962,555,217,826,360,000 different ways, led the Germans to believe that it was uncrackable. However,
the fact that each input letter differs from the output makes it relatively easy to align a "crib," a short known piece
of the original text, with the encrypted text. Numberphile has some
informative [videos](https://www.youtube.com/watch?v=G2_Q9FoD-oQ) about the Enigma in which they explain the algorithms
involved in breaking the Enigma. I implemented these algorithms and an Enigma emulator in a separate Python package,
which I used in a standalone script to break the Enigma with parallel computation on Databricks (sort of Spark as a
service). All the code for this project is available on Github.



### Setting up the infrastructure
In this project I use Databricks as the Spark provider. To set up the Databricks workspace in azure cloud I use
[Terraform](https://www.terraform.io/). Assuming that terraform is installed, enter the terraform folder and run:
```commandline
terraform init
terraform apply
```
After the whole infrastructure is created, which will probably take a few minutes, you can navigate to
the databricks workspace by clicking on the URL that is outputed in the terminal.
The wokspace if preconfigured with a single node cluster with the engima-emulator installed and this notebook that you can run interactively


### Cracking the Enigma using a crib
We start with encrypting a message using the emulator with a configured Enigma machine that we are going to crack.
In this project we asume that we have a crib, that is, a decipherd/original piece of the encrypted text. In reality, these cribs were obtained either by informants or simply because they where trivial and included in daily messages. In this notebook I assume that we no the alignment of the crib with the cipher text, it is just the first piece of the text.

In [0]:
import pyspark.sql.functions as F
from typing import List
from enigma.crack import get_possible_rotor_settings, PlugBoardResolver, InvalidSettings
from enigma.emulator import Enigma
from pyspark.sql.session import SparkSession
from pyspark.sql import Row
from functools import partial


# build spark session
spark = SparkSession.builder.getOrCreate()

crib = "WettervorhersageXXXfurxdiexRegionXXXOstXXXMoskau".upper()
complete_message = f"{crib}xxxextremxKaltexxxTemperaturenxxxundxxxstarkerxxxSchneefallxxxseixxxgewarnt"

e = Enigma(rotor_types="II,III,I", ring_settings="I,A,A", plugboard_pairs="CU,DL,EP,KN,MO,XZ")
cypher = e.encrypt(crib)
print(cypher)

INFO:py4j.clientserver:Received command c on object id p0


AWSQGVWKHXXAGSITSYIIKWHGEROPFSVRMPIMACSLKLWAJSLX


In the next steps we will try to crack the enigma using the crib and the cypher text. For this, we will first declare a few generic functions that we are going to use.


In [0]:
NOT_RESOLVABLE = "NOT_RESOLVABLE"  # sentinel to indicate that rotor settings are not valid/resolvable

def translate_with_settings(rotor_types: str, reflector_type: str, ring_settings: str, plugboard_pairs: str, text: str):
    """Translates a text using Enigma machine and provided configurations

    Args:
        rotor_types (str): The rotor types as comma separated string e.g. "III,II,I"
        reflector_type (str): The reflector to use e.g. "B"
        ring_settings (str): The ring settings to use (offsets) e.g. "A,B,A"
        plugboard_pairs (str): The plugboard config as comma separated string e.g. "AC,BD"
        text (str): The text to encrypt
    Returns:
        encrypted_text (str): The encrypted text
    """
    enigma = Enigma(rotor_types=rotor_types, reflector_type=reflector_type, ring_settings=ring_settings,
                    plugboard_pairs=plugboard_pairs)
    return enigma.encrypt(text)


def score_setting(rotor_types: str, reflector_type: str, ring_settings: str, plugboard_pairs: str, cypher: str,
                  crib: str) -> int:
    """Score the enigma settings using a known crib and the corresponding cypher text based on their
    similarity

    Args:
        rotor_types (str): The rotor types as comma separated string e.g. "III,II,I"
        reflector_type (str): The reflector to use e.g. "B"
        ring_settings (str): The ring settings to use (offsets) e.g. "A,B,A"
        plugboard_pairs (str): The plugboard config as comma separated string e.g. "AC,BD"
        cypher (str): The encrypted text
        crib (str): The original text
    Returns:
        score (int): the score of the settings
    """
    output = translate_with_settings(rotor_types, reflector_type, ring_settings, plugboard_pairs, cypher)
    return len([l for l, r in zip(output, crib) if l == r])


def resolve_plugboard_possibilities(rotor_types: str, reflector_type: str, ring_settings: str, cypher: str,
                                    crib: str) -> List[str]:
    """Find possible plugboard configurations for 'rotor' configured enigma machine that do not contradict

    Args:
        rotor_types (str): The rotor types as comma separated string e.g. "III,II,I"
        reflector_type (str): The reflector to use e.g. "B"
        ring_settings (str): The ring settings to use (offsets) e.g. "A,B,A"
        cypher (str): The encrypted text
        crib (str): The original text
    Returns:
        plugboard_configurations (List[str]): a list of possible plugboard configurations
    """

    plugboard_resolver = PlugBoardResolver(crib, cypher, rotor_types, reflector_type, ring_settings)
    try:
        plugboard_resolver.eliminate_pairs()
        if plugboard_resolver.get_remaining() > 1000:
            raise InvalidSettings()
        else:
            return plugboard_resolver.get_remaining_pairs()
    except InvalidSettings:
        return [NOT_RESOLVABLE]



INFO:py4j.clientserver:Received command c on object id p0


And a few partial functions for this particular crib and cypher text


In [0]:
# To find best matching rotor settings without plugboard configured
naive_score_function = partial(score_setting, cypher=cypher, crib=crib, plugboard_pairs=None)

# To retrieve possible plugboard configurations
resolve_plugboard_function = partial(resolve_plugboard_possibilities, cypher=cypher, crib=crib)

# To score a completely configured enigma
score_function = partial(score_setting, cypher=cypher, crib=crib)

INFO:py4j.clientserver:Received command c on object id p0


The first step is to resolve the rotor settings. Since not all letters are connected by the plugboard, the rotor configuration 'leaks' trough the cypher text. We will use brute force to retrieve the most probable rotor configurations

In [0]:
# get al lrotor settings and their translations
rdd = spark.sparkContext.parallelize(
    [(v, Row(rotor_types=v[0], reflector_type=v[1], ring_settings=v[2])) for v in get_possible_rotor_settings()], 2)

# Get scores for every rotor_settings without any plugboard configuration.
rdd = rdd.mapValues(
    lambda r: Row(rotor_types=r.rotor_types, reflector_type=r.reflector_type, ring_settings=r.ring_settings,
                  naive_score=naive_score_function(r.rotor_types, r.reflector_type, r.ring_settings)))



INFO:py4j.clientserver:Received command c on object id p0


Next, we will take the most probable rotor settings and try to resolve the plugboard by eliminating as much as possible. After that, we will assess all posiblities score them.

In [0]:
# only consider the best naive rotor settings
rdd = spark.sparkContext.parallelize(rdd.top(100, key=lambda r: r[1].naive_score))

rdd = rdd.flatMapValues(
    lambda r: [Row(rotor_types=r.rotor_types,
                   reflector_type=r.reflector_type,
                   ring_settings=r.ring_settings,
                   plugboard_pairs=pp) for pp in
               resolve_plugboard_function(r.rotor_types, r.reflector_type, r.ring_settings)])

rdd = rdd.filter(lambda x: x[1].plugboard_pairs != NOT_RESOLVABLE)

rdd = rdd.mapValues(lambda r: Row(rotor_types=r.rotor_types,
                                  reflector_type=r.reflector_type,
                                  ring_settings=r.ring_settings,
                                  plugboard_pairs=r.plugboard_pairs,
                                  score=score_function(r.rotor_types, r.reflector_type, r.ring_settings,
                                                       r.plugboard_pairs),
                                  translated=translate_with_settings(r.rotor_types, r.reflector_type, r.ring_settings,
                                                                     r.plugboard_pairs, cypher)

                                  ))


INFO:py4j.clientserver:Received command c on object id p0


We will convert the results to a dataframe and order them by their score to see if we found the right settings...

In [0]:
df = rdd.map(lambda v: v[1]).toDF()
df = df.filter(F.col('plugboard_pairs') != NOT_RESOLVABLE)
df = df.orderBy(F.col('score'), ascending=False)
df.show(truncate=False)


INFO:py4j.clientserver:Received command c on object id p0


+-----------+--------------+-------------+-----------------------------+-----+------------------------------------------------+
|rotor_types|reflector_type|ring_settings|plugboard_pairs              |score|translated                                      |
+-----------+--------------+-------------+-----------------------------+-----+------------------------------------------------+
|II,III,I   |B             |I,A,A        |CU,DL,EP,KN,MO,XZ            |48   |WETTERVORHERSAGEXXXFURXDIEXREGIONXXXOSTXXXMOSKAU|
|II,III,I   |B             |I,A,A        |BJ,CU,DL,EP,KN,MO,XZ         |47   |WETTERVORHERSAGEXXXFURXDIEXREGIONXXXOSTXXXMOOKAU|
|II,III,I   |B             |I,A,A        |BQ,CU,DL,EP,KN,MO,XZ         |47   |WETOERVORHERSAGEXXXFURXDIEXREGIONXXXOSTXXXMOSKAU|
|II,III,I   |B             |I,A,A        |BY,CU,DL,EP,KN,MO,XZ         |47   |WETTERVORHERSAGEXPXFURXDIEXREGIONXXXOSTXXXMOSKAU|
|II,III,I   |B             |I,A,A        |BJ,BQ,CU,DL,EP,JQ,KN,MO,XZ   |46   |WETWERVORHERSAGEXXXFURXDIE

That's it! We found the settings! The Enigma that we cracked in this notebook was relatively easy. When more letters are connected through te plugboard it becomes harder to resolve the right rotor configuration. In such case, it would make sense to assess more rotor configurations and use a larger cluster.