In [46]:
# BigQuery destination identifiers: GCP project, dataset and table.
# The same values appear as literals in load_data()'s to_gbq() call.
PROJECT_ID = "sound-berm-458313-a6"
DATASET = "project_m1"
TABLE = "air_meteo"
import gcsfs
import os

# API credentials are read from the environment so that no secret is stored in
# the notebook. Set WAQI_TOKEN and OWM_KEY before starting the kernel.
# NOTE(review): the keys that were previously committed here should be revoked
# and regenerated, since they are exposed in the notebook's history.
WAQI_TOKEN = os.environ.get("WAQI_TOKEN", "")
OWM_KEY = os.environ.get("OWM_KEY", "")
if not (WAQI_TOKEN and OWM_KEY):
    # Warn instead of raising so that the cell can still be executed (and the
    # helper functions defined) on a machine without the credentials.
    print("WARNING: WAQI_TOKEN and/or OWM_KEY environment variables are not set; API calls will fail.")
import pyarrow as pa
import pyarrow.parquet as pq

# Capital cities to monitor, as (city name, ISO 3166-1 alpha-2 country code)
# pairs expanded into the {"city": ..., "country": ...} records consumed by
# extract_data().
cities = [
    {"city": city_name, "country": country_code}
    for city_name, country_code in (
        ("Paris", "FR"),
        ("Nairobi", "KE"),
        ("New Delhi", "IN"),
        ("Tokyo", "JP"),
        ("Ottawa", "CA"),
        ("Brasília", "BR"),
        ("Canberra", "AU"),
        ("Cairo", "EG"),
        ("Bangkok", "TH"),
        ("Washington", "US"),
    )
]
import requests
import pandas as pd
from datetime import datetime

def get_air(city, timeout=10):
    """Fetch the latest air-quality reading for ``city`` from the WAQI feed API.

    Args:
        city: City name used as the keyword in the feed URL.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        A dict with the keys ``city``, ``timestamp``, ``aqi``, ``pm25`` and
        ``pm10`` (a value is None when the field is absent from the response),
        or None when the request fails, the HTTP status is not 200, or the
        response does not report ``status == "ok"``.
    """
    from urllib.parse import quote

    # Percent-encode the city so names containing spaces, accents or reserved
    # characters still form a valid URL path segment.
    url = f"https://api.waqi.info/feed/{quote(city, safe='')}/"
    try:
        # Without a timeout, requests waits indefinitely on a stalled server.
        r = requests.get(url, params={"token": WAQI_TOKEN}, timeout=timeout)
        if r.status_code != 200:
            return None
        payload = r.json()  # parse the body once and reuse it
    except (requests.RequestException, ValueError) as exc:
        # Network failure or non-JSON body: treat it like any other failed
        # lookup so that one city cannot abort the whole extraction.
        print(f"get_air: request failed for {city!r}: {exc}")
        return None

    if payload.get("status") != "ok":
        return None

    d = payload["data"]
    iaqi = d.get("iaqi", {})
    return {
        "city": d.get("city", {}).get("name"),
        "timestamp": d.get("time", {}).get("iso"),
        "aqi": d.get("aqi"),
        "pm25": iaqi.get("pm25", {}).get("v"),
        "pm10": iaqi.get("pm10", {}).get("v")
    }

def get_weather(city, country, timeout=10):
    """Fetch the current weather for ``city`` from the OpenWeatherMap API.

    Args:
        city: City name.
        country: Country code appended to the city in the ``q`` query parameter.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        A dict with the keys ``city`` (name reported by the API), ``timestamp``
        (naive UTC ISO-8601 string built from the response's ``dt`` field, or
        None when ``dt`` is absent) and ``temperature_c`` (``main.temp`` minus
        273.15), or None when the request fails or the HTTP status is not 200.
    """
    from datetime import timezone

    try:
        r = requests.get(
            # HTTPS so the API key is not sent in clear text.
            "https://api.openweathermap.org/data/2.5/weather",
            # Let requests build and percent-encode the query string.
            params={"q": f"{city},{country}", "appid": OWM_KEY},
            timeout=timeout,
        )
        if r.status_code != 200:
            return None
        d = r.json()
    except (requests.RequestException, ValueError) as exc:
        # Network failure or non-JSON body: report it and skip this city.
        print(f"get_weather: request failed for {city!r}: {exc}")
        return None

    observed_at = d.get("dt")
    if observed_at is None:
        timestamp = None
    else:
        # Same naive-UTC string that the deprecated utcfromtimestamp() produced.
        timestamp = (
            datetime.fromtimestamp(observed_at, tz=timezone.utc)
            .replace(tzinfo=None)
            .isoformat()
        )

    return {
        "city": d.get("name"),
        "timestamp": timestamp,
        "temperature_c": d["main"]["temp"] - 273.15
    }

def extract_data(cities):
    """Collect air-quality and weather records and store them as Parquet on GCS.

    For every entry of ``cities`` this calls get_air() and get_weather(), keeps
    the non-empty results, prints both resulting DataFrames and writes them to
    ``gs://etl_meteo_air/data/air.parquet`` and
    ``gs://etl_meteo_air/data/meteo.parquet``.

    Args:
        cities: Iterable of dicts with the keys ``"city"`` and ``"country"``.

    Returns:
        Tuple ``(df_air, df_weather)`` of pandas DataFrames.
    """
    air_data, weather_data = [], []
    for c in cities:
        a = get_air(c["city"])
        w = get_weather(c["city"], c["country"])
        # Failed lookups come back as None and are skipped.
        if a:
            air_data.append(a)
        if w:
            weather_data.append(w)

    df_air = pd.DataFrame(air_data)
    df_weather = pd.DataFrame(weather_data)
    print('df_air')
    print(df_air)
    print('df_weather')
    print(df_weather)

    # GCS destinations
    air_path = "gs://etl_meteo_air/data/air.parquet"
    weather_path = "gs://etl_meteo_air/data/meteo.parquet"

    # gcsfs authenticates directly from the service-account key file, so no
    # separate google.oauth2 credentials object is needed here.
    service_account_path = "sound-berm-458313-a6-def7ae220229.json"
    fs = gcsfs.GCSFileSystem(token=service_account_path)

    # Convert to pyarrow tables, then write them to GCS as Parquet.
    table_air = pa.Table.from_pandas(df_air)
    table_weather = pa.Table.from_pandas(df_weather)

    with fs.open(air_path, 'wb') as f:
        pq.write_table(table_air, f)

    with fs.open(weather_path, 'wb') as f:
        pq.write_table(table_weather, f)
    print("Données extraites et sauvegardées en Parquet sur GCS")

    return df_air, df_weather



In [47]:
import pandas as pd
import gcsfs
from google.oauth2 import service_account

def transform_data():
    """Join the air-quality and weather Parquet files and flag alert rows.

    Reads ``air.parquet`` and ``meteo.parquet`` from
    ``gs://etl_meteo_air/data/``, maps known station names to their city name,
    inner-joins both frames on ``city``, adds a boolean ``alert`` column and
    writes the result to ``gs://etl_meteo_air/data/final_data.parquet``.

    Returns:
        The merged pandas DataFrame that was written to GCS, so callers can
        inspect or reuse it.
    """
    # Authentication: the same key file is used for reading and writing.
    service_account_path = "sound-berm-458313-a6-def7ae220229.json"

    fs = gcsfs.GCSFileSystem(token=service_account_path)

    # Read the Parquet files from GCS
    df_air = pd.read_parquet(
        "gs://etl_meteo_air/data/air.parquet",
        storage_options={"token": service_account_path}
    )

    df_weather = pd.read_parquet(
        "gs://etl_meteo_air/data/meteo.parquet",
        storage_options={"token": service_account_path}
    )

    # The air-quality feed reports station names; map the known ones to the
    # city name so they can be joined with the weather data. Names that are
    # not listed here are left unchanged.
    replacements = {
        "Nairobi US Embassy, Kenya": "Nairobi",
        "Major Dhyan Chand National Stadium, Delhi, Delhi, India": "New Delhi",
        "Meguro (目黒)": "Tokyo",
        "Paulínia, São Paulo, Brazil": "Brasília",
        "Civic, Canberra": "Canberra",
        "Cairo US Embassy, Egypt": "Cairo",
        "Aurora Hills Visitor Center, Northern Virginia, USA": "Washington"
    }

    df_air["city"] = df_air["city"].replace(replacements)

    # Join on city; the weather timestamp is renamed to avoid a column clash.
    df = pd.merge(df_air, df_weather.rename(columns={"timestamp": "weather_timestamp"}), on="city", how="inner")

    # A row is flagged when any pollution or temperature threshold is exceeded.
    # Comparisons against missing values evaluate to False.
    df["alert"] = (
        (df["aqi"] > 100) |
        (df["pm25"] > 50) |
        (df["pm10"] > 80) |
        (df["temperature_c"] > 35)
    )

    with fs.open("gs://etl_meteo_air/data/final_data.parquet", 'wb') as f:
        df.to_parquet(f, index=False)

    print(" Transformation terminée : fichier Parquet écrit dans gs://etl_meteo_air/data/final_data.parquet")

    return df


In [48]:
import pandas as pd
from google.oauth2 import service_account
from pandas_gbq import to_gbq

def load_data():
    """Append the transformed Parquet file from GCS to the BigQuery table.

    Reads ``gs://etl_meteo_air/data/final_data.parquet`` with pandas and sends
    it to ``{DATASET}.{TABLE}`` in project ``PROJECT_ID`` through
    pandas_gbq.to_gbq with ``if_exists="append"``, so every call adds the rows
    again rather than replacing the table contents.
    """
    # Authentication: the key file is used both for GCS and for BigQuery.
    service_account_path = "sound-berm-458313-a6-def7ae220229.json"
    credentials = service_account.Credentials.from_service_account_file(service_account_path)

    # Read the Parquet file directly from GCS with pandas
    df = pd.read_parquet(
        "gs://etl_meteo_air/data/final_data.parquet",
        storage_options={"token": service_account_path}
    )

    # Send to BigQuery, using the module-level destination constants instead of
    # repeating the identifiers as literals.
    to_gbq(
        dataframe=df,
        destination_table=f"{DATASET}.{TABLE}",
        project_id=PROJECT_ID,
        credentials=credentials,
        if_exists="append"
    )

    print("Données envoyées dans BigQuery !")




In [49]:


def etl_air_quality():
    """Run the pipeline end to end: extract, then transform, then load.

    The steps hand data to each other through the Parquet files on GCS, so
    their return values are not needed here.

    Returns:
        A status message string.
    """
    extract_data(cities)
    transform_data()
    load_data()
    return "ETL terminé et envoyé dans BigQuery"


# Run the whole pipeline when executed as a script or as a notebook cell, and
# show the status message it returns.
if __name__ == "__main__":
    status_message = etl_air_quality()
    print(status_message)

df_air
                                                city  \
0                                              Paris   
1                          Nairobi US Embassy, Kenya   
2  Major Dhyan Chand National Stadium, Delhi, Del...   
3                                        Meguro (目黒)   
4                                             Ottawa   
5                        Paulínia, São Paulo, Brazil   
6                                    Civic, Canberra   
7                            Cairo US Embassy, Egypt   
8                                            Bangkok   
9  Aurora Hills Visitor Center, Northern Virginia...   

                   timestamp  aqi   pm25  pm10  
0  2025-05-07T12:00:00+02:00   62   62.0  29.0  
1  2025-02-07T19:00:00+03:00    5    5.0   NaN  
2  2025-05-07T18:00:00+05:30  137  137.0  78.0  
3  2025-05-07T22:00:00+09:00   35    1.0  10.0  
4  2025-05-07T09:00:00-05:00   21   17.0   NaN  
5  2025-05-07T09:00:00-03:00   27    NaN  27.0  
6  2025-05-07T23:00:00+10:00    9

_request out of retries on exception: ('invalid_grant: Invalid grant: account not found', {'error': 'invalid_grant', 'error_description': 'Invalid grant: account not found'})
Traceback (most recent call last):
  File "C:\Users\PC2\anaconda3\lib\site-packages\gcsfs\retry.py", line 132, in retry_request
    return await func(*args, **kwargs)
  File "C:\Users\PC2\anaconda3\lib\site-packages\gcsfs\core.py", line 461, in _request
    headers=self._get_headers(headers),
  File "C:\Users\PC2\anaconda3\lib\site-packages\gcsfs\core.py", line 438, in _get_headers
    self.credentials.apply(out)
  File "C:\Users\PC2\anaconda3\lib\site-packages\gcsfs\credentials.py", line 188, in apply
    self.maybe_refresh()
  File "C:\Users\PC2\anaconda3\lib\site-packages\gcsfs\credentials.py", line 182, in maybe_refresh
    self.credentials.refresh(req)
  File "C:\Users\PC2\anaconda3\lib\site-packages\google\oauth2\service_account.py", line 448, in refresh
    access_token, expiry, _ = _client.jwt_grant(
  Fil

RefreshError: ('invalid_grant: Invalid grant: account not found', {'error': 'invalid_grant', 'error_description': 'Invalid grant: account not found'})