In [84]:
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import random
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.model_selection import train_test_split
df=pd.read_csv("concrete_data.csv", skiprows=1)
df.head()
df.shape
#features for prediction
X=df.iloc[:, :-1]
#compressive strength
y=df.iloc[:, -1]

print(X.head())


   cement  blast_furnace_slag  fly_ash  water  superplasticizer  \
0   540.0                 0.0      0.0  162.0               2.5   
1   540.0                 0.0      0.0  162.0               2.5   
2   332.5               142.5      0.0  228.0               0.0   
3   332.5               142.5      0.0  228.0               0.0   
4   198.6               132.4      0.0  192.0               0.0   

   coarse_aggregate  fine_aggregate   age  
0            1040.0            676.0   28  
1            1055.0            676.0   28  
2             932.0            594.0  270  
3             932.0            594.0  365  
4             978.4            825.5  360  


In [85]:
print(y.head())

0    79.99
1    61.89
2    40.27
3    41.05
4    44.30
Name: concrete_compressive_strength, dtype: float64


In [86]:
# Split data
X_train, X_test, y_train, y_test=train_test_split(X, y, test_size=0.3, random_state=42)

# Scale features
scale=StandardScaler()
X_train_scaled=scale.fit_transform(X_train)
X_test_scaled=scale.transform(X_test)

In [87]:
class activationFunction:
    def logisticFunction(x):
        return 1 / (1 + np.exp(-x))

    def reluFunction(x):
        return np.maximum(0, x)

    def hyperbolicFunction(x):
        return np.tanh(x)

    def leakyReLU(x, alpha=0.01):
        return np.maximum(x, alpha * x)

    def linearFunction(x):  # Identity function for regression output
        return x

In [88]:
class ArtificialNeuralNetwork:
    def __init__(self, layerSize, activationFunction):
        self.layerSize = layerSize
        self.activationFunction = activationFunction
        self.weights = [np.random.randn(layer_sizes[i], layer_sizes[i + 1]) for i in range(len(layer_sizes) - 1)]
        self.biases = [np.random.randn(1, layer_sizes[i + 1])  for i in range(len(layer_sizes) - 1)]

    def visualize_layers(self):
        for i in range(len(self.layerSize)-1):
            print("LAYER COUNT:  ", i,"    NODES: ",self.layerSize[i])
            print("  Weights Shape:   ", self.weights[i].shape)
            print("  Biases Shape:    ",self.biases[i].shape )
            print("  Weights= : ", self.weights[i])
            print("  Biases= ", self.biases[i])

    def forwardPropagation(self, x):
        for i in range(len(self.weights)):
            product=np.dot(x,self.weights[i])+self.biases[i]
            x=self.activationFunction[i](product)
        return x

In [89]:
layer_sizes = [8, 200, 80, 1]

activation_functions = [
    activationFunction.logisticFunction,
    activationFunction.reluFunction,
    activationFunction.linearFunction,
]
ann = ArtificialNeuralNetwork(layer_sizes, activation_functions)
ann.visualize_layers()

x_example = np.random.rand(8, 8)
output = ann.forwardPropagation(x_example)
print("ANN Output :", output)

LAYER COUNT:   0     NODES:  8
  Weights Shape:    (8, 200)
  Biases Shape:     (1, 200)
  Weights= :  [[-0.60577421 -0.79345947  0.38671319 ... -0.06676586 -0.12693171
  -0.22887286]
 [-1.65996931  0.60805712 -0.536855   ...  1.42327957  0.17057676
  -1.67237833]
 [-0.52914374  1.58035913 -0.1136529  ... -0.40490636  2.11167026
  -0.04825951]
 ...
 [-1.10041977  0.87928295  0.56615907 ... -2.0688253   0.90671153
  -0.02853852]
 [ 0.11632389 -0.46060376  1.30156804 ... -1.43932191  0.04382391
   1.18068967]
 [-1.14648586  0.51220223  0.37331072 ...  0.37750581  0.07942898
   2.18473898]]
  Biases=  [[-1.47594493e-01  9.55091780e-01  5.73865469e-01  2.51152051e-01
  -3.70623210e-01  2.76584430e-01  8.60291452e-02 -4.41264954e-01
   6.96787259e-01  1.72047067e+00  1.32224247e+00  1.06568342e+00
   1.38872622e+00 -1.88674369e+00 -3.96426369e-01 -1.13336185e+00
   1.89643325e+00 -1.20947122e-01  9.61033722e-01  4.69178083e-01
  -1.04676899e+00 -6.36458084e-01  7.22844899e-02 -1.76782492e+0

In [90]:
class lossFunction:
    def evaluate(self,yPred,yTrain):
        self.yPred=yPred
        self.yTrain=yTrain
class MeanAbsoluteError(lossFunction):
    def evaluate(self,yPred,yTrain):
        return np.mean(np.abs(yPred-yTrain))

In [91]:
#y_train = y_train.to_numpy().reshape(-1,)
y_pred = ann.forwardPropagation(X_train_scaled)
y_pred = y_pred.reshape(-1,)
# checking y_pred and y_train has the correct shape
print("y_pred shape:", y_pred.shape)
print("y_train shape:", y_train.shape)

# use to calculate the loss function
loss_function=MeanAbsoluteError()
loss=loss_function.evaluate(y_pred, y_train)
print("Loss:", loss)

y_pred shape: (721,)
y_train shape: (721,)
Loss: 42.815727366137494


In [92]:
class Particle:
    def __init__(self,vectorSize):
            self.particlePosition=np.random.rand(vectorSize)
            self.particleVelocity=np.random.rand(vectorSize)
            self.bestPosition=np.copy(self.particlePosition)
            self.informants=[]

In [102]:
def particleToAnn(particle, annLayers, activationFunctions):
    neuralNetwork = ArtificialNeuralNetwork(layerSize=annLayers, activationFunction=activationFunctions)
    weightBiasIndexCount = 0
    for i in range(len(annLayers) - 1):
        prevValue = annLayers[i]
        nextValue = annLayers[i + 1]
        weightRange = prevValue * nextValue
        print("TYPE:",type(particle.particlePosition), particle.particlePosition.shape, i)
        weight=particle.particlePosition[weightBiasIndexCount:weightBiasIndexCount + weightRange].reshape((prevValue, nextValue))
        weightBiasIndexCount += weightRange

        bias=particle.particlePosition[weightBiasIndexCount:weightBiasIndexCount + nextValue].reshape((1, nextValue))
        weightBiasIndexCount += nextValue

        neuralNetwork.weights[i] = weight
        neuralNetwork.biases[i] = bias
    return neuralNetwork

In [103]:
def calculateParameters(layerSize):
    sumWeights=0
    sumBiases=0
    for i in range(len(layerSize)-1):
         sumWeights+=layerSize[i]*layerSize[i+1]
         sumBiases+=layerSize[i+1]
    total_params=sumBiases+sumWeights
    return total_params
ann_layers=[8,200,80,1]
vector=calculateParameters(ann_layers)
print(vector)

17961


In [104]:
def assessFitness(particle, dataset, annLayers, activationFunctions, loss_function):
    x, y = dataset
    ann = particleToAnn(particle, annLayers, activationFunctions)
    predictions = ann.forwardPropagation(x)
    loss = loss_function.evaluate(predictions, y.reshape(-1, 1))
    return loss

In [105]:
class ParticleSwarmOptimisation:
    def __init__(self, swarmSize, alpha, beta, delta, gamma, jumpSize, informantCount, vectorSize):
        self.swarmSize = swarmSize
        self.alpha=alpha
        self.beta=beta
        self.delta=delta
        self.gamma=gamma
        self.jumpSize=jumpSize
        self.informantCount=informantCount
        self.vectorSize=vectorSize
        self.bestParticle=None
        self.bestFitness = float('-inf')

    def initInformants(self,informantCount,particleArray):
            informants=[]
            for p in particleArray:
                potentialInformants=[]
                for potInf in particleArray:
                    if potInf!=p:
                        potentialInformants.append(potInf)
                for i in range(informantCount):
                     informants.append(random.choice(potentialInformants))
                p.informants=informants

    def getBestInformant(self,particle,dataset,annLayers,activationFunctions,lossFunction):
          bestInf=None
          bestFitnessInf=float('-inf')
          for i in particle.informants:
              fitness = assessFitness(i,dataset,annLayers,activationFunctions,lossFunction)
              if fitness > bestFitnessInf:
                  bestFitnessInf=fitness
                  bestInf=i
          bestInf.particlePosition=bestFitnessInf
          return bestInf.particlePosition

    def psoOptimisation(self, swarmSize, alpha, beta, gamma, delta, jumpSize, informantCount, vectorSize,
                        dataset, annLayers, activationFunctions, loss_function, max_iterations=100):
        particleArray=[]
        for i in range(swarmSize):
                particleArray.append(Particle(vectorSize))
        self.initInformants(informantCount, particleArray)

        iteration = 0

        while iteration < max_iterations:
            newVelocity=0
            for p in particleArray:
                particleFitness=assessFitness(p, dataset, annLayers, activationFunctions, loss_function)
                if self.bestParticle is None or particleFitness > assessFitness(self.bestParticle, dataset, annLayers, activationFunctions, loss_function):
                    self.bestParticle=p

            for p in particleArray:
                previousBest=p.bestPosition
                informantsBest=self.getBestInformant(p, dataset, annLayers, activationFunctions, loss_function)
                allBest=self.bestParticle.bestPosition

                b = np.random.uniform(0.0, beta)
                c = np.random.uniform(0.0, gamma)
                d = np.random.uniform(0.0, delta)

                newVelocity = alpha*p.particleVelocity+b*(previousBest-p.particlePosition)+c*(informantsBest-p.particlePosition)+d*(allBest - p.particlePosition)

            for p in particleArray:
              p.particleVelocity=newVelocity
              p.particlePosition+=jumpSize * newVelocity
            iteration += 1

        return self.bestParticle.particlePosition



In [106]:
# Code to test out PSO functionality
X_train_scaled = np.random.rand(100, 8)  # Simulated scaled training features (100 samples, 8 features)
y_train = np.random.rand(100, 1)  # Simulated training labels (100 samples, 1 output)
print("y_pred shape:", y_pred.shape)
# define hyperparameters for PSO
swarmSize = 50
alpha = 0.9  # inertia weight
beta = 0.7   # cognitive parameter
gamma = 0.5  # social parameter
delta = 0.5
jumpSize = 0.5
informantCount = 2
annLayers=layer_sizes
vectorSize = sum([(annLayers[i] * annLayers[i + 1]) + annLayers[i + 1] for i in range(len(annLayers) - 1)])  # Total weights + biases
max_iterations = 100

# initialize the PSO optimiser
pso = ParticleSwarmOptimisation(
    swarmSize = swarmSize,
    alpha = alpha,
    beta = beta,
    delta = delta,
    gamma = gamma,
    jumpSize = jumpSize,
    informantCount = informantCount,
    vectorSize = vectorSize
)

# run it into the psoOptimisation
best_parameters=pso.psoOptimisation(
    swarmSize = swarmSize,
    alpha = alpha,
    beta = beta,
    gamma = gamma,
    delta=delta,
    jumpSize = jumpSize,
    informantCount = informantCount,
    vectorSize = vectorSize,
    dataset = (X_train_scaled, y_train),
    annLayers = annLayers,
    activationFunctions = activation_functions,
    loss_function= MeanAbsoluteError(),
    max_iterations = max_iterations
)
print("Optimised ANN Parameters:", best_parameters)
print("y_pred shape:", y_pred.shape)

y_pred shape: (721,)
TYPE: <class 'numpy.ndarray'> (17961,) 0
TYPE: <class 'numpy.ndarray'> (17961,) 1
TYPE: <class 'numpy.ndarray'> (17961,) 2
TYPE: <class 'numpy.ndarray'> (17961,) 0
TYPE: <class 'numpy.ndarray'> (17961,) 1
TYPE: <class 'numpy.ndarray'> (17961,) 2
TYPE: <class 'numpy.ndarray'> (17961,) 0
TYPE: <class 'numpy.ndarray'> (17961,) 1
TYPE: <class 'numpy.ndarray'> (17961,) 2
TYPE: <class 'numpy.ndarray'> (17961,) 0
TYPE: <class 'numpy.ndarray'> (17961,) 1
TYPE: <class 'numpy.ndarray'> (17961,) 2
TYPE: <class 'numpy.ndarray'> (17961,) 0
TYPE: <class 'numpy.ndarray'> (17961,) 1
TYPE: <class 'numpy.ndarray'> (17961,) 2
TYPE: <class 'numpy.ndarray'> (17961,) 0
TYPE: <class 'numpy.ndarray'> (17961,) 1
TYPE: <class 'numpy.ndarray'> (17961,) 2
TYPE: <class 'numpy.ndarray'> (17961,) 0
TYPE: <class 'numpy.ndarray'> (17961,) 1
TYPE: <class 'numpy.ndarray'> (17961,) 2
TYPE: <class 'numpy.ndarray'> (17961,) 0
TYPE: <class 'numpy.ndarray'> (17961,) 1
TYPE: <class 'numpy.ndarray'> (17961

IndexError: invalid index to scalar variable.