In [1]:
from collections import defaultdict
import imageio.v2 as iio
import matplotlib.pyplot as plt
import numpy as np
import os
import tensorflow as tf
import tensorflow.keras as ks
from transformers import ViTImageProcessor, TFViTForImageClassification

2024-07-21 20:54:52.080921: I external/local_tsl/tsl/cuda/cudart_stub.cc:31] Could not find cuda drivers on your machine, GPU will not be used.
2024-07-21 20:54:52.153050: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2024-07-21 20:54:52.153085: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-07-21 20:54:52.156973: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2024-07-21 20:54:52.170989: I external/local_tsl/tsl/cuda/cudart_stub.cc:31] Could not find cuda drivers on your machine, GPU will not be used.
2024-07-21 20:54:52.172926: I tensorflow/core/platform/cpu_feature_guard.cc:1

In [2]:
# Create an instance of ViTImageProcessor and use ViT base model as pretrained model
feature_extractor = ViTImageProcessor.from_pretrained('google/vit-base-patch16-224')
# Create an instance of ViTForImageClassification and use ViT base model as pretrained model
model = TFViTForImageClassification.from_pretrained('google/vit-base-patch16-224')
model.summary(expand_nested=True)

All PyTorch model weights were used when initializing TFViTForImageClassification.

All the weights of TFViTForImageClassification were initialized from the PyTorch model.
If your task is similar to the task the model of the checkpoint was trained on, you can already use TFViTForImageClassification for predictions without further training.


Model: "tf_vi_t_for_image_classification"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 vit (TFViTMainLayer)        multiple                  85798656  
                                                                 
 classifier (Dense)          multiple                  769000    
                                                                 
Total params: 86567656 (330.23 MB)
Trainable params: 86567656 (330.23 MB)
Non-trainable params: 0 (0.00 Byte)
_________________________________________________________________


In [26]:
# List of directories with data to use in training, validation and testing
datadir_list = [
    '/home/felbus/transformers/data/Lego', 
    '/home/felbus/transformers/data/Duplo'
]

# Definition of useful classes and functions for later Papeline creation

class ParseCat:
    '''Class helps to transform string labels to integer numbers. Therefore an initially
    empty dictionary is filled with an pair (categry_str: #), when class is called. 
    # is the integer representation of a category string value and is generated automatically,
    when class is called with an unknown category string value. Class returns # uppon call.
    '''
    def __init__(self):
        self.cat_dict = {} # emtpy dictionary to hold ('category': #) pairs

    def __call__(self, category):
        # Dynamically define #, if category is unkwown, return #
        if (cat_int := self.cat_dict.get(category)) is None: 
            self.cat_dict[category] = cat_int = len(self.cat_dict)
        return cat_int # return #

    def __str__(self):
        # Generate human readable output for print()
        content_str = f'#\t\tCategory\n-------------------------------------\n'
        for i, (key, value) in enumerate(self.cat_dict.items()):
            content_str += f'{value}\t\t{key}\n'
        return content_str
        

@tf.py_function(Tout=tf.float32)
def apply_feature_extractor(x):
    '''Function will apply "feature_extractor" to image data. Decorator allows usage of this function inside tf.data.Dataset.map().
    Parameters:
    x <tf.Tensor>: image data

    Return:
    <tf.Tensor>: features of image data with shape (channels, heigth, width)
    '''
    return feature_extractor.preprocess(images=x, return_tensors='tf')['pixel_values']

lego_parse = ParseCat()

def list_file_generator(dir_list):
    '''Generator function will take a list with directory paths and yield tuples
    of two strings (filepath, label). Label will be extracted from lowest part of
    directory path and is pared to an integer category number.
    Parameters:
    dir_list <list <str>>: list of strings containing paths to directories

    Return:
    <tuple (<str>, <int>): Tuple containing absolute file path and label category
    '''
    # Loop over all directories passed in dir_list
    for directory in dir_list:
        # Decode bytestring to string
        directory = directory.decode('utf-8')
        # Take lowest directory name as label and parse it to integer
        _, label_str = os.path.split(directory)
        label = lego_parse(label_str)
        # Create list of all files in directory and loop over these files
        file_list = os.listdir(directory)
        for file in file_list:
            # join path and filename to absolute file path of this file
            file_path = os.path.join(directory, file)
            # yield tupel (absolute file path to image file, label)
            yield file_path, label
            
def load_image_data(path):
    '''Function will load image data and process it with tensorflow decode_image function.
    Parameters:
    path <str>: file path string to load the image from

    Return:
    <tf.Tensor>: Decoded image with shape (height, width, channels)
    '''
    image = tf.io.read_file(path)
    return tf.io.decode_image(image, channels=3)

# Creation of Pipeline object tf.data.Dataset

# Initiate an tf.data.Dataset instance from generator function. 
# Elements will be of type tuple with two Tensors containing a string (file path and label)
image_dataset = tf.data.Dataset.from_generator(
    list_file_generator, args=[datadir_list], output_signature=(
        tf.TensorSpec(shape=(), dtype=tf.string), # file path
        tf.TensorSpec(shape=(), dtype=tf.int16) # label category
    )
)


# Load image data by mapping file path to load_image_data function
image_dataset = image_dataset.map(lambda x, y: (load_image_data(x), y))


# Wrap call of ViTImageProcessor in batch() / unbatch(), because otherwise it will add a dimension, that is not
# recognized as a batching dimension by the tf.data.Dataset instance.
image_dataset = image_dataset.batch(2)
# Apply feature_extractor to all image data, data in pipeline will be "channel dimension first"
# Images will be rescaled to [0, 1] and then normalized to means [0.5, 0.5, 0.5], resized to 3x224x224
image_dataset = image_dataset.map(lambda x, y: (apply_feature_extractor(x), y))
image_dataset = image_dataset.unbatch()

# Create new dataset from iterator over image_dataset. This is a work around, as the
# return of ViTImageProcessor somehow returns a Dataset, that is unsuitable to be 
# passed to TFViTForImageClassification.
image_dataset = tf.data.Dataset.from_generator(
    image_dataset.__iter__, output_signature=(
        tf.TensorSpec(shape=(3, 224, 224), dtype=tf.float32),
        tf.TensorSpec(shape=(), dtype=tf.int16)
    )
)
# Batching 
image_dataset = image_dataset.batch(3)

tf.math.argmax(model.predict(image_dataset).logits, axis=1)



<tf.Tensor: shape=(20,), dtype=int64, numpy=
array([844, 844, 844, 710, 844, 750, 761, 844, 767, 591, 767, 411, 710,
       844, 626, 844, 767, 767, 844, 710])>

In [27]:

print(lego_parse)

#		Category
-------------------------------------
0		Lego
1		Duplo



In [44]:
def list_files_from_directory(path):
    print(type(path))
    return path

data_folders = ['~/transformers/data/Lego', '~/transformers/data/Duplo']
# Create Dataset structure over list of datafolders and use tf.data.Dataset.interleave to create Dataset of image data
image_pipeline = tf.data.Dataset.from_tensor_slices(data_folders)
image_pipeline = image_pipeline.map(lambda x: list_files_from_directory(x))
#image_pipeline = image_pipeline.interleave(lambda x: list_files_from_directory(x))
for i, val in enumerate(image_pipeline):
    print(type(val))
    print(val.numpy())

<class 'tensorflow.python.framework.ops.SymbolicTensor'>
<class 'tensorflow.python.framework.ops.EagerTensor'>
b'~/transformers/data/Lego'
<class 'tensorflow.python.framework.ops.EagerTensor'>
b'~/transformers/data/Duplo'


In [31]:
prediction = model.predict(image_pipeline)
tf.argmax(prediction.logits, axis=1)



<tf.Tensor: shape=(3,), dtype=int64, numpy=array([285, 285, 282])>

In [26]:
# Create tf.data.Dataset containing all ".jpg" file names in folder
test = tf.data.Dataset.list_files('*.jpg')
# Load the image date from file names
test = test.map(lambda x: load_image_data(x))

for i, val in enumerate(test):
    print(val.shape)

(224, 224, 3)
(224, 224, 3)
(224, 224, 3)


In [26]:
for i, val in enumerate(image_pipeline.unbatch()):
    print(val[0].numpy().shape)
    print(val[0].set_shape([224, 224, 3]).shape)
    #plt.imshow(val[0])

(3, 224, 224)


ValueError: Tensor's shape (3, 224, 224) is not compatible with supplied shape [224, 224, 3].

In [75]:
# ToDo : Learn about shapes and that shapes 

<tensorflow.python.data.ops.options.Options at 0x7f5106db8280>

In [11]:
pic = tf.io.read_file('sample.jpg')
pic = tf.io.decode_image(pic)
pic_features = feature_extractor(pic, return_tensors='tf')
prediction = model.predict(pic_features['pixel_values'])
pic_features.keys()



dict_keys(['pixel_values'])

In [7]:
type([])

list