<a href="https://colab.research.google.com/github/Yashaswini-Sridhar/Physic-Induced-NN/blob/main/PINNEquation_AQI.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import numpy as np
import tensorflow as tf
from tensorflow import keras

# Pollutant importance for Bengaluru, tabulated in percent. Ranged estimates
# are entered at their midpoint; the "<2%" and "<1%" entries use 1.5 and 0.5.
_importance_percent = (
    ('PM2.5', 65.0),  # 60-70%
    ('PM10', 20.0),   # 15-25%
    ('NO2', 7.5),     # 5-10%
    ('O3', 7.5),      # 5-10%
    ('CO', 1.5),      # <2%
    ('SO2', 0.5),     # <1%
    ('NH3', 0.5),     # <1%
)

# Same table expressed as fractions, keyed by pollutant name.
pollutant_importance = {
    pollutant: percent / 100 for pollutant, percent in _importance_percent
}

class PINN_AQI_Model(keras.Model):
    """AQI regressor that adds hand-written physics terms to a dense tanh network.

    The forward pass returns
    ``nn_output + weighted_pollutants + R_term - kappa`` where

    * ``nn_output`` is the output of the dense network,
    * ``weighted_pollutants`` is the pollutant inputs weighted by the
      module-level ``pollutant_importance`` table,
    * ``R_term`` is a sum of three reaction terms with trainable rates, and
    * ``kappa`` is a climate correction factor.

    ``train_step`` minimises the mean squared error against the targets plus
    0.1 times ``physics_loss``.

    Inputs are 2-D; the first 12 columns are read in this order:
    T, P, H, SO2, NO2, CO, CO2, PM25, PM10, H2O, VOC, VOS.
    """

    def __init__(self, hidden_layers=None, **kwargs):
        """Build the dense layers and the trainable physics parameters.

        Args:
            hidden_layers: Sequence of unit counts, one per tanh Dense layer.
                The first entry sizes the input layer. Defaults to
                ``[64, 32, 16]`` when ``None``.
            **kwargs: Forwarded unchanged to ``keras.Model.__init__``.

        Raises:
            ValueError: If ``hidden_layers`` is an empty sequence.
        """
        super().__init__(**kwargs)

        # A None sentinel replaces the former list default so that no list
        # object is shared between instances.
        layer_units = [64, 32, 16] if hidden_layers is None else list(hidden_layers)
        if not layer_units:
            raise ValueError("hidden_layers must contain at least one layer size")

        # Define the neural network architecture
        self.input_layer = keras.layers.Dense(layer_units[0], activation='tanh')
        self.hidden_layers = [
            keras.layers.Dense(units, activation='tanh') for units in layer_units[1:]
        ]
        self.output_layer = keras.layers.Dense(1)

        # Physics parameters. They are created through add_weight so that Keras
        # registers them in `trainable_variables`, which is the list that
        # train_step differentiates against and hands to the optimizer.
        self.diffusion_coef = self._scalar_weight('diffusion_coef', 0.1)
        self.reaction_rates = {
            'k1': self._scalar_weight('k1', 0.01),
            'k2': self._scalar_weight('k2', 0.005),
            'k3': self._scalar_weight('k3', 0.02),
        }

    def _scalar_weight(self, name, initial_value):
        """Create a trainable float32 scalar weight initialised to ``initial_value``."""
        return self.add_weight(
            name=name,
            shape=(),
            dtype='float32',
            initializer=keras.initializers.Constant(initial_value),
            trainable=True,
        )

    @staticmethod
    def _split_features(inputs):
        """Slice the first 12 columns of ``inputs`` into named (batch, 1) tensors."""
        names = ('T', 'P', 'H', 'SO2', 'NO2', 'CO', 'CO2',
                 'PM25', 'PM10', 'H2O', 'VOC', 'VOS')
        return {name: inputs[:, i:i + 1] for i, name in enumerate(names)}

    def _physics_terms(self, features):
        """Return ``(R_term, kappa)`` for a dict produced by ``_split_features``."""
        T = features['T']
        P = features['P']
        H = features['H']

        # Temperature influence function (simplified Arrhenius equation)
        f_T = tf.exp(-0.1 / (0.008314 * (T + 273.15)))

        # Pressure influence function
        f_P = tf.pow(P / 1013.25, 0.2)

        # Humidity influence function
        f_H = 1.0 + 0.01 * H

        # Reaction terms
        R_term = (
            self.reaction_rates['k1'] * features['NO2'] * features['VOC'] * f_T +
            self.reaction_rates['k2'] * features['SO2'] * features['H2O'] * f_P +
            self.reaction_rates['k3'] * features['PM25'] * f_H
        )

        # Climate correction factor
        # NOTE(review): fractional powers of T, P and H evaluate to NaN for
        # negative inputs (e.g. sub-zero Celsius temperatures) — confirm the
        # inputs are non-negative, or clamp them before this point.
        kappa = 0.8 * tf.pow(T / 25.0, 0.1) * tf.pow(P / 1013.25, 0.05) * tf.pow(H / 60.0, 0.2)

        return R_term, kappa

    def call(self, inputs):
        """Compute the AQI estimate for a batch.

        Args:
            inputs: 2-D tensor whose first 12 columns follow the order given in
                the class docstring. All columns are fed to the dense network.

        Returns:
            Tensor with one column: network output plus importance-weighted
            pollutants plus the reaction term, minus the climate correction.
        """
        # Neural network part
        x = self.input_layer(inputs)
        for layer in self.hidden_layers:
            x = layer(x)
        nn_output = self.output_layer(x)

        # Physical model part
        features = self._split_features(inputs)
        R_term, kappa = self._physics_terms(features)

        # Scale each pollutant by importance percentage
        weighted_pollutants = (
            pollutant_importance['PM2.5'] * features['PM25'] +
            pollutant_importance['PM10'] * features['PM10'] +
            pollutant_importance['NO2'] * features['NO2'] +
            pollutant_importance['O3'] * features['VOC'] +  # Using VOC as proxy for O3
            pollutant_importance['CO'] * features['CO'] +
            pollutant_importance['SO2'] * features['SO2'] +
            pollutant_importance['NH3'] * features['VOS']  # Using VOS as proxy for NH3
        )

        # Final AQI calculation combining NN and physics
        return nn_output + weighted_pollutants + R_term - kappa

    def physics_loss(self, inputs, time_delta=0.1, space_delta=0.1):
        """Compute the physics-informed loss component.

        The residual is
        ``d(aqi)/d(col 0) - diffusion_coef * mean(d(aqi)/d(cols 1..3)) + R_term - kappa``
        and the loss is the mean of its square over the batch.

        Args:
            inputs: Batch of inputs in the same layout as ``call``. NumPy arrays
                are accepted; the batch is cast to a float32 tensor.
            time_delta: Unused; kept so existing callers keep working.
            space_delta: Unused; kept so existing callers keep working.

        Returns:
            Scalar tensor holding the mean squared PDE residual.
        """
        # tape.watch needs a tensor, so convert first; float32 matches the
        # dtype of the physics parameters.
        inputs = tf.cast(inputs, tf.float32)

        with tf.GradientTape() as tape:
            tape.watch(inputs)
            aqi = self(inputs)

        # First derivatives of the prediction with respect to every input column
        grad_aqi = tape.gradient(aqi, inputs)

        # Derivative w.r.t. input column 0, used as the time-derivative term.
        # NOTE(review): call() reads column 0 as temperature T, not time —
        # confirm the intended input layout.
        daqi_dt = grad_aqi[:, 0:1]

        # Diffusion term: the mean first-order gradient over columns 1-3 stands
        # in for a Laplacian. This is simplified; a real implementation would
        # compute proper spatial derivatives.
        diffusion_term = self.diffusion_coef * tf.reduce_mean(grad_aqi[:, 1:4], axis=1, keepdims=True)

        # Same reaction and climate terms as the forward pass
        R_term, kappa = self._physics_terms(self._split_features(inputs))

        # PDE residual
        pde_residual = daqi_dt - diffusion_term + R_term - kappa

        return tf.reduce_mean(tf.square(pde_residual))

    def train_step(self, data):
        """Run one optimisation step on ``data_loss + 0.1 * physics_loss``.

        Args:
            data: ``(inputs, targets)`` pair for one batch.

        Returns:
            Dict with the keys ``loss``, ``data_loss`` and ``physics_loss``.
        """
        # Unpack data
        inputs, targets = data

        # physics_loss opens its own tape inside this one, so its input
        # gradients are themselves differentiated with respect to the weights.
        with tf.GradientTape() as tape:
            # Forward pass
            predictions = self(inputs)

            # Compute data loss
            data_loss = tf.reduce_mean(tf.square(predictions - targets))

            # Compute physics loss
            phys_loss = self.physics_loss(inputs)

            # Total loss (lambda = 0.1 balances data and physics losses)
            total_loss = data_loss + 0.1 * phys_loss

        # Compute gradients
        gradients = tape.gradient(total_loss, self.trainable_variables)

        # Update weights
        self.optimizer.apply_gradients(zip(gradients, self.trainable_variables))

        # Return metrics
        return {
            "loss": total_loss,
            "data_loss": data_loss,
            "physics_loss": phys_loss
        }

# Example usage
def create_and_train_model(train_data, train_targets, epochs=100):
    """Build a default PINN_AQI_Model, fit it with the Adam optimizer and return it.

    Args:
        train_data: Training inputs passed straight to ``fit``.
        train_targets: Training targets passed straight to ``fit``.
        epochs: Number of epochs to train for.

    Returns:
        The fitted PINN_AQI_Model instance.
    """
    pinn = PINN_AQI_Model()
    # No loss is given to compile: the model's train_step builds its own.
    pinn.compile(optimizer='adam')
    pinn.fit(train_data, train_targets, epochs=epochs)
    return pinn

In [None]:
import numpy as np
import tensorflow as tf
from tensorflow import keras

# ... (Your PINN_AQI_Model class definition here) ...

# Step 1-2: instantiate the PINN and attach an optimizer. Any Keras optimizer
# can be substituted for Adam here.
model = PINN_AQI_Model()
model.compile(optimizer='adam')

# Step 3: placeholder dataset of uniform random values in [0, 1).
# Swap this for your real data loading/generation logic.
num_samples = 1000
input_data = np.random.rand(num_samples, 12)  # one row per sample, 12 features
target_data = np.random.rand(num_samples, 1)  # one AQI target per sample

# Step 4: fit the model; raise or lower the epoch count as needed.
model.fit(x=input_data, y=target_data, epochs=100)

# Step 5: predict on 10 fresh placeholder samples.
# Swap this for the data you actually want predictions for.
prediction_data = np.random.rand(10, 12)
predictions = model.predict(x=prediction_data)

# To see the output, run the code.


Epoch 1/100
[1m32/32[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 3ms/step - data_loss: 0.1107 - loss: 0.2900 - physics_loss: 1.7933
Epoch 2/100
[1m32/32[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 3ms/step - data_loss: 0.0877 - loss: 0.2636 - physics_loss: 1.7585
Epoch 3/100
[1m32/32[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 3ms/step - data_loss: 0.0915 - loss: 0.2676 - physics_loss: 1.7601
Epoch 4/100
[1m32/32[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 4ms/step - data_loss: 0.0913 - loss: 0.2668 - physics_loss: 1.7549
Epoch 5/100
[1m32/32[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 4ms/step - data_loss: 0.0873 - loss: 0.2633 - physics_loss: 1.7602
Epoch 6/100
[1m32/32[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 3ms/step - data_loss: 0.0868 - loss: 0.2615 - physics_loss: 1.7474
Epoch 7/100
[1m32/32[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 3ms/step - data_loss: 0.0854 - loss: 0.2605 - physics_loss: 1.7511

In [None]:
import numpy as np

# ... (Your PINN_AQI_Model class definition and other code) ...

# One example measurement, keyed by feature name (units as noted).
pollutant_values = {
    'T': 25.0,      # temperature, Celsius
    'P': 1013.25,   # pressure, hPa
    'H': 60.0,      # humidity, %
    'SO2': 10.0,    # sulfur dioxide, ppb
    'NO2': 20.0,    # nitrogen dioxide, ppb
    'CO': 5.0,      # carbon monoxide, ppm
    'CO2': 400.0,   # carbon dioxide, ppm
    'PM2.5': 50.0,  # PM2.5, µg/m³
    'PM10': 70.0,   # PM10, µg/m³
    'H2O': 10.0,    # water vapor, g/m³
    'VOC': 30.0,    # volatile organic compounds, ppb
    'VOS': 15.0,    # volatile organic sulfur, ppb
}

# Column order in which the values are laid out for the model.
feature_order = (
    'T', 'P', 'H', 'SO2', 'NO2', 'CO',
    'CO2', 'PM2.5', 'PM10', 'H2O', 'VOC', 'VOS',
)

# Single-row, 12-column input array for the model.
input_data = np.array([[pollutant_values[feature] for feature in feature_order]])

# Run the model on the single-row input prepared above.
aqi_prediction = model.predict(x=input_data)

# To see the output, run the code.

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 536ms/step


In [None]:
# Show the scalar in row 0, column 0 of the prediction array.
print("Predicted AQI:", aqi_prediction[0, 0])

Predicted AQI: 57.58462


In [None]:
import numpy as np
# Categorize the AQI prediction
def categorize_aqi(aqi_value):
    """Map a numeric AQI value to its category label.

    The bands are contiguous and each is closed on its upper bound:
    0-50 "Good", >50-100 "Satisfactory", >100-200 "Moderate",
    >200-300 "Poor", >300-400 "Very Poor", >400-500 "Severe".
    Every integer keeps the label it had with the integer-only band edges.
    Fractional values between two bands (for example 50.5 or 100.4) used to
    be reported as invalid and now fall into the higher band.

    Args:
        aqi_value: AQI as an int or float (NumPy scalars work too).

    Returns:
        The category label, or "Invalid AQI value" when the value is
        negative, above 500, or NaN.
    """
    # Written as `not (lo <= x <= hi)` so that NaN, for which every
    # comparison is false, is rejected here as well.
    if not (0 <= aqi_value <= 500):
        return "Invalid AQI value"

    # (inclusive upper bound, label), in ascending order.
    bands = (
        (50, "Good"),
        (100, "Satisfactory"),
        (200, "Moderate"),
        (300, "Poor"),
        (400, "Very Poor"),
        (500, "Severe"),
    )
    for upper_bound, label in bands:
        if aqi_value <= upper_bound:
            return label

    # Not reachable after the range check above; kept as a defensive default.
    return "Invalid AQI value"

# Look up the category for the single predicted value.
aqi_category = categorize_aqi(aqi_prediction[0, 0])

# Report the prediction together with its category.
print("Predicted AQI:", aqi_prediction[0, 0])
print("AQI Category:", aqi_category)

Predicted AQI: 57.58462
AQI Category: Satisfactory
