In [None]:
import numpy as np
!pip install tensorflow
import tensorflow as tf

# Discretized action-potential (AP) templates.
# Each waveform goes 0 -> +max -> 0 -> -max -> 0; the silence template is a
# flat run of -1. All templates are stored as float32 vectors.

# 15-sample template (used by the "Type I" feature).
ap_typeI = np.asarray(
    [0.0, 0.4, 0.9, 1.0, 0.7,
     0.2, 0.0, -0.2, -0.5, -0.7,
     -0.5, -0.3, -0.1, 0.0, 0.0],
    dtype=np.float32,
)

# 10-sample template (used by the "Type IIa" feature).
ap_typeIIa = np.asarray(
    [0.0, 0.7, 1.0, 0.4, 0.0,
     -0.3, -0.7, -0.3, 0.0, 0.0],
    dtype=np.float32,
)

# 7-sample template (used by the "Type IIb" feature).
ap_typeIIb = np.asarray(
    [0.0, 1.0, 0.3, 0.0, -0.5, -0.2, 0.0],
    dtype=np.float32,
)

# 7-sample constant template (used by the "Silence" feature).
ap_silence = np.asarray(
    [-1.0, -1.0, -1.0, -1.0, -1.0, -1.0, -1.0],
    dtype=np.float32,
)

# 1) Feature Definitions & Dilation/Padding (First Layer)

# Sampling rate of the input signal in Hz (20 kHz).
sampling_rate = 20000

# One entry per first-layer feature:
#   name        - key used to look up the matching AP template
#   frequency   - value fed into the base-dilation formula (None => fixed dilation of 1)
#   kernel_size - number of taps in the feature's convolution kernel
features = [
    dict(name="Type I", frequency=30, kernel_size=15),     # Slow Twitch
    dict(name="Type IIa", frequency=100, kernel_size=10),  # Medium Twitch
    dict(name="Type IIb", frequency=225, kernel_size=7),   # Fast Twitch
    dict(name="Silence", frequency=None, kernel_size=7),   # Silence => size=7
]

# Function to compute dilation rates and padding for each feature

def compute_dilations_and_padding(feature, sampling_rate, num_layers=4):
    """Derive the per-layer dilation schedule and the total padding for a feature.

    Args:
        feature: Mapping with a "kernel_size" entry and a "frequency" entry.
            A frequency of None selects a fixed dilation of 1 on every layer.
        sampling_rate: Sampling rate used in the base-dilation formula.
        num_layers: Number of stacked convolution layers.

    Returns:
        A ``(dilation_rates, total_pad)`` tuple. ``dilation_rates`` is a list
        of ints; ``total_pad`` is the sum of ``(kernel_size - 1) * dilation``
        over that list.
    """
    kernel_len = feature["kernel_size"]
    frequency = feature["frequency"]

    if frequency is not None:
        # Base dilation: ceil(sampling_rate / (frequency * kernel_size)).
        base = int(np.ceil(sampling_rate / (frequency * kernel_len)))
        # The first layer uses the base rate; each later layer doubles it.
        dilation_rates = [base] + [base * 2 ** step for step in range(1, num_layers)]
    else:
        # No frequency given: every layer uses dilation 1.
        dilation_rates = [1] * num_layers

    # Each layer contributes (kernel_size - 1) * dilation samples of padding.
    total_pad = (kernel_len - 1) * sum(dilation_rates)
    return dilation_rates, total_pad

# Attach the dilation schedule and total padding to every feature entry.
for feat in features:
    feat.update(
        zip(
            ("dilation_rates", "total_padding"),
            compute_dilations_and_padding(feat, sampling_rate),
        )
    )

# 2) First Layer Processing (Feature Extraction for 4 layers)

def first_layer_processing(input_data, num_channels=4):
    """Filter every channel with each feature's fixed AP-template conv stack.

    For each entry in the module-level ``features`` list, the signal is
    left-padded by the feature's ``total_padding`` and passed through one
    non-trainable, bias-free ``Conv1D`` per entry in ``dilation_rates``. Every
    layer of a stack uses the feature's AP template as its kernel. Only the
    output of the LAST layer of each stack is kept, so the result holds
    exactly one row per feature.

    Args:
        input_data: 2-D array or tensor indexed as (channel, time).
        num_channels: Number of leading channels of ``input_data`` to process.

    Returns:
        ``np.ndarray`` of shape ``(num_channels, len(features), time_steps)``.
        ``time_steps`` equals the input length when each feature's
        ``total_padding`` matches its dilation schedule.

    Raises:
        ValueError: If ``input_data`` has fewer than ``num_channels`` rows.
    """
    if input_data.shape[0] < num_channels:
        raise ValueError(
            f"input_data has {input_data.shape[0]} channel(s), "
            f"but num_channels={num_channels}"
        )

    # Template lookup by feature name; reshaped below to Conv1D's kernel
    # layout (kernel_size, in_channels=1, filters=1).
    ap_templates = {
        "Type I": ap_typeI,
        "Type IIa": ap_typeIIa,
        "Type IIb": ap_typeIIb,
        "Silence": ap_silence,
    }

    # Put the channels on the batch axis: (num_channels, time, 1). The kernels
    # are constants, so one stack per feature can serve all channels at once
    # instead of rebuilding identical layers for every channel.
    batch = tf.expand_dims(input_data[:num_channels, :], axis=-1)

    feature_outputs = []
    for feat in features:
        kernel = ap_templates[feat["name"]].reshape((-1, 1, 1))
        init = tf.keras.initializers.Constant(kernel)

        # Causal (left-only) zero padding, applied once for the whole stack.
        x = tf.pad(batch, [[0, 0], [feat["total_padding"], 0], [0, 0]], mode="CONSTANT")

        for d in feat["dilation_rates"]:
            conv = tf.keras.layers.Conv1D(
                filters=1,
                kernel_size=feat["kernel_size"],
                dilation_rate=d,
                padding="valid",
                activation=None,
                use_bias=False,
                kernel_initializer=init,
                trainable=False,
            )
            x = conv(x)

        # Collect only the final layer's output. Collecting inside the loop
        # above would add one row per layer instead of one per feature.
        # Squeeze the filter axis explicitly so a length-1 time axis survives.
        feature_outputs.append(tf.squeeze(x, axis=-1).numpy())  # (channels, time)

    # Defensive alignment: trim every feature to the shortest time length.
    min_len = min(out.shape[1] for out in feature_outputs)
    first_out = np.stack([out[:, :min_len] for out in feature_outputs], axis=1)
    return first_out                                           # => (channels, features, time_steps)

# 3) Second Layer Processing with 2 sub-layers (dilations=10,20) + Feature Stride

def second_layer_processing_with_strides(first_out, strides_per_feature, num_filters=4):
    """Apply two dilated Conv1D sub-layers to every (channel, feature) series.

    Each series goes through two ``Conv1D`` layers (kernel size 10, dilations
    10 and 20, "same" padding, ReLU) and is down-sampled by the feature's
    stride after each sub-layer. The layers are created fresh for every series
    with Keras' default initializers, so their weights are untrained.

    Args:
        first_out: Array of shape ``(num_channels, num_features, time_steps)``.
        strides_per_feature: Sequence with one integer stride per feature.
        num_filters: Number of output filters of each convolution.

    Returns:
        ``np.ndarray`` of shape
        ``(num_channels, num_features, num_filters, min_time)``, where
        ``min_time`` is the shortest output length over all series. Longer
        series are truncated to it.

    Raises:
        ValueError: If ``strides_per_feature`` has fewer entries than
            ``first_out`` has features.
    """
    kernel_size = 10                                          # Fixed kernel size for this layer
    second_layer_dilations = [10, 20]                         # Fixed dilation rates for all features
    num_channels, num_features, time_steps = first_out.shape

    if len(strides_per_feature) < num_features:
        raise ValueError(
            f"strides_per_feature has {len(strides_per_feature)} entries, "
            f"but first_out has {num_features} features"
        )

    second_list = []
    for ch in range(num_channels):
        for j in range(num_features):
            stride = strides_per_feature[j]
            x = tf.reshape(first_out[ch, j, :], (1, time_steps, 1))
            for d in second_layer_dilations:
                # Keras Conv1D rejects strides > 1 combined with
                # dilation_rate > 1, so convolve at stride 1 and subsample
                # afterwards. Output length matches "same" padding with that
                # stride: ceil(len / stride).
                conv = tf.keras.layers.Conv1D(
                    filters=num_filters,
                    kernel_size=kernel_size,
                    dilation_rate=d,
                    padding="same",
                    activation="relu",
                )
                x = conv(x)
                if stride > 1:
                    x = x[:, ::stride, :]
            second_list.append(x.numpy()[0])                  # (time, num_filters)

    # Different strides give different lengths, so the outputs are ragged.
    # Trim to the shortest length BEFORE stacking into one array.
    min_nt = min(arr.shape[0] for arr in second_list)
    trimmed_np = np.stack([arr[:min_nt, :] for arr in second_list], axis=0)

    # (channels * features, time, filters) -> (channels, features, filters, time)
    final_tensor = trimmed_np.reshape(
        num_channels, num_features, min_nt, num_filters
    ).transpose(0, 1, 3, 2)
    return final_tensor

# 4)  MLP For Servo Angle Prediction Output

def build_servo_mlp(input_dim, num_outputs=5):
    """Assemble the fully connected network that maps features to servo outputs.

    Args:
        input_dim: Length of the flat input feature vector.
        num_outputs: Number of units in the final, activation-free layer.

    Returns:
        A ``tf.keras.Sequential`` model: Input -> Dense(128, relu) ->
        Dense(64, relu) -> Dense(num_outputs). The model is neither compiled
        nor trained here.
    """
    stack = [tf.keras.layers.Input(shape=(input_dim,))]
    # Hidden layers, widest first.
    for width in (128, 64):
        stack.append(tf.keras.layers.Dense(width, activation='relu'))
    # Linear output head.
    stack.append(tf.keras.layers.Dense(num_outputs, activation=None))
    return tf.keras.Sequential(stack)

# 5) Full Pipeline with previous angles concatenation

def process_test_data(custom_test_data=None, signal_length=1000, num_channels=4, num_filters=4, prev_angles=None):
    """Run the full pipeline: first layer, second layer, then the servo MLP.

    Args:
        custom_test_data: Optional 2-D (channel, time) data. When None, a
            uniform random tensor of shape ``(num_channels, signal_length)``
            is generated instead.
        signal_length: Length of the generated signal. Only used when
            ``custom_test_data`` is None.
        num_channels: Number of channels to process.
        num_filters: Number of filters for the second-layer convolutions.
        prev_angles: Optional sequence of previous servo angles. When given,
            it is flattened and appended to the MLP input vector. None leaves
            the MLP input unchanged.

    Returns:
        ``(second_out, clamped_angles)``: the second-layer feature tensor and
        a 1-D array of 5 angles clipped to [0, 180]. The MLP is built anew on
        every call and no training or weight loading happens here, so the
        angles come from untrained weights.
    """
    if custom_test_data is None:
        input_data = tf.random.uniform((num_channels, signal_length))
    else:
        input_data = tf.convert_to_tensor(custom_test_data, dtype=tf.float32)

    # First-layer processing
    first_out = first_layer_processing(input_data, num_channels=num_channels)

    # Second-layer processing (one stride per first-layer feature)
    second_layer_strides = [1, 2, 3, 1]
    second_out = second_layer_processing_with_strides(
        first_out,
        strides_per_feature=second_layer_strides,
        num_filters=num_filters,
    )

    # Flatten the output for MLP input: a single sample of shape (1, features)
    flattened_out = second_out.reshape((1, -1))

    # Append the previous angles to the feature vector when they are supplied
    if prev_angles is not None:
        prev_row = np.asarray(prev_angles, dtype=flattened_out.dtype).reshape((1, -1))
        flattened_out = np.concatenate([flattened_out, prev_row], axis=1)

    # Build and run the MLP
    input_dim = flattened_out.shape[1]
    servo_mlp = build_servo_mlp(input_dim=input_dim, num_outputs=5)
    # predict() returns shape (1, num_outputs); [0] selects the single sample.
    # (The second positional argument of predict() is batch_size, not an index.)
    raw_angles = servo_mlp.predict(flattened_out)[0]

    # Clamp angles between 0 and 180 degrees
    clamped_angles = np.clip(raw_angles, 0, 180)

    return second_out, clamped_angles

# 6) Example Usage

if __name__ == "__main__":
    # Demo input: 4 channels of 1000 random float32 samples each.
    test_data = np.random.rand(4, 1000).astype(np.float32)

    # Push the demo input through the complete pipeline.
    second_out, angles = process_test_data(test_data)

    # Report the resulting servo angles.
    print("Predicted Servo Angles:", angles)
