<a href="https://colab.research.google.com/github/VrindaBajaj20/ai-attack-snow3g-nea1/blob/main/data_generation/nea1_snow3g.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# SNOW3G NEA1 ALGORITHM TO GENERATE KEYSTREAMS

In [6]:
# Full 3GPP-compliant SNOW-3G keystream generator (Google Colab-ready)
# Run in Google Colab. Upload CSV with columns: KEY, Direction, LENGTH, COUNT, Bearer ID
from google.colab import files
import pandas as pd
from typing import List

# -----------------------
# helpers
# -----------------------
def u32(x: int) -> int:
    """Reduce *x* to its low 32 bits, i.e. an unsigned 32-bit word."""
    word_mask = (1 << 32) - 1
    return x & word_mask

def SWAP32(x: int) -> int:
    """Return the low 32 bits of *x* with their four bytes in reverse order."""
    top = (x >> 24) & 0xFF
    upper_mid = (x >> 16) & 0xFF
    lower_mid = (x >> 8) & 0xFF
    bottom = x & 0xFF
    # The lowest byte becomes the highest, and so on.
    return (bottom << 24) | (lower_mid << 16) | (upper_mid << 8) | top

# -----------------------
# S-box tables (official)
# -----------------------
# SR: 256-entry byte substitution table. S1() looks up each of the four
# bytes of its input word here. The entries coincide with the AES (Rijndael)
# S-box.
SR = [
0x63,0x7C,0x77,0x7B,0xF2,0x6B,0x6F,0xC5,0x30,0x01,0x67,0x2B,0xFE,0xD7,0xAB,0x76,
0xCA,0x82,0xC9,0x7D,0xFA,0x59,0x47,0xF0,0xAD,0xD4,0xA2,0xAF,0x9C,0xA4,0x72,0xC0,
0xB7,0xFD,0x93,0x26,0x36,0x3F,0xF7,0xCC,0x34,0xA5,0xE5,0xF1,0x71,0xD8,0x31,0x15,
0x04,0xC7,0x23,0xC3,0x18,0x96,0x05,0x9A,0x07,0x12,0x80,0xE2,0xEB,0x27,0xB2,0x75,
0x09,0x83,0x2C,0x1A,0x1B,0x6E,0x5A,0xA0,0x52,0x3B,0xD6,0xB3,0x29,0xE3,0x2F,0x84,
0x53,0xD1,0x00,0xED,0x20,0xFC,0xB1,0x5B,0x6A,0xCB,0xBE,0x39,0x4A,0x4C,0x58,0xCF,
0xD0,0xEF,0xAA,0xFB,0x43,0x4D,0x33,0x85,0x45,0xF9,0x02,0x7F,0x50,0x3C,0x9F,0xA8,
0x51,0xA3,0x40,0x8F,0x92,0x9D,0x38,0xF5,0xBC,0xB6,0xDA,0x21,0x10,0xFF,0xF3,0xD2,
0xCD,0x0C,0x13,0xEC,0x5F,0x97,0x44,0x17,0xC4,0xA7,0x7E,0x3D,0x64,0x5D,0x19,0x73,
0x60,0x81,0x4F,0xDC,0x22,0x2A,0x90,0x88,0x46,0xEE,0xB8,0x14,0xDE,0x5E,0x0B,0xDB,
0xE0,0x32,0x3A,0x0A,0x49,0x06,0x24,0x5C,0xC2,0xD3,0xAC,0x62,0x91,0x95,0xE4,0x79,
0xE7,0xC8,0x37,0x6D,0x8D,0xD5,0x4E,0xA9,0x6C,0x56,0xF4,0xEA,0x65,0x7A,0xAE,0x08,
0xBA,0x78,0x25,0x2E,0x1C,0xA6,0xB4,0xC6,0xE8,0xDD,0x74,0x1F,0x4B,0xBD,0x8B,0x8A,
0x70,0x3E,0xB5,0x66,0x48,0x03,0xF6,0x0E,0x61,0x35,0x57,0xB9,0x86,0xC1,0x1D,0x9E,
0xE1,0xF8,0x98,0x11,0x69,0xD9,0x8E,0x94,0x9B,0x1E,0x87,0xE9,0xCE,0x55,0x28,0xDF,
0x8C,0xA1,0x89,0x0D,0xBF,0xE6,0x42,0x68,0x41,0x99,0x2D,0x0F,0xB0,0x54,0xBB,0x16
]

# SQ: 256-entry byte substitution table. S2() looks up each of the four
# bytes of its input word here.
SQ = [
0x25,0x24,0x73,0x67,0xD7,0xAE,0x5C,0x30,0xA4,0xEE,0x6E,0xCB,0x7D,0xB5,0x82,0xDB,
0xE4,0x8E,0x48,0x49,0x4F,0x5D,0x6A,0x78,0x70,0x88,0xE8,0x5F,0x5E,0x84,0x65,0xE2,
0xD8,0xE9,0xCC,0xED,0x40,0x2F,0x11,0x28,0x57,0xD2,0xAC,0xE3,0x4A,0x15,0x1B,0xB9,
0xB2,0x80,0x85,0xA6,0x2E,0x02,0x47,0x29,0x07,0x4B,0x0E,0xC1,0x51,0xAA,0x89,0xD4,
0xCA,0x01,0x46,0xB3,0xEF,0xDD,0x44,0x7B,0xC2,0x7F,0xBE,0xC3,0x9F,0x20,0x4C,0x64,
0x83,0xA2,0x68,0x42,0x13,0xB4,0x41,0xCD,0xBA,0xC6,0xBB,0x6D,0x4D,0x71,0x21,0xF4,
0x8D,0xB0,0xE5,0x93,0xFE,0x8F,0xE6,0xCF,0x43,0x45,0x31,0x22,0x37,0x36,0x96,0xFA,
0xBC,0x0F,0x08,0x52,0x1D,0x55,0x1A,0xC5,0x4E,0x23,0x69,0x7A,0x92,0xFF,0x5B,0x5A,
0xEB,0x9A,0x1C,0xA9,0xD1,0x7E,0x0D,0xFC,0x50,0x8A,0xB6,0x62,0xF5,0x0A,0xF8,0xDC,
0x03,0x3C,0x0C,0x39,0xF1,0xB8,0xF3,0x3D,0xF2,0xD5,0x97,0x66,0x81,0x32,0xA0,0x00,
0x06,0xCE,0xF6,0xEA,0xB7,0x17,0xF7,0x8C,0x79,0xD6,0xA7,0xBF,0x8B,0x3F,0x1F,0x53,
0x63,0x75,0x35,0x2C,0x60,0xFD,0x27,0xD3,0x94,0xA5,0x7C,0xA1,0x05,0x58,0x2D,0xBD,
0xD9,0xC7,0xAF,0x6B,0x54,0x0B,0xE0,0x38,0x04,0xC8,0x9D,0xE7,0x14,0xB1,0x87,0x9C,
0xDF,0x6F,0xF9,0xDA,0x2A,0xC4,0x59,0x16,0x74,0x91,0xAB,0x26,0x61,0x76,0x34,0x2B,
0xAD,0x99,0xFB,0x72,0xEC,0x33,0x12,0xDE,0x98,0x3B,0xC0,0x9B,0x3E,0x18,0x10,0x3A,
0x56,0xE1,0x77,0xC9,0x1E,0x9E,0x95,0xA3,0x90,0x19,0xA8,0x6C,0x09,0xD0,0xF0,0x86
]

# -----------------------
# GF(2^8) helpers used exactly as in spec
# -----------------------
def MULx(V: int, c: int) -> int:
    """Shift the byte *V* left by one bit, XOR-ing in *c* when bit 7 was set.

    The result is always truncated to 8 bits.
    """
    doubled = V << 1
    if V & 0x80:
        # The bit shifted out of the byte triggers the reduction by c.
        doubled ^= c
    return doubled & 0xFF

def MULxPOW(V: int, i: int, c: int) -> int:
    """Apply ``MULx(., c)`` to the low byte of *V* a total of *i* times.

    With ``i <= 0`` no step is taken and the low byte of *V* is returned.
    """
    acc = V & 0xFF
    for _step in range(i):
        acc = MULx(acc, c)
    return acc & 0xFF

def MULalpha(c: int) -> int:
    """Build a 32-bit word from four ``MULxPOW(c, n, 0xA9)`` bytes.

    The exponents 23, 245, 48 and 239 fill the word from the most
    significant byte down to the least significant one.
    """
    word = 0
    for exponent in (23, 245, 48, 239):
        word = (word << 8) | MULxPOW(c, exponent, 0xA9)
    return word & 0xFFFFFFFF

def DIValpha(c: int) -> int:
    """Build a 32-bit word from four ``MULxPOW(c, n, 0xA9)`` bytes.

    The exponents 16, 39, 6 and 64 fill the word from the most
    significant byte down to the least significant one.
    """
    word = 0
    for exponent in (16, 39, 6, 64):
        word = (word << 8) | MULxPOW(c, exponent, 0xA9)
    return word & 0xFFFFFFFF

# -----------------------
# S-box 32-bit transforms S1, S2 (spec)
# -----------------------
def S1(w: int) -> int:
    """Map the 32-bit word *w* through the SR table and a byte-mixing step.

    Each byte of *w* is substituted via ``SR``; the four substituted bytes are
    then combined using XOR and ``MULx(., 0x1b)`` into four output bytes.
    """
    # b0 is the substituted most-significant byte, b3 the least-significant.
    b0, b1, b2, b3 = (SR[(w >> shift) & 0xFF] for shift in (24, 16, 8, 0))
    # d* are the MULx-doubled versions of the substituted bytes.
    d0, d1, d2, d3 = (MULx(b, 0x1b) for b in (b0, b1, b2, b3))

    out0 = d0 ^ b1 ^ b2 ^ d3 ^ b3
    out1 = d0 ^ b0 ^ d1 ^ b2 ^ b3
    out2 = b0 ^ d1 ^ b1 ^ d2 ^ b3
    out3 = b0 ^ b1 ^ d2 ^ b2 ^ d3
    return ((out0 << 24) | (out1 << 16) | (out2 << 8) | out3) & 0xFFFFFFFF

def S2(w: int) -> int:
    """Map the 32-bit word *w* through the SQ table and a byte-mixing step.

    Each byte of *w* is substituted via ``SQ``; the four substituted bytes are
    then combined using XOR and ``MULx(., 0x69)`` into four output bytes.
    """
    # b0 is the substituted most-significant byte, b3 the least-significant.
    b0, b1, b2, b3 = (SQ[(w >> shift) & 0xFF] for shift in (24, 16, 8, 0))
    # d* are the MULx-doubled versions of the substituted bytes.
    d0, d1, d2, d3 = (MULx(b, 0x69) for b in (b0, b1, b2, b3))

    out0 = d0 ^ b1 ^ b2 ^ d3 ^ b3
    out1 = d0 ^ b0 ^ d1 ^ b2 ^ b3
    out2 = b0 ^ d1 ^ b1 ^ d2 ^ b3
    out3 = b0 ^ b1 ^ d2 ^ b2 ^ d3
    return ((out0 << 24) | (out1 << 16) | (out2 << 8) | out3) & 0xFFFFFFFF

# -----------------------
# SNOW-3G State (LFSR S0..S15, FSM R1..R3)
# -----------------------
class SNOW3G:
    """SNOW-3G state: a 16-word LFSR (``S[0]``..``S[15]``) and FSM registers R1..R3.

    Every register is stored as a non-negative int masked to 32 bits.
    """

    def __init__(self):
        self.S = [0 for _ in range(16)]
        self.R1 = self.R2 = self.R3 = 0

    def _lfsr_feedback(self) -> int:
        """Return the LFSR feedback word computed from S0, S2 and S11 (no FSM input)."""
        s0 = self.S[0]
        s11 = self.S[11]
        feedback = (s0 << 8) & 0xFFFFFF00
        feedback ^= MULalpha((s0 >> 24) & 0xFF)
        feedback ^= self.S[2]
        feedback ^= (s11 >> 8) & 0x00FFFFFF
        feedback ^= DIValpha(s11 & 0xFF)
        return feedback

    def _shift_in(self, word: int) -> None:
        """Drop S0, move every stage down one place and store *word* in S15."""
        stages = self.S
        stages.pop(0)
        stages.append(word)

    def ClockLFSRInitializationMode(self, F: int):
        """Clock the LFSR once, XOR-ing the FSM output *F* into the feedback."""
        self._shift_in((self._lfsr_feedback() ^ F) & 0xFFFFFFFF)

    def ClockLFSRKeyStreamMode(self):
        """Clock the LFSR once using the plain feedback word."""
        self._shift_in(self._lfsr_feedback() & 0xFFFFFFFF)

    def ClockFSM(self) -> int:
        """Advance the FSM one step and return its output word.

        The output is computed from S15, R1 and R2 *before* the registers are
        updated; the new R1 additionally depends on R3 and S5.
        """
        output = ((self.S[15] + self.R1) & 0xFFFFFFFF) ^ self.R2
        next_r1 = (self.R2 + (self.R3 ^ self.S[5])) & 0xFFFFFFFF
        # Update order matters: R3 needs the old R2, R2 needs the old R1.
        self.R3 = S2(self.R2)
        self.R2 = S1(self.R1)
        self.R1 = next_r1
        return output

    def Initialize(self, k: List[int], IV: List[int]):
        """Load key words *k* and IV words *IV* into the LFSR and run 32 warm-up clocks.

        Both arguments are indexed 0..3. The FSM registers are reset to zero,
        then the FSM and the LFSR (initialization mode) are clocked 32 times.
        """
        ones = 0xFFFFFFFF
        k0, k1, k2, k3 = k[0], k[1], k[2], k[3]
        iv0, iv1, iv2, iv3 = IV[0], IV[1], IV[2], IV[3]
        initial_stages = [
            k0 ^ ones, k1 ^ ones, k2 ^ ones, k3 ^ ones,              # S0..S3
            k0, k1, k2, k3,                                          # S4..S7
            k0 ^ ones, k1 ^ ones ^ iv3, k2 ^ ones ^ iv2, k3 ^ ones,  # S8..S11
            k0 ^ iv1, k1, k2, k3 ^ iv0,                              # S12..S15
        ]
        for index, word in enumerate(initial_stages):
            self.S[index] = u32(word)

        self.R1 = self.R2 = self.R3 = 0

        # 32 initialization rounds: the FSM output is fed back into the LFSR.
        for _round in range(32):
            self.ClockLFSRInitializationMode(self.ClockFSM())

    def GenerateKeystream(self, n_words: int) -> List[int]:
        """Return *n_words* 32-bit keystream words, advancing the cipher state.

        One FSM/LFSR clock is performed first and its FSM output discarded;
        each keystream word is then the FSM output XOR-ed with S0.
        """
        # Discarded clock before the first keystream word.
        self.ClockFSM()
        self.ClockLFSRKeyStreamMode()

        keystream = []
        for _t in range(n_words):
            fsm_out = self.ClockFSM()
            keystream.append(u32(fsm_out ^ self.S[0]))
            self.ClockLFSRKeyStreamMode()
        return keystream

# -----------------------
# Conversion helpers and f8 wrapper
# -----------------------
def keyhex_to_u32_words_for_C_ref(key_hex: str) -> List[int]:
    """Convert a 128-bit key given as hex text into four 32-bit words.

    The key bytes are split into four big-endian words and stored in reverse
    order: the first four bytes become ``K[3]`` and the last four ``K[0]``.

    Args:
        key_hex: Exactly 32 hexadecimal characters, optionally preceded by
            ``0x`` or ``0X``.

    Returns:
        A list ``[K0, K1, K2, K3]`` of ints in the range 0..0xFFFFFFFF.

    Raises:
        ValueError: If the text is not 32 characters long, contains non-hex
            characters, or does not decode to exactly 16 bytes.
    """
    if key_hex.startswith(("0x", "0X")):
        key_hex = key_hex[2:]
    if len(key_hex) != 32:
        raise ValueError("Key hex must be 32 hex chars (128-bit)")
    key_bytes = bytes.fromhex(key_hex)
    # bytes.fromhex() silently skips whitespace, so a 32-character string with
    # embedded spaces decodes to fewer than 16 bytes; reject it explicitly
    # instead of failing later with an IndexError.
    if len(key_bytes) != 16:
        raise ValueError("Key hex must be 32 hex chars (128-bit)")
    words = [int.from_bytes(key_bytes[offset:offset + 4], "big") for offset in range(0, 16, 4)]
    # The first group of key bytes belongs in K[3], the last in K[0].
    words.reverse()
    return words

def make_iv_for_f8(count: int, bearer: int, direction: int) -> List[int]:
    """Assemble the four IV words from COUNT, BEARER and DIRECTION.

    ``IV[3]`` and ``IV[1]`` hold the low 32 bits of *count*; ``IV[2]`` and
    ``IV[0]`` hold the low 5 bits of *bearer* in bits 31..27 and the low bit
    of *direction* in bit 26.
    """
    count_word = u32(count & 0xFFFFFFFF)
    bearer_bits = (bearer & 0x1F) << 27
    direction_bit = (direction & 0x1) << 26
    bearer_dir_word = u32(bearer_bits | direction_bit)
    # Index order is IV[0], IV[1], IV[2], IV[3].
    return [bearer_dir_word, count_word, bearer_dir_word, count_word]

def keystream_words_to_hex_trim(words: List[int], length_bits: int) -> str:
    """Render the first *length_bits* bits of a word sequence as lowercase hex.

    The words are concatenated most-significant word first (each contributing
    32 bits), the leading *length_bits* bits are kept, and that bit string is
    formatted as a zero-padded hex number of ``ceil(length_bits / 4)`` digits.
    When *length_bits* is not a multiple of 4 the kept bits are right-aligned,
    so the first hex digit carries fewer than four keystream bits.

    Args:
        words: Keystream words; only the low 32 bits of each are used.
        length_bits: Number of leading bits to keep.

    Returns:
        The hex text, or ``""`` when *length_bits* is 0.

    Raises:
        ValueError: If *length_bits* is negative or exceeds ``32 * len(words)``.
    """
    total_bits = 32 * len(words)
    if length_bits < 0 or length_bits > total_bits:
        raise ValueError(
            f"length_bits must be between 0 and {total_bits}, got {length_bits}"
        )
    if length_bits == 0:
        # Nothing to render; avoids parsing an empty digit string.
        return ""

    stream = 0
    for word in words:
        stream = (stream << 32) | (word & 0xFFFFFFFF)
    # Drop the unwanted trailing bits, keeping the leading length_bits.
    stream >>= total_bits - length_bits
    hex_len = (length_bits + 3) // 4
    return f"{stream:0{hex_len}x}"

def keystream_words_to_bin_trim(words: List[int], length_bits: int) -> str:
    """Render the first *length_bits* bits of a word sequence as a '0'/'1' string.

    The words are concatenated most-significant word first, each contributing
    exactly 32 bits, and the leading *length_bits* characters are returned.

    Args:
        words: Keystream words; only the low 32 bits of each are used.
        length_bits: Number of leading bits to keep.

    Returns:
        A string of ``length_bits`` characters (``""`` when *length_bits* is 0).

    Raises:
        ValueError: If *length_bits* is negative or exceeds ``32 * len(words)``.
    """
    total_bits = 32 * len(words)
    if length_bits < 0 or length_bits > total_bits:
        raise ValueError(
            f"length_bits must be between 0 and {total_bits}, got {length_bits}"
        )
    # Formatting word by word also handles an empty list, which has no
    # integer representation to parse.
    bit_str = "".join(f"{word & 0xFFFFFFFF:032b}" for word in words)
    return bit_str[:length_bits]

# -----------------------
# Top-level keystream generator (f8 wrapper for confidentiality)
# -----------------------
def generate_snow3g_keystream_hex(key_hex: str, count: int, bearer: int, direction: int, length_bits: int) -> str:
    """Generate *length_bits* bits of keystream and return them as hex text.

    The key text and the (count, bearer, direction) triple are converted to
    key and IV words, a fresh ``SNOW3G`` instance is initialized with them,
    and enough 32-bit words to cover *length_bits* are produced and trimmed.
    """
    key_words = keyhex_to_u32_words_for_C_ref(key_hex)
    iv_words = make_iv_for_f8(count, bearer, direction)
    # Round the requested bit length up to whole 32-bit words.
    word_count = (length_bits + 31) // 32

    cipher = SNOW3G()
    cipher.Initialize(key_words, iv_words)
    return keystream_words_to_hex_trim(cipher.GenerateKeystream(word_count), length_bits)

def generate_snow3g_keystream_bin(key_hex: str, count: int, bearer: int, direction: int, length_bits: int) -> str:
    """Generate *length_bits* bits of keystream and return them as a '0'/'1' string.

    The key text and the (count, bearer, direction) triple are converted to
    key and IV words, a fresh ``SNOW3G`` instance is initialized with them,
    and enough 32-bit words to cover *length_bits* are produced and trimmed.
    """
    key_words = keyhex_to_u32_words_for_C_ref(key_hex)
    iv_words = make_iv_for_f8(count, bearer, direction)
    # Round the requested bit length up to whole 32-bit words.
    word_count = (length_bits + 31) // 32

    cipher = SNOW3G()
    cipher.Initialize(key_words, iv_words)
    return keystream_words_to_bin_trim(cipher.GenerateKeystream(word_count), length_bits)




In [7]:
# Colab driver: upload one or more CSV files, compute the keystream for every
# row (as hex and as a bit string) and download the result as a CSV.
print("Please upload CSV file with columns: KEY, Direction, LENGTH, COUNT, Bearer ID")
uploaded = files.upload()

# Lower-cased names of the columns every input CSV must provide.
required = ['key', 'direction', 'length', 'count', 'bearer id']

for filename in uploaded:
    # Read only the header first so the KEY column can be located
    # (case-insensitively, ignoring surrounding whitespace) before the data
    # is parsed.
    header = pd.read_csv(filename, nrows=0)
    colmap = {c.strip().lower(): c for c in header.columns}
    if not all(r in colmap for r in required):
        raise ValueError(f"Input CSV must contain columns: KEY, Direction, LENGTH, COUNT, Bearer ID (found: {header.columns.tolist()})")

    # Force KEY to be read as text. With type inference, a key made only of
    # decimal digits would be parsed as a number and lose its leading zeros,
    # so it would no longer be a valid 32-character hex string.
    df_in = pd.read_csv(filename, dtype={colmap['key']: str})

    rows = []
    for _, r in df_in.iterrows():
        key_hex = str(r[colmap['key']]).strip()
        direction = int(r[colmap['direction']])
        length_bits = int(r[colmap['length']])
        count = int(r[colmap['count']])
        bearer = int(r[colmap['bearer id']])

        ks_hex = generate_snow3g_keystream_hex(key_hex, count, bearer, direction, length_bits)
        ks_bin = generate_snow3g_keystream_bin(key_hex, count, bearer, direction, length_bits)

        rows.append({
            'KEY': key_hex,
            'Direction': direction,
            'LENGTH': length_bits,
            'COUNT': count,
            'Bearer ID': bearer,
            'Keystream_hex': ks_hex,
            'Keystream_bin': ks_bin
        })

    out_df = pd.DataFrame(rows)
    # NOTE: every uploaded file is written to this same name; each result is
    # downloaded before the next file overwrites it.
    out_csv = 'snow3g_keystream_output.csv'
    out_df.to_csv(out_csv, index=False)
    print(f"Wrote {len(out_df)} rows to {out_csv}")
    files.download(out_csv)

Please upload CSV file with columns: KEY, Direction, LENGTH, COUNT, Bearer ID


Saving sample-data.csv to sample-data.csv
Wrote 100 rows to snow3g_keystream_output.csv


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>