In [4]:
def label_binary(df, rul_threshold=30):
    # Assuming "unit" and "cycle" columns exist
    grouped = df.groupby('unit')
    binary_labels = []

    for unit, group in grouped:
        max_cycle = group['cycle'].max()
        for cycle in group['cycle']:
            rul = max_cycle - cycle
            binary_labels.append(1 if rul < rul_threshold else 0)

    return binary_labels


In [5]:
from torch.utils.data import Dataset
import torch
class CMAPSSBinaryDataset(Dataset):
    def __init__(self, df, sensor_id, rul_threshold=30, window=30):
        self.sensor = f's{sensor_id}'
        self.window = window

        self.data = df[['unit', 'cycle', self.sensor]].copy()
        self.labels = label_binary(df, rul_threshold)

        self.X, self.y = self.create_sequences()

    def create_sequences(self):
        X, y = [], []
        for unit in self.data['unit'].unique():
            unit_data = self.data[self.data['unit'] == unit][self.sensor].values
            unit_labels = self.labels[:len(unit_data)]
            for i in range(len(unit_data) - self.window):
                X.append(unit_data[i:i+self.window].reshape(-1, 1))
                y.append(unit_labels[i + self.window])
        return torch.tensor(X, dtype=torch.float32), torch.tensor(y, dtype=torch.float32).unsqueeze(1)

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]


In [11]:
from datasets.cmapss import load_data
from torch.utils.data import DataLoader


client_id = 5  # 0–25
sensor_id = client_id + 1

raw = load_data("FD001")[0]  # Get the training dataframe
# Rename columns to match expected names in CMAPSSBinaryDataset
raw_renamed = raw.rename(columns={
	'unit_number': 'unit',
	'time': 'cycle',
	f'sensor{sensor_id}': f's{sensor_id}'
})
ds = CMAPSSBinaryDataset(raw_renamed, sensor_id=sensor_id)
loader = DataLoader(ds, batch_size=32, shuffle=True)


-----------------
 Data Set: FD001 
-----------------
Train trjectories: 100
Test trajectories: 100
Conditions: ONE (Sea Level)
Fault Modes: ONE (HPC Degradation)



In [12]:
ds.data.head()

Unnamed: 0,unit,cycle,s6
0,1,1,21.61
1,1,2,21.61
2,1,3,21.61
3,1,4,21.61
4,1,5,21.61


In [10]:
ds.data.head()

Unnamed: 0,unit,cycle,s21
0,1,1,23.419
1,1,2,23.4236
2,1,3,23.3442
3,1,4,23.3739
4,1,5,23.4044


In [12]:
from tqdm import tqdm
import torch
import torch.nn as nn
import torch.nn.functional as F

In [14]:


class CNN_LSTM_Skip1D(nn.Module):
    def __init__(self, input_dim, cnn_filters=64, lstm_hidden=128, skip_steps=2, fc_out_dim=1):
        super(CNN_LSTM_Skip1D, self).__init__()
        self.skip_steps = skip_steps
        self.lstm_hidden = lstm_hidden

        # 1D Convolutional Layer
        self.conv1d = nn.Conv1d(in_channels=input_dim, out_channels=cnn_filters, kernel_size=1, padding='same')
        self.relu = nn.ReLU()

        # Pooling (no actual effect here, but included per spec)
        self.pool = nn.AvgPool1d(kernel_size=1, padding=0)  # effectively a no-op

        # LSTM
        self.lstm = nn.LSTM(input_size=cnn_filters, hidden_size=lstm_hidden, batch_first=True)

        # Skip connection layers (for FC transformation)
        self.fc_v = nn.Linear(lstm_hidden, fc_out_dim)
        self.fc_s = nn.ModuleList([nn.Linear(lstm_hidden, fc_out_dim) for _ in range(skip_steps)])
        self.b_out = nn.Parameter(torch.zeros(fc_out_dim))

    def forward(self, x):

        # Apply 1D CNN: (B, seq_len, in) -> (B, in, seq_len)
        #x = x.permute(0, 2, 1)
        x = self.relu(self.conv1d(x))
        x = self.pool(x)
        x = x.permute(0, 2, 1)  # Back to (B, seq_len, cnn_filters)

        # LSTM
        lstm_out, _ = self.lstm(x)  # (B, seq_len, lstm_hidden)
        skip_outputs = []

        for t in range(x.size(1)):
            current = lstm_out[:, t]  # shape (B, hidden)

            # Gather skip connections
            skip_sum = 0
            for i in range(self.skip_steps):
                if t - i - 1 >= 0:
                    skip_sum += self.fc_s[i](lstm_out[:, t - i - 1])

            # Final output at timestep t
            pV_t = self.fc_v(current)
            pD_t = pV_t + skip_sum + self.b_out
            skip_outputs.append(pD_t.unsqueeze(1))

        return torch.cat(skip_outputs, dim=1)  # (B, seq_len, fc_out_dim)



def train(model, loader, epochs=5, lr=1e-3, device='cuda'):
    criterion = torch.nn.BCEWithLogitsLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    model.train()
    for epoch in tqdm(range(epochs), colour='green'):
        total_loss = 0
        for X, y in loader:
            X, y = X.to(device), y.to(device)
            optimizer.zero_grad()
            output = model(X)
            loss = criterion(output, y)
            loss.backward()
            optimizer.step()
            total_loss += loss.item() * X.size(0)
        avg_loss = total_loss / len(loader.dataset)
        print(f"Epoch {epoch+1}/{epochs}, Loss: {avg_loss:.4f}")
model = CNN_LSTM_Skip1D(input_dim=1)
model.to('cuda')
train(model,loader)


  0%|[32m          [0m| 0/5 [00:00<?, ?it/s]




RuntimeError: Given groups=1, weight of size [64, 1, 1], expected input[32, 30, 1] to have 1 channels, but got 30 channels instead

In [None]:
import math

def get_k_from_probability(probability_percent):
    p = probability_percent / 100.0
    
    if p <= 0 or p >= 1:
        raise ValueError("Probability must be between 0 and 100 (exclusive).")
    
    k = math.sqrt(1 / (1 - p))
    return round(k, 4)

# Example usage
if __name__ == "__main__":
    try:
        p_input = float(input("Enter the probability within kσ (in %, e.g., 75): "))
        k_result = get_k_from_probability(p_input)
        print(f"Minimum k such that P(|X - μ| < kσ) ≥ {p_input}% is k = {k_result}")
    except ValueError as e:
        print(f"Error: {e}")



Minimum k such that P(|X - μ| < kσ) ≥ 30.0% is k = 1.1952


In [3]:
def chebyshev_interval(mean, std_dev, k):
    if std_dev < 0 or k <= 0:
        raise ValueError("Standard deviation must be non-negative and k must be positive.")

    min_bound = mean - k * std_dev
    max_bound = mean + k * std_dev
    probability = 1 - (1 / (k ** 2))

    return {
        "k": round(k, 4),
        "probability_%": round(probability * 100, 2),
        "interval": (round(min_bound, 4), round(max_bound, 4))
    }

# Example usage
if __name__ == "__main__":
    try:
        mu = float(input("Enter the mean (μ): "))
        sigma = float(input("Enter the standard deviation (σ): "))
        k = float(input("Enter the value of k: "))
        result = chebyshev_interval(mu, sigma, k)
        
        print(f"\nWith k = {result['k']}:")
        print(f"At least {result['probability_%']}% of the data lies within the interval:")
        print(f"{result['interval']}")
    except ValueError as e:
        print(f"Error: {e}")



With k = 1.19:
At least 29.38% of the data lies within the interval:
(16.43, 23.57)


In [42]:
import numpy as np
def get_chebyshev_bounds(arr, bwt):
    # Flatten the array to handle weights from CNN or LSTM (any shape)
    arr_flat = arr.flatten()
    # Standardize the flattened array
    arr_flat = (arr_flat - np.mean(arr_flat)) / np.std(arr_flat)
    mean = np.mean(arr_flat)
    std = np.std(arr_flat)

    if 0 <= bwt < 25:
        k = 1.1952
    elif 25 <= bwt < 50:
        k = np.sqrt(2)
    elif 50 <= bwt < 75:
        k = 2.236
    elif 75 <= bwt <= 100:
        k = 10
    else:
        raise ValueError("bwt must be in the range 0 to 100")

    lower = mean - k * std
    upper = mean + k * std
    mask = ((arr_flat >= lower) & (arr_flat <= upper)).astype(int)
    percent_0 = 100 * np.sum(mask == 0) / len(mask)
    percent_1 = 100 * np.sum(mask == 1) / len(mask)
    print(f"% of 0 in mask: {percent_0:.2f}%")
    print(f"% of 1 in mask: {percent_1:.2f}%")
    return lower, upper, np.unique(mask), mean, std

In [46]:
bwt = 75
# Mimic weights from a CNN layer (for demonstration)
cnn_weights = np.random.randn(64, 1, 1)  # Example: 64 filters, 1 input channel, kernel size 1

#print(cnn_weights)
lower, upper, _ , mean, std = get_chebyshev_bounds(cnn_weights, bwt)
print(f"Lower bound: {lower}")
print(f"Upper bound: {upper}")

print(f"Mean: {mean}")
print(f"Standard Deviation: {std}")

% of 0 in mask: 0.00%
% of 1 in mask: 100.00%
Lower bound: -10.0
Upper bound: 10.0
Mean: 1.3877787807814457e-17
Standard Deviation: 1.0


In [63]:
def get_mask_around_mean_ecdf(weights: torch.Tensor, level: float) -> torch.Tensor:
    assert 0 <= level <= 100, "Level must be between 0 and 100"

    # Map input level to percent of data to keep
    if 0 <= level < 25:
        keep_percent = 0.3
    elif 25 <= level < 50:
        keep_percent = 0.5
    elif 50 <= level < 75:
        keep_percent = 0.8
    else:  # 75 <= level <= 100
        keep_percent = 1.0

    flat_weights = weights.flatten()
    sorted_weights, indices = torch.sort(flat_weights)
    n = len(flat_weights)
    ecdf = torch.arange(1, n + 1, dtype=torch.float32) / n

    # Center interval around mean
    mean_val = flat_weights.mean()
    mean_idx = torch.searchsorted(sorted_weights, mean_val)
    half_window = int((keep_percent * n) // 2)
    start = max(mean_idx - half_window, 0)
    end = min(mean_idx + half_window, n)

    mask = torch.zeros_like(flat_weights, dtype=torch.bool)
    mask[indices[start:end]] = 1
    mask = mask.reshape(weights.shape).int()

    percent_ones = 100 * mask.sum().item() / mask.numel()
    percent_zeros = 100 - percent_ones
    print(f"% of 0 in mask (outside bounds): {percent_zeros:.2f}%")
    print(f"% of 1 in mask (inside bounds): {percent_ones:.2f}%")
    return mask



In [67]:
bwt = 50
# Mimic weights from a CNN layer (for demonstration)
cnn_weights = np.random.randn(64, 1, 1)  # Example: 64 filters, 1 input channel, kernel size 1

get_mask_around_mean_ecdf(torch.tensor(cnn_weights, dtype=torch.float32), level=bwt)



% of 0 in mask (outside bounds): 21.88%
% of 1 in mask (inside bounds): 78.12%


tensor([[[1]],

        [[0]],

        [[1]],

        [[0]],

        [[0]],

        [[1]],

        [[1]],

        [[1]],

        [[1]],

        [[1]],

        [[0]],

        [[1]],

        [[1]],

        [[1]],

        [[1]],

        [[0]],

        [[1]],

        [[1]],

        [[1]],

        [[1]],

        [[1]],

        [[1]],

        [[1]],

        [[1]],

        [[0]],

        [[1]],

        [[1]],

        [[0]],

        [[1]],

        [[1]],

        [[0]],

        [[1]],

        [[1]],

        [[1]],

        [[0]],

        [[1]],

        [[1]],

        [[1]],

        [[1]],

        [[0]],

        [[1]],

        [[1]],

        [[1]],

        [[1]],

        [[1]],

        [[0]],

        [[1]],

        [[1]],

        [[1]],

        [[1]],

        [[0]],

        [[1]],

        [[1]],

        [[1]],

        [[0]],

        [[1]],

        [[1]],

        [[1]],

        [[1]],

        [[1]],

        [[1]],

        [[1]],

        

In [62]:
import numpy as np

def generate_weighted_distribution(size=100):
    numbers = np.arange(1, 101)
    
    # Define weights based on the intervals:
    # 30% for 75-100, 40% for 50-74, 20% for 25-49, 10% for 1-24
    
    weights = np.zeros_like(numbers, dtype=float)
    
    weights[(numbers >= 75) & (numbers <= 100)] = 0.30 / 26    # 26 numbers in 75-100
    weights[(numbers >= 50) & (numbers <= 74)] = 0.40 / 25    # 25 numbers in 50-74
    weights[(numbers >= 25) & (numbers <= 49)] = 0.20 / 25    # 25 numbers in 25-49
    weights[(numbers >= 1) & (numbers <= 24)]  = 0.10 / 24    # 24 numbers in 1-24
    
    # Sample 'size' numbers with the defined weighted probabilities
    sample = np.random.choice(numbers, size=size, p=weights)
    return sample

# Example usage
distribution = generate_weighted_distribution(4)
print(distribution)


[79 72 66 30]


In [1]:
import pickle


print(pickle.format_version)

4.0
