In [2]:
import json
import os
from itertools import islice

import numpy as np
import tensorflow as tf
from sklearn.preprocessing import LabelEncoder
from tensorflow.keras.preprocessing.sequence import pad_sequences

def parse_line(ndjson_line):
    """Parse one ndjson line into a normalized ink array and its class name.

    The line must be a JSON object with a "word" entry (the class name) and a
    "drawing" entry: a list of strokes, each stroke being a pair of
    equally long lists ``[xs, ys]``.

    Args:
        ndjson_line: A single JSON-encoded sample.

    Returns:
        A tuple ``(np_ink, class_name)``. ``np_ink`` is a float32 array of
        shape ``(total_points - 1, 3)`` whose columns are the x delta, the
        y delta (both computed after scaling the drawing to the unit square)
        and a flag that is 1 on the last point of each stroke. Drawings with
        fewer than two points yield an empty ``(0, 3)`` array.

    Raises:
        json.JSONDecodeError: If the line is not valid JSON.
        KeyError: If "word" or "drawing" is missing.
        ValueError: If a stroke's x and y lists differ in length.
    """
    sample = json.loads(ndjson_line)
    class_name = sample["word"]

    # Empty strokes carry no points; dropping them keeps the stroke-end
    # bookkeeping below from indexing position -1.
    strokes = [stroke for stroke in sample["drawing"] if len(stroke[0]) > 0]
    if not strokes:
        # Nothing to normalize: min/max of an empty array would raise.
        return np.zeros((0, 3), dtype=np.float32), class_name

    stroke_lengths = [len(stroke[0]) for stroke in strokes]
    np_ink = np.zeros((sum(stroke_lengths), 3), dtype=np.float32)
    start = 0
    for stroke, length in zip(strokes, stroke_lengths):
        end = start + length
        np_ink[start:end, 0] = stroke[0]
        np_ink[start:end, 1] = stroke[1]
        np_ink[end - 1, 2] = 1  # stroke_end
        start = end

    # Preprocessing.
    # 1. Size normalization: map the bounding box onto [0, 1] x [0, 1].
    lower = np.min(np_ink[:, 0:2], axis=0)
    upper = np.max(np_ink[:, 0:2], axis=0)
    scale = upper - lower
    scale[scale == 0] = 1  # a flat axis would otherwise divide by zero
    np_ink[:, 0:2] = (np_ink[:, 0:2] - lower) / scale

    # 2. Compute deltas between consecutive points. np.diff builds the result
    #    in a fresh array, so no overlapping in-place update is involved.
    np_ink[1:, 0:2] = np.diff(np_ink[:, 0:2], axis=0)
    # The first point has no predecessor, so it is dropped.
    np_ink = np_ink[1:, :]
    return np_ink, class_name

def load_data_from_directory(directory, max_samples=None):
    """Load every ``.ndjson`` file in a directory into drawings and labels.

    Files are visited in sorted name order so repeated runs return the samples
    in the same order. Each file is streamed line by line, so only the first
    ``max_samples`` lines are ever read instead of the whole file. Blank lines
    are skipped.

    Args:
        directory: Path of the directory to scan (not recursive).
        max_samples: Maximum number of lines to read from each file. ``None``
            or ``0`` means no limit.

    Returns:
        A tuple ``(drawings, labels)`` of two parallel lists: the ink arrays
        and the class names returned by ``parse_line``.

    Raises:
        OSError: If the directory or one of its files cannot be read.
        ValueError: If ``max_samples`` is negative.
    """
    # Any falsy limit means "read everything", as before.
    limit = max_samples if max_samples else None

    drawings = []
    labels = []
    for filename in sorted(os.listdir(directory)):
        if not filename.endswith(".ndjson"):
            continue
        path = os.path.join(directory, filename)
        # JSON text is UTF-8; do not depend on the platform default encoding.
        with open(path, 'r', encoding='utf-8') as f:
            for line in islice(f, limit):
                if not line.strip():
                    continue  # e.g. a trailing empty line
                drawing, label = parse_line(line)
                drawings.append(drawing)
                labels.append(label)
    return drawings, labels

In [3]:
# Load at most 1000 samples from each .ndjson file under ./dataset.
drawings, labels = load_data_from_directory('./dataset', max_samples=1000)
if not drawings:
    raise ValueError("No drawings were loaded from './dataset'; check the path and its .ndjson files.")

# Pad sequences to the same length
maxlen = max(len(drawing) for drawing in drawings)
drawings = pad_sequences(drawings, maxlen=maxlen, padding='post', dtype='float32')

# Encode labels
label_encoder = LabelEncoder()
labels = label_encoder.fit_transform(labels)

# Shuffle drawings and labels together with a fixed seed. The loader appends
# samples file by file (presumably one class per file), and Keras'
# `validation_split` holds out the tail of the arrays, so without this step
# the validation set would contain only the last class(es).
shuffle_order = np.random.default_rng(42).permutation(len(labels))
drawings = drawings[shuffle_order]
labels = labels[shuffle_order]

# Convert to tensors
drawings = tf.convert_to_tensor(drawings)
labels = tf.convert_to_tensor(labels)

In [4]:
import tensorflow as tf
from tensorflow.keras.layers import Conv1D, Dense, GRU, Bidirectional, Masking
from tensorflow.keras import Model

class QuickDrawModel(Model):
    """Conv1D stack -> bidirectional GRU -> dense classifier for ink sequences.

    Inputs are expected to be float tensors of shape (batch, time, channels)
    that were padded at the end with all-zero rows. The true length of each
    sequence is recovered from the data and used to mask the padding in the
    recurrent layer, so padded steps do not influence the final state.

    Args:
        num_classes: Number of output classes (size of the softmax layer).
    """

    def __init__(self, num_classes):
        super().__init__()
        self.num_classes = num_classes

        # Define the layers in __init__ so call() reuses the same instances.
        self.conv1 = Conv1D(filters=48, kernel_size=5, strides=1, padding='same', activation='relu')
        self.conv2 = Conv1D(filters=64, kernel_size=5, strides=1, padding='same', activation='relu')
        self.conv3 = Conv1D(filters=96, kernel_size=3, strides=1, padding='same', activation='relu')
        self.rnn = Bidirectional(GRU(128, recurrent_activation='sigmoid', return_sequences=False))
        self.fc1 = Dense(128, activation='relu')
        # Despite the attribute name, this layer outputs softmax probabilities.
        self.logits = Dense(num_classes, activation='softmax')

    def _get_input_tensors(self, drawings, labels):
        """Return ``(drawings, lengths, labels)`` with per-sample true lengths.

        The length of a sample is the 1-based position of its last row that
        has any non-zero channel. Interior all-zero rows are therefore still
        counted, which matters because a real point can have zero deltas.
        Assumes the last real point of every sample is non-zero (parse_line
        sets the stroke-end channel to 1 there) and that padding is trailing.
        """
        row_has_data = tf.reduce_any(tf.not_equal(drawings, 0.0), axis=-1)
        step_numbers = tf.range(1, tf.shape(drawings)[1] + 1, dtype=tf.int32)
        lengths = tf.reduce_max(tf.cast(row_has_data, tf.int32) * step_numbers, axis=1)
        return drawings, lengths, labels

    def _add_conv_layers(self, inks, lengths):
        """Apply the three convolutions; lengths pass through unchanged.

        All convolutions use stride 1 with 'same' padding, so the time
        dimension (and hence every sequence length) is preserved.
        """
        x = self.conv1(inks)
        x = self.conv2(x)
        x = self.conv3(x)
        return x, lengths

    def _add_rnn_layers(self, convolved, lengths):
        """Run the bidirectional GRU over the valid steps of each sequence."""
        # Conv1D does not forward a Keras mask, so it is built explicitly here.
        mask = tf.sequence_mask(lengths, maxlen=tf.shape(convolved)[1])
        x = self.rnn(convolved, mask=mask)
        return x

    def _add_fc_layers(self, final_state):
        """Map the recurrent summary to class probabilities."""
        x = self.fc1(final_state)
        logits = self.logits(x)
        return logits

    def call(self, inputs):
        """Compute class probabilities for a batch of padded ink sequences."""
        inks, lengths, labels = self._get_input_tensors(inputs, None)
        x, lengths = self._add_conv_layers(inks, lengths)
        x = self._add_rnn_layers(x, lengths)
        x = self._add_fc_layers(x)
        return x


In [5]:
# One output unit per distinct class name seen by the label encoder.
num_classes = len(label_encoder.classes_)

# Define the model
model = QuickDrawModel(num_classes=num_classes)

# Compile the model. Labels are integer-encoded and the network ends in a
# softmax layer, hence the sparse categorical cross-entropy on probabilities.
model.compile(
    optimizer='adam',
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy'],
)

In [6]:
# Report how many GPUs TensorFlow can see, using the stable (non-experimental) API.
print("Num GPUs Available: ", len(tf.config.list_physical_devices('GPU')))

Num GPUs Available:  0


In [9]:
# Train the model
# NOTE(review): per the Keras docs, `validation_split` holds out the last 20%
# of the arrays as given — confirm the samples are not ordered by class.
model.fit(
    drawings,
    labels,
    batch_size=32,
    epochs=10,
    validation_split=0.2,
)

Epoch 1/10
Epoch 2/10

KeyboardInterrupt: 

In [None]:
# Example prediction: classify the first drawing and map the winning class
# index back to its original class name.
sample_drawing = drawings[:1]  # slicing (not indexing) keeps the batch axis
prediction = model.predict(sample_drawing)
best_class_index = np.argmax(prediction, axis=-1)
predicted_label = label_encoder.inverse_transform(best_class_index)
print(predicted_label)
