# Calculating Carbon Footprint

## Preparing the Notebook 

We will first import all of the necessary libraries.

In [1]:
import requests
import pandas as pd
import numpy as np 
from datetime import datetime
import pytz

We will now set up our jupyter notebook to make queries from VictoriaMetrics.

In [3]:
token = 'FBmZiWNJxiiwGfHzNvX436VdWqYtRLv6hb7FfiMyjgV9FAhEktpwJM2GHPJKCif6'
proxies = {
    'http': 'socks5://localhost:49152',
    'https': 'socks5://localhost:49152'
}
url = 'http://128.232.227.113:8427/api/v1/query_range'
headers = {
    'Authorization': f'Bearer {token}',
    'Accept': 'application/json'
}

## Loading in the Data

We have saved the processed Slurm data in csv files, which we will now load into DataFrames.

In [4]:
# We will now read the .csv file containing the Slurm data for June into a DataFrame
sSlurmDataPath = 'dfSacctFinal.csv'
dfSacct = pd.read_csv(sSlurmDataPath, index_col=0, parse_dates=['Start', 'End'], infer_datetime_format=True)

# We will now read the .csv file containing the extended Slurm data for June into a DataFrame. 
sSlurmExtendedPath = 'dfSacctExtended.csv'
dfSacctExtended = pd.read_csv(sSlurmExtendedPath, index_col=0, parse_dates=['Start', 'End'], infer_datetime_format=True)

In [5]:
dfSacctExtended[dfSacctExtended.index.value_counts() == 1].iloc[800000]

  dfSacctExtended[dfSacctExtended.index.value_counts() == 1].iloc[800000]


JobName                 5d3c85e3f26d6cfe24edd5043806e4ee1573ad96de5c1c...
Partition                                                          cclake
ElapsedRaw                                                            853
Account                 e14914f6ebb1765cfacdab633295f51cbc9a39bc1057f8...
State                                                           COMPLETED
NodeList                                                        cpu-p-399
User                    3f325dc5f73f566e06e2f7f1cb447d78fc3c2aeeab7860...
QOS                                                                  cpu1
Start                                                 2023-07-24 10:22:37
End                                                   2023-07-24 10:36:50
Timelimit                                                        01:00:00
Suspended                                                        00:00:00
ExclusiveCPU                                                        False
ExclusiveOverlapping                  

## Ensuring the Time is in UTC

We must first ensure that all times are in UTC so that we access the correct power readings from Victoria Metrics. 

We will first create a function that converts a series of times from local time to UTC.

We will create a function to convert all times in the DataFrame to UTC. 

In [6]:
def dfToUTC(df):
    """
    Returns a pd DataFrame containing columns for the start and end times in UTC.

    Parameters
    ----------
    df: pdDataFrame 
        The pd DataFrame containing all of the job data. This DataFrame must contain 
        a 'Start' and 'End' column of pd DateTime64 objects. 
    
    Returns
    ----------
    df: pdDataFrame
        The pd DataFrame that was passed in as a parameter with two new columns:
        'StartUTC' and 'EndUTC' of pd DateTime64 objects, which contain the original 
        start and end times in UTC rather than local time. 
    """

    df['UTCStart'] = df['Start'].dt.tz_localize('Europe/London')
    df['UTCEnd'] = df['End'].dt.tz_localize('Europe/London')

    df['UTCStart'] = df['UTCStart'].dt.tz_convert(pytz.utc)
    df['UTCEnd'] = df['UTCEnd'].dt.tz_convert(pytz.utc)

    return df 
    

We will now apply our function to the *dfSacctExtended* DataFrame in order to get the times in UTC rather than local time. 

In [7]:
dfSacctExtended = dfToUTC(dfSacctExtended)

## Querying Victoria Metrics

Rather than constantly querying Victoria Metrics, we will query Victoria Metrics once at the start and store all of the power data for all of the nodes across the entire time period in a .csv file called *VMPowerJune.csv*.

We will now create a function that checks whether the file *VMPowerJune.csv* exists. If it exists, the function will return a DataFrame containing the data in the .csv file. If it does not exist, the function will query Victoria Metrics and will create the file *VMPowerJune.csv* to store the data. The function will then return a DataFrame containig the power data. 

In [8]:
def getPowerDataMonth(dfJobs):
    """
    Returns a DataFrame containing the Victoria Metrics power data for the specified month.

    Checks whether the .csv file containing the Victoria Metrics power data for the specified 
    month (with the format 'VMPower<Month>.csv') exists. If the file exists the DataFrame 
    containing the power data is returned. If the file does not exist, Victoria Metrics is 
    queried and the .csv file is created; the function then returns the DataFrame containing
    the power data. If there is a problem while querying Victoria Metrics, the function will 
    return None.

    Parameters 
    ----------
    dfJobs: pdDataFrame
        The DataFrame containing all of the job data that is to be processed. This DataFrame 
        must have already been processed to contain UTCStart and UTCEnd columns. This can be 
        done using the dfToUTC() function. 
    
    Returns
    ----------
    dfPower: pdDataFrame
        The DataFrame containing all of the power data. 
    None: NoneType
        Returned if there is a problem while querying Victoria Metrics.

    """

    # We start by obtaining the file name from the input string
    sMonth = dfJobs['UTCStart'].dt.month_name(locale='English').value_counts().index[0]
    sFileName = 'VMPower' + sMonth + '.csv'

    # We will now open the file and check whether it is empty. If the file is 
    # empty we will query Victoria Metrics and obtain the data. If the file is 
    # not empty we will load in the data and return a DataFrame. 
    fPowerData = open(sFileName, 'a+')
    fPowerData.seek(0)

    if len(fPowerData.read()) == 0:
        # We will now obtain the start and end dates and times for our query.
        start = dfJobs.iloc[0]['UTCStart'].strftime("%Y-%m-%dT%H:%M:%SZ")
        end = dfJobs.iloc[-1]['UTCEnd'].strftime("%Y-%m-%dT%H:%M:%SZ")

        # We will now query Victoria Metrics to obtain the power data
        data = {
            'query': f'amperageProbeReading{{amperageProbeLocationName="System Board Pwr Consumption"}}',
            'start': start,
            'end' : end,
            'step': '30s'
        }

        response = requests.put(
            url, 
            data=data,
            proxies=proxies,
            headers=headers,
            timeout=10
        )  

        # We will now ensure that the request was successfull 
        if (response.status_code != 200):
            return None

        # We will now create a DataFrame containing the power data
        dNodePowers = {}
        lTicks = []

        for dNodeData in response.json()['data']['result']:
            sNode = dNodeData['metric']['alias']
            lData = dNodeData['values']

            dNodePowers[sNode] = lData

            for lDataPoint in lData:
                lTicks.append(lDataPoint[0])

        lTicks.sort()
        setTicksOrdered = set(lTicks)

        dfJobPower = pd.DataFrame(
            response.json()['data']['result'][0]['values'],
            columns=[
                'Timestamp',
                response.json()['data']['result'][0]['metric']['alias']
            ]
        )

    fPowerData.close()

    return setTicksOrdered



In [None]:
getPowerDataMonth(dfSacctExtended)

## Obtaining the Power Reading 

We first need to floor the start times and ceil the end times to the nearest 30s to ensure that we do not miss out any of the Victoria metrics data (as the data is sampled every 30s).

We will now write a function to get the power reading for a given job from VictoriaMetrics.

In [140]:
def getPowerJob(jobID, df):
    """
    Returns a dictionary of the total power usage of the job for each 30 minute time period. 

    Returns a dictionary of the total power usage of the job (across each node) for each 30 
    minute time period of the job duration. The keys of the dictionary are a string of the 
    format "YYYY-MM-DD PERIOD" where PERIOD represents the 30 minute time period for that day 
    as an integer from 1-48. The values of the dictionary are total power consumptions of the 
    job, in Wh, for each 30 minute period.

    Parameters 
    ----------
    jobID: string
        A string representing the Job ID for the specified job. 
    df: pdDataFrame
        The pandas DataFrame containing all the job data.

    Returns
    ----------
    dIntervalPowers: dictionary 
        A dictionary of the total power usage of the job, in Wh, for each 30 minute interval. 

    """

    jobID = int(jobID)
    
    if df.index.value_counts().loc[jobID] <= 1:
        sNode = df.loc[jobID, 'NodeList']
        return getPowerNode(jobID, df, sNode)
    
    lNodeList = list(df.loc[jobID, 'NodeList'])

    dTotalWattHours = {}

    for sNode in lNodeList:
        dWattHours = getPowerNode(jobID, df, sNode)

        lPeriods = list(dWattHours.keys())

        for sPeriod in lPeriods:
            if dWattHours[sPeriod] is None:
                return None
            
            if sPeriod in dTotalWattHours.keys():
                dTotalWattHours[sPeriod] += dWattHours[sPeriod]
            else:
                dTotalWattHours[sPeriod] = dWattHours[sPeriod]

    return dTotalWattHours

def getPowerNode(jobID, df, sNode):
    """
    Returns a dictionary of the power usage of a single node for each 30 minute time period.

    Returns a dictionary of the power usage of the job for a single node for each 30 minute
    time period of the job duration. The keys of the dictionary are a string of the format 
    "YYYY-MM-DD PERIOD" where PERIOD represents the 30 minute time period for that day as an 
    integer from 1-48. The values of the dictionary are total power consumptions of the job, 
    in Wh, for each 30 minute period.

    Parameters 
    ----------
    jobID: string
        A string representing the Job ID for the specified job. 
    df: pdDataFrame
        The pandas DataFrame containing all the job data.
    sNode: string
        The node name that the job is running on. 

    Returns
    ----------
    dIntervalPowers: dictionary 
        A dictionary of the power usage of the job for a single node, in Wh, for each 30 
        minute interval. 

    """

    jobID = int(jobID)

    start = df[df['NodeList'] == sNode].loc[jobID, 'UTCStart'].strftime("%Y-%m-%dT%H:%M:%SZ")
    end = df[df['NodeList'] == sNode].loc[jobID, 'UTCEnd'].strftime("%Y-%m-%dT%H:%M:%SZ")
 
    data = {
        'query': f'amperageProbeReading{{alias="{sNode}", amperageProbeLocationName="System Board Pwr Consumption"}}',
        'start': start,
        'end' : end,
        'step': '30s'
    }

    response = requests.put(
        url, 
        data=data,
        proxies=proxies,
        headers=headers,
        timeout=10
    )  


    if len(response.json()['data']['result']) < 1:
        return None

    dfJobPower = pd.DataFrame(
        response.json()['data']['result'][0]['values'],
        columns=[
            'Timestamp',
            response.json()['data']['result'][0]['metric']['alias']
        ]
    )

    # The following is used as test data

    lPowerTest = []

    for tTick in list(range(1690355610, 1690360500, 30)):
        if tTick < (1690355610 + 1800):
            lPowerTest.append([tTick, 300])
        elif (1690355610 + 1800) < tTick and tTick < (1690355610 + 3089):
            lPowerTest.append([tTick, 150])
        elif tTick > (1690355610 + 3089):
            lPowerTest.append([tTick, 300])

    if jobID == 0:
        dfJobPower = pd.DataFrame(
            lPowerTest,
            columns=['Timestamp', 'Test']
        )

    dfJobPower['Timestamp'] = pd.to_datetime(dfJobPower['Timestamp'], unit='s', utc=True)
    dfJobPower['Date'] = dfJobPower['Timestamp'].dt.strftime('%Y-%m-%d')
    dfJobPower['Date'] = dfJobPower['Date'].str.cat((((dfJobPower['Timestamp']).dt.hour * 2) + ((dfJobPower['Timestamp']).dt.minute//30) + 1).astype(str), sep=" ")
    dfJobPower.set_index('Timestamp', inplace=True)

    dIntervalPowers = {}

    print(dfJobPower)

    for interval in dfJobPower['Date'].unique():
        bIntervalMask = dfJobPower['Date'] == interval

        dfIntervalPower = dfJobPower[bIntervalMask]
        dfIntervalPower[sNode] = pd.to_numeric(dfIntervalPower[sNode])
        dfIntervalPower = dfIntervalPower.resample('30S').interpolate()

        iJoules = np.trapz(dfIntervalPower[sNode].astype(int), dx=30)
        iWattHour = iJoules/3600

        dIntervalPowers[interval] = iWattHour

    return dIntervalPowers


In order to get the carbon intensity from the carbonintensity.org API, we need to know the time periods that we are intersted in. These time periods are the half hour settlement periods between 1-48 for each day. We will now write a function to return the time period(s) that we are interested in. 

In [27]:
def getTimePeriods(start, end):
    """
    Returns a dictionary containing the 30 minute time periods for each day within an interval. 

    Returns a dictionary whose keys are the dates, with the format 'YYYY-MM-DD', within the 
    given interval and values are lists of the 30 minute time intervals (which are integers
    from 1-48) that are included in the given time period for each day.

    Parameters
    ----------
    sStart: string
        A string representing the start date and time of the interval. 
    sEnd: string
        A string representing the end date and time of the interval. 

    Returns 
    ----------
    dPeriods: dictionary 
        A dictionary containing the 30 minute intervals included in each day of the specified
        time period. 
    
    """

    # We first convert the string variables representing the start and end times to DataTime objects. 

    # We then create variables for the start and end dates. 
    startDate = start.date()
    endDate = end.date()

    iStartSegment = (start.hour * 2) + (start.minute//30) + 1
    iEndSegment = (end.hour * 2) + (end.minute//30) + 1

    iDaySeparation = (endDate - startDate).days

    dPeriods = {startDate.strftime("%Y-%m-%d") : list(range(iStartSegment, 49)), endDate.strftime("%Y-%m-%d") : list(range(1, iEndSegment + 1))}

    if iDaySeparation == 0:
        return {startDate.strftime("%Y-%m-%d") : list(range(iStartSegment, iEndSegment + 1))}
    else:
        for iCount in range(iDaySeparation):
            day = startDate + pd.Timedelta(iCount, 'days')
            
            if day != startDate:
                dPeriods[day.strftime("%Y-%m-%d")] = list(range(1, 49))
            

    return dPeriods

We will now create a function that returns a list of all the carbon intensities that we are interested in.

In [28]:
def getCarbonIntensities(start, end):
    """ 
    Returns a dictionary of the carbon intensities for each 30 minute time period in the interval.

    Returns a dictionary whose keys are the dates, in the format 'YYYY-MM-DD', within the given
    interval and whose values are lists of the carbon intensities, in gCO2/kWh, for each 30 minute 
    time period for each day within the interval. 

    Parameters
    ----------
    sStart: string
        A string representing the start date and time of the interval. 
    sEnd: string
        A string representing the end date and time of the interval. 

    Returns
    ----------
    dIntensities: dictionary 
        A dictionary containing the carbon intensity, in gCO2/kWh, for each 30 minute interval within
        the specified time range. 
    """
    dPeriods = getTimePeriods(start, end)

    lDates = list(dPeriods.keys())
    lDates.sort()

    dIntensities = {}

    for date in lDates:
        for period in dPeriods[date]:
            intensity = requests.get(f'https://api.carbonintensity.org.uk/intensity/date/{date}/{period}')
            dIntensities[str(date) + " " + str(period)] = (intensity.json()['data'][0]['intensity']['actual'])

    return dIntensities
    

We will now write a function to calculate the carbon footprint given the Watt Hours.

In [111]:
def getCarbonFootprint(jobID, df):
    """
    Returns the carbon footprint, in g, of the specified job. 

    Parameters 
    ----------
    jobID: string
        The job ID of the job whose carbon footprint is calculated.
    df: pdDataFrame
        The pandas DataFrame containing all the job data.

    Returns
    ----------
    iTotalCarbon: integer
        The total carbon released, in g, due to the energy consumption of the job. 
    """

    jobID = int(jobID)

    start = df.loc[jobID, 'UTCStart'].unique()[0]
    end = df.loc[jobID, 'UTCEnd'].unique()[0]
    
    dIntervalPowers = getPowerJob(jobID, df)
    dIntervalIntensities = getCarbonIntensities(start, end)

    iTotalCarbon = 0

    for interval in dIntervalPowers.keys():
        iWattHour = dIntervalPowers[interval]
        iKWattHours = iWattHour/1000

        iIntensity = dIntervalIntensities[interval]

        iCarbon = iIntensity * iKWattHours

        iTotalCarbon += iCarbon

    return iTotalCarbon

In [112]:
print(getCarbonFootprint('24177267', dfSacctExtended))


15.13365


In [113]:
print(response.json())

NameError: name 'response' is not defined

In order to test the carbon footprint code above, I am going to create some test power reading data which will have the following structure: 

    - A power of 300 W for the first 1800 seconds.
    - A power of 150 W for the next 1289 seconds. 
    - A power of 300 W for the next 1800 seconds.

The total energy consumed by this test job should be 735 450 J or 204.292 wh.

In [144]:
dfTest = dfSacctExtended[dfSacctExtended.index.value_counts() == 1].iloc[-1].to_frame().transpose().reset_index(drop=True)

dfTest['End'] = pd.to_datetime("2023-07-27 09:35:05")
dfTest['UTCEnd'] = pd.to_datetime("2023-07-27 08:35:05+00:00")
dfTest['NodeList'] = 'Test'

  dfTest = dfSacctExtended[dfSacctExtended.index.value_counts() == 1].iloc[-1].to_frame().transpose().reset_index(drop=True)


In [135]:
dfTest

Unnamed: 0,JobName,Partition,ElapsedRaw,Account,State,NodeList,User,QOS,Start,End,Timelimit,Suspended,ExclusiveCPU,ExclusiveOverlapping,Exclusive,SharedSameUser,UTCStart,UTCEnd
0,171f8e926c82dd83fc75053ec9b110f092aa32b41d5c98...,ampere,29,99fdc1f587a9423c1abc5a1ce22053628b94a5226d3c0f...,COMPLETED,cpu-p-399,dd68b7c728069b005e5dac9c3e9d59a7379b1347fa4e6f...,gpu2,2023-07-27 08:13:36,2023-07-27 09:35:05,02:00:00,00:00:00,False,False,False,True,2023-07-27 07:13:36+00:00,2023-07-27 08:35:05+00:00


In [130]:
lPowerTest = []

for tTick in list(range(1690355610, 1690360500, 30)):
    if tTick < (1690355610 + 1800):
        lPowerTest.append([tTick, 300])
    elif (1690355610 + 1800) < tTick and tTick < (1690355610 + 3089):
        lPowerTest.append([tTick, 150])
    elif tTick > (1690355610 + 3089):
        lPowerTest.append([tTick, 300])


In [145]:
getPowerJob(0, dfTest)

ConnectionError: SOCKSHTTPConnectionPool(host='128.232.227.113', port=8427): Max retries exceeded with url: /api/v1/query_range (Caused by NewConnectionError('<urllib3.contrib.socks.SOCKSConnection object at 0x0000022D6046A4C0>: Failed to establish a new connection: [WinError 10061] No connection could be made because the target machine actively refused it'))