In [11]:
import tensorflow as tf
import pandas as pd
import numpy as np
import json
import os
import linecache

# Get model architecture

In [4]:
model = tf.saved_model.load("../pretrained_model/saved_model_with_signatures")

model_signature_directory = '../pretrained_model/model_architecture.csv'

if os.path.exists(model_signature_directory):
    # Collecting signature details
    signatures_data = []

    # Iterate over each signature in the model
    for key, signature in model.signatures.items():
        # Initialize dictionaries for current signature's inputs and outputs
        inputs_dict = {}
        outputs_dict = {}

        # Iterate over inputs and outputs, filling in the dictionaries
        for input_key, input_val in signature.structured_input_signature[1].items():
            inputs_dict[input_key] = str(input_val.dtype.name)

        for output_key, output_val in signature.structured_outputs.items():
            outputs_dict[output_key] = str(output_val.dtype.name)

        # Append the current signature's details to the list
        signatures_data.append({
            "Signature Key": key,
            "Inputs": inputs_dict,
            "Outputs": outputs_dict
        })

    # Convert list of signature data into a DataFrame for visualization
    df_signatures = pd.DataFrame(signatures_data)

    df_inputs = df_signatures["Inputs"].apply(pd.Series)
    df_outputs = df_signatures["Outputs"].apply(pd.Series)
    df_expanded = pd.concat([df_signatures.drop(['Inputs', 'Outputs'], axis=1), df_inputs, df_outputs], axis=1)
    df_expanded.to_csv("../pretrained_model/model_architecture.csv", index=False)
else:
    df_expanded = pd.read_csv(model_signature_directory)

In [5]:
df_expanded

Unnamed: 0,Signature Key,embedding_sample,target_seq_len,inp_pos,target_pos,inp_embeddings,input_seq_len,input_stroke,pen,seq_len,stroke,pi,sigma,embedding_sample.1,mu,position_sample
0,decode_stroke,float32,int32,,,,,,float32,int32,float32,,,,,
1,predict_embedding,,,float32,float32,float32,,,,,,float32,float32,float32,float32,
2,predict_position,,,float32,,float32,,,,,,float32,float32,,float32,float32
3,encode_stroke,,,,,,int32,float32,,,,,,float32,,
4,forward_pass,,int32,,,,int32,float32,float32,int32,float32,,,float32,,


# Preprocesses raw drawings and get the right input format for `encode_stroke`

In [6]:
# Define functions to adjusts all drawings to have a consistent scale or size
def get_bounding_box(drawing):
    minx = 99999
    miny = 99999
    maxx = 0
    maxy = 0

    for s in drawing:
      minx = min(minx, min(s[0]))
      maxx = max(maxx, max(s[0]))
      miny = min(miny, min(s[1]))
      maxy = max(maxy, max(s[1]))
    return (minx, miny, maxx, maxy)

def size_normalization(drawing):
  bb = get_bounding_box(drawing)
  width, height = bb[2] - bb[0], bb[3] - bb[1]
  offset_x, offset_y = bb[0], bb[1]
  if height < 1e-6:
    height = 1

  size_normalized_drawing = [[[(x - offset_x) / height for x in stroke[0]],
                              [(y - offset_y) / height for y in stroke[1]],
                              [t for t in stroke[2]]]
                             for stroke in drawing]

  return size_normalized_drawing

In [7]:
# Define a function to resample the ink to have uniform time steps
# (Ensure that each point is separated by a constant time step)
def resample_ink(drawing, timestep=20):
    resampled_drawing = []
    
    for stroke in drawing:
        # Initialize with the first point
        resampled_stroke = [[stroke[0][0], stroke[1][0], stroke[2][0]]]  
        
        for i in range(1, len(stroke[0])):
            x0, y0, t0 = stroke[0][i-1], stroke[1][i-1], stroke[2][i-1]
            x1, y1, t1 = stroke[0][i], stroke[1][i], stroke[2][i]
            distance = np.sqrt((x1 - x0)**2 + (y1 - y0)**2)
            if distance == 0:
                continue
            else:
                new_points = max(1, int(distance / timestep))
                for j in range(1, new_points + 1):
                    new_point = [x0 + j * (x1 - x0) / new_points,
                                y0 + j * (y1 - y0) / new_points,
                                t0 + j * (t1 - t0) / new_points]
                    resampled_stroke.append(new_point)
        
        resampled_drawing.append(resampled_stroke)

    return resampled_drawing

Here I try with the first drawing in quick_draw_Eiffel_Tower:

In [14]:
ndjson_file_path = 'full_raw_The_Eiffel_Tower.ndjson'

# Select the line to read as the sample drawing
line_number = 12
# Use linecache to get the specific line
line = linecache.getline(ndjson_file_path, line_number).strip()
# Parse the JSON content from the line
selected_row = json.loads(line)

sample_drawing = selected_row["drawing"]
print(f"There are totally {len(sample_drawing)} strokes in the sample drawing")

There are totally 3 strokes in the sample drawing


In [16]:
# Sequence length for each stroke
[len(i[0]) for i in sample_drawing]

[146, 22, 17]

In [17]:
# Preprocess the drawing
drawing_normalized = size_normalization(sample_drawing)
drawing_resampled = resample_ink(drawing_normalized)

In [18]:
drawing_normalized

[[[0.0,
   0.004319654427645789,
   0.008639308855291577,
   0.019438444924406047,
   0.02591792656587473,
   0.03455723542116631,
   0.04319654427645788,
   0.05183585313174946,
   0.06047516198704104,
   0.06911447084233262,
   0.07775377969762419,
   0.08639308855291576,
   0.09503239740820735,
   0.09935205183585313,
   0.10367170626349892,
   0.11447084233261338,
   0.11879049676025918,
   0.12526997840172785,
   0.12958963282937366,
   0.12958963282937366,
   0.12958963282937366,
   0.12958963282937366,
   0.12958963282937366,
   0.12742980561555076,
   0.12742980561555076,
   0.12742980561555076,
   0.12526997840172785,
   0.12095032397408208,
   0.11879049676025918,
   0.11447084233261338,
   0.11231101511879049,
   0.1079913606911447,
   0.10367170626349892,
   0.10367170626349892,
   0.10151187904967603,
   0.10151187904967603,
   0.09935205183585313,
   0.09503239740820735,
   0.09287257019438445,
   0.09071274298056156,
   0.09071274298056156,
   0.08855291576673865,
   0.0

Convert the input format and shape:

In [19]:
# Define a function to formalize the input shape to be used in `encode_stroke` signature
# Specifically, the shape of `input_seq_len` is (None, ) and the shape of input_stroke is (None, None, 3) 
def formalize_input(stroke):
    input_stroke_tensor = tf.convert_to_tensor([stroke], dtype=tf.float32) # Add an extra dimension for batch size
    input_seq_len_tensor = tf.convert_to_tensor([len(stroke)], dtype=tf.int32)

    return input_seq_len_tensor, input_stroke_tensor

# Use pretrained model for prediction

## Try the encode_stroke signature

In [20]:
encode_stroke = model.signatures["encode_stroke"]
encode_stroke

<ConcreteFunction (*, input_seq_len: TensorSpec(shape=(None,), dtype=tf.int32, name='input_seq_len'), input_stroke: TensorSpec(shape=(None, None, 3), dtype=tf.float32, name='input_stroke')) -> Dict[['embedding_sample', TensorSpec(shape=(None, 8), dtype=tf.float32, name='embedding_sample')]] at 0x341229B90>

In [21]:
# Lists to collect embeddings and positions
embedding_sample_list = []
inp_pos_list = []

# Process only the first two strokes
for stroke in drawing_resampled[:2]:
    input_seq_len_tensor, input_stroke_tensor = formalize_input(stroke)
    output = encode_stroke(input_seq_len=input_seq_len_tensor, 
                           input_stroke=input_stroke_tensor)
    embedding_sample = output["embedding_sample"]
    
    # Extract the first point's coordinates as position
    inp_pos = [[stroke[0][0], stroke[1][0]]]
    inp_pos_list.extend(inp_pos)
    embedding_sample_list.append(embedding_sample)

# Convert lists to tensors with the right shape
inp_pos_tensor = tf.convert_to_tensor(inp_pos_list, dtype=tf.float32)  # Shape: (num_strokes, 2)
inp_embeddings_tensor = tf.concat(embedding_sample_list, axis=0)  # Shape: (num_strokes, 8)

# Add batch dimension
inp_pos_tensor = tf.expand_dims(inp_pos_tensor, axis=0)  # Shape: (1, num_strokes, 2)
inp_embeddings_tensor = tf.expand_dims(inp_embeddings_tensor, axis=0)  # Shape: (1, num_strokes, 8)

## Try the predict_position signature

In [24]:
predict_position = model.signatures["predict_position"]
predict_position

<ConcreteFunction (*, inp_pos: TensorSpec(shape=(None, None, 2), dtype=tf.float32, name='inp_pos'), inp_embeddings: TensorSpec(shape=(None, None, 8), dtype=tf.float32, name='inp_embeddings')) -> Dict[['position_sample', TensorSpec(shape=(None, 2), dtype=tf.float32, name='position_sample')], ['pi', TensorSpec(shape=(None, 10), dtype=tf.float32, name='pi')], ['sigma', TensorSpec(shape=(None, 1, 10, 2), dtype=tf.float32, name='sigma')], ['mu', TensorSpec(shape=(None, 1, 10, 2), dtype=tf.float32, name='mu')]] at 0x34C8FFA90>

In [25]:
predict_position_result = predict_position(inp_pos=inp_pos_tensor, 
                                           inp_embeddings=inp_embeddings_tensor)
predict_position_result

{'position_sample': <tf.Tensor: shape=(1, 2), dtype=float32, numpy=array([[ 5.0079856, -1.7939286]], dtype=float32)>,
 'pi': <tf.Tensor: shape=(1, 10), dtype=float32, numpy=
 array([[4.9947449e-08, 4.1465189e-08, 3.4878403e-07, 2.9787504e-09,
         9.9642249e-04, 1.7822293e-06, 5.7293884e-02, 3.3884905e-07,
         9.4170719e-01, 3.4080483e-09]], dtype=float32)>,
 'sigma': <tf.Tensor: shape=(1, 1, 10, 2), dtype=float32, numpy=
 array([[[[ 9.469406  ,  3.93775   ],
          [ 1.8703833 ,  4.1385603 ],
          [ 6.523202  ,  0.9962487 ],
          [35.05433   ,  3.7892816 ],
          [ 0.16137658,  0.64853305],
          [ 1.0840179 ,  0.38157785],
          [22.735527  ,  1.0849528 ],
          [35.106552  ,  2.1862864 ],
          [18.42456   ,  5.7575884 ],
          [ 4.4909253 ,  0.9814574 ]]]], dtype=float32)>,
 'mu': <tf.Tensor: shape=(1, 1, 10, 2), dtype=float32, numpy=
 array([[[[-0.78398544,  1.8392066 ],
          [ 0.09649509,  1.2448666 ],
          [-0.2920542 , -0.

In [26]:
target_pos = predict_position_result['position_sample']
target_pos_tensor = tf.expand_dims(target_pos, axis=0)

# Try predict_embedding signature

In [27]:
predict_embedding = model.signatures["predict_embedding"]
predict_embedding

<ConcreteFunction (*, inp_pos: TensorSpec(shape=(None, None, 2), dtype=tf.float32, name='inp_pos'), target_pos: TensorSpec(shape=(None, 1, 2), dtype=tf.float32, name='target_pos'), inp_embeddings: TensorSpec(shape=(None, None, 8), dtype=tf.float32, name='inp_embeddings')) -> Dict[['pi', TensorSpec(shape=(None, 10), dtype=tf.float32, name='pi')], ['sigma', TensorSpec(shape=(None, 1, 10, 8), dtype=tf.float32, name='sigma')], ['embedding_sample', TensorSpec(shape=(None, 8), dtype=tf.float32, name='embedding_sample')], ['mu', TensorSpec(shape=(None, 1, 10, 8), dtype=tf.float32, name='mu')]] at 0x367F4BB10>

In [28]:
predict_embedding_result = predict_embedding(inp_pos=inp_pos_tensor, 
                                             target_pos=target_pos_tensor,
                                             inp_embeddings=inp_embeddings_tensor)
predict_embedding_result

{'pi': <tf.Tensor: shape=(1, 10), dtype=float32, numpy=
 array([[0.09046026, 0.11755432, 0.14144145, 0.11475937, 0.11965625,
         0.04789938, 0.06722607, 0.14215028, 0.02967727, 0.12917535]],
       dtype=float32)>,
 'sigma': <tf.Tensor: shape=(1, 1, 10, 8), dtype=float32, numpy=
 array([[[[0.45284918, 0.29740077, 0.15811475, 0.41532522, 0.23129684,
           0.2014866 , 0.5296572 , 0.2501981 ],
          [0.4723905 , 0.26922047, 0.3173757 , 0.29430878, 0.2629834 ,
           0.36324573, 0.46068507, 0.30033436],
          [0.3568875 , 0.42014602, 0.29737753, 0.45211092, 0.42484426,
           0.3818772 , 0.46291026, 0.2178421 ],
          [0.3622169 , 0.31614846, 0.206989  , 0.4615963 , 0.2814282 ,
           0.20501602, 0.53985107, 0.3541556 ],
          [0.273858  , 0.21198273, 0.22635981, 0.40452194, 0.21225491,
           0.48606813, 0.19932765, 0.23868212],
          [0.3943012 , 0.47304624, 0.26862285, 0.47427455, 0.60211563,
           0.5168385 , 0.50502825, 0.34702405],
 

# Try the decode_stroke signature

In [29]:
decode_stroke = model.signatures["decode_stroke"]
decode_stroke

<ConcreteFunction (*, embedding_sample: TensorSpec(shape=(None, 8), dtype=tf.float32, name='embedding_sample'), target_seq_len: TensorSpec(shape=(), dtype=tf.int32, name='target_seq_len')) -> Dict[['pen', TensorSpec(shape=(None, None, 1), dtype=tf.float32, name='pen')], ['seq_len', TensorSpec(shape=(None,), dtype=tf.int32, name='seq_len')], ['stroke', TensorSpec(shape=(None, None, 2), dtype=tf.float32, name='stroke')]] at 0x34B261CD0>

In [30]:
# Use the (real) next stroke to determine the target_seq_len for the decoder
target_seq_len_tensor = tf.convert_to_tensor(len(drawing_resampled[2]), dtype=tf.int32)

decode_stroke_result = decode_stroke(embedding_sample = predict_embedding_result["embedding_sample"],
                                     target_seq_len = target_seq_len_tensor)

decode_stroke_result["stroke"]
# This decoded stroke with pen state and stroke are used as the new input 
# for the encoder, whose starting position and embeddings will be used
# for auto-regressive for the remaining stokes

<tf.Tensor: shape=(1, 17, 2), dtype=float32, numpy=
array([[[ -0.32955068,  -0.22248104],
        [ -0.6766161 ,  -0.49407703],
        [ -2.0241368 ,  -5.802304  ],
        [ -5.119701  , -15.102497  ],
        [ -5.5239687 , -18.375616  ],
        [ -2.6613305 , -17.858814  ],
        [  0.9564642 , -16.767225  ],
        [  3.9374683 , -15.174292  ],
        [  5.4061937 , -11.872535  ],
        [  6.628682  ,  -5.2172203 ],
        [  4.7367835 ,   3.5651972 ],
        [  4.209163  ,   2.217497  ],
        [  1.1554601 ,   4.8128157 ],
        [ -0.8442453 ,   3.380175  ],
        [ -2.2247639 ,   2.0960534 ],
        [ -2.9414732 ,  -0.9859777 ],
        [ -2.8693335 ,  -1.7561842 ]]], dtype=float32)>

# Random Try Code

In [None]:
# def process_drawing_for_encode_stroke(drawing):
#     flat_strokes = []
#     total_points = 0
#     for stroke in drawing:
#         for i in range(len(stroke[0])):  # Iterate through points in the stroke
#             x = stroke[0][i]
#             y = stroke[1][i]
#             # Assuming the third list contains timing information, not directly used here
#             # If there's a specific "pen state" value needed, adjust accordingly
#             flat_strokes.append([x, y, 1])  # Use '1' as a placeholder for pen state
#         total_points += len(stroke[0])
    
#     # Convert to tensors
#     input_stroke = tf.constant(flat_strokes, dtype=tf.float32)
#     input_seq_len = tf.constant([total_points], dtype=tf.int32)
    
#     return input_stroke, input_seq_len

# # Find the maximum sequence length across all processed drawings
# max_seq_len = max(input_stroke.shape[0] for input_stroke, _ in processed_drawings)

# # Pad each drawing sequence to the maximum length
# padded_drawings = []
# for input_stroke, input_seq_len in processed_drawings:
#     # Calculate the padding amounts
#     padding = [[0, max_seq_len - tf.shape(input_stroke)[0]], [0, 0]]  # Pad the sequence length to max_seq_len
    
#     # Pad the stroke data
#     padded_stroke = tf.pad(input_stroke, padding, "CONSTANT")
    
#     # Append the padded stroke and original sequence length
#     padded_drawings.append((padded_stroke, input_seq_len))

# padded_drawings

# import json

# processed_drawings = []

# # Draw a sample of 320 sketches
# max_rows = 320
# current_row = 0

# with open("data_dir/quick_draw/raw_Eiffel_Tower.ndjson", 'r') as f:
#     for line in f:
#         if current_row < max_rows:
#             drawing_data = json.loads(line)
#             drawing = drawing_data["drawing"]
#             processed_drawing = process_drawing_for_encode_stroke(drawing)
#             processed_drawings.append(processed_drawing)
#             current_row += 1
#         else:
#             break
# stroke_tensors = [x[0] for x in padded_drawings]
# seq_len_tensors = [x[1] for x in padded_drawings]

# stroke_dataset = tf.data.Dataset.from_tensor_slices(stroke_tensors)
# seq_len_dataset = tf.data.Dataset.from_tensor_slices(seq_len_tensors)

# # Combine into a single dataset
# dataset = tf.data.Dataset.zip((stroke_dataset, seq_len_dataset))

# dataset

# # Set your desired batch size
# batch_size = 128

# # Batch the dataset. No need to specify padding values or shapes here because
# # your tensors within each dataset element already have a uniform shape after padding.
# batched_dataset = dataset.batch(batch_size)

In [None]:
# # Define the features of importance for input formatting
# feature_description = {
#     'ink': tf.io.VarLenFeature(tf.float32),
# }

# # For now, I use a very small sample of dataset
# file_paths = "data_dir/quick_draw/training/raw_Eiffel_Tower-00000-of-00010"
# dataset = tf.data.TFRecordDataset(file_paths)

# # for raw_record in dataset.take(1):
# #   example = tf.train.Example()
# #   example.ParseFromString(raw_record.numpy())
# #   print(example)

# # Parse a Tensorflow Example proto 
# parsed_dataset = dataset.map(lambda x: tf.io.parse_single_example(x, feature_description)) 

# # Define a function to get the `input_seq_len` and `input_stroke` arguments 
# # for fitting the pretrained model (specifically, `encode_stroke` signature)
# max_length_threshold = 201

# def get_input_arguments(parsed_record):
#     # Reshape and extract the first three dimensions from parsed_record['ink']
#     # (x coordinate, y coordinate, and pen state)
#     ink = tf.sparse.to_dense(parsed_record['ink'])
#     input_seq_len = tf.shape(ink)[0] // 4
#     ink_reshaped = tf.reshape(ink, (input_seq_len, 4))
#     # Expand the input_stroke to three dimensions for batching 
#     input_stroke = tf.expand_dims(ink_reshaped[:, :3], axis=0)
    
#     # Make sure it matches the input shape 
#     input_seq_len = tf.reshape(input_seq_len, [1]) 

#     return input_seq_len, input_stroke
    
# preprocessed_dataset = parsed_dataset.map(get_input_arguments)