In [1]:
import matplotlib.pyplot as plt
import numpy as np
import os
import random
import tensorflow as tf
import requests
import albumentations as A
import json
import time
from functools import partial
from PIL import Image
from tensorflow.keras import applications
from tensorflow.keras import layers
from tensorflow.keras import losses
from tensorflow.keras import optimizers
from tensorflow.keras import metrics
from tensorflow.keras import Model
from tensorflow.keras.applications import resnet
from tensorflow.keras.preprocessing.image import img_to_array
# (height, width) every image is resized to; also the spatial input size of the network.
target_shape = (200, 200)



from pathlib import Path

In [None]:
# Mount Google Drive inside the Colab runtime at /content/gdrive.
from google.colab import drive
drive.mount('/content/gdrive')

In [None]:
# Unpack the image archive stored on the mounted Drive into the current working directory.
!tar -xf gdrive/MyDrive/jugio/images_skewed.tar.gz

In [None]:
# Base name and version tag; the version prefixes checkpoint files and the
# combined name is the directory the embedding model is saved to.
modelName = 'ident_model'
version = 'v35'

fullModelName = '_'.join((modelName, version))

In [None]:
# Albumentations pipeline: a 200x200 random crop followed by probabilistic
# brightness/contrast, hue/saturation/value and blur perturbations.
# NOTE(review): in the cells visible here this pipeline is only reachable through
# augmentImage(); the tf.data pipeline uses tf.image.random_crop instead.
augment = A.Compose([
    A.RandomCrop(height=200, width = 200),
    A.RandomBrightnessContrast(p=0.5, brightness_limit=[-0.2,0.4], contrast_limit=[-0.4,0.4]),
    A.HueSaturationValue(hue_shift_limit=[-10,10], sat_shift_limit=[-20, 20], val_shift_limit=[-15,15], p=0.5),
    A.GaussianBlur(p=0.3),
    A.MedianBlur(blur_limit=3, p=0.3),
])

In [None]:
def augmentImage(srcArr):
    """Run `srcArr` through the `augment` pipeline and return the augmented image.

    Args:
        srcArr: image passed to the pipeline as its ``image`` argument.

    Returns:
        The value stored under the ``'image'`` key of the pipeline's result.
    """
    augmented = augment(image=srcArr)
    return augmented['image']

In [None]:
# Root folders of the training and validation images (one sub-folder per class).
imagesPath = str(Path('data') / 'img_cropped_skewed_v2')
valImagesPath = str(Path('data') / 'img_cropped_skewed_val')

In [None]:
def preprocess_image(filename, aug=False):
    """Load a JPEG as a 3-channel image resized to `target_shape`.

    The file is decoded as single-channel grayscale and then expanded to RGB.
    Pixel values are not rescaled here (callers divide by 255 for display).

    Args:
        filename: path of the JPEG file to read.
        aug: when true, take a random 170x170 crop of the resized image and
            resize that crop back to `target_shape`.

    Returns:
        The image tensor with spatial size `target_shape` and 3 channels.
    """
    raw_bytes = tf.io.read_file(filename)
    gray = tf.image.decode_jpeg(raw_bytes, channels=1)
    rgb = tf.image.grayscale_to_rgb(gray)
    resized = tf.image.resize(rgb, target_shape)
    if not aug:
        return resized
    cropped = tf.image.random_crop(resized, [170, 170, 3])
    return tf.image.resize(cropped, target_shape)

def preprocess_triplets(anchor, positive, negative):
    """Load and randomly crop the three images of one triplet.

    Args:
        anchor: path of the anchor image.
        positive: path of the positive image.
        negative: path of the negative image.

    Returns:
        A 3-tuple ``(anchor, positive, negative)`` of preprocessed images, each
        produced by ``preprocess_image(..., True)``.
    """
    anchor_image = preprocess_image(anchor, True)
    positive_image = preprocess_image(positive, True)
    negative_image = preprocess_image(negative, True)
    return (anchor_image, positive_image, negative_image)


In [None]:
testImg = os.path.join(imagesPath, '111280' ,'111280_0.jpg')
def test(t):
    """Display one image after the augmented preprocessing used for training.

    Reuses preprocess_image(aug=True) instead of repeating its steps, so this
    preview cannot drift from what the training pipeline actually produces.

    Args:
        t: path of the JPEG file to preview.
    """
    image = preprocess_image(t, aug=True)
    # Pixel values are in the 0-255 range; imshow expects floats in [0, 1].
    plt.imshow(image / 255.)

test(testImg)

In [None]:
# Entries of the training / validation roots; later cells treat each entry of
# imageClasses as a class sub-folder containing that class's images.
imageClasses = os.listdir(imagesPath)
valImageClasses = os.listdir(valImagesPath)

In [None]:
# Parallel lists of file paths: index i of each list forms one training triplet.
anchorImages = []
positiveImages = []
negativeImages = []

In [None]:
# Parallel lists of file paths for the validation triplets.
valAnchorImages = []
valPositiveImages = []
valNegativeImages = []

In [None]:
rng = np.random.default_rng(5656)

image_count = len(imageClasses)

# Rebuild the triplet lists from scratch so re-running this cell does not append
# a second copy of every triplet.
anchorImages = []
positiveImages = []
negativeImages = []

# os.listdir() returns entries in arbitrary order; sorting makes the seeded RNG
# pick the same negatives on every machine and on every run.
classNames = sorted(imageClasses)

# List every class folder once instead of re-listing the negative folder per triplet.
classFiles = {
    className: sorted(os.listdir(os.path.join(imagesPath, className)))
    for className in classNames
}

# Only classes that actually contain images can supply a negative.
negativeCandidates = [className for className in classNames if classFiles[className]]

for imageClass in classNames:
    images = classFiles[imageClass]
    if not images:
        # An empty class folder has no anchor image; skip it instead of failing on images[0].
        continue
    if len(images) > 1 and len(negativeCandidates) < 2:
        # Without a second non-empty class the rejection loop below would never end.
        raise ValueError('Need at least two non-empty image classes to sample negatives')

    imageClassFolder = os.path.join(imagesPath, imageClass)
    # The first image (in sorted order) is the anchor; every other image is a positive.
    anchorImagePath = os.path.join(imageClassFolder, images[0])

    for i in range(1, len(images)):

        positiveImagePath = os.path.join(imageClassFolder, images[i])

        # Draw a random class until it differs from the anchor's class.
        negativeImageClass = str(rng.choice(negativeCandidates))
        while negativeImageClass == imageClass:
            negativeImageClass = str(rng.choice(negativeCandidates))
        negativeImageClassFolder = os.path.join(imagesPath, negativeImageClass)
        negativeImagePath = os.path.join(
            negativeImageClassFolder, str(rng.choice(classFiles[negativeImageClass]))
        )

        anchorImages.append(anchorImagePath)
        positiveImages.append(positiveImagePath)
        negativeImages.append(negativeImagePath)

In [None]:
# Print up to the first 100 triplets; the bound prevents an IndexError when
# fewer than 100 triplets exist.
for i in range(min(100, len(anchorImages))):
    print(anchorImages[i])
    print(positiveImages[i])
    print(negativeImages[i])

In [None]:
# Number of training triplets.
print(len(anchorImages))

In [None]:
# Replace the in-memory training triplets with the lists stored as JSON on disk.
anchorJsonPath = 'dsJSON/anchor_v29.json'
positiveJsonPath = 'dsJSON/positive_v29.json'
negativeJsonPath = 'dsJSON/negative_v29.json'

anchorImages = json.loads(Path(anchorJsonPath).read_text())
positiveImages = json.loads(Path(positiveJsonPath).read_text())
negativeImages = json.loads(Path(negativeJsonPath).read_text())

In [None]:
# Load the validation triplets from the lists stored as JSON on disk.
valAnchorJsonPath = 'dsJSON/anchor_val.json'
valPositiveJsonPath = 'dsJSON/positive_val.json'
valNegativeJsonPath = 'dsJSON/negative_val.json'

valAnchorImages = json.loads(Path(valAnchorJsonPath).read_text())
valPositiveImages = json.loads(Path(valPositiveJsonPath).read_text())
valNegativeImages = json.loads(Path(valNegativeJsonPath).read_text())

In [None]:
# Number of validation triplets.
print(len(valAnchorImages))

In [None]:
anchor_dataset = tf.data.Dataset.from_tensor_slices(anchorImages)
positive_dataset = tf.data.Dataset.from_tensor_slices(positiveImages)
negative_dataset = tf.data.Dataset.from_tensor_slices(negativeImages)

dataset = tf.data.Dataset.zip((anchor_dataset, positive_dataset, negative_dataset))
# The triplet lists are grouped by class, so a 1024-element buffer only shuffles
# locally. Shuffling happens before the images are loaded (elements are still
# path strings), so a buffer covering the whole dataset is cheap and gives a
# uniform shuffle. Never go below the previous buffer size.
dataset = dataset.shuffle(buffer_size=max(1024, len(anchorImages)))
# Decode and augment several triplets in parallel rather than one at a time.
dataset = dataset.map(preprocess_triplets, num_parallel_calls=tf.data.AUTOTUNE)

print(dataset)

train_dataset = dataset

train_dataset = train_dataset.batch(32, drop_remainder=False)
train_dataset = train_dataset.prefetch(buffer_size=tf.data.AUTOTUNE)

In [None]:
valAnchor_dataset = tf.data.Dataset.from_tensor_slices(valAnchorImages)
valPositive_dataset = tf.data.Dataset.from_tensor_slices(valPositiveImages)
valNegative_dataset = tf.data.Dataset.from_tensor_slices(valNegativeImages)

valDataset = tf.data.Dataset.zip((valAnchor_dataset, valPositive_dataset, valNegative_dataset))
valDataset = valDataset.shuffle(buffer_size=1024)
# NOTE(review): preprocess_triplets applies the random 170x170 crop, so validation
# images are randomly augmented as well and val_loss varies between evaluations —
# confirm this is intended, since early stopping monitors val_loss.
# Decode several triplets in parallel rather than one at a time.
valDataset = valDataset.map(preprocess_triplets, num_parallel_calls=tf.data.AUTOTUNE)

print(valDataset)

val_dataset = valDataset

val_dataset = val_dataset.batch(32, drop_remainder=False)
val_dataset = val_dataset.prefetch(buffer_size=tf.data.AUTOTUNE)

In [None]:
def visualize(anchor, positive, negative):
    """Show the first three triplets of a batch in a 3x3 grid.

    Each row is one triplet; the columns are anchor, positive and negative.

    Args:
        anchor: batch of anchor images (at least three, values in 0-255).
        positive: batch of positive images, same layout.
        negative: batch of negative images, same layout.
    """
    fig = plt.figure(figsize=(6, 6))
    grid = fig.subplots(3, 3)

    for row in range(3):
        for col, batch in enumerate((anchor, positive, negative)):
            axis = grid[row, col]
            # Scale to [0, 1] for display and hide the tick axes.
            axis.imshow(batch[row] / 255.)
            axis.get_xaxis().set_visible(False)
            axis.get_yaxis().set_visible(False)


visualize(*list(train_dataset.take(1).as_numpy_iterator())[0])

In [None]:
# Backbone: ResNet101 without its classification top, loaded with the "imagenet" weights.
base_cnn = resnet.ResNet101(
    weights="imagenet", input_shape=target_shape + (3,), include_top=False
)

# Embedding head: flatten the backbone output, pass it through two ReLU dense
# layers (each followed by batch normalisation) and project to 256 values.
flatten = layers.Flatten()(base_cnn.output)
dense1 = layers.Dense(512, activation="relu")(flatten)
dense1 = layers.BatchNormalization()(dense1)
dense2 = layers.Dense(256, activation="relu")(dense1)
dense2 = layers.BatchNormalization()(dense2)
output = layers.Dense(256)(dense2)

embedding = Model(base_cnn.input, output)

# Freeze every backbone layer that comes before "conv5_block1_out" in
# base_cnn.layers; that layer and all layers after it stay trainable.
trainable = False
for layer in base_cnn.layers:
    if layer.name == "conv5_block1_out":
        trainable = True
    layer.trainable = trainable

In [None]:
class DistanceLayer(layers.Layer):
    """Layer returning the squared Euclidean anchor-positive and anchor-negative distances."""

    def __init__(self, **kwargs):
        super(DistanceLayer, self).__init__(**kwargs)

    def call(self, anchor, positive, negative):
        """Return ``(ap_distance, an_distance)``, each summed over the last axis."""

        def squared_distance(left, right):
            return tf.reduce_sum(tf.square(left - right), axis=-1)

        return (
            squared_distance(anchor, positive),
            squared_distance(anchor, negative),
        )


# One image input per triplet member, each of shape target_shape + 3 channels.
anchor_input = layers.Input(name="anchor", shape=target_shape + (3,))
positive_input = layers.Input(name="positive", shape=target_shape + (3,))
negative_input = layers.Input(name="negative", shape=target_shape + (3,))

# The same `embedding` model is applied to all three inputs (after ResNet
# preprocessing), so the three branches share their weights.
distances = DistanceLayer()(
    embedding(resnet.preprocess_input(anchor_input)),
    embedding(resnet.preprocess_input(positive_input)),
    embedding(resnet.preprocess_input(negative_input)),
)

# Model mapping an (anchor, positive, negative) triplet to the two distances.
siamese_network = Model(
    inputs=[anchor_input, positive_input, negative_input], outputs=distances
)

In [None]:
class SiameseModel(Model):
    """Training wrapper around the siamese network with a margin-based triplet loss.

    The wrapped network maps a triplet batch to ``(ap_distance, an_distance)``;
    the loss is ``max(ap_distance - an_distance + margin, 0)``.
    """

    def __init__(self, siamese_network, margin=0.5):
        """Store the wrapped network, the loss margin and a running mean of the loss."""
        super().__init__()
        self.siamese_network = siamese_network
        self.margin = margin
        self.loss_tracker = metrics.Mean(name="loss")

    def call(self, inputs):
        """Forward `inputs` to the wrapped siamese network."""
        return self.siamese_network(inputs)

    def train_step(self, data):
        """Run one optimisation step on `data` and return the running mean loss."""
        weights = self.siamese_network.trainable_weights

        # Record the forward pass so the loss can be differentiated.
        with tf.GradientTape() as tape:
            loss = self._compute_loss(data)

        gradients = tape.gradient(loss, weights)
        self.optimizer.apply_gradients(zip(gradients, weights))

        self.loss_tracker.update_state(loss)
        return {"loss": self.loss_tracker.result()}

    def test_step(self, data):
        """Evaluate the loss on `data` without updating weights; return the running mean."""
        self.loss_tracker.update_state(self._compute_loss(data))
        return {"loss": self.loss_tracker.result()}

    def _compute_loss(self, data):
        """Return the per-sample triplet loss for the batch `data`."""
        ap_distance, an_distance = self.siamese_network(data)
        return tf.maximum(ap_distance - an_distance + self.margin, 0.0)

    @property
    def metrics(self):
        """The loss tracker is the only metric this model exposes."""
        return [self.loss_tracker]


In [None]:
# Weights-only checkpoint file per epoch, named <version>-<epoch>.hdf5 under chkpts/.
checkpoint_filepath = "".join(("chkpts/", version, "-{epoch:02d}.hdf5"))
print(checkpoint_filepath)
model_checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
    save_weights_only=True,
    filepath=checkpoint_filepath,
)

# Stop training once val_loss has not decreased for 3 consecutive epochs.
early_stopping_callback = tf.keras.callbacks.EarlyStopping(
    patience=3,
    monitor='val_loss',
    mode='min',
    verbose=1,
)

In [None]:
# Wrap the triplet network in the custom training model (default margin).
siamese_model = SiameseModel(siamese_network)
# NOTE(review): the line below was left disabled; presumably it was a workaround
# for loading HDF5 weights into a subclassed model that has not been called yet —
# confirm whether the load_weights cell works on a fresh kernel without it.
#siamese_model.built = True

In [None]:
# Resume from a previously saved weights file.
# NOTE(review): the checkpoint callback writes to "chkpts/<version>-NN.hdf5", but
# this loads from the working directory — confirm the file is expected there.
siamese_model.load_weights('v35-15.hdf5')

In [None]:
# Train with Adam (learning rate 1e-4) for up to 30 epochs, checkpointing every
# epoch and stopping early on stalled val_loss.
# NOTE(review): no initial_epoch is passed, so after resuming from saved weights the
# {epoch} number in checkpoint file names presumably restarts at 01 — confirm this
# does not overwrite checkpoints you want to keep.
siamese_model.compile(optimizer=optimizers.Adam(0.0001))
siamese_model.fit(train_dataset, epochs=30, validation_data=val_dataset, callbacks=[model_checkpoint_callback, early_stopping_callback])

In [None]:
# Save only the embedding model (not the triplet wrapper) under fullModelName.
embedding.save(fullModelName)
# Disabled: archive the saved model and move the archive to Drive.
#!tar -czvf {fullModelName}.tar.gz {fullModelName}
#!mv {fullModelName}.tar.gz gdrive/MyDrive/jugio/{fullModelName}.tar.gz

In [None]:
# Take one training batch, show its first triplets and embed all three members.
sample = next(iter(train_dataset))
anchor, positive, negative = sample
visualize(anchor, positive, negative)

anchor_embedding = embedding(resnet.preprocess_input(anchor))
positive_embedding = embedding(resnet.preprocess_input(positive))
negative_embedding = embedding(resnet.preprocess_input(negative))

In [None]:
cosine_similarity = metrics.CosineSimilarity()

positive_similarity = cosine_similarity(anchor_embedding, positive_embedding)
print("Positive similarity:", positive_similarity.numpy())

# A Keras metric accumulates state across calls: without this reset the second
# call would return the running mean over BOTH the positive and the negative
# pairs instead of the anchor-negative similarity alone.
cosine_similarity.reset_state()
negative_similarity = cosine_similarity(anchor_embedding, negative_embedding)
print("Negative similarity", negative_similarity.numpy())

In [None]:
# Unpack a previously archived model from Drive into the working directory.
# NOTE(review): the cell that creates and uploads this archive is commented out
# above, so this relies on the archive already existing on Drive.
!tar -xf gdrive/MyDrive/jugio/{fullModelName}.tar.gz

In [None]:
# Reload the saved embedding model for inference only (no compile step).
print(fullModelName)
embedding = tf.keras.models.load_model(fullModelName, compile=False)

In [None]:
# Build the path with os.path.join: the previous hard-coded backslashes only form
# a valid path on Windows and point at a non-existent file name on Colab/Linux.
lmaoPath = os.path.join('data', 'img_cropped_skewed', '44818', '44818_0.jpg')
imageObj = preprocess_image(lmaoPath)
# Add a leading batch axis, since the model is called on batches of images.
imageObj = np.expand_dims(imageObj, axis=0)
imageObj = tf.constant(imageObj)
# Embedding of the single image as a plain Python list.
val = embedding(resnet.preprocess_input(imageObj)).numpy().tolist()[0]
print(val)

In [None]:
# Convert the model.
converter = tf.lite.TFLiteConverter.from_keras_model(embedding)
tflite_model = converter.convert()

# Save the model inside the saved-model folder. os.path.join picks the right
# separator: a literal backslash would create a file whose NAME contains a
# backslash in the working directory on Linux/Colab.
os.makedirs(fullModelName, exist_ok=True)
with open(os.path.join(fullModelName, fullModelName + '.tflite'), 'wb') as f:
    f.write(tflite_model)


In [None]:
# Re-embed the previously drawn `sample` batch with the current `embedding` model.
anchor, positive, negative = sample
anchor_embedding = embedding(resnet.preprocess_input(anchor))
positive_embedding = embedding(resnet.preprocess_input(positive))
negative_embedding = embedding(resnet.preprocess_input(negative))

In [None]:
cosine_similarity = metrics.CosineSimilarity()

positive_similarity = cosine_similarity(anchor_embedding, positive_embedding)
print("Positive similarity:", positive_similarity.numpy())

# A Keras metric accumulates state across calls: without this reset the second
# call would return the running mean over BOTH the positive and the negative
# pairs instead of the anchor-negative similarity alone.
cosine_similarity.reset_state()
negative_similarity = cosine_similarity(anchor_embedding, negative_embedding)
print("Negative similarity", negative_similarity.numpy())

In [None]:
# Count, over up to 100 training batches, how often the mean anchor-positive
# cosine similarity of a batch exceeds its mean anchor-negative similarity.
correct = 0
total = 0
cosine_similarity = metrics.CosineSimilarity()

# Iterate one pass over the dataset. Calling next(iter(train_dataset)) inside the
# loop built a brand-new iterator each time (only ever yielding a first batch)
# and crashed on unpacking None when the dataset was empty.
for i, sample in enumerate(train_dataset.take(100)):
  anchor, positive, negative = sample
  anchor_embedding, positive_embedding, negative_embedding = (
      embedding(resnet.preprocess_input(anchor)),
      embedding(resnet.preprocess_input(positive)),
      embedding(resnet.preprocess_input(negative)),
  )

  # The metric is stateful, so reset it after each use to keep the positive and
  # negative similarities independent of each other and of earlier batches.
  positive_similarity = cosine_similarity(anchor_embedding, positive_embedding)
  cosine_similarity.reset_state()
  negative_similarity = cosine_similarity(anchor_embedding, negative_embedding)
  cosine_similarity.reset_state()

  if(positive_similarity > negative_similarity):
    correct = correct + 1

  print(str(i), " done")
  total = total + 1

print('Correct: ', str(correct))
print('Total: ', str(total))

In [None]:
# Maps a class name to the list of embedding vectors computed for that class.
embeddingValues = {}

In [None]:
# Compute reference embeddings for every class folder and store them in embeddingValues.
cnt = 0
for imageClass in imageClasses:
    imageClassFolder = os.path.join(imagesPath, imageClass)
    images = sorted(os.listdir(imageClassFolder))

    valuesForClass = []

    for image in images:
        imagePath = os.path.join(imageClassFolder, image)
        # No augmentation here: load, add a batch axis and embed.
        imageObj = preprocess_image(imagePath)
        imageObj = np.expand_dims(imageObj, axis=0)
        imageObj = tf.constant(imageObj)
        val = embedding(resnet.preprocess_input(imageObj))
        valuesForClass.append(val.numpy().tolist()[0])
        print(image)
        # Only the first image (in sorted order) of each class is embedded.
        # NOTE(review): presumably deliberate, to keep one reference per class — confirm.
        break

    embeddingValues[imageClass] = valuesForClass
    cnt = cnt + 1
    print('Calculated ', str(cnt), ' out of ', len(imageClasses))

In [None]:
import json

In [None]:
# File the reference embeddings are written to and read from, tagged with the model version.
embeddingsFileName = 'embeddings_{}.json'.format(version)

In [None]:
# Serialise the class -> embeddings mapping as JSON.
with open(embeddingsFileName, 'w') as f:
    json.dump(embeddingValues, f)

In [None]:
# Copy the embeddings file to Drive.
# NOTE(review): it is stored as again.json, while the download cell reads
# {embeddingsFileName} from the same Drive folder — confirm the intended name.
!cp {embeddingsFileName} gdrive/MyDrive/jugio/again.json

In [None]:
# Copy the embeddings file for this version from Drive into the working directory.
!cp gdrive/MyDrive/jugio/{embeddingsFileName} {embeddingsFileName}

In [None]:
# Load the class -> embeddings mapping back from JSON.
with open(embeddingsFileName, 'r') as f:
    embeddingValues = json.load(f)

In [None]:
def testEmbedding(testClass):
    """Print the embedding of the ``<testClass>_0.jpg`` image of class `testClass`.

    Args:
        testClass: name of the class folder under `imagesPath`.
    """
    image_file = os.path.join(imagesPath, testClass, testClass + '_0.jpg')
    # Load without augmentation and add the batch axis before embedding.
    batch = tf.constant(np.expand_dims(preprocess_image(image_file), axis=0))
    vector = embedding(resnet.preprocess_input(batch))
    print(vector.numpy().tolist()[0])

In [None]:
# Freshly computed embedding for one class, to compare with the stored value.
testEmbedding('100419007')

In [None]:
# Stored embedding(s) for the same class.
embeddingValues['100419007']

In [None]:
# NOTE(review): no visible cell writes a file named embeddings.json (the visible
# cells write 'embeddings_' + version + '.json') — confirm this copy is still needed.
!cp embeddings.json gdrive/MyDrive/jugio/embeddings.json

In [None]:
from numpy.linalg import norm

In [None]:
def predict(imagePath):
    """Return the card whose stored embedding is nearest to the image's embedding.

    The image is embedded without augmentation and compared, by Euclidean
    distance, against every vector in `embeddingValues`.

    Args:
        imagePath: path of the JPEG image to identify.

    Returns:
        ``(card_id, distance)`` for the closest stored embedding, or
        ``(None, -1)`` when `embeddingValues` holds no vectors.
    """
    batch = tf.constant(np.expand_dims(preprocess_image(imagePath), axis=0))
    query = embedding(resnet.preprocess_input(batch)).numpy().tolist()[0]

    best_distance = -1
    best_card = None
    for cardId, cardValues in embeddingValues.items():
        for cardVal in cardValues:
            distance = norm(np.subtract(query, cardVal))
            # Keep the current best unless this is the first candidate or a strictly closer one.
            if best_card is not None and not distance < best_distance:
                continue
            best_distance = distance
            best_card = cardId

    return best_card, best_distance

In [None]:
# Copy the card metadata file from Drive into the data folder.
!cp gdrive/MyDrive/jugio/data_trunc.json data/data_trunc.json

In [None]:
# Map each card id (as a string) to the card's name.
cardInfo = {}
with open('data/data_trunc.json', 'r') as f:
    cardInfoJson = json.load(f)
    cardInfo.update((str(card['id']), card['name']) for card in cardInfoJson)

In [None]:
imagePath = 'blackback.jpg'

# predict() returns the nearest card id and its distance. The distance is bound to
# `distance` rather than `max`, so the builtin max() is not shadowed for the rest
# of the notebook session.
card, distance = predict(imagePath)
print(cardInfo[card])

In [None]:
# Show the image that was just classified.
imageObj = Image.open(imagePath)
plt.imshow(imageObj)

In [None]:
def testModel():
    """Run predict() on every file in data/test_v4 and print running accuracy.

    The expected card id of a test file is its file name without extension.
    After each file the expectation, the prediction and the running
    correct/total counts are printed.
    """
    testDataFolderPath = os.path.join('data', 'test_v4')
    correct = 0
    for total, testCard in enumerate(os.listdir(testDataFolderPath)):
        expectedResult, _ = os.path.splitext(testCard)
        result, resultScore = predict(os.path.join(testDataFolderPath, testCard))
        if result == expectedResult:
            correct += 1

        print(str(total), ': Expected ', expectedResult, ', got ', result)
        print('Correct: ', str(correct))
        print('Total: ', str(total + 1))

In [None]:
# Unpack the test-image archive from Drive into the working directory.
!tar -xf gdrive/MyDrive/jugio/test.tar.gz

In [None]:
# Evaluate the nearest-embedding classifier on the test images.
testModel()