In [13]:
import tensorflow as tf
from tensorflow.keras import layers, Model
from tensorflow.keras.applications import EfficientNetB3
import numpy as np
import pandas as pd
from sklearn.neighbors import NearestNeighbors
import os
from tqdm import tqdm

In [14]:
# Enable GPU acceleration and mixed precision
tf.config.optimizer.set_jit(True)
tf.keras.mixed_precision.set_global_policy('mixed_float16')

In [15]:
# Configuration
TARGET_SIZE = (224, 224)
BATCH_SIZE = 256
AUTOTUNE = tf.data.AUTOTUNE

In [17]:
### Phase 1: Optimized General Color Model ###
class LABPreprocessing(layers.Layer):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.mean = tf.constant([0.485, 0.456, 0.406])
        self.std = tf.constant([0.229, 0.224, 0.225])
        
    def call(self, inputs):
        # Normalize with ImageNet stats
        x = (inputs - self.mean) / self.std
        
        # Convert RGB to LAB using TF ops
        xyz = tf.image.rgb_to_xyz(x)
        xyz = tf.clip_by_value(xyz, 1e-8, 1.0)
        
        # XYZ to LAB conversion
        xyz = tf.unstack(xyz, axis=-1)
        x, y, z = [t * 100.0 for t in xyz]
        
        l = tf.clip(116.0 * self._f(y/100.0) - 16.0, 0, 100)
        a = 500.0 * (self._f(x/95.047) - self._f(y/100.0))
        b = 200.0 * (self._f(y/100.0) - self._f(z/108.883))
        
        # Dynamic range adjustment
        l = l / 100.0
        a = (a + 128.0) / 255.0
        b = (b + 128.0) / 255.0
        
        return tf.stack([l, a, b], axis=-1)
    
    def _f(self, t):
        return tf.where(t > 0.008856, tf.pow(t, 1/3), 7.787*t + 16/116)

In [18]:
def build_optimized_model():
    inputs = layers.Input((*TARGET_SIZE, 3), dtype=tf.float32)
    x = layers.Rescaling(1./255)(inputs)
    
    # Enhanced preprocessing
    x = LABPreprocessing()(x)
    
    # Architecture optimized for mobile
    base = EfficientNetB3(include_top=False, weights='imagenet', 
                         input_shape=(*TARGET_SIZE, 3))
    base.trainable = False
    
    # Feature extraction with depthwise pooling
    x = base(x)
    x = layers.DepthwiseConv2D(3, activation='swish')(x)
    x = layers.GlobalAveragePooling2D()(x)
    
    # Mixed precision-aware dense layers
    x = layers.Dense(256, dtype='float32')(x)
    x = layers.LayerNormalization()(x)
    outputs = layers.Dense(3, activation='sigmoid', dtype='float32')(x)
    
    model = Model(inputs, outputs)
    model.compile(optimizer=tf.keras.optimizers.Adam(0.001),
                 loss='mse', metrics=['mae'])
    return model

In [19]:
### Phase 2: Optimized Self-Supervised Learning ###
class SimCLR(tf.keras.Model):
    def __init__(self, base_model, temperature=0.1):
        super().__init__()
        self.temperature = temperature
        self.encoder = base_model
        self.projection = tf.keras.Sequential([
            layers.Dense(256, activation='relu'),
            layers.LayerNormalization(),
            layers.Dense(128)
        ])
        
    def compile(self, optimizer, **kwargs):
        super().compile(**kwargs)
        self.optimizer = optimizer
        self.loss_tracker = tf.keras.metrics.Mean(name="loss")
        
    def train_step(self, images):
        # Generate augmented views
        aug1 = self.augment(images)
        aug2 = self.augment(images)
        
        with tf.GradientTape() as tape:
            z1 = self.projection(self.encoder(aug1))
            z2 = self.projection(self.encoder(aug2))
            loss = self.nt_xent_loss(z1, z2)
            
        gradients = tape.gradient(loss, self.trainable_variables)
        self.optimizer.apply_gradients(zip(gradients, self.trainable_variables))
        self.loss_tracker.update_state(loss)
        return {"loss": self.loss_tracker.result()}
    
    def augment(self, x):
        return tf.keras.Sequential([
            layers.RandomFlip("horizontal"),
            layers.RandomRotation(0.2),
            layers.RandomZoom(0.2),
            layers.RandomContrast(0.2),
            layers.GaussianNoise(0.1)
        ])(x)
    
    def nt_xent_loss(self, z1, z2):
        z = tf.concat([z1, z2], axis=0)
        similarity = tf.matmul(z, z, transpose_b=True) / self.temperature
        batch_size = tf.shape(z1)[0]
        
        # Create positive mask
        pos_mask = tf.eye(batch_size * 2, batch_size * 2, dtype=tf.bool)
        pos_mask = tf.math.logical_xor(pos_mask, tf.roll(pos_mask, batch_size, axis=0))
        
        # Calculate NT-Xent loss
        logits = tf.where(pos_mask, similarity, -1e9)
        labels = tf.range(batch_size)
        labels = tf.concat([labels, labels], axis=0)
        return tf.reduce_mean(
            tf.nn.sparse_softmax_cross_entropy_with_logits(labels, logits))

In [20]:
### Phase 3: Vectorized Munsell Mapping ###
class MunsellMapper:
    def __init__(self, csv_path):
        df = pd.read_csv(csv_path)
        self.lab = df[["L", "a", "b"]].values
        self.codes = df["munsell"].values
        self.nn = NearestNeighbors(n_neighbors=5, algorithm='ball_tree').fit(self.lab)
        
    def lab_to_munsell(self, lab):
        lab = np.clip(lab, [0, -128, -128], [100, 127, 127])
        distances, indices = self.nn.kneighbors(lab.reshape(1, -1))
        
        # Apply tolerances (±1 unit for value/chroma)
        candidates = self.codes[indices[0]]
        lab_values = self.lab[indices[0]]
        
        # Find best match within tolerance
        valid = (np.abs(lab_values[:, 0] - lab[0]) <= 1) & \
                (np.abs(lab_values[:, 1] - lab[1]) <= 1)
        if np.any(valid):
            return candidates[valid][0]
        return candidates[0]

In [None]:
### Phase 4: Mobile-Optimized Deployment ###
class MobileInference:
    def __init__(self, model_path, munsell_mapper):
        self.interpreter = tf.lite.Interpreter(model_path=model_path)
        self.interpreter.allocate_tensors()
        self.mapper = munsell_mapper
        
    def predict(self, images):
        input_details = self.interpreter.get_input_details()
        output_details = self.interpreter.get_output_details()
        
        # Process multiple images
        outputs = []
        for img in images:
            img = tf.image.resize(img, TARGET_SIZE).numpy()
            self.interpreter.set_tensor(input_details[0]['index'], img[np.newaxis])
            self.interpreter.invoke()
            lab = self.interpreter.get_tensor(output_details[0]['index'])[0]
            
            # Denormalize LAB
            lab = [
                lab[0] * 100,
                (lab[1] * 255) - 128,
                (lab[2] * 255) - 128
            ]
            outputs.append(self.mapper.lab_to_munsell(lab))
        
        # Return most frequent prediction
        return max(set(outputs), key=outputs.count)


In [21]:
## Optimized Data Pipeline ###
def create_pipeline(file_pattern, shuffle=True):
    def process_path(file_path):
        img = tf.io.read_file(file_path)
        img = tf.image.decode_jpeg(img, channels=3)
        return tf.image.convert_image_dtype(img, tf.float32)
    
    ds = tf.data.Dataset.list_files(file_pattern)
    if shuffle: ds = ds.shuffle(10000)
    return ds.map(process_path, num_parallel_calls=AUTOTUNE) \
             .batch(BATCH_SIZE) \
             .prefetch(AUTOTUNE)


In [22]:
### Enhanced Evaluation ###
class ColorEvaluator:
    def __init__(self, model, mapper):
        self.model = model
        self.mapper = mapper
        
    def delta_e(self, lab1, lab2):
        # CIEDE2000 implementation
        try:
            from colormath.color_objects import LabColor
            from colormath.color_diff import delta_e_cie2000
            return delta_e_cie2000(LabColor(*lab1), LabColor(*lab2))
        except ImportError:
            # Fallback to Euclidean distance
            return np.sqrt(np.sum((lab1 - lab2)**2))
        
    def evaluate(self, dataset):
        lab_errors = []
        delta_es = []
        munsell_acc = 0
        total = 0
        
        for batch in dataset:
            preds = self.model.predict(batch)
            for i in range(len(batch)):
                # Convert predictions to LAB
                lab_pred = [
                    preds[i][0] * 100,
                    (preds[i][1] * 255) - 128,
                    (preds[i][2] * 255) - 128
                ]
                
                # Get ground truth (implementation specific)
                lab_true = get_true_lab(batch[i])  
                munsell_true = get_munsell_label(batch[i])
                
                # Calculate metrics
                lab_errors.append(np.mean((lab_pred - lab_true)**2))
                delta_es.append(self.delta_e(lab_pred, lab_true))
                munsell_acc += (self.mapper.lab_to_munsell(lab_pred) == munsell_true)
                total +=1
                
        return {
            "lab_mse": np.mean(lab_errors),
            "delta_e": np.mean(delta_es),
            "munsell_acc": munsell_acc / total
        }


In [23]:
# Phase 1: General Model 
general_model = build_optimized_model()
train_ds = create_pipeline("/home/sala/data/general/*.jpg")
general_model.fit(train_ds, epochs=10)

TypeError: Exception encountered when calling LABPreprocessing.call().

[1mCould not automatically infer the output shape / dtype of 'lab_preprocessing' (of type LABPreprocessing). Either the `LABPreprocessing.call()` method is incorrect, or you need to implement the `LABPreprocessing.compute_output_spec() / compute_output_shape()` method. Error encountered:

Input 'y' of 'Sub' Op has type float32 that does not match type float16 of argument 'x'.[0m

Arguments received by LABPreprocessing.call():
  • args=('<KerasTensor shape=(None, 224, 224, 3), dtype=float16, sparse=False, name=keras_tensor_179>',)
  • kwargs=<class 'inspect._empty'>

In [None]:
### Training Orchestration ###
def main():
    
    
    # Phase 2: Self-supervised Fine-tuning
    simclr = SimCLR(general_model)
    simclr.compile(optimizer=tf.keras.optimizers.Adam(0.0001))
    soil_ds = create_pipeline("/home/sala/data/soil_data/test/Alluvial soil/*.jpg")
    simclr.fit(soil_ds, epochs=5)
    
    # Phase 3: Munsell Mapping
    mapper = MunsellMapper("munsell.csv")
    
    # Phase 4: Mobile Conversion
    converter = tf.lite.TFLiteConverter.from_keras_model(simclr.encoder)
    converter.optimizations = [tf.lite.Optimize.DEFAULT]
    converter.representative_dataset = lambda: representative_dataset_gen()
    tflite_model = converter.convert()
    
    # Save and test
    with open("soil_color.tflite", "wb") as f:
        f.write(tflite_model)
    
    # Evaluate
    mobile_model = MobileInference("soil_color.tflite", mapper)
    evaluator = ColorEvaluator(mobile_model, mapper)
    test_ds = create_pipeline("test_images/*.jpg", shuffle=False)
    results = evaluator.evaluate(test_ds)
    
    print(f"LAB MSE: {results['lab_mse']:.2f}")
    print(f"ΔE 2000: {results['delta_e']:.2f}")
    print(f"Munsell Accuracy: {results['munsell_acc']*100:.1f}%")

if __name__ == "__main__":
    main()