# Introduction

Achmad Dhani

Objective: AQI (Air Quality Index) is an indicator of air pollution: as the air pollution level rises, so does the AQI. Air pollution poses health risks for the residents of the affected area. ETL (Extract, Transform, Load) helps by collecting air quality data from various sources, transforming it for analysis, and loading it into databases or dashboards. This process enables environmental agencies and the public to monitor and respond to changing air quality conditions effectively, aiding decision-making for public health and environmental policies.

# Import Libraries and Set Connection

In [1]:
# sql import
import psycopg2
from sqlalchemy import create_engine
# tools
import pandas as pd
import numpy as np
import requests
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
from great_expectations.data_context import FileDataContext


In [3]:
# connecting psql container
engine = create_engine('postgresql+psycopg2://achmaddhani:container@localhost:5434/milestone')

# Functions

In [4]:
def clean_data(df):
    '''
    A function to do a data cleaning of a dataframe

    Args:
        df (DataFrame): input argument is a dataframe that needed to be cleaned

    Returns:
        DataFrame: Returns a dataframe that has been cleaned
    '''
    data_types= {
        'aqi_value':'int64',
        'co_aqi_value':'int64',
        'ozone_aqi_value':'int64',
        'no2_aqi_value':'int64',
        'pm25_aqi_value':'int64',
        'lat':'float64',
        'lng':'float64'
    }
    df.columns = df.columns.map(str.lower)
    trans_table = str.maketrans(' ', '_', '.') 
    df.columns = [col.translate(trans_table) for col in df.columns] # replacing spaces with underscore and removing dot
    df.drop_duplicates(subset='city', keep='first', inplace=True)
    df.drop_duplicates(inplace=True)
    df= df.astype(data_types)

    # # a different kind of function for calling API
    # missing =df[df.isna().any(axis=1)]
    # # mappings
    # city_country_mapping = {}
    # # iterate through the unique cities of the missing values
    # for city in missing['city'].unique():
    #     country = get_country_from_city(city)
    #     if country:
    #         city_country_mapping[city] = country

    # # mapping and imputing the missing values
    # df['country'] = df['city'].map(city_country_mapping).fillna(df['country'])
    # calling function to impute missing values with an api
    
    df['country'] = df.apply(lambda row: get_country_from_city(row['city']) if pd.isnull(row['country']) else row['country'], axis=1)
    df.dropna(inplace=True)
    return df
    
# check for missing values
def check_missing(df):
    '''
    A function to check missing values within a dataframe

    Args:
        df (DataFrame): Input a dataframe that needed to be checked
    '''
    missing = df.isna().sum()
    total_missing_values = missing.sum()
    
    # if function for missing values
    if total_missing_values > 0:
        columns_with_missing_values = missing[missing > 0] # the index represents the names of the columns
        missing_percentage = (columns_with_missing_values / df.shape[0]) * 100
        
        print('Total missing values in the dataset:', total_missing_values) # total
        print('Columns with missing values:', columns_with_missing_values.index.tolist())
        print('')
        print('Number of missing values per column:')
        print(columns_with_missing_values)
        print('')
        print('Missing data percentage (%):')
        print(missing_percentage)
    else:
        print('No missing values found.')

def get_country_from_city(city):
    '''
    A function to call the API and return the name of the country depending on the city.

    Args:
        city (string): Input the name of a city

    Returns:
        string: return the name of the country of the argument city
    '''
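    url = "http://api.geonames.org/searchJSON"
    params = {"q": city, "maxRows": 1, "username": "achmaddhani"} # only works using personal username
    response = requests.get(url, params=params, timeout=10) # requests URL-encodes the city name; timeout avoids hanging forever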
    response.raise_for_status()

    data = response.json()

    if 'geonames' in data and len(data['geonames']) > 0:
        geoname = data['geonames'][0]
        return geoname.get('countryName')
    else:
        return None


# Data Export

In [5]:
# reading csv file
df = pd.read_csv('P2M3_Dhani_data_raw.csv')
df.head(5)

Unnamed: 0,Country,City,AQI Value,AQI Category,CO AQI Value,CO AQI Category,Ozone AQI Value,Ozone AQI Category,NO2 AQI Value,NO2 AQI Category,PM2.5 AQI Value,PM2.5 AQI Category,lat,lng
0,Russian Federation,Praskoveya,51,Moderate,1,Good,36,Good,0,Good,51,Moderate,44.7444,44.2031
1,Brazil,Presidente Dutra,41,Good,1,Good,5,Good,1,Good,41,Good,-5.29,-44.49
2,Brazil,Presidente Dutra,41,Good,1,Good,5,Good,1,Good,41,Good,-11.2958,-41.9869
3,Italy,Priolo Gargallo,66,Moderate,1,Good,39,Good,2,Good,66,Moderate,37.1667,15.1833
4,Poland,Przasnysz,34,Good,1,Good,34,Good,0,Good,20,Good,53.0167,20.8833


In [6]:
# exporting data to sql container
df.to_sql(name='table_m3', con=engine, index=False)

695

# Data Loading

In [165]:
# importing data from sql container
data = pd.read_sql_table('table_m3', engine)

In [166]:
# make a copy for testing the function
test= data.copy()

In [147]:
data.head(10)

Unnamed: 0,Country,City,AQI Value,AQI Category,CO AQI Value,CO AQI Category,Ozone AQI Value,Ozone AQI Category,NO2 AQI Value,NO2 AQI Category,PM2.5 AQI Value,PM2.5 AQI Category,lat,lng
0,Russian Federation,Praskoveya,51,Moderate,1,Good,36,Good,0,Good,51,Moderate,44.7444,44.2031
1,Brazil,Presidente Dutra,41,Good,1,Good,5,Good,1,Good,41,Good,-5.29,-44.49
2,Brazil,Presidente Dutra,41,Good,1,Good,5,Good,1,Good,41,Good,-11.2958,-41.9869
3,Italy,Priolo Gargallo,66,Moderate,1,Good,39,Good,2,Good,66,Moderate,37.1667,15.1833
4,Poland,Przasnysz,34,Good,1,Good,34,Good,0,Good,20,Good,53.0167,20.8833
5,United States of America,Punta Gorda,54,Moderate,1,Good,14,Good,11,Good,54,Moderate,16.1005,-88.8074
6,United States of America,Punta Gorda,54,Moderate,1,Good,14,Good,11,Good,54,Moderate,26.8941,-82.0513
7,Belgium,Puurs,64,Moderate,1,Good,29,Good,7,Good,64,Moderate,51.0761,4.2803
8,Russian Federation,Pyatigorsk,54,Moderate,1,Good,41,Good,1,Good,54,Moderate,44.05,43.0667
9,China,Qinzhou,68,Moderate,2,Good,68,Moderate,1,Good,58,Moderate,21.95,108.6167


In [148]:
# summary
data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 16695 entries, 0 to 16694
Data columns (total 14 columns):
 #   Column              Non-Null Count  Dtype  
---  ------              --------------  -----  
 0   Country             16393 non-null  object 
 1   City                16695 non-null  object 
 2   AQI Value           16695 non-null  int64  
 3   AQI Category        16695 non-null  object 
 4   CO AQI Value        16695 non-null  int64  
 5   CO AQI Category     16695 non-null  object 
 6   Ozone AQI Value     16695 non-null  int64  
 7   Ozone AQI Category  16695 non-null  object 
 8   NO2 AQI Value       16695 non-null  int64  
 9   NO2 AQI Category    16695 non-null  object 
 10  PM2.5 AQI Value     16695 non-null  int64  
 11  PM2.5 AQI Category  16695 non-null  object 
 12  lat                 16695 non-null  float64
 13  lng                 16695 non-null  float64
dtypes: float64(2), int64(5), object(7)
memory usage: 1.8+ MB


The dataset consists of 16695 entries and 14 columns: 7 object columns and 7 numeric columns. The column names are not normalized yet (they contain mixed case, spaces, and dots).

In [149]:
# calling function for missing data
check_missing(data)

Total missing values in the dataset: 302
Columns with missing values: ['Country']

Number of missing values per column:
Country    302
dtype: int64

Missing data percentage (%):
Country    1.808925
dtype: float64


Only a small share of the dataset is missing: 302 values, about 1.8% of the rows, all in the 'Country' column. Before the dataset can be used for analysis, it has to be cleaned.
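As a quick sanity check, the percentage reported by `check_missing` can be reproduced by hand from the two counts shown in the outputs above. This is only a worked version of the same arithmetic, not a separate method.

```python
total_rows = 16695       # RangeIndex size reported by data.info()
missing_country = 302    # missing 'Country' values reported by check_missing(data)

# Same formula check_missing uses: missing count divided by row count, times 100
missing_pct = missing_country / total_rows * 100
print(round(missing_pct, 6))  # 1.808925
```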

In [150]:
# making all columns lowercase
data.columns = data.columns.map(str.lower)

Normalization starts by converting all column names to lowercase, which makes the dataset more convenient to work with.

In [151]:
# removing spaces, turning it into underscore while removing dot
trans_table = str.maketrans(' ', '_', '.')
data.columns = [col.translate(trans_table) for col in data.columns] # replacing spaces with underscore and removing dot

Dots (`.`) are removed and spaces are replaced with underscores (`_`) to avoid errors when referencing the columns.
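The two normalization steps can be seen on a handful of column names taken from the raw dataset. The helper name `normalize` is only for illustration and is not part of the notebook.

```python
# A few column names copied from the raw dataset
raw_columns = ['Country', 'AQI Value', 'PM2.5 AQI Value', 'lat']

# str.maketrans(x, y, z): map each character in x to the one in y, delete every character in z
trans_table = str.maketrans(' ', '_', '.')

def normalize(name):
    """Lowercase a column name, turn spaces into underscores and drop dots."""
    return name.lower().translate(trans_table)

normalized = [normalize(col) for col in raw_columns]
print(normalized)  # ['country', 'aqi_value', 'pm25_aqi_value', 'lat']
```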

In [152]:
# selecting only the int datatype
data.select_dtypes(include='int64').columns.to_list()

['aqi_value',
 'co_aqi_value',
 'ozone_aqi_value',
 'no2_aqi_value',
 'pm25_aqi_value']

In [153]:
# selecting only the float datatype
data.select_dtypes(include='float64').columns.to_list()

['lat', 'lng']

This confirms that the numeric columns use the correct data types.

In [154]:
# checking the value count of the city
data['city'].value_counts()

city
Santa Cruz      17
San Fernando    16
Santa Ana       15
San Juan        14
Washington      12
                ..
Hegang           1
Herxheim         1
Onalaska         1
Ostfildern       1
Westerville      1
Name: count, Length: 14229, dtype: int64

It appears that some cities occur more than once. This might come from AQI samples taken from different sources within the city. To reduce redundancy, these duplicates will be removed, keeping only one row to represent each city.
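A small sketch of what `drop_duplicates(subset='city', keep='first')` does, using a few rows copied from the preview above. The frame `sample` is made up for illustration and holds only three of the fourteen columns.

```python
import pandas as pd

# Rows copied from data.head(10); only city and coordinates are kept here
sample = pd.DataFrame({
    'city': ['Praskoveya', 'Presidente Dutra', 'Presidente Dutra', 'Punta Gorda', 'Punta Gorda'],
    'lat': [44.7444, -5.29, -11.2958, 16.1005, 26.8941],
    'lng': [44.2031, -44.49, -41.9869, -88.8074, -82.0513],
})

# Keep the first row seen for each city name and discard the later ones
deduplicated = sample.drop_duplicates(subset='city', keep='first').reset_index(drop=True)
print(deduplicated)
```

Note that the two Punta Gorda rows have clearly different coordinates, so rows sharing a city name are not necessarily the same place. Deduplicating on the name alone keeps whichever row comes first.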

In [155]:
# counting the duplicated cities
duplicated_city = data[data['city'].duplicated(keep=False)]['city'].unique()
len(duplicated_city)

1343

There are 1343 cities that have duplicates.

In [156]:
data = data.drop_duplicates(subset='city', keep='first')

# Handling Missing Values

In [157]:
missing = data[data.isna().any(axis=1)]
missing.head(5)

Unnamed: 0,country,city,aqi_value,aqi_category,co_aqi_value,co_aqi_category,ozone_aqi_value,ozone_aqi_category,no2_aqi_value,no2_aqi_category,pm25_aqi_value,pm25_aqi_category,lat,lng
385,,Granville,30,Good,1,Good,30,Good,3,Good,25,Good,48.8381,-1.5869
560,,Kingstown,163,Unhealthy,0,Good,25,Good,0,Good,163,Unhealthy,13.1578,-61.225
623,,Nanakuli,30,Good,0,Good,27,Good,0,Good,30,Good,21.3892,-158.1445
629,,Lavagna,55,Moderate,1,Good,38,Good,2,Good,55,Moderate,44.3167,9.3333
664,,Ladispoli,48,Good,1,Good,48,Good,2,Good,34,Good,41.9544,12.0742


The missing values seem to follow a pattern that depends on the `city` column, so they are deemed MNAR (*Missing Not At Random*).

In [158]:
# checking city unique data
len(data['city'].unique())

14229

In [159]:
# checking the missing values city unique data
len(missing['city'].unique())

273

A relatively high number of cities have a missing country, so imputing them manually would be taxing.

In [86]:
# calling function to impute missing values with an api
data['country'] = data.apply(lambda row: get_country_from_city(row['city']) if pd.isnull(row['country']) else row['country'], axis=1)

```python
# mappings (2nd way)
city_country_mapping = {}
# iterate through the unique cities of the missing values
for city in missing['city'].unique():
    country = get_country_from_city(city)
    if country:
        city_country_mapping[city] = country

# mapping and imputing the missing values
data['country'] = data['city'].map(city_country_mapping).fillna(data['country'])
```

Calling an API built specifically to return the country of a city is more efficient than imputing the values by hand. This is made possible by the freely available [GeoNames](https://www.geonames.org/) API.
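The idea behind the mapping variant, one lookup per distinct city rather than one per row, can be sketched without touching the network. Here `impute_countries` and `fake_lookup` are hypothetical names, `fake_lookup` is a stand-in for `get_country_from_city`, and the rows are a toy example rather than the real dataset.

```python
def impute_countries(rows, lookup):
    """Fill missing 'country' values, calling lookup at most once per distinct city."""
    cache = {}
    for row in rows:
        if row['country'] is None:
            city = row['city']
            if city not in cache:
                cache[city] = lookup(city)
            row['country'] = cache[city]
    return rows

calls = []

def fake_lookup(city):
    # Hypothetical stand-in for get_country_from_city: records the call instead of hitting the API
    calls.append(city)
    return {'Lavagna': 'Italy', 'Ladispoli': 'Italy'}.get(city)

rows = [
    {'city': 'Lavagna', 'country': None},
    {'city': 'Ladispoli', 'country': None},
    {'city': 'Lavagna', 'country': None},      # repeated city: served from the cache
    {'city': 'Puurs', 'country': 'Belgium'},   # already filled: no lookup needed
]
filled = impute_countries(rows, fake_lookup)
print(calls)  # ['Lavagna', 'Ladispoli']
```

Because the notebook drops duplicate cities before imputing, both variants should make the same number of API calls here. The cache only pays off when a city can appear in more than one row.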

In [162]:
check_missing(data)

Total missing values in the dataset: 1
Columns with missing values: ['country']

Number of missing values per column:
country    1
dtype: int64

Missing data percentage (%):
country    0.007028
dtype: float64


The value still missing after imputation means the city is not registered in GeoNames, so it will be dropped.

In [88]:
# dropping the cities that are not registered in the GeoNames API
data.dropna(inplace=True)

In [89]:
# saving cleaned data
data.to_csv('P2M3_dhani_data_clean.csv', index=False)

# Testing the function

In [167]:
# columns before cleaning
test.columns

Index(['Country', 'City', 'AQI Value', 'AQI Category', 'CO AQI Value',
       'CO AQI Category', 'Ozone AQI Value', 'Ozone AQI Category',
       'NO2 AQI Value', 'NO2 AQI Category', 'PM2.5 AQI Value',
       'PM2.5 AQI Category', 'lat', 'lng'],
      dtype='object')

In [168]:
# checking missing function
check_missing(test)

Total missing values in the dataset: 302
Columns with missing values: ['Country']

Number of missing values per column:
Country    302
dtype: int64

Missing data percentage (%):
Country    1.808925
dtype: float64


In [170]:
# cleaning the data and see if it's cleaned
test_clean= clean_data(test)
test_clean.columns

Index(['country', 'city', 'aqi_value', 'aqi_category', 'co_aqi_value',
       'co_aqi_category', 'ozone_aqi_value', 'ozone_aqi_category',
       'no2_aqi_value', 'no2_aqi_category', 'pm25_aqi_value',
       'pm25_aqi_category', 'lat', 'lng'],
      dtype='object')

In [171]:
# check missing data
check_missing(test_clean)

No missing values found.


This shows that the cleaning function works.

# Elasticsearch

In [19]:
# connecting to elastic search
es = Elasticsearch('http://localhost:9200')

In [15]:
# testing the connection
print(es.ping())

True


In [20]:
# exporting data to elastic search in bulk
df = pd.read_csv('P2M3_dhani_data_clean.csv')
actions = [
    {
        "_index": "p2m3_dhani_data_clean",
        "_source": row.to_dict()
    }
    for i, row in df.iterrows()
]
bulk(es, actions)

(14228, [])

The cells above connect to Elasticsearch and export the cleaned data in bulk into the Elasticsearch NoSQL database.
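Without an explicit `_id`, Elasticsearch generates a new id for every document, so re-running the export cell adds a second copy of each row. One possible variation, sketched below, yields the actions from a generator and derives a stable `_id` from country and city. The id scheme and the name `generate_actions` are assumptions made for illustration, not part of the original pipeline.

```python
def generate_actions(records, index_name):
    """Yield one bulk action per record, with a stable _id built from country and city."""
    for record in records:
        yield {
            "_index": index_name,
            "_id": f"{record['country']}|{record['city']}",  # hypothetical id scheme
            "_source": record,
        }

# Two records shaped like rows of the cleaned CSV (only a few columns shown)
records = [
    {"country": "Italy", "city": "Priolo Gargallo", "aqi_value": 66},
    {"country": "Poland", "city": "Przasnysz", "aqi_value": 34},
]
actions = list(generate_actions(records, "p2m3_dhani_data_clean"))
print(actions[0]["_id"])  # Italy|Priolo Gargallo
```

With a live client, the generator could be passed straight to the helper, for example `bulk(es, generate_actions(df.to_dict(orient='records'), 'p2m3_dhani_data_clean'))`, since `bulk` accepts any iterable of actions.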

# Functions for DAG

All these functions and their documentation will be in their own separate module.py file in the ./functions directory.

```python
# loading DAG
def loading_data():
    engine = create_engine('postgresql+psycopg2://achmaddhani:container@postgres:5432/milestone') # using 5432 port since airflow is in container
    df = pd.read_sql_table('table_m3', engine)
    df.to_csv('/opt/airflow/data/P2M3_Dhani_data_raw.csv', index=False) # saving raw data

# required function for cleaning
def get_country_from_city(city):
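    url = "http://api.geonames.org/searchJSON"
    params = {"q": city, "maxRows": 1, "username": "achmaddhani"}
    response = requests.get(url, params=params, timeout=10) # requests URL-encodes the city name; timeout avoids hanging forever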
    data = response.json()
    if 'geonames' in data and len(data['geonames']) > 0:
        geoname = data['geonames'][0]
        return geoname.get('countryName')
    else:
        print(f"No results for {city}: {data}")
        return None

def cleaning_data():
    df= pd.read_csv('/opt/airflow/data/P2M3_Dhani_data_raw.csv') # read the csv
    data_types= {
        'aqi_value':'int64',
        'co_aqi_value':'int64',
        'ozone_aqi_value':'int64',
        'no2_aqi_value':'int64',
        'pm25_aqi_value':'int64',
        'lat':'float64',
        'lng':'float64'
    } # columns of each data types
    df.columns = df.columns.map(str.lower) # turning all columns to lowercase
    trans_table = str.maketrans(' ', '_', '.')
    df.columns = [col.translate(trans_table) for col in df.columns] # replacing spaces with underscore and removing dot
    df.drop_duplicates(subset='city', keep='first', inplace=True) # removing city duplicates and keeping only one 
    df.drop_duplicates(inplace=True) # removing duplicates overall
    df= df.astype(data_types) # fixing the data type
    df['country'] = df.apply(lambda row: get_country_from_city(row['city']) if pd.isnull(row['country']) else row['country'], axis=1) # imputing missing value with api
    df.dropna(inplace=True) # drop missing values that's not registered on geo name
    df.to_csv('/opt/airflow/data/P2M3_dhani_data_clean.csv', index=False) # saving the cleaned data

# exporting to elasticsearch DAG
def export_elastic():
    es = Elasticsearch(hosts=["http://elasticsearch:9200"])

    df = pd.read_csv('/opt/airflow/data/P2M3_dhani_data_clean.csv')
    actions = [
        {
            "_index": "milestone3_dhani_data_clean",
            "_source": row.to_dict()
        }
        for i, row in df.iterrows()
    ]
    bulk(es, actions)
```

# Validation With Great Expectations

Validation is performed to confirm that the cleaned data is actually clean.

In [15]:
context = FileDataContext.create(project_root_dir='/Users/achmaddhani/projects/air-quality-index-etl-kibana')

In [16]:
# initializing datasource
datasource_name = 'air_quality_source'
datasource = context.sources.add_pandas(datasource_name)

# adding the cleaned csv file as a data asset
asset_name = 'cleaned_data'
path_to_data = '/Users/achmaddhani/projects/air-quality-index-etl-kibana/P2M3_dhani_data_clean.csv'
asset = datasource.add_csv_asset(asset_name, filepath_or_buffer=path_to_data)

# Build batch request
batch_request = asset.build_batch_request()

In [17]:
# Create an expectation suite
expectation_suite_name = 'expectation-air_quality-dataset'
context.add_or_update_expectation_suite(expectation_suite_name)

# Create a validator using above expectation suite
validator = context.get_validator(
    batch_request = batch_request,
    expectation_suite_name = expectation_suite_name
)

# Check the validator
validator.head()

Unnamed: 0,country,city,aqi_value,aqi_category,co_aqi_value,co_aqi_category,ozone_aqi_value,ozone_aqi_category,no2_aqi_value,no2_aqi_category,pm25_aqi_value,pm25_aqi_category,lat,lng
0,Russian Federation,Praskoveya,51,Moderate,1,Good,36,Good,0,Good,51,Moderate,44.7444,44.2031
1,Brazil,Presidente Dutra,41,Good,1,Good,5,Good,1,Good,41,Good,-5.29,-44.49
2,Italy,Priolo Gargallo,66,Moderate,1,Good,39,Good,2,Good,66,Moderate,37.1667,15.1833
3,Poland,Przasnysz,34,Good,1,Good,34,Good,0,Good,20,Good,53.0167,20.8833
4,United States of America,Punta Gorda,54,Moderate,1,Good,14,Good,11,Good,54,Moderate,16.1005,-88.8074


In [28]:
# expectation values within the city column are all unique
print('city unique values validation')
unique_city=validator.expect_column_values_to_be_unique('city')
print(f"Expectation is {unique_city['success']}", '\n')

city unique values validation
Expectation is True



In [29]:
# expectations values min and max within the aqi columns
for col in ['aqi_value', 'co_aqi_value', 'ozone_aqi_value', 'no2_aqi_value', 'pm25_aqi_value']:
    print(col, 'expectation')
    result=validator.expect_column_values_to_be_between(
        column=col, min_value=0, max_value=500
        )
    print(f"Expectation is {result['success']}", '\n')

aqi_value expectation
Expectation is True

co_aqi_value expectation
Expectation is True

ozone_aqi_value expectation
Expectation is True

no2_aqi_value expectation
Expectation is True

pm25_aqi_value expectation
Expectation is True



In [30]:
# expectation values are in the set or contain one of the following category
for cat in ['aqi_category', 'co_aqi_category', 'ozone_aqi_category', 'no2_aqi_category', 'pm25_aqi_category']:
    print(cat, 'in set validation')
    result=validator.expect_column_values_to_be_in_set(cat,
                                                ['Moderate', 'Good', 'Unhealthy', 'Very Unhealthy',
                                                'Unhealthy for Sensitive Groups', 'Hazardous']
    )
    print(f"Expectation is {result['success']}", '\n')

aqi_category in set validation
Expectation is True

co_aqi_category in set validation
Expectation is True

ozone_aqi_category in set validation
Expectation is True

no2_aqi_category in set validation
Expectation is True

pm25_aqi_category in set validation
Expectation is True
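The range and category expectations are checked independently, so they do not confirm that a value and its category agree with each other. A possible cross-check is sketched below. It assumes the dataset follows the US EPA AQI bands, which is not stated in the data itself, although the rows previewed in this notebook are consistent with it. The names `AQI_BANDS` and `category_for` are illustrative.

```python
# Upper bound of each band, assuming the US EPA AQI breakpoints (an assumption, not taken from the dataset)
AQI_BANDS = [
    (50, 'Good'),
    (100, 'Moderate'),
    (150, 'Unhealthy for Sensitive Groups'),
    (200, 'Unhealthy'),
    (300, 'Very Unhealthy'),
    (500, 'Hazardous'),
]

def category_for(aqi_value):
    """Return the category whose band contains aqi_value, or None if it is outside 0-500."""
    if aqi_value < 0:
        return None
    for upper_bound, category in AQI_BANDS:
        if aqi_value <= upper_bound:
            return category
    return None

# (aqi_value, aqi_category) pairs taken from rows shown earlier in this notebook
sample_rows = [(51, 'Moderate'), (41, 'Good'), (66, 'Moderate'), (163, 'Unhealthy'), (30, 'Good')]
mismatches = [(value, cat) for value, cat in sample_rows if category_for(value) != cat]
print(mismatches)  # []
```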



In [31]:
# expectation values of numeric columns have to be either integer or float
numeric_list=['aqi_value', 'co_aqi_value', 'ozone_aqi_value', 'no2_aqi_value','pm25_aqi_value', 'lat', 'lng']
for num in numeric_list:
    print(num, 'numeric type validation')
    result=validator.expect_column_values_to_be_in_type_list(num, ['int64', 'float'])
    print(f"Expectation is {result['success']}", '\n')

aqi_value numeric type validation
Expectation is True

co_aqi_value numeric type validation
Expectation is True

ozone_aqi_value numeric type validation
Expectation is True

no2_aqi_value numeric type validation
Expectation is True

pm25_aqi_value numeric type validation
Expectation is True

lat numeric type validation
Expectation is True

lng numeric type validation
Expectation is True



In [32]:
# expectation that the column that had missing values no longer contains nulls
print('country not null validation')
not_null= validator.expect_column_values_to_not_be_null('country')
print(f"Expectation is {not_null['success']}")

country not null validation
Expectation is True


In [33]:
# expectation combination values between city and country are unique
print('country and city unique combination validation')
unique_comb= validator.expect_compound_columns_to_be_unique(['country', 'city'])
print(f"Expectation country is {unique_comb['success']}")

country and city unique combination validation
Expectation country is True


In [34]:
# expectation that the country and city names do not contain '@', backslashes, double quotes or digits
for val in ['country', 'city']:
    print(val, 'not match regex list validation')
    pattern_val = validator.expect_column_values_to_not_match_regex_list(
        column=val,
        regex_list=[r'.*[@\\"0-9].*']
    )
    print(f"Expectation is {pattern_val['success']}", '\n')

country not match regex list validation
Expectation is True

city not match regex list validation
Expectation is True
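To see what this pattern does and does not reject, it can be tried on a few strings with the standard `re` module. The sample names and the helper `is_clean` are made up for illustration. The sketch only exercises the regular expression, not Great Expectations itself.

```python
import re

# Same pattern as in the expectation above: any '@', backslash, double quote or digit
forbidden = re.compile(r'.*[@\\"0-9].*')

def is_clean(name):
    """Return True when the name contains none of the forbidden characters."""
    return forbidden.match(name) is None

samples = ['Presidente Dutra', "Côte d'Ivoire", 'City 17', 'a@b', 'St. Petersburg!']
results = {name: is_clean(name) for name in samples}
for name, ok in results.items():
    print(name, ok)
```

The last sample passes even though it contains `!`, so the pattern is a blacklist of a few characters rather than a strict "letters only" rule.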



In [35]:
validator.save_expectation_suite(discard_failed_expectations=False)

In [36]:
# Create a checkpoint

checkpoint_1 = context.add_or_update_checkpoint(
    name = 'checkpoint_1',
    validator = validator,
)
# Run a checkpoint

checkpoint_result = checkpoint_1.run()

In [37]:
context.build_data_docs()

{'local_site': 'file:///Users/achmaddhani/projects/air-quality-index-etl-kibana/gx/uncommitted/data_docs/local_site/index.html'}

# Conclusion

The dataset cleaning consists of normalizing the column names, handling missing values and duplicates, and making sure the columns have the right data types. The missing values in the country column are imputed with the GeoNames API, since they are missing not at random. Some cities appear more than once with different latitudes and longitudes; to reduce redundancy, the duplicates are dropped, keeping only one row per city. Functions are created to load, clean, and export the data automatically using an Airflow DAG. Docker is used to run the automated ETL, which is scheduled for 6:30 AM WIB. The cleaning function is shown to work, since the cleaned data passes validation with Great Expectations. A suggestion for the future is to reduce missing values at the source as much as possible, especially for location fields, since imputing them takes a while and depends on a good connection to the API.
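The DAG definition itself is not shown in this notebook. If the scheduler evaluates cron expressions in UTC, which is Airflow's default timezone, the 6:30 AM WIB run time has to be shifted by seven hours. The sketch below works out that cron string under this assumption. The variable names are illustrative.

```python
from datetime import datetime, timedelta, timezone

# Western Indonesia Time (WIB) is a fixed UTC+7 offset with no daylight saving
WIB = timezone(timedelta(hours=7), name='WIB')

# The date is arbitrary; only the time of day matters for a daily schedule
local_run = datetime(2024, 1, 1, 6, 30, tzinfo=WIB)
utc_run = local_run.astimezone(timezone.utc)

# Daily cron expression in UTC: minute hour day-of-month month day-of-week
cron_utc = f"{utc_run.minute} {utc_run.hour} * * *"
print(cron_utc)  # 30 23 * * *
```

The UTC time falls on the previous calendar day, so a run at 23:30 UTC corresponds to 6:30 AM WIB the next morning.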