## 2. Visualization Functions

Functions for visualizing the training data and results:
- Plotting sample images with their classes
- Plotting confusion matrices
- Visualizing example errors

In [None]:
def plot_images(images, cls_true, cls_pred=None, cluster_descriptions=None):
    """Plot up to 9 randomly sampled images in a 3x3 grid with class labels.

    Args:
        images: Indexable collection of images. Each element is reshaped to
            ``(img_size, img_size, num_channels)`` before display.
        cls_true: True class of each image, indexable in parallel with
            ``images``.
        cls_pred: Optional predicted class of each image. When given, the
            prediction is shown on a second label line.
        cluster_descriptions: Optional lookup from class id to a dict with a
            ``'description'`` entry. When given, the description is shown
            instead of the raw class id; ids missing from the lookup fall
            back to the raw id.

    Returns:
        None. Prints a message instead of plotting when ``images`` is empty.
    """
    # Use len() rather than truthiness: `images` may be a NumPy array, whose
    # truth value is ambiguous.
    if len(images) == 0:
        print("no images to show")
        return

    chosen = random.sample(range(len(images)), min(len(images), 9))

    def describe(cls):
        """Return the display text for a class id."""
        if cluster_descriptions is None:
            return cls
        try:
            return cluster_descriptions[cls]['description']
        except (KeyError, IndexError):
            return cls

    # Create figure with 3x3 sub-plots.
    fig, axes = plt.subplots(3, 3)
    fig.subplots_adjust(hspace=0.3, wspace=0.3)

    for slot, ax in enumerate(axes.flat):
        # Remove ticks from every cell, used or not.
        ax.set_xticks([])
        ax.set_yticks([])

        # Fewer than 9 images were available: blank the leftover cells
        # instead of indexing past the end of the sample.
        if slot >= len(chosen):
            ax.axis('off')
            continue

        idx = chosen[slot]
        ax.imshow(images[idx].reshape(img_size, img_size, num_channels))

        # Show true (and, if available, predicted) class on the x-axis.
        xlabel = f"True: {describe(cls_true[idx])}"
        if cls_pred is not None:
            xlabel += f"\nPred: {describe(cls_pred[idx])}"
        ax.set_xlabel(xlabel)

    plt.show()

def plot_confusion_matrix(cls_pred, cls_true, class_names=None):
    """Plot the confusion matrix with per-row percentages written in each cell.

    Args:
        cls_pred: Predicted class id per sample.
        cls_true: True class id per sample.
        class_names: Optional sequence of display names, one per class.
            When given, class ids are assumed to be ``0..len(class_names)-1``
            so that every class gets a row/column even if it never occurs;
            ids outside that range are ignored by the matrix. When omitted,
            the classes are whatever sklearn finds in the data and the ticks
            are plain positions.

    Returns:
        None. The figure is shown with ``plt.show()``.
    """
    labels = np.arange(len(class_names)) if class_names is not None else None

    # Get the confusion matrix (raw counts) using sklearn.
    cm = confusion_matrix(y_true=cls_true, y_pred=cls_pred, labels=labels)

    plt.figure(figsize=(10, 8))
    plt.imshow(cm, interpolation='nearest', cmap=plt.cm.Blues)
    plt.title('Confusion Matrix')
    plt.colorbar()

    # One tick per class actually present in the matrix (no hard-coded count).
    tick_marks = np.arange(cm.shape[0])
    if class_names is not None:
        plt.xticks(tick_marks, class_names, rotation=45, ha='right')
        plt.yticks(tick_marks, class_names)
    else:
        plt.xticks(tick_marks)
        plt.yticks(tick_marks)

    # Row-normalize to percentages. Rows with no true samples stay at 0
    # instead of producing NaN from a division by zero.
    row_totals = cm.sum(axis=1, keepdims=True)
    cm_pct = 100.0 * np.divide(cm, row_totals,
                               out=np.zeros(cm.shape, dtype=float),
                               where=row_totals != 0)

    # The image is colored by raw counts, so the text color must be chosen
    # from the raw counts too to stay readable against the cell background.
    thresh = cm.max() / 2. if cm.size else 0
    for i, j in np.ndindex(cm.shape):
        plt.text(j, i, '{:.2f}%'.format(cm_pct[i, j]),
                 horizontalalignment="center",
                 color="white" if cm[i, j] > thresh else "black")

    plt.ylabel('True label')
    plt.xlabel('Predicted label')
    plt.tight_layout()
    plt.show()

## 3. CNN Model Implementation

The CNN model consists of:
1. Image processing branch with convolutional layers
2. Weather feature processing branch with dense layers
3. Combined network for final classification

Let's implement the helper functions for creating layers first.

In [None]:
def new_weights(shape):
    """Return a TensorFlow variable of the given shape.

    The initial values are drawn with ``tf.truncated_normal`` using a
    standard deviation of 0.05.
    """
    initial_values = tf.truncated_normal(shape, stddev=0.05)
    return tf.Variable(initial_values)

def new_biases(length):
    """Return a 1-D TensorFlow variable with ``length`` entries, all 0.05."""
    initial_values = tf.constant(0.05, shape=[length])
    return tf.Variable(initial_values)

def new_conv_layer(input, num_input_channels, filter_size, num_filters, use_pooling=True):
    """Build a convolution -> bias -> (optional 2x2 max-pool) -> ReLU layer.

    Args:
        input: Tensor fed to ``tf.nn.conv2d``.
        num_input_channels: Number of channels in ``input``.
        filter_size: Width and height of each (square) filter.
        num_filters: Number of filters, i.e. output channels.
        use_pooling: When true, apply 2x2 max-pooling with stride 2.

    Returns:
        Tuple ``(layer, weights)``: the resulting tensor and the filter
        weights variable.
    """
    # Filter bank laid out as [height, width, in_channels, out_channels].
    kernel = new_weights(
        shape=[filter_size, filter_size, num_input_channels, num_filters])
    offsets = new_biases(length=num_filters)

    # Stride 1 in every dimension, 'SAME' padding.
    convolved = tf.nn.conv2d(input=input,
                             filter=kernel,
                             strides=[1, 1, 1, 1],
                             padding='SAME')

    # One bias value per filter.
    result = convolved + offsets

    if use_pooling:
        # Down-sample with 2x2 max-pooling, stride 2.
        result = tf.nn.max_pool(value=result,
                                ksize=[1, 2, 2, 1],
                                strides=[1, 2, 2, 1],
                                padding='SAME')

    # ReLU is applied after the (optional) pooling step.
    return tf.nn.relu(result), kernel

def flatten_layer(layer):
    """Reshape a layer to ``[-1, num_features]`` for a fully-connected layer.

    Returns:
        Tuple ``(layer_flat, num_features)`` where ``num_features`` is the
        number of elements in dimensions 1..3 of the input's static shape.
    """
    # Everything except the leading dimension is collapsed into one axis.
    num_features = layer.get_shape()[1:4].num_elements()
    return tf.reshape(layer, [-1, num_features]), num_features

def new_fc_layer(input, num_inputs, num_outputs, use_relu=True):
    """Build a fully-connected layer: ``input @ weights + biases``.

    Args:
        input: 2-D tensor passed to ``tf.matmul``.
        num_inputs: Number of input features (rows of the weight matrix).
        num_outputs: Number of output features (columns of the weight matrix).
        use_relu: When true, apply ReLU to the result.

    Returns:
        The layer's output tensor.
    """
    weight_matrix = new_weights(shape=[num_inputs, num_outputs])
    offsets = new_biases(length=num_outputs)

    pre_activation = tf.matmul(input, weight_matrix) + offsets

    return tf.nn.relu(pre_activation) if use_relu else pre_activation

In [None]:
def create_weather_cnn(num_classes):
    """Build the two-branch (image + weather feature) classification graph.

    The image branch is three pooled convolutional layers followed by a
    fully-connected layer; the weather branch is a single fully-connected
    layer. Both outputs are concatenated and fed to a final fully-connected
    layer with ``num_classes`` outputs.

    Args:
        num_classes: Number of output classes.

    Returns:
        Tuple ``(x_images, x_weather, y_true, cost, accuracy, y_pred_cls)``.
    """
    # --- Inputs -----------------------------------------------------------
    x_images = tf.placeholder(tf.float32, shape=[None, img_size_flat], name='x_images')
    image_batch = tf.reshape(x_images, [-1, img_size, img_size, num_channels])

    x_weather = tf.placeholder(tf.float32, shape=[None, num_weather_features], name='x_weather')

    y_true = tf.placeholder(tf.float32, shape=[None, num_classes], name='y_true')
    y_true_cls = tf.argmax(y_true, axis=1)

    # --- Image branch: three pooled conv layers, chained ------------------
    conv_specs = (
        (filter_size1, num_filters1),
        (filter_size2, num_filters2),
        (filter_size3, num_filters3),
    )
    conv_out = image_batch
    in_channels = num_channels
    for kernel_size, out_channels in conv_specs:
        conv_out, _ = new_conv_layer(input=conv_out,
                                     num_input_channels=in_channels,
                                     filter_size=kernel_size,
                                     num_filters=out_channels,
                                     use_pooling=True)
        # The next layer consumes this layer's filters as its channels.
        in_channels = out_channels

    flat, flat_size = flatten_layer(conv_out)
    image_features = new_fc_layer(input=flat,
                                  num_inputs=flat_size,
                                  num_outputs=fc_size,
                                  use_relu=True)

    # --- Weather branch ---------------------------------------------------
    weather_features = new_fc_layer(input=x_weather,
                                    num_inputs=num_weather_features,
                                    num_outputs=weather_fc_size,
                                    use_relu=True)

    # --- Combined classifier head -----------------------------------------
    merged = tf.concat([image_features, weather_features], axis=1)
    logits = new_fc_layer(input=merged,
                          num_inputs=fc_size + weather_fc_size,
                          num_outputs=num_classes,
                          use_relu=False)

    # Predicted class is the arg-max of the softmax output.
    y_pred_cls = tf.argmax(tf.nn.softmax(logits), axis=1)

    # Cost: mean softmax cross-entropy, computed from the raw logits.
    cost = tf.reduce_mean(
        tf.nn.softmax_cross_entropy_with_logits_v2(logits=logits, labels=y_true))

    # Adam training op for `cost`.
    # NOTE(review): this op is created here but is not part of the returned
    # tuple, so callers need some other way to reach it.
    tf.train.AdamOptimizer(learning_rate=1e-4).minimize(cost)

    # Accuracy: fraction of samples whose predicted class matches the truth.
    accuracy = tf.reduce_mean(
        tf.cast(tf.equal(y_pred_cls, y_true_cls), tf.float32))

    return x_images, x_weather, y_true, cost, accuracy, y_pred_cls

## 4. Training and Evaluation Functions

Functions for:
- Training the model
- Printing progress and validation accuracy
- Early stopping based on validation loss
- Plotting training history

In [None]:
def optimize(num_iterations, data, session, model_vars, early_stopping=None):
    """Train the model and plot validation loss and accuracy curves.

    Args:
        num_iterations: Maximum number of training iterations (batches).
        data: Dataset object; ``data.train.next_batch(batch_size)`` is used
            for training batches and ``data.valid.images`` / ``.features`` /
            ``.labels`` for validation.
        session: TensorFlow session used to run the graph.
        model_vars: Tuple ``(x_images, x_weather, y_true, cost, accuracy,
            y_pred_cls)``. An optional 7th element is used as the training
            op; otherwise the op is looked up in the graph that owns
            ``cost``.
        early_stopping: Number of consecutive evaluations (one every 100
            iterations) without a new best validation loss after which
            training stops. ``None`` or 0 disables early stopping.

    Returns:
        Tuple ``(costs, train_accuracies, val_accuracies)`` of lists with one
        entry per evaluation; ``costs`` holds the validation loss.

    Raises:
        RuntimeError: If no training op is supplied and none is registered
            in the graph.
    """
    x_images, x_weather, y_true, cost, accuracy, y_pred_cls, *extra = model_vars

    # The training op is not among the six standard model variables, so
    # resolve it explicitly rather than relying on an undefined name.
    if extra:
        train_op = extra[0]
    else:
        # Optimizer.minimize() records its op in the TRAIN_OP collection of
        # the graph it was built in; take the most recently added one.
        # NOTE(review): if several models share this graph, pass the op as a
        # 7th element of model_vars instead.
        registered = cost.graph.get_collection(tf.GraphKeys.TRAIN_OP)
        if not registered:
            raise RuntimeError(
                "No training op found: none was passed in model_vars and "
                "none is registered in the graph.")
        train_op = registered[-1]

    eval_every = 100

    # For plotting learning curves
    eval_steps = []
    costs = []
    train_accuracies = []
    val_accuracies = []

    # Early stopping variables
    best_val_loss = float('inf')
    patience = early_stopping if early_stopping else float('inf')
    patience_counter = 0

    # The validation feed is the same on every iteration: build it once.
    feed_dict_val = {
        x_images: data.valid.images,
        x_weather: data.valid.features,
        y_true: data.valid.labels
    }

    print("Training started...")

    for i in range(num_iterations):
        # Get a batch of training data
        x_batch, y_batch, _, _, features_batch = data.train.next_batch(batch_size)

        feed_dict_train = {
            x_images: x_batch,
            x_weather: features_batch,
            y_true: y_batch
        }

        # Run one optimization step on this batch
        session.run(train_op, feed_dict=feed_dict_train)

        if i % eval_every != 0:
            continue

        # Training accuracy is measured on the batch just trained on.
        acc_train = session.run(accuracy, feed_dict=feed_dict_train)
        # Fetch both validation metrics in a single pass over the data.
        acc_val, val_loss = session.run([accuracy, cost], feed_dict=feed_dict_val)

        eval_steps.append(i)
        costs.append(val_loss)
        train_accuracies.append(acc_train)
        val_accuracies.append(acc_val)

        msg = "Iteration: {0:>6}, Training Accuracy: {1:>6.1%}, Validation Accuracy: {2:>6.1%}, Validation Loss: {3:.3f}"
        print(msg.format(i, acc_train, acc_val, val_loss))

        # Early stopping check
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            patience_counter = 0
        else:
            patience_counter += 1

        if patience_counter >= patience:
            print("Early stopping triggered!")
            break

    # Plot the learning curves
    plt.figure(figsize=(12, 4))

    plt.subplot(1, 2, 1)
    plt.plot(eval_steps, costs)
    plt.title('Validation Loss')
    plt.xlabel('Iteration')
    plt.ylabel('Loss')

    plt.subplot(1, 2, 2)
    plt.plot(eval_steps, train_accuracies, label='Training')
    plt.plot(eval_steps, val_accuracies, label='Validation')
    plt.title('Accuracy')
    plt.xlabel('Iteration')
    plt.ylabel('Accuracy')
    plt.legend()

    plt.tight_layout()
    plt.show()

    return costs, train_accuracies, val_accuracies

def print_validation_accuracy(data, session, model_vars, cluster_descriptions):
    """Print validation accuracy, then plot the confusion matrix and errors.

    Args:
        data: Dataset object; ``data.valid.images`` / ``.features`` /
            ``.labels`` feed the model and ``data.valid.cls`` holds the true
            class ids.
        session: TensorFlow session used to run the graph.
        model_vars: Tuple ``(x_images, x_weather, y_true, cost, accuracy,
            y_pred_cls)``.
        cluster_descriptions: Lookup indexed by ``0..len-1`` whose entries
            have a ``'description'`` key; used for class display names.
    """
    x_images, x_weather, y_true, _cost, _accuracy, y_pred_cls = model_vars
    valid = data.valid

    # Predict a class for every validation sample.
    cls_pred = session.run(y_pred_cls, feed_dict={
        x_images: valid.images,
        x_weather: valid.features,
        y_true: valid.labels
    })

    # Element-wise hit/miss vector and the resulting accuracy.
    hits = (cls_pred == valid.cls)
    acc = float(hits.sum()) / len(hits)

    print("Validation Accuracy: {0:.1%}".format(acc))

    # Confusion matrix, labelled with the cluster descriptions.
    # NOTE(review): class names are passed as a third positional argument;
    # plot_confusion_matrix must accept it.
    class_names = [cluster_descriptions[i]['description'] for i in range(len(cluster_descriptions))]
    plot_confusion_matrix(cls_pred, valid.cls, class_names)

    # Select the misclassified validation samples.
    misses = ~hits
    wrong_images = valid.images[misses]
    wrong_cls_true = valid.cls[misses]
    wrong_cls_pred = cls_pred[misses]

    if len(wrong_images) == 0:
        print("\nNo errors found in validation set!")
        return

    # NOTE(review): cluster_descriptions is passed as a fourth positional
    # argument; plot_images must accept it.
    print("\nExample errors:")
    plot_images(wrong_images, wrong_cls_true, wrong_cls_pred, cluster_descriptions)