In [1]:
!pip uninstall -y numpy
!pip install numpy==1.22.4

import numpy as np

Found existing installation: numpy 1.22.4
Uninstalling numpy-1.22.4:
  Successfully uninstalled numpy-1.22.4
Collecting numpy==1.22.4
  Using cached numpy-1.22.4-cp38-cp38-macosx_10_15_x86_64.whl (17.6 MB)
Installing collected packages: numpy
Successfully installed numpy-1.22.4


In [2]:
!pip install --upgrade h5py



In [3]:
!pip install tensorflow==2.12.0
import tensorflow as tf



In [5]:
import numpy as np
from tensorflow import keras
from sklearn.preprocessing import StandardScaler

def build_model(input_dim, output_dim, hidden_size=10, layers=None):
    """Build neural network model matching R amorenet structure"""
    model = keras.Sequential()
    
    if layers is not None:
        # Use custom layer architecture
        for i, layer_size in enumerate(layers):
            if i == 0:
                # First layer needs input dimension
                model.add(keras.layers.Dense(layer_size, activation='tanh', input_shape=(input_dim,)))
            elif i == len(layers) - 1:
                # Output layer with tanh activation (matching R output.layer="tansig")
                model.add(keras.layers.Dense(layer_size, activation='tanh'))
            else:
                # Hidden layers with tanh activation
                model.add(keras.layers.Dense(layer_size, activation='tanh'))
    else:
        # Default architecture: input -> hidden -> output (matching R behavior)
        model.add(keras.layers.Dense(hidden_size, activation='tanh', input_shape=(input_dim,)))
        model.add(keras.layers.Dense(output_dim, activation='tanh'))
    
    # Match R training parameters: learning rate 1e-2, momentum 0.5
    model.compile(optimizer=keras.optimizers.SGD(learning_rate=0.01, momentum=0.5),
                  loss='mean_squared_error')
    return model

def train_model(model, x, y, epochs=100, verbose=True):
    """Train the model"""
    model.fit(x, y, epochs=epochs, verbose=verbose)
    return model

def predict_model(model, x):
    """Make predictions with the model"""
    return model.predict(x)

def normalize_data(x, y):
    """Normalize input and output data using standardization"""
    import numpy as np
    
    # Convert to numpy arrays and ensure proper shape
    x = np.asarray(x)
    y = np.asarray(y)
    
    # Handle 1D arrays - reshape to 2D
    if x.ndim == 1:
        x = x.reshape(-1, 1)
    if y.ndim == 1:
        y = y.reshape(-1, 1)
    
    x_scaler = StandardScaler()
    y_scaler = StandardScaler()
    x_norm = x_scaler.fit_transform(x)
    y_norm = y_scaler.fit_transform(y)
    return x_norm, y_norm, x_scaler, y_scaler

def train_normalized_model(x, y, hidden_size=10, epochs=100):
    """Train a model with normalized data"""
    x_norm, y_norm, x_scaler, y_scaler = normalize_data(x, y)
    model = build_model(x_norm.shape[1], y_norm.shape[1], hidden_size)
    trained_model = train_model(model, x_norm, y_norm, epochs)
    return trained_model, x_scaler, y_scaler

def predict_normalized_model(model, x, x_scaler, y_scaler):
    """Make predictions with normalized model and denormalize results"""
    x_norm = x_scaler.transform(x)
    y_pred_norm = model.predict(x_norm)
    return y_scaler.inverse_transform(y_pred_norm)

def create_normalized_model(x, y, hidden_size=10, layers=None, maxit=100):
    """
    Factory function to create a normalized model
    This function signature matches what dopdt expects
    """
    return NormalizedModel(x, y, hidden_size, layers, maxit)

class NormalizedModel:
    """
    Python equivalent of R's amorenetNorm class
    Handles normalization and denormalization automatically
    """
    def __init__(self, x, y, hidden_size=10, layers=None, maxit=100):
        """
        Initialize and train normalized neural network
        
        Args:
            x: Input data (numpy array or similar)
            y: Output data (numpy array or similar)
            hidden_size: Size of hidden layer (default 10, matching R)
            layers: Custom layer architecture (list of layer sizes)
            maxit: Maximum iterations/epochs (default 100, matching R)
        """
        # Normalize data
        x_norm, y_norm, x_scaler, y_scaler = normalize_data(x, y)
        
        # Build model with proper architecture
        model = build_model(
            input_dim=x_norm.shape[1],
            output_dim=y_norm.shape[1],
            hidden_size=hidden_size,
            layers=layers
        )
        
        # Train model
        self.model = train_model(model, x_norm, y_norm, epochs=maxit, verbose=False)
        self.x_scaler = x_scaler
        self.y_scaler = y_scaler
    
    def predict(self, x):
        """
        Make predictions with automatic normalization/denormalization
        Equivalent to R's predict.amorenetNorm
        """
        # Handle vector input (convert to matrix like R code)
        if len(x.shape) == 1:
            x = x.reshape(1, -1)
        
        # Check compatibility
        if x.shape[1] != len(self.x_scaler.mean_):
            raise ValueError("predict: data not compatible")
        
        # Normalize input
        x_norm = self.x_scaler.transform(x)
        
        # Predict
        y_pred_norm = self.model.predict(x_norm, verbose=0)
        
        # Denormalize output
        return self.y_scaler.inverse_transform(y_pred_norm)
    
    @property
    def input_dim(self):
        """Get input dimension"""
        return self.model.input_shape[1]
    
    def __call__(self, x, y, **kwargs):
        """
        Make the class callable - this allows it to be used as a model factory
        Returns a new instance with the given data
        """
        # Merge any additional parameters
        params = {'hidden_size': 10, 'layers': None, 'maxit': 100}
        params.update(kwargs)
        return NormalizedModel(x, y, **params)