Required class for holding interval data & corresponding methods (e.g. merge)

In [2]:
class Interval:
    """Closed numeric interval ``[lower_bound, upper_bound]``.

    Both bounds are stored as floats. Bounds supplied in the wrong order
    are swapped rather than rejected.
    """

    def __init__(self, lower_bound, upper_bound) -> None:
        # Convert BEFORE comparing: bounds may arrive as numeric strings, and
        # string comparison is lexicographic ('10' < '9'), which would leave
        # the interval inverted.
        lower_bound = float(lower_bound)
        upper_bound = float(upper_bound)

        if lower_bound > upper_bound:
            lower_bound, upper_bound = upper_bound, lower_bound

        self.lower_bound = lower_bound
        self.upper_bound = upper_bound

    def __str__(self) -> str:
        """Return the interval formatted as ``[lower, upper]``."""
        return f'[{self.lower_bound}, {self.upper_bound}]'

    def merge(self, other_interval):
        """Return a new Interval spanning both this one and ``other_interval``.

        Neither operand is modified. Any gap between the two intervals is
        covered by the result.
        """
        new_interval_lower_bound = min(self.lower_bound, float(other_interval.lower_bound))
        new_interval_upper_bound = max(self.upper_bound, float(other_interval.upper_bound))
        return Interval(new_interval_lower_bound, new_interval_upper_bound)

    def contains(self, number):
        """Return True when ``number`` lies within the bounds (both inclusive)."""
        return self.lower_bound <= number <= self.upper_bound

    def __eq__(self, other) -> bool:
        """Compare bounds with an absolute tolerance of 1e-10.

        Any object that is not an Interval compares unequal.
        """
        if not isinstance(other, Interval):
            return False
        return (abs(self.lower_bound - other.lower_bound) < 1e-10 and
                abs(self.upper_bound - other.upper_bound) < 1e-10)

Function for counting the rows of a given species whose value in the passed column falls within the passed interval

In [3]:
import pandas as pd
def count_occurence(interval: Interval, species_type: str, column_name: str, dataset: pd.DataFrame) -> int:
    """Count the rows of one species whose column value lies inside ``interval``.

    Args:
        interval: Interval the value is tested against via ``interval.contains``.
        species_type: Value the ``'species'`` column must equal for a row to count.
        column_name: Column whose values are tested; values are converted to float.
        dataset: Frame holding a ``'species'`` column and ``column_name``.

    Returns:
        Number of matching rows as a plain ``int``.
    """
    # Select the species first so only the relevant values are converted,
    # instead of materialising every row with iterrows().
    is_requested_species = dataset['species'] == species_type
    values = dataset.loc[is_requested_species, column_name].astype(float)
    return int(sum(interval.contains(value) for value in values))

Function for creating contingency table and returning its data + visual representation as string

In [4]:
def create_contingency_table(interval1: Interval, interval2: Interval,column_name: str, dataset: pd.DataFrame):
    """Build the class-frequency contingency table for two intervals.

    The table has one column per distinct value of ``dataset['species']``
    (in order of first appearance), so it works for any number of classes.

    Args:
        interval1: First interval (first data row of the table).
        interval2: Second interval (second data row of the table).
        column_name: Column whose values are counted against the intervals.
        dataset: Frame holding a ``'species'`` column and ``column_name``.

    Returns:
        A ``(table, table_data)`` tuple. ``table`` is the grid-formatted
        string produced by ``tabulate``. ``table_data`` is a list of three
        rows ``[label, count_per_class..., row_total]``: one per interval,
        followed by a totals row labelled ``'-'``.
    """
    from tabulate import tabulate

    class_labels = dataset['species'].unique().tolist()
    headers = ['Interval', *class_labels, 'Total']

    def build_row(interval):
        # One count per class, followed by the row total.
        counts = [count_occurence(interval, label, column_name, dataset) for label in class_labels]
        return [str(interval), *counts, sum(counts)]

    interval1_row = build_row(interval1)
    interval2_row = build_row(interval2)

    # Column-wise sums over every numeric cell, including the row totals.
    total_row = ['-']
    total_row.extend(first + second for first, second in zip(interval1_row[1:], interval2_row[1:]))

    table_data = [interval1_row, interval2_row, total_row]

    table = tabulate(table_data, headers=headers, numalign='center', tablefmt='grid')

    return table, table_data

Function for calculating chi-square of two passed intervals

In [5]:
def get_chi_square(interval1: Interval, interval2: Interval,column_name: str, dataset: pd.DataFrame):
    """Compute the chi-square statistic of the class distribution over two intervals.

    The statistic is the sum, over both interval rows and every class column
    of the contingency table, of ``(observed - expected)**2 / expected`` with
    ``expected = row_total * column_total / grand_total``.

    Args:
        interval1: First interval.
        interval2: Second interval.
        column_name: Column whose values are counted against the intervals.
        dataset: Frame holding a ``'species'`` column and ``column_name``.

    Returns:
        The chi-square value, or ``0.0`` when neither interval contains any
        observation (the statistic is undefined for an empty table).
    """
    _, table_data = create_contingency_table(interval1, interval2, column_name, dataset)

    # Last row holds the column totals; last cell of every row is its total.
    total_row = table_data[-1]
    total_freqs = total_row[-1]
    if total_freqs == 0:
        return 0.0

    chi_square = 0
    for observed_row in table_data[:-1]:
        row_total = observed_row[-1]
        # Cell 0 is the label and the last cell is the row total; the class
        # counts sit in between, however many classes there are.
        for column_index in range(1, len(observed_row) - 1):
            expected_frequency = row_total * total_row[column_index] / total_freqs
            expected_frequency += 1e-10 # prevent division by zero
            chi_square += (observed_row[column_index] - expected_frequency) ** 2 / expected_frequency

    return chi_square

ChiMerge algorithm, built from the functions above and applied to every feature column of the dataset

In [6]:
def perform_chi_merge(df, max_intervals, chi_threshold):
    """Discretise every feature column of ``df`` by merging adjacent intervals.

    Each distinct value of a column starts as its own interval. The adjacent
    pair with the lowest chi-square is merged repeatedly (the first such pair
    wins ties) until either the column has at most ``max_intervals``
    intervals, or the lowest chi-square exceeds ``chi_threshold``.

    Args:
        df: Frame with a ``'species'`` class column; every other column is
            treated as a feature whose values are convertible to float.
        max_intervals: Merging stops once a column has this many intervals
            or fewer.
        chi_threshold: A pair is merged only while its chi-square does not
            exceed this value.

    Returns:
        Dict mapping each feature column name to its list of Interval
        objects in ascending order. Columns with no non-null values map to
        an empty list.
    """
    intervals = dict()

    for column in df.columns:

        if column == 'species':
            continue

        # Convert to float before de-duplicating and sorting: the raw values
        # may be strings, whose lexicographic order ('10' < '9') would make
        # non-neighbouring values adjacent.
        values = sorted({float(value) for value in df[column].dropna()})
        column_intervals = [Interval(value, value) for value in values]

        # A merge needs at least two intervals, hence the floor of 1.
        while len(column_intervals) > max(max_intervals, 1):

            pair_chi_squares = [
                get_chi_square(column_intervals[index], column_intervals[index + 1], column, df)
                for index in range(len(column_intervals) - 1)
            ]
            # min() returns the first minimum, matching a strict '<' scan.
            best_index = min(range(len(pair_chi_squares)), key=pair_chi_squares.__getitem__)

            # Check the threshold BEFORE merging so a pair that is already
            # significantly different is left untouched.
            if pair_chi_squares[best_index] > chi_threshold:
                break

            new_interval = column_intervals[best_index].merge(column_intervals[best_index + 1])
            column_intervals[best_index:best_index + 2] = [new_interval]

        intervals[column] = column_intervals

    return intervals

Loading the dataset, specifying the column headers and running the algorithm

In [8]:
# Only column 0 of the parsed file is used: double quotes are stripped and the
# text is split on commas into separate columns. Values stay strings here.
raw_lines = pd.read_csv('./datasets/iris.csv', header=None)[0]
df = raw_lines.str.replace('"', '').str.split(',', expand=True)
df.columns = ['sepal_length', 'sepal_width', 'petal_length', 'petal_width', 'species']

# 5.991 matches the chi-square critical value at the 0.05 level for
# 2 degrees of freedom.
intervals = perform_chi_merge(df, max_intervals=6, chi_threshold=5.991)

for column, column_intervals in intervals.items():
    print(f'intervals found for {column}:')
    for interval in column_intervals:
        print(interval)

intervals found for sepal_length:
[4.3, 4.8]
[4.9, 4.9]
[5.0, 5.4]
[5.5, 5.7]
[5.8, 7.0]
[7.1, 7.9]
intervals found for sepal_width:
[2.0, 2.2]
[2.3, 2.4]
[2.5, 2.8]
[2.9, 2.9]
[3.0, 3.3]
[3.4, 4.4]
intervals found for petal_length:
[1.0, 1.9]
[3.0, 4.4]
[4.5, 4.7]
[4.8, 4.9]
[5.0, 5.1]
[5.2, 6.9]
intervals found for petal_width:
[0.1, 0.6]
[1.0, 1.3]
[1.4, 1.6]
[1.7, 1.7]
[1.8, 1.8]
[1.9, 2.5]
