In [14]:
import requests
import json



class EnedisAPI:

    def __init__(self) -> None:
        pass

    
    def get_access_token(self, credential_path):
        """
        Retrieve the access token from the Enedis API
        """

        with open(credential_path, 'r') as json_file:
            credential_json = json.load(json_file)

        client_ID = credential_json['client_ID']
        client_secret = credential_json['client_secret']


        url = "https://gw.ext.prod-sandbox.api.enedis.fr/oauth2/v3/token"
        headers = {
            "Content-Type": "application/x-www-form-urlencoded",
        }

        data = {
            "grant_type": "client_credentials",
            "client_id": client_ID,  # Replace with your actual client ID
            "client_secret": client_secret  # Replace with your actual client secret
        }

        response = requests.post(url, headers=headers, data=data)

        # To get the response data
        if response.status_code == 200:
            token = response.json()  # Extract the JSON response
            print("Access Token:", token)
            self.access_token = token['access_token']
        else:
            print(f"Failed to get token, Status code: {response.status_code}, Response: {response.text}")



    def collect_consumption_data(self,
                                 usage_point_id,
                                 start,
                                 end):
        """
        Collect the consumption load curve from the API. This should be after that
        we obtain the access token
        Args:
            usage_point_id (str) : PRM of the Linky device
            start (str) : start data of the sampling (format YYYY-MM-DD)
            end (str) : start data of the sampling (format YYYY-MM-DD)
        """

        url = "https://gw.ext.prod-sandbox.api.enedis.fr/metering_data_clc/v5/consumption_load_curve"

        # Replace these placeholders with actual values
        params = {
            "start": start,  # e.g., "2023-10-01"
            "end": end,  # e.g., "2023-10-02"
            "usage_point_id": usage_point_id
        }

        headers = {
            "Accept": "application/json",
            "Authorization": f"Bearer {self.access_token}"  # Replace with the actual access token
        }

        # Send the GET request with parameters and headers
        response = requests.get(url, headers=headers, params=params)

        # Check for successful request
        if response.status_code == 200:
            consumption_load_curve = response.json()  # Extract the JSON response
            print("Response Data:", consumption_load_curve)
            self.consumption_load_curve = consumption_load_curve
        else:
            print(f"Failed to fetch data. Status code: {response.status_code}, Response: {response.text}")

In [15]:
credential_path = '/Users/adrienguenard/Desktop/EF/0.Input/Credential/credential_Enedis.json'

#usage_point_id = '24880057139941'
usage_point_id = '16401220101758'
start = '2024-10-10'
end = '2024-10-12'


EnedisData = EnedisAPI()

EnedisData.get_access_token(credential_path)



Access Token: {'access_token': 'eyJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiIsIng1dCI6Ik5tSTROV00wWkdWa01HUmxZakZtWXpsbE5tTTRNREUzWVdFek1UVXlZMlUxWkdabU9XVmtNUT09In0.eyJzdWIiOiJEYXRhQ29ubmVjdEBjYXJib24uc3VwZXIiLCJpc3MiOiJodHRwczpcL1wvYXBpZ2lsZS1hbGV4LWlzLXpjaS1wcm9kLmVuZWRpcy5mcjo2NDNcL29hdXRoMlwvdG9rZW4iLCJ0aWVySW5mbyI6eyJCcm9uemUiOnsidGllclF1b3RhVHlwZSI6InJlcXVlc3RDb3VudCIsInN0b3BPblF1b3RhUmVhY2giOnRydWUsInNwaWtlQXJyZXN0TGltaXQiOjUsInNwaWtlQXJyZXN0VW5pdCI6InNlYyJ9fSwia2V5dHlwZSI6IlNBTkRCT1giLCJzdWJzY3JpYmVkQVBJcyI6W3sic3Vic2NyaWJlclRlbmFudERvbWFpbiI6ImNhcmJvbi5zdXBlciIsIm5hbWUiOiJkb25uZWVzX2RlX2NvbXB0X2VsZWNfZGMiLCJjb250ZXh0IjoiXC9tZXRlcmluZ19kYXRhX2RjXC92NSIsInB1Ymxpc2hlciI6IkhZODFBNjVOIiwidmVyc2lvbiI6InY1Iiwic3Vic2NyaXB0aW9uVGllciI6IkJyb256ZSJ9LHsic3Vic2NyaWJlclRlbmFudERvbWFpbiI6ImNhcmJvbi5zdXBlciIsIm5hbWUiOiJkb25uZWVzX2RlX2NvbXB0X2VsZWNfcGxjIiwiY29udGV4dCI6IlwvbWV0ZXJpbmdfZGF0YV9wbGNcL3Y1IiwicHVibGlzaGVyIjoiSFk4MUE2NU4iLCJ2ZXJzaW9uIjoidjUiLCJzdWJzY3JpcHRpb25UaWVyIjoiQnJvbnplIn0seyJzdWJzY3Jp

In [16]:
print(EnedisData.access_token)
EnedisData.collect_consumption_data(usage_point_id, start, end)

eyJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiIsIng1dCI6Ik5tSTROV00wWkdWa01HUmxZakZtWXpsbE5tTTRNREUzWVdFek1UVXlZMlUxWkdabU9XVmtNUT09In0.eyJzdWIiOiJEYXRhQ29ubmVjdEBjYXJib24uc3VwZXIiLCJpc3MiOiJodHRwczpcL1wvYXBpZ2lsZS1hbGV4LWlzLXpjaS1wcm9kLmVuZWRpcy5mcjo2NDNcL29hdXRoMlwvdG9rZW4iLCJ0aWVySW5mbyI6eyJCcm9uemUiOnsidGllclF1b3RhVHlwZSI6InJlcXVlc3RDb3VudCIsInN0b3BPblF1b3RhUmVhY2giOnRydWUsInNwaWtlQXJyZXN0TGltaXQiOjUsInNwaWtlQXJyZXN0VW5pdCI6InNlYyJ9fSwia2V5dHlwZSI6IlNBTkRCT1giLCJzdWJzY3JpYmVkQVBJcyI6W3sic3Vic2NyaWJlclRlbmFudERvbWFpbiI6ImNhcmJvbi5zdXBlciIsIm5hbWUiOiJkb25uZWVzX2RlX2NvbXB0X2VsZWNfZGMiLCJjb250ZXh0IjoiXC9tZXRlcmluZ19kYXRhX2RjXC92NSIsInB1Ymxpc2hlciI6IkhZODFBNjVOIiwidmVyc2lvbiI6InY1Iiwic3Vic2NyaXB0aW9uVGllciI6IkJyb256ZSJ9LHsic3Vic2NyaWJlclRlbmFudERvbWFpbiI6ImNhcmJvbi5zdXBlciIsIm5hbWUiOiJkb25uZWVzX2RlX2NvbXB0X2VsZWNfcGxjIiwiY29udGV4dCI6IlwvbWV0ZXJpbmdfZGF0YV9wbGNcL3Y1IiwicHVibGlzaGVyIjoiSFk4MUE2NU4iLCJ2ZXJzaW9uIjoidjUiLCJzdWJzY3JpcHRpb25UaWVyIjoiQnJvbnplIn0seyJzdWJzY3JpYmVyVGVuYW50RG9tYWluIjoiY2FyYm9u