Preprocessing Steps -Binning Algorithms,.min-max..Normalization Techniques, Hypothesis Testing, ChiSquare Test, Confusion Matrix,
Implement Dimensionality reduction using Principle component Analysis method on a dataset iris
Implement and demonstrate the FIND-S algorithm for finding the most specific hypothesis based on a given set of training data samples. Read the training data from a .CSV file.
For a given set of training data examples stored in a .CSV file, implement and demonstrate the Candidate-Elimination algorithm to output a description of the set of all hypotheses consistent with the training examples.


In [6]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
from sklearn.datasets import load_iris
from collections import defaultdict

# Load the dataset
try:
    df = pd.read_csv('dataset/airlines.csv')
except FileNotFoundError:
    print("Dataset not found. Please make sure 'airlines_flights_data.csv' is in the correct directory.")
    df = None # Set df to None if file not found

# Check if the dataset was loaded successfully
if df is not None:
    # --- Preprocessing Steps ---
    
    # Display available columns
    print("Available columns:", df.columns.tolist())
    print("First few rows:")
    print(df.head())

    # 1. Binning Algorithms (Example: Equal-width binning for 'Flight Price')
    # Convert 'Flight Price' to numeric, forcing errors to NaN
    df['Flight Price'] = pd.to_numeric(df['Flight Price'], errors='coerce')
    
    # Fill NaN values with median price to ensure binning works
    median_price = df['Flight Price'].median()
    if pd.isna(median_price):
        # If all prices are NaN, use a default value
        df['Flight Price'].fillna(10000, inplace=True)
    else:
        df['Flight Price'].fillna(median_price, inplace=True)
    
    # Now perform binning
    df['price_binned'] = pd.cut(df['Flight Price'], bins=5, labels=False)

    # 2. Min-Max Normalization Techniques (Example: 'Duration Time')
    scaler = MinMaxScaler()
    # Convert Duration Time to numeric first
    df['Duration Time'] = pd.to_numeric(df['Duration Time'], errors='coerce')
    df['duration_normalized'] = scaler.fit_transform(df[['Duration Time']])

    # --- Hypothesis Testing (Example: Comparing mean duration for two airlines) ---
    # This is a basic example, a proper hypothesis test requires more rigorous statistical methods
    # and checking of assumptions.
    airline1 = 'SpiceJet'
    airline2 = 'IndiGo'

    duration_airline1 = df[df['Company'] == airline1]['Duration Time']
    duration_airline2 = df[df['Company'] == airline2]['Duration Time']

    # Simple comparison of means
    mean_duration_airline1 = duration_airline1.mean()
    mean_duration_airline2 = duration_airline2.mean()

    print(f"\n--- Hypothesis Testing (Simple Mean Comparison) ---")
    print(f"Mean duration for {airline1}: {mean_duration_airline1:.2f}")
    print(f"Mean duration for {airline2}: {mean_duration_airline2:.2f}")
    print("Note: This is a basic comparison, not a formal statistical hypothesis test.")


    # --- Chi-Square Test (Manual Implementation Example) ---
    # This example performs a chi-square test of independence between 'Company' and 'Cabin Class'.
    # A more complete implementation would involve calculating expected frequencies and the chi-square statistic.
    print(f"\n--- Chi-Square Test (Manual Implementation Example) ---")
    contingency_table = pd.crosstab(df['Company'], df['Cabin Class'])
    print("Contingency Table:")
    print(contingency_table)

    # Manual Chi-Square calculation
    chi2_statistic = 0
    rows = contingency_table.shape[0]
    cols = contingency_table.shape[1]
    row_sums = contingency_table.sum(axis=1)
    col_sums = contingency_table.sum(axis=0)
    total_sum = contingency_table.sum().sum()

    for i in range(rows):
        for j in range(cols):
            observed_frequency = contingency_table.iloc[i, j]
            expected_frequency = (row_sums[i] * col_sums[j]) / total_sum
            if expected_frequency != 0:
                chi2_statistic += ((observed_frequency - expected_frequency) ** 2) / expected_frequency

    print(f"\nCalculated Chi-Square Statistic: {chi2_statistic:.2f}")


    # --- Confusion Matrix (Manual Implementation Example) ---
    # This requires a classification task and predicted vs actual values.
    # As we don't have a classification model trained yet, we will create a dummy example
    print(f"\n--- Confusion Matrix (Manual Implementation Example) ---")

    # Create dummy true and predicted labels for demonstration
    # In a real scenario, these would come from a trained classification model
    true_labels = np.random.choice(['Economy', 'Business'], size=100)
    predicted_labels = np.random.choice(['Economy', 'Business'], size=100)

    # Define unique classes
    classes = sorted(list(set(true_labels) | set(predicted_labels)))
    n_classes = len(classes)

    # Initialize confusion matrix
    conf_matrix = np.zeros((n_classes, n_classes), dtype=int)

    # Populate confusion matrix
    for true, pred in zip(true_labels, predicted_labels):
        true_idx = classes.index(true)
        pred_idx = classes.index(pred)
        conf_matrix[true_idx, pred_idx] += 1

    print("Confusion Matrix:")
    print(pd.DataFrame(conf_matrix, index=classes, columns=classes))

    # Calculate metrics from confusion matrix (example for binary classification)
    if n_classes == 2:
        tn = conf_matrix[0, 0]
        fp = conf_matrix[0, 1]
        fn = conf_matrix[1, 0]
        tp = conf_matrix[1, 1]

        accuracy = (tp + tn) / (tp + tn + fp + fn) if (tp + tn + fp + fn) > 0 else 0
        precision = tp / (tp + fp) if (tp + fp) > 0 else 0
        recall = tp / (tp + fn) if (tp + fn) > 0 else 0
        f1_score = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0

        print(f"\nAccuracy: {accuracy:.2f}")
        print(f"Precision: {precision:.2f}")
        print(f"Recall: {recall:.2f}")
        print(f"F1-Score: {f1_score:.2f}")
    else:
        print("\nMetrics like Accuracy, Precision, Recall, F1-Score are more commonly calculated for binary classification.")

# --- Implement Dimensionality reduction using Principle component Analysis method on a dataset iris ---

print(f"\n--- Principal Component Analysis (PCA) on Iris Dataset ---")
# Load the iris dataset
iris = load_iris()
X_iris = iris.data
y_iris = iris.target
feature_names = iris.feature_names

# Standardize the data (important for PCA)
mean = np.mean(X_iris, axis=0)
std = np.std(X_iris, axis=0)
X_scaled = (X_iris - mean) / std

# Calculate the covariance matrix
cov_matrix = np.cov(X_scaled.T)

# Calculate eigenvalues and eigenvectors
eigenvalues, eigenvectors = np.linalg.eig(cov_matrix)

# Sort eigenvectors by decreasing eigenvalues
sorted_indices = np.argsort(eigenvalues)[::-1]
sorted_eigenvalues = eigenvalues[sorted_indices]
sorted_eigenvectors = eigenvectors[:, sorted_indices]

# Choose the number of components (e.e., 2 for visualization)
n_components = 2
principal_components = sorted_eigenvectors[:, :n_components]

# Project the data onto the principal components
X_pca = np.dot(X_scaled, principal_components)

# Display the first few rows of the transformed data
print("Original data shape:", X_iris.shape)
print("Transformed data shape:", X_pca.shape)
print("First 5 rows of transformed data:")
print(X_pca[:5])


# --- Implement and demonstrate the FIND-S algorithm ---

print(f"\n--- FIND-S Algorithm ---")
# Load training data from a CSV file (assuming 'finds_training_data.csv' exists)
# Example structure of 'finds_training_data.csv': attribute1,attribute2,...,target_concept
try:
    finds_df = pd.read_csv('/datasets/finds_training_data.csv')
except FileNotFoundError:
    print("\n'finds_training_data.csv' not found. Skipping FIND-S algorithm.")
    finds_df = None # Set to None if file not found

if finds_df is not None:
    # Assuming the last column is the target concept (e.e., 'EnjoySport')
    concepts = finds_df.iloc[:, :-1].values
    targets = finds_df.iloc[:, -1].values

    # Initialize the most specific hypothesis
    # Use the first positive example if available, otherwise initialize with the most specific hypothesis
    initial_hypothesis = None
    for i in range(len(targets)):
        if targets[i] == 'Yes': # Assuming 'Yes' is the positive class
            initial_hypothesis = concepts[i].copy()
            break

    if initial_hypothesis is None:
        print("No positive examples found in the training data for FIND-S.")
    else:
        hypothesis = initial_hypothesis

        # Iterate through the training examples
        for i in range(len(concepts)):
            if targets[i] == 'Yes':
                for j in range(len(hypothesis)):
                    # If the hypothesis attribute is not specific enough, generalize it
                    if hypothesis[j] != concepts[i][j]:
                        hypothesis[j] = '?'

        print("Most specific hypothesis:", hypothesis)


# --- Implement and demonstrate the Candidate-Elimination algorithm ---

print(f"\n--- Candidate-Elimination Algorithm ---")
# Load training data from a CSV file (assuming 'candidate_elimination_training_data.csv' exists)
# Example structure of 'candidate_elimination_training_data.csv': attribute1,attribute2,...,target_concept
try:
    ce_df = pd.read_csv('/datasets/candidate_elimination_training_data.csv')
except FileNotFoundError:
    print("\n'candidate_elimination_training_data.csv' not found. Skipping Candidate-Elimination algorithm.")
    ce_df = None # Set to None if file not found

if ce_df is not None:
    # Assuming the last column is the target concept
    ce_concepts = ce_df.iloc[:, :-1].values
    ce_targets = ce_df.iloc[:, -1].values

    # Initialize the general and specific boundary sets
    # Assuming attributes can take any value from the training data or '?'
    attribute_values = defaultdict(set)
    for concept in ce_concepts:
        for i, attr in enumerate(concept):
            attribute_values[i].add(attr)

    # Initialize G to the most general hypothesis (all '?')
    G = {tuple(['?' for _ in range(ce_concepts.shape[1])])}

    # Initialize S to the most specific hypothesis (empty set or the first positive example)
    S = set()
    for i in range(len(ce_targets)):
        if ce_targets[i] == 'Yes': # Assuming 'Yes' is the positive class
            S.add(tuple(ce_concepts[i]))
            break
    if not S:
         # If no positive examples, S can remain empty or be initialized differently depending on the problem.
         # For simplicity here, we'll assume there's at least one positive example in a valid dataset.
         # A more robust implementation would handle this case.
         print("Warning: No positive examples found for Candidate-Elimination. S remains empty.")


    # Helper function to check if hypothesis h is more general than hypothesis h1
    def is_more_general(h, h1):
        for i in range(len(h)):
            if h[i] != '?' and h[i] != h1[i]:
                return False
        return True

    # Helper function to check if hypothesis h is consistent with example (x, target)
    def is_consistent(h, x, target):
        match = True
        for i in range(len(h)):
            if h[i] != '?' and h[i] != x[i]:
                match = False
                break
        if match and target == 'Yes':
            return True
        elif not match and target == 'No':
            return True
        return False


    # Iterate through the training examples
    for i in range(len(ce_concepts)):
        x = ce_concepts[i]
        target = ce_targets[i]

        if target == 'Yes':
            # Remove from G any hypothesis inconsistent with the positive example
            G = {g for g in G if is_consistent(g, x, target)}

            # For each hypothesis s in S inconsistent with the positive example
            S_new = set()
            for s in S:
                if not is_consistent(s, x, target):
                    # Remove s from S and add minimal generalizations of s that are consistent
                    # and more specific than some hypothesis in G
                    for j in range(len(s)):
                        if s[j] == '?':
                            continue # Cannot specialize '?'
                        if s[j] != x[j]:
                            # Create a generalization by replacing s[j] with '?'
                            s_generalized = list(s)
                            s_generalized[j] = '?'
                            s_generalized = tuple(s_generalized)

                            # Add the generalized hypothesis if it's more specific than some g in G
                            # and not more general than any other hypothesis in S
                            is_minimal_generalization = True
                            for g in G:
                                if is_more_general(s_generalized, g):
                                     # Check if it's not more general than any other hypothesis in S
                                    for other_s in S_new.union(S - {s}): # Check against new and existing S hypotheses (excluding current s)
                                        if is_more_general(s_generalized, other_s):
                                            is_minimal_generalization = False
                                            break
                                    if is_minimal_generalization:
                                         S_new.add(s_generalized)
                                    break # Found a g that is more general, no need to check others
                            # If no g is more general, this generalization is not valid in this context
                            # based on the standard CE algorithm update for positive examples.
                            # The standard update adds minimal generalizations that are consistent
                            # and more specific than *some* hypothesis in G.
                else:
                    S_new.add(s) # If consistent, keep s

            S = S_new
            # Remove from S any hypothesis that is more general than another hypothesis in S
            S = {s for s in S if not any(is_more_general(s, s1) for s1 in S if s != s1)}


        elif target == 'No':
            # Remove from S any hypothesis inconsistent with the negative example
            S = {s for s in S if is_consistent(s, x, target)}

            # For each hypothesis g in G inconsistent with the negative example
            G_new = set()
            for g in G:
                if not is_consistent(g, x, target):
                    # Remove g from G and add minimal specializations of g that are consistent
                    # and more general than some hypothesis in S
                    for j in range(len(g)):
                        if g[j] == '?':
                            # Create a specialization by replacing '?' with each possible attribute value
                            for value in attribute_values[j]:
                                g_specialized = list(g)
                                g_specialized[j] = value
                                g_specialized = tuple(g_specialized)

                                # Add the specialized hypothesis if it's consistent and more general than some s in S
                                is_minimal_specialization = True
                                for s in S:
                                    if is_more_general(g_specialized, s):
                                        # Check if it's not more specific than any other hypothesis in G
                                        for other_g in G_new.union(G - {g}): # Check against new and existing G hypotheses (excluding current g)
                                            if is_more_general(other_g, g_specialized): # Note the order for specialization check
                                                is_minimal_specialization = False
                                                break
                                        if is_minimal_specialization:
                                             G_new.add(g_specialized)
                                        break # Found an s that is more specific, no need to check others
                else:
                    G_new.add(g) # If consistent, keep g

            G = G_new
            # Remove from G any hypothesis that is more specific than another hypothesis in G
            G = {g for g in G if not any(is_more_general(g1, g) for g1 in G if g != g1)}


    print("Final Specific Boundary (S):", S)
    print("Final General Boundary (G):", G)
else:
    print("\nSkipping data processing and algorithm implementations due to dataset not found.")

Available columns: ['Origin', 'Destination', 'Company', 'Departure Time', 'Arrival Time', 'Duration Time', 'Flight Price', 'Date', 'Cabin Class']
First few rows:
  Origin Destination   Company Departure Time Arrival Time Duration Time  \
0    BOM         DEL   IndiGo           08:30        10:25        1h 55m   
1    BOM         DEL   IndiGo           07:45        09:50        2h 05m   
2    BOM         DEL  Vistara           12:25        14:30        2h 05m   
3    BOM         DEL   IndiGo           10:05        12:15        2h 10m   
4    BOM         DEL   IndiGo           13:40        15:50        2h 10m   

  Flight Price        Date Cabin Class  
0        6,153  14-02-2022     Economy  
1        5,943  14-02-2022     Economy  
2        6,249  14-02-2022     Economy  
3        5,943  14-02-2022     Economy  
4        5,943  14-02-2022     Economy  

--- Hypothesis Testing (Simple Mean Comparison) ---
Mean duration for SpiceJet: nan
Mean duration for IndiGo: nan
Note: This is a basi

  return np.nanmean(a, axis, out=out, keepdims=keepdims)
The behavior will change in pandas 3.0. This inplace method will never work because the intermediate object on which we are setting values always behaves as a copy.

For example, when doing 'df[col].method(value, inplace=True)', try using 'df.method({col: value}, inplace=True)' or df[col] = df[col].method(value) instead, to perform the operation inplace on the original object.


  df['Flight Price'].fillna(10000, inplace=True)
  return xp.asarray(numpy.nanmin(X, axis=axis))
  return xp.asarray(numpy.nanmax(X, axis=axis))
  expected_frequency = (row_sums[i] * col_sums[j]) / total_sum
