In [41]:
import boto3
import json
import pandas as pd
from io import BytesIO

In [None]:
# Function to load the secrets from the local file
def load_secrets(file_path):
    with open(file_path, 'r') as file:
        return json.load(file)
secrets = load_secrets('secrets.json')

aws_access_key_id = secrets['AWS_ACCESS_KEY_ID']
aws_secret_access_key = secrets['AWS_SECRET_ACCESS_KEY']
aws_region = secrets['AWS_REGION']

In [None]:
s3 = boto3.client('s3', 
                  aws_access_key_id=aws_access_key_id, 
                  aws_secret_access_key=aws_secret_access_key, 
                  region_name=aws_region)

In [45]:
bucket_name = 'greencoop-airbyte'
file_key = "greencoop-airbyte/Stations_meteorologiques_du_reseau_InfoClimat_(Bergues,_Hazebrouck,_Armentieres,_Lille-Lesquin)/2025_03_14_1741977939508_0.jsonl"

s3_object = s3.get_object(Bucket=bucket_name, Key=file_key)
file_content = s3_object['Body'].read().decode('utf-8')

In [46]:
# Read the JSONL file from S3
response = s3.get_object(Bucket=bucket_name, Key=file_key)
content = response["Body"].read().decode("utf-8")

# Convert JSONL to Pandas DataFrame
json_list = [json.loads(line) for line in content.splitlines()]
df = pd.DataFrame(json_list)

# Show first rows
df.head()


Unnamed: 0,_airbyte_raw_id,_airbyte_extracted_at,_airbyte_meta,_airbyte_generation_id,_airbyte_data
0,5e7a90f2-1e3e-42a6-8c76-ee59f1a8f45b,1741977942000,"{'sync_id': 1, 'changes': []}",1,"{'status': 'OK', 'errors': [], 'data': [], 'st..."


In [47]:
import pandas as pd
from pandas import json_normalize

# Assuming `df_clean` is the output of json_normalize
df_clean = pd.json_normalize(df["_airbyte_data"])

# Expand the 'stations' field
stations_df = pd.json_normalize(df_clean["stations"].explode())

# Expand the 'hourly' fields
hourly_dfs = {}
hourly_cols = [col for col in df_clean.columns if col.startswith("hourly.")]
for col in hourly_cols:
    hourly_dfs[col] = pd.json_normalize(df_clean[col].explode())

# Display the extracted DataFrames
print("Stations Data:")
print(stations_df.head())

for col, hourly_df in hourly_dfs.items():
    print(f"\nHourly Data ({col}):")
    print(hourly_df.head())


Stations Data:
           id           name  latitude  longitude  elevation    type  \
0       00052    Armentières    50.689      2.877         16  static   
1       000R5        Bergues    50.968      2.441         17  static   
2       07015  Lille-Lesquin    50.575      3.092         47   synop   
3  STATIC0010     Hazebrouck    50.734      2.545         31  static   

       license.license                                        license.url  \
0                CC BY    https://creativecommons.org/licenses/by/2.0/fr/   
1                CC BY    https://creativecommons.org/licenses/by/2.0/fr/   
2  Etalab Open License  https://www.etalab.gouv.fr/licence-ouverte-ope...   
3                CC BY    https://creativecommons.org/licenses/by/2.0/fr/   

                   license.source  \
0                   infoclimat.fr   
1                   infoclimat.fr   
2  Meteo-France via infoclimat.fr   
3                   infoclimat.fr   

                                 license.metadonnees

In [48]:
data_to_insert = df_clean.to_dict(orient="records")
data_to_insert[0].keys()

dict_keys(['status', 'errors', 'data', 'stations', 'metadata.temperature', 'metadata.pression', 'metadata.humidite', 'metadata.point_de_rosee', 'metadata.visibilite', 'metadata.vent_moyen', 'metadata.vent_rafales', 'metadata.vent_direction', 'metadata.pluie_3h', 'metadata.pluie_1h', 'metadata.neige_au_sol', 'metadata.nebulosite', 'metadata.temps_omm', 'hourly.07015', 'hourly.00052', 'hourly.000R5', 'hourly.STATIC0010', 'hourly._params'])

In [73]:
data_to_insert

[{'status': 'OK',
  'errors': [],
  'data': [],
  'stations': [{'id': '00052',
    'name': 'Armentières',
    'latitude': 50.689,
    'longitude': 2.877,
    'elevation': 16,
    'type': 'static',
    'license': {'license': 'CC BY',
     'url': 'https://creativecommons.org/licenses/by/2.0/fr/',
     'source': 'infoclimat.fr',
     'metadonnees': 'https://www.infoclimat.fr/stations/metadonnees.php?id=00052'}},
   {'id': '000R5',
    'name': 'Bergues',
    'latitude': 50.968,
    'longitude': 2.441,
    'elevation': 17,
    'type': 'static',
    'license': {'license': 'CC BY',
     'url': 'https://creativecommons.org/licenses/by/2.0/fr/',
     'source': 'infoclimat.fr',
     'metadonnees': 'https://www.infoclimat.fr/stations/metadonnees.php?id=000R5'}},
   {'id': '07015',
    'name': 'Lille-Lesquin',
    'latitude': 50.575,
    'longitude': 3.092,
    'elevation': 47,
    'type': 'synop',
    'license': {'license': 'Etalab Open License',
     'url': 'https://www.etalab.gouv.fr/licence-ou

In [49]:
data_to_insert[0]["stations"]

[{'id': '00052',
  'name': 'Armentières',
  'latitude': 50.689,
  'longitude': 2.877,
  'elevation': 16,
  'type': 'static',
  'license': {'license': 'CC BY',
   'url': 'https://creativecommons.org/licenses/by/2.0/fr/',
   'source': 'infoclimat.fr',
   'metadonnees': 'https://www.infoclimat.fr/stations/metadonnees.php?id=00052'}},
 {'id': '000R5',
  'name': 'Bergues',
  'latitude': 50.968,
  'longitude': 2.441,
  'elevation': 17,
  'type': 'static',
  'license': {'license': 'CC BY',
   'url': 'https://creativecommons.org/licenses/by/2.0/fr/',
   'source': 'infoclimat.fr',
   'metadonnees': 'https://www.infoclimat.fr/stations/metadonnees.php?id=000R5'}},
 {'id': '07015',
  'name': 'Lille-Lesquin',
  'latitude': 50.575,
  'longitude': 3.092,
  'elevation': 47,
  'type': 'synop',
  'license': {'license': 'Etalab Open License',
   'url': 'https://www.etalab.gouv.fr/licence-ouverte-open-licence',
   'source': 'Meteo-France via infoclimat.fr',
   'metadonnees': 'https://donneespubliques.mete

In [50]:
data_to_insert[0]['hourly.07015'][0:3]

[{'id_station': '07015',
  'dh_utc': '2024-10-05 00:00:00',
  'temperature': '7.6',
  'pression': '1020.7',
  'humidite': '89',
  'point_de_rosee': '5.9',
  'visibilite': '6000',
  'vent_moyen': '3.6',
  'vent_rafales': '7.2',
  'vent_direction': '90',
  'pluie_3h': '0',
  'pluie_1h': '0',
  'neige_au_sol': None,
  'nebulosite': '',
  'temps_omm': None},
 {'id_station': '07015',
  'dh_utc': '2024-10-05 01:00:00',
  'temperature': '7.5',
  'pression': '1020.6',
  'humidite': '92',
  'point_de_rosee': '6.3',
  'visibilite': '7000',
  'vent_moyen': '3.6',
  'vent_rafales': '7.2',
  'vent_direction': '30',
  'pluie_3h': None,
  'pluie_1h': '0',
  'neige_au_sol': None,
  'nebulosite': '',
  'temps_omm': None},
 {'id_station': '07015',
  'dh_utc': '2024-10-05 02:00:00',
  'temperature': '7.4',
  'pression': '1020.3',
  'humidite': '93',
  'point_de_rosee': '6.3',
  'visibilite': '4400',
  'vent_moyen': '7.2',
  'vent_rafales': '7.2',
  'vent_direction': '40',
  'pluie_3h': None,
  'pluie_1h'

In [51]:
for i in data_to_insert[0]['hourly.07015'][3].keys():
    j = data_to_insert[0]['hourly.07015'][3][i]
    print(type(j),"est le type de",j)

<class 'str'> est le type de 07015
<class 'str'> est le type de 2024-10-05 03:00:00
<class 'str'> est le type de 6.2
<class 'str'> est le type de 1020.1
<class 'str'> est le type de 95
<class 'str'> est le type de 5.5
<class 'str'> est le type de 2500
<class 'str'> est le type de 3.6
<class 'str'> est le type de 7.2
<class 'str'> est le type de 60
<class 'str'> est le type de 0
<class 'str'> est le type de 0
<class 'NoneType'> est le type de None
<class 'str'> est le type de 
<class 'str'> est le type de 10


In [52]:
from pymongo import MongoClient
import pandas as pd
import json

# ✅ 1. Connect to MongoDB
client = MongoClient("mongodb://localhost:27017/")  # Update if needed
db = client["weather_database"]  # Your database name
collection = db["weather_data"]  # Collection name

# ✅ 2. Load the JSON data (assuming it's in a file or a variable)
weather_data = data_to_insert

# ✅ 3. Extract Relevant Data
stations = weather_data[0].get("stations", [])  # Extract station metadata
hourly_data = {
    key: weather_data[0][key]
    for key in weather_data[0]
    if key.startswith("hourly.") and key != "hourly._params"
}

# ✅ 4. Flatten Hourly Data
flattened_data = []
for station_id, records in hourly_data.items():
    for record in records:
        flattened_data.append(record)  # Convert list of dicts into single list



In [53]:
flattened_data[1]

{'id_station': '07015',
 'dh_utc': '2024-10-05 01:00:00',
 'temperature': '7.5',
 'pression': '1020.6',
 'humidite': '92',
 'point_de_rosee': '6.3',
 'visibilite': '7000',
 'vent_moyen': '3.6',
 'vent_rafales': '7.2',
 'vent_direction': '30',
 'pluie_3h': None,
 'pluie_1h': '0',
 'neige_au_sol': None,
 'nebulosite': '',
 'temps_omm': None}

In [62]:
from datetime import datetime
import copy

def process_data(document):
    document = copy.deepcopy(document)
    # Convert strings to appropriate types
    document['dh_utc'] = datetime.strptime(document['dh_utc'], '%Y-%m-%d %H:%M:%S')  # Convert to datetime, must start as str
    # Ensure all fields are converted properly, handling missing keys
    document['temperature'] = float(document['temperature']) if document.get('temperature') else None
    document['pression'] = float(document['pression']) if document.get('pression') else None
    document['humidite'] = int(document['humidite']) if document.get('humidite') else None
    document['point_de_rosee'] = float(document['point_de_rosee']) if document.get('point_de_rosee') else None
    document['visibilite'] = int(document.get('visibilite', 0)) if document.get('visibilite') else None
    document['vent_moyen'] = float(document['vent_moyen']) if document.get('vent_moyen') else None
    document['vent_rafales'] = float(document['vent_rafales']) if document.get('vent_rafales') else None
    document['vent_direction'] = int(document['vent_direction']) if document.get('vent_direction') else None
    document['pluie_3h'] = float(document['pluie_3h']) if document.get('pluie_3h') else None
    document['pluie_1h'] = float(document['pluie_1h']) if document.get('pluie_1h') else None
    document['neige_au_sol'] = float(document.get('neige_au_sol', 0)) if document.get('neige_au_sol') else None
    document['nebulosite'] = str(document.get('nebulosite', '')) if document.get('nebulosite') else ''
    document['temps_omm'] = float(document.get('temps_omm', 0)) if document.get('temps_omm') else None
    

    return document

In [None]:
def rename_columns(document):
    # Column renaming and translation mapping
    column_mapping = {
        'dh_utc': 'datetime',
        'temperature': 'temperature_°C',  # In Celsius
        'pression': 'pressure_hPa',  # In hPa
        'humidite': 'humidity_%',
        'point_de_rosee': 'dew_point_°C',  # In Celsius
        'visibilite': 'visibility_m',  # Assuming meters for visibility
        'vent_moyen': 'wind_speed_kph',  # In km/h
        'vent_rafales': 'wind_gust_kph',  # In km/h
        'vent_direction': 'wind_dir',  # Wind direction (unchanged)
        'pluie_3h': 'precip_rate_mm/hr (3hrs)',  # In mm/hr
        'pluie_1h': 'precip_rate_mm/hr',  # In mm/hr
        'neige_au_sol': 'snow_depth_mm',  # In mm
        'nebulosite': 'cloud_coverage',  # General cloud coverage (string or percentage)
        'temps_omm': 'solar_w/m²'  # Assuming temperature is related to solar irradiance
    }

    # Rename keys based on the mapping
    for old_key, new_key in column_mapping.items():
        if old_key in document:
            document[new_key] = document.pop(old_key)

    return document

 

In [74]:
document1 = process_data(flattened_data[4])
document1

{'id_station': '07015',
 'dh_utc': datetime.datetime(2024, 10, 5, 4, 0),
 'temperature': 5.5,
 'pression': 1019.9,
 'humidite': 96,
 'point_de_rosee': 4.9,
 'visibilite': 500,
 'vent_moyen': 3.6,
 'vent_rafales': 3.6,
 'vent_direction': 90,
 'pluie_3h': None,
 'pluie_1h': 0.0,
 'neige_au_sol': None,
 'nebulosite': '',
 'temps_omm': 44.0,
 '_id': ObjectId('67e055cb4faba757ff531dc8')}

In [79]:
rename_columns(document1)

{'id_station': '07015',
 '_id': ObjectId('67e055cb4faba757ff531dc8'),
 'datetime': datetime.datetime(2024, 10, 5, 4, 0),
 'temperature_°C': 5.5,
 'pressure_hPa': 1019.9,
 'humidity_%': 96,
 'dew_point_°C': 4.9,
 'visibility_m': 500,
 'wind_speed_kph': 3.6,
 'wind_gust_kph': 3.6,
 'wind_dir': 90,
 'precip_rate_mm/hr': 0.0,
 'snow_depth_mm': None,
 'cloud_coverage': '',
 'solar_w/m²': 44.0}

In [None]:
for i in document1.keys():
    j = document1[i]
    print(type(j),"est le type de",j)

<class 'str'> est le type de 07015
<class 'datetime.datetime'> est le type de 2024-10-05 04:00:00
<class 'float'> est le type de 5.5
<class 'float'> est le type de 1019.9
<class 'int'> est le type de 96
<class 'float'> est le type de 4.9
<class 'int'> est le type de 500
<class 'float'> est le type de 3.6
<class 'float'> est le type de 3.6
<class 'int'> est le type de 90
<class 'NoneType'> est le type de None
<class 'float'> est le type de 0.0
<class 'NoneType'> est le type de None
<class 'str'> est le type de 
<class 'float'> est le type de 44.0
<class 'bson.objectid.ObjectId'> est le type de 67e055cb4faba757ff531dc8


In [66]:
document1['dh_utc']

datetime.datetime(2024, 10, 5, 4, 0)

In [86]:
# ✅ 5.1. Convert into correct types
for i in flattened_data:
    i = process_data(i)

processed_data = [process_data(doc) for doc in flattened_data]

# ✅ 5.1.5 Convert the column names into the general ones
processed_data = [rename_columns(doc) for doc in processed_data] 

# ✅ 5.2. Insert into MongoDB
if flattened_data:
    collection.insert_many(processed_data)
    print(f"✅ Inserted {len(processed_data)} records into MongoDB!")



✅ Inserted 1143 records into MongoDB!


In [70]:
type(flattened_data)

list

In [59]:
flattened_data[0]

{'id_station': '07015',
 'dh_utc': '2024-10-05 00:00:00',
 'temperature': '7.6',
 'pression': '1020.7',
 'humidite': '89',
 'point_de_rosee': '5.9',
 'visibilite': '6000',
 'vent_moyen': '3.6',
 'vent_rafales': '7.2',
 'vent_direction': '90',
 'pluie_3h': '0',
 'pluie_1h': '0',
 'neige_au_sol': None,
 'nebulosite': '',
 'temps_omm': None,
 '_id': ObjectId('67e055cb4faba757ff531dc4')}

In [84]:
# ✅ 6. Verify Data
print("Sample Data from MongoDB:")
pipeline = [
    {
        "$group": {
            "_id": "$id_station",  # Group by id_station
            "first_document": {"$first": "$$ROOT"}  # Take the first document for each id_station
        }
    },
    {
        "$replaceRoot": {"newRoot": "$first_document"}  # Replace the root document with the first document
    }
]
# Execute the aggregation
result = collection.aggregate(pipeline)

# Fetch the results
distinct_documents = list(result)

# Print the result
for doc in distinct_documents:
    print(doc)

Sample Data from MongoDB:
{'_id': ObjectId('67e055cb4faba757ff531dc4'), 'id_station': '07015', 'datetime': datetime.datetime(2024, 10, 5, 0, 0), 'temperature_°C': 7.6, 'pressure_hPa': 1020.7, 'humidity_%': 89, 'dew_point_°C': 5.9, 'visibility_m': 6000, 'wind_speed_kph': 3.6, 'wind_gust_kph': 7.2, 'wind_dir': 90, 'precip_rate_mm/hr': 0.0, 'snow_depth_mm': None, 'cloud_coverage': '', 'solar_w/m²': None}
{'_id': ObjectId('67e055cb4faba757ff531f69'), 'id_station': '000R5', 'datetime': datetime.datetime(2024, 10, 5, 0, 0), 'temperature_°C': 8.4, 'pressure_hPa': 1019.3, 'humidity_%': 86, 'dew_point_°C': 6.2, 'visibility_m': None, 'wind_speed_kph': 1.4, 'wind_gust_kph': None, 'wind_dir': 113, 'precip_rate_mm/hr': 0.0, 'snow_depth_mm': None, 'cloud_coverage': '', 'solar_w/m²': None}
{'_id': ObjectId('67e055cb4faba757ff531e00'), 'id_station': '00052', 'datetime': datetime.datetime(2024, 10, 5, 0, 0), 'temperature_°C': 6.4, 'pressure_hPa': 1019.1, 'humidity_%': 95, 'dew_point_°C': 5.6, 'visibili