# Processing Sensor.Community Data in a Near-Real-Time Setup (AD4GD, Pilot 3)

This notebook demonstrates how to process near-real-time (NRT) air quality data from SDS011 sensors in the Sensor.Community network. The workflow covers downloading the raw data, standardizing it to hourly values, applying corrections using meteorological and air quality model data, and regridding the corrected data with kriging.

### Step 1: Setup and Imports
We import the necessary Python modules and helper functions for downloading and processing the data. Make sure the *data_processing* module is in your Python path.

In [None]:
import subprocess
from datetime import datetime
from pathlib import Path
from data_processing.standardize_sensor_community import StandardizeData
from data_processing.download_data_nrt import (get_sensor_community_urls,
                                               process_sensor_community_nrt,
                                               download_ecmwf_data)
from data_processing.iot_qa_hour import Corrector
from data_processing.kriging_only import KrigingIoT

ModuleNotFoundError: No module named 'data_processing'
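
If the import fails with a `ModuleNotFoundError` like the one above, the package directory is most likely not on the Python path. The following is a minimal sketch of how to check for this and add a directory at runtime. The helper name `ensure_importable` and the path `~/ad4gd_pilot3` are hypothetical placeholders, not part of the *data_processing* module.

```python
import importlib.util
import sys
from pathlib import Path


def ensure_importable(module_name, candidate_dir):
    """Return True if module_name can be imported.

    If it cannot be found, candidate_dir is prepended to sys.path
    (when it exists) and the lookup is repeated.
    """
    if importlib.util.find_spec(module_name) is not None:
        return True
    candidate = Path(candidate_dir).expanduser().resolve()
    if candidate.is_dir() and str(candidate) not in sys.path:
        sys.path.insert(0, str(candidate))
        importlib.invalidate_caches()
    return importlib.util.find_spec(module_name) is not None


# Hypothetical location of the folder that contains data_processing/
ok = ensure_importable("data_processing", "~/ad4gd_pilot3")
print("data_processing importable:", ok)
```

Setting the `PYTHONPATH` environment variable before starting Jupyter achieves the same result without touching `sys.path` in the notebook.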

### Step 2: Get Download URLs for NRT Data
This step generates a list of URLs pointing to the raw data for a specific sensor and date. The list is saved to a text file that can be used for bulk downloading. For faster downloads, prefer *aria2c* over *wget*:
> aria2c -x6 -i ./sensor_community_urls.txt -d ./L0/

In [None]:
# Get the URLs for downloading NRT data from Sensor.Community
date = datetime(2025, 5, 1)
sensor = 'sds011'
urlfn = Path('/scratch/ecm7934/ad4gd_pilot3_test2') / 'sensor_community_urls.txt'
get_sensor_community_urls(date, urlfn)
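
The *aria2c* call shown above can also be started from within the notebook using the `subprocess` module imported in Step 1. The sketch below only assembles the same command line as a list and prints it. The helper names `build_aria2c_command` and `download_urls` are hypothetical, and the file and folder names are taken from the shell example, so adjust them to wherever `urlfn` actually points.

```python
import shutil
import subprocess
from pathlib import Path


def build_aria2c_command(urlfile, outdir, connections=6):
    """Assemble the aria2c call as an argument list (no shell involved)."""
    return [
        "aria2c",
        f"-x{connections}",   # maximum connections per server
        "-i", str(urlfile),   # text file with one URL per line
        "-d", str(outdir),    # directory to store the downloads in
    ]


def download_urls(urlfile, outdir, connections=6):
    """Run aria2c on the URL list; raises if aria2c is missing or fails."""
    if shutil.which("aria2c") is None:
        raise FileNotFoundError("aria2c not found on PATH")
    Path(outdir).mkdir(parents=True, exist_ok=True)
    subprocess.run(build_aria2c_command(urlfile, outdir, connections), check=True)


cmd = build_aria2c_command("./sensor_community_urls.txt", "./L0/")
print(" ".join(cmd))
```

Calling `download_urls("./sensor_community_urls.txt", "./L0/")` would then perform the actual download, provided *aria2c* is installed.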

### Step 3: Merge Raw CSVs into a Single Parquet File
After downloading, all CSV files are merged into a single Parquet file for easier processing and consistency.

In [None]:
# Read the downloaded CSV files and merge them into a single Parquet file
# for further processing
process_sensor_community_nrt(sensor=sensor, date=date, iotpath=Path('./L0/'))
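
To illustrate what such a merge involves, here is a minimal pandas sketch. It is not the implementation of `process_sensor_community_nrt`. The helper name `merge_csvs` is hypothetical, and the semicolon separator is an assumption about the raw CSV layout that should be checked against the downloaded files.

```python
from pathlib import Path

import pandas as pd


def merge_csvs(folder, pattern="*.csv", sep=";"):
    """Read all CSV files in a folder and stack them into one DataFrame."""
    files = sorted(Path(folder).glob(pattern))
    if not files:
        raise FileNotFoundError(f"no files matching {pattern} in {folder}")
    frames = [pd.read_csv(f, sep=sep) for f in files]
    # ignore_index gives the merged table a clean 0..n-1 index
    return pd.concat(frames, ignore_index=True)


# Usage (to_parquet requires pyarrow or fastparquet to be installed):
# merged = merge_csvs("./L0/")
# merged.to_parquet("merged.parquet", index=False)
```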

### Step 4: Standardize to Hourly Averages
This step converts the temporally uneven raw sensor data into an hourly-averaged format, which is essential for consistent comparison and later analysis.

In [None]:
# Standardize temporally uneven SDS011 data to hourly data
inputfn = Path('.', 'L1A', 'sds011', f"{sensor}_{date:%Y%m%d}.parquet")
outputfold = Path('.', 'L2')
Standard = StandardizeData(date, sensor, inputfn, outputfold)
Standard.run()
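
The idea behind hourly averaging can be sketched with pandas as follows. This is not the `StandardizeData` implementation. The function name `hourly_mean`, the column names (`sensor_id`, `timestamp`, `P1`, `P2`), and the `min_samples` threshold are assumptions made for illustration.

```python
import pandas as pd


def hourly_mean(df, time_col="timestamp", id_col="sensor_id",
                value_cols=("P1", "P2"), min_samples=1):
    """Average irregular readings per sensor and clock hour.

    Hours with fewer than min_samples valid readings are set to NaN.
    """
    df = df.copy()
    df[time_col] = pd.to_datetime(df[time_col], utc=True)
    # Assign every reading to the start of its clock hour
    df["hour"] = df[time_col].dt.floor("h")
    grouped = df.groupby([id_col, "hour"])[list(value_cols)]
    means = grouped.mean()
    counts = grouped.count()
    return means.where(counts >= min_samples).reset_index()


raw = pd.DataFrame({
    "sensor_id": [1, 1, 1],
    "timestamp": ["2025-05-01T00:05:00", "2025-05-01T00:35:00",
                  "2025-05-01T01:10:00"],
    "P1": [10.0, 20.0, 30.0],
    "P2": [1.0, 3.0, 5.0],
})
print(hourly_mean(raw))
```

Requiring a minimum number of readings per hour is a common way to avoid hourly values that rest on a single measurement.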

### Step 5: Apply Corrections and Outlier Filtering
In this step (partially shown), the standardized data are corrected using a machine learning model that takes meteorological data (e.g. ERA5) as input, and outliers are detected using CAMS Europe air quality model data. To this end, first download the meteorological data (e.g. ERA5, ECMWF-IFS) and the air quality model data (e.g. from CAMS Europe).

In [None]:
# Correct hourly SDS011 data using an ML model with ERA5 data as input
# and apply outlier detection using CAMS data. To this end, first download
# meteorological data (e.g. ERA5, ECMWF-IFS) and air quality model data
# (e.g. from CAMS Europe).
meteofold = Path('.', 'meteo')
meteofold.mkdir(parents=True, exist_ok=True)
download_ecmwf_data(date, meteofold)


In [None]:
# correct standardized SDS011 data using ML together with meteorological and CAMS data
scomfn = Path('.', 'L2', f'SDS011_PM2.5_hourly_{date:%Y%m%d}.nc')
pollutant = 'pm25'
meteofn = Path(meteofold, f'meteo_{date:%Y%m%d}.nc')
camsfn = Path(meteofold, f'cams_{date:%Y%m%d}.nc')
outputfolder = Path('.', 'L2B')
corr_outfn = Path(outputfolder, 'pm25', f'iot_hour_corr_pm25_{date:%Y%m%d}.nc')

Corr = Corrector(scomfn,
                 date,
                 pollutant,
                 meteofn,
                 camsfn,
                 outputfolder,
                 outfn=corr_outfn)
Corr.run()
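
How `Corrector` detects outliers is not shown in this notebook. As a generic illustration of model-based outlier screening, the sketch below flags observations whose difference from a collocated model value deviates strongly from the typical difference, using the median absolute deviation (MAD) as a robust spread estimate. The function name `flag_outliers`, the threshold `k`, and the method itself are assumptions and may differ from what the module actually does.

```python
import numpy as np


def flag_outliers(obs, model, k=5.0):
    """Flag observations that deviate from the model by more than k robust sigmas."""
    obs = np.asarray(obs, dtype=float)
    model = np.asarray(model, dtype=float)
    diff = obs - model
    med = np.nanmedian(diff)
    mad = np.nanmedian(np.abs(diff - med))
    sigma = 1.4826 * mad  # scales the MAD to a standard deviation for normal data
    if sigma == 0:
        # No spread in the differences, so nothing can be flagged
        return np.zeros(diff.shape, dtype=bool)
    return np.abs(diff - med) > k * sigma


obs = [10.0, 11.0, 9.0, 12.0, 80.0]     # sensor PM2.5 values (made up)
model = [10.0, 10.0, 10.0, 10.0, 10.0]  # collocated model values (made up)
print(flag_outliers(obs, model))
```

In this made-up example only the last value (80.0) is flagged.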

### Step 6: Regrid Corrected IoT Data Using Kriging
In this final step, the corrected hourly IoT data is interpolated onto a regular latitude-longitude grid using a kriging algorithm. This spatial interpolation is crucial for integrating sensor data into structured gridded datasets, making it easier to:
- Compare with satellite or model data,
- Visualize on maps,
- Integrate into further data processing workflows.

In [None]:
l3fold = Path('.', 'L3')
l3fold.mkdir(parents=True, exist_ok=True)

outfn = Path(l3fold, f'iotonly_hour_pm25_gridded_{date:%Y%m%d}.nc')
Kriging = KrigingIoT(timeliness='hourly',
                     date=date,
                     iotmeasfn=corr_outfn,
                     outfn=outfn)
Kriging.run()

2025-07-07 13:51:58.096567 Reading IoT data
2025-07-07 13:52:23.309341 Processing 2024-12-01T00:00:00.000000000
2025-07-07 13:52:23.317219 Identifying clusters
2025-07-07 13:52:27.415335 Checking distance to nearest IoT station
2025-07-07 13:52:27.415466 Create and query KDTree
2025-07-07 13:52:39.716482 Calculate distances
2025-07-07 13:52:39.873968 Performing global kriging
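
For readers unfamiliar with kriging, the following is a minimal ordinary kriging sketch in NumPy. It is not the `KrigingIoT` implementation. The exponential variogram, its parameters (`sill`, `range_`, `nugget`), and the use of planar Euclidean distances are assumptions chosen to keep the example short.

```python
import numpy as np


def exp_variogram(h, sill=1.0, range_=1.0, nugget=0.0):
    """Exponential semivariogram; gamma(0) is 0 by definition."""
    gamma = nugget + sill * (1.0 - np.exp(-h / range_))
    return np.where(h > 0, gamma, 0.0)


def ordinary_kriging(xy, values, targets, sill=1.0, range_=1.0, nugget=0.0):
    """Predict values at target points from scattered observations."""
    xy = np.asarray(xy, dtype=float)            # (n, 2) station coordinates
    values = np.asarray(values, dtype=float)    # (n,) station values
    targets = np.asarray(targets, dtype=float)  # (m, 2) prediction points
    n = len(xy)

    # Left-hand side: station-to-station semivariances plus the
    # Lagrange row/column that forces the weights to sum to one
    d = np.linalg.norm(xy[:, None, :] - xy[None, :, :], axis=-1)
    A = np.ones((n + 1, n + 1))
    A[:n, :n] = exp_variogram(d, sill, range_, nugget)
    A[n, n] = 0.0

    # Right-hand side: station-to-target semivariances, one column per target
    d0 = np.linalg.norm(xy[:, None, :] - targets[None, :, :], axis=-1)
    b = np.ones((n + 1, targets.shape[0]))
    b[:n, :] = exp_variogram(d0, sill, range_, nugget)

    w = np.linalg.solve(A, b)  # rows 0..n-1 are weights, row n the multiplier
    return values @ w[:n, :]


stations = [[0.0, 0.0], [1.0, 0.0], [0.0, 1.0], [1.0, 1.0]]
pm25 = [1.0, 2.0, 3.0, 4.0]
print(ordinary_kriging(stations, pm25, [[0.5, 0.5]]))
```

At the center of the square all four stations are equidistant and receive equal weights, so the prediction is the mean of the four values. For real latitude-longitude data, the coordinates would first need to be projected or the distances computed on the sphere.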
