In [76]:
# ========== IMPORTING ==========
import os
import math
import zipfile
import pandas as pd
import numpy as np
from sqlalchemy import create_engine, text
from bokeh.plotting import figure, show
from bokeh.layouts import gridplot
import unittest
from bokeh.io import output_notebook
output_notebook()

In [77]:
# ========== Extracting Dataset ==========
ZIP_FILE_PATH = '/content/Dataset2.zip'
EXTRACT_DIR = '/content/unzipDataset'

os.makedirs(EXTRACT_DIR, exist_ok=True)

with zipfile.ZipFile(ZIP_FILE_PATH, 'r') as zip_ref:
    zip_ref.extractall(EXTRACT_DIR)

train_df = pd.read_csv(os.path.join(EXTRACT_DIR, 'train.csv'))
ideal_df = pd.read_csv(os.path.join(EXTRACT_DIR, 'ideal.csv'))
test_df = pd.read_csv(os.path.join(EXTRACT_DIR, 'test.csv'))

print("Training Data Sample:")
print(train_df.head())
print("\nIdeal Functions Sample:")
print(ideal_df.head())
print("\nTest Data Sample:")
print(test_df.head())

Training Data Sample:
      x         y1         y2         y3        y4
0 -20.0  39.778572 -40.078590 -20.214268 -0.324914
1 -19.9  39.604813 -39.784000 -20.070950 -0.058820
2 -19.8  40.099070 -40.018845 -19.906782 -0.451830
3 -19.7  40.151100 -39.518402 -19.389118 -0.612044
4 -19.6  39.795662 -39.360065 -19.815890 -0.306076

Ideal Functions Sample:
      x        y1        y2        y3        y4        y5        y6        y7  \
0 -20.0 -0.912945  0.408082  9.087055  5.408082 -9.087055  0.912945 -0.839071   
1 -19.9 -0.867644  0.497186  9.132356  5.497186 -9.132356  0.867644 -0.865213   
2 -19.8 -0.813674  0.581322  9.186326  5.581322 -9.186326  0.813674 -0.889191   
3 -19.7 -0.751573  0.659649  9.248426  5.659649 -9.248426  0.751573 -0.910947   
4 -19.6 -0.681964  0.731386  9.318036  5.731386 -9.318036  0.681964 -0.930426   

         y8        y9  ...        y41        y42       y43       y44  \
0 -0.850919  0.816164  ... -40.456474  40.204040  2.995732 -0.008333   
1  0.168518  0.9

In [78]:
# ========== Setup SQLite database engine ==========
engine = create_engine('sqlite:///fitting_results.db', echo=False)

# ========== Save dataframes to SQLite ==========
train_df.to_sql('training_data', con=engine, if_exists='replace', index=False)
ideal_df.to_sql('ideal_functions', con=engine, if_exists='replace', index=False)

# ========== Create test_mapping table if it doesn't exist ==========
with engine.connect() as conn:
    conn.execute(text("""
        CREATE TABLE IF NOT EXISTS test_mapping (
            X REAL,
            Y REAL,
            Delta_Y REAL,
            Ideal_func INTEGER
        )
    """))
    conn.commit()

In [79]:
# ========== Custom Exception for data processing ==========
class DataProcessingError(Exception):
    pass

In [80]:
# ========== Function Fitter class ==========
class FunctionFitter:
    """
    Fits the four training functions (y1..y4) to the best matching ideal functions (y1..y50)
    by minimizing sum of squared deviations.
    """
    def __init__(self, train_df, ideal_df, engine):
        self.train_df = train_df
        self.ideal_df = ideal_df
        self.engine = engine
        self.selected_funcs_indices = []

    def least_squares(self, y_true, y_pred):
        """Calculate sum of squared errors."""
        return np.sum((y_true - y_pred) ** 2)

    def find_best_fits(self):
        """Find the ideal function indices (1 to 50) best fitting each training function y1..y4."""
        try:
            selected = []
            for train_i in range(1, 5):  # y1 to y4
                train_y = self.train_df[f'y{train_i}'].values
                min_error = float('inf')
                best_idx = None

                for ideal_j in range(1, 51):  # y1 to y50
                    ideal_y = self.ideal_df[f'y{ideal_j}'].values
                    error = self.least_squares(train_y, ideal_y)

                    if error < min_error:
                        min_error = error
                        best_idx = ideal_j

                selected.append(best_idx)

            self.selected_funcs_indices = selected
        except Exception as e:
            raise DataProcessingError(f"Error finding best fits: {e}")

    def max_abs_deviation(self, train_y, ideal_y):
        """Calculate maximum absolute deviation between training and ideal y-values."""
        return np.max(np.abs(train_y - ideal_y))


In [81]:
# ========== Test Data Mapper class ==========
class TestDataMapper(FunctionFitter):
    """
    Maps test points to one of the selected ideal functions if deviation criterion is met.
    """
    def __init__(self, train_df, ideal_df, test_df, engine):
        super().__init__(train_df, ideal_df, engine)
        self.test_df = test_df

    def map_test_data(self):
        if not self.selected_funcs_indices:
            raise DataProcessingError("Best fit functions not selected before mapping.")

        try:
            max_devs = []
            for i, ideal_idx in enumerate(self.selected_funcs_indices, start=1):
                train_y = self.train_df[f'y{i}'].values
                ideal_y = self.ideal_df[f'y{ideal_idx}'].values
                max_devs.append(self.max_abs_deviation(train_y, ideal_y))

            mappings = []
            for _, row in self.test_df.iterrows():
                x_test, y_test = row['x'], row['y']

                best_func = None
                best_dev = float('inf')

                for i, ideal_idx in enumerate(self.selected_funcs_indices):
                    ideal_rows = self.ideal_df[self.ideal_df['x'] == x_test]
                    if ideal_rows.empty:
                        continue
                    ideal_y = ideal_rows.iloc[0][f'y{ideal_idx}']

                    deviation = abs(y_test - ideal_y)
                    threshold = max_devs[i] * math.sqrt(2)

                    if deviation <= threshold and deviation < best_dev:
                        best_dev = deviation
                        best_func = ideal_idx

                if best_func is not None:
                    mappings.append((x_test, y_test, best_dev, best_func))

            # Save mappings to DB
            with self.engine.connect() as conn:
                conn.execute(text("DELETE FROM test_mapping"))
                for x, y, delta_y, func_no in mappings:
                    conn.execute(
                        text("INSERT INTO test_mapping (X, Y, Delta_Y, Ideal_func) VALUES (:x, :y, :dy, :fn)"),
                        {"x": x, "y": y, "dy": delta_y, "fn": func_no}
                    )
                conn.commit()

            return mappings
        except Exception as e:
            raise DataProcessingError(f"Error mapping test data: {e}")

In [82]:
# ========== Visualization class ==========
from bokeh.io import output_notebook
from bokeh.plotting import figure, show
from bokeh.layouts import gridplot

class Visualizer:
    def __init__(self, train_df, ideal_df, selected_indices, test_mappings):
        self.train_df = train_df
        self.ideal_df = ideal_df
        self.selected_indices = selected_indices
        self.test_mappings = test_mappings

    def plot(self):
        output_notebook()  # Enable notebook output

        # Define separate color palettes for clarity
        train_colors = ['navy', 'olive', 'firebrick', 'goldenrod']
        ideal_colors = ['purple', 'teal', 'darkorange', 'darkgreen']
        test_colors = ['crimson', 'darkcyan', 'darkmagenta', 'darkgoldenrod']

        # Plot training functions
        p_train = figure(title="Training Functions", x_axis_label='x', y_axis_label='y')
        for i, color in enumerate(train_colors, start=1):
            p_train.line(self.train_df['x'], self.train_df[f'y{i}'], legend_label=f'Train y{i}', color=color)

        # Plot selected ideal functions
        p_ideal = figure(title="Selected Ideal Functions", x_axis_label='x', y_axis_label='y')
        for i, idx in enumerate(self.selected_indices):
            color = ideal_colors[i % len(ideal_colors)]
            p_ideal.line(self.ideal_df['x'], self.ideal_df[f'y{idx}'], legend_label=f'Ideal y{idx}', color=color)

        # Plot test data points mapped to ideal functions using scatter to avoid deprecation warning
        p_test = figure(title="Test Data Mapping", x_axis_label='x', y_axis_label='y')
        func_color_map = {func_idx: test_colors[i % len(test_colors)] for i, func_idx in enumerate(self.selected_indices)}

        for x, y, _, func_idx in self.test_mappings:
            color = func_color_map.get(func_idx, 'black')
            p_test.scatter([x], [y], marker="circle", size=6, color=color, alpha=0.6)

        # Arrange the three plots horizontally
        grid = gridplot([[p_train, p_ideal, p_test]], sizing_mode='stretch_width')

        # Show the grid of plots
        show(grid)

In [83]:
def main():
    try:
        fitter = TestDataMapper(train_df, ideal_df, test_df, engine)
        fitter.find_best_fits()
        print("Selected ideal functions indices:", fitter.selected_funcs_indices)

        mappings = fitter.map_test_data()
        print(f"Number of test points mapped: {len(mappings)}")

        visualizer = Visualizer(train_df, ideal_df, fitter.selected_funcs_indices, mappings)
        visualizer.plot()

    except DataProcessingError as e:
        print("Data processing error: ", e)
    except Exception as e:
        print("Unexpected error: ", e)

In [84]:
if __name__ == "__main__":
    main()

Selected ideal functions indices: [42, 41, 11, 48]
Number of test points mapped: 48


In [85]:
# ========== Unit tests section ==========
class TestFunctionFitter(unittest.TestCase):
    def setUp(self):
        self.train_df = pd.DataFrame({
            'x': np.arange(3),
            'y1': np.array([1, 2, 3]),
            'y2': np.array([2, 3, 4]),
            'y3': np.array([3, 4, 5]),
            'y4': np.array([4, 5, 6]),
        })

        data = {'x': np.arange(3)}
        for i in range(1, 51):
            if i <= 4:
                data[f'y{i}'] = self.train_df[f'y{i}'].values
            else:
                data[f'y{i}'] = np.zeros(3)
        self.ideal_df = pd.DataFrame(data)

        self.engine = None  # No DB interaction in these tests
        self.fitter = FunctionFitter(self.train_df, self.ideal_df, self.engine)

    def test_least_squares(self):
        y_true = np.array([1, 2, 3])
        y_pred = np.array([1, 2, 3])
        self.assertEqual(self.fitter.least_squares(y_true, y_pred), 0)

    def test_find_best_fits(self):
        self.fitter.find_best_fits()
        self.assertEqual(len(self.fitter.selected_funcs_indices), 4)
        for idx in self.fitter.selected_funcs_indices:
            self.assertTrue(1 <= idx <= 50)

if __name__ == "__main__":
    unittest.main(argv=[''], verbosity=2, exit=False)

test_find_best_fits (__main__.TestFunctionFitter.test_find_best_fits) ... ok
test_least_squares (__main__.TestFunctionFitter.test_least_squares) ... ok

----------------------------------------------------------------------
Ran 2 tests in 0.011s

OK
