In [54]:
import pandas as pd
from tqdm.asyncio import tqdm
import requests
from pydantic import BaseModel, field_validator
from time import time
from datetime import datetime
import asyncio
import httpx
import os
from typing import List, Optional
from pydantic import BaseModel
from datetime import datetime
from enum import Enum


In [129]:
class LoginInfos(BaseModel):
    """Token bundle parsed from the JSON body of the INIES login and token-refresh calls."""

    # Sent as the ``Bearer`` value of the ``authorization`` request header.
    access_token: str
    # Posted together with the access token when requesting a new token pair.
    refresh_token: str
    token_type: str


class FuConstituantProduct(BaseModel):
    """One entry of an EPD's ``fuConstituantProducts`` list."""

    name: str
    quantity: float
    unit: str
    unitId: int
    # Nullable, but declared without a default value.
    constituantType: Optional[int]


class HealthData(BaseModel):
    """Health section of a full EPD record (``EpdFull.healthData``).

    All fields are nullable; none declares a default value.
    """

    # Air quality and emissions
    airRating: str | None
    eCovFormaldehyde: str | None
    eRadioactive: str | None
    otherHealthInfo: str | None

    # Contact with water
    isContactDrinkingWater: bool | None
    isContactNotDrinkingWater: bool | None
    healthNumber: str | None
    infosDrinkingWater: str | None
    infosNotDrinkingWater: str | None


class ComfortData(BaseModel):
    """Comfort section of a full EPD record (``EpdFull.comfortData``).

    All fields are nullable free-text values; none declares a default value.
    """

    comfortHygrothermal: str | None
    comfortAcoustic: str | None
    comfortVisual: str | None
    comfortOlfactory: str | None
    otherComfortInfo: str | None


class ResponsibleOrganism(BaseModel):
    """Organisation attached to an EPD through ``Epd.responsibleOrganism``.

    All fields are nullable; none declares a default value.
    """

    name: str | None
    acronym: str | None
    country: str | None
    address: str | None
    website: str | None


class ResponsibleContact(BaseModel):
    """Contact person attached to a full EPD through ``EpdFull.responsibleContact``.

    All fields are nullable; none declares a default value.
    """

    lastName: str | None
    firstName: str | None
    phone: str | None
    fax: str | None
    email: str | None


class IndicatorQuantity(BaseModel):
    """Quantity of one indicator for one phase.

    The name/unit fields start as ``None`` and are filled in afterwards by the
    ``populate_*`` methods from lookup lists of indicator and phase dicts.
    """

    indicatorId: int
    indicatorName: str | None = None
    indicatorUnit: str | None = None
    phaseId: int
    phaseName: str | None = None
    quantity: float

    def populate_indicator_fields(self, indicators: List[dict]):
        """Copy ``nameFr``/``unitName`` from the first indicator whose ``id`` matches.

        Leaves both fields untouched when no indicator matches.
        """
        found = next(
            (entry for entry in indicators if entry["id"] == self.indicatorId),
            None,
        )
        if found is None:
            return
        self.indicatorName = found["nameFr"]
        self.indicatorUnit = found["unitName"]

    def populate_phase_name(self, phases: List[dict]):
        """Copy ``nameFr`` from the first phase whose ``id`` matches.

        Leaves ``phaseName`` untouched when no phase matches.
        """
        found = next(
            (entry for entry in phases if entry["id"] == self.phaseId),
            None,
        )
        if found is None:
            return
        self.phaseName = found["nameFr"]


class IndicatorSet(BaseModel):
    """Set of indicator quantities of an EPD, keyed by an ``id`` that is looked up in a name mapping."""

    id: int
    name: str | None = None
    indicatorQuantities: list[IndicatorQuantity]

    def populate_name(self, mapping: dict):
        """Set ``name`` from ``mapping[self.id]``, or ``"Autre norme"`` when the id is absent."""
        fallback = "Autre norme"
        self.name = mapping.get(self.id, fallback)

    def populate_indicators(self, indicators: List[dict], phases: List[dict]):
        """Fill indicator name/unit and phase name on every contained quantity."""
        for entry in self.indicatorQuantities:
            entry.populate_indicator_fields(indicators)
            entry.populate_phase_name(phases)


class EpdShort(BaseModel):
    """Summary EPD entry, one per element of the EPD listing response.

    Nullable fields declare no default value.
    """

    # Identification
    id: int
    serialIdentifier: str
    name: str | None

    # Classification and status
    classificationIds: list[int]
    lastUpdate: datetime | None
    isArchived: bool | None


class Epd(BaseModel):
    """Detailed EPD record parsed from the per-EPD endpoint response.

    Nullable fields declare no default value. Field order is kept as-is because
    it determines the key order of ``model_dump()``.
    """

    # Identification
    id: int
    serialIdentifier: str
    name: str
    version: str | None
    declarationType: int | None
    declarationTypeName: str | None
    responsibleOrganism: ResponsibleOrganism
    commercialReferences: str | None

    # Functional unit ("uf" fields) and related quantities
    dvt: int | None
    ufQuantity: float | None
    ufUnit: str | None
    ufDescription: str | None
    carbonBiogenicStorage: float | None
    packagingCarbonBiogenicStorage: float | None
    distanceTransportA4Km: float | None

    # Production
    productionPlace: str | None
    productionRegionFr: list[str]

    # Nested collections
    fuConstituantProducts: list[FuConstituantProduct]
    indicatorSet: IndicatorSet


class EpdFull(Epd):
    """``Epd`` extended with status, dates, classification, health/comfort and contact data.

    Nullable fields declare no default value. Field order is kept as-is because
    it determines the key order of ``model_dump()``.
    """

    # Status and publication dates
    statut: int | None
    statutName: str | None
    onlineDate: datetime | None
    lastUpdateDate: datetime | None
    expirationDate: datetime | None
    isPep: bool | None

    # Classification
    classificationId: int | None
    classificationId2: int | None
    classificationId3: int | None

    # Issue and verification
    issueDate: datetime | None
    isVerified: bool | None
    verificationDate: datetime | None

    # Commercial information and usage
    commercialBrands: str | None
    commercialReferencesNumber: int | None
    usageAbility: str | None
    ufUnitId: int | None
    implementationFallRate: float | None
    maintenanceFrequency: float | None
    contentDeclaration: str | None
    characteristicsNotInUf: str | None

    # Nested sections
    healthData: HealthData
    comfortData: ComfortData
    responsibleContact: ResponsibleContact

    # Performance and end-of-life transport
    isBtoB: bool | None
    performanceUf: str | None
    performanceUfQuantity: float | None
    performanceUfUnit: str | None
    performanceUfUnitId: int | None
    distanceTransportC2DechetsRecyclesKm: float | None
    distanceTransportC2DechetsValorisesKm: float | None
    distanceTransportC2DechetsEliminesKm: float | None
    registrationDate: datetime | None

In [None]:
from typing import Dict


class IniesClient:
    """Client for the INIES web service under ``https://base-inies.fr/ws``.

    Construction is synchronous and performs blocking HTTP calls through
    ``requests``: it logs in (unless ``login_infos`` is supplied) and downloads
    the norme catalogue plus the indicators and phases of every norme.
    EPD retrieval is asynchronous and goes through one shared
    ``httpx.AsyncClient``; call :meth:`close` once finished.
    """

    # Seconds allowed for each HTTP call (same value the EPD detail call already used).
    REQUEST_TIMEOUT_S = 60
    # Age in seconds after which the access token is renewed before being used.
    TOKEN_MAX_AGE_S = 20 * 60

    def __init__(self, login_infos: Optional[LoginInfos] = None, max_concurrent_tasks: int = 100):
        """Authenticate and preload the norme, indicator and phase catalogues.

        Args:
            login_infos: Existing tokens to reuse. When omitted, :meth:`login` is called.
                The token age is counted from construction time in both cases.
            max_concurrent_tasks: Upper bound on calls running at once through
                :meth:`async_func_with_retries`.
        """
        self.login_infos = login_infos if login_infos else self.login()
        self.login_infos_last_update = time()
        self.normes = self.get_normes()
        self.indicators, self.phases = self.get_all_indicators_and_phases()
        self.semaphore = asyncio.Semaphore(max_concurrent_tasks)
        # Serialises token renewal so concurrent tasks do not all refresh at once.
        self._refresh_lock = asyncio.Lock()
        self.client = httpx.AsyncClient()

    def login(self):
        """Log in with the ``API_LOGIN`` / ``API_KEY`` environment variables.

        Returns:
            The ``LoginInfos`` parsed from the response body.

        Raises:
            RuntimeError: If either environment variable is missing or empty.
            requests.HTTPError: If the service answers with an error status.
        """
        url = "https://base-inies.fr/ws/Login"
        email = os.getenv("API_LOGIN")
        api_key = os.getenv("API_KEY")
        if not email or not api_key:
            # Fail early instead of posting null credentials to the service.
            raise RuntimeError(
                "API_LOGIN and API_KEY environment variables must be set to log in"
            )
        payload = {"email": email, "apiKey": api_key}
        headers = {"content-type": "application/json"}

        response = requests.post(
            url, json=payload, headers=headers, timeout=self.REQUEST_TIMEOUT_S
        )
        response.raise_for_status()

        return LoginInfos(**response.json())

    def get_normes(self):
        """Fetch the norme catalogue and return it as an ``{id: name}`` dict."""
        url = "https://base-inies.fr/ws/Norme"
        headers = {"authorization": f"Bearer {self.login_infos.access_token}"}

        response = requests.get(url, headers=headers, timeout=self.REQUEST_TIMEOUT_S)
        response.raise_for_status()

        return {norme["id"]: norme["name"] for norme in response.json()}

    def get_all_indicators_and_phases(self):
        """Fetch indicators and phases for every known norme.

        Returns:
            A pair ``(indicators, phases)`` of dicts keyed by norme id.
        """
        indicators: Dict[int, list] = {}
        phases: Dict[int, list] = {}
        # getattr keeps the lazy-load guard usable even if ``normes`` was never assigned.
        if not getattr(self, "normes", None):
            self.normes = self.get_normes()
        for norme_id in self.normes:
            indicators[norme_id], phases[norme_id] = (
                self.get_indicators_and_phases_for_norme(norme_id)
            )

        return indicators, phases

    def get_indicators_and_phases_for_norme(self, norme_id: int):
        """Return the ``indicators`` and ``phases`` entries of one norme's detail response."""
        url = f"https://base-inies.fr/ws/Norme/{norme_id}"
        headers = {"authorization": f"Bearer {self.login_infos.access_token}"}

        response = requests.get(url, headers=headers, timeout=self.REQUEST_TIMEOUT_S)
        response.raise_for_status()
        norme_details = response.json()

        return norme_details["indicators"], norme_details["phases"]

    async def refresh_token(self):
        """Exchange the current token pair for a new one and reset the token age."""
        url = "https://base-inies.fr/ws/RefreshToken"
        headers = {"content-type": "application/json"}
        payload = {
            "accessToken": self.login_infos.access_token,
            "refreshToken": self.login_infos.refresh_token,
        }

        response = await self.client.post(
            url, json=payload, headers=headers, timeout=self.REQUEST_TIMEOUT_S
        )

        response.raise_for_status()
        self.login_infos = LoginInfos(**response.json())
        self.login_infos_last_update = time()

    def _token_is_stale(self) -> bool:
        """Tell whether tokens are missing or older than ``TOKEN_MAX_AGE_S``."""
        return (
            not self.login_infos
            or time() - self.login_infos_last_update > self.TOKEN_MAX_AGE_S
        )

    async def get_auth_headers(self):
        """Return the ``authorization`` header, renewing the token first when stale."""
        if self._token_is_stale():
            async with self._refresh_lock:
                # Re-check: another task may have renewed the token while this one waited.
                if self._token_is_stale():
                    if self.login_infos:
                        await self.refresh_token()
                    else:
                        # No tokens to refresh from: log in again without blocking the loop.
                        self.login_infos = await asyncio.to_thread(self.login)
                        self.login_infos_last_update = time()
        return {"authorization": f"Bearer {self.login_infos.access_token}"}

    async def get_all_epds(self, since_date: Optional[datetime] = None) -> List[Epd]:
        """Fetch the detailed record of every EPD returned by :meth:`get_all_epds_short`.

        EPDs whose retrieval still fails after the retries are reported on stdout
        and left out of the result.

        Args:
            since_date: Forwarded to :meth:`get_all_epds_short`.

        Returns:
            The successfully fetched EPDs, in completion order.
        """
        all_epds_short = await self.get_all_epds_short(since_date)
        print(f"EPDs to retrieve: {len(all_epds_short)}")

        all_epds = []
        tasks = [
            self.async_func_with_retries(
                async_func=self.get_epd, retries=3, epd_id=epd.id
            )
            for epd in all_epds_short
        ]
        for result in tqdm.as_completed(
            tasks,
            desc="Processing EPDs",
            unit="epd",
            total=len(all_epds_short),
        ):
            try:
                all_epds.append(await result)
            except Exception as e:
                # Best effort: one failed EPD must not abort the whole download.
                print(f"Error fetching EPD: {e}")

        return all_epds

    async def get_all_epds_short(self, since_date: datetime = None) -> List[EpdShort]:
        """List non-archived EPDs as ``EpdShort`` objects.

        Args:
            since_date: When given, sent as ``referenceDateTime`` formatted ``YYYY-MM-DD``.
        """
        url = "https://base-inies.fr/ws/Epd"
        headers = await self.get_auth_headers()

        params = {"includeArchived": "false"}
        if since_date:
            params["referenceDateTime"] = since_date.strftime(r"%Y-%m-%d")

        response = await self.client.get(
            url, headers=headers, params=params, timeout=self.REQUEST_TIMEOUT_S
        )
        response.raise_for_status()

        return [EpdShort(**epd) for epd in response.json()]

    async def get_epd(self, epd_id: int) -> Epd:
        """Fetch one EPD and fill in its norme, indicator and phase names.

        Raises:
            httpx.HTTPStatusError: On an error status (logged, then re-raised).
            httpx.RequestError: On a transport failure (logged, then re-raised).
        """
        url = f"https://base-inies.fr/ws/Epd/{epd_id}"
        headers = await self.get_auth_headers()

        try:
            response = await self.client.get(
                url, headers=headers, timeout=self.REQUEST_TIMEOUT_S
            )
            response.raise_for_status()
            epd = Epd(**response.json())
            norme_id = epd.indicatorSet.id
            epd.indicatorSet.populate_name(self.normes)
            # An id missing from the catalogue is tolerated by populate_name
            # ("Autre norme"), so it must not raise KeyError here either.
            epd.indicatorSet.populate_indicators(
                self.indicators.get(norme_id, []), self.phases.get(norme_id, [])
            )
            return epd
        except httpx.HTTPStatusError as e:
            print(f"HTTP error: {e.response.status_code} - {e.response.text}")
            raise
        except httpx.RequestError as e:
            print(f"Request error: {e.request.url} - {str(e)}")
            raise

    async def async_func_with_retries(self, async_func, retries: int, **kwargs) -> Epd:
        """Await ``async_func(**kwargs)`` under the concurrency limit, retrying on failure.

        Args:
            async_func: Coroutine function to call.
            retries: Total number of attempts; must be at least 1.
            **kwargs: Passed through to ``async_func``.

        Raises:
            ValueError: If ``retries`` is lower than 1.
            Exception: Whatever the last attempt raised.
        """
        if retries < 1:
            raise ValueError("retries must be at least 1")
        for attempt in range(retries):
            try:
                async with self.semaphore:
                    return await async_func(**kwargs)
            except Exception as e:
                if attempt == retries - 1:
                    print(f"Attempt {attempt + 1} failed with args {kwargs}: {e}")
                    raise
            # Back off after the semaphore is released so the slot is free for other tasks.
            await asyncio.sleep(1)

    async def close(self):
        """Close the underlying ``httpx.AsyncClient``."""
        await self.client.aclose()

In [None]:
client = IniesClient()
try:
    all_epds = await client.get_all_epds(datetime(2024, 12, 17))
    len(all_epds)
finally:
    await client.close()

In [247]:
# Id of the indicator whose per-phase quantities are pivoted into columns.
# NOTE(review): the meaning of indicator 57 is not visible in this notebook — confirm
# against the indicator catalogue and consider moving it to a config cell.
PIVOT_INDICATOR_ID = 57

# One row per EPD; nested models become dicts / lists of dicts.
df = pd.DataFrame(epd.model_dump() for epd in all_epds)
df["responsibleOrganism"] = df["responsibleOrganism"].apply(lambda organism: organism["name"])
df["norme"] = df["indicatorSet"].apply(lambda indicator_set: indicator_set["name"])

# One row per (EPD, indicator quantity).
normalized_indicator_set = pd.DataFrame(pd.json_normalize(df["indicatorSet"])["indicatorQuantities"])
normalized_indicator_set["id"] = df["id"]
exploded = (
    normalized_indicator_set.explode("indicatorQuantities")
    # An empty quantity list explodes to NaN; drop it before normalising the dicts.
    .dropna(subset=["indicatorQuantities"])
    .reset_index(drop=True)
)
normalized_quantities = pd.json_normalize(exploded["indicatorQuantities"])
normalized_quantities["id"] = exploded["id"]
# Keep the selected indicator only, with one value per (EPD, phase) so the pivot is unambiguous.
normalized_quantities = normalized_quantities[
    normalized_quantities["indicatorId"] == PIVOT_INDICATOR_ID
].drop_duplicates(subset=["id", "phaseName"])

# One column per phase name.
pivoted_quantities = normalized_quantities.pivot(
    index="id", columns="phaseName", values="quantity"
).reset_index()

df = df.merge(pivoted_quantities, on="id", how="left")
# Flatten constituent products to "name quantity unit ; ..." (inner quotes differ from
# the outer ones so the f-string also parses on Python versions before 3.12).
df["fuConstituantProducts"] = df["fuConstituantProducts"].apply(
    lambda constituants: " ; ".join(
        f"{constituant['name']} {constituant['quantity']} {constituant['unit']}"
        for constituant in constituants
    )
)
df = df.drop(columns="indicatorSet")
# Disabled: df.productionRegionFr = df.productionRegionFr.apply(lambda regions: None if regions == [] else ", ".join(region for region in regions))
df["commercialReferences"] = df["commercialReferences"].str.replace("\n", "; ")

df.shape

(54, 46)

In [246]:
# Spot check: display two randomly selected rows of the export frame.
df.sample(2)

Unnamed: 0,id,serialIdentifier,name,version,declarationType,declarationTypeName,responsibleOrganism,commercialReferences,dvt,ufQuantity,...,Fin de vie,Mise en oeuvre,Production,Total cycle de vie,Transport,Vie en oeuvre,Étape de fin de vie,Étape de production,Étape du processus de construction,Étape d’utilisation
36,40794,INIES_IQCO20241029_175347,Système OPTIMA Murs avec membrane et GR 32 Rou...,1.1,1,Individual,SAINT-GOBAIN ISOVER,Système OPTIMA Murs avec membrane et GR 32 Rou...,50,1.0,...,,,,15.1,,,1.49,11.5,2.06,0.0
4,41493,SCHN-01268-V01.01-FR,"Masterpact, unité de contrôle Micrologic 6.0 E...",1.1,1,Individual,SCHNEIDER ELECTRIC INDUSTRIES SAS,"33021, 33069, 33070, 33071, 33072, 33073, 3307...",10,1.0,...,0.568894,0.037703,15.258588,24.971655,0.028415,9.078056,,,,


In [248]:
# The positional index carries no information (the EPD ``id`` column is the key),
# so it is not written; otherwise it reappears as an unnamed first column on re-read.
df.to_csv("./export_inies.csv", index=False)