In [1]:
import numpy as np
import pandas as pd
from sklearn.linear_model import Perceptron
import Perceptron.perceptron as pn
from Perceptron.data_gen import Universe, separable_regression, data_distribution
from sklearn.metrics import accuracy_score, mean_absolute_error, mean_squared_error, zero_one_loss
from sklearn import datasets
import matplotlib.pyplot as plt
from collections import defaultdict
import pickle
import math
from tqdm import tqdm
import argparse
import random

## Data Corruption Experiment
from typing import List, Tuple
import random


In [2]:
# Utility Functions
def pickle_data(
    root_dir, 
    results,
    args):
    """Serialize experiment results together with their arguments.

    The output file is ``<root_dir>/<label>_test_size_<test_size>.pkl`` and
    contains a dictionary with two keys: ``'results'`` (``results`` converted
    to a plain ``dict``) and ``'args'`` (``args`` exactly as given).

    Parameters
    ----------
    root_dir : str or os.PathLike
        Directory in which the file is written. A trailing separator is
        optional; an empty string means the current working directory.
        The directory must already exist.
    results : mapping
        Experiment results; anything accepted by ``dict(...)``.
    args : object
        Must expose ``label`` and ``test_size`` attributes (they form the
        file name) and must itself be picklable.

    Raises
    ------
    OSError
        If the target file cannot be opened for writing.
    """
    import os  # local import: ``os`` is not among the notebook's top-level imports

    # Create pickle structure.
    # dict(...) yields a plain dict, dropping any defaultdict default_factory
    # (a lambda factory would make the object unpicklable).
    pkl = {
        'results': dict(results),
        'args':    args,
    }

    # Create file name
    file_name = f"{args.label}_test_size_{args.test_size}.pkl"

    # os.path.join inserts the separator only when needed and, unlike indexing
    # root_dir[-1], also works for an empty string or a pathlib.Path.
    with open(os.path.join(root_dir, file_name), 'wb') as pkl_file:
        pickle.dump(pkl, pkl_file)


In [3]:
# Experiment required functions
def sample_data(
    lows:      List[float],
    highs:     List[float],
    n_samples: int,
    seed:      int=None
) -> np.ndarray:
    """Sample a uniform distribution bounded per dimension by lows and highs.

    ``lows`` and ``highs`` hold the minimum and maximum value of each
    dimension of the data to be sampled. For example, with 4 values in both
    ``lows`` and ``highs``, sampling ``n_samples`` samples yields
    ``n_samples`` rows of 4 attributes each: shape ``(n_samples, 4)``.

    Parameters
    ----------
    lows : list of float
        Lower bound of each dimension.
    highs : list of float
        Upper bound of each dimension; must have the same length as ``lows``.
    n_samples : int
        Number of rows to draw.
    seed : int, optional
        Seed handed to ``np.random.default_rng``. ``None`` (the default)
        gives a non-reproducible generator.

    Returns
    -------
    np.ndarray
        Float array of shape ``(n_samples, len(lows))``.

    Raises
    ------
    AssertionError
        If ``lows`` and ``highs`` differ in length.
    """
    # Report both lengths; the previous message had misplaced braces and
    # rendered a boolean instead of the two sizes.
    assert len(lows) == len(highs), \
        f"Non-matching lows and highs: {len(lows)} != {len(highs)}"

    rng = np.random.default_rng(seed)
    # One column per dimension; the assertion above guarantees that
    # len(lows) == len(highs), so either could be used here.
    data_shape = (n_samples, len(lows))
    # lows/highs broadcast along the last axis of data_shape.
    return rng.uniform(lows, highs, data_shape)

# Splitting the dataset into bins can be done with np.split(data, n_buckets);
# note that np.split requires len(data) to be divisible by n_buckets.
# Shuffling the data beforehand is recommended.

class Concept:
    """Ground-truth labeller backed by a model.

    Wraps a model and treats its output as the true labels: calling the
    instance on data returns whatever the model's ``solve`` method returns.
    """

    def __init__(self, model):
        # The wrapped model only needs to provide a ``solve(X)`` method.
        self.model = model

    def __call__(self, X):
        """Label ``X`` by delegating to the wrapped model's ``solve``."""
        oracle = self.model
        return oracle.solve(X)

In [17]:
# Set up learning concept
seed = 42 # Base seed; each random stream below uses its own offset from it
ins = 4 + 1 # +1 because of bias!
rng = np.random.default_rng(seed) # For reproducibility
# Random weights for the real inputs, plus a fixed weight of 1 for the bias input
W = np.concatenate([rng.uniform(-100, 100, (ins-1, 1)), [[1]]])

truth = pn.PocketPerceptron()
# NOTE(review): both attributes are set to the same array; presumably `pi` is
# the pocket copy of the weights -- confirm against PocketPerceptron.
truth.pi = truth.W = W
c = Concept(truth)


# Create dataset
# The last dimension has low == high == 1, i.e. a constant bias column.
lows  = [-10, -10, -10, -10] + [1]
highs = [10, 10, 10, 10] + [1]
n_samples = 100
n_buckets = 10

assert len(lows) == len(highs) == ins, \
    f"Data dimensions do not match concept's: {ins} vs {len(lows)} vs {len(highs)}"
assert n_samples % n_buckets == 0, \
    f"n_samples ({n_samples}) must be divisible by n_buckets ({n_buckets})"
# We sample separately the data from the uniform distribution. Then, we label according
# to the concept (perceptron with weights W).
# Train and test use DIFFERENT seeds: with a shared seed the generator stream is
# identical, so the test rows would simply be the first rows of the training set
# (and the first training row would be a rescaled copy of the draws used for W).
train_data   = sample_data(lows, highs, n_samples=n_samples, seed=seed + 1)
train_data   = np.split(train_data, n_buckets)
train_data   = np.array(train_data) # shape: (n_buckets, n_samples // n_buckets, ins)
train_labels = c(train_data)
test_data    = sample_data(lows, highs, n_samples=n_samples//6, seed=seed + 2)
test_labels  = c(test_data)

# Experiment
n_runs = 25

rng = np.random.default_rng(seed) # For reproducibility
for run in range(n_runs):
    # NOTE(review): range(1, n_buckets) never trains on all n_buckets buckets;
    # left as-is -- confirm whether that is intentional.
    for buckets in range(1, n_buckets):
        # Draw from every bucket index 0..n_buckets-1; sampling from
        # range(1, n_buckets) made bucket 0 unreachable.
        indices = rng.choice(n_buckets, size=buckets, replace=False)
        # Concatenating the selected buckets flattens them back to (instances, ins).
        remaining_data = np.concatenate(train_data[indices])
        # Data correctly reshapes to be put together.
        

# Note: train_data has buckets, test_data does not: only instances.

> [0;32m/tmp/ipykernel_118573/3073147010.py[0m(34)[0;36m<module>[0;34m()[0m
[0;32m     32 [0;31m[0mrng[0m [0;34m=[0m [0mnp[0m[0;34m.[0m[0mrandom[0m[0;34m.[0m[0mdefault_rng[0m[0;34m([0m[0mseed[0m[0;34m)[0m [0;31m# For reproducibility[0m[0;34m[0m[0;34m[0m[0m
[0m[0;32m     33 [0;31m[0;32mfor[0m [0mrun[0m [0;32min[0m [0mrange[0m[0;34m([0m[0mn_runs[0m[0;34m)[0m[0;34m:[0m[0;34m[0m[0;34m[0m[0m
[0m[0;32m---> 34 [0;31m    [0;32mfor[0m [0mbuckets[0m [0;32min[0m [0mrange[0m[0;34m([0m[0;36m1[0m[0;34m,[0m [0mn_buckets[0m[0;34m)[0m[0;34m:[0m[0;34m[0m[0;34m[0m[0m
[0m[0;32m     35 [0;31m        [0mindices[0m [0;34m=[0m [0mrng[0m[0;34m.[0m[0mchoice[0m[0;34m([0m[0mrange[0m[0;34m([0m[0;36m1[0m[0;34m,[0m [0mn_buckets[0m[0;34m)[0m[0;34m,[0m [0msize[0m[0;34m=[0m[0mbuckets[0m[0;34m,[0m [0mreplace[0m[0;34m=[0m[0;32mFalse[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m[0;32m     36 [

BdbQuit: 

In [4]:

def experiment(
    X,
    y,
    metric,
    test_split:   float,
    buckets:       int,
    n_runs:        int,
    verbose:       int,
    n_buckets:     int, 
    max_iter:      int,
    eta:           float,
    ):
    """Train perceptrons on a growing number of data buckets and score them.

    The data is split once via ``corrupt_data`` into training buckets and a
    test selection. For every run and every ``bins`` in
    ``1 .. n_buckets - 1`` a fresh ``pn.PocketPerceptron`` is trained on the
    first ``bins`` training buckets and scored with ``metric``.

    Parameters
    ----------
    X, y
        Input data and labels of equal length. Both are indexed with the
        values returned by ``corrupt_data``; ``X.shape[-1]`` is used as the
        model's input size.
    metric : callable
        Called as ``metric(true_labels, predictions)``.
    test_split : float
        Forwarded to ``corrupt_data``.
    buckets : int
        Forwarded to ``corrupt_data``.
    n_runs : int
        Number of repetitions.
    verbose : int
        ``> 0`` prints a line per run, ``> 1`` also a line per bucket count.
    n_buckets : int
        Upper bound (exclusive) on the number of buckets trained with.
        NOTE(review): presumably meant to agree with ``buckets`` -- confirm.
    max_iter : int
        Forwarded to ``pn.PocketPerceptron``.
    eta : float
        Forwarded to ``pn.PocketPerceptron``.

    Returns
    -------
    dict
        Maps each bucket count to the list of metric values, one per run.

    Raises
    ------
    AssertionError
        If ``X`` and ``y`` differ in length.
    """
    assert len(X) == len(y), 'Shapes of input data and labels does not match!'

    # Bucketize data
    train, test = corrupt_data(universe_len=len(X), 
                               buckets=buckets,
                               test_split=test_split)

    # Decide from the actual split whether a held-out set exists. The former
    # check used a hard-coded 80/20 size that ignored test_split, so the
    # "score on all data" fallback could never trigger for non-empty X.
    # NOTE(review): assumes `test` is an index collection (or None) -- confirm
    # against corrupt_data.
    has_test = test is not None and np.size(test) > 0

    # Create dictionary to store results
    exp_data = defaultdict(list)

    # Experiment
    # NOTE(review): the split is computed once, so every run trains on the same
    # buckets; runs differ only if PocketPerceptron training is stochastic.
    for run in range(n_runs):
        if verbose > 0:
            print(f"Start of run {run}.")

        # Begin binning
        for bins in range(1, n_buckets):

            # Create model; No innate bias included!
            model = pn.PocketPerceptron(
                input=X.shape[-1], 
                eta=eta, 
                max_iter=max_iter
            ) 

            # Grab training data: the first `bins` buckets, flattened into
            # one array of instances.
            m      = np.concatenate(X[train[:bins]])
            labels = np.concatenate(y[train[:bins]])

            if verbose > 1:
                print(f"Training with {bins} buckets -- {len(m)}")

            # Train model
            model.train(m, labels)

            # Store risk data
            if has_test:
                pred = model.solve(X[test])
                exp_data[bins].append(metric(y[test], pred))

            else: # No empirical testing. Take error over all data.
                pred = model.solve(X)
                exp_data[bins].append(metric(y, pred))

    return dict(exp_data)
