Author: Adafaly Matthieu <br>

### To use this notebook, simply run it. The necessary data will be automatically downloaded and placed in a folder named Data — create this folder if it does not already exist.
### You do not need to extract any files — the Data Import notebook is designed to process them directly.
### You only need to run this notebook once. It will generate a dataset that can be used in the other notebooks.

# Importation of the libraries


In [2]:
import json
import glob
import pandas as pd
from tqdm import tqdm
import numpy as np
import glob
import zipfile
import json
from rudi_node_read.rudi_node_reader import RudiNodeReader
import os

# Data Importation

In [8]:
# Connect to the RUDI node that serves the media files
rudi_node_url = 'https://opendata.fenix.rudi-univ-rennes1.fr'
rudi_node_info = RudiNodeReader(server_url=rudi_node_url)
# Identifier of the metadata record whose media files are downloaded
meta_id ='1d67d073-b831-4206-b673-aa9a47978a44'
dwnld_tag = 'Downloading'
dwnld_dir = './Data'
# Create the target folder when it is missing so the notebook runs from a fresh checkout
os.makedirs(dwnld_dir, exist_ok=True)
# Download every media file attached to the metadata into dwnld_dir and print the report returned by the reader
print(dwnld_tag, f"media for metadata '{meta_id}':", rudi_node_info.download_files_for_metadata(meta_id, dwnld_dir))

Downloading media for metadata '1d67d073-b831-4206-b673-aa9a47978a44': {'downloaded': [{'media_name': 'dump_dev_aqmoinfra_data_2020_08_01-00h00to2020_09_01-00h00.json.zip', 'media_id': '22682787-348f-4b4b-9bcc-0bc37e373bec', 'media_url': 'https://opendata.fenix.rudi-univ-rennes1.fr/storage/download/22682787-348f-4b4b-9bcc-0bc37e373bec', 'file_type': 'application/zip', 'created': '2025-05-19T08:59:04.769Z', 'updated': '2025-05-19T08:59:04.769Z', 'file_path': '/udd/madafaly/Projet pollution/Projet/Data/dump_dev_aqmoinfra_data_2020_08_01-00h00to2020_09_01-00h00.json.zip'}, {'media_name': 'dump_dev_aqmoinfra_data_2020_12_01-00h00to2021_01_01-00h00.json.zip', 'media_id': 'c4609776-3097-4e58-9a0b-157a79a767c0', 'media_url': 'https://opendata.fenix.rudi-univ-rennes1.fr/storage/download/c4609776-3097-4e58-9a0b-157a79a767c0', 'file_type': 'application/zip', 'created': '2025-05-19T08:58:49.447Z', 'updated': '2025-05-19T08:58:49.447Z', 'file_path': '/udd/madafaly/Projet pollution/Projet/Data/dump

# Concatenation of the data (This process may take several minutes.)

This code concatenates the JSON files contained in the `.json.zip` archives of the `Data` directory, where the AQMo data are stored.

In [9]:
# Gather every JSON document found in the .json.zip archives of the Data directory
files = glob.glob('Data/*.json.zip')
merged_data = []
for file in files:
    with zipfile.ZipFile(file, 'r') as zf:
        for member in tqdm(zf.namelist()):
            # Skip the metadata entries that macOS adds to zip archives
            macos_folder = member.startswith('__MACOSX/')
            macos_sidecar = os.path.basename(member).startswith('._')
            if macos_folder or macos_sidecar:
                continue

            with zf.open(member) as handle:
                try:
                    text = handle.read().decode('utf-8').strip()
                    if text:
                        merged_data.append(json.loads(text))
                    else:
                        print(f"⚠️ Empty file in {file} : {member}")
                except UnicodeDecodeError as e:
                    print(f"❌ Encoding error in {file} > {member} : {e}")
                except json.JSONDecodeError as e:
                    print(f"❌ JSON format error in {file} > {member} : {e}")
# Dump all the collected documents into one JSON file
with open('./Data/merged_data.json', 'w', encoding='utf-8') as outfile:
    json.dump(merged_data, outfile, ensure_ascii=False, indent=2)
print('End')

100%|█████████████████████████████████████████████| 2/2 [00:03<00:00,  1.54s/it]
100%|█████████████████████████████████████████████| 2/2 [00:03<00:00,  1.67s/it]
100%|█████████████████████████████████████████████| 2/2 [00:22<00:00, 11.27s/it]
100%|█████████████████████████████████████████████| 2/2 [00:03<00:00,  1.88s/it]
100%|█████████████████████████████████████████████| 2/2 [00:03<00:00,  1.53s/it]
100%|█████████████████████████████████████████████| 2/2 [00:20<00:00, 10.02s/it]
100%|█████████████████████████████████████████████| 2/2 [00:05<00:00,  2.82s/it]
100%|█████████████████████████████████████████████| 2/2 [00:03<00:00,  1.98s/it]
100%|█████████████████████████████████████████████| 2/2 [00:20<00:00, 10.33s/it]


End


# Creation of the clean dataframe for visualisation (This process may take several minutes.)

Formatting of the data merged in the previous step. The resulting DataFrame will later be saved as a pickle file.

Creation of the date and time variables (day of the week, day, hour, month, year and time of day) to facilitate the following analyses.


In [3]:
# Read the merged JSON file written by the concatenation step
df = pd.read_json("Data/merged_data.json")

# Each entry of the 'features' column is a list of feature dictionaries
data = df['features']

# Names of the columns that are scanned later on to find the measurement values
columns = [
    'OPC_N3:04', 'OPC_N3:12', 'OPC_N3:05', 'OPC_N3:16', 'OPC_N3:01',
    'OPC_N3:03', 'OPC_N3:09', 'OPC_N3:17', 'OPC_N3:14', 'OPC_N3:20',
]

# Put the features of all rows into one flat list
merged_list = [feature for row in data for feature in row]

# One row per feature dictionary
df = pd.DataFrame(merged_list)

# Expand the nested '_id', 'geometry' and 'properties' fields into flat columns,
# then place the three results side by side
normalized_parts = [
    pd.json_normalize(df["_id"]).rename(columns={"$oid": "id"}),
    pd.json_normalize(df["geometry"]).rename(columns={"type": "geo_type", "coordinates": "geo_coords"}),
    pd.json_normalize(df["properties"]),
]
df_conc = pd.concat(normalized_parts, axis=1)
# The intermediate frames are no longer needed once concatenated
del normalized_parts

# Function to extract the first non-empty list or value from a list of candidate columns
def get_non_na_value(row, candidate_columns=None):
    """
    Return the first usable cell of `row` among the candidate columns.

    Parameters:
    row: A row of the DataFrame (any mapping-like object with a `.get` method).
    candidate_columns: Column names to inspect, in priority order.
        Defaults to the module-level `columns` list.

    Returns:
    The first non-empty list or non-NaN value found in the candidate columns.
    Returns NaN if all values are empty, missing or NaN.
    """
    if candidate_columns is None:
        candidate_columns = columns
    for col in candidate_columns:
        # .get() tolerates a candidate column that is absent from this dataset
        cell = row.get(col)
        if isinstance(cell, list):
            if cell:  # Non-empty list: this is the value we are looking for
                return cell
            # Empty list: skip it here, because pd.notna([]) returns an empty
            # array whose truth value is an error on recent NumPy versions
            continue
        if pd.notna(cell):  # Non-null scalar value
            return cell
    return np.nan  # Return NaN if no valid value is found
# Call get_non_na_value on every row (axis=1) and store the value it selects
# in the new 'concatenated_values' column
df_conc['concatenated_values'] = df_conc.apply(get_non_na_value, axis=1)

# Function to extract the value of a specified key from a list of dictionaries
def extract_value(opc_list, key):
    """
    Return the value stored under `key` in the first dictionary of `opc_list` that has it.

    Parameters:
    opc_list: Normally a list of dictionaries containing sensor data. Any object
        that is not a list (NaN, None, a scalar, ...) is accepted and yields None.
    key: The key to look up (e.g. 'p', 'bu', 'dc', 'SDN:L20', 'v', 't', 'bn').

    Returns:
    The value associated with the key in the first dictionary where the key exists.
    Returns None if the key is not found in any dictionary.
    """
    if not isinstance(opc_list, list):
        return None
    for item in opc_list:
        # Only dictionaries can hold the key: `key in item` would raise on None
        # and perform a substring search on strings
        if isinstance(item, dict) and key in item:
            return item[key]
    return None
# List of keys to extract from the 'concatenated_values' column
keys = ['p', 'bu', 'dc', 'SDN:L20', 'v', 't', 'bn']
# For each key, create the column of the same name from the 'concatenated_values' column
# (or fill in its missing values when the column already exists)
for key in tqdm(keys):
    col_name = key
    # Only 'concatenated_values' is read, so map over that single column instead of
    # rebuilding every row of the wide DataFrame with apply(axis=1)
    extracted = df_conc['concatenated_values'].map(lambda cell: extract_value(cell, key))
    if col_name in df_conc.columns:
        # Keep the values already present and fill only the missing ones
        df_conc[col_name] = df_conc[col_name].combine_first(extracted)
    else:
        df_conc[col_name] = extracted
# Derive one (longitude, latitude) pair per row from 'geo_coords':
#   - 'Point'      -> the coordinates of the point itself
#   - 'LineString' -> the mean of the coordinates of its vertices
# Any other geometry type, or empty/malformed coordinates, gives NaN.
def _lon_lat(geo_type, coords):
    """Return (longitude, latitude) for one geometry, or (NaN, NaN) when it cannot be computed."""
    missing = (np.nan, np.nan)
    # An empty list would otherwise cause an IndexError (Point) or a ZeroDivisionError (LineString)
    if not isinstance(coords, list) or not coords:
        return missing
    if geo_type == 'Point':
        # A point is read as [longitude, latitude]
        return (coords[0], coords[1]) if len(coords) >= 2 else missing
    if geo_type == 'LineString':
        # A line is read as a list of [longitude, latitude] vertices
        if not all(isinstance(vertex, list) and len(vertex) >= 2 for vertex in coords):
            return missing
        n_vertices = len(coords)
        return (sum(vertex[0] for vertex in coords) / n_vertices,
                sum(vertex[1] for vertex in coords) / n_vertices)
    return missing

# A single pass over the rows fills both columns
_positions = [_lon_lat(geo_type, coords)
              for geo_type, coords in zip(df_conc['geo_type'], df_conc['geo_coords'])]
df_conc['longitude'] = [position[0] for position in _positions]
df_conc['latitude'] = [position[1] for position in _positions]
# Parse both date columns into datetimes; values that cannot be parsed become NaT
for date_col in ("station.date", "station.from_date"):
    df_conc[date_col] = pd.to_datetime(df_conc[date_col], errors='coerce')
print("End")

100%|█████████████████████████████████████████████| 7/7 [04:01<00:00, 34.51s/it]


End


In [5]:
# Show the DataFrame built in the previous cell
display(df_conc)

Unnamed: 0,id,geo_type,geo_coords,station.name,station.date,station.from_date,station.radius,station.policy,OPC_N3:12,OPC_N3:04,...,concatenated_values,p,bu,dc,SDN:L20,v,t,bn,longitude,latitude
0,5f4dd4a08138160014a73e49,Point,"[-1.643674109, 48.110235135]",parautarin02,2020-08-31 17:16:47+00:00,2020-08-31 17:16:47+00:00,0,mobileGps,,,...,,,,,,,,,-1.643674,48.110235
1,5f4dd4b88138160014a73e51,Point,"[-1.643674109, 48.110235135]",parautarin02,2020-08-31 17:16:47+00:00,2020-08-31 17:16:47+00:00,0,mobileGps,,,...,,,,,,,,,-1.643674,48.110235
2,5f4dd4b88138160014a73e52,Point,"[-1.643674109, 48.110235135]",parautarin02,2020-08-31 17:16:47+00:00,2020-08-31 17:16:47+00:00,0,mobileGps,"[{'bn': 'OPC_N3:12', 'bu': 'ug/m3', 'dc': 0}, ...",,...,"[{'bn': 'OPC_N3:12', 'bu': 'ug/m3', 'dc': 0}, ...",24.226,ug/m3,0.0,0.0,1.61,2020-09-01 04:57:28 UTC,OPC_N3:12,-1.643674,48.110235
3,5f4dd4e68138160014a73e5e,Point,"[-1.643674109, 48.110235135]",parautarin02,2020-08-31 17:16:47+00:00,2020-08-31 17:16:47+00:00,0,mobileGps,,,...,,,,,,,,,-1.643674,48.110235
4,5f4dd8108138160014a73fdd,Point,"[-1.643601215, 48.110021621]",parautarin33,2020-08-31 17:07:05+00:00,2020-08-31 17:07:05+00:00,0,mobileGps,,,...,,,,,,,,,-1.643601,48.110022
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2808114,5f4dd56a8138160014a73e96,Point,"[-1.643686376, 48.110172833]",parautarin02,2020-09-01 03:00:25+00:00,2020-09-01 03:00:25+00:00,0,mobileGps,"[{'bn': 'OPC_N3:12', 'bu': 'ug/m3', 'dc': 0}, ...",,...,"[{'bn': 'OPC_N3:12', 'bu': 'ug/m3', 'dc': 0}, ...",44.083,ug/m3,0.0,0.0,2.00,2020-09-01 05:00:26 UTC,OPC_N3:12,-1.643686,48.110173
2808115,5f4dd56a8138160014a73e97,Point,"[-1.643686376, 48.110172833]",parautarin02,2020-09-01 03:00:25+00:00,2020-09-01 03:00:25+00:00,0,mobileGps,,,...,,,,,,,,,-1.643686,48.110173
2808116,5f4dd53e8138160014a73e80,Point,"[-1.643686376, 48.110172833]",parautarin02,2020-09-01 02:59:40+00:00,2020-09-01 02:59:40+00:00,0,mobileGps,"[{'bn': 'OPC_N3:12', 'bu': 'ug/m3', 'dc': 0}, ...",,...,"[{'bn': 'OPC_N3:12', 'bu': 'ug/m3', 'dc': 0}, ...",44.259,ug/m3,0.0,0.0,2.35,2020-09-01 04:59:42 UTC,OPC_N3:12,-1.643686,48.110173
2808117,5f4dd53e8138160014a73e81,Point,"[-1.643686376, 48.110172833]",parautarin02,2020-09-01 02:59:40+00:00,2020-09-01 02:59:40+00:00,0,mobileGps,,,...,,,,,,,,,-1.643686,48.110173


In [6]:
# Build df_clean as a NEW DataFrame: a plain `df_clean = df_conc` only creates a second
# name for the same object, so the columns below would silently be added to df_conc too
measure_time = df_conc["station.date"].dt
df_clean = df_conc.assign(
    day_week=measure_time.day_name(),                      # name of the day of the week
    day=measure_time.day,                                  # day of the month
    hour=measure_time.hour,
    month=measure_time.month,
    year=measure_time.year,
    hour_minute_second=measure_time.strftime('%H:%M:%S'),  # time of day as text
)

We remove duplicate or unnecessary variables.

In [7]:
# Columns that duplicate other information or are not needed afterwards
unneeded_columns = ["bu", 'dc', 'SDN:L20', 'concatenated_values', 't', 'station.radius']
df_clean = df_clean.drop(columns=unneeded_columns)
# Every remaining column whose name contains 'OPC' is removed as well
opc_columns = df_clean.filter(like='OPC').columns
df_clean = df_clean.drop(columns=opc_columns)

First, we remove the sensor 'parautarin36' because only one of its measurements had associated pollution values. Then, we remove the sensor 'standalone-LOPY-AQ05' because it has the same issue (not enough values). Finally, we drop the fixed measurements from the 'parautarin35' sensor, which is normally a mobile one.

In [8]:
# Sensors that are discarded entirely
excluded_sensors = ['parautarin36', 'standalone-LOPY-AQ05']
is_excluded_sensor = df_clean['station.name'].isin(excluded_sensors)
# Rows recorded by 'parautarin35' under the 'fixedGps' policy
is_fixed_parautarin35 = (df_clean['station.name'] == 'parautarin35') & (df_clean['station.policy'] == 'fixedGps')
# Keep only the rows that match neither condition
df_clean = df_clean[~(is_excluded_sensor | is_fixed_parautarin35)]

We rename the variables to improve clarity.

In [9]:
# Give the columns clearer names. The result is reassigned (no inplace=True) so that
# this cell follows the same pattern as the other cells, which all rebind df_clean
df_clean = df_clean.rename(columns={
    "v": "PM_2.5",
    "station.name": "sensor_name",
    "station.date": "measure_date",
    "station.from_date": "start_date",
    "station.policy": "sensor_type",
})

We explicitly define the data types of each column in the DataFrame to ensure correct data handling and avoid type-related issues during analysis.

In [10]:
# Cast the identifier/label columns to the pandas string dtype and the coordinates to float
string_columns = ['id', 'geo_type', 'bn', 'sensor_type', 'day_week', 'hour_minute_second']
float_columns = ['longitude', 'latitude']
column_types = {name: 'string' for name in string_columns}
column_types.update({name: 'float' for name in float_columns})
df_clean = df_clean.astype(column_types)

In [None]:
# Show the data type of each column
df_clean.dtypes

In [None]:
# Use pd.NA as the single missing-value marker: the string "NaN", np.nan and None are all replaced by it
df_clean = df_clean.replace(["NaN",np.nan,None], pd.NA)

In [None]:
# Remove the rows in which 'p', 'PM_2.5' or 'bn' is missing
df_clean = df_clean.dropna(subset=["p", "PM_2.5", "bn"])

In [None]:
# Convert the 'PM_2.5' and 'p' columns to float
numeric_columns = ('PM_2.5', 'p')
df_clean = df_clean.astype({name: 'float' for name in numeric_columns})

In [None]:
# Show the data type of each column again to check the conversions
df_clean.dtypes

In [None]:
# Index the rows by the (sensor_name, measure_date) pair
df_clean = df_clean.set_index(["sensor_name", "measure_date"])

In [None]:
# Show the cleaned DataFrame
display(df_clean)

This returns all rows whose index appears more than once in the DataFrame.

In [None]:
# Flag every row whose (sensor_name, measure_date) index occurs more than once, then show those rows
duplicated = df_clean.index.duplicated(keep=False)
df_clean.loc[duplicated]

In [None]:
# Order the rows by sensor, then by measurement date, then by increasing 'p'
sort_keys = ['sensor_name', 'measure_date', 'p']
df_sorted = df_clean.sort_values(by=sort_keys)

# For each index value retain only its first row, i.e. the one with the smallest 'p'
is_repeated_index = df_sorted.index.duplicated(keep='first')
df_unique = df_sorted[~is_repeated_index]


In [None]:
# Sort by sensor name, then by measurement date within each sensor.
# One explicit call is enough: sort_index(level=...) also sorts the remaining levels
# (sort_remaining=True by default), so two successive single-level sorts only keep
# the ordering produced by the last one.
df_unique = df_unique.sort_index(level=["sensor_name", "measure_date"])

In [None]:
# Show the de-duplicated, sorted DataFrame
display(df_unique)

Save the DataFrame as a pickle (.pkl) file

In [None]:
# Save the complete data as a pickle file
df_unique.to_pickle('Data/pollution_rennes.pkl')
# The file has just been written, so the message reports a save rather than a load
print("Pickle file saved")

Save the DataFrame as a csv file

In [None]:
# Save as CSV
df_unique.to_csv("Data/pollution_rennes.csv", index=True)  # index=True writes the index into the file; index=False would leave it out