In [1]:
import numpy as np

# 1-D vector holding five ones (np.ones produces floats by default).
A = np.ones((5,))

In [2]:
# Echo the vector so its contents show up as the cell output.
A

array([1., 1., 1., 1., 1.])

In [3]:
# Given a 1-D input, np.diag builds a square matrix with the input on the
# main diagonal (k=0) and zeros everywhere else.
np.diag(A, k=0)

array([[1., 0., 0., 0., 0.],
       [0., 1., 0., 0., 0.],
       [0., 0., 1., 0., 0.],
       [0., 0., 0., 1., 0.],
       [0., 0., 0., 0., 1.]])

In [2]:
import numpy as np
# Prepend a scalar to a 1-D array. The scalar is wrapped in a one-element
# list so that both operands of the concatenation are one-dimensional.
number = 10
A = np.array([1, 2, 3, 4, 5])
np.concatenate(([number], A), axis=0)

array([10,  1,  2,  3,  4,  5])

In [3]:
import numpy as np

# Load from NPZ format (recommended)
# np.load on an .npz archive returns a lazy NpzFile that keeps the underlying
# file handle open until it is closed. Reading every member inside a `with`
# block and keeping the result as a plain dict releases the handle right away
# while `data` stays usable by key afterwards.
with np.load("observations_sinusoidal.npz") as npz_file:
    data = {key: npz_file[key] for key in npz_file.files}

observations = data['observations']
timestamps = data['timestamps']
actions = data['actions']
# 'states' is optional: fall back to None when the archive does not contain it.
states = data['states'] if 'states' in data else None

print(f"Observations shape: {observations.shape}")
print(f"Data keys: {list(data.keys())}")

Observations shape: (51, 1, 1)
Data keys: ['observations', 'timestamps', 'actions']


In [10]:
# Peek at the first two observations.
observations[:2]

array([[[0.12615797]],

       [[0.53803015]]])

In [11]:
# Small toy stand-in for the loaded data: the values 1.0 .. 4.0 arranged as
# four observations of shape (1, 1).
observations_d = np.arange(1.0, 5.0).reshape(4, 1, 1)

In [12]:
# Check the layout of the toy array: first axis indexes the observation.
observations_d.shape

(4, 1, 1)

In [13]:

# Keep every 2nd observation (positions 0, 2, 4, ...).
# Despite its name, sampling_mask holds integer positions rather than
# booleans; indexing with it selects those entries along the first axis.
sampling_mask = np.arange(start=0, stop=len(observations_d), step=2)
sampled_observations = observations_d[sampling_mask, ...]

In [14]:
# Show the sub-sampled result: only the entries at the selected positions remain.
sampled_observations

array([[[1.]],

       [[3.]]])

# Masking with None Values

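When you want to mask observations but keep the same array length, you can set the masked positions to `None`. This preserves the temporal structure while marking missing/masked data. Because a numeric NumPy array cannot store `None`, the observations are first copied into an array with `dtype=object`, one entry per time step.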

In [15]:
# Prepare a None-capable container from the loaded observations.
# First show what we are starting from.
print("Original observations shape:", observations.shape)
print("First 5 observations:")
print(observations[:5])

# An object-dtype array has one slot per observation, and any slot can later
# be overwritten with None.
masked_observations = np.empty(len(observations), dtype=object)

# Fill each slot with an independent copy of the matching observation.
for i, observation in enumerate(observations):
    masked_observations[i] = observation.copy()

print("\nMasked array created with shape:", masked_observations.shape)

Original observations shape: (51, 1, 1)
First 5 observations:
[[[0.12615797]]

 [[0.53803015]]

 [[0.15975223]]

 [[0.46371002]]

 [[0.42984675]]]

Masked array created with shape: (51,)


In [None]:
# Method 1: Random masking - each observation is dropped independently
np.random.seed(42)
# Per-observation chance of being masked. The realised fraction varies from
# draw to draw; it is not guaranteed to be exactly 30%.
mask_probability = 0.3

# One uniform draw per observation; draws below the probability are masked
random_mask = np.random.random(len(observations)) < mask_probability

# Work on a copy so that masked_observations itself is left untouched
masked_obs_random = masked_observations.copy()
masked_obs_random[random_mask] = None

print(f"Randomly masked {random_mask.sum()} out of {len(observations)} observations")
print("First 10 elements (None = masked):")
print(masked_obs_random[:10])

Randomly masked 19 out of 51 observations
First 10 elements (None = masked):
[array([[0.12615797]]) array([[0.53803015]]) array([[0.15975223]])
 array([[0.46371002]]) None None None array([[0.22498989]])
 array([[0.42373395]]) array([[0.04455506]])]


In [None]:
# Method 2: Systematic masking - every nth observation
mask_every_n = 3  # Mask every 3rd observation

masked_obs_systematic = masked_observations.copy()
# The slice starts at index 0, so positions 0, n, 2n, ... are set to None
masked_obs_systematic[::mask_every_n] = None

# Derive the English ordinal suffix from the value, so the message stays
# correct when mask_every_n is changed (11-13 and the other teens take "th").
if 10 <= mask_every_n % 100 <= 20:
    ordinal_suffix = "th"
else:
    ordinal_suffix = {1: "st", 2: "nd", 3: "rd"}.get(mask_every_n % 10, "th")

print(f"Systematically masked every {mask_every_n}{ordinal_suffix} observation")
print("First 15 elements:")
print(masked_obs_systematic[:15])

In [None]:
# Method 3: Time-based masking - hide everything inside a closed time window
time_mask_start = 2.0
time_mask_end = 3.5

# True where the timestamp lies within [time_mask_start, time_mask_end]
time_mask = np.logical_and(timestamps >= time_mask_start, timestamps <= time_mask_end)
masked_obs_time = masked_observations.copy()
masked_obs_time[time_mask] = None

print(f"Time-based masking: masked observations between {time_mask_start} and {time_mask_end}")
print(f"Masked {time_mask.sum()} observations")

# Method 4: Value-based masking - hide observations by their value
# The threshold is the mean of the first component; entries strictly above it
# are masked.
threshold = observations[:, 0, 0].mean()
value_mask = observations[:, 0, 0] > threshold
masked_obs_value = masked_observations.copy()
masked_obs_value[value_mask] = None

print(f"\nValue-based masking: masked {value_mask.sum()} observations above threshold {threshold:.3f}")

In [None]:
# Utility functions for working with None-masked observations

def get_valid_observations(masked_obs_array):
    """Return the non-None entries as an array, plus the positions they came from."""
    valid_indices = []
    kept = []
    for position, entry in enumerate(masked_obs_array):
        if entry is None:
            continue
        valid_indices.append(position)
        kept.append(entry)
    return np.array(kept), valid_indices

def count_missing(masked_obs_array):
    """Return how many entries of the sequence are None."""
    missing = 0
    for entry in masked_obs_array:
        if entry is None:
            missing += 1
    return missing

def get_mask_pattern(masked_obs_array):
    """Return an array of flags: True where an entry is present, False where it is None."""
    flags = []
    for entry in masked_obs_array:
        flags.append(entry is not None)
    return np.array(flags)

# Exercise the three helpers on the randomly masked array.
mask_pattern = get_mask_pattern(masked_obs_random)
missing_count = count_missing(masked_obs_random)
valid_obs, valid_indices = get_valid_observations(masked_obs_random)

# Valid + missing should add up to the total printed below.
print(f"Valid observations: {len(valid_obs)}")
print(f"Missing observations: {missing_count}")
print(f"Total: {len(masked_obs_random)}")
print(f"Mask pattern (first 20): {mask_pattern[:20]}")

In [None]:
def create_masked_observations(observations, timestamps, mask_type='random', **kwargs):
    """
    Create masked observations with None values

    Every observation is copied into an object-dtype array of the same length,
    then the entries selected by the chosen strategy are replaced with None.

    Args:
        observations: Original observations array, indexed along its first
            axis. The 'value_based' mode reads ``observations[:, dimension, 0]``
            and therefore needs a 3-D array.
        timestamps: Corresponding timestamps (only read by 'time_range').
        mask_type: 'random', 'systematic', 'time_range', 'value_based', 'custom'
        **kwargs: Additional parameters for specific mask types
            random:      probability (default 0.2), seed (default 42)
            systematic:  step (default 3); positions 0, step, 2*step, ... are masked
            time_range:  t_start (default 1.0), t_end (default 3.0), both inclusive
            value_based: dimension (default 0), threshold (default: mean of the
                         selected dimension); values strictly above it are masked
            custom:      mask (default: nothing masked), applied as
                         ``masked_obs[mask] = None``

    Returns:
        Tuple ``(masked_obs, mask)``: the object array with None at the masked
        positions, and the mask that was applied.

    Raises:
        ValueError: If mask_type is not one of the supported names.
    """
    n_obs = len(observations)

    # Create object array to hold None values
    masked_obs = np.empty(n_obs, dtype=object)

    # Copy original observations (one independent copy per slot)
    for i in range(n_obs):
        masked_obs[i] = observations[i].copy()

    if mask_type == 'random':
        prob = kwargs.get('probability', 0.2)
        seed = kwargs.get('seed', 42)
        # A private legacy generator yields the same draws as
        # np.random.seed(seed) + np.random.random(...), but leaves the global
        # NumPy random state of the caller untouched.
        rng = np.random.RandomState(seed)
        mask = rng.random_sample(n_obs) < prob

    elif mask_type == 'systematic':
        step = kwargs.get('step', 3)
        mask = np.zeros(n_obs, dtype=bool)
        mask[::step] = True

    elif mask_type == 'time_range':
        t_start = kwargs.get('t_start', 1.0)
        t_end = kwargs.get('t_end', 3.0)
        # asarray lets plain Python sequences of timestamps work as well
        times = np.asarray(timestamps)
        mask = (times >= t_start) & (times <= t_end)

    elif mask_type == 'value_based':
        dim = kwargs.get('dimension', 0)
        values = observations[:, dim, 0]
        # The default threshold must come from the same dimension that is
        # being compared, not unconditionally from dimension 0.
        threshold = kwargs.get('threshold')
        if threshold is None:
            threshold = np.mean(values)
        mask = values > threshold

    elif mask_type == 'custom':
        mask = kwargs.get('mask', np.zeros(n_obs, dtype=bool))

    else:
        raise ValueError(f"Unknown mask_type: {mask_type}")

    # Apply mask
    masked_obs[mask] = None

    return masked_obs, mask

# Demonstrate the general-purpose helper with three different strategies
print("Creating different masked versions:")

# Random masking
masked_random, mask_random = create_masked_observations(
    observations,
    timestamps,
    mask_type='random',
    probability=0.25,
    seed=123,
)
print(f"Random mask: {mask_random.sum()} masked out of {len(observations)}")

# Time-based masking
masked_time, mask_time = create_masked_observations(
    observations,
    timestamps,
    mask_type='time_range',
    t_start=1.5,
    t_end=2.5,
)
print(f"Time mask: {mask_time.sum()} masked out of {len(observations)}")

# Custom pattern - positions 1, 5, 9, ... (every 4th, starting from index 1)
custom_mask = np.arange(len(observations)) % 4 == 1
masked_custom, _ = create_masked_observations(
    observations,
    timestamps,
    mask_type='custom',
    mask=custom_mask,
)
print(f"Custom mask: {custom_mask.sum()} masked out of {len(observations)}")

In [None]:
# Visualize the masking pattern
def print_mask_pattern(masked_obs, timestamps, max_display=30):
    """Print a visual representation of the mask pattern

    Two rows are printed: a row of marks (✓ for a present entry, ✗ for None)
    and a row with the timestamp of each present entry ("---" for masked
    ones). Both rows use the same column width so that each mark sits directly
    above its timestamp.

    Args:
        masked_obs: Sequence whose entries are either an observation or None.
        timestamps: Sequence indexable by position; the timestamp of each
            displayed, non-masked entry is formatted with one decimal.
        max_display: Maximum number of leading entries to show; when the
            sequence is longer, "..." is appended to both rows.
    """
    print("Mask pattern (✓ = valid data, ✗ = None):")

    display_length = min(len(masked_obs), max_display)

    marks = []
    time_cells = []
    for i in range(display_length):
        if masked_obs[i] is not None:
            marks.append("✓")
            time_cells.append(f"{timestamps[i]:.1f}")
        else:
            marks.append("✗")
            time_cells.append("---")

    # Pad every cell of both rows to the widest timestamp cell; otherwise the
    # one-character marks drift away from the multi-character timestamps.
    width = max((len(cell) for cell in time_cells), default=0)
    pattern = "".join(f"{mark:<{width}} " for mark in marks)
    times = "".join(f"{cell:<{width}} " for cell in time_cells)

    if len(masked_obs) > max_display:
        pattern += "..."
        times += "..."

    print("Pattern:", pattern)
    print("Times:  ", times)
    print()

# Render the pattern of each masked variant under its own heading.
for heading, masked_variant in (
    ("Random masking pattern:", masked_random),
    ("Time-based masking pattern:", masked_time),
    ("Custom masking pattern:", masked_custom),
):
    print(heading)
    print_mask_pattern(masked_variant, timestamps)