# Лабораторна робота №6: ANFIS для варіанту №1

In [1]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt


In [2]:
df = pd.read_csv("anfis_variant1_data.csv")
df.head()


Unnamed: 0,IOC(0),IPC(0),KVVE(-7),M2(-7),IPC(+1)
0,194.935538,78.736345,473.80323,332.750702,54.24329
1,127.479018,124.653052,456.118962,271.936792,51.657539
2,177.933181,70.042578,402.224072,251.223605,45.041167
3,109.042026,86.984651,445.323028,248.314053,45.660932
4,185.73095,126.994302,403.381931,330.144884,57.520285


In [4]:
X = df[['IOC(0)', 'IPC(0)', 'KVVE(-7)', 'M2(-7)']].values
y = df['IPC(+1)'].values

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)


In [6]:
!pip install anfis

Collecting anfis
  Downloading anfis-0.3.1-py3-none-any.whl.metadata (756 bytes)
Collecting scikit-fuzzy (from anfis)
  Downloading scikit_fuzzy-0.5.0-py2.py3-none-any.whl.metadata (2.6 kB)
Downloading anfis-0.3.1-py3-none-any.whl (7.4 kB)
Downloading scikit_fuzzy-0.5.0-py2.py3-none-any.whl (920 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m920.8/920.8 kB[0m [31m12.7 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: scikit-fuzzy, anfis
Successfully installed anfis-0.3.1 scikit-fuzzy-0.5.0


In [23]:
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.metrics import mean_squared_error

import anfis
import membership

In [24]:
X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train, dtype=torch.float32).view(-1, 1)
X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
y_test_tensor = torch.tensor(y_test, dtype=torch.float32).view(-1, 1)

In [25]:
def train_anfis_model(n_rules):

    input_mfs = []
    for i in range(X.shape[1]):
        input_mfs.append(
            [BellMembFunc(a=1, b=1, c=np.random.rand()*0.5 + 0.5) for _ in range(n_rules)]
        )

    model = AnfisNet(n_inputs=X.shape[1], n_rules=n_rules, input_mfs=input_mfs)
    optimizer = optim.Adam(model.parameters(), lr=0.01)
    criterion = nn.MSELoss()

    n_epochs = 300
    loss_history = []

    for epoch in range(n_epochs):
        model.train()
        optimizer.zero_grad()
        y_pred = model(X_train_tensor)
        loss = criterion(y_pred, y_train_tensor)
        loss.backward()
        optimizer.step()
        loss_history.append(loss.item())

    return model, loss_history

In [27]:
class BellMembFunc(torch.nn.Module):
    '''
        Generalised Bell membership function; defined by three parameters:
            a, the half-width (at the crossover point)
            b, controls the slope at the crossover point (which is -b/2a)
            c, the center point
    '''
    def __init__(self, a, b, c):
        super(BellMembFunc, self).__init__()
        self.register_parameter('a', _mk_param(a))
        self.register_parameter('b', _mk_param(b))
        self.register_parameter('c', _mk_param(c))
        self.b.register_hook(BellMembFunc.b_log_hook)

    @staticmethod
    def b_log_hook(grad):
        '''
            Possibility of a log(0) in the grad for b, giving a nan.
            Fix this by replacing any nan in the grad with ~0.
        '''
        grad[torch.isnan(grad)] = 1e-9
        return grad

    def forward(self, x):
        dist = torch.pow((x - self.c)/self.a, 2)
        return torch.reciprocal(1 + torch.pow(dist, self.b))

    def pretty(self):
        return 'BellMembFunc {} {} {}'.format(self.a, self.b, self.c)


def make_bell_mfs(a, b, clist):
    '''Return a list of bell mfs, same (a,b), list of centers'''
    return [BellMembFunc(a, b, c) for c in clist]

In [32]:
def _mk_param(val):
    '''Make a torch parameter from a scalar value'''
    if isinstance(val, torch.Tensor):
        val = val.item()
    return torch.nn.Parameter(torch.tensor(val, dtype=torch.float))

In [34]:
class AnfisNet:
    '''
        This is a container for the 5 layers of the ANFIS net.
        The forward pass maps inputs to outputs based on current settings,
        and then fit_coeff will adjust the TSK coeff using LSE.
    '''
    def __init__(self, description, invardefs, outvarnames, hybrid=True):
        super(AnfisNet, self).__init__()
        self.description = description
        self.outvarnames = outvarnames
        self.hybrid = hybrid
        varnames = [v for v, _ in invardefs]
        mfdefs = [FuzzifyVariable(mfs) for _, mfs in invardefs]
        self.num_in = len(invardefs)
        self.num_rules = np.prod([len(mfs) for _, mfs in invardefs])
        if self.hybrid:
            cl = ConsequentLayer(self.num_in, self.num_rules, self.num_out)
        else:
            cl = PlainConsequentLayer(self.num_in, self.num_rules, self.num_out)
        self.layer = torch.nn.ModuleDict(OrderedDict([
            ('fuzzify', FuzzifyLayer(mfdefs, varnames)),
            ('rules', AntecedentLayer(mfdefs)),
            # normalisation layer is just implemented as a function.
            ('consequent', cl),
            # weighted-sum layer is just implemented as a function.
            ]))

    @property
    def num_out(self):
        return len(self.outvarnames)

    @property
    def coeff(self):
        return self.layer['consequent'].coeff

    @coeff.setter
    def coeff(self, new_coeff):
        self.layer['consequent'].coeff = new_coeff

    def fit_coeff(self, x, y_actual):
        '''
            Do a forward pass (to get weights), then fit to y_actual.
            Does nothing for a non-hybrid ANFIS, so we have same interface.
        '''
        if self.hybrid:
            self(x)
            self.layer['consequent'].fit_coeff(x, self.weights, y_actual)

    def input_variables(self):
        '''
            Return an iterator over this system's input variables.
            Yields tuples of the form (var-name, FuzzifyVariable-object)
        '''
        return self.layer['fuzzify'].varmfs.items()

    def output_variables(self):
        '''
            Return an list of the names of the system's output variables.
        '''
        return self.outvarnames

    def extra_repr(self):
        rstr = []
        vardefs = self.layer['fuzzify'].varmfs
        rule_ants = self.layer['rules'].extra_repr(vardefs).split('\n')
        for i, crow in enumerate(self.layer['consequent'].coeff):
            rstr.append('Rule {:2d}: IF {}'.format(i, rule_ants[i]))
            rstr.append(' '*9+'THEN {}'.format(crow.tolist()))
        return '\n'.join(rstr)

    def forward(self, x):
        '''
            Forward pass: run x thru the five layers and return the y values.
            I save the outputs from each layer to an instance variable,
            as this might be useful for comprehension/debugging.
        '''
        self.fuzzified = self.layer['fuzzify'](x)
        self.raw_weights = self.layer['rules'](self.fuzzified)
        self.weights = F.normalize(self.raw_weights, p=1, dim=1)
        self.rule_tsk = self.layer['consequent'](x)
        # y_pred = self.layer['weighted_sum'](self.weights, self.rule_tsk)
        y_pred = torch.bmm(self.rule_tsk, self.weights.unsqueeze(2))
        self.y_pred = y_pred.squeeze(2)
        return self.y_pred

In [36]:
def train_anfis_model(n_rules):

    input_mfs = []
    for i in range(X.shape[1]):
        input_mfs.append([BellMembFunc(a=1, b=1, c=np.random.uniform(0.5, 1.5)) for _ in range(n_rules)])

    invardefs = [(f'x{i}', input_mfs[i]) for i in range(X.shape[1])]
    outvarnames = ['y']
    description = f"ANFIS with {n_rules} rules"

    model = AnfisNet(description=description, invardefs=invardefs, outvarnames=outvarnames, hybrid=False)

    optimizer = optim.Adam(model.parameters(), lr=0.01)
    criterion = nn.MSELoss()

    n_epochs = 300
    loss_history = []

    for epoch in range(n_epochs):
        model.train()
        optimizer.zero_grad()
        y_pred = model(X_train_tensor)
        loss = criterion(y_pred, y_train_tensor)
        loss.backward()
        optimizer.step()
        loss_history.append(loss.item())

    return model, loss_history

In [39]:
import torch.nn as nn

class FuzzifyVariable(nn.Module):
    def __init__(self, mfs):
        super(FuzzifyVariable, self).__init__()
        self.mfs = nn.ModuleList(mfs)

    def forward(self, x):
        return torch.stack([mf(x) for mf in self.mfs], dim=1)

In [41]:
import torch
import torch.nn as nn

class PlainConsequentLayer(nn.Module):
    def __init__(self, in_features, num_rules, out_features):
        super().__init__()
        self.linear = nn.Linear(in_features * num_rules, out_features)

    def forward(self, x):
        # x: (batch_size, num_rules, in_features)
        batch_size = x.size(0)
        x = x.view(batch_size, -1)  # flatten all rule inputs
        return self.linear(x)

In [44]:
from collections import OrderedDict

In [47]:
from anfis_complete import AnfisNet, BellMembFunc

In [49]:
results = {}

for n_rules in [10, 20, 40]:
    model, loss_history = train_anfis_model(n_rules)

    with torch.no_grad():
        y_pred = model(X_test_tensor).numpy().flatten()
        mse = mean_squared_error(y_test, y_pred)

    results[n_rules] = {
        "model": model,
        "loss": loss_history,
        "mse": mse
    }

    print(f"Number of rules: {n_rules}, MSE: {mse:.4f}")

RuntimeError: The size of tensor a (100) must match the size of tensor b (10) at non-singleton dimension 2