In [1]:
import os

import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt

from tensorflow.keras.models import Sequential
from tensorflow.keras import layers
from collections import Counter

from tensorflow.keras.preprocessing.image import load_img, img_to_array
from sklearn.model_selection import train_test_split
from concurrent.futures import ProcessPoolExecutor

import random

In [2]:
TARGET_SIZE = (144, 256)

In [3]:
def parse_filename(filename):
    components = filename.split("_")
    offset = 0
    
    if "-" in components[1]:
        offset = 1
    
    if len(components) < 6 + offset:
        return False, []
   
    x = int(components[1 + offset])
    y = int(components[2 + offset])
    z = int(components[3 + offset])
    r = int(components[4 + offset])
    is_flying = int(components[5 + offset].split(".")[0])

    if x == 0 and y == 0 and z == 0 and r == 0:
        return False, []

    return True, [x, y, z, r, is_flying]

In [4]:
image_data = []
label_data = []

for date_folder in os.listdir("data"):
    date_folder_path = os.path.join("data", date_folder)

    if os.path.isdir(date_folder_path):
        for filename in os.listdir(date_folder_path):
            if filename.endswith(".png"):
                image_path = os.path.join(date_folder_path, filename)
                valid, components = parse_filename(filename)
                
                if valid:
                    image_data.append(image_path)
                    label_data.append(components)

In [5]:
count = Counter([tuple(x) for x in label_data])
count = count.most_common()

count = [x for x in count if x[1] >= 100]

f_image_data = [image_data[i] for i in range(len(label_data)) if tuple(label_data[i]) in [x[0] for x in count]]
f_label_data = [label_data[i] for i in range(len(label_data)) if tuple(label_data[i]) in [x[0] for x in count]]

f_label_data = ["_".join([str(x) for x in y]) for y in f_label_data]

tokenizer = {x: i for i, x in enumerate(set(f_label_data))}
detokenizer = {i: x for i, x in enumerate(set(f_label_data))}

In [6]:
def preprocess_image(image_path):
    image = load_img(image_path, target_size=TARGET_SIZE) 
    image = img_to_array(image) / 255.0

    return image

In [7]:
def preprocess_and_cache_images(image_paths, cache_folder, max_workers):
    os.makedirs(cache_folder, exist_ok=True)
    cached_images = []

    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        for image_path in image_paths:
            cached_path = os.path.join(cache_folder, os.path.basename(image_path) + ".npy")
            if os.path.exists(cached_path):
                cached_images.append(np.load(cached_path))
            else:
                image = preprocess_image(image_path)
                np.save(cached_path, image)
                cached_images.append(cached_path)

    return cached_images

In [8]:
f_image_data = preprocess_and_cache_images(f_image_data, "cache", max_workers=os.cpu_count())
one_hot_labels = tf.keras.utils.to_categorical([tokenizer[x] for x in f_label_data])

In [9]:
print(len(label_data), len(image_data), len(f_label_data), len(f_image_data))

11849 11849 10774 10774


In [10]:
image_train, image_val, label_train, label_val = train_test_split(
    f_image_data, one_hot_labels, test_size=0.2, random_state=42)

image_train = np.array(image_train)
image_val = np.array(image_val)
label_train = np.array(label_train)
label_val = np.array(label_val)

print(image_train.shape, image_val.shape, label_train.shape, label_val.shape)

(8619, 144, 256, 3) (2155, 144, 256, 3) (8619, 15) (2155, 15)


In [11]:
# make a CNN
model = Sequential()
model.add(layers.Conv2D(64, (3, 3), input_shape=(TARGET_SIZE[0], TARGET_SIZE[1], 3), activation="relu"))
model.add(layers.MaxPooling2D(pool_size=(3, 3)))
model.add(layers.Conv2D(32, (3, 3), activation="relu"))
model.add(layers.MaxPooling2D(pool_size=(3, 3)))
model.add(layers.Conv2D(16, (3, 3), activation="relu"))
model.add(layers.MaxPooling2D(pool_size=(3, 3)))

model.add(layers.Flatten())

model.add(layers.Dense(64, activation="relu"))
model.add(layers.Dense(len(tokenizer), activation="softmax"))

In [12]:
print(model.summary())

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv2d (Conv2D)             (None, 142, 254, 64)      1792      
                                                                 
 max_pooling2d (MaxPooling2D  (None, 47, 84, 64)       0         
 )                                                               
                                                                 
 conv2d_1 (Conv2D)           (None, 45, 82, 32)        18464     
                                                                 
 max_pooling2d_1 (MaxPooling  (None, 15, 27, 32)       0         
 2D)                                                             
                                                                 
 conv2d_2 (Conv2D)           (None, 13, 25, 16)        4624      
                                                                 
 max_pooling2d_2 (MaxPooling  (None, 4, 8, 16)         0

In [13]:
model.compile(
    optimizer="adam",
    loss="categorical_crossentropy",
    metrics=["accuracy"]
)

model.fit(
    image_train,
    label_train,
    epochs=10,
    batch_size=32,
    validation_data=(image_val, label_val)
)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<keras.callbacks.History at 0x1b53616f8e0>

In [14]:
directory = "models/VA62_9-6-23"
os.makedirs(directory, exist_ok=True)

model.save(f"{directory}/model.h5")
np.save(f"{directory}/tokenizer.npy", tokenizer)
np.save(f"{directory}/detokenizer.npy", detokenizer)

In [15]:
# def predict(filename, from_folder=False):
#     image = None

#     if from_folder:
#         image = preprocess_image(filename)

#     image = np.expand_dims(image or filename, axis=0)

#     if not from_folder:
#         plt.imshow(image[0])

#     predictions = model.predict(image)[0]
#     predictions = np.argmax(predictions)

#     result = detokenizer[predictions]

#     return result


In [16]:
# ran_index = random.randint(0, len(image_val))
# predictions = predict(image_val[ran_index], from_folder=False)
# label = label_val[ran_index]

# print(f"predictions | {predictions}")
# print(f"label       | {detokenizer[np.argmax(label)]}")
