# A Hybrid Approach with CNN and LSTM
- top 1 accuracy: ~94%
- top 3 accuracy: ~98%

In [1]:
import os
import json
import requests
import numpy as np

# Optionally create a local directory to hold the data:
# if not os.path.exists('quickdraw_data'):
#     os.makedirs('quickdraw_data')

# --- Constants ---
# Root URL of the simplified Quick, Draw! drawings (one .ndjson file per class).
BASE_URL = "https://storage.googleapis.com/quickdraw_dataset/full/simplified/"

# The 30 classes used in this notebook. The position of a name in this list is
# its integer label, so do not reorder it once a model has been trained.
CLASSES = [
    "apple", "banana", "book", "car", "cat",
    "chair", "cloud", "dog", "door", "eye",
    "face", "fish", "flower", "fork", "guitar",
    "hammer", "hat", "house", "key", "knife",
    "leaf", "lightning", "moon", "mountain", "mouse",
    "star", "sun", "table", "tree", "umbrella",
]
NUM_CLASSES = len(CLASSES)

# Data location and size limits.
DATA_DIR = "path/to/your/data/directory"  # Change this to your desired directory
SAMPLES_PER_CLASS = 10_000  # Cap per class to keep the demo/training fast
MAX_LEN = 196  # Max number of points kept per drawing (adjust based on data analysis)

# Training hyperparameters.
BATCH_SIZE = 128
EPOCHS = 100  # Upper bound on epochs; adjust for real training


In [None]:
def download_data(classes, base_url, data_dir, samples_per_class, timeout=30):
    """Download one ``<class>.ndjson`` file per class into ``data_dir``.

    Files that already exist in ``data_dir`` are skipped. Each download is
    streamed to a temporary ``.part`` file and only renamed to its final name
    once it has completed, so an interrupted or failed download never leaves a
    truncated file that a later run would mistake for a finished one.

    Args:
        classes: Iterable of class names; each maps to ``<name>.ndjson``.
        base_url: URL prefix the file name is appended to.
        data_dir: Target directory; created if it does not exist.
        samples_per_class: Unused here (whole files are downloaded); kept so
            existing callers keep working.
        timeout: Seconds to wait for the server before giving up on a request.

    Errors for an individual class are printed and the loop moves on to the
    next class; nothing is raised for download failures.
    """
    print("Starting download...")

    # Without this, every open() below fails on a fresh machine.
    if data_dir:
        os.makedirs(data_dir, exist_ok=True)

    for class_name in classes:
        class_name_url = class_name.replace(" ", "%20")  # Handle spaces in names if any
        file_path = os.path.join(data_dir, f"{class_name}.ndjson")

        if os.path.exists(file_path):
            print(f"File for '{class_name}' already exists. Skipping.")
            continue

        url = f"{base_url}{class_name_url}.ndjson"
        partial_path = file_path + ".part"
        print(f"Downloading {class_name} from {url}...")
        try:
            # The context manager releases the connection even on failure.
            with requests.get(url, stream=True, timeout=timeout) as response:
                response.raise_for_status()  # Raise an exception for bad status codes
                with open(partial_path, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        f.write(chunk)

            # Publish the file only after the full body has been written.
            os.replace(partial_path, file_path)
            print(f"Downloaded '{class_name}'")

        except requests.exceptions.RequestException as e:
            print(f"Error downloading {class_name}: {e}")
        except Exception as e:
            print(f"An unexpected error occurred for {class_name}: {e}")
        finally:
            # No-op after a successful rename; otherwise removes the leftover
            # partial file (also on KeyboardInterrupt).
            if os.path.exists(partial_path):
                os.remove(partial_path)

    print("Download process finished.")

# --- Fetch any class files that are not on disk yet ---
download_data(
    classes=CLASSES,
    base_url=BASE_URL,
    data_dir=DATA_DIR,
    samples_per_class=SAMPLES_PER_CLASS,
)

print(f"\nData should be downloaded in the '{DATA_DIR}' directory.")


Data should be downloaded in the '/kaggle/input/quick-draw-ndjson' directory.


In [None]:
import numpy as np
import json
import os
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.preprocessing.sequence import pad_sequences
import tensorflow as tf

# --- Load and Preprocess Functions ---
def strokes_to_deltas(drawing_strokes, max_len=None):
    """Convert raw strokes to a sequence of ``(dx, dy, pen_state)`` rows.

    Args:
        drawing_strokes: List of strokes, each ``[[x, ...], [y, ...]]``.
        max_len: Maximum number of rows to keep (the sequence is cut at the
            end). Defaults to the module-level ``MAX_LEN``.

    Returns:
        ``np.ndarray`` of dtype float32 and shape ``(n_points, 3)``. ``dx``/``dy``
        are offsets from the previous point (the very first point is relative
        to ``(0, 0)``; the first point of a later stroke is relative to the last
        point of the previous stroke). ``pen_state`` is 1 on the last point of
        each stroke and 0 elsewhere. An input with no points yields shape
        ``(0, 3)``.

    Raises:
        IndexError: If a stroke has fewer y values than x values.
    """
    if max_len is None:
        max_len = MAX_LEN

    deltas = []
    last_x, last_y = 0, 0
    for stroke in drawing_strokes:
        x_coords, y_coords = stroke[0], stroke[1]
        if not x_coords:  # Skip empty strokes if any
            continue

        # First point of the stroke: offset from where the previous stroke ended.
        deltas.append([x_coords[0] - last_x, y_coords[0] - last_y, 0])

        # Remaining points: offset from the preceding point in the same stroke.
        for i in range(1, len(x_coords)):
            deltas.append([x_coords[i] - x_coords[i - 1],
                           y_coords[i] - y_coords[i - 1],
                           0])

        # The row just appended is the stroke's final point: pen lifts here.
        deltas[-1][2] = 1

        last_x, last_y = x_coords[-1], y_coords[-1]

    # Truncate to max_len; reshape keeps the result 2-D even when it is empty.
    return np.array(deltas[:max_len], dtype=np.float32).reshape(-1, 3)

def load_and_preprocess(classes, data_dir, samples_per_class, max_len):
    """Load drawings from ``.ndjson`` files, convert, pad and standardize them.

    For each class, reads ``<data_dir>/<class>.ndjson`` line by line, keeps up to
    ``samples_per_class`` drawings that are flagged ``recognized`` and contain
    more than one point, and converts them with ``strokes_to_deltas``.

    Args:
        classes: List of class names; the index in this list is the label.
        data_dir: Directory containing the ``.ndjson`` files.
        samples_per_class: Maximum number of drawings kept per class.
        max_len: Length every sequence is padded/truncated to.

    Returns:
        ``(padded_sequences, categorical_labels)`` where ``padded_sequences`` is
        float32 of shape ``(n_samples, max_len, 3)`` with standardized dx/dy,
        raw pen_state and zeros in padded positions, and ``categorical_labels``
        is the one-hot label matrix of shape ``(n_samples, len(classes))``.

    Raises:
        ValueError: If no valid sequence could be loaded.

    NOTE(review): the mean/std are computed over everything loaded here (i.e.
    before any train/validation split) and are only printed, not returned;
    record them if the model is to be used on new drawings.
    """
    all_sequences = []
    all_labels = []
    label_map = {name: i for i, name in enumerate(classes)}

    print("Loading and preprocessing data...")
    for class_name in classes:
        file_path = os.path.join(data_dir, f"{class_name}.ndjson")
        count = 0
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                for line in f:
                    if count >= samples_per_class:
                        break
                    try:
                        drawing = json.loads(line)
                        if not drawing['recognized']:  # Skip drawings not recognized by the game
                            continue
                        if not drawing.get('drawing'):  # Check if drawing data exists
                            continue

                        delta_sequence = strokes_to_deltas(drawing['drawing'])
                        if delta_sequence.shape[0] > 1:  # Need more than a single point
                            all_sequences.append(delta_sequence)
                            all_labels.append(label_map[class_name])
                            count += 1
                    except (json.JSONDecodeError, KeyError, IndexError, TypeError):
                        continue  # Skip malformed lines or drawings
        except FileNotFoundError:
            print(f"Warning: File not found for class '{class_name}'. Skipping.")
            continue
        print(f"Loaded {count} samples for '{class_name}'")

    if not all_sequences:
        raise ValueError("No valid sequences loaded. Check data files and paths.")

    print(f"Total sequences loaded: {len(all_sequences)}")

    # Pad at the end; also truncate at the end so that, like strokes_to_deltas,
    # the beginning of an over-long drawing is what gets kept.
    padded_sequences = pad_sequences(all_sequences, maxlen=max_len,
                                     padding='post', truncating='post',
                                     dtype='float32')

    # Convert labels to categorical
    categorical_labels = to_categorical(np.array(all_labels), num_classes=len(classes))

    # --- Normalization (Standardization) ---
    # Build the real-vs-padding mask from the true sequence lengths. Testing
    # for dx == dy == 0 instead would misclassify genuine zero-movement points
    # as padding, dropping them from the statistics and erasing their pen_state.
    lengths = np.array([min(len(seq), max_len) for seq in all_sequences])
    non_padding_mask = np.arange(max_len)[np.newaxis, :] < lengths[:, np.newaxis]

    dx_dy_values = padded_sequences[:, :, :2][non_padding_mask]
    if dx_dy_values.size == 0:
        raise ValueError("No valid dx/dy values found for normalization. Check data.")

    mean = np.mean(dx_dy_values, axis=0)
    std = np.std(dx_dy_values, axis=0)
    # Replace a zero std dev with a small epsilon to prevent division by zero
    std = np.where(std == 0, 1e-6, std)

    print(f"Normalization - Mean (dx, dy): {mean}, Std Dev (dx, dy): {std}")

    # Standardize ONLY dx and dy; the binary pen_state flag is left as is.
    padded_sequences[:, :, 0] = (padded_sequences[:, :, 0] - mean[0]) / std[0]
    padded_sequences[:, :, 1] = (padded_sequences[:, :, 1] - mean[1]) / std[1]

    # Padded positions were shifted by the standardization; reset them to 0.
    padded_sequences[~non_padding_mask] = 0.0

    print(f"Data shapes - Sequences: {padded_sequences.shape}, Labels: {categorical_labels.shape}")

    return padded_sequences, categorical_labels

# --- Load the data ---
# Report loading failures clearly, then re-raise so the cell stops with an
# error. (Calling exit() here would shut down the notebook kernel instead.)
try:
    X, y = load_and_preprocess(CLASSES, DATA_DIR, SAMPLES_PER_CLASS, MAX_LEN)
except ValueError as e:
    print(f"Error loading data: {e}")
    raise


# --- Create tf.data Datasets (Optional but recommended for large data) ---
# If the dataset fits in memory, you can skip this and use numpy arrays directly
# For larger datasets, tf.data is much more efficient

# Split data: 80% train, 20% validation, stratified by class with a fixed seed
from sklearn.model_selection import train_test_split
X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)

print(f"Train set size: {X_train.shape[0]}, Validation set size: {X_val.shape[0]}")

# Convert to tf.data datasets
# train_dataset = tf.data.Dataset.from_tensor_slices((X_train, y_train))
# train_dataset = train_dataset.shuffle(buffer_size=X_train.shape[0]).batch(BATCH_SIZE).prefetch(tf.data.AUTOTUNE)

# val_dataset = tf.data.Dataset.from_tensor_slices((X_val, y_val))
# val_dataset = val_dataset.batch(BATCH_SIZE).prefetch(tf.data.AUTOTUNE)

# print("tf.data Datasets created.")
# If using numpy arrays directly (for smaller datasets):
train_dataset = (X_train, y_train)
val_dataset = (X_val, y_val)

Loading and preprocessing data...
Loaded 10000 samples for 'apple'
Loaded 10000 samples for 'banana'
Loaded 10000 samples for 'book'
Loaded 10000 samples for 'car'
Loaded 10000 samples for 'cat'
Loaded 10000 samples for 'chair'
Loaded 10000 samples for 'cloud'
Loaded 10000 samples for 'dog'
Loaded 10000 samples for 'door'
Loaded 10000 samples for 'eye'
Loaded 10000 samples for 'face'
Loaded 10000 samples for 'fish'
Loaded 10000 samples for 'flower'
Loaded 10000 samples for 'fork'
Loaded 10000 samples for 'guitar'
Loaded 10000 samples for 'hammer'
Loaded 10000 samples for 'hat'
Loaded 10000 samples for 'house'
Loaded 10000 samples for 'key'
Loaded 10000 samples for 'knife'
Loaded 10000 samples for 'leaf'
Loaded 10000 samples for 'lightning'
Loaded 10000 samples for 'moon'
Loaded 10000 samples for 'mountain'
Loaded 10000 samples for 'mouse'
Loaded 10000 samples for 'star'
Loaded 10000 samples for 'sun'
Loaded 10000 samples for 'table'
Loaded 10000 samples for 'tree'
Loaded 10000 samples 

In [11]:
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Input, Conv1D, BatchNormalization, ReLU, Dropout, LSTM, Dense, TimeDistributed

def build_stroke_lstm_model(input_shape, num_classes):
    """Assemble the Conv1D -> stacked LSTM -> Dense classifier.

    Args:
        input_shape: Shape of one input sample, passed to the ``Input`` layer.
        num_classes: Number of units in the final softmax layer.

    Returns:
        An uncompiled ``Sequential`` model named ``Stroke_LSTM_Classifier``.
    """
    # (filters, kernel_size) of the three convolutional feature extractors.
    conv_specs = [(48, 5), (64, 5), (96, 3)]
    # Settings shared by both recurrent layers.
    lstm_common = dict(units=128, dropout=0.3, recurrent_dropout=0.3)

    net = Sequential(name="Stroke_LSTM_Classifier")
    net.add(Input(shape=input_shape, name="input_strokes"))

    # --- 1D convolutional blocks: Conv -> BatchNorm -> ReLU -> Dropout ---
    for idx, (n_filters, width) in enumerate(conv_specs, start=1):
        net.add(Conv1D(filters=n_filters, kernel_size=width, strides=1,
                       padding="same", name=f"conv1d_{idx}"))
        net.add(BatchNormalization(name=f"bn_{idx}"))
        net.add(ReLU(name=f"relu_{idx}"))
        net.add(Dropout(0.2, name=f"dropout_{idx}"))

    # --- Stacked LSTMs ---
    # The first layer emits its output at every time step for the second one;
    # the second returns only its final output for the classifier head.
    net.add(LSTM(return_sequences=True, name="lstm_1", **lstm_common))
    net.add(LSTM(return_sequences=False, name="lstm_2", **lstm_common))

    # --- Dense classifier head ---
    net.add(Dense(units=128, name="dense_1"))
    net.add(BatchNormalization(name="bn_dense_1"))
    net.add(ReLU(name="relu_dense_1"))
    net.add(Dropout(0.4, name="dropout_dense_1"))  # Heavier dropout before the output layer

    net.add(Dense(units=num_classes, activation='softmax', name="output_softmax"))
    return net

# --- Instantiate the network ---
# Each sample is MAX_LEN time steps of 3 features (dx, dy, pen_state).
input_shape = (MAX_LEN, 3)
model = build_stroke_lstm_model(input_shape, NUM_CLASSES)

# --- Configure training: Adam, cross-entropy on one-hot labels, top-1 and top-3 accuracy ---
optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)
model.compile(
    optimizer=optimizer,
    loss='categorical_crossentropy',
    metrics=[
        'accuracy',
        tf.keras.metrics.TopKCategoricalAccuracy(k=3),
    ],
)

model.summary()

In [None]:
import tensorflow as tf
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau, ModelCheckpoint

print("\nStarting training...")
checkpoint_filepath = 'path/to/checkpoint/best_stroke_lstm_model.keras'

# --- Callbacks (all driven by the validation loss) ---
# Give up after 10 epochs without improvement and roll back to the best weights.
early_stopping = EarlyStopping(
    monitor='val_loss',
    patience=10,
    restore_best_weights=True,
    verbose=1,
)
# Multiply the learning rate by 0.2 after 5 stagnant epochs, never below 1e-6.
reduce_lr = ReduceLROnPlateau(
    monitor='val_loss',
    factor=0.2,
    patience=5,
    min_lr=1e-6,
    verbose=1,
)
# Write the full model (not just weights) whenever the validation loss improves.
model_checkpoint_callback = ModelCheckpoint(
    filepath=checkpoint_filepath,
    monitor='val_loss',
    mode='min',
    save_best_only=True,
    save_weights_only=False,
    verbose=1,
)

# --- Fit on the in-memory numpy arrays ---
# (With tf.data datasets, pass train_dataset / validation_data=val_dataset
# instead and drop batch_size.)
history = model.fit(
    x=X_train,
    y=y_train,
    batch_size=BATCH_SIZE,
    epochs=EPOCHS,
    validation_data=(X_val, y_val),
    callbacks=[early_stopping, reduce_lr, model_checkpoint_callback],
)


Starting training...


In [None]:
# NOTE(review): this is a verbatim repeat of the model.fit call in the previous
# cell. Run after that cell, it calls fit again on the same `model` object with
# the same callback instances and rebinds `history`, so the history returned by
# the earlier call is no longer reachable. Presumably kept to resume an
# interrupted run — confirm whether it should remain in a Run-All workflow.
history = model.fit(
    X_train, y_train,
    epochs=EPOCHS,
    batch_size=BATCH_SIZE,
    validation_data=(X_val, y_val),
    callbacks=[early_stopping, reduce_lr, model_checkpoint_callback]
)

Epoch 1/100
[1m1875/1875[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 347ms/step - accuracy: 0.8676 - loss: 0.4464 - top_k_categorical_accuracy: 0.9635
Epoch 1: val_loss improved from inf to 0.23792, saving model to /kaggle/working/best_stroke_lstm_model.keras
[1m1875/1875[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m718s[0m 379ms/step - accuracy: 0.8676 - loss: 0.4464 - top_k_categorical_accuracy: 0.9635 - val_accuracy: 0.9264 - val_loss: 0.2379 - val_top_k_categorical_accuracy: 0.9851 - learning_rate: 0.0010
Epoch 2/100
[1m1875/1875[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 348ms/step - accuracy: 0.8709 - loss: 0.4366 - top_k_categorical_accuracy: 0.9642
Epoch 2: val_loss improved from 0.23792 to 0.22297, saving model to /kaggle/working/best_stroke_lstm_model.keras
[1m1875/1875[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m711s[0m 379ms/step - accuracy: 0.8709 - loss: 0.4366 - top_k_categorical_accuracy: 0.9642 - val_accuracy: 0.9317 - val_loss: 0.2230 -

In [None]:
# Load the best checkpoint written by ModelCheckpoint during training.
# `load_model` is not imported anywhere in this notebook, so go through the
# `tf` namespace; reuse `checkpoint_filepath` so the path is defined only once.
model = tf.keras.models.load_model(checkpoint_filepath)

# --- Evaluate the model (optional) ---
# evaluate() returns the loss followed by the compiled metrics, in order:
# accuracy, then top-3 accuracy.
print("\nEvaluating on validation set:")
loss, accuracy, top3_accuracy = model.evaluate(X_val, y_val, verbose=0)
print(f"Validation Loss: {loss:.4f}")
print(f"Validation Accuracy: {accuracy:.4f}")
print(f"Validation Top-3 Accuracy: {top3_accuracy:.4f}")

# --- Save the model (optional) ---
model.save("quickdraw_stroke_lstm_model.keras") # update the path
print("\nModel saved as quickdraw_stroke_lstm_model.keras")