# Install Dependencies

In [30]:
# Install dependencies
!pip install requests folium simanneal numpy pandas



# Import Libraries

In [31]:
# Import libraries
import numpy as np
import pandas as pd
import requests
import folium
from IPython.display import HTML, display, IFrame
import zipfile
import io

# Load the Air Quality Dataset

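This block defines a function that downloads the air quality archive from the UCI repository, loads the first station CSV it contains, and attaches randomly jittered coordinates around central Beijing to each PM2.5 reading (the coordinates are synthetic, not the station's real location).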

In [32]:
def load_air_quality_dataset(url=None, seed=42, timeout=60):
    """
    Download the air quality archive and return PM2.5 readings from one CSV.

    Only the first ``.csv`` member of the ZIP archive is loaded. Every row is
    given a synthetic coordinate drawn uniformly within +/-0.1 degrees of
    (39.9042, 116.4074); the coordinates are random per row and are not the
    real position of the station.

    Parameters
    ----------
    url : str, optional
        Location of the ZIP archive. Defaults to the UCI repository URL.
    seed : int, optional
        Seed passed to ``np.random.seed``. This seeds NumPy's *global*
        generator, so code that runs afterwards and uses ``np.random`` draws
        from the same seeded stream.
    timeout : float, optional
        Timeout in seconds for the HTTP request.

    Returns
    -------
    pandas.DataFrame
        Columns ``year, month, day, hour, value, lat, lon`` where ``value`` is
        the PM2.5 reading. Rows with a missing PM2.5 value are dropped; the
        original row index is kept.

    Raises
    ------
    requests.HTTPError
        If the server answers with an error status.
    ValueError
        If the archive contains no CSV file.
    """
    if url is None:
        url = "https://archive.ics.uci.edu/ml/machine-learning-databases/00501/PRSA2017_Data_20130301-20170228.zip"

    # Fix random seed for reproducibility (global generator, see docstring)
    np.random.seed(seed)

    # Download the ZIP file; fail loudly on HTTP errors instead of handing an
    # error page to zipfile.
    response = requests.get(url, timeout=timeout)
    response.raise_for_status()

    with zipfile.ZipFile(io.BytesIO(response.content)) as zip_file:
        # List all files in the ZIP archive
        zip_files = zip_file.namelist()
        print("Files in ZIP archive:", zip_files)

        # Select the first CSV file
        csv_files = [f for f in zip_files if f.endswith('.csv')]
        if not csv_files:
            raise ValueError(f"No CSV file found in the archive downloaded from {url}")
        csv_file = csv_files[0]
        print("Selected CSV file:", csv_file)

        # Load the selected CSV file into a DataFrame
        with zip_file.open(csv_file) as f:
            df = pd.read_csv(f)

    # Inspect the columns in the dataset
    print("Columns in the dataset:", df.columns)

    # Synthetic coordinates: one uniform draw per row, all latitudes first and
    # then all longitudes.
    n_rows = len(df)
    df['lat'] = 39.9042 + np.random.uniform(-0.1, 0.1, size=n_rows)   # Approximate latitude for Beijing
    df['lon'] = 116.4074 + np.random.uniform(-0.1, 0.1, size=n_rows)  # Approximate longitude for Beijing

    # Filter for PM2.5 data and relevant columns
    df = df[['year', 'month', 'day', 'hour', 'PM2.5', 'lat', 'lon']]
    df = df.dropna(subset=['PM2.5', 'lat', 'lon'])  # Drop rows with missing values

    # Rename the measurement column for consistency with downstream code
    df = df.rename(columns={'PM2.5': 'value'})

    return df

# Fetch the readings once, then confirm and preview the first rows
df = load_air_quality_dataset()
print("Dataset loaded successfully!", df.head(), sep="\n")

Files in ZIP archive: ['PRSA_Data_20130301-20170228/', 'PRSA_Data_20130301-20170228/PRSA_Data_Aotizhongxin_20130301-20170228.csv', 'PRSA_Data_20130301-20170228/PRSA_Data_Changping_20130301-20170228.csv', 'PRSA_Data_20130301-20170228/PRSA_Data_Dingling_20130301-20170228.csv', 'PRSA_Data_20130301-20170228/PRSA_Data_Dongsi_20130301-20170228.csv', 'PRSA_Data_20130301-20170228/PRSA_Data_Guanyuan_20130301-20170228.csv', 'PRSA_Data_20130301-20170228/PRSA_Data_Gucheng_20130301-20170228.csv', 'PRSA_Data_20130301-20170228/PRSA_Data_Huairou_20130301-20170228.csv', 'PRSA_Data_20130301-20170228/PRSA_Data_Nongzhanguan_20130301-20170228.csv', 'PRSA_Data_20130301-20170228/PRSA_Data_Shunyi_20130301-20170228.csv', 'PRSA_Data_20130301-20170228/PRSA_Data_Tiantan_20130301-20170228.csv', 'PRSA_Data_20130301-20170228/PRSA_Data_Wanliu_20130301-20170228.csv', 'PRSA_Data_20130301-20170228/PRSA_Data_Wanshouxigong_20130301-20170228.csv']
Selected CSV file: PRSA_Data_20130301-20170228/PRSA_Data_Aotizhongxin_201303

# Preprocess Data into Grid
This block defines functions to convert latitude/longitude data into a grid.

In [33]:
def latlon_to_grid(lat, lon, grid_size, bbox=(39.8, 40.2, 116.2, 116.6)):
    """
    Convert latitude and longitude to grid coordinates.

    The bounding box ``(min_lat, max_lat, min_lon, max_lon)`` is divided into
    ``grid_size`` equal steps along each axis and the index of the step that
    contains the point is returned.

    Indices are computed with floor, not truncation, so a point below the
    minimum of the box yields a negative index (and a point at or above the
    maximum yields ``grid_size`` or more). No clamping is done here; callers
    are expected to bounds-check the result.

    Returns
    -------
    tuple of int
        ``(row, col)`` where ``row`` follows latitude and ``col`` longitude.
    """
    min_lat, max_lat, min_lon, max_lon = bbox
    # int() alone would round -0.5 up to 0 and silently pull out-of-box points
    # into the first row/column.
    row = int(np.floor((lat - min_lat) / (max_lat - min_lat) * grid_size))
    col = int(np.floor((lon - min_lon) / (max_lon - min_lon) * grid_size))
    return row, col

def create_grid(df, grid_size=30, bbox=(39.8, 40.2, 116.2, 116.6)):
    """
    Create a normalized intensity grid from the dataset.

    Each row of ``df`` (columns ``lat``, ``lon``, ``value``) is binned with
    ``latlon_to_grid`` and its ``value`` is added to the matching cell. Rows
    whose cell index falls outside the grid are ignored.

    Parameters
    ----------
    df : pandas.DataFrame
        Must provide ``lat``, ``lon`` and ``value`` columns.
    grid_size : int
        Number of cells along each axis.
    bbox : tuple
        ``(min_lat, max_lat, min_lon, max_lon)`` forwarded to
        ``latlon_to_grid``.

    Returns
    -------
    numpy.ndarray
        ``(grid_size, grid_size)`` array of summed values divided by the
        largest cell, or all zeros when no cell received a positive total.
    """
    grid = np.zeros((grid_size, grid_size))

    # Iterate over plain column arrays: iterrows() would build a Series for
    # every row, which is needlessly slow for tens of thousands of readings.
    lats = df['lat'].to_numpy()
    lons = df['lon'].to_numpy()
    values = df['value'].to_numpy()
    for lat, lon, value in zip(lats, lons, values):
        r, c = latlon_to_grid(lat, lon, grid_size, bbox)
        if 0 <= r < grid_size and 0 <= c < grid_size:
            grid[r, c] += value

    peak = grid.max()
    if peak > 0:
        grid /= peak  # Normalize only if the maximum value is nonzero
    return grid

# Bin the readings into a 30 x 30 normalized intensity grid
grid = create_grid(df, 30)
print("Grid created successfully!")

Grid created successfully!


# Hybrid SA-GA Implementation
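This block defines the Hybrid Simulated Annealing-Genetic Algorithm (SA-GA) class, which searches for sensor positions on the grid that cover as much of the measured intensity as possible.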

In [34]:
class HybridSAGA:
    """
    Hybrid Simulated Annealing-Genetic Algorithm for sensor placement optimization.

    An individual is a 1-D integer array of flattened cell indices
    (``row * cols + col``), one entry per sensor. Fitness is the share of the
    grid's total intensity that lies within ``sensor_radius`` cells (a square
    window) of at least one sensor.
    """
    def __init__(self, grid, sensor_radius=2, pop_size=50, mutation_rate=0.1, temp=1000, cooling_rate=0.95):
        self.grid = grid
        self.rows, self.cols = grid.shape
        self.sensor_radius = sensor_radius
        self.pop_size = pop_size
        self.mutation_rate = mutation_rate
        self.temp = temp
        self.cooling_rate = cooling_rate

    def initialize_population(self, k_sensors):
        """
        Initialize a population of sensor placements.

        Returns ``pop_size`` arrays, each holding ``k_sensors`` distinct
        flattened cell indices.
        """
        assert k_sensors <= self.rows * self.cols, "Number of sensors exceeds available grid cells."
        return [np.random.choice(self.rows * self.cols, k_sensors, replace=False)
                for _ in range(self.pop_size)]

    def fitness(self, individual):
        """
        Calculate the fitness of an individual (sensor placement).

        Returns the summed grid value of all covered cells divided by the sum
        of the whole grid, or 0 when the grid sums to zero. A cell covered by
        several sensors is counted once.
        """
        covered = set()
        for sensor in individual:
            row = sensor // self.cols
            col = sensor % self.cols
            for dr in range(-self.sensor_radius, self.sensor_radius + 1):
                for dc in range(-self.sensor_radius, self.sensor_radius + 1):
                    if 0 <= row + dr < self.rows and 0 <= col + dc < self.cols:
                        covered.add((row + dr, col + dc))
        grid_sum = self.grid.sum()
        if grid_sum == 0:
            return 0  # No coverage possible
        return sum(self.grid[r][c] for (r, c) in covered) / grid_sum

    def mutate(self, individual):
        """
        Mutate an individual (sensor placement) in place and return it.

        With probability ``mutation_rate`` one sensor is moved to a uniformly
        random cell (which may coincide with another sensor) and ``temp`` is
        multiplied by ``cooling_rate``.
        """
        if np.random.rand() < self.mutation_rate:
            idx = np.random.randint(0, len(individual))
            new_pos = np.random.randint(0, self.rows * self.cols)
            individual[idx] = new_pos
            # NOTE(review): temp is cooled here but nothing in this class reads
            # it to accept or reject a move.
            self.temp *= self.cooling_rate
        return individual

    @staticmethod
    def _selection_probabilities(fitness_values):
        """Fitness-proportional selection weights, uniform when fewer than two are non-zero."""
        total_fitness = sum(fitness_values)
        weights = np.array(fitness_values, dtype=float)
        # np.random.choice(..., replace=False, p=...) needs at least as many
        # non-zero probabilities as samples; fall back to equal probabilities.
        if total_fitness == 0 or np.count_nonzero(weights) < 2:
            return np.ones(len(weights)) / len(weights)
        return weights / total_fitness

    @staticmethod
    def _crossover(first, second):
        """Single-point crossover; a one-sensor parent has no interior cut point and is copied."""
        if len(first) < 2:
            return first.copy()
        split = np.random.randint(1, len(first))
        return np.concatenate((first[:split], second[split:]))

    def run(self, k_sensors, generations=50):
        """
        Run the Hybrid SA-GA algorithm.

        Each generation scores the population once, keeps the two fittest
        individuals unchanged and fills the remaining slots with mutated
        single-point crossovers of fitness-proportionally selected parents.
        The population produced by the last generation is scored as well.

        Returns
        -------
        tuple
            ``(best_solution, best_fitness)``; ``best_solution`` is a copy of
            the best individual seen.
        """
        pop = self.initialize_population(k_sensors)
        best_fitness = -np.inf
        best_solution = None

        for _ in range(generations):
            # Score every individual once; the values are reused for ranking
            # and for parent selection of all children in this generation.
            scores = [self.fitness(ind) for ind in pop]
            order = sorted(range(len(pop)), key=scores.__getitem__, reverse=True)
            pop = [pop[i] for i in order]
            fitness_values = [scores[i] for i in order]

            if fitness_values[0] > best_fitness:
                best_fitness = fitness_values[0]
                best_solution = pop[0].copy()

            probabilities = self._selection_probabilities(fitness_values)

            new_pop = pop[:2]  # elites survive unchanged
            while len(new_pop) < self.pop_size:
                first, second = np.random.choice(len(pop), 2, replace=False, p=probabilities)
                child = self._crossover(pop[first], pop[second])
                new_pop.append(self.mutate(child))
            pop = new_pop

        # The offspring of the last generation have not been scored yet.
        for candidate in pop:
            score = self.fitness(candidate)
            if score > best_fitness:
                best_fitness = score
                best_solution = candidate.copy()

        return best_solution, best_fitness

# Search for a 20-sensor placement over 50 generations and report its coverage
optimizer = HybridSAGA(grid, sensor_radius=2)
best_solution, best_fitness = optimizer.run(20, generations=50)
print(f"Best Coverage: {best_fitness:.2%}")

Best Coverage: 90.59%


# Visualization
This block defines a function to visualize the sensor placements on a map.

In [35]:
def plot_sensors_colab(grid, sensors, bbox=(39.8, 40.2, 116.2, 116.6),
                       cell_radius_m=200, coverage_radius_m=500):
    """
    Visualize the sensor placements on a map.

    Parameters
    ----------
    grid : numpy.ndarray
        2-D array of intensities; the colour formula assumes values in [0, 1].
    sensors : iterable of int or None
        Flattened cell indices (``row * n_cols + col``). ``None`` draws the
        grid without sensors.
    bbox : tuple
        ``(min_lat, max_lat, min_lon, max_lon)`` spanned by the grid.
    cell_radius_m : float
        Radius in metres of the circle drawn for every grid cell.
    coverage_radius_m : float
        Radius in metres of the circle drawn around each sensor. It is a
        purely visual value and is not derived from the optimizer's
        ``sensor_radius``.

    Returns
    -------
    folium.Map
    """
    n_rows, n_cols = grid.shape
    min_lat, max_lat, min_lon, max_lon = bbox

    def cell_center(r, c):
        # latlon_to_grid bins by floor, so cell r spans [r, r + 1) steps of
        # the box; its centre is half a step in.
        lat = min_lat + ((r + 0.5) / n_rows) * (max_lat - min_lat)
        lon = min_lon + ((c + 0.5) / n_cols) * (max_lon - min_lon)
        return [float(lat), float(lon)]

    m = folium.Map(location=[39.9042, 116.4074], zoom_start=11)  # Center map on Beijing

    # Add grid cells with gradient colors (red for 0 through green for 1)
    for r in range(n_rows):
        for c in range(n_cols):
            intensity = grid[r][c]
            color = f'rgb({int(255 * (1 - intensity))}, {int(255 * intensity)}, 0)'
            folium.Circle(
                cell_center(r, c),
                radius=cell_radius_m,
                color=color,
                fill=True,
                fill_opacity=0.6,
                tooltip=f"PM2.5: {intensity:.2f}"
            ).add_to(m)

    # Add sensors and their coverage areas
    for i, sensor in enumerate(sensors if sensors is not None else []):
        # Decode with the column count so non-square grids map correctly.
        r = int(sensor) // n_cols
        c = int(sensor) % n_cols
        location = cell_center(r, c)
        folium.Marker(
            location,
            icon=folium.Icon(color='green'),
            tooltip=f"Sensor {i+1}"
        ).add_to(m)
        folium.Circle(
            location,
            radius=coverage_radius_m,
            color='blue',
            fill=True,
            fill_opacity=0.2,
            tooltip=f"Sensor {i+1} Coverage"
        ).add_to(m)

    return m

# Plot the sensors.
# The result is bound to `sensor_map` rather than `map` so the built-in
# map() stays usable in later cells.
sensor_map = plot_sensors_colab(grid, best_solution)
display(sensor_map)

# Save the map as an HTML file
sensor_map.save('air_quality_sensors.html')

# Display the HTML file using an IFrame
display(IFrame('air_quality_sensors.html', width=800, height=600))