# Training code for our stroke-based Transformer method. Note: the dataset is too large to upload with the submission, so the cells that read the data files cannot be run here; we can demo them on our own system if needed. Training is also very slow on a regular PC and needs a GPU.

In [None]:
import xml.etree.ElementTree as ET
import requests
import os
import concurrent.futures

def fetch_xml(xml_url, timeout=30):
    """Download the bucket listing and return its raw XML bytes.

    Args:
        xml_url: URL of the XML bucket listing.
        timeout: Seconds to wait on the server before giving up. Without a
            timeout ``requests.get`` may block forever on a stalled
            connection.

    Returns:
        The response body as ``bytes``, or ``None`` when the request fails
        or the server answers with a status other than 200.
    """
    try:
        response = requests.get(xml_url, timeout=timeout)
    except requests.RequestException as exc:
        # The caller only knows how to handle ``None``, so report network
        # errors the same way as a bad status code.
        print(f"Failed to fetch XML: {exc}")
        return None
    if response.status_code != 200:
        print(f"Failed to fetch XML: {response.status_code}")
        return None
    return response.content

def parse_xml(xml_content):
    """Return download URLs for the simplified sketchrnn archives in a listing.

    Args:
        xml_content: XML bucket listing (as returned by ``fetch_xml``).

    Returns:
        A list of URLs, in listing order, for every key that lives under
        ``sketchrnn/`` and ends in ``.npz`` but not in ``.full.npz``.
    """
    ns = {'s3': 'http://doc.s3.amazonaws.com/2006-03-01'}
    url_prefix = "https://storage.googleapis.com/quickdraw_dataset/"

    def is_wanted(name):
        # Keep only the simplified per-class archives under sketchrnn/.
        if not name.startswith("sketchrnn/"):
            return False
        if not name.endswith(".npz"):
            return False
        return not name.endswith(".full.npz")

    entries = ET.fromstring(xml_content).iterfind(".//s3:Contents", ns)
    names = (entry.find("s3:Key", ns).text for entry in entries)
    return [url_prefix + name for name in names if is_wanted(name)]

def download_file(file_url, download_folder, timeout=60, chunk_size=1 << 20):
    """Download one file into ``download_folder`` unless it is already there.

    The body is streamed to a temporary ``.part`` file and renamed into
    place only after the download finishes. An interrupted download
    therefore never leaves a truncated ``.npz`` that a later run would skip
    as "Already exists".

    Args:
        file_url: URL of the file; its basename becomes the local file name.
        download_folder: Existing directory that receives the file.
        timeout: Seconds to wait on the server before giving up.
        chunk_size: Number of bytes requested per streamed chunk.

    Returns:
        None. Progress and failures are reported with ``print``.
    """
    file_path = os.path.join(download_folder, os.path.basename(file_url))
    if os.path.exists(file_path):
        print(f"Already exists: {file_path}")
        return

    print(f"Downloading: {file_url}")
    partial_path = file_path + ".part"
    try:
        # stream=True avoids holding a whole archive in memory per worker.
        with requests.get(file_url, stream=True, timeout=timeout) as response:
            if response.status_code != 200:
                print(f"Failed to download: {file_url}")
                return
            with open(partial_path, "wb") as f:
                for chunk in response.iter_content(chunk_size=chunk_size):
                    f.write(chunk)
        os.replace(partial_path, file_path)
    except (requests.RequestException, OSError) as exc:
        print(f"Failed to download: {file_url} ({exc})")
        # Do not leave a half-written temporary file behind.
        if os.path.exists(partial_path):
            os.remove(partial_path)

def download_npy_files(xml_url, download_folder, max_workers=10):
    """Download every archive listed at ``xml_url`` into ``download_folder``.

    Args:
        xml_url: URL of the XML bucket listing.
        download_folder: Target directory; created if it does not exist.
        max_workers: Number of concurrent download threads.

    Returns:
        None. Returns early, without downloading, when the listing cannot
        be fetched.
    """
    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(download_folder, exist_ok=True)

    xml_content = fetch_xml(xml_url)
    if xml_content is None:
        return

    file_urls = parse_xml(xml_content)

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        pending = {
            executor.submit(download_file, url, download_folder): url
            for url in file_urls
        }
        # Inspect every future: an exception raised inside a worker is
        # otherwise stored on the future and never reported.
        for future in concurrent.futures.as_completed(pending):
            error = future.exception()
            if error is not None:
                print(f"Failed to download: {pending[future]} ({error})")

# Listing of the quickdraw_dataset bucket, restricted to the sketchrnn/ prefix.
xml_url = "https://storage.googleapis.com/quickdraw_dataset?prefix=sketchrnn/"
# Folder (relative to the current working directory) that receives the files.
download_folder = "data"
# Runs immediately when this cell executes: fetch the listing, then download.
download_npy_files(xml_url, download_folder)


Downloading: https://storage.googleapis.com/quickdraw_dataset/sketchrnn/The Great Wall of China.npz
Downloading: https://storage.googleapis.com/quickdraw_dataset/sketchrnn/The Eiffel Tower.npz
Downloading: https://storage.googleapis.com/quickdraw_dataset/sketchrnn/The Mona Lisa.npz
Downloading: https://storage.googleapis.com/quickdraw_dataset/sketchrnn/aircraft carrier.npz
Downloading: https://storage.googleapis.com/quickdraw_dataset/sketchrnn/airplane.npz
Downloading: https://storage.googleapis.com/quickdraw_dataset/sketchrnn/alarm clock.npz
Downloading: https://storage.googleapis.com/quickdraw_dataset/sketchrnn/ambulance.npz
Downloading: https://storage.googleapis.com/quickdraw_dataset/sketchrnn/angel.npz
Downloading: https://storage.googleapis.com/quickdraw_dataset/sketchrnn/animal migration.npz
Downloading: https://storage.googleapis.com/quickdraw_dataset/sketchrnn/ant.npz
Downloading: https://storage.googleapis.com/quickdraw_dataset/sketchrnn/anvil.npz
Downloading: https://storage

In [None]:
# import
import numpy as np
import os
from sklearn.utils import shuffle
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.preprocessing.sequence import pad_sequences
import tensorflow as tf
from tensorflow.keras import layers, models

# Directory that load_data scans for per-class .npz archives.
DATA_DIR = '/content/data'
# Upper bound on the number of class files used; also the one-hot label width.
MAX_CLASSES = 345
# Upper bound on the number of training drawings taken from each class file.
MAX_SAMPLES_PER_CLASS = 80000
# Fixed sequence length produced by preprocess_stroke (truncate or zero-pad).
MAX_SEQ_LEN = 130
# Number of values per stroke point (preprocess_stroke pads with 3 columns).
POINT_DIM = 3


In [None]:
# Sanity check: open one class archive and print how many training drawings
# it contains.
# NOTE(review): allow_pickle=True lets NumPy unpickle arbitrary objects from
# the file; only use it on archives from a trusted source.
data = np.load('data/camera.npz', allow_pickle=True, encoding='latin1')
sample = data['train']
print(len(sample))

70000


In [None]:
def preprocess_stroke(stroke, max_len=None):
    """Turn one drawing into a fixed-length, centred float32 sequence.

    Columns 0 and 1 are cumulatively summed (so they are treated as per-step
    offsets) and then shifted to zero mean. Any further columns are passed
    through unchanged (presumably the pen-state flag of the sketchrnn
    stroke-3 format - confirm against the dataset). The sequence is then cut
    or zero-padded to exactly ``max_len`` rows.

    Args:
        stroke: 2-D array with one row per point. It is not modified.
        max_len: Number of rows in the result. ``None`` (the default) means
            the current value of ``MAX_SEQ_LEN``, looked up at call time so
            that editing the constant takes effect without redefining this
            function.

    Returns:
        A float32 array of shape ``(max_len, stroke.shape[1])``.
    """
    if max_len is None:
        max_len = MAX_SEQ_LEN

    # astype returns a copy, so the in-place updates below never touch the
    # caller's array.
    stroke = stroke.astype(np.float32)

    if len(stroke):  # an empty drawing has no mean; leave it as it is
        stroke[:, 0] = np.cumsum(stroke[:, 0])
        stroke[:, 1] = np.cumsum(stroke[:, 1])
        stroke[:, 0] -= stroke[:, 0].mean()
        stroke[:, 1] -= stroke[:, 1].mean()

    if len(stroke) > max_len:
        # Too long: keep only the first max_len points.
        return stroke[:max_len]

    # Pad with zero rows that match the input's own column count.
    pad = np.zeros((max_len - len(stroke), stroke.shape[1]), dtype=np.float32)
    return np.vstack([stroke, pad])

def load_data(data_dir, max_classes, max_samples_per_class):
    """Load, preprocess and shuffle the training drawings of several classes.

    Class ids follow the alphabetical order of the ``.npz`` file names in
    ``data_dir``.

    Args:
        data_dir: Directory containing one ``.npz`` archive per class.
        max_classes: Maximum number of class files to use; also the width
            of the one-hot label vectors.
        max_samples_per_class: Maximum number of drawings taken from the
            ``'train'`` array of each archive.

    Returns:
        A tuple ``(X, y)`` shuffled together with a fixed seed: ``X`` is a
        float32 array of preprocessed drawings and ``y`` the matching
        one-hot labels.
    """
    # Only .npz archives are classes; ignore anything else in the folder.
    files = sorted(f for f in os.listdir(data_dir) if f.endswith('.npz'))[:max_classes]
    X, y = [], []

    for class_id, file in enumerate(files):
        # NOTE(review): allow_pickle=True unpickles arbitrary objects; only
        # load archives that come from a trusted source.
        # The context manager closes the archive; without it every class
        # leaves an open file handle behind.
        with np.load(os.path.join(data_dir, file), allow_pickle=True, encoding="latin1") as data:
            drawings = data['train'][:max_samples_per_class]
        X.extend(preprocess_stroke(stroke) for stroke in drawings)
        y.extend([class_id] * len(drawings))

    X = np.array(X, dtype=np.float32)
    y = to_categorical(y, num_classes=max_classes)
    # Shuffle so classes are mixed instead of appearing in file order.
    return shuffle(X, y, random_state=42)


In [None]:
from keras.saving import register_keras_serializable

@register_keras_serializable()
class TransformerBlock(tf.keras.layers.Layer):
    """Self-attention block followed by a two-layer feed-forward network.

    Each of the two sub-layers is followed by dropout, a residual addition
    and layer normalization. The constructor arguments are stored on the
    instance so that ``get_config`` can return them for serialization.
    """

    def __init__(self, embed_dim, num_heads, ff_dim, rate=0.1, **kwargs):
        """Create the sub-layers.

        Args:
            embed_dim: Output width of the feed-forward network; also passed
                as ``key_dim`` to the attention layer.
            num_heads: Number of attention heads.
            ff_dim: Width of the hidden feed-forward layer.
            rate: Dropout rate applied after each sub-layer.
            **kwargs: Forwarded to ``tf.keras.layers.Layer``.
        """
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.ff_dim = ff_dim
        self.rate = rate

        self.att = layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
        self.ffn = tf.keras.Sequential([
            layers.Dense(ff_dim, activation='relu'),
            layers.Dense(embed_dim),
        ])
        self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = layers.Dropout(rate)
        self.dropout2 = layers.Dropout(rate)

    def call(self, inputs, training=None):
        """Apply attention and feed-forward sub-layers to ``inputs``.

        ``inputs`` is used as both query and value (self-attention); no
        explicit attention mask is passed. ``training`` is forwarded to the
        two dropout layers.
        """
        attn_output = self.att(inputs, inputs)
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.layernorm1(inputs + attn_output)  # residual connection around attention, then layer norm
        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output, training=training)
        return self.layernorm2(out1 + ffn_output)  # residual connection around the feed-forward net, then layer norm

    def get_config(self):
        """Return the base layer config extended with this layer's constructor arguments."""
        config = super().get_config()
        config.update({
            "embed_dim": self.embed_dim,
            "num_heads": self.num_heads,
            "ff_dim": self.ff_dim,
            "rate": self.rate
        })
        return config


@register_keras_serializable()
class PositionalEncoding(tf.keras.layers.Layer):
    """Adds a position-embedding weight of shape ``(1, maxlen, embed_dim)`` to its input."""

    def __init__(self, maxlen, embed_dim, **kwargs):
        """Create the position-embedding weight.

        Args:
            maxlen: Number of positions (second dimension of the weight).
            embed_dim: Embedding width (last dimension of the weight).
            **kwargs: Forwarded to ``tf.keras.layers.Layer``.
        """
        super().__init__(**kwargs)
        self.maxlen = maxlen
        self.embed_dim = embed_dim
        self.pos_embedding = self.add_weight(
            name="pos_embedding",
            shape=(1, maxlen, embed_dim),
            initializer="random_normal"  # random starting values; adjusted during training
        )

    def call(self, x):
        """Return ``x`` plus the position embedding.

        NOTE(review): presumably ``x`` is ``(batch, maxlen, embed_dim)``; the
        addition relies on broadcasting against ``(1, maxlen, embed_dim)``.
        """
        return x + self.pos_embedding  # add position information to every sequence in the batch

    def get_config(self):
        """Return the base layer config extended with ``maxlen`` and ``embed_dim``."""
        config = super().get_config()
        config.update({
            "maxlen": self.maxlen,
            "embed_dim": self.embed_dim
        })
        return config


def build_transformer_model(seq_len=130, input_dim=3, num_classes=50, embed_dim=128, num_heads=4, ff_dim=256,
                            num_layers=2, dense_units=128, dropout_rate=0.3):
    """Build the stroke-sequence Transformer classifier.

    Architecture: per-point Dense projection to ``embed_dim``, positional
    encoding, ``num_layers`` Transformer blocks, global average pooling over
    the sequence axis, a ReLU Dense head with dropout, and a softmax output.

    Args:
        seq_len: Number of points per input sequence.
        input_dim: Number of values per point.
        num_classes: Width of the softmax output.
        embed_dim: Embedding width used throughout the encoder.
        num_heads: Attention heads per Transformer block.
        ff_dim: Hidden width of each block's feed-forward network.
        num_layers: Number of stacked Transformer blocks (previously fixed
            at 2).
        dense_units: Width of the Dense layer before the output (previously
            fixed at 128).
        dropout_rate: Dropout rate before the output layer (previously
            fixed at 0.3).

    Returns:
        An uncompiled Keras ``Model`` mapping ``(seq_len, input_dim)``
        inputs to ``num_classes`` class probabilities.
    """
    inputs = layers.Input(shape=(seq_len, input_dim))
    x = layers.Dense(embed_dim)(inputs)
    x = PositionalEncoding(seq_len, embed_dim)(x)

    for _ in range(num_layers):
        x = TransformerBlock(embed_dim, num_heads, ff_dim)(x)

    # Collapse the sequence axis so the classifier head sees one vector per
    # drawing.
    x = layers.GlobalAveragePooling1D()(x)
    x = layers.Dense(dense_units, activation='relu')(x)
    x = layers.Dropout(dropout_rate)(x)
    outputs = layers.Dense(num_classes, activation='softmax')(x)

    return models.Model(inputs=inputs, outputs=outputs)


In [None]:
# Build the model from the same constants that drive preprocessing and label
# encoding. Relying on build_transformer_model's defaults would create a
# 50-way output while load_data one-hot encodes over MAX_CLASSES columns,
# which makes model.fit fail with a shape mismatch.
model = build_transformer_model(
    seq_len=MAX_SEQ_LEN,
    input_dim=POINT_DIM,
    num_classes=MAX_CLASSES,
)
model.summary()


# Load the data and hold out the last 10% (already shuffled) for validation
X, y = load_data(DATA_DIR, MAX_CLASSES, MAX_SAMPLES_PER_CLASS)
split = int(0.9 * len(X))
X_train, X_val = X[:split], X[split:]
y_train, y_val = y[:split], y[split:]





In [None]:
from tensorflow.keras.optimizers.schedules import ExponentialDecay
# Learning-rate schedule: start at 1e-3 and multiply by 0.9 after every
# 10,000 optimizer steps (staircase=True makes the decay happen in discrete
# jumps instead of continuously).
lr_schedule = ExponentialDecay(
    staircase=True,
    decay_rate=0.9,
    decay_steps=10000,
    initial_learning_rate=1e-3,
)

# Adam with gradient clipping by norm (1.0).
# NOTE: because learning_rate is a schedule object, the optimizer's learning
# rate cannot be assigned afterwards (for example by a callback).
optimizer = tf.keras.optimizers.Adam(clipnorm=1.0, learning_rate=lr_schedule)

model.compile(loss='categorical_crossentropy', optimizer=optimizer, metrics=['accuracy'])


In [None]:
# Train the model and save the best checkpoint
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping, ReduceLROnPlateau

# ReduceLROnPlateau is deliberately NOT in the list. The optimizer is created
# with an ExponentialDecay schedule, and Keras raises a TypeError when a
# callback tries to assign the learning rate of such an optimizer. That would
# abort training the first time validation loss plateaued for 3 epochs. The
# schedule already takes care of lowering the learning rate.
callbacks = [
    # Keep only the checkpoint with the highest validation accuracy.
    ModelCheckpoint("best_transformer_model.keras", monitor="val_accuracy", save_best_only=True),
    # Stop after 5 epochs without a val_loss improvement and roll back to the
    # best weights seen.
    EarlyStopping(monitor="val_loss", patience=5, restore_best_weights=True),
]

history = model.fit(
    X_train, y_train,
    validation_data=(X_val, y_val),
    batch_size=256,
    epochs=15,
    callbacks=callbacks
)



Epoch 1/15
[1m1758/1758[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m218s[0m 114ms/step - accuracy: 0.3135 - loss: 2.5410 - val_accuracy: 0.5400 - val_loss: 1.6487 - learning_rate: 0.0010
Epoch 2/15
[1m1758/1758[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m244s[0m 110ms/step - accuracy: 0.5316 - loss: 1.6916 - val_accuracy: 0.6006 - val_loss: 1.4049 - learning_rate: 0.0010
Epoch 3/15
[1m1758/1758[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m197s[0m 107ms/step - accuracy: 0.5913 - loss: 1.4673 - val_accuracy: 0.6514 - val_loss: 1.2443 - learning_rate: 0.0010
Epoch 4/15
[1m1758/1758[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m205s[0m 109ms/step - accuracy: 0.6222 - loss: 1.3513 - val_accuracy: 0.6523 - val_loss: 1.2264 - learning_rate: 0.0010
Epoch 5/15
[1m1758/1758[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m185s[0m 105ms/step - accuracy: 0.6471 - loss: 1.2675 - val_accuracy: 0.6802 - val_loss: 1.1398 - learning_rate: 0.0010
Epoch 6/15
[1m1758/1758[0m [32m━━━━━━

In [None]:
from tensorflow.keras.models import load_model

from tensorflow.keras.models import load_model

# Reload the best checkpoint written by ModelCheckpoint. The custom layers
# are passed explicitly so the file can be deserialized.
model = load_model(
    "best_transformer_model.keras",
    custom_objects={"PositionalEncoding": PositionalEncoding, "TransformerBlock": TransformerBlock},
)

# Resume training: with initial_epoch=15 and epochs=30 Keras numbers the new
# epochs 16 through 30 (this assumes the first run covered 15 epochs).
history = model.fit(
    X_train,
    y_train,
    batch_size=256,
    initial_epoch=15,
    epochs=30,
    validation_data=(X_val, y_val),
    callbacks=callbacks,
)

In [None]:
# Check accuracy on the held-out validation split.
# evaluate returns the loss followed by the compiled metrics (accuracy here).
val_loss, val_acc = model.evaluate(X_val, y_val)
print("Validation Accuracy:", val_acc)

[1m1563/1563[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m11s[0m 6ms/step - accuracy: 0.7614 - loss: 0.8470
Validation Accuracy: 0.7626399993896484
