# Data Preprocessing:
# Objectives:
1. Explore data
   - Connect to the server
   - Connect to the database(airquality)
   - Connect to nairobi collection
   - Count, find, distinct, aggregate documents
2. Export data

In [1]:
# libraries
import sys
import pymongo
import pprint
import pandas as pd
import pytz # working with timezone
import pathlib

from pymongo import MongoClient
from pprint import PrettyPrinter
from pathlib import Path # working with paths

In [2]:
# library verstions
print("Platfrom: ", sys.platform)
print("Python Version: ", sys.version)
print()
print("Pymongo Version: ", pymongo.__version__)
print("Pandas Version: ", pd.__version__)

Platfrom:  win32
Python Version:  3.13.2 (tags/v3.13.2:4f8bb39, Feb  4 2025, 15:23:48) [MSC v.1942 64 bit (AMD64)]

Pymongo Version:  4.12.0
Pandas Version:  2.2.3


Instantiating `PrettyPrinter`

In [3]:
pp = PrettyPrinter(indent=2)

The first step is to connect with `MongoDB` server

In [4]:
client = MongoClient(host="localhost", port=27017, tz_aware=True)
print(type(client))
print(client)

<class 'pymongo.synchronous.mongo_client.MongoClient'>
MongoClient(host=['localhost:27017'], document_class=dict, tz_aware=True, connect=True)


Let us see how many databases are available.

In [5]:
for db in client.list_databases():
    print(db["name"])

AirQuality
admin
config
local


In [6]:
# Also
client.list_database_names()

['AirQuality', 'admin', 'config', 'local']

We will use `AirQuality` database. Let us connect to that database

In [7]:
AirQuality = client["AirQuality"]
print(type(AirQuality))
print(AirQuality)

<class 'pymongo.synchronous.database.Database'>
Database(MongoClient(host=['localhost:27017'], document_class=dict, tz_aware=True, connect=True), 'AirQuality')


Let us now see different collections available in `AirQuality` database.

In [8]:
for collection in AirQuality.list_collections():
    print(collection["name"])

Juja
Nairobi
Kisumu
Nakuru
Mombasa


In [9]:
AirQuality.list_collection_names()

['Juja', 'Nairobi', 'Kisumu', 'Nakuru', 'Mombasa']

In our case we are interested in working with `Nairobi` AirQuality data.So therefore, we will connect to `Nairobi` collection in *AirQuality* database

In [10]:
Nairobi = AirQuality["Nairobi"]
print(type(Nairobi))
print(Nairobi)

<class 'pymongo.synchronous.collection.Collection'>
Collection(Database(MongoClient(host=['localhost:27017'], document_class=dict, tz_aware=True, connect=True), 'AirQuality'), 'Nairobi')


Now let us see how many `documents` are available in Nairobi collection

In [11]:
documents_count = Nairobi.count_documents({})
print(f"Total documents: {documents_count}.")

Total documents: 446968.


Let us `find` the first document in Nairobi collection

In [12]:
Nairobi.find_one({})

{'_id': ObjectId('6806979f55fcce47f5780944'),
 'sensor_id': 4899,
 'sensor_type': 'DHT22',
 'location': 3966,
 'lat': -1.311,
 'lon': 36.817,
 'timestamp': datetime.datetime(2025, 1, 1, 0, 2, 30, 504000, tzinfo=FixedOffset(datetime.timedelta(0), 'UTC')),
 'value_type': 'humidity',
 'value': 74.5}

Distinct `value_type`

In [13]:
distinct_measure = Nairobi.distinct("value_type")
distinct_location = Nairobi.distinct("location")
print(type(distinct_measure))
print(type(distinct_location))
print(f"Distinct value type: {distinct_measure}")
print(f"Distinct location: {distinct_location}")

<class 'list'>
<class 'list'>
Distinct value type: ['P0', 'P1', 'P2', 'humidity', 'temperature']
Distinct location: [3573, 3966, 3967]


We are insterested in `P2` which is p2.5 for particles

In [14]:
# aggregation
result = Nairobi.aggregate(
    [
        {
            "$group":{
                "_id": "$location",
                "Counts": {"$sum": 1}
            }
        }
    ]
)
pp.pprint(list(result))

[ {'Counts': 40030, '_id': 3966},
  {'Counts': 399589, '_id': 3573},
  {'Counts': 7349, '_id': 3967}]


We will choose site(location) `3573` since it has more documents compared to the rest

In [15]:
# match for location 3573 
result = Nairobi.aggregate(
    [
        {"$match": {"location": 3573}},
        {"$group": {
            "_id": "$value_type",
            "counts": {
                "$sum":1
            }
        }}
    ]
)
pp.pprint(list(result))

[ {'_id': 'temperature', 'counts': 79925},
  {'_id': 'P1', 'counts': 79913},
  {'_id': 'P0', 'counts': 79913},
  {'_id': 'humidity', 'counts': 79925},
  {'_id': 'P2', 'counts': 79913}]


Let us find `P2` and location `3573`

In [16]:
result = Nairobi.find(
    {"location": 3573, "value_type": "P2"},
    projection = {"timestamp":1, "value":1, "_id":0}
)
pp.pprint(next(iter(result)))

{ 'timestamp': datetime.datetime(2025, 1, 1, 10, 56, 30, 946000, tzinfo=FixedOffset(datetime.timedelta(0), 'UTC')),
  'value': 8.5}


Dataframe. `timestamp` and `value_type`

In [17]:
# wrangel function
def wrangle(collection):
    # find pipeline
    result = collection.find(
        {"location": 3573, "value_type": "P2"},
        projection = {"timestamp":1, "value":1, "_id":0}
    )
    # data into dataframe
    df = pd.DataFrame(result).set_index("timestamp")
    # localize timezone from `UTC` to `Africa/Nairobi`
    df.index = df.index.tz_convert("Africa/Nairobi")
    # return
    return df

In [18]:
df = wrangle(Nairobi)
print(type(df))
print(df.shape)
print(df.info())

<class 'pandas.core.frame.DataFrame'>
(79913, 1)
<class 'pandas.core.frame.DataFrame'>
DatetimeIndex: 79913 entries, 2025-01-01 13:56:30.946000+03:00 to 2025-02-01 02:55:14.252000+03:00
Data columns (total 1 columns):
 #   Column  Non-Null Count  Dtype  
---  ------  --------------  -----  
 0   value   79913 non-null  float64
dtypes: float64(1)
memory usage: 1.2 MB
None


In [19]:
df.head()

Unnamed: 0_level_0,value
timestamp,Unnamed: 1_level_1
2025-01-01 13:56:30.946000+03:00,8.5
2025-01-01 13:57:02.958000+03:00,12.0
2025-01-01 13:57:34.922000+03:00,11.0
2025-01-01 13:58:06.946000+03:00,10.6
2025-01-01 13:58:39.800000+03:00,13.4


Creating a `filepath` object

In [20]:
file_path = Path.cwd() / "wrangled_data"
print(type(file_path))
print(file_path)

<class 'pathlib._local.WindowsPath'>
C:\Users\MY PC\Desktop\Projects\Time-series\Air-Quality-Nairobi\wrangled_data


Creating the path `directory`

In [21]:
file_path.mkdir(exist_ok=True)

In [22]:
file_location = str(file_path / "cleaned_csv")

In [23]:
df.to_csv(file_location, index=True)

Reading the csv file

In [24]:
df = pd.read_csv(file_location, index_col=0, parse_dates=True)

df.shape

(79913, 1)

In [25]:
df.head()

Unnamed: 0_level_0,value
timestamp,Unnamed: 1_level_1
2025-01-01 13:56:30.946000+03:00,8.5
2025-01-01 13:57:02.958000+03:00,12.0
2025-01-01 13:57:34.922000+03:00,11.0
2025-01-01 13:58:06.946000+03:00,10.6
2025-01-01 13:58:39.800000+03:00,13.4


--- The End --- 