<a href="https://colab.research.google.com/github/antoniosanchez-df/PolluTrack-Technologies/blob/main/PolluTrack_Technologies.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install pandas numpy geopy countrygroups



In [2]:
# Process and manipulate data
import pandas as pd
import numpy as np
# Recognize European Union countries
from countrygroups import EUROPEAN_UNION
# Geolocate cities to determine the respective country
from geopy.geocoders import Nominatim
from geopy.exc import GeocoderUnavailable
# Handle data input if in JSON string format
from io import StringIO



In [3]:
def get_uk_scale():
    """
    Build the UK air quality band table.

    Every row is one index step (1-10) with its qualitative band and, for
    each pollutant, the concentration range of that step written as a
    "low-high" string. The top step is open-ended and written as "≥value".

    Returns:
    --------
    DataFrame
        One row per index step with the columns "Qualitative name",
        "Index", "SO2", "NO2", "PM2_5", "PM10" and "O3".
    """
    data = {
        "Qualitative name": ["Low", "Low", "Low", "Moderate", "Moderate", "Moderate", "High", "High", "High", "Very High"],
        "Index": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
        "SO2": ["0-88", "89-177", "178-266", "267-354", "355-443", "444-532", "533-710", "711-887", "888-1064", "≥1065"],
        "NO2": ["0-67", "68-134", "135-200", "201-267", "268-334", "335-400", "401-467", "468-534", "535-600", "≥601"],
        "PM2_5": ["0-11", "12-23", "24-35", "36-41", "42-47", "48-53", "54-58", "59-64", "65-70", "≥71"],
        # Index 4 starts at 51 so that it is contiguous with index 3 (34-50);
        # a start of 52 would leave 51 outside every band.
        "PM10": ["0-16", "17-33", "34-50", "51-58", "59-66", "67-75", "76-83", "84-91", "92-100", "≥101"],
        "O3": ["0-33", "34-66", "67-100", "101-120", "121-140", "141-160", "161-187", "188-213", "214-240", "≥241"]
    }
    return pd.DataFrame(data)


def get_europe_scale():
    """
    Build the European air quality band table.

    Each row is one qualitative band together with its index range and the
    concentration range of every pollutant, written as "low-high" strings
    (the top band is open-ended and written as ">value").

    Returns:
    --------
    DataFrame
        One row per band with the columns "Qualitative name",
        "Index or sub-index", "NO2", "PM10", "O3" and "PM2_5".
    """
    header = ["Qualitative name", "Index or sub-index", "NO2", "PM10", "O3", "PM2_5"]
    # One tuple per band, in the same order as the header above.
    bands = [
        ("Very Low", "0-25", "0-50", "0-25", "0-60", "0-15"),
        ("Low", "25-50", "50-100", "25-50", "60-120", "15-30"),
        ("Medium", "50-75", "100-200", "50-90", "120-180", "30-55"),
        ("High", "75-100", "200-400", "90-180", "180-240", "55-110"),
        ("Very high", ">100", ">400", ">180", ">240", ">110"),
    ]
    return pd.DataFrame(bands, columns=header)

In [4]:
class BaseAQI:
    """
    Base class to handle Air Quality Index (AQI) calculations for different regions.

    Attributes:
    -----------
    standard_scale : DataFrame
        Upper bound of every qualitative band per pollutant, ordered from the
        lowest band to the highest; the highest band is unbounded (``inf``).
        ``None`` until ``standardize_scale`` has been called.
    starndard_columns : list
        Columns of a raw scale that are kept for AQI calculations. The
        misspelled name is kept because it is a public attribute.
    index_ : str
        The column used as the index for scales.
    scale : DataFrame
        The raw (string) scale restricted to the standard columns.
        ``None`` until ``standardize_scale`` has been called.

    Methods:
    --------
    standardize_scale(scale: DataFrame) -> None:
        Convert and standardize the given pollutant scale to internal format.
    compute_AQI(data: DataFrame) -> DataFrame:
        Compute the AQI based on given pollutant concentrations.
    __get_AQI(row: Series) -> str:
        Utility method to determine AQI level for a given data row.
    """

    # Label of the artificial band for concentrations below the scale minimum.
    _OUT_OF_RANGE = 'Out_range'

    def __init__(self, ):
        """Initialize the class with default values."""
        self.standard_scale = None
        self.scale = None
        self.starndard_columns = [
            'Qualitative name', 'CO', 'NO', 'NO2', 'O3', 'SO2', 'PM2_5', 'PM10', 'NH3'
        ]
        self.index_ = 'Qualitative name'

    def standardize_scale(self, scale):
        """
        Convert and standardize the given pollutant scale to internal format.

        The raw scale holds "low-high" strings. For every band the lowest
        lower bound is taken, the bands are sorted in ascending order and the
        bounds are shifted up by one band, so that each band ends up holding
        its own upper bound (the lower bound of the next band).

        Parameters:
        -----------
        scale : DataFrame
            The input scale for pollutants. It must contain the index column
            (``self.index_``); columns that are not listed in
            ``self.starndard_columns`` are ignored.

        Returns:
        --------
        None
        """
        kept = [x for x in scale.columns if x in self.starndard_columns]
        raw = scale[kept].set_index(self.index_)
        self.scale = raw.copy()

        # The first run of digits in "low-high", ">low" or "≥low" is the
        # lower bound of the step.
        lower = raw.apply(
            lambda col: col.str.extract(r'(\d+)', expand=False).astype(float)
        )

        # Assuming the minimum value is always 0: the sentinel sorts before
        # every real band and, after the shift below, inherits the lowest
        # real lower bound.
        lower.loc[self._OUT_OF_RANGE] = -1
        # Several steps may share one qualitative name; keep the lowest bound.
        lower = lower.groupby(level=0).min().sort_values(lower.columns[0])
        upper = lower.shift(-1).fillna(np.inf)
        self.standard_scale = upper.copy()

    def compute_AQI(self, data):
        """
        Compute the AQI based on given pollutant concentrations.

        Parameters:
        -----------
        data : DataFrame
            Data containing pollutant concentrations. It must have one column
            for every pollutant of the standardized scale. The frame is
            modified in place.

        Returns:
        --------
        DataFrame
            The same frame with an additional 'qualitative_name' column.
        """
        data.loc[:, 'qualitative_name'] = data.apply(self.__get_AQI, axis=1)
        return data

    def __get_AQI(self, row):
        """
        Utility method to determine AQI level for a given data row.

        Every pollutant of the scale is classified into a band and the band
        that occurs most often is returned. When several bands occur equally
        often, the highest of them is returned so that the result does not
        depend on set iteration order.

        NOTE(review): the result is the most frequent band, not the worst
        one. Confirm this is the intended aggregation.

        Parameters:
        -----------
        row : Series
            A data row containing pollutant concentrations.

        Returns:
        --------
        str or None
            The qualitative name of the band, or None when every pollutant
            of the scale is missing (NaN) in the row.
        """
        # Position in the scale doubles as band severity (ascending order).
        severity = list(self.standard_scale.index)
        results = []
        for c in self.standard_scale.columns:
            value = row[c]
            if pd.isna(value):
                # A missing measurement cannot be classified; skip it.
                continue
            bounds = self.standard_scale[c]
            in_band = bounds >= value
            if self._OUT_OF_RANGE in in_band.index:
                # The sentinel band is only for values strictly below the
                # scale minimum; a value equal to the minimum belongs to
                # the first real band.
                in_band[self._OUT_OF_RANGE] = value < bounds[self._OUT_OF_RANGE]
            results.append(bounds.index[in_band.to_numpy()][0])
        if not results:
            return None
        return max(
            set(results),
            key=lambda name: (results.count(name), severity.index(name)),
        )

In [5]:
class UKAQI(BaseAQI):
    """
    AQI classifier configured with the UK scale.

    All of the classification logic is inherited from BaseAQI; this
    subclass only supplies the scale table.

    Methods:
    --------
    reload_scale() -> None:
        Fetch the UK scale and standardize it.
    """
    def __init__(self):
        """
        Set up the base attributes and load the UK scale right away.
        """
        super().__init__()
        self.reload_scale()

    def reload_scale(self):
        """
        Fetch the UK scale from get_uk_scale and standardize it.

        Returns:
        --------
        None
        """
        self.standardize_scale(get_uk_scale())


class EuropeAQI(BaseAQI):
    """
    AQI classifier configured with the Europe scale.

    All of the classification logic is inherited from BaseAQI; this
    subclass only supplies the scale table.

    Methods:
    --------
    reload_scale() -> None:
        Fetch the Europe scale and standardize it.
    """
    def __init__(self):
        """
        Set up the base attributes and load the Europe scale right away.
        """
        super().__init__()
        self.reload_scale()

    def reload_scale(self):
        """
        Fetch the Europe scale from get_europe_scale and standardize it.

        Returns:
        --------
        None
        """
        self.standardize_scale(get_europe_scale())


class USAAQI(BaseAQI):
    """
    A class to handle AQI calculations specific to USA standards.
    It inherits the core functionalities from the BaseAQI class.

    The USA scale table is not implemented yet, so the class cannot be
    instantiated: the constructor calls reload_scale, which raises.

    Methods:
    --------
    reload_scale() -> None:
        Loads and standardizes the AQI scale based on USA standards.
    """
    def __init__(self):
        """
        Initializes the USAAQI class and loads the USA-specific AQI scale.

        Raises:
        -------
        NotImplementedError
            Always, until a USA scale table is provided.
        """
        super().__init__()
        self.reload_scale()

    def reload_scale(self):
        """
        Loads and standardizes the AQI scale specific to the USA.

        Returns:
        --------
        None

        Raises:
        -------
        NotImplementedError
            Always, because no USA scale table is defined yet.
        """
        # No USA scale table exists yet. Fail with an explicit error instead
        # of a NameError on an undefined variable.
        raise NotImplementedError("La escala AQI de USA aún no está implementada.")


class ChinaAQI(BaseAQI):
    """
    A class to handle AQI calculations specific to China standards.
    It inherits the core functionalities from the BaseAQI class.

    The China scale table is not implemented yet, so the class cannot be
    instantiated: the constructor calls reload_scale, which raises.

    Methods:
    --------
    reload_scale() -> None:
        Loads and standardizes the AQI scale based on China standards.
    """
    def __init__(self):
        """
        Initializes the ChinaAQI class and loads the China-specific AQI scale.

        Raises:
        -------
        NotImplementedError
            Always, until a China scale table is provided.
        """
        super().__init__()
        self.reload_scale()

    def reload_scale(self):
        """
        Loads and standardizes the AQI scale specific to China.

        Returns:
        --------
        None

        Raises:
        -------
        NotImplementedError
            Always, because no China scale table is defined yet.
        """
        # No China scale table exists yet. Fail with an explicit error
        # instead of a NameError on an undefined variable.
        raise NotImplementedError("La escala AQI de China aún no está implementada.")

In [6]:
class AQICalculator():
    """
    A class to handle AQI calculations based on various regional rules.

    Attributes:
    -----------
    aqi_rule : NoneType
        Not utilized in the current implementation, but reserved for future use.
    df_data : DataFrame
        Schema-checked input pollutant data with an extra 'rule' column.
        ``None`` until ``set_data`` succeeds.
    rules : list
        List of dictionaries mapping regions to their corresponding AQI calculation classes.

    Methods:
    --------
    set_data(data: Union[str, DataFrame]) -> None:
        Loads and preprocesses the input pollutant data.
    calculate_aqi() -> DataFrame:
        Calculates AQI for the input data based on region-specific rules.
    __enforce_schema(data: DataFrame) -> DataFrame:
        Ensures the input data adheres to the expected schema.
    __set_rules(city: str) -> str:
        Determines the region-specific rule to apply based on the city's country.
    """
    def __init__(self):
        """Initializes the AQICalculator class."""
        self.aqi_rule = None
        self.df_data = None
        # Only regions with an implemented scale are registered; 'USA' and
        # 'China' can be returned by __set_rules but have no class yet.
        self.rules = [
            {'rule': 'Europe', 'class_': EuropeAQI},
            {'rule': 'UK', 'class_': UKAQI},
        ]

    def set_data(self, data):
        """
        Loads and preprocesses the input pollutant data.

        The input is copied, cast to the expected schema and tagged with a
        'rule' column. ``self.df_data`` is only replaced once every step
        has succeeded.

        Parameters:
        -----------
        data : Union[str, DataFrame]
            Input data, either in JSON format or as a DataFrame.

        Returns:
        --------
        None

        Raises:
        -------
        TypeError
            If the input is neither a JSON string nor a DataFrame, if the
            JSON cannot be parsed, or if the schema cannot be enforced.
        TimeoutError
            If a city cannot be geolocated.
        """
        invalid_type = "El tipo de dato proporcionado no es válido. Se espera un JSON o un DataFrame."
        if isinstance(data, str):
            try:
                data = pd.read_json(StringIO(data))
            except ValueError as err:
                raise TypeError(invalid_type) from err
            print("Leyendo datos JSON...")
        elif isinstance(data, pd.DataFrame):
            print("Leyendo DataFrame...")
        else:
            raise TypeError(invalid_type)

        checked = self.__enforce_schema(data.copy())
        # Geocode every distinct city once instead of once per row: the
        # lookup is a network request and its result depends only on the city.
        city_rules = {
            city: self.__set_rules(city) for city in checked['city'].unique()
        }
        checked['rule'] = checked['city'].apply(city_rules.get)
        self.df_data = checked

    def __enforce_schema(self, data):
        """
        Ensures the input data adheres to the expected schema.

        Coordinates and pollutant columns are cast to float, 'hour' and
        'year' to int, 'city' to str and 'date' to datetime. The frame is
        modified in place.

        Parameters:
        -----------
        data : DataFrame
            Input data.

        Returns:
        --------
        DataFrame
            Data adhering to the expected schema.

        Raises:
        -------
        TypeError
            If a column is missing or cannot be converted.
        """
        float_columns = [
            'lon', 'lat', 'CO', 'NO', 'NO2', 'O3', 'SO2', 'PM2_5', 'PM10', 'NH3'
        ]
        int_columns = ['hour', 'year']
        try:
            for column in float_columns:
                data[column] = data[column].astype(float)
            # Convert 'date' itself. The parsed dates used to be written only
            # to a misspelled 'data' column, leaving 'date' unconverted.
            data['date'] = pd.to_datetime(data['date'], format='%Y-%m-%d')
            # Legacy alias kept for consumers that read the 'data' column.
            data['data'] = data['date']
            for column in int_columns:
                data[column] = data[column].astype(int)
            data['city'] = data['city'].astype(str)
        except Exception as e:
            raise TypeError(f"Error de esquema. {e}") from e
        return data

    def __set_rules(self, city):
        """
        Determines the region-specific rule to apply based on the city's country.

        The city is geocoded and the last comma-separated component of the
        resulting address is taken as the country.

        Parameters:
        -----------
        city : str
            City name.

        Returns:
        --------
        str
            'Europe', 'UK', 'USA' or 'China', or None when the country
            matches none of them.

        Raises:
        -------
        TimeoutError
            If the geocoder is unavailable on every attempt or returns no
            result for the city.
        """
        # NOTE(review): "google" looks like a placeholder user agent; confirm
        # it complies with the geocoding service's usage policy.
        geolocator = Nominatim(user_agent="google")
        location = None
        last_error = None

        # Retry on transient unavailability; only give up after the last attempt.
        for _ in range(3):
            try:
                location = geolocator.geocode(city, language="en")
                break
            except GeocoderUnavailable as err:
                last_error = err
        if not location:
            raise TimeoutError('Error de conexion') from last_error

        country = location.address.split(',')[-1].strip(' ')
        if country in EUROPEAN_UNION.names:
            return 'Europe'
        if country == 'United Kingdom':
            return 'UK'
        if country == 'United States':
            return 'USA'
        if country == 'China':
            return 'China'
        return None

    def calculate_aqi(self):
        """
        Calculates AQI for the input data based on region-specific rules.

        Rows whose rule has no registered class in ``self.rules`` are left
        out of the result.

        Returns:
        --------
        DataFrame
            Data with calculated AQI values; an empty DataFrame when no row
            matches a registered rule.

        Raises:
        -------
        ValueError
            If no data has been loaded with ``set_data`` yet.
        """
        if self.df_data is None:
            raise ValueError("No hay datos cargados. Ejecute set_data() primero.")

        frames = []
        for rule in self.rules:
            rule_df = self.df_data[self.df_data['rule'] == rule['rule']]
            if rule_df.empty:
                continue
            rule_aqi = rule['class_']()
            frames.append(rule_aqi.compute_AQI(rule_df.copy()))

        if not frames:
            return pd.DataFrame()
        # Concatenate once at the end instead of growing a frame per rule.
        return pd.concat(frames)


#### Reading "air_pollution"

In [7]:
!gdown --id 1zEuiOj581tbB5aX1D-TZNtpUQYijEbZ7

Downloading...
From: https://drive.google.com/uc?id=1zEuiOj581tbB5aX1D-TZNtpUQYijEbZ7
To: /content/air_pollution.csv
100% 24.1M/24.1M [00:00<00:00, 96.0MB/s]


In [8]:
# Load the downloaded dataset: the file is semicolon-delimited and its first
# column is used as the row index.
df = pd.read_csv("/content/air_pollution.csv", delimiter=';', index_col=0)

In [9]:
df

Unnamed: 0,lon,lat,CO,NO,NO2,O3,SO2,PM2_5,PM10,NH3,aqi,date,hour,year,city
0,40.4165,-3.7026,253.68,0.0,0.01,44.70,0.11,4.79,9.72,0.0,1,2021-01-01,0,2021,Madrid
1,40.4165,-3.7026,253.68,0.0,0.01,44.35,0.11,4.92,9.85,0.0,1,2021-01-01,1,2021,Madrid
2,40.4165,-3.7026,253.68,0.0,0.01,44.35,0.11,5.02,9.61,0.0,1,2021-01-01,2,2021,Madrid
3,40.4165,-3.7026,253.68,0.0,0.01,44.70,0.10,4.87,8.87,0.0,1,2021-01-01,3,2021,Madrid
4,40.4165,-3.7026,253.68,0.0,0.01,44.70,0.09,4.60,8.13,0.0,1,2021-01-01,4,2021,Madrid
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
17324,52.2298,21.0118,273.70,0.0,0.52,124.45,1.25,43.81,201.83,0.0,5,2022-12-31,20,2022,Varsovia
17325,52.2298,21.0118,277.04,0.0,0.51,124.45,1.36,43.86,201.64,0.0,5,2022-12-31,21,2022,Varsovia
17326,52.2298,21.0118,273.70,0.0,0.46,120.16,1.27,42.73,203.80,0.0,5,2022-12-31,22,2022,Varsovia
17327,52.2298,21.0118,270.37,0.0,0.40,114.44,1.09,42.13,208.51,0.0,5,2022-12-31,23,2022,Varsovia


In [10]:
# Create the calculator and load a random sample of 10 rows. set_data
# validates the schema and looks up the country of each city to pick a rule
# (presumably sampled to keep the number of geocoding requests small).
aqi = AQICalculator()
aqi.set_data(df.sample(10).copy())

Leyendo DataFrame...


In [11]:
# Classify the loaded rows; the returned DataFrame (shown as the cell output)
# carries the extra 'rule' and 'qualitative_name' columns.
aqi.calculate_aqi()

Unnamed: 0,lon,lat,CO,NO,NO2,O3,SO2,PM2_5,PM10,NH3,aqi,date,hour,year,city,data,rule,qualitative_name
1569,48.8534,2.3488,240.33,0.01,0.03,55.79,0.28,15.6,24.47,0.0,4,2021-03-08,9,2021,Paris,2021-03-08,Europe,Very Low
2919,50.2584,19.0275,211.95,0.16,2.49,60.08,0.11,27.04,146.41,0.0,5,2021-05-03,16,2021,Katowice,2021-05-03,Europe,Low
6780,51.2199,4.4035,216.96,0.01,0.06,24.32,0.21,7.68,12.3,0.0,3,2021-10-11,13,2021,Amberes,2021-10-11,Europe,Very Low
1614,41.1496,-8.611,220.3,0.0,0.02,37.91,0.1,4.34,11.06,0.0,1,2021-03-10,6,2021,Oporto,2021-03-10,Europe,Very Low
5251,36.7202,-4.4203,195.27,0.0,0.17,49.35,0.05,1.58,2.28,0.3,1,2021-08-08,20,2021,Malaga,2021-08-08,Europe,Very Low
2171,50.8505,4.3488,223.64,0.01,0.06,40.05,0.08,4.85,8.59,0.0,3,2021-04-02,12,2021,Bruselas,2021-04-02,Europe,Very Low
12062,36.7202,-4.4203,181.91,0.01,0.04,33.62,0.01,0.5,0.53,0.04,1,2022-05-21,15,2022,Malaga,2022-05-21,Europe,Very Low
12210,50.2584,19.0275,181.91,0.0,0.13,115.87,1.68,86.42,445.49,0.0,5,2022-05-27,19,2022,Katowice,2022-05-27,Europe,Very high
1654,39.4698,-0.3774,243.66,0.0,0.38,25.03,0.12,7.08,10.59,0.68,1,2021-03-11,22,2021,Valencia,2021-03-11,Europe,Very Low
10754,54.9733,-1.614,220.3,0.0,0.04,36.48,0.08,14.13,19.37,0.0,1,2022-03-28,3,2022,Newcastle,2022-03-28,UK,Low
