Installing `scikit-fuzzy`

In [None]:
!pip install scikit-fuzzy

Import Libraries and Load Dataset

In [None]:
import pandas as pd
import numpy as np
import skfuzzy as fuzz
from skfuzzy import control as ctrl
import matplotlib.pyplot as plt

# Read the Air Quality CSV from the working directory (swap in your own local
# path or a Kaggle download). Fields in this file are separated by semicolons,
# so the delimiter is given explicitly.
data = pd.read_csv("AirQualityUCI.csv", delimiter=';')

# Preview the first five rows exactly as loaded, before any cleaning.
print(data.head(n=5))

Preprocess Data and Compute a “Noise Index”

In [None]:
# Keep only the channels used by the fuzzy systems. The explicit .copy() makes
# `data` an independent frame, so the column assignments below do not write
# into a slice of the loaded frame (which raises SettingWithCopyWarning).
columns = ['CO(GT)', 'NO2(GT)', 'T', 'RH']
data = data[columns].copy()

# Values may be written with a decimal comma: swap it for a dot, then parse.
# Anything that still cannot be parsed becomes NaN (errors='coerce').
for col in columns:
    data[col] = pd.to_numeric(
        data[col].astype(str).str.replace(',', '.', regex=False),
        errors='coerce',
    )

# -200 is treated as an invalid reading: turn it into NaN, then drop every row
# that has a missing value in any of the selected columns.
data = data.replace(-200, np.nan).dropna()

# Noise Index = rolling standard deviation of CO over `window` rows, scaled by
# the overall CO mean (i.e. a rolling std relative to the mean, not a variance).
window = 5
noise_raw = data['CO(GT)'].rolling(window).std() / data['CO(GT)'].mean()

# The first `window - 1` rows have no complete window and come out as NaN.
# Assign the filled result back instead of calling fillna(inplace=True) on a
# column selection, which is not guaranteed to modify the frame.
noise_raw = noise_raw.fillna(0)

# Min-max normalise to [0, 1]. A constant (or empty) series has zero span;
# fall back to 0 instead of dividing by zero and filling the column with NaN.
noise_span = noise_raw.max() - noise_raw.min()
if noise_span > 0:
    data['NoiseIndex'] = (noise_raw - noise_raw.min()) / noise_span
else:
    data['NoiseIndex'] = 0.0

print(data.head())
print(data.dtypes)

Define Input Fuzzy Variables

In [None]:
def _observed_universe(series, step=0.1):
    """Return an evenly spaced grid spanning [series.min(), series.max()] inclusive.

    np.arange(lo, hi, step) is half-open and leaves out `hi`, so a membership
    function that peaks at the observed maximum would peak outside the
    universe. np.linspace always includes both end points.

    Parameters
    ----------
    series : pandas.Series
        Numeric column whose observed range defines the universe.
    step : float
        Upper bound on the spacing between neighbouring grid points.

    Returns
    -------
    numpy.ndarray
        At least two points from the minimum to the maximum of `series`.
    """
    lo, hi = float(series.min()), float(series.max())
    # The small tolerance stops float round-off from adding a spurious extra point.
    n_points = max(int(np.ceil((hi - lo) / step - 1e-9)) + 1, 2)
    return np.linspace(lo, hi, n_points)


# Define fuzzy variables (inputs).
# NoiseIndex is normalised to [0, 1]; the other universes span the observed
# range of the corresponding column, end points included.
noise = ctrl.Antecedent(np.arange(0, 1.01, 0.01), 'NoiseIndex')
co = ctrl.Antecedent(_observed_universe(data['CO(GT)']), 'CO(GT)')
no2 = ctrl.Antecedent(_observed_universe(data['NO2(GT)']), 'NO2(GT)')
temp = ctrl.Antecedent(_observed_universe(data['T']), 'T')
rh = ctrl.Antecedent(_observed_universe(data['RH']), 'RH')

# Define fuzzy variables (outputs), both on a 0-1 scale.
clean_strength = ctrl.Consequent(np.arange(0, 1.01, 0.01), 'CleaningStrength')
trust_level = ctrl.Consequent(np.arange(0, 1.01, 0.01), 'TrustLevel')

Define Membership Functions

In [None]:
# Observed range of each measured column; the outer ("low"/"high") triangles
# are anchored on these end points.
co_lo, co_hi = data['CO(GT)'].min(), data['CO(GT)'].max()
no2_lo, no2_hi = data['NO2(GT)'].min(), data['NO2(GT)'].max()
temp_lo, temp_hi = data['T'].min(), data['T'].max()
rh_lo, rh_hi = data['RH'].min(), data['RH'].max()

# Noise Index (normalised 0-1): shoulder triangles at both ends, one in the middle.
noise['low'] = fuzz.trimf(noise.universe, [0, 0, 0.4])
noise['medium'] = fuzz.trimf(noise.universe, [0.2, 0.5, 0.8])
noise['high'] = fuzz.trimf(noise.universe, [0.6, 1, 1])

# CO levels (ppm)
co['low'] = fuzz.trimf(co.universe, [co_lo, co_lo, 2])
co['medium'] = fuzz.trimf(co.universe, [1.5, 3, 5])
co['high'] = fuzz.trimf(co.universe, [4, co_hi, co_hi])

# NO2 levels
no2['low'] = fuzz.trimf(no2.universe, [no2_lo, no2_lo, 80])
no2['medium'] = fuzz.trimf(no2.universe, [70, 100, 130])
no2['high'] = fuzz.trimf(no2.universe, [120, no2_hi, no2_hi])

# Temperature (°C)
temp['low'] = fuzz.trimf(temp.universe, [temp_lo, temp_lo, 10])
temp['normal'] = fuzz.trimf(temp.universe, [5, 15, 25])
temp['high'] = fuzz.trimf(temp.universe, [20, temp_hi, temp_hi])

# Relative Humidity (%)
rh['low'] = fuzz.trimf(rh.universe, [rh_lo, rh_lo, 40])
rh['medium'] = fuzz.trimf(rh.universe, [30, 50, 70])
rh['high'] = fuzz.trimf(rh.universe, [60, rh_hi, rh_hi])

# Outputs: both consequents use the same three triangle shapes on 0-1,
# only the term labels differ.
clean_strength['weak'] = fuzz.trimf(clean_strength.universe, [0, 0, 0.4])
clean_strength['moderate'] = fuzz.trimf(clean_strength.universe, [0.3, 0.5, 0.7])
clean_strength['strong'] = fuzz.trimf(clean_strength.universe, [0.6, 1, 1])

trust_level['low'] = fuzz.trimf(trust_level.universe, [0, 0, 0.4])
trust_level['medium'] = fuzz.trimf(trust_level.universe, [0.3, 0.5, 0.7])
trust_level['high'] = fuzz.trimf(trust_level.universe, [0.6, 1, 1])

Visualize Membership Functions

In [None]:
# Plot the membership functions of every input, then of every output,
# one figure per fuzzy variable.
for fuzzy_variable in (noise, co, no2, temp, rh, clean_strength, trust_level):
    fuzzy_variable.view()

Initializing Fuzzy Variables for the Data Quality Index (DQI) System

In [None]:
# Inputs and output of the Data Quality Index (DQI) fuzzy system.
#
# The CO universe stops at exactly 10 because the CO terms defined for this
# variable are all zero above 10 ('high' is the trapezoid [6, 8, 10, 10]).
# With the previous grid (np.arange(0, 11, 0.1), i.e. up to ~10.9) a reading
# in (10, 10.9] activated no CO term at all.
# NOTE(review): this relies on ControlSystemSimulation clipping out-of-range
# inputs to the universe bounds (clip_to_bounds=True, believed to be the
# default) - confirm for the installed scikit-fuzzy version.
CO = ctrl.Antecedent(np.linspace(0, 10, 101), 'CO')
NO2 = ctrl.Antecedent(np.arange(0, 251, 1), 'NO2')
T = ctrl.Antecedent(np.arange(-5, 41, 1), 'T')
RH = ctrl.Antecedent(np.arange(10, 101, 1), 'RH')

# DQI is the single output, on a 0-1 scale sampled every 0.1.
DQI = ctrl.Consequent(np.arange(0, 1.1, 0.1), 'DQI')

Define MFs for Inputs

In [None]:
import skfuzzy as fuzz

# Trapezoid corner points [a, b, c, d] for every linguistic term of every
# input, listed in the order in which the terms are registered.
input_terms = [
    # CO (ppm)
    (CO, [
        ('low', [0, 0, 2, 4]),
        ('medium', [2, 4, 6, 8]),
        ('high', [6, 8, 10, 10]),
    ]),
    # NO2 (μg/m³)
    (NO2, [
        ('low', [0, 0, 50, 100]),
        ('moderate', [80, 120, 150, 200]),
        ('high', [180, 220, 250, 250]),
    ]),
    # Temperature (°C)
    (T, [
        ('cold', [-5, -5, 5, 10]),
        ('normal', [8, 15, 22, 27]),
        ('hot', [25, 30, 40, 40]),
    ]),
    # Relative Humidity (%)
    (RH, [
        ('dry', [10, 10, 30, 40]),
        ('normal', [35, 50, 60, 70]),
        ('humid', [65, 80, 100, 100]),
    ]),
]

# Attach each trapezoidal membership function to its variable.
for variable, terms in input_terms:
    for label, corners in terms:
        variable[label] = fuzz.trapmf(variable.universe, corners)

Define MFs for Output (Data Quality Index)

In [None]:
# DQI (0–1): three trapezoidal quality bands, registered from worst to best.
dqi_terms = [
    ('poor', [0, 0, 0.2, 0.4]),
    ('fair', [0.3, 0.5, 0.6, 0.8]),
    ('good', [0.7, 0.9, 1.0, 1.0]),
]
for label, corners in dqi_terms:
    DQI[label] = fuzz.trapmf(DQI.universe, corners)

Visualize the Temperature Membership Functions

In [None]:
import matplotlib.pyplot as plt

# Draw each temperature term in turn, then hand the figures to matplotlib.
for term_label in ('cold', 'normal', 'hot'):
    T[term_label].view()
plt.show()

Implementing the Rules in Python

In [None]:
from skfuzzy import control as ctrl

# Rule base of the DQI system, written as (antecedent, consequent) pairs.
# `&` is fuzzy AND and `|` is fuzzy OR on the antecedent terms.
rules = [
    # All four readings in their benign bands -> good quality.
    ctrl.Rule(
        antecedent=CO['low'] & NO2['low'] & T['normal'] & RH['normal'],
        consequent=DQI['good'],
    ),
    # Mid-range pollutants at normal temperature -> fair.
    ctrl.Rule(
        antecedent=CO['medium'] & NO2['moderate'] & T['normal'],
        consequent=DQI['fair'],
    ),
    # Either pollutant high -> poor.
    ctrl.Rule(
        antecedent=CO['high'] | NO2['high'],
        consequent=DQI['poor'],
    ),
    # Humid air together with high CO -> poor.
    ctrl.Rule(
        antecedent=RH['humid'] & CO['high'],
        consequent=DQI['poor'],
    ),
    # Cold and dry -> fair.
    ctrl.Rule(
        antecedent=T['cold'] & RH['dry'],
        consequent=DQI['fair'],
    ),
    # Hot with medium CO -> fair.
    ctrl.Rule(
        antecedent=T['hot'] & CO['medium'],
        consequent=DQI['fair'],
    ),
    # Normal humidity with low CO or moderate NO2 -> good.
    ctrl.Rule(
        antecedent=RH['normal'] & (CO['low'] | NO2['moderate']),
        consequent=DQI['good'],
    ),
]

# Keep the individual names available; later cells refer to rule1..rule7.
rule1, rule2, rule3, rule4, rule5, rule6, rule7 = rules

Building the Control System

In [None]:
# Assemble the seven rules into a control system and create a simulation
# object on which inputs can be set and the DQI computed.
DQI_ctrl = ctrl.ControlSystem(
    [rule1, rule2, rule3, rule4, rule5, rule6, rule7]
)
DQI_sim = ctrl.ControlSystemSimulation(DQI_ctrl)

Build and test the fuzzy control system

In [None]:
# Build the control system from the rule list and wrap it in a simulation.
dqi_ctrl = ctrl.ControlSystem(rules)
dqi_sim = ctrl.ControlSystemSimulation(dqi_ctrl)

# One example operating point; edit the values to try other conditions.
sample_inputs = {
    'CO': 5.5,    # CO concentration (ppm)
    'NO2': 120,   # NO2 concentration (µg/m³)
    'T': 25,      # Temperature (°C)
    'RH': 60,     # Relative Humidity (%)
}
for input_label, input_value in sample_inputs.items():
    dqi_sim.input[input_label] = input_value

# Run the inference and read back the crisp DQI.
dqi_sim.compute()
predicted_dqi = dqi_sim.output['DQI']
print(f"Predicted DQI value: {predicted_dqi:.3f}")

# Optional: show the DQI membership functions with the activation of this run.
DQI.view(sim=dqi_sim)

Batch code for CO(GT): compute DQI for all rows and apply fuzzy-driven cleaning

In [None]:
# ---------- Batch DQI computation and fuzzy-driven cleaning ----------
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from tqdm import tqdm  # pip install tqdm (optional, for a progress bar)

# Build control system if not yet built
try:
    dqi_ctrl
except NameError:
    dqi_ctrl = ctrl.ControlSystem(rules)
dqi_sim = ctrl.ControlSystemSimulation(dqi_ctrl)

# Work on a copy with a fresh 0..n-1 index so row positions and index labels
# coincide (the preprocessing dropped rows, leaving gaps in the index).
df = data.copy().reset_index(drop=True)

# Precompute a fallback estimator: centred rolling mean of CO. min_periods=1
# keeps the first and last rows defined even though their window is incomplete.
window = 5
df['CO_roll_mean'] = df['CO(GT)'].rolling(window=window, min_periods=1, center=True).mean()

# Compute DQI for each row.
# NOTE: doing this row-by-row is simple but can be slow for very large datasets.
# Results are collected in an array and assigned once, instead of writing
# into the frame cell by cell. Rows whose inference fails keep 0.0
# (very low trust), as before, but are now recorded and reported rather than
# being silently indistinguishable from a genuine DQI of 0.
dqi_values = np.zeros(len(df))
failed_rows = []
first_error = None

readings = zip(df['CO(GT)'], df['NO2(GT)'], df['T'], df['RH'])
for i, (co_value, no2_value, t_value, rh_value) in enumerate(tqdm(readings, total=len(df))):
    try:
        # Input labels must match the Antecedent labels used in the rules.
        dqi_sim.input['CO'] = float(co_value)
        dqi_sim.input['NO2'] = float(no2_value)
        dqi_sim.input['T'] = float(t_value)
        dqi_sim.input['RH'] = float(rh_value)
        dqi_sim.compute()
        dqi_values[i] = float(dqi_sim.output['DQI'])
    except Exception as exc:
        # Best effort: keep going, but remember what went wrong and where.
        failed_rows.append(i)
        if first_error is None:
            first_error = exc

df['DQI'] = dqi_values

if failed_rows:
    print(
        f"DQI could not be computed for {len(failed_rows)} of {len(df)} rows "
        f"(set to 0.0); first error: {first_error!r}"
    )

# --------- Fuzzy-driven cleaning policy for CO(GT) ----------
# Philosophy: cleaned = DQI * original + (1 - DQI) * fallback
# - if DQI ~ 1 => keep original
# - if DQI ~ 0 => use fallback (rolling mean)
df['CO_cleaned'] = df['DQI'] * df['CO(GT)'] + (1.0 - df['DQI']) * df['CO_roll_mean']

# Where the original CO value is missing, the blend above is NaN; use the
# fallback on its own for those rows.
co_missing = df['CO(GT)'].isna()
df.loc[co_missing, 'CO_cleaned'] = df.loc[co_missing, 'CO_roll_mean']

# ---------- Quick diagnostics ----------
print("DQI stats:\n", df['DQI'].describe())
print("\nCO original stats:\n", df['CO(GT)'].describe())
print("\nCO cleaned stats:\n", df['CO_cleaned'].describe())

# ---------- Plot example: before vs after for a slice ----------
start, end = 0, min(len(df), 300)  # adjust slice size for readability
plt.figure(figsize=(12,4))
plt.plot(df.index[start:end], df['CO(GT)'].iloc[start:end], label='Original CO', alpha=0.6)
plt.plot(df.index[start:end], df['CO_cleaned'].iloc[start:end], label='Cleaned CO', linewidth=1.5)
plt.plot(df.index[start:end], df['CO_roll_mean'].iloc[start:end], '--', label=f'Roll mean (w={window})', alpha=0.5)
plt.legend(); plt.title('CO: Original vs Cleaned (slice)'); plt.xlabel('Index'); plt.ylabel(r'CO (mg/m$^3$)')
plt.show()

# ---------- Save cleaned dataset ----------
out_path = "airquality_cleaned_with_DQI.csv"
df.to_csv(out_path, index=False)
print(f"Saved cleaned dataset to: {out_path}")
# --------------------------------------------------------------------

Batch code for NO2(GT): compute DQI for all rows and apply fuzzy-driven cleaning

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from tqdm import tqdm
import skfuzzy.control as ctrl

# Build control system if not yet built
try:
    dqi_ctrl
except NameError:
    # `rules` must already exist in the notebook namespace at this point.
    dqi_ctrl = ctrl.ControlSystem(rules)
dqi_sim = ctrl.ControlSystemSimulation(dqi_ctrl)

# Work on a copy with a fresh 0..n-1 index so row positions and index labels
# coincide (the preprocessing dropped rows, leaving gaps in the index).
df = data.copy().reset_index(drop=True)

# Precompute a fallback estimator: centred rolling mean of NO2. min_periods=1
# keeps the first and last rows defined even though their window is incomplete.
window = 5
df['NO2_roll_mean'] = df['NO2(GT)'].rolling(window=window, min_periods=1, center=True).mean()

# Compute DQI for each row.
# Results are collected in an array and assigned once, instead of writing
# into the frame cell by cell. Rows whose inference fails keep 0.0
# (very low trust), as before, but are now recorded and reported rather than
# being silently indistinguishable from a genuine DQI of 0.
dqi_values = np.zeros(len(df))
failed_rows = []
first_error = None

readings = zip(df['CO(GT)'], df['NO2(GT)'], df['T'], df['RH'])
for i, (co_value, no2_value, t_value, rh_value) in enumerate(tqdm(readings, total=len(df))):
    try:
        # Input labels must match the Antecedent labels used in the rules.
        dqi_sim.input['CO'] = float(co_value)
        dqi_sim.input['NO2'] = float(no2_value)
        dqi_sim.input['T'] = float(t_value)
        dqi_sim.input['RH'] = float(rh_value)
        dqi_sim.compute()
        dqi_values[i] = float(dqi_sim.output['DQI'])
    except Exception as exc:
        # Best effort: keep going, but remember what went wrong and where.
        failed_rows.append(i)
        if first_error is None:
            first_error = exc

df['DQI'] = dqi_values

if failed_rows:
    print(
        f"DQI could not be computed for {len(failed_rows)} of {len(df)} rows "
        f"(set to 0.0); first error: {first_error!r}"
    )

# --------- Fuzzy-driven cleaning policy for NO2(GT) ----------
# cleaned = DQI * original + (1 - DQI) * fallback (rolling mean)
df['NO2_cleaned'] = df['DQI'] * df['NO2(GT)'] + (1.0 - df['DQI']) * df['NO2_roll_mean']

# Where the original NO2 value is missing, the blend above is NaN; use the
# fallback on its own for those rows.
no2_missing = df['NO2(GT)'].isna()
df.loc[no2_missing, 'NO2_cleaned'] = df.loc[no2_missing, 'NO2_roll_mean']

# ---------- Quick diagnostics ----------
print("DQI stats:\n", df['DQI'].describe())
print("\nNO2 original stats:\n", df['NO2(GT)'].describe())
print("\nNO2 cleaned stats:\n", df['NO2_cleaned'].describe())

# ---------- Plot example: before vs after for a slice ----------
start, end = 0, min(len(df), 300)
plt.figure(figsize=(12,4))
plt.plot(df.index[start:end], df['NO2(GT)'].iloc[start:end], label='Original NO2', alpha=0.6)
plt.plot(df.index[start:end], df['NO2_cleaned'].iloc[start:end], label='Cleaned NO2', linewidth=1.5)
plt.plot(df.index[start:end], df['NO2_roll_mean'].iloc[start:end], '--', label=f'Roll mean (w={window})', alpha=0.5)
plt.legend(); plt.title('NO2: Original vs Cleaned (slice)'); plt.xlabel('Index'); plt.ylabel(r'NO2 ($\mu$g/m$^3$)')
plt.show()

# ---------- Save cleaned dataset ----------
out_path = "airquality_NO2_cleaned_with_DQI.csv"
df.to_csv(out_path, index=False)
print(f"Saved cleaned dataset to: {out_path}")