In [1]:
import random
#from .src.pyModelCalibrate.model.histogram_binning import HistogramBinningCalibrator

In [39]:
def get_uniform_mass_partitions(samples: list, partition_num: int, decreasing: bool = False) -> tuple:
    """
    Given a list of probabilities and the corresponding class labels, this function partition the probabilities
    into `partition_num` equal-mass partitions and return a tuple containing the sorted probabilities, sorted labels,
    and partition IDs for each sample. The probabilities and labels are sorted in increasing order by default,
    but this can be changed by setting the `decreasing` parameter to False.

    Parameters:
    - samples (list[tuple]): A list of tuple containing probability and the label.
    - partition_num (int): The number of equal-mass partitions to create.
    - decreasing (bool): A flag indicating whether to sort the probabilities and labels in decreasing order. Default is
                        False.

    Returns:
    - tuple: A tuple containing the sorted probabilities, sorted labels, and partition IDs for each probability.
    """

    samples.sort(key=lambda x: x[0], reverse=decreasing)
    sorted_probs, sorted_labels = zip(*samples)

    partition_ids = [int(_iter / partition_num) for _iter in range(len(samples))]

    return sorted_probs, sorted_labels, partition_ids


def get_uniform_width_partitions(samples: list, width: float = None, partition_num: int = None,
                                 decreasing: bool = False) -> tuple:
    """
    Given a list of probabilities and the corresponding class labels, this function partition the probabilities
    into equal-width partitions and return a tuple containing the sorted probabilities, sorted labels, and partition IDs
    for each sample. The probabilities and labels are sorted in increasing order by default, but this can be
    changed by setting the `decreasing` parameter to False.

    By default, it uses width parameter if both width and partition_num is provided, otherwise it computes width if only
    partition_num is provided.

    Parameters:
    - samples (list[tuple]): A list of tuple containing probability and the label.
    - partition_num (int): The number of equal-width partitions to create.
    - width (float): width of each partition
    - decreasing (bool): A flag indicating whether to sort the probabilities and labels in decreasing order. Default is
                        False.

    Returns:
    - tuple: A tuple containing the sorted probabilities, sorted labels, and partition IDs for each probability.
    """

    samples.sort(key=lambda x: x[0], reverse=decreasing)

    sorted_probs, sorted_labels = zip(*samples)

    # Compute the minimum and maximum probability values
    min_prob = min(sorted_probs)
    max_prob = max(sorted_probs)

    # Compute the width of each partition
    if width is None & partition_num is None:
        raise ValueError(
            "Either pass width of the partition or the number of partitions in which samples set needs to be / "
            "partitioned")
    elif width is None:
        partition_width = (max_prob - min_prob) / partition_num
    else:
        partition_width = width

    partition_ids = [int((prob - min_prob) / partition_width) for prob in sorted_probs]

    return sorted_probs, sorted_labels, partition_ids


class Bin:

    def __init__(self, id: int) -> None:

        self.bin_id: int = id
        self.samples: list[tuple] = []
        self.calibrated_score = None

    @staticmethod
    def check_values(prob, label):

        if not isinstance(prob, (float, int)) :
            raise ValueError(f'variable prob contains value of incorrect datatype.\
                Expected float found {isinstance(prob)}')

        if not 0 <= prob <= 1:
            raise ValueError(f'Value out of Bound. Expected value between 0 and 1.')

        if not isinstance(label, (int)):
            raise ValueError(f'variable label contains value of incorrect datatype.\
                Expected float found {isinstance(label)}')

    def add_sample(self, prob: float, label: int) -> None:

        self.check_values(prob, label)

        self.samples.append((prob, label))

    def compute_calibrated_score(self) -> None:
        self.calibrated_score = sum(label for prob, label in self.samples)/ len(self.samples)


class HistogramBinningCalibrator:

    def __init__(self, probs, labels, partition_scheme='mass'):
        self.probs = probs
        self.labels = labels
        self.partition_scheme = partition_scheme

        # check for errors
        self.check_values(probs, labels)
        self.samples = [(prob, label) for prob, label in zip(probs, labels)]
        self.bins = {}

    @staticmethod
    def check_values(prob, label):
        if len(prob) != len(label):
            raise ValueError(f'Size mismatch. prob contains {len(prob)} elements \
                label contains {len(label)} element')

    def fit(self):

        # create partition
        if self.partition_scheme == 'mass':
            probs, labels, part_ids = get_uniform_mass_partitions(samples = self.samples,
                                                                 partition_num=3)
        elif self.partition_scheme == 'width':
            probs, labels, part_ids = get_uniform_width_partitions(probs=self.probs,
                                                                   labels=self.labels,
                                                                   )
        # learn bin wise statistics
        for part_id in set(part_ids):
            _bin = Bin(id=part_id)
            self.bins[part_id] = _bin
        
        for idx, part_id in enumerate(part_ids):
            self.bins[part_id].add_sample(prob=probs[idx],
                                           label=labels[idx])

        # compute calibrated score for bins
        for _, bin in self.bins.items():
            bin.compute_calibrated_score()

In [40]:
# params
sample_points = 100000

In [46]:
probs = [0.0442267438992181,
 0.11375459702752178,
 0.5890604639647622,
 0.7623402789039062,
 0.2745467442388353,
 0.47666111818209733,
 0.41523472132183836,
 0.6809798061913034,
 0.6354140010044562,
 0.8089442029746567,
 0.3218902641434721,
 0.8332911503277771,
 0.9174072903421372,
 0.33586681769294446,
 0.780364181301153,
 0.44062635697487884,
 0.052019066014013626,
 0.36230829720613256,
 0.7127661580738447,
 0.3384773022002108]

labels = [0, 1, 1, 1, 1, 0, 0, 1, 1, 0, 1, 1, 0, 0 ,1, 0 ,1, 1, 1, 0]

assert(len(probs)==len(labels))
#for iter in range(sample_points):
#    p = random.uniform(0,1)
#    l = random.sample([0,1],1)[0]
#    probs.append(p)
#    labels.append(l)

In [47]:
calibrator = HistogramBinningCalibrator(probs, labels)

In [48]:
calibrator.fit()

In [49]:
for i in range(len(calibrator.bins)):
    print(calibrator.bins[i].samples)
    print(calibrator.bins[i].calibrated_score)

[(0.0442267438992181, 0), (0.052019066014013626, 1), (0.11375459702752178, 1)]
0.6666666666666666
[(0.2745467442388353, 1), (0.3218902641434721, 1), (0.33586681769294446, 0)]
0.6666666666666666
[(0.3384773022002108, 0), (0.36230829720613256, 1), (0.41523472132183836, 0)]
0.3333333333333333
[(0.44062635697487884, 0), (0.47666111818209733, 0), (0.5890604639647622, 1)]
0.3333333333333333
[(0.6354140010044562, 1), (0.6809798061913034, 1), (0.7127661580738447, 1)]
1.0
[(0.7623402789039062, 1), (0.780364181301153, 1), (0.8089442029746567, 0)]
0.6666666666666666
[(0.8332911503277771, 1), (0.9174072903421372, 0)]
0.5
