In [64]:
import numpy as np
from sklearn.datasets import make_classification
import matplotlib.pyplot as plt
import torch
from tqdm import tqdm
import torch.nn as nn

# Creating Data

In [46]:

# Seed NumPy's global RNG; make_classification is called without a
# random_state, so it draws from this global state.
np.random.seed(42)

# Synthetic, balanced binary problem with three informative features.
X, y = make_classification(
    n_samples=2000000,
    n_features=3,
    n_informative=3,
    n_redundant=0,
    n_clusters_per_class=1,
    weights=[0.5, 0.5],
    flip_y=0.05,
    class_sep=1.5,
)

# Re-encode the class labels from {0, 1} to {-1, +1}.
y = y * 2 - 1

# fig = plt.figure(figsize=(8, 6))
# ax = fig.add_subplot(111, projection='3d')


# ax.scatter(X[y == -1][:, 0], X[y == -1][:, 1], X[y == -1][:, 2], c='b', marker='o', label='Class -1')
# ax.scatter(X[y == 1][:, 0], X[y == 1][:, 1], X[y == 1][:, 2], c='r', marker='^', label='Class 1')

# ax.set_xlabel('Feature 1')
# ax.set_ylabel('Feature 2')
# ax.set_zlabel('Feature 3')
# ax.set_title('3D Scatter Plot of Synthetic Data')
# ax.legend()

# plt.show()

# Labeling function (LF) generator

In [37]:
def random_label_flip_and_zero(arr, m, n_list, zero_n_list):
    """Build ``m`` noisy copies of a label vector.

    For the i-th copy, ``zero_n_list[i]`` randomly chosen entries are set to 0
    and then ``n_list[i]`` of the remaining entries are negated. The input is
    never modified.

    Parameters
    ----------
    arr : array-like
        Label vector. It is converted to (and copied into) a NumPy array, so
        lists and tuples are accepted as well as ndarrays.
    m : int
        Number of noisy copies to generate.
    n_list : sequence of int
        Number of entries to negate in each copy; must have length ``m``.
    zero_n_list : sequence of int
        Number of entries to set to 0 in each copy; must have length ``m``.

    Returns
    -------
    list of numpy.ndarray
        ``m`` arrays, each with the same length as ``arr``.

    Raises
    ------
    ValueError
        If ``n_list`` or ``zero_n_list`` does not have length ``m``, if any
        count is negative, or if ``n_list[i] + zero_n_list[i]`` exceeds the
        number of labels.
    """
    if len(n_list) != m or len(zero_n_list) != m:
        raise ValueError("The length of n_list and zero_n_list must be equal to m.")

    labels = np.array(arr)  # private copy; also accepts plain sequences
    length = len(labels)

    # Validate every request before drawing any random numbers so that a bad
    # entry fails fast with a clear message instead of deep inside NumPy.
    for position, (n, zero_n) in enumerate(zip(n_list, zero_n_list)):
        if n < 0 or zero_n < 0:
            raise ValueError(
                f"Counts must be non-negative (position {position}: "
                f"n={n}, zero_n={zero_n})."
            )
        if n + zero_n > length:
            raise ValueError(
                f"Cannot zero {zero_n} and flip {n} distinct entries of an "
                f"array of length {length} (position {position})."
            )

    flipped_arrays = []

    for n, zero_n in zip(n_list, zero_n_list):
        # Randomly select the indices to set to 0.
        indices_to_zero = np.random.choice(length, zero_n, replace=False)

        modified_arr = labels.copy()
        modified_arr[indices_to_zero] = 0

        # Indices that were not zeroed, in ascending order.
        untouched_mask = np.ones(length, dtype=bool)
        untouched_mask[indices_to_zero] = False
        untouched_indices = np.flatnonzero(untouched_mask)

        # Randomly select, among the untouched indices, the ones to flip.
        indices_to_flip = np.random.choice(untouched_indices, n, replace=False)

        # Negate the labels at the chosen indices.
        modified_arr[indices_to_flip] = -modified_arr[indices_to_flip]

        flipped_arrays.append(modified_arr)

    return flipped_arrays

In [47]:
arr = y

m = 5  # how many noisy label vectors to generate
n_list = [5, 10, 15, 20, 25]  # entries to flip in each vector
zero_n_list = [20, 40, 60, 80, 10]  # entries to set to 0 in each vector
flipped_arrays = random_label_flip_and_zero(arr, m, n_list, zero_n_list)

# Per-vector summary, keyed by position:
#   'alpha'   - share of the non-zeroed entries that were left unflipped
#   'beta'    - share of all entries that were not zeroed
#   'outputs' - the modified label vector itself
ALL_LFs = {}
for i, modified_arr in enumerate(flipped_arrays):
    lf_dict = {
        'alpha': 1 - (n_list[i]/(len(y) - zero_n_list[i])),
        'beta': 1 - (zero_n_list[i]/len(y)),
        'outputs': modified_arr,
    }
    ALL_LFs[i] = lf_dict

In [48]:
# Show the per-LF summary; NumPy abbreviates the long 'outputs' arrays.
ALL_LFs

{0: {'alpha': 0.9999974999749998,
  'beta': 0.99999,
  'outputs': array([-1, -1, -1, ...,  1, -1, -1])},
 1: {'alpha': 0.999994999899998,
  'beta': 0.99998,
  'outputs': array([-1, -1, -1, ...,  1, -1, -1])},
 2: {'alpha': 0.9999924997749933,
  'beta': 0.99997,
  'outputs': array([-1, -1, -1, ...,  1, -1, -1])},
 3: {'alpha': 0.999989999599984,
  'beta': 0.99996,
  'outputs': array([-1, -1, -1, ...,  1, -1, -1])},
 4: {'alpha': 0.9999874999374997,
  'beta': 0.999995,
  'outputs': array([-1, -1, -1, ...,  1, -1, -1])}}

# Sample-size check for estimating alpha and beta

In [57]:
# Compare the dataset size against a lower bound that depends on the number
# of labeling functions (m) and a tolerance (epsilon).
# NOTE(review): the constant 356 and the log(m / (3 * epsilon)) term look like
# the sample-size bound from the data-programming paper - confirm the source.
m = 5
epsilon = 0.03
s_cardinality = len(y)

minimum_cardinality = 356 / epsilon**2 * np.log(m / (3 * epsilon))

print("minimum cardinality = ", minimum_cardinality)
print("current cardinality = ", s_cardinality)
print("Check!" if s_cardinality > minimum_cardinality else "More data needed ...")

minimum cardinality =  1589098.3705628957
current cardinality =  2000000
Check!


# Label Model

In [84]:
# Raw (pre-sigmoid) parameters: one row per labeling function, with column 0
# feeding alpha and column 1 feeding beta. Values start uniform in [0, 1).
Alpha_Beta_numpy = np.random.rand(m, 2)

# Float64 leaf tensor that tracks gradients.
Alpha_Beta = torch.tensor(Alpha_Beta_numpy).requires_grad_(True)

class LabelModel(nn.Module):
    """Likelihood of a set of labeling-function (LF) outputs given a true label.

    The module holds no parameters of its own; the raw LF parameters are
    passed to ``forward`` on every call. For each LF, column 0 of the
    parameter array is squashed with a sigmoid into ``alpha`` and column 1
    into ``beta``. The LF then contributes:

    * ``1 - beta``           if it output 0,
    * ``alpha * beta``       if its output equals ``true_label``,
    * ``(1 - alpha) * beta`` if its output equals ``-true_label``.

    The per-LF factors are multiplied together.
    """

    def __init__(self):
        super().__init__()

    def forward(self, alpha_beta_array, lf_label, true_label):
        """Return the product of the per-LF probabilities.

        Parameters
        ----------
        alpha_beta_array : torch.Tensor
            Raw (pre-sigmoid) parameters, indexed as ``[lf_index, 0]`` for
            alpha and ``[lf_index, 1]`` for beta.
        lf_label : sequence
            Output of each LF, indexable by ``lf_index``. Each entry must be
            0, ``true_label`` or ``-true_label``.
        true_label : number
            The label the LF outputs are compared against.

        Returns
        -------
        torch.Tensor
            Scalar tensor with the product of all per-LF factors (1 when
            ``alpha_beta_array`` has no rows).

        Raises
        ------
        ValueError
            If an LF output is not 0, ``true_label`` or ``-true_label``.
        """
        # Start from a tensor (not a Python int) so that the result is a
        # tensor with matching dtype/device even when there are no LFs.
        all_lf_probls = alpha_beta_array.new_ones(())

        for lf_index in range(alpha_beta_array.shape[0]):
            lf_alpha = torch.sigmoid(alpha_beta_array[lf_index, 0])
            lf_beta = torch.sigmoid(alpha_beta_array[lf_index, 1])
            label = lf_label[lf_index]

            # The zero check comes first so that it wins when true_label is
            # itself 0, matching the outcome of the original independent ifs.
            if label == 0:
                lf_prob = 1 - lf_beta
            elif label == true_label:
                lf_prob = lf_alpha * lf_beta
            elif label == -true_label:
                lf_prob = (1 - lf_alpha) * lf_beta
            else:
                # Without this branch an unexpected label would silently
                # reuse the previous LF's probability.
                raise ValueError(
                    f"LF {lf_index} produced label {label!r}; expected 0, "
                    f"{true_label!r} or its negation."
                )

            all_lf_probls = all_lf_probls * lf_prob

        return all_lf_probls

In [85]:
# Instantiate the label model; the constructor takes no arguments.
model = LabelModel()


In [90]:
# Evaluate the label model on one example.
# NOTE(review): test_lf, test_lf_label and test_label are not defined in any
# cell visible here - presumably left over from an earlier kernel session.
# Define them above this cell so that Restart & Run All succeeds.
model(alpha_beta_array=test_lf,
      lf_label=test_lf_label,
      true_label=test_label)

tensor(1., dtype=torch.float64)

In [81]:
# Size of test_lf's first dimension, which forward() uses as its loop bound.
# NOTE(review): test_lf is not defined in any cell visible here - confirm.
test_lf.shape[0]

2