In [1]:
import tensorflow as tf
import numpy as np
import logging
import datetime

# --- Logging setup ---
# Root logger: INFO and above, each record prefixed with timestamp and level.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
# Logger for this notebook/module; every later cell logs through it.
logger = logging.getLogger(__name__)

logger.info("Starting TensorFlow basic test on Vertex AI Workbench.")

2025-07-28 13:52:54.445494: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2025-07-28 13:52:55.731298: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer.so.7'; dlerror: libnvinfer.so.7: cannot open shared object file: No such file or directory; LD_LIBRARY_PATH: /usr/local/cuda/lib64:/usr/local/nccl2/lib:/usr/local/cuda/extras/CUPTI/lib64:/usr/lib/x86_64-linux-gnu/:/opt/conda/lib
2025-07-28 13:52:55.731426: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer_plugin.so.7'; dlerror: libnvinfer_plugin.so.7: cannot open shared object file: No such file or directory; LD_LIBRARY_PATH: /usr/local

In [2]:

# --- 1. Check TensorFlow Version ---
# Read the version string from the imported package and write it to the log.
tf_version = tf.__version__
logger.info(f"TensorFlow version: {tf_version}")

2025-07-28 13:53:02,051 - INFO - TensorFlow version: 2.11.0


In [3]:

# --- 2. Check GPU Availability ---
# List the GPUs TensorFlow can see; with none present only a warning is logged.
gpus = tf.config.list_physical_devices('GPU')
if not gpus:
    logger.warning("No GPUs found. TensorFlow will run on CPU.")
else:
    logger.info(f"GPUs available: {len(gpus)}")
    for gpu in gpus:
        logger.info(f"  - {gpu}")
    # Enable on-demand memory growth so TensorFlow does not grab all GPU memory at once.
    try:
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
    except RuntimeError as e:
        logger.error(f"Error setting GPU memory growth: {e}")
    else:
        logger.info("GPU memory growth set to True.")


2025-07-28 13:53:07.021229: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcuda.so.1'; dlerror: libcuda.so.1: cannot open shared object file: No such file or directory; LD_LIBRARY_PATH: /usr/local/cuda/lib64:/usr/local/nccl2/lib:/usr/local/cuda/extras/CUPTI/lib64:/usr/lib/x86_64-linux-gnu/:/opt/conda/lib
2025-07-28 13:53:07.021273: W tensorflow/compiler/xla/stream_executor/cuda/cuda_driver.cc:265] failed call to cuInit: UNKNOWN ERROR (303)
2025-07-28 13:53:07.021299: I tensorflow/compiler/xla/stream_executor/cuda/cuda_diagnostics.cc:156] kernel driver does not appear to be running on this host (spark-workbench): /proc/driver/nvidia/version does not exist


In [4]:
# --- 3. Perform a Simple TensorFlow Operation (CPU/GPU agnostic) ---
# Multiplies two fixed 2x2 matrices and compares the product with the known answer.
try:
    # Two small constant matrices whose product is known in advance.
    a = tf.constant([[1.0, 2.0], [3.0, 4.0]])
    b = tf.constant([[5.0, 6.0], [7.0, 8.0]])

    # Perform matrix multiplication
    c = tf.matmul(a, b)

    logger.info(f"Tensor 'a':\n{a.numpy()}")
    logger.info(f"Tensor 'b':\n{b.numpy()}")
    logger.info(f"Result of matrix multiplication (c):\n{c.numpy()}")

    # Verify the result. Floating-point output is compared with a tolerance
    # rather than bit-for-bit so a backend rounding difference is not a failure.
    expected_c = np.array([[19., 22.], [43., 50.]])
    if np.allclose(c.numpy(), expected_c):
        logger.info("Simple matrix multiplication test PASSED.")
    else:
        logger.error("Simple matrix multiplication test FAILED: Unexpected result.")

except Exception:
    # logger.exception already attaches the active traceback to the record.
    logger.exception("An error occurred during TensorFlow operation:")
    logger.error("TensorFlow basic test FAILED.")
    # `exit` is the interactive-shell helper; raising SystemExit is the
    # programmatic way to stop with a non-zero status.
    raise SystemExit(1)


2025-07-28 13:53:26.890799: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2025-07-28 13:53:26,946 - INFO - Tensor 'a':
[[1. 2.]
 [3. 4.]]
2025-07-28 13:53:26,948 - INFO - Tensor 'b':
[[5. 6.]
 [7. 8.]]
2025-07-28 13:53:26,950 - INFO - Result of matrix multiplication (c):
[[19. 22.]
 [43. 50.]]
2025-07-28 13:53:26,951 - INFO - Simple matrix multiplication test PASSED.


In [5]:
# --- 4. Simple Model Definition and Summary (Optional, but good for ML setup) ---
# Builds and compiles a tiny two-layer dense network and writes its summary to the log.
try:
    model = tf.keras.Sequential([
        tf.keras.layers.Dense(10, activation='relu', input_shape=(5,)),
        tf.keras.layers.Dense(1, activation='sigmoid')
    ])
    model.compile(optimizer='adam', loss='binary_crossentropy')
    model.summary(print_fn=logger.info) # Print summary to logs
    logger.info("Keras Sequential model defined and compiled successfully.")
except Exception as e:
    logger.error(f"Error defining Keras model: {e}")
    logger.error("TensorFlow Keras model test FAILED.")
else:
    # Only claim success when the Keras step above did not raise; previously
    # this line was logged even right after the FAILED messages.
    logger.info("TensorFlow basic test completed successfully.")

2025-07-28 13:53:37,069 - INFO - Model: "sequential"
2025-07-28 13:53:37,070 - INFO - _________________________________________________________________
2025-07-28 13:53:37,071 - INFO -  Layer (type)                Output Shape              Param #   
2025-07-28 13:53:37,073 - INFO -  dense (Dense)               (None, 10)                60        
2025-07-28 13:53:37,074 - INFO -                                                                  
2025-07-28 13:53:37,075 - INFO -  dense_1 (Dense)             (None, 1)                 11        
2025-07-28 13:53:37,076 - INFO -                                                                  
2025-07-28 13:53:37,078 - INFO - Total params: 71
2025-07-28 13:53:37,079 - INFO - Trainable params: 71
2025-07-28 13:53:37,080 - INFO - Non-trainable params: 0
2025-07-28 13:53:37,081 - INFO - _________________________________________________________________
2025-07-28 13:53:37,081 - INFO - Keras Sequential model defined and compiled successfully.
20

# Classes and methods definition

In [2]:
# Thread-pool sizes handed to TensorFlow's threading configuration.
INTRA_OP_THREADS = 14  # value for set_intra_op_parallelism_threads
INTER_OP_THREADS = 2   # value for set_inter_op_parallelism_threads

tf.config.threading.set_intra_op_parallelism_threads(INTRA_OP_THREADS)
tf.config.threading.set_inter_op_parallelism_threads(INTER_OP_THREADS)

In [3]:
def elu_plus_one_plus_epsilon(x):
    """Activation computing ``elu(x) + 1 + epsilon``.

    Used in this file as the activation of the ``mdn_sigmas`` Dense layer.

    Args:
        x: Input tensor (pre-activation values).

    Returns:
        Tensor of the same shape as ``x``.
    """
    shifted_elu = tf.keras.activations.elu(x) + 1
    return shifted_elu + tf.keras.backend.epsilon()

class MixtureDensityOutput(tf.keras.layers.Layer):
    """Mixture-density output head built from three parallel Dense layers.

    For every input vector the layer emits, concatenated on the last axis and
    in this order:

    * ``num_mixtures * output_dimension`` component means (no activation),
    * ``num_mixtures * output_dimension`` component scales
      (``elu(x) + 1 + epsilon`` activation),
    * ``num_mixtures`` mixture logits (no activation).

    Args:
        output_dimension: Size of the target vector modelled by each component.
        num_mixtures: Number of mixture components.
        **kwargs: Forwarded to ``tf.keras.layers.Layer`` (e.g. ``name``).
    """

    def __init__(self, output_dimension, num_mixtures, **kwargs):
        super().__init__(**kwargs)
        self.output_dim = output_dimension
        self.num_mix = num_mixtures
        self.mdn_mus = tf.keras.layers.Dense(
            self.num_mix * self.output_dim, name="mdn_mus"
        )  # mix*output vals, no activation
        self.mdn_sigmas = tf.keras.layers.Dense(
            self.num_mix * self.output_dim,
            activation=elu_plus_one_plus_epsilon,
            name="mdn_sigmas",
        )  # mix*output vals, ELU + 1 + epsilon activation
        self.mdn_pi = tf.keras.layers.Dense(self.num_mix, name="mdn_pi")  # mix vals, logits

    def build(self, input_shape):
        """Build the three Dense sub-layers for ``input_shape``."""
        self.mdn_mus.build(input_shape)
        self.mdn_sigmas.build(input_shape)
        self.mdn_pi.build(input_shape)
        super().build(input_shape)

    @property
    def trainable_weights(self):
        """Trainable weights of the three sub-layers (mus, sigmas, pi order)."""
        return (
            self.mdn_mus.trainable_weights
            + self.mdn_sigmas.trainable_weights
            + self.mdn_pi.trainable_weights
        )

    @property
    def non_trainable_weights(self):
        """Non-trainable weights of the three sub-layers (mus, sigmas, pi order)."""
        return (
            self.mdn_mus.non_trainable_weights
            + self.mdn_sigmas.non_trainable_weights
            + self.mdn_pi.non_trainable_weights
        )

    def call(self, x, mask=None):
        """Return ``[mus, sigmas, pi]`` concatenated on the last axis.

        ``mask`` is accepted for API compatibility and is not used.
        """
        return tf.keras.layers.concatenate(
            [self.mdn_mus(x), self.mdn_sigmas(x), self.mdn_pi(x)], name="mdn_outputs"
        )

    def get_config(self):
        """Return the constructor arguments so the layer can be serialized.

        Without this override a layer with custom ``__init__`` arguments cannot
        be saved or re-created through ``from_config``.
        """
        config = super().get_config()
        config.update(
            {
                "output_dimension": self.output_dim,
                "num_mixtures": self.num_mix,
            }
        )
        return config

In [4]:
class MDNLayer(tf.keras.layers.Layer):
    """Mixture-density head returning weights, means and stds as separate tensors.

    Args:
        num_mixtures: Number of mixture components.
        output_dim: Size of the target vector modelled by each component.
        **kwargs: Forwarded to ``tf.keras.layers.Layer`` (e.g. ``name``,
            ``dtype``), matching the other custom layer in this file.
    """

    def __init__(self, num_mixtures, output_dim, **kwargs):
        super().__init__(**kwargs)
        self.num_mixtures = num_mixtures
        self.output_dim = output_dim

        # Mixture weights: softmax, so they are probabilities (not logits).
        self.dense_weights = tf.keras.layers.Dense(num_mixtures, activation='softmax')

        # Component means: linear output, one value per (mixture, output dim).
        self.dense_means = tf.keras.layers.Dense(num_mixtures * output_dim)

        # Component standard deviations: softplus activation.
        self.dense_stds = tf.keras.layers.Dense(num_mixtures * output_dim, activation='softplus')

    def call(self, inputs):
        """Predict the mixture parameters for ``inputs``.

        Returns:
            Tuple ``(weights, means, stds)``. ``weights`` keeps the leading
            dimensions of ``inputs`` with a last axis of ``num_mixtures``;
            ``means`` and ``stds`` are reshaped to
            ``(-1, num_mixtures, output_dim)``.
        """
        weights = self.dense_weights(inputs)
        means = self.dense_means(inputs)
        stds = self.dense_stds(inputs)

        # Unflatten the last axis into (mixture, output dimension).
        means = tf.reshape(means, (-1, self.num_mixtures, self.output_dim))
        stds = tf.reshape(stds, (-1, self.num_mixtures, self.output_dim))

        return weights, means, stds

    def get_config(self):
        """Return the constructor arguments so the layer can be serialized."""
        config = super().get_config()
        config.update(
            {
                "num_mixtures": self.num_mixtures,
                "output_dim": self.output_dim,
            }
        )
        return config

In [5]:
def get_lstm_model(timesteps, num_features, num_output_features, num_mixtures, units=64, output_steps=14, conv_kernel=3):
    """Assemble the Conv1D + stacked-LSTM Sequential model.

    Layer order: input, batch norm, Conv1D, batch norm, LSTM (sequence output),
    batch norm, LSTM with half the units, batch norm, MixtureDensityOutput,
    batch norm, sigmoid Dense.

    Args:
        timesteps: Length of the input window.
        num_features: Number of features per timestep.
        num_output_features: Width of the final Dense layer; also passed to
            MixtureDensityOutput as its output dimension.
        num_mixtures: Number of mixture components for MixtureDensityOutput.
        units: Conv1D filter count and size of the first LSTM.
        output_steps: Accepted but not used by this function.
        conv_kernel: Conv1D kernel size.

    Returns:
        The (uncompiled) ``tf.keras.Sequential`` model.
    """
    layers = tf.keras.layers
    stack = [
        layers.Input((timesteps, num_features)),
        layers.BatchNormalization(),
        layers.Conv1D(filters=units, kernel_size=conv_kernel, strides=1, padding='same'),
        layers.BatchNormalization(),
        layers.LSTM(units=units, return_sequences=True),
        layers.BatchNormalization(),
        layers.LSTM(units=int(units/2)),
        layers.BatchNormalization(),
        MixtureDensityOutput(num_output_features, num_mixtures),
        layers.BatchNormalization(),
        layers.Dense(num_output_features, activation='sigmoid'),
    ]

    LSTM_model = tf.keras.Sequential()
    for layer in stack:
        LSTM_model.add(layer)

    return LSTM_model

In [6]:
class FeedBack(tf.keras.Model):
    """Autoregressive Conv1D + LSTM model that emits mixture parameters per step.

    The input window is optionally passed through a Conv1D, then an LSTM is
    warmed up on it. For each of the ``out_steps`` outputs the MDN head
    produces (weights, means, stds); a value drawn from that mixture is fed
    back into the LSTM cell to produce the next step.

    Args:
        units: LSTM cell size and number of Conv1D filters.
        out_steps: Number of output steps to generate.
        num_features: Number of features per input timestep.
        timesteps: Length of the input window.
        num_output_features: Dimension of each mixture component. The packing
            in ``call`` concatenates ``(batch, K, 1)`` weights with
            ``(batch, K, D)`` means/stds on axis 1, which requires ``D == 1``.
        num_mixtures: Number of mixture components.
        stochastic: If true, fed-back values are sampled; otherwise the mean
            of the most probable component is used.
        feature_extraction: If true, apply the Conv1D before the LSTM.
        kernel: Conv1D kernel size.
    """

    def __init__(self, units,
                 out_steps,
                 num_features,
                 timesteps,
                 num_output_features=1,
                 num_mixtures=5,
                 stochastic=True,
                 feature_extraction=True,
                 kernel=3):

        super().__init__()
        self.num_output_features = num_output_features
        self.num_mixtures = num_mixtures
        self.num_features = num_features
        self.timesteps = timesteps
        self.out_steps = out_steps
        self.units = units
        self.stochastic = stochastic
        self.conv_filters = units
        self.feature_extraction = feature_extraction

        self.cnn_layer = tf.keras.layers.Conv1D(
            filters=self.conv_filters,  # one filter per LSTM unit
            kernel_size=kernel,
            strides=1,                  # stride 1 with 'same' padding keeps the sequence length
            padding='same',
            activation='relu'
        )
        self.lstm_cell = tf.keras.layers.LSTMCell(units)
        # Also wrap the LSTMCell in an RNN to simplify the `warmup` method.
        self.lstm_rnn = tf.keras.layers.RNN(
            self.lstm_cell, return_state=True, unroll=True
        )
        self.mdn_layer = MDNLayer(self.num_mixtures, num_output_features)

    def warmup(self, inputs):
        """Run the LSTM over the input window and predict the first step.

        Args:
            inputs: Tensor of shape (batch, time, features).

        Returns:
            ``(weights, means, stds, state)`` where the first three come from
            the MDN head applied to the final LSTM output and ``state`` is the
            list of LSTM states to continue from.
        """
        # x.shape => (batch, lstm_units)
        x, *state = self.lstm_rnn(inputs)
        weights, means, stds = self.mdn_layer(x)
        return weights, means, stds, state

    def sample_mixture(self, weights, means, stds, training):
        """Pick a value from the mixture to feed back into the LSTM.

        With ``self.stochastic`` a component is drawn from ``weights`` and a
        Gaussian sample is taken from it; otherwise the mean of the highest
        weight component is returned. ``training`` is accepted but not used by
        this implementation.

        Returns:
            Tensor of shape (batch, output_dim).
        """
        if self.stochastic:
            # Draw one component index per batch row from the categorical weights.
            component_idx = tf.random.categorical(tf.math.log(weights), 1)
            component_idx = tf.squeeze(component_idx, axis=-1)  # shape (batch,)
        else:
            component_idx = tf.argmax(weights, axis=-1)

        # Select the parameters of the chosen component for every batch row.
        selected_means = tf.gather(means, component_idx, batch_dims=1)  # shape (batch, output_dim)
        selected_stds = tf.gather(stds, component_idx, batch_dims=1)  # shape (batch, output_dim)

        if self.stochastic:
            sampled_values = selected_means + selected_stds * tf.random.normal(tf.shape(selected_means))
        else:
            sampled_values = selected_means

        return sampled_values

    def call(self, inputs, training=None):
        """Generate ``out_steps`` sets of mixture parameters.

        Returns:
            Tensor whose steps lie on axis 1 and whose last axis holds, per
            step, the concatenated weights, means and stds.
        """
        # Per-step outputs are collected in a plain Python list.
        all_predictions = []
        if self.feature_extraction:
            inputs = self.cnn_layer(inputs)
        # Width the LSTM cell was built for. With feature extraction this is
        # the Conv1D filter count (== units); without it, it is the raw feature
        # count, so repeating by `self.units` would not match the cell input.
        feedback_width = inputs.shape[-1]
        # Initialize the LSTM state.
        weights, means, stds, state = self.warmup(inputs)
        weights_reshaped = tf.expand_dims(weights, axis=-1)
        # Insert the first prediction.
        all_predictions.append(tf.concat([weights_reshaped, means, stds], axis=1))

        # Run the rest of the prediction steps.
        for n in range(1, self.out_steps):
            # Use a value drawn from the last predicted mixture as input.
            x = self.sample_mixture(weights, means, stds, training)
            # Broadcast the fed-back value to the cell's input width.
            x = tf.repeat(x, feedback_width, axis=-1)
            # Execute one lstm step.
            x, state = self.lstm_cell(x, states=state, training=training)
            # Convert the lstm output to mixture parameters.
            weights, means, stds = self.mdn_layer(x)
            weights_reshaped = tf.expand_dims(weights, axis=-1)
            # Add the prediction to the output.
            all_predictions.append(tf.concat([weights_reshaped, means, stds], axis=1))

        # Each entry is (batch, params, 1); joining on the last axis puts the
        # steps last: (batch, params, steps).
        predictions = tf.concat(all_predictions, axis=-1)
        # predictions.shape => (batch, steps, params)
        predictions = tf.transpose(predictions, [0, 2, 1])
        return predictions

    def load_weights_and_build(self, weights_path):
        """Build the model for (None, timesteps, num_features), then load weights."""
        self.build(input_shape=(None, self.timesteps, self.num_features))
        self.load_weights(weights_path)

In [7]:
class FeedBackAttetion(tf.keras.Model):
    """Autoregressive Conv1D + LSTM model with self-attention before the MDN head.

    Same feedback scheme as a plain LSTM forecaster: the input window is
    optionally passed through a Conv1D, an LSTM is warmed up on it, and for
    each of the ``out_steps`` outputs the LSTM output goes through multi-head
    self-attention and the MDN head; a value taken from the predicted mixture
    is fed back into the LSTM cell.

    Args:
        units: LSTM cell size and number of Conv1D filters.
        out_steps: Number of output steps; also used as the attention ``key_dim``.
        num_features: Number of features per input timestep.
        timesteps: Length of the input window.
        num_output_features: Dimension of each mixture component. The packing
            in ``call`` concatenates ``(batch, K, 1)`` weights with
            ``(batch, K, D)`` means/stds on axis 1, which requires ``D == 1``.
        num_mixtures: Number of mixture components.
        stochastic: If true, fed-back values are sampled while training.
        feature_extraction: If true, apply the Conv1D before the LSTM.
        attention_heads: Number of attention heads.
    """

    def __init__(self, units, out_steps, num_features, timesteps, num_output_features=1, num_mixtures=5, stochastic=True, feature_extraction=True, attention_heads=3):
        super().__init__()
        self.num_output_features = num_output_features
        self.num_mixtures = num_mixtures
        self.num_features = num_features
        self.timesteps = timesteps
        self.out_steps = out_steps
        self.units = units
        self.stochastic = stochastic
        self.conv_filters = units
        self.feature_extraction = feature_extraction
        self.attention_heads = attention_heads

        # The default kernel initializer is used on purpose. An all-zero kernel
        # (with the default zero bias) gives a zero pre-activation everywhere,
        # and ReLU passes no gradient at zero, so the layer could never start
        # learning.
        self.cnn_layer = tf.keras.layers.Conv1D(
            filters=self.conv_filters,  # one filter per LSTM unit
            kernel_size=3,
            strides=1,                  # stride 1 with 'same' padding keeps the sequence length
            padding='same',
            activation='relu',
        )
        self.lstm_cell = tf.keras.layers.LSTMCell(units)
        # Also wrap the LSTMCell in an RNN to simplify the `warmup` method.
        self.lstm_rnn = tf.keras.layers.RNN(
            self.lstm_cell, return_state=True, unroll=True
        )

        # Default initializer here as well: all-zero projection kernels make
        # the attention output independent of its input and block gradient
        # flow through the query/key/value/output projections.
        self.attention_layer = tf.keras.layers.MultiHeadAttention(
            num_heads=self.attention_heads,
            key_dim=self.out_steps,
        )

        self.mdn_layer = MDNLayer(self.num_mixtures, num_output_features)

    def warmup(self, inputs):
        """Run the LSTM over the input window and predict the first step.

        Args:
            inputs: Tensor of shape (batch, time, features).

        Returns:
            ``(weights, means, stds, state)``. The LSTM output is given a
            length-1 sequence axis for self-attention, so ``weights`` carries
            that extra axis; ``state`` is the list of LSTM states.
        """
        # x.shape => (batch, lstm_units)
        x, *state = self.lstm_rnn(inputs)
        # Attention expects a sequence axis: (batch, 1, lstm_units).
        x = tf.expand_dims(x, axis=1)
        x = self.attention_layer(query=x, value=x, key=x)
        weights, means, stds = self.mdn_layer(x)
        return weights, means, stds, state

    def sample_mixture(self, weights, means, stds, training):
        """Pick a value from the mixture to feed back into the LSTM.

        A random draw is made only when ``training == True`` and
        ``self.stochastic`` is set; otherwise the mean of the highest-weight
        component is returned.

        Returns:
            Tensor of shape (batch, output_dim).
        """
        draw_sample = training == True and self.stochastic

        if draw_sample:
            # Draw one component index per batch row from the categorical weights.
            component_idx = tf.random.categorical(tf.math.log(weights), 1)
            component_idx = tf.squeeze(component_idx, axis=-1)  # shape (batch,)
        else:
            component_idx = tf.argmax(weights, axis=-1)

        # Select the parameters of the chosen component for every batch row.
        selected_means = tf.gather(means, component_idx, batch_dims=1)  # shape (batch, output_dim)
        selected_stds = tf.gather(stds, component_idx, batch_dims=1)  # shape (batch, output_dim)

        if draw_sample:
            sampled_values = selected_means + selected_stds * tf.random.normal(tf.shape(selected_means))
        else:
            sampled_values = selected_means

        return sampled_values

    def call(self, inputs, training=None):
        """Generate ``out_steps`` sets of mixture parameters.

        Returns:
            Tensor whose steps lie on axis 1 and whose last axis holds, per
            step, the concatenated weights, means and stds.
        """
        # Per-step outputs are collected in a plain Python list.
        all_predictions = []
        if self.feature_extraction:
            inputs = self.cnn_layer(inputs)
        # Width the LSTM cell was built for. With feature extraction this is
        # the Conv1D filter count (== units); without it, it is the raw feature
        # count, so repeating by `self.units` would not match the cell input.
        feedback_width = inputs.shape[-1]
        # Initialize the LSTM state.
        weights, means, stds, state = self.warmup(inputs)
        # Move the mixture axis to position 1: (batch, K, 1).
        weights_reshaped = tf.reshape(weights, (-1, weights.shape[-1], 1))
        # Insert the first prediction.
        all_predictions.append(tf.concat([weights_reshaped, means, stds], axis=1))

        # Run the rest of the prediction steps.
        for n in range(1, self.out_steps):
            # Use a value taken from the last predicted mixture as input;
            # the weights are flattened to (batch, K) for sampling.
            x = self.sample_mixture(tf.reshape(weights, (-1, weights.shape[-1])), means, stds, training)
            # Broadcast the fed-back value to the cell's input width.
            x = tf.repeat(x, feedback_width, axis=-1)
            x = tf.reshape(x, (-1, x.shape[-1]))
            # Execute one lstm step.
            x, state = self.lstm_cell(x, states=state, training=training)
            # Self-attention over a length-1 sequence, then the MDN head.
            x = tf.expand_dims(x, axis=1)
            x = self.attention_layer(query=x, value=x, key=x)
            weights, means, stds = self.mdn_layer(x)
            # Add the prediction to the output.
            weights_reshaped = tf.reshape(weights, (-1, weights.shape[-1], 1))
            all_predictions.append(tf.concat([weights_reshaped, means, stds], axis=1))

        # Each entry is (batch, params, 1); joining on the last axis puts the
        # steps last: (batch, params, steps).
        predictions = tf.concat(all_predictions, axis=-1)
        # predictions.shape => (batch, steps, params)
        predictions = tf.transpose(predictions, [0, 2, 1])
        return predictions

    def load_weights_and_build(self, weights_path):
        """Build the model for (None, timesteps, num_features), then load weights."""
        self.build(input_shape=(None, self.timesteps, self.num_features))
        self.load_weights(weights_path)


# Correctly spelled alias; the original (misspelled) class name is kept for callers.
FeedBackAttention = FeedBackAttetion

In [8]:
def f1_loss(y_true, y_pred):
    """Soft F1 loss: ``1 - mean(F1)`` computed from predicted probabilities.

    ``y_true`` is first reduced to its maximum along axis 1 (kept as a
    length-1 axis). The confusion-matrix terms are then accumulated over
    axis 0 using products of ``y_true`` and ``y_pred``, which keeps the
    expression differentiable.

    Args:
        y_true: Ground-truth labels in {0, 1}.
        y_pred: Predicted probabilities, broadcastable against the reduced
            ``y_true``.

    Returns:
        Scalar tensor, ``1 - mean F1`` of the positive class.
    """
    y_true = tf.reduce_max(y_true, axis=1, keepdims=True)

    # Positive-class terms. The original assigned these expressions to the
    # opposite names (tp<->tn, fp<->fn), which silently optimized the F1 of
    # the negative class instead.
    tp = tf.keras.backend.sum(tf.keras.backend.cast(y_true*y_pred, 'float'), axis=0)
    fp = tf.keras.backend.sum(tf.keras.backend.cast((1-y_true)*y_pred, 'float'), axis=0)
    fn = tf.keras.backend.sum(tf.keras.backend.cast(y_true*(1-y_pred), 'float'), axis=0)

    # Epsilon keeps the ratios finite when a denominator would be zero.
    p = tp / (tp + fp + tf.keras.backend.epsilon())
    r = tp / (tp + fn + tf.keras.backend.epsilon())

    f1 = 2*p*r / (p+r+tf.keras.backend.epsilon())
    f1 = tf.where(tf.math.is_nan(f1), tf.zeros_like(f1), f1)
    return 1 - tf.keras.backend.mean(f1)

In [9]:
def accuracy_loss(y_true, y_pred):
    """Mean absolute difference between the axis-1 maximum of ``y_true`` and ``y_pred``.

    Args:
        y_true: Ground-truth tensor; reduced to its maximum along axis 1
            (axis kept with length 1).
        y_pred: Predictions, broadcastable against the reduced ``y_true``.

    Returns:
        Scalar tensor.
    """
    peak_true = tf.reduce_max(y_true, axis=1, keepdims=True)
    return tf.reduce_mean(tf.abs(peak_true - y_pred))

In [10]:
def get_custom_accuracy_loss(num_mixtures, output_features):
    """Build a loss comparing the peak of the target with the peak predicted mean.

    The returned function reads ``y_pred`` as, on its last axis, ``num_mixtures``
    weights followed by ``num_mixtures * output_features`` means. For each
    (batch, step) it takes the mean at the index of the largest weight.

    Args:
        num_mixtures: Number of mixture components in ``y_pred``.
        output_features: Output dimension per component.

    Returns:
        ``accuracy_loss(y_true, y_pred)`` returning a scalar tensor.
    """
    means_end = num_mixtures + num_mixtures * output_features

    def accuracy_loss(y_true, y_pred):
        mixture_weights = y_pred[:, :, :num_mixtures]
        mixture_means = y_pred[:, :, num_mixtures:means_end]

        # Mean of the most probable component at every (batch, step).
        best_component = tf.argmax(mixture_weights, axis=-1)
        window = tf.gather(mixture_means, best_component, batch_dims=2)

        # Add a trailing axis, then compare the axis-1 maxima.
        window = tf.reshape(window, (-1, window.shape[1], 1))
        peak_pred = tf.reduce_max(window, axis=1, keepdims=True)
        peak_true = tf.reduce_max(y_true, axis=1, keepdims=True)
        return tf.reduce_mean(tf.abs(peak_true - peak_pred))

    return accuracy_loss

In [11]:
def get_custom_loss(num_mixtures, output_features):
    """Build a mean-absolute-error loss on the most probable component's mean.

    The returned function reads ``y_pred`` as, on its last axis, ``num_mixtures``
    weights followed by ``num_mixtures * output_features`` means. For each
    (batch, step) it takes the mean at the index of the largest weight and
    compares it with ``y_true``.

    Args:
        num_mixtures: Number of mixture components in ``y_pred``.
        output_features: Output dimension per component.

    Returns:
        ``custom_loss(y_true, y_pred)`` returning a scalar tensor.
    """
    means_end = num_mixtures + num_mixtures * output_features

    def custom_loss(y_true, y_pred):
        mixture_weights = y_pred[:, :, :num_mixtures]
        mixture_means = y_pred[:, :, num_mixtures:means_end]

        # Mean of the most probable component at every (batch, step).
        best_component = tf.argmax(mixture_weights, axis=-1)
        window = tf.gather(mixture_means, best_component, batch_dims=2)
        window = tf.reshape(window, (-1, window.shape[1], 1))

        # Sum of absolute errors over axis 1 divided by the number of steps.
        step_count = tf.cast(tf.shape(y_true)[1], tf.float32)
        per_sample = tf.reduce_sum(tf.abs(y_true - window), axis=1) / step_count
        return tf.reduce_mean(per_sample)

    return custom_loss

In [12]:
def get_combined_custom_loss(num_mixtures, output_features):
    """Build a loss combining per-step MAE with a peak-error term.

    The returned function reads ``y_pred`` as, on its last axis, ``num_mixtures``
    weights followed by ``num_mixtures * output_features`` means, and uses the
    mean at the index of the largest weight as the point prediction.

    Args:
        num_mixtures: Number of mixture components in ``y_pred``.
        output_features: Output dimension per component.

    Returns:
        ``combined_custom_loss(y_true, y_pred)`` returning one loss value per
        sample: ``mae - (1 - peak_error) * 0.2``.
    """
    def combined_custom_loss(y_true, y_pred):
        def sample_mean(weights, means):
            """Return the mean of the highest-weight component per (batch, step)."""
            component_idx = tf.argmax(weights, axis=-1)
            selected_means = tf.gather(means, component_idx, batch_dims=2)
            return selected_means

        window = sample_mean(
            y_pred[:, :, :num_mixtures],
            y_pred[:, :, num_mixtures:num_mixtures + num_mixtures*output_features]
        )

        # (batch, steps) -> (batch, steps, 1)
        # NOTE(review): assumes y_true is (batch, steps, 1) like `window` - confirm.
        window = tf.reshape(window, (-1, window.shape[1], 1))
        absolute_error = tf.abs(y_true - window)
        mae = tf.reduce_sum(absolute_error, axis=1) / tf.cast(tf.shape(y_true)[1], tf.float32)

        # Peak error is reduced WITHOUT keepdims so it has the same rank as
        # `mae`. Previously a (batch, 1, 1) peak term was combined with the
        # (batch, 1) MAE, which broadcast to (batch, batch, 1) and paired every
        # sample's MAE with every other sample's peak error.
        peak_pred = tf.reduce_max(window, axis=1)
        peak_true = tf.reduce_max(y_true, axis=1)
        acc = tf.abs(peak_true - peak_pred)

        return mae - (1 - acc) * 0.2

    return combined_custom_loss

In [13]:
def get_bce_loss(num_mixtures, output_features):
    """Build a loss applying Keras ``Hinge`` to the most probable component's mean.

    The returned function reads ``y_pred`` as, on its last axis, ``num_mixtures``
    weights followed by ``num_mixtures * output_features`` means. For each
    (batch, step) it takes the mean at the index of the largest weight.

    NOTE(review): the names say "bce" but the loss actually computed is
    ``tf.keras.losses.Hinge`` - confirm which one is intended.

    Args:
        num_mixtures: Number of mixture components in ``y_pred``.
        output_features: Output dimension per component.

    Returns:
        ``bce_loss(y_true, y_pred)`` returning the hinge loss tensor.
    """
    means_end = num_mixtures + num_mixtures * output_features

    def bce_loss(y_true, y_pred):
        mixture_weights = y_pred[:, :, :num_mixtures]
        mixture_means = y_pred[:, :, num_mixtures:means_end]

        # Mean of the most probable component at every (batch, step).
        best_component = tf.argmax(mixture_weights, axis=-1)
        window = tf.gather(mixture_means, best_component, batch_dims=2)

        return tf.keras.losses.Hinge()(y_true, window)

    return bce_loss

In [14]:
def get_mdn_loss(num_mixtures, output_dim):
    """Build the negative log-likelihood loss for a Gaussian mixture output.

    The returned function expects, on the last axis of ``y_pred``:
    ``num_mixtures`` mixture weights, then ``num_mixtures * output_dim`` means,
    then the standard deviations. The weights go straight through
    ``tf.math.log``, so they must be probabilities rather than logits.

    Args:
        num_mixtures: Number of mixture components (K).
        output_dim: Dimension of the output (D).

    Returns:
        ``mdn_loss(y_true, y_pred)`` where ``y_true`` is
        (batch_size, time_steps, output_dim) and ``y_pred`` is
        (batch_size, time_steps, K + 2 * K * D); the result is a scalar tensor.
    """
    def mdn_loss(y_true, y_pred):
        means_start = num_mixtures
        stds_start = num_mixtures + num_mixtures * output_dim

        # Slice the flat parameter vector into its three sections.
        mix_weights = y_pred[:, :, :means_start]
        flat_means = y_pred[:, :, means_start:stds_start]
        flat_stds = y_pred[:, :, stds_start:]

        # (batch, steps, K * D) -> (batch, steps, K, D)
        mu = tf.reshape(flat_means, (-1, tf.shape(flat_means)[1], num_mixtures, output_dim))
        sigma = tf.reshape(flat_stds, (-1, tf.shape(flat_stds)[1], num_mixtures, output_dim))

        # Per-dimension Gaussian log-density of the target under every
        # component; the target gets a broadcast axis for the K components.
        gaussians = tf.compat.v1.distributions.Normal(loc=mu, scale=sigma)
        per_dim_log_prob = gaussians.log_prob(tf.expand_dims(y_true, axis=-2))

        # Sum over the output dimension -> (batch, steps, K).
        component_log_prob = tf.reduce_sum(per_dim_log_prob, axis=-1)

        # Add the log mixture weights and combine the components with
        # log-sum-exp -> (batch, steps).
        joint_log_prob = component_log_prob + tf.math.log(mix_weights)
        log_likelihood = tf.reduce_logsumexp(joint_log_prob, axis=-1)

        # Negative mean log-likelihood, since the loss is minimized.
        return -tf.reduce_mean(log_likelihood)

    return mdn_loss
def get_mixture_loss_func(output_dim, num_mixes):
    """Build the negative log-likelihood loss for a ``[mus, sigmas, pi]`` MDN output.

    The returned function expects the last axis of ``y_pred`` to hold
    ``num_mixes * output_dim`` means, then ``num_mixes * output_dim`` scales,
    then ``num_mixes`` mixture logits (the layout MixtureDensityOutput emits).

    The mixture of diagonal Gaussians is evaluated with core TensorFlow ops:
    ``tf.compat.v1`` has no top-level ``Categorical``, ``MultivariateNormalDiag``
    or ``Mixture``, so the previous implementation raised on first use.

    Args:
        output_dim: Dimension of the target vector (D).
        num_mixes: Number of mixture components (K).

    Returns:
        ``mdn_loss_func(y_true, y_pred)`` returning a scalar tensor.
    """
    import math  # function-scope import: only needed for the Gaussian constant

    half_log_two_pi = 0.5 * math.log(2.0 * math.pi)

    def mdn_loss_func(y_true, y_pred):
        # Reshape inputs in case this is used in a TimeDistributed layer
        y_pred = tf.reshape(
            y_pred,
            [-1, (2 * num_mixes * output_dim) + num_mixes],
            name="reshape_ypreds",
        )
        y_true = tf.reshape(y_true, [-1, output_dim], name="reshape_ytrue")
        y_true = tf.cast(y_true, y_pred.dtype)
        # Split the inputs into parameters
        out_mu, out_sigma, out_pi = tf.split(
            y_pred,
            num_or_size_splits=[
                num_mixes * output_dim,
                num_mixes * output_dim,
                num_mixes,
            ],
            axis=-1,
            name="mdn_coef_split",
        )
        # Component k occupies columns [k * D, (k + 1) * D), so a reshape to
        # (N, K, D) is the same as splitting into K chunks of width D.
        mus = tf.reshape(out_mu, [-1, num_mixes, output_dim])
        sigs = tf.reshape(out_sigma, [-1, num_mixes, output_dim])

        # Log-density of a diagonal Gaussian, summed over the D dimensions:
        # sum_d [ -0.5 * ((y - mu) / sigma)^2 - log(sigma) - 0.5 * log(2 * pi) ]
        target = tf.expand_dims(y_true, axis=1)  # (N, 1, D), broadcast over K
        standardized = (target - mus) / sigs
        component_log_prob = tf.reduce_sum(
            -0.5 * tf.square(standardized) - tf.math.log(sigs) - half_log_two_pi,
            axis=-1,
        )  # (N, K)

        # Mixture log-likelihood: logsumexp_k(log_softmax(pi)_k + log N_k(y)).
        log_mix_weights = tf.nn.log_softmax(out_pi, axis=-1)
        log_likelihood = tf.reduce_logsumexp(
            log_mix_weights + component_log_prob, axis=-1
        )

        loss = tf.negative(log_likelihood)
        loss = tf.reduce_mean(loss)
        return loss

    return mdn_loss_func
# Custom callbacks
class CustomModelCheckpoint(tf.keras.callbacks.Callback):
    """Checkpoint on loss improvement and roll back to the checkpoint otherwise.

    At the end of every epoch (``mode='min'`` only):

    * if the monitored validation loss beats the best value seen so far (by more
      than ``min_loss_delta`` when one is given), the model is saved;
    * otherwise, if the training loss improved while the validation loss is
      above the best one, the model is saved as well and both "best" values are
      overwritten.
      NOTE(review): this branch replaces the best-validation checkpoint with a
      model whose validation loss is worse - confirm that this is intended;
    * otherwise the weights stored at ``filepath`` are loaded back into the model.

    Args:
        filepath: Destination of the checkpoint.
        monitor: Key in ``logs`` used as the validation loss.
        save_best_only: Stored but not consulted by this callback.
        mode: Only ``'min'`` is supported; ``'max'`` raises ``ValueError`` at
            the end of the first epoch that reports both losses.
        verbose: Values > 0 print a message for every decision.
        initial_train_loss: Starting value for the best training loss.
        initial_val_loss: Starting value for the best validation loss.
        save_weights_only: Save only the weights instead of the whole model.
        min_loss_delta: Optional minimum decrease of the validation loss that
            counts as an improvement.
    """

    def __init__(self, filepath, monitor='val_loss', save_best_only=True, mode='min', verbose=1, 
                 initial_train_loss=float('inf'), initial_val_loss=float('inf'), save_weights_only=True, min_loss_delta=None):
        super().__init__()
        self.filepath = filepath
        self.monitor = monitor
        self.save_best_only = save_best_only
        self.mode = mode
        self.verbose = verbose
        # Best losses start from the thresholds supplied by the caller.
        self.best_train_loss = initial_train_loss
        self.best_val_loss = initial_val_loss
        self.save_weights_only = save_weights_only
        self.min_loss_delta = min_loss_delta
        # 1-based epoch of the last checkpoint written (0 = none yet).
        self.last_improvement_epoch = 0

    def _save_checkpoint(self, epoch, train_loss, val_loss):
        """Record the new best losses and write the checkpoint to ``filepath``."""
        self.best_train_loss = train_loss
        self.best_val_loss = val_loss
        self.last_improvement_epoch = epoch + 1
        if self.save_weights_only:
            self.model.save_weights(self.filepath)
        else:
            self.model.save(self.filepath)

    def on_epoch_end(self, epoch, logs=None):
        logs = logs or {}
        train_loss = logs.get('loss')
        val_loss = logs.get(self.monitor)

        # Nothing to compare when either loss is missing from the logs.
        if train_loss is None or val_loss is None:
            return

        if self.mode == 'max':
            raise ValueError("Mode 'max' is not supported for this callback. Only 'min' mode is supported.")
        if self.mode != 'min':
            return

        # The validation loss must drop below this value to count as improved.
        if self.min_loss_delta is None:
            val_threshold = self.best_val_loss
        else:
            val_threshold = self.best_val_loss - self.min_loss_delta

        if val_loss < val_threshold:
            if self.verbose > 0:
                print(f"\nEpoch {epoch+1}: Val loss improved, from {self.best_val_loss} to {val_loss}")
            self._save_checkpoint(epoch, train_loss, val_loss)
        elif train_loss < self.best_train_loss and self.best_val_loss < val_loss:
            if self.verbose > 0:
                print(f"\nEpoch {epoch+1}: Train loss improved, from {self.best_train_loss} to {train_loss}")
            self._save_checkpoint(epoch, train_loss, val_loss)
        else:
            if self.verbose > 0:
                print(f"\nEpoch {epoch+1}: Train and val losses didn't improve. Best val {self.best_val_loss}, Best Train {self.best_train_loss}")
            # Fix: the rollback used to be nested under the verbosity check, so
            # verbose=0 silently disabled it. It now always runs.
            self.model.load_weights(self.filepath)
class RestoreBestWeightsCallback(tf.keras.callbacks.Callback):
    """Reload the checkpoint at ``checkpoint_filepath`` after every epoch but the first.

    The callback tracks the lowest ``val_loss`` it has seen and the epoch that
    produced it, purely for the log message; it does not write the checkpoint
    itself.

    Args:
        checkpoint_filepath: Path of the weights file to load back.
    """

    def __init__(self, checkpoint_filepath):
        # Fix: the base Callback initialiser was never called.
        super().__init__()
        self.checkpoint_filepath = checkpoint_filepath
        # float("inf") matches the sibling callbacks and avoids depending on `math`.
        self.best_val_loss = float("inf")
        self.last_improved_epoch = 0

    def on_epoch_end(self, epoch, logs=None):
        val_loss = (logs or {}).get("val_loss")
        # Fix: a missing logs dict / val_loss used to crash with
        # AttributeError / TypeError; skip the epoch instead.
        if val_loss is None:
            return

        if val_loss < self.best_val_loss:
            self.last_improved_epoch = epoch + 1
            self.best_val_loss = val_loss

        # Load the checkpointed weights at the end of each epoch.
        if epoch > 0:  # Avoid loading weights at the very first epoch
            self.model.load_weights(self.checkpoint_filepath)
            print(f"Epoch {epoch+1}: Restoring model weights from {self.checkpoint_filepath} of epoch {self.last_improved_epoch} with val loss {self.best_val_loss}, after epoch {epoch+1}.")
class AdaptiveLRScheduler(tf.keras.callbacks.Callback):
    """Raise or lower the optimizer learning rate from the ``val_loss`` trend.

    * ``patience`` consecutive epochs without a new best ``val_loss`` multiply
      the learning rate by ``decrease_factor`` (never below ``min_lr``).
    * ``patience`` consecutive epochs that each improve the best ``val_loss``
      by at most ``min_delta`` multiply it by ``increase_factor``.

    Args:
        patience: Number of epochs to wait before changing the learning rate.
        increase_factor: Multiplier applied when progress is slow.
        decrease_factor: Multiplier applied when there is no progress.
        min_lr: Lower bound for the learning rate.
        min_delta: Largest improvement still considered "slow".
    """

    def __init__(self, patience=5, increase_factor=1.5, decrease_factor=0.5, min_lr=1e-6, min_delta=0.1):
        super().__init__()
        self.patience = patience  # Number of epochs with no improvement before decreasing LR
        self.increase_factor = increase_factor  # Factor by which to increase LR
        self.decrease_factor = decrease_factor  # Factor by which to decrease LR
        self.min_lr = min_lr  # Minimum value to which LR can decrease
        self.best_loss = np.inf
        self.epochs_since_improvement = 0
        self.epochs_with_slow_improvement = 0
        self.min_delta = min_delta

    def on_epoch_end(self, epoch, logs=None):
        current_loss = (logs or {}).get("val_loss")
        # Fix: `logs` defaults to None and `val_loss` is absent without
        # validation data; both used to crash. Leave the rate untouched.
        if current_loss is None:
            return

        # On a new best loss, reset the "no improvement" counter and remember it.
        if current_loss < self.best_loss:
            if self.best_loss - current_loss <= self.min_delta:
                self.epochs_with_slow_improvement += 1
            else:
                self.epochs_with_slow_improvement = 0

            self.best_loss = current_loss
            self.epochs_since_improvement = 0
        else:
            self.epochs_since_improvement += 1
            self.epochs_with_slow_improvement = 0

        # Apply learning rate adjustment
        current_lr = self.model.optimizer.lr.numpy()

        if self.epochs_since_improvement >= self.patience:
            # No improvement for `patience` epochs, reduce learning rate
            new_lr = max(current_lr * self.decrease_factor, self.min_lr)
            self.model.optimizer.lr.assign(new_lr)
            print(f"Epoch {epoch+1}: Reducing learning rate to {new_lr:.6f}")
            self.epochs_since_improvement = 0  # Reset after decreasing LR
        elif self.epochs_with_slow_improvement >= self.patience:
            # Progress exists but has been slow for `patience` epochs: speed up.
            new_lr = current_lr * self.increase_factor
            self.model.optimizer.lr.assign(new_lr)
            self.epochs_with_slow_improvement = 0
            print(f"Epoch {epoch+1}: Increasing learning rate to {new_lr:.6f}")
        else:
            print(f"Epoch {epoch+1}: No variation to the learning rate {current_lr:.6f}; incresing countdown {(self.patience-self.epochs_with_slow_improvement)} - decreasing countdown {self.patience-self.epochs_since_improvement}")
# Compile & Fit
MAX_EPOCHS = 150

def compile_and_fit(
    model,
    validation_set,
    x_training_set=None,
    y_training_set=None,
    batch_size=1,
    training_data_generator=None,
    shuffle=False,
    checkpoint_threshold=math.inf,
    best_weights_path="Dati/USE_CASE/pChurn/best_models/model.weights.h5",
    initial_lr=1e-3
):
    """Compile ``model`` with the F1 loss / AdamW and train it for up to ``MAX_EPOCHS``.

    Relies on the module-level names ``base_path`` (checkpoint root) and
    ``f1_loss`` being defined before the call.

    Args:
        model: Keras model to compile and train.
        validation_set: Passed to ``fit`` as ``validation_data``.
        x_training_set: Training inputs, used when no generator is given.
        y_training_set: Training targets, used when no generator is given.
        batch_size: Batch size for the in-memory branch only.
        training_data_generator: If not None, trained on instead of the
            in-memory arrays; this branch uses ``ReduceLROnPlateau``, the
            in-memory branch uses ``AdaptiveLRScheduler``.
        shuffle: Forwarded to ``fit``.
        checkpoint_threshold: ``initial_value_threshold`` of the checkpoint.
        best_weights_path: Checkpoint path relative to ``base_path``.
        initial_lr: Initial learning rate of the optimizer.

    Returns:
        The ``History`` object returned by ``model.fit``.
    """
    # Single source of truth for the checkpoint location (was concatenated
    # separately for every callback).
    checkpoint_path = base_path + best_weights_path

    early_stopping = tf.keras.callbacks.EarlyStopping(
        monitor="val_loss",
        patience=5,
        min_delta=0.02,
        mode="min",
        restore_best_weights=False,
    )

    model_checkpoint = tf.keras.callbacks.ModelCheckpoint(
        filepath=checkpoint_path,
        monitor="val_loss",
        save_best_only=True,
        mode="min",
        verbose=1,
        save_weights_only=True,
        initial_value_threshold=checkpoint_threshold
    )

    restore_best_weights = RestoreBestWeightsCallback(checkpoint_path)

    # The unused CustomModelCheckpoint instance was removed: it was built on
    # every call but never passed to fit().

    model.compile(
        # Alternatives tried previously: tf.losses.BinaryCrossentropy(),
        # tf.losses.Hinge(), get_mixture_loss_func(1, model.num_mixtures),
        # get_mdn_loss / get_bce_loss / get_custom_loss /
        # get_custom_accuracy_loss(model.num_mixtures, model.num_output_features),
        # accuracy_loss.
        loss=f1_loss,
        # Other optimizers tried: LossScaleOptimizer(Adam(clipvalue=10.0)),
        # RMSprop, SGD(momentum=0.9).
        optimizer=tf.keras.optimizers.AdamW(learning_rate=initial_lr),
        metrics=[tf.keras.metrics.Recall(), tf.keras.metrics.Precision()],
    )

    if training_data_generator is not None:
        lr_callback = tf.keras.callbacks.ReduceLROnPlateau(
            monitor="val_loss",
            factor=0.1,
            patience=3,
            verbose=0,
            mode="min",
            min_delta=0.1,
            cooldown=0,
            min_lr=0.0,
        )
        # The generator yields its own batches, so no batch_size is passed.
        data_kwargs = {"x": training_data_generator}
    else:
        lr_callback = AdaptiveLRScheduler(patience=3, 
                                          increase_factor=10, 
                                          decrease_factor=0.1, 
                                          min_lr=1e-10, 
                                          min_delta=0.001)
        data_kwargs = {"x": x_training_set, "y": y_training_set, "batch_size": batch_size}

    history = model.fit(
        epochs=MAX_EPOCHS,
        validation_data=validation_set,
        shuffle=shuffle,
        # Order matters: the checkpoint is written before the weights are restored.
        callbacks=[model_checkpoint, lr_callback, early_stopping, restore_best_weights],
        **data_kwargs,
    )
    return history

# Caricamento Dataset con Numpy partizionati da Pyspark


In [2]:
import os
from pyspark.sql import SparkSession

# JVM `--add-opens` flags, one per opened java.base package, joined by single spaces.
# NOTE(review): nothing in this cell passes `java_options` to the Spark builder -
# confirm where (or whether) it is consumed.
java_options = " ".join(
    [
        "--add-opens=java.base/java.lang=ALL-UNNAMED",
        "--add-opens=java.base/java.nio=ALL-UNNAMED",
        "--add-opens=java.base/java.net=ALL-UNNAMED",
        "--add-opens=java.base/java.util=ALL-UNNAMED",
        "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED",
        "--add-opens=java.base/sun.util.calendar=ALL-UNNAMED",
        "--add-opens=java.base/java.lang.invoke=ALL-UNNAMED",
    ]
)

def inizialization(notebook, 
                   instances, 
                   cores, 
                   ram,
                   min_instances=1):
    """Create (or reuse) the client-mode Spark session for one notebook slot.

    Args:
        notebook: Slot name; one of the keys of ``machine_set_up``
            ("svil1", "svil2", "svil3"). Selects the app name and ports.
        instances: Maximum number of executors for dynamic allocation.
        cores: Cores per executor.
        ram: Executor memory in gigabytes.
        min_instances: Minimum number of executors for dynamic allocation.

    Returns:
        The ``SparkSession`` returned by ``getOrCreate()``. When a session
        already exists, only runtime SQL settings take effect (see the Spark
        warning in the cell output).

    Raises:
        KeyError: If ``notebook`` is not a known slot.
    """
    machine_set_up = {
        "svil1": { "appName": "spark-svil1", "spark.driver.port": "8002", "spark.blockManager.port": "8001", },
        "svil2": { "appName": "spark-svil2", "spark.driver.port": "18002", "spark.blockManager.port": "18001", },
        "svil3": { "appName": "spark-svil3", "spark.driver.port": "28002", "spark.blockManager.port": "28001", },
    }
    if notebook not in machine_set_up:
        raise KeyError(f"Unknown notebook {notebook!r}; expected one of {sorted(machine_set_up)}")
    settings = machine_set_up[notebook]

    pod_ip = os.environ.get("POD_IP", "0.0.0.0")

    return (
        SparkSession.builder.master("k8s://https://kubernetes.default.svc:443")
        .appName(settings["appName"])
        .config("spark.submit.deployMode", "client")

        # --- Kubernetes executor configuration ---
        .config("spark.kubernetes.namespace", "spark-svil")
        .config("spark.kubernetes.container.image", "europe-west6-docker.pkg.dev/poc-example-ds/pyspark-images/spark-py-worker:latest")
        .config("spark.kubernetes.container.image.pullPolicy", "Always")
        .config("spark.kubernetes.authenticate.driver.serviceAccountName", "spark-sa-svil")
        .config("spark.kubernetes.authenticate.executor.serviceAccountName", "spark-sa-svil")
        .config("spark.executor.memory", f"{ram}g")
        .config("spark.executor.cores", cores)
        .config("spark.kubernetes.executor.nodeSelector.workload", "ssd-shuffle") 
        .config("spark.kubernetes.executor.memoryOverhead", "6g")

        # --- Driver configuration ---
        .config("spark.driver.host", pod_ip)
        .config("spark.driver.port", settings["spark.driver.port"])
        # Fix: the per-notebook block manager port was declared in
        # machine_set_up but never applied to the session.
        .config("spark.blockManager.port", settings["spark.blockManager.port"])
        .config("spark.driver.bindAddress", "0.0.0.0")
        .config("spark.driver.memory", "8g")
        .config("spark.files.maxPartitionBytes","16777216")
        #.config("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
        .config("spark.hadoop.fs.gs.auth.service.account.enable", "true")
        .config("spark.kubernetes.executor.volumes.persistentVolumeClaim.jupyter-workspace.mount.path", "/home/jovyan/work")
        .config("spark.kubernetes.executor.volumes.persistentVolumeClaim.jupyter-workspace.options.claimName", "jupyter-workspace-storage")

        # --- Other settings ---
        .config("spark.dynamicAllocation.enabled", "true")
        # Fix: without an external shuffle service (none is configured here),
        # dynamic allocation needs shuffle tracking to be enabled.
        .config("spark.dynamicAllocation.shuffleTracking.enabled", "true")
        .config("spark.dynamicAllocation.minExecutors", min_instances)
        .config("spark.dynamicAllocation.maxExecutors", instances)
        .config("spark.sql.execution.arrow.pyspark.enabled", "true")
        .config("spark.driver.local.dir", "/tmp/spark-driver") 

        # --- Executor SSD configuration ---
        # Intended to make the executors use the local SSD for shuffle data.
        # NOTE(review): extraClassPath is a classpath setting, and the
        # `...executor.local.dirs.tmpfs` / `...executor.initContainer.*` keys
        # could not be matched to documented Spark options - verify they have
        # the intended effect.
        .config("spark.executor.extraClassPath", "/mnt/disks/ssd0")
        .config("spark.kubernetes.executor.local.dirs.tmpfs", "/mnt/disks/ssd0")
        # Init container meant to open up permissions on the executors' SSD.
        .config("spark.kubernetes.executor.initContainer.image", "busybox:1.28")
        .config("spark.kubernetes.executor.initContainer.command", '["/bin/sh", "-c", "chmod 777 /mnt/disks/ssd0"]')
        #.config("spark.memory.fraction", 0.8) # Use 80% of the memory for Spark
        #.config("spark.memory.storageFraction", 0.3) # Of which only 30% for caching
        .config("spark.sql.shuffle.partitions", 200)
        .config("spark.shuffle.file.buffer", "1m") # Raise the buffer to 1MB (default 32k)
        #.config("spark.sql.adaptive.coalescePartitions.parallelismFirst", "true")
        .config("spark.sql.adaptive.enabled", "true")  # Make sure AQE is on
        .config("spark.sql.adaptive.coalescePartitions.enabled", "true") # Additional optimisation
        .config("spark.sql.adaptive.skewJoin.enabled", "true") # Handles skew specifically for joins
        .getOrCreate()
    )
# Session for the "svil3" slot; min and max executors are both 28, which pins
# the executor pool at a fixed size.
spark = inizialization(notebook="svil3", instances=28, cores=7, ram=48, min_instances=28)

25/07/28 09:00:09 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [3]:
import sys
base_path = "gs://poc-example-dati-dev/"  # previously "/opt/spark/work-dir/"; moved to Google Cloud Storage
sys.path.append(base_path)

df_prefix = "time_series"
# Root of the per-split NumPy partition folders.
numpy_partitions_root = f"{base_path}Dati/USE_CASE/pChurn/numpy_partitions/{df_prefix}"

df_train = spark.read.parquet(base_path + f"Dati/USE_CASE/pChurn/{df_prefix}_df_train.parquet/")
df_val = spark.read.parquet(base_path + f"Dati/USE_CASE/pChurn/{df_prefix}_df_val.parquet/")

# NOTE(review): `F` is presumably pyspark.sql.functions imported in an earlier
# cell - it is not imported in this one; confirm.
df_train = (
    df_train
    .withColumn("path", F.lit(f"{numpy_partitions_root}/train"))
    .withColumn("row_idx", F.monotonically_increasing_id())
    .withColumn("partition_idx", F.spark_partition_id())
)
df_val = (
    df_val
    .withColumn("path", F.lit(f"{numpy_partitions_root}/val"))
    .withColumn("row_idx", F.monotonically_increasing_id())
    .withColumn("partition_idx", F.spark_partition_id())
)

# Fix: os.path.exists / shutil.rmtree / os.makedirs only understand the local
# filesystem. On a "gs://" URI the existence check is always False (stale data
# is never removed) and makedirs creates a local "gs:" folder in the working
# directory. tf.io.gfile handles both local paths and GCS URIs.
for split_name in ("train", "val"):
    split_dir = f"{numpy_partitions_root}/{split_name}"
    if tf.io.gfile.exists(split_dir):
        tf.io.gfile.rmtree(split_dir)
    # makedirs is recursive, so the split folder itself is created as well.
    for leaf_name in ("sample", "target"):
        tf.io.gfile.makedirs(f"{split_dir}/{leaf_name}")

25/07/28 09:00:27 WARN YarnScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
25/07/28 09:00:42 WARN YarnScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
25/07/28 09:00:57 WARN YarnScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
25/07/28 09:01:12 WARN YarnScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
25/07/28 09:01:27 WARN YarnScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
25/07/28 09:01:42 WARN YarnScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registere

KeyboardInterrupt: 

In [None]:
# --- 1. CONVERT SPARK PARTITIONS INTO TENSORS ---
# Meant to be passed to `rdd.mapPartitions`, i.e. run once per Spark partition,
# turning that partition's rows into NumPy arrays.
def spark_partition_to_tensors(partition_iterator):
    """Stack one partition's rows into a single ``(samples, targets)`` pair.

    Args:
        partition_iterator: Iterator of row objects exposing ``asDict()``; each
            row carries a ``sample`` (object with ``toArray()``) and a ``target``.

    Yields:
        Exactly one tuple ``(samples, targets)`` of stacked NumPy arrays for
        the whole partition, or nothing when the partition is empty.
    """
    row_dicts = [spark_row.asDict() for spark_row in partition_iterator]
    partition_frame = pd.DataFrame(row_dicts)

    # An empty partition contributes no element to the generator.
    if partition_frame.empty:
        return

    # Each `sample` cell is converted to an array first; np.stack then adds the
    # leading row axis for both samples and targets.
    dense_samples = partition_frame["sample"].apply(lambda vector: vector.toArray())
    stacked_samples = np.stack(dense_samples.values)
    stacked_targets = np.stack(partition_frame["target"].values)

    # Yielding (instead of returning) makes this a generator function, as
    # mapPartitions expects an iterable result.
    yield stacked_samples, stacked_targets

# --- 2. BUILD THE tf.data.Dataset OBJECTS FROM A GENERATOR ---

# Element layout handed to TensorFlow. The leading None is the number of rows
# in the yielded partition, which varies from partition to partition.
sample_spec = tf.TensorSpec(shape=(None, 4, 22), dtype=tf.float32)
target_spec = tf.TensorSpec(shape=(None, 2), dtype=tf.float32)
output_signature = (sample_spec, target_spec)


def train_generator():
    """Return an iterator over the training partitions, converted by ``spark_partition_to_tensors``.

    ``toLocalIterator()`` hands the converted partitions to the driver through
    an iterator rather than as one collected list.
    """
    return df_train.rdd.mapPartitions(spark_partition_to_tensors).toLocalIterator()


def val_generator():
    """Same as ``train_generator`` but for the validation DataFrame."""
    return df_val.rdd.mapPartitions(spark_partition_to_tensors).toLocalIterator()


# TensorFlow pulls partitions from Spark through the generators as they are consumed.
train_dataset = tf.data.Dataset.from_generator(
    train_generator, output_signature=output_signature
).prefetch(tf.data.AUTOTUNE)

val_dataset = tf.data.Dataset.from_generator(
    val_generator, output_signature=output_signature
).prefetch(tf.data.AUTOTUNE)

print("Pipeline di dati TensorFlow create correttamente in memoria.")

In [None]:
# Total sample counts are taken directly from the Spark DataFrames.
# NOTE(review): each count() launches a Spark job; the cell output shows it
# waiting for cluster resources, so it is not necessarily quick.
train_count, val_count = df_train.count(), df_val.count()

print(f"Numero totale di campioni di training: {train_count}")
print(f"Numero totale di campioni di validazione: {val_count}")

# The per-sample shape (e.g. (4, 22)) is fixed by the preprocessing logic.
# NOTE(review): the target shape printed here is "(2, 1)" while target_spec in
# the pipeline cell declares (None, 2) - confirm which one is correct.
print("Forma di un singolo campione (sample): (4, 22)")
print("Forma di un singolo target: (2, 1)")

25/07/28 09:04:12 WARN YarnScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
25/07/28 09:04:27 WARN YarnScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
25/07/28 09:04:42 WARN YarnScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
25/07/28 09:04:57 WARN YarnScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
25/07/28 09:05:12 WARN YarnScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
25/07/28 09:05:27 WARN YarnScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registere

In [None]:
***

In [None]:

# --- Configuration (Adjust these paths and prefixes) ---

import os
import shutil
import sys
import time

import numpy as np  # NumPy arrays back the simulated samples/targets
import tensorflow as tf

base_path = "gs://poc-example-dati-dev/"  # previously "/opt/spark/work-dir/"; moved to Google Cloud Storage
sys.path.append(base_path)
print("Num GPUs Available: ", len(tf.config.list_physical_devices('GPU')))

print("Timer avviato...")
start_time = time.time()

df_prefix = "time_series"

# --- Define paths for NumPy partitions (where samples/targets will be saved/loaded) ---
# These paths are for saving/loading individual NumPy files, mimicking the Spark output structure.
train_numpy_path = os.path.join(base_path, f"Dati/USE_CASE/pChurn/numpy_partitions/{df_prefix}/train")
val_numpy_path = os.path.join(base_path, f"Dati/USE_CASE/pChurn/numpy_partitions/{df_prefix}/val")

train_sample_dir = os.path.join(train_numpy_path, "sample")
train_target_dir = os.path.join(train_numpy_path, "target")
val_sample_dir = os.path.join(val_numpy_path, "sample")
val_target_dir = os.path.join(val_numpy_path, "target")

# --- 1. Directory Setup ---
# Fix: the paths are "gs://" URIs, which os.path.exists / shutil.rmtree /
# os.makedirs treat as relative local paths (stale data is never removed and a
# local "gs:" folder gets created). tf.io.gfile works for local paths and GCS.
print("Setting up directories...")
for stale_dir in (train_numpy_path, val_numpy_path):
    if tf.io.gfile.exists(stale_dir):
        tf.io.gfile.rmtree(stale_dir)

for new_dir in (train_sample_dir, train_target_dir, val_sample_dir, val_target_dir):
    # makedirs is recursive and does not fail when the directory exists.
    tf.io.gfile.makedirs(new_dir)
print("Directories created.")

# --- 2. Data Simulation (REPLACE WITH ACTUAL TFRECORD LOADING) ---
# IMPORTANT: This section simulates having TFRecord files.
# In a real scenario, you would have converted your Spark Parquet DataFrames
# into TFRecord files in a prior, distributed step (e.g., using Spark, Dataflow, or a custom script).
# Each TFRecord file would contain serialized 'sample' and 'target' features.

# Expected shape and dtype of the features:
# (4, 22) for a sample, (2,) for a target (a 1-D array of 2 elements).
SAMPLE_SHAPE = (4, 22)
TARGET_SHAPE = (2,)
DTYPE = tf.float32

# Simulate creating some dummy TFRecord files
# In production, these would be generated from your actual data
def _serialize_example(sample_feature, target_feature):
    """Serialise one (sample, target) pair as a ``tf.train.Example`` byte string.

    Both arrays are flattened and stored as float lists under the keys
    ``'sample'`` and ``'target'``.
    """
    def _as_float_feature(array):
        # Flatten so the array fits the 1-D FloatList container.
        return tf.train.Feature(float_list=tf.train.FloatList(value=array.flatten()))

    example = tf.train.Example(
        features=tf.train.Features(
            feature={
                'sample': _as_float_feature(sample_feature),
                'target': _as_float_feature(target_feature),
            }
        )
    )
    return example.SerializeToString()

def create_dummy_tfrecords(num_records, filename):
    """Write ``num_records`` random examples to the TFRecord file ``filename``.

    Samples are uniform random arrays of shape ``SAMPLE_SHAPE``; targets
    alternate between ``[1, 1]`` (even index) and ``[0, 0]`` (odd index), so the
    two classes are roughly balanced.
    """
    # Fix: the writer is now a context manager, so the file is flushed and
    # closed even when serialisation raises part-way through.
    with tf.io.TFRecordWriter(filename) as writer:
        for i in range(num_records):
            # Alternate the class so that roughly half the records are active.
            target_val = 1.0 if i % 2 == 0 else 0.0
            sample = np.random.rand(*SAMPLE_SHAPE).astype(np.float32)
            target = np.array([target_val, target_val]).astype(np.float32)  # 2-element target
            writer.write(_serialize_example(sample, target))
    print(f"Created {num_records} dummy TFRecords in {filename}")

# Create dummy TFRecord files for demonstration
dummy_train_tfrecord_path = os.path.join(train_numpy_path, "dummy_train.tfrecord")
# Fix: the validation file used to be written under train_numpy_path.
dummy_val_tfrecord_path = os.path.join(val_numpy_path, "dummy_val.tfrecord")
create_dummy_tfrecords(10000, dummy_train_tfrecord_path) # Simulate 10,000 records
create_dummy_tfrecords(2000, dummy_val_tfrecord_path)   # Simulate 2,000 records


# --- 3. TensorFlow Data Pipeline for Loading and Transformations ---
print("Building TensorFlow data pipelines...")

def _parse_tfrecord_fn(example_proto):
    """Decode one serialised ``tf.train.Example`` into a ``(sample, target)`` pair.

    The flat float lists stored under ``'sample'`` and ``'target'`` are
    reshaped to ``SAMPLE_SHAPE`` and ``TARGET_SHAPE`` respectively.
    """
    flat_sample_length = SAMPLE_SHAPE[0] * SAMPLE_SHAPE[1]
    schema = {
        'sample': tf.io.FixedLenFeature(flat_sample_length, tf.float32),
        'target': tf.io.FixedLenFeature(TARGET_SHAPE[0], tf.float32),
    }
    parsed = tf.io.parse_single_example(example_proto, schema)

    # Restore the original dimensions of both features.
    return (
        tf.reshape(parsed['sample'], SAMPLE_SHAPE),
        tf.reshape(parsed['target'], TARGET_SHAPE),
    )

def create_tf_dataset(tfrecord_path, is_training=True, batch_size=32, shuffle_buffer_size=1024):
    """Create a batched, prefetched ``tf.data.Dataset`` from TFRecord file(s).

    Args:
        tfrecord_path: Path (or paths) accepted by ``tf.data.TFRecordDataset``.
        is_training: When True the parsed examples are shuffled before batching.
        batch_size: Number of examples per batch (previously hard-coded to 32).
        shuffle_buffer_size: Size of the shuffle buffer (previously hard-coded
            to 1024); only used when ``is_training`` is True.

    Returns:
        A dataset of ``(sample, target)`` batches.
    """
    dataset = tf.data.TFRecordDataset(tfrecord_path)
    dataset = dataset.map(_parse_tfrecord_fn, num_parallel_calls=tf.data.AUTOTUNE)

    # Spark-side metadata such as 'path', 'row_idx' or 'partition_idx' is not
    # reproduced here; if it is needed it has to be stored as extra features
    # in the TFRecords.

    if is_training:
        dataset = dataset.shuffle(buffer_size=shuffle_buffer_size)  # Shuffle for training

    dataset = dataset.batch(batch_size)
    dataset = dataset.prefetch(tf.data.AUTOTUNE)
    return dataset

# Training data is shuffled, validation data keeps the file order.
train_dataset, val_dataset = (
    create_tf_dataset(dummy_train_tfrecord_path, is_training=True),
    create_tf_dataset(dummy_val_tfrecord_path, is_training=False),
)

print("TensorFlow data pipelines created correctly.")

# --- 4. Balanced Sampling (Conceptual with tf.data.Dataset) ---
# Exact class balancing (like Spark's sample) is non-trivial for very large,
# imbalanced datasets in TensorFlow: the classes are filtered apart and then
# recombined.
print("\nStarting conceptual balanced sampling with TensorFlow (in memory)...")

def balanced_sample_tf_dataset(dataset, fraction_to_sample=0.05):
    """Draw the same number of "active" and "inactive" elements from ``dataset``.

    An element ``(sample, target)`` is active when ``max(target) >= 1`` and
    inactive otherwise. From each class
    ``int(min(active_count, inactive_count) * fraction_to_sample)`` elements
    are taken after shuffling, then both selections are concatenated and
    shuffled again.

    The predicate is applied per dataset element, so for per-sample balancing
    the dataset must yield single examples (un-batched).

    Fixes over the previous version:
      * ``cardinality()`` of a filtered dataset is UNKNOWN (-2), so the class
        counts were wrong; they are now obtained by iterating the datasets.
      * ``tf.data.Dataset`` has no ``union`` method; ``concatenate`` is used.
      * the "nothing to balance" result is now an empty dataset with the same
        element structure as the input.

    Args:
        dataset: Dataset of ``(sample, target)`` elements.
        fraction_to_sample: Fraction of the minority-class count to keep per class.

    Returns:
        The balanced dataset, or an empty dataset when one class has no elements.
    """
    def _count_elements(ds):
        # Counting requires a full pass over the dataset.
        return int(ds.reduce(np.int64(0), lambda total, _: total + 1).numpy())

    # Separate active and inactive elements.
    active_dataset = dataset.filter(lambda s, t: tf.reduce_max(t) >= 1)
    inactive_dataset = dataset.filter(lambda s, t: tf.reduce_max(t) < 1)

    active_count = _count_elements(active_dataset)
    inactive_count = _count_elements(inactive_dataset)

    print(f"\n--- Statistics (Original) ---")
    print(f"Active Samples: {active_count}, Inactive Samples: {inactive_count}")

    if active_count == 0 or inactive_count == 0:
        print("Warning: One class has zero samples. Cannot create a balanced sample.")
        return dataset.take(0)

    # Both classes contribute the same number of elements, derived from the
    # minority class.
    min_class_count = min(active_count, inactive_count)
    desired_samples_per_class = int(min_class_count * fraction_to_sample)

    # Shuffle first so that take() does not always pick the leading elements.
    active_sampled = active_dataset.shuffle(buffer_size=1024).take(desired_samples_per_class)
    inactive_sampled = inactive_dataset.shuffle(buffer_size=1024).take(desired_samples_per_class)

    # Combine both selections and mix them.
    balanced_dataset = active_sampled.concatenate(inactive_sampled).shuffle(buffer_size=1024)

    return balanced_dataset

# Apply balanced sampling (conceptual)
# Fix: train_dataset / val_dataset are batched, so the class filter used to be
# evaluated per batch rather than per sample. Un-batch first so that the
# balancing and the statistics below refer to individual samples.
df_train_balanced_tf = balanced_sample_tf_dataset(train_dataset.unbatch(), fraction_to_sample=0.1)
df_val_balanced_tf = balanced_sample_tf_dataset(val_dataset.unbatch(), fraction_to_sample=0.1)


def _count_active_inactive(dataset):
    """Return ``(active, inactive)`` element counts; iterates the dataset twice."""
    active = dataset.filter(lambda s, t: tf.reduce_max(t) >= 1).reduce(0, lambda x, _: x + 1).numpy()
    inactive = dataset.filter(lambda s, t: tf.reduce_max(t) < 1).reduce(0, lambda x, _: x + 1).numpy()
    return active, inactive


# Verify final counts (requires iterating the dataset, can be slow)
final_train_active_count, final_train_inactive_count = _count_active_inactive(df_train_balanced_tf)
final_val_active_count, final_val_inactive_count = _count_active_inactive(df_val_balanced_tf)

print("\n--- Final Statistics (after conceptual balanced sampling) ---")
print(f"Training Set Final Active: {final_train_active_count}, Inactive: {final_train_inactive_count}")
print(f"Validation Set Final Active: {final_val_active_count}, Inactive: {final_val_inactive_count}")


# --- 5. Saving/Loading NumPy Partitions from TensorFlow Dataset (Optional) ---
# This mimics the original code's intent to save as NumPy partitions.
# In tf.data, you often don't save to intermediate NumPy files if you're
# directly feeding into a Keras model. This is for explicit file management.

print("\nSaving NumPy partitions from TensorFlow Dataset (conceptual)...")
# This part would involve iterating through the TF dataset and saving each sample/target
# as a separate .npy file. For very large datasets, this can be slow if not parallelized.
# You would typically use tf.data.experimental.save() for large datasets to a TFRecord format.

# Example of saving a few samples (conceptual, not for massive datasets directly)
# For a full dataset, you'd use dataset.enumerate() and save each element.
# Or, more efficiently, save as TFRecords using tf.data.experimental.save()

# This part is left commented out as it's a manual process for large datasets.
# For example:
# for i, (sample, target) in enumerate(df_train_balanced_tf.take(5)): # Take first 5 for example
#     np.save(os.path.join(train_sample_dir, f"sample_{i}.npy"), sample.numpy())
#     np.save(os.path.join(train_target_dir, f"target_{i}.npy"), target.numpy())
# print("Sample NumPy partitions saved.")

# --- 6. Model Definition (using Keras/TensorFlow) ---
# This part remains largely the same as your original TensorFlow model definition.
# The 'FeedBack' and 'FeedBackAttetion' classes would be defined elsewhere in your code.
print("\nDefining LSTM model...")
# Dummy classes for demonstration if not provided
class FeedBack(tf.keras.Model):
    """Placeholder model: stores its hyper-parameters and applies one Dense layer.

    All constructor arguments are kept as attributes of the same name; only
    ``units`` influences the computation (width of the Dense layer).
    """

    def __init__(self, units, out_steps, timesteps, num_features, num_output_features, num_mixtures, stochastic, feature_extraction, kernel):
        super().__init__()
        # Sequence / feature geometry.
        self.units, self.out_steps, self.timesteps = units, out_steps, timesteps
        self.num_features, self.num_output_features = num_features, num_output_features
        # Remaining hyper-parameters, stored but not used by the placeholder.
        self.num_mixtures, self.stochastic = num_mixtures, stochastic
        self.feature_extraction, self.kernel = feature_extraction, kernel
        # The only layer of the placeholder.
        self.dense = tf.keras.layers.Dense(units)

    def call(self, inputs):
        """Apply the Dense layer to ``inputs``."""
        outputs = self.dense(inputs)
        return outputs

class FeedBackAttetion(tf.keras.Model):
    """Dummy stand-in for the project's real attention-based feedback model.

    Every constructor argument is stored unchanged as an instance attribute
    of the same name. ``attention_heads`` is recorded but not used by this
    placeholder: the forward pass is a single ``Dense(units)`` projection.
    """
    # NOTE(review): the class name is spelled "Attetion"; it is kept as-is
    # because other cells reference the class by this name.

    def __init__(
        self,
        units,
        out_steps,
        timesteps,
        num_features,
        num_output_features,
        num_mixtures,
        stochastic,
        feature_extraction,
        attention_heads,
    ):
        super().__init__()
        # Keep the hyperparameters on the instance so callers can read them back.
        # In this placeholder only `units` is actually used, to size the Dense layer.
        hyperparameters = {
            "units": units,
            "out_steps": out_steps,
            "timesteps": timesteps,
            "num_features": num_features,
            "num_output_features": num_output_features,
            "num_mixtures": num_mixtures,
            "stochastic": stochastic,
            "feature_extraction": feature_extraction,
            "attention_heads": attention_heads,
        }
        for attribute_name, attribute_value in hyperparameters.items():
            setattr(self, attribute_name, attribute_value)
        self.dense = tf.keras.layers.Dense(units)  # Example layer

    def call(self, inputs):
        """Apply the dense projection to ``inputs`` and return the result."""
        # Dummy forward pass; it exists so that build() has something to trace.
        projected = self.dense(inputs)
        return projected

def get_lstm_model(timesteps, num_features, num_output_features, num_mixtures, units, conv_kernel,
                   out_steps=2, stochastic=True, feature_extraction=True):
    """Create a ``FeedBack`` model and build it so it is ready for ``summary()``.

    This is a placeholder for your actual model definition function.

    Args:
        timesteps: Forwarded to ``FeedBack`` and used as the second dimension
            of the build input shape.
        num_features: Forwarded to ``FeedBack`` and used as the last dimension
            of the build input shape.
        num_output_features: Forwarded to ``FeedBack``.
        num_mixtures: Forwarded to ``FeedBack``.
        units: Forwarded to ``FeedBack``.
        conv_kernel: Forwarded to ``FeedBack`` as its ``kernel`` argument.
        out_steps: Forwarded to ``FeedBack``. Defaults to 2, the value that
            was previously hard-coded here.
        stochastic: Forwarded to ``FeedBack``. Defaults to True (previously
            hard-coded).
        feature_extraction: Forwarded to ``FeedBack``. Defaults to True
            (previously hard-coded).

    Returns:
        The ``FeedBack`` instance, already built for inputs of shape
        ``(None, timesteps, num_features)``.
    """
    model = FeedBack(units=units, out_steps=out_steps, timesteps=timesteps, num_features=num_features,
                     num_output_features=num_output_features, num_mixtures=num_mixtures,
                     stochastic=stochastic, feature_extraction=feature_extraction, kernel=conv_kernel)
    # A subclassed Keras model has no weights until it is built, and summary()
    # raises ValueError on an unbuilt model. Build it here with the same shape
    # convention the notebook uses when it builds FeedBack instances by hand.
    model.build(input_shape=(None, timesteps, num_features))
    return model

# Your model instantiation
lstm_model = FeedBack(units=16,
                      out_steps=2,
                      timesteps=4,
                      num_features=22,
                      num_output_features=1,
                      num_mixtures=128,
                      stochastic=True,
                      feature_extraction=True,
                      kernel=3
                      )
# Build with (batch, timesteps, num_features); the batch dimension is left open.
lstm_model.build(input_shape=(None, lstm_model.timesteps, lstm_model.num_features))
# NOTE(review): the next print emits a fixed text, not a real warning raised by Keras.
print("UserWarning: `build()` called on layer 'feed_back'...") # Acknowledge the warning

lstm_model_attention = FeedBackAttetion(
    units=16,
    out_steps=2,
    timesteps=4,
    num_features=22,
    num_output_features=1,
    num_mixtures=128,
    stochastic=True,
    feature_extraction=True,
    attention_heads=3,)
lstm_model_attention.build(input_shape=(None, 4, 22))
# NOTE(review): fixed text again, not a captured warning.
print("UserWarning: `build()` called on layer 'feed_back_attention'...") # Acknowledge the warning

lstm_model_get = get_lstm_model(timesteps=4, num_features=22, num_output_features=1, num_mixtures=128, units=16, conv_kernel=3)
# summary() raises ValueError on a subclassed model that has not been built yet,
# which would abort this cell. Build it first (same timesteps/num_features that
# were passed to get_lstm_model) unless the factory already returned a built model.
if not lstm_model_get.built:
    lstm_model_get.build(input_shape=(None, 4, 22))
lstm_model_get.summary()

print("\nTensorFlow-only data pipeline and model setup complete.")


In [None]:


# --- Define paths for NumPy partitions ---
# One root directory per split; each root holds a "sample" and a "target" sub-directory.
train_numpy_path = os.path.join(base_path, f"Dati/USE_CASE/pChurn/numpy_partitions/{df_prefix}/train")
val_numpy_path = os.path.join(base_path, f"Dati/USE_CASE/pChurn/numpy_partitions/{df_prefix}/val")

train_sample_path, train_target_path = (
    os.path.join(train_numpy_path, leaf_name) for leaf_name in ("sample", "target")
)
val_sample_path, val_target_path = (
    os.path.join(val_numpy_path, leaf_name) for leaf_name in ("sample", "target")
)

# --- Clean up existing directories ---
# Any previous output is deleted, so every run starts from empty partition folders.
for _stale_root in (train_numpy_path, val_numpy_path):
    if os.path.exists(_stale_root):
        shutil.rmtree(_stale_root)

# --- Create directories ---
for _leaf_dir in (train_sample_path, train_target_path, val_sample_path, val_target_path):
    os.makedirs(_leaf_dir, exist_ok=True)

# --- Data Loading and Processing (Conceptual - this is the part that changes most) ---
# This is the *hardest* part to replace directly with "only TensorFlow"
# You would typically:
# 1. Have your data already in a TF-friendly format (e.g., TFRecord files, CSVs, or individual NumPy files).
# 2. Use tf.data.Dataset.from_tensor_slices or tf.data.TFRecordDataset etc.
# 3. Perform any "column" additions or transformations using tf.data.Dataset.map or plain Python.

# Example if data is already in NumPy arrays (simplified, not reading parquet here)
# For example, if you had train_features.npy, train_labels.npy, etc.
# train_samples = np.load(os.path.join(base_path, f"Dati/USE_CASE/pChurn/{df_prefix}_df_train_samples.npy"))
# train_targets = np.load(os.path.join(base_path, f"Dati/USE_CASE/pChurn/{df_prefix}_df_train_targets.npy"))
# val_samples = np.load(os.path.join(base_path, f"Dati/USE_CASE/pChurn/{df_prefix}_df_val_samples.npy"))
# val_targets = np.load(os.path.join(base_path, f"Dati/USE_CASE/pChurn/{df_prefix}_df_val_targets.npy"))

# Then, you'd save these into the new partitioned structure
# np.save(os.path.join(train_sample_path, "data.npy"), train_samples)
# np.save(os.path.join(train_target_path, "data.npy"), train_targets)
# np.save(os.path.join(val_sample_path, "data.npy"), val_samples)
# np.save(os.path.join(val_target_path, "data.npy"), val_targets)

# If your original parquet files are small enough to fit in memory,
# you could use pandas to read them, then convert to numpy/tf.data
# import pandas as pd
# df_train_pd = pd.read_parquet(base_path + f"Dati/USE_CASE/pChurn/{df_prefix}_df_train.parquet/")
# df_val_pd = pd.read_parquet(base_path + f"Dati/USE_CASE/pChurn/{df_prefix}_df_val.parquet/")

# Then convert pandas DataFrames to TensorFlow Datasets or NumPy arrays
# train_dataset = tf.data.Dataset.from_tensor_slices(dict(df_train_pd))
# val_dataset = tf.data.Dataset.from_tensor_slices(dict(df_val_pd))

# For adding "path", "row_idx", "partition_idx" - these are Spark DataFrame concepts.
# In TensorFlow, you'd manage file paths explicitly when saving/loading,
# and row/partition indices would be implicit in your tf.data pipeline or file organization.

In [None]:
def spark_partition_to_tensors(partition_iterator):
    """Turn one Spark partition into a single ``(samples, targets)`` pair of arrays.

    Intended for ``rdd.mapPartitions``: it is a generator that yields at most
    one tuple per partition, so a whole partition becomes one batch.

    Args:
        partition_iterator: Iterator of row objects exposing ``asDict()``.
            Each row dict must contain a ``"sample"`` entry whose value exposes
            ``toArray()`` and a ``"target"`` entry that ``np.stack`` can stack.

    Yields:
        ``(samples, targets)``: two float32 NumPy arrays whose first axis
        indexes the rows of the partition. Nothing is yielded for an empty
        partition.
    """
    sample_arrays = []
    target_values = []
    # Read only the two fields that are needed instead of materializing every
    # column of the partition in an intermediate pandas DataFrame.
    for row in partition_iterator:
        record = row.asDict()
        sample_arrays.append(record["sample"].toArray())
        target_values.append(record["target"])

    # Empty partition: finish the generator without yielding anything.
    if not sample_arrays:
        return

    # np.stack turns the list of per-row arrays into one array with a leading
    # row axis. Cast to float32 here because the tf.data pipeline that consumes
    # this generator declares tf.float32 for both components; casting at the
    # source also halves the amount of data shipped back to the driver.
    samples = np.stack(sample_arrays).astype(np.float32, copy=False)
    targets = np.stack(target_values).astype(np.float32, copy=False)

    # One tuple for the whole partition keeps the generator memory-friendly.
    yield samples, targets

# --- 2. BUILD THE TF.DATA.DATASETS FROM A GENERATOR ---

# Element specs tell TensorFlow what each yielded tuple looks like.
# The leading None is the batch dimension, which may vary from element to element.
sample_spec = tf.TensorSpec(dtype=tf.float32, shape=(None, 4, 22))
target_spec = tf.TensorSpec(dtype=tf.float32, shape=(None, 2))
output_signature = (sample_spec, target_spec)


def train_generator():
    """Return an iterator over the per-partition (samples, targets) tuples of df_train."""
    # toLocalIterator() hands the results to the driver incrementally instead of
    # collecting the whole dataset at once.
    partition_batches = df_train.rdd.mapPartitions(spark_partition_to_tensors)
    return partition_batches.toLocalIterator()


def val_generator():
    """Return an iterator over the per-partition (samples, targets) tuples of df_val."""
    partition_batches = df_val.rdd.mapPartitions(spark_partition_to_tensors)
    return partition_batches.toLocalIterator()


# TensorFlow pulls from the generator lazily, so Spark is asked for data only
# as the pipeline consumes it; prefetch overlaps that fetch with consumption.
train_dataset = tf.data.Dataset.from_generator(
    train_generator, output_signature=output_signature
).prefetch(tf.data.AUTOTUNE)

# Same pipeline for the validation set.
val_dataset = tf.data.Dataset.from_generator(
    val_generator, output_signature=output_signature
).prefetch(tf.data.AUTOTUNE)

print("Pipeline di dati TensorFlow create correttamente in memoria.")

In [None]:
#print(train_x.shape, train_y.shape, df_train.count())
#print((val_tuple[0]).shape, (val_tuple[1]).shape, df_val.count())

# Get the total number of samples straight from the Spark DataFrames.
# The counting is done by Spark, so no rows are collected on the driver.
train_count = df_train.count()
val_count = df_val.count()

print(f"Numero totale di campioni di training: {train_count}")
print(f"Numero totale di campioni di validazione: {val_count}")

# The per-sample shapes below are fixed by the preprocessing logic, so they are
# printed as constants. Plain strings are used because nothing is interpolated.
print("Forma di un singolo campione (sample): (4, 22)")
# NOTE(review): this reports a target shape of (2, 1), while the tf.data pipeline
# in this notebook declares target_spec with shape (None, 2), i.e. (2,) per sample.
# Confirm which one is correct.
print("Forma di un singolo target: (2, 1)")