# Labeling Performance Analysis for ALL SITES

This document runs all sites through a performance ratio analysis.

It uses this analysis to label each site (for each day) based on Level 1 faults.

# 1. Libraries Import

In [None]:
# ========================================================
# = Libraries import
# ========================================================

from collections import defaultdict
import matplotlib.pyplot as plt
import matplotlib
import pandas as pd
import numpy as np
import boto3
import pytz
# import math
# from zoneinfo import ZoneInfo
import datetime
# import geopy.distance
# from scipy import stats
import warnings
warnings.filterwarnings('ignore')
# import seaborn as sns
from datetime import timedelta
import os

# 2. AWS credentials

In [None]:
# ========================================================
# = AWS Credentials
# ========================================================

# Named AWS profile and region used for every production query in this notebook.
PROD_AWS_PROFILE = "gsesami-prod"
AWS_REGION = "ap-southeast-2"

# Session bound to the named profile.
prod_session = boto3.Session(profile_name=PROD_AWS_PROFILE)

# Timestream query client shared by the data-fetching functions below.
prod_client = prod_session.client(
    service_name="timestream-query",
    region_name=AWS_REGION,
)

# 3. Defining the Site ID, and dates:

## 3.1. Reading site list and monitor list

In [None]:
# Reading all sites
# The labelling loop looks rows up by integer label and reads the 'source'
# and 'timezone' columns of this table.
sites_list = pd.read_csv('./input_data/Site_List.csv')
# Reading all monitors
# NOTE(review): monitors_list is not referenced again in the visible part of
# this notebook - confirm whether it is still needed.
monitors_list = pd.read_csv('./input_data/Monitors_List.csv')

## 3.2. Defining start and end date

In [None]:
# Analysis window: fixed start date, open-ended up to the day the notebook runs.
date_start = '2020-01-01'

# Current local date as a 'YYYY-MM-DD' string; it doubles as the window end.
today = datetime.date.today().isoformat()
date_end = today

## 3.3. Metrics to be read

In [None]:
# Defining metrics to be read
# Timestream measure names for the three daily series used by the analysis.
# NOTE(review): the labelling loop repeats these names as string literals
# instead of reading these variables - keep both in sync when renaming.
clear_sky = 'EnergyYield.kWh.Daily'
expected = 'Irrad.kWh.m2.Daily'
measured = 'Production.kWh.Daily'

## 3.4. Support functions

In [None]:
def get_site_id(df, sequence):
    """Return the 'source' value at row label `sequence`, minus a leading 'SITE|'.

    Values that do not start with 'SITE|' are returned unchanged.
    """
    source_column = df['source']
    full_identifier = source_column.loc[sequence]
    return full_identifier.removeprefix('SITE|')

def get_site_id_full(df, sequence):
    """Return the unmodified 'source' value stored at row label `sequence`."""
    source_column = df['source']
    return source_column.loc[sequence]

def get_site_name(df, sequence):
    """Return the 'name' value stored at row label `sequence`."""
    name_column = df['name']
    return name_column.loc[sequence]

In [None]:
def get_site_info(df, sequence):
    """Return `(site_id, site_id_full)` for the row at label `sequence`.

    `site_id` is the 'source' value without its 'SITE|' prefix and
    `site_id_full` is the 'source' value as stored.
    """
    # site_name = get_site_name(df, sequence)
    return get_site_id(df, sequence), get_site_id_full(df, sequence)

In [None]:
def performance_check(row):
    """Classify a row by its 'Performance.perc.Daily' value.

    Returns 'ok' for values of at least 80, 'medium' for values of at least
    60, and 'under' for anything else (this includes NaN, because both
    comparisons are false for it).
    """
    if row['Performance.perc.Daily'] >= 80:
        return 'ok'
    if row['Performance.perc.Daily'] >= 60:
        return 'medium'
    return 'under'

# 4. Defining thresholds

In [None]:
# ========================================================
# = Thresholds
# ========================================================

# Cloudiness
# A day counts as "low cloudiness" when expected_over_clear (expected over
# clear-sky, in percent) is at least this value.
threshold_low_cloudiness = 80

# Parameters for checking major underperformance:
# performance below the threshold for `window_size` consecutive rows.
window_size_major_und = 3
threshold_performance_major_und = 60

# Parameters for checking minor underperformance:
# performance below the threshold for `window_size` consecutive rows.
window_size_minor_und = 7
threshold_performance_minor_und = 80

# Parameters for weekend and weekday underperformance:
# margin subtracted from the previous week's average before comparing.
threshold_performance_weekends_weekdays = 20

# Parameters for checking seasonal underperformance:
# margin subtracted from the other seasons' average before comparing.
threshold_performance_seasonal = 20

# 5. Functions to fetch data from AWS

In [None]:
def read_metric_site(date_start, date_end, measure_name, site_id):
    """Fetch one daily measure for one site from the DiagnoProd Timestream table.

    Parameters
    ----------
    date_start, date_end : str
        Inclusive bounds of the `date` column, spliced into the query as text.
    measure_name : str
        Value matched against `measure_name`.
    site_id : str
        Value matched against `siteId`.

    Returns
    -------
    tuple[list, list]
        `(timeid, data_values)`: the `date` and `prod_val` scalar values of
        every returned row, in query order (`ORDER BY date`).

    Rows in which either column carries no `ScalarValue` (for example a NULL
    datum) are skipped individually; previously one such row raised KeyError
    and discarded the whole page under a "no data" message.
    """
    # NOTE(review): the query is assembled by string concatenation, so the
    # arguments must come from trusted configuration, not from user input.
    ##----------------- read the Performance  --------------##
    query = """SELECT date, max_by(measure_value::double, time) as prod_val
                FROM "DiagnoProd"."DiagnoProd"
                WHERE measure_name = '""" + measure_name + """'
                AND siteId = '""" + site_id + """'
                AND date BETWEEN '""" + date_start + """'
                AND '""" + date_end + """'
                GROUP BY date
                ORDER BY date """

    client = prod_client
    paginator = client.get_paginator("query")
    page_iterator = paginator.paginate(QueryString=query,)

    timeid = []
    data_values = []
    for page_number, page in enumerate(page_iterator, start=1):
        rows = page.get("Rows") or []
        if not rows:
            # Same message the original printed for a page without rows.
            print('Page {%d} has no data available:' % page_number)
            continue
        for row in rows:
            data = row.get("Data") or []
            if len(data) < 2:
                continue
            day, value = data[0], data[1]
            if "ScalarValue" not in day or "ScalarValue" not in value:
                continue
            timeid.append(day["ScalarValue"])
            data_values.append(value["ScalarValue"])
    return timeid, data_values

In [None]:
def build_dataframe(timeid, measure_name, data_values, timezone=None):
    """Build a time-indexed, single-column float DataFrame for one measure.

    Parameters
    ----------
    timeid : list
        Timestamps (anything `pd.to_datetime` accepts), one per value.
    measure_name : str
        Name of the resulting value column.
    data_values : list
        Values aligned with `timeid`; cast to float.
    timezone : str or tzinfo, optional
        Timezone that naive timestamps are converted to after being localized
        as UTC. When omitted, the module-level `timezone_value` set by the
        labelling loop is used if it exists. A missing or NaN timezone falls
        back to 'Australia/Sydney'.

    Returns
    -------
    pandas.DataFrame
        Sorted by time and indexed by 'time'. For empty input, an empty frame
        that still carries the `measure_name` column.
    """
    # ============== Check if there is data available for the pv system =============
    if len(timeid) == 0:
        return pd.DataFrame(data_values, index=timeid, columns=[measure_name])

    if timezone is None:
        # Earlier versions read the global directly and raised NameError when
        # it had not been assigned yet; look it up defensively instead.
        timezone = globals().get('timezone_value')

    timeid = pd.to_datetime(timeid)
    if timeid.tzinfo is None:
        print('this is not tz-aware')
        # pd.isna also catches the NaN that an empty CSV cell produces.
        if timezone is not None and not pd.isna(timezone):
            timeid = timeid.tz_localize('UTC').tz_convert(timezone)
        else:
            print('no timezone in the table')
            timeid = timeid.tz_localize('UTC').tz_convert('Australia/Sydney')
    else:
        print('this is tz-aware')

    data = pd.DataFrame(data={'time': timeid, measure_name: data_values})
    data.sort_values('time', inplace=True)
    data.set_index('time', inplace=True)
    data[measure_name] = data[measure_name].astype(float)
    return data

In [None]:
# ========================================================
# = Reading EnergyYield.kWh.Daily from AWS TimeStream
# ========================================================

def readClear(date_start, date_end, measure_name, site_id):
    """Fetch a daily measure for one site; thin alias of `read_metric_site`.

    The body used to be a line-for-line copy of `read_metric_site`; it now
    delegates so the query and paging logic live in one place.

    Returns `(timeid, data_values)` exactly as `read_metric_site` does.
    """
    return read_metric_site(date_start, date_end, measure_name, site_id)

In [None]:
# ========================================================
# = Reading Irrad.kWh.m2.Daily from AWS TimeStream
# ========================================================


def readExpected(date_start, date_end, measure_name, site_id):
    """Fetch a daily measure for one site; thin alias of `read_metric_site`.

    The body used to be a line-for-line copy of `read_metric_site`; it now
    delegates so the query and paging logic live in one place.

    Returns `(timeid, data_values)` exactly as `read_metric_site` does.
    """
    return read_metric_site(date_start, date_end, measure_name, site_id)

In [None]:
# ========================================================
# = Reading Production.kWh.Daily from AWS TimeStream
# ========================================================

def readMeasured(date_start, date_end, measure_name, site_id):
    """Fetch a daily measure for one site; thin alias of `read_metric_site`.

    The body used to be a line-for-line copy of `read_metric_site`; it now
    delegates so the query and paging logic live in one place.

    Returns `(timeid, data_values)` exactly as `read_metric_site` does.
    """
    return read_metric_site(date_start, date_end, measure_name, site_id)

# 6. Functions to label level 1 faults based on performance

In [None]:
def check_major_underperformance(df, window_size, threshold):
    """Label runs of at least `window_size` consecutive rows below `threshold`.

    Adds (or overwrites) the 'major_underperformance' column in place: every
    row that belongs to such a run gets 'Major underperformance', all other
    rows get 'FALSE'. NaN performance values never count as below threshold.

    "Consecutive" means consecutive rows of `df`, not calendar days; the
    labelling loop passes a frame already filtered to low-cloudiness days.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain 'Performance.perc.Daily'.
    window_size : int
        Minimum run length; values below 1 label nothing.
    threshold : float
        Rows strictly below this value count towards a run.

    Returns
    -------
    pandas.DataFrame
        The same `df` object, for convenience.
    """
    below_threshold = (df['Performance.perc.Daily'] < threshold).tolist()

    # One positional pass with a run counter replaces the nested per-cell
    # df.loc writes, and cannot touch other rows that share an index label.
    flagged = [False] * len(below_threshold)
    run_length = 0
    for position, is_below in enumerate(below_threshold):
        run_length = run_length + 1 if is_below else 0
        if window_size > 0 and run_length >= window_size:
            # Mark the whole window ending here, retroactively.
            for offset in range(window_size):
                flagged[position - offset] = True

    df['major_underperformance'] = [
        'Major underperformance' if hit else 'FALSE' for hit in flagged
    ]
    return df

In [None]:
def check_minor_underperformance(df, window_size, threshold):
    """Label runs of at least `window_size` consecutive rows below `threshold`.

    Adds (or overwrites) the 'minor_underperformance' column in place: every
    row that belongs to such a run gets 'Minor underperformance', all other
    rows get 'FALSE'. NaN performance values never count as below threshold.

    "Consecutive" means consecutive rows of `df`, not calendar days; the
    labelling loop passes a frame already filtered to low-cloudiness days.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain 'Performance.perc.Daily'.
    window_size : int
        Minimum run length; values below 1 label nothing.
    threshold : float
        Rows strictly below this value count towards a run.

    Returns
    -------
    pandas.DataFrame
        The same `df` object, for convenience.
    """
    below_threshold = (df['Performance.perc.Daily'] < threshold).tolist()

    # One positional pass with a run counter replaces the nested per-cell
    # df.loc writes, and cannot touch other rows that share an index label.
    flagged = [False] * len(below_threshold)
    streak = 0
    for position, is_below in enumerate(below_threshold):
        streak = streak + 1 if is_below else 0
        if window_size > 0 and streak >= window_size:
            # Mark the whole window ending here, retroactively.
            first = position - window_size + 1
            flagged[first:position + 1] = [True] * window_size

    df['minor_underperformance'] = [
        'Minor underperformance' if hit else 'FALSE' for hit in flagged
    ]
    return df

In [None]:
# Checking if:
# The performance of days in the current week (weekdays or weekends) 
# falls below the performance of the opposite type of days (weekends or weekdays) 
# from the previous week by a certain threshold. 
# If so, it labels them as underperforming. 


def check_week_performance(df, threshold, performance_col='Performance.perc.Daily'):
    """Compare each week's days with the opposite day type of the previous week.

    For every week after the first one present in `df`:
    - a weekday whose performance is below the previous week's weekend average
      minus `threshold` is labelled 'Weekday underperformance';
    - a weekend day whose performance is below the previous week's weekday
      average minus `threshold` is labelled 'Weekend underperformance'.

    "Previous week" is the previous week that appears in `df`, which is not
    necessarily the previous calendar week when rows are missing.

    Adds or overwrites, in place, the columns 'Week', 'Prev Weekday Avg',
    'Prev Weekend Avg' and 'week_underperformance' (default 'FALSE').

    Parameters
    ----------
    df : pandas.DataFrame
        Datetime-indexed; must contain `performance_col` and a boolean
        'is_weekend' column.
    threshold : float
        Margin subtracted from the previous week's average.
    performance_col : str
        Column holding the daily performance percentage.

    Returns
    -------
    pandas.DataFrame
        The same `df` object.
    """
    # Week label derived from the datetime index.
    df['Week'] = df.index.to_period('W').astype(str)
    # Float defaults: the averages written below are floats, and assigning
    # them into an integer column is an incompatible-dtype assignment.
    df['Prev Weekday Avg'] = 0.0
    df['Prev Weekend Avg'] = 0.0
    # May become 'Weekday underperformance' or 'Weekend underperformance'.
    df['week_underperformance'] = 'FALSE'

    is_weekend = df['is_weekend']
    performance = df[performance_col]

    unique_weeks = df['Week'].unique()
    # Pair every week with its predecessor; the first week has none and is skipped.
    for prev_week, week in zip(unique_weeks, unique_weeks[1:]):
        in_prev_week = df['Week'] == prev_week
        in_week = df['Week'] == week

        prev_weekday_avg = performance[in_prev_week & ~is_weekend].mean()
        prev_weekend_avg = performance[in_prev_week & is_weekend].mean()

        weekday_hit = in_week & ~is_weekend & (performance < prev_weekend_avg - threshold)
        weekend_hit = in_week & is_weekend & (performance < prev_weekday_avg - threshold)
        df.loc[weekday_hit, 'week_underperformance'] = 'Weekday underperformance'
        df.loc[weekend_hit, 'week_underperformance'] = 'Weekend underperformance'

        # Keep the reference averages alongside the labels.
        df.loc[in_week, 'Prev Weekday Avg'] = prev_weekday_avg
        df.loc[in_week, 'Prev Weekend Avg'] = prev_weekend_avg

    return df

In [None]:
def check_seasonal_performance(df, threshold):
    """Label summer/winter days that underperform relative to the other seasons.

    Adds or overwrites 'seasonal_underperformance' (default 'FALSE') in place.
    If the index spans less than 365 days, a message is printed and nothing
    else is changed. Otherwise 'season_avg_performance' is added, and a summer
    (winter) row dated at least 365 days after the oldest row is labelled
    'summer underperformance' ('winter underperformance') when its
    performance is below the mean of the other seasons' averages minus
    `threshold`.

    A season with no rows in `df` is left out of that mean instead of raising
    KeyError, which previously aborted the whole site in the caller.

    Parameters
    ----------
    df : pandas.DataFrame
        Datetime-indexed; must contain 'Performance.perc.Daily' and 'season'.
    threshold : float
        Margin subtracted from the other seasons' mean.

    Returns
    -------
    pandas.DataFrame
        The same `df` object.
    """
    df['seasonal_underperformance'] = 'FALSE'

    # Seasonal labelling only makes sense with at least one year of data.
    one_year = timedelta(days=365)
    oldest_date = df.index.min()
    newest_date = df.index.max()
    if (newest_date - oldest_date) < one_year:
        print("Oldest data is less than a year old. The function needs data older than one year.")
        return df

    performance = df['Performance.perc.Daily']

    # Average performance per season, repeated on every row of that season.
    season_avg = df.groupby('season')['Performance.perc.Daily'].mean()
    df['season_avg_performance'] = df['season'].map(season_avg)

    def _mean_of_present(season_names):
        # Mean of the listed seasons' averages, skipping seasons without rows.
        present = [season_avg[name] for name in season_names if name in season_avg.index]
        if not present:
            return float('nan')
        return sum(present) / len(present)

    # Reference level for each labelled season: the mean of the other three.
    other_season_avg = {
        'summer': _mean_of_present(('autumn', 'winter', 'spring')),
        'winter': _mean_of_present(('autumn', 'summer', 'spring')),
    }

    # Only rows at least a year after the oldest record are labelled.
    eligible = (df.index - oldest_date) >= one_year

    for season, label in (
        ('summer', 'summer underperformance'),
        ('winter', 'winter underperformance'),
    ):
        in_season = (df['season'] == season).to_numpy()
        too_low = (performance < other_season_avg[season] - threshold).to_numpy()
        df.loc[eligible & in_season & too_low, 'seasonal_underperformance'] = label

    return df

# 7. Labelling

## 7.1. Support functions

In [None]:
# Function to create the array for the new column
def create_array(row):
    """Collect every entry from position 4 onward that is not the string 'FALSE'.

    The first four positions are skipped; the labelling loop places the
    numeric columns there and the label columns after them.
    """
    active_labels = []
    for value in row[4:]:
        if value != 'FALSE':
            active_labels.append(value)
    return active_labels

In [None]:
# Month number (1-12) -> season name.
# Southern-hemisphere (Australian) seasons: December to February is summer.
seasons = dict(enumerate(
    (
        'summer', 'summer',            # Jan, Feb
        'autumn', 'autumn', 'autumn',  # Mar, Apr, May
        'winter', 'winter', 'winter',  # Jun, Jul, Aug
        'spring', 'spring', 'spring',  # Sep, Oct, Nov
        'summer',                      # Dec
    ),
    start=1,
))

## 7.2. Running the labelling function

In [None]:
def merge_clear_expe(df1, df2):
    """Join clear-sky and expected frames and add their ratio in percent.

    Adds 'expected_over_clear' (Irrad.kWh.m2.Daily over EnergyYield.kWh.Daily,
    times 100, rounded) and a 'date' column copied from the index.
    """
    df_merged = df1.join(df2)
    df_merged['expected_over_clear'] = (
        df_merged['Irrad.kWh.m2.Daily'] / df_merged['EnergyYield.kWh.Daily'] * 100
    ).round(0)
    df_merged['date'] = df_merged.index
    return df_merged


# Per-site result files go here; create the folder up front so a missing
# directory does not make every site fail inside the try block below.
individual_sites_dir = './1B_site_results/individual_sites/'
os.makedirs(individual_sites_dir, exist_ok=True)

# UTC-localized window bounds. Not read again in this cell; kept in case a
# later cell uses them. They do not depend on the site, so compute them once.
time_starttz = pytz.timezone('UTC').localize(datetime.datetime.strptime(date_start, '%Y-%m-%d'))
time_endtz = pytz.timezone('UTC').localize(datetime.datetime.strptime(date_end, '%Y-%m-%d'))

for i in range(len(sites_list)):
    # Identifies the site in the error message even if the id lookup itself fails.
    site_label = f'site row {i}'
    try:
        site_id, site_id_full = get_site_info(sites_list, i)
        site_label = site_id_full
        # Checking
        print("Checking tz-aware for: ", site_id_full)

        # Site timezone; build_dataframe reads this module-level name.
        timezone_value = sites_list[sites_list['source'] == site_id_full].iloc[0]['timezone']

        # ========================================================
        # = Getting Clear sky
        # ========================================================

        measure_name = 'EnergyYield.kWh.Daily'
        timeid, data_values = read_metric_site(date_start, date_end, measure_name, site_id)
        df_clear = build_dataframe(timeid, measure_name, data_values)

        # ========================================================
        # = Getting expected generation
        # ========================================================

        measure_name = 'Irrad.kWh.m2.Daily'
        timeid, data_values = read_metric_site(date_start, date_end, measure_name, site_id)
        df_expected = build_dataframe(timeid, measure_name, data_values)

        # ==================================================
        # = Reading Production.kWh.Daily from AWS TimeStream
        # ==================================================

        measure_name = 'Production.kWh.Daily'
        timeid, data_values = read_metric_site(date_start, date_end, measure_name, site_id)
        df_production = build_dataframe(timeid, measure_name, data_values)

        # A site with any series missing cannot be labelled; say so explicitly
        # instead of failing later with an unrelated-looking exception.
        if df_clear.empty or df_expected.empty or df_production.empty:
            print(f'No data available for {site_id_full}; skipping.')
            continue

        # ========================================================
        # = Merging clear skies and expected
        # ========================================================

        df_merged = merge_clear_expe(df_clear, df_expected)

        # ========================================================
        # = Getting low cloudiness days
        # ========================================================

        # Single boolean assignment; days with a missing ratio count as not
        # low-cloudiness, which matches the `== True` filter used below.
        df_merged['is_low_clousdiness_day'] = df_merged['expected_over_clear'] >= threshold_low_cloudiness

        # ========================================================
        # = Merging it and getting a % of performance daily
        # ========================================================

        df_performance = df_production.join(df_merged)
        df_performance['Performance.perc.Daily'] = (
            df_performance['Production.kWh.Daily'] / df_performance['Irrad.kWh.m2.Daily'] * 100
        ).round(0)

        df_performance['performancelabel'] = df_performance.apply(performance_check, axis=1)

        # ========================================================
        # = Adding weekend and seasonal info
        # ========================================================

        # Take the date from this frame's own index: after the left join the
        # joined 'date' is NaT on production days without clear-sky data, and
        # a NaT month used to raise KeyError in the season lookup.
        df_performance['date'] = df_performance.index

        # Day of week (Monday=0 ... Sunday=6) and weekend indicator.
        df_performance['day_of_week'] = df_performance['date'].dt.weekday
        df_performance['is_weekend'] = df_performance['day_of_week'].isin([5, 6])

        # Month and season (via the month -> season dictionary).
        df_performance['month'] = df_performance['date'].dt.month
        df_performance['season'] = df_performance['month'].map(seasons)

        # ========================================================
        # = Keeping only low cloudiness
        # ========================================================

        # Explicit copy: the check_* functions add columns to this frame in place.
        df_LC = df_performance[df_performance['is_low_clousdiness_day'] == True].copy()

        if df_LC.empty:
            print(f'No low-cloudiness days for {site_id_full}; skipping.')
            continue

        print("Checking level 1 performance-related faults for: " + str(site_id_full))

        # ========================================================
        # = Labeling level 1 faults based on performance
        # ========================================================

        df_LC = check_major_underperformance(df_LC, window_size_major_und, threshold_performance_major_und)
        df_LC = check_minor_underperformance(df_LC, window_size_minor_und, threshold_performance_minor_und)
        df_LC = check_week_performance(df_LC, threshold_performance_weekends_weekdays)
        df_LC = check_seasonal_performance(df_LC, threshold_performance_seasonal)

        # ========================================================
        # = Cleaning up the dataframe
        # ========================================================

        # Keep only the performance and label columns. create_array relies on
        # this order: four numeric columns first, label columns after.
        level1_sites_labels_df = df_LC[[
            'EnergyYield.kWh.Daily', 'Irrad.kWh.m2.Daily', 'Production.kWh.Daily', 'Performance.perc.Daily',
            'major_underperformance', 'minor_underperformance', 'week_underperformance', 'seasonal_underperformance'
            ]].copy()
        # Create the new 'level1-labels-site' column
        level1_sites_labels_df['level1-labels-site'] = level1_sites_labels_df.apply(create_array, axis=1)

        # Drop rows with empty arrays in 'level1-labels-site'
        level1_sites_labels_df = level1_sites_labels_df.loc[level1_sites_labels_df['level1-labels-site'].apply(len) > 0]

        level1_sites_labels_df.to_csv(individual_sites_dir + str(site_id) + '.csv')

        print(f'Analysis concluded for: {site_id_full}')

    except Exception as e:
        # Best-effort per site: report which site failed and why, then move on.
        print(f'Failed to process {site_label}: {type(e).__name__}: {e}')

# 8. Outputting a single CSV with all the faulty sites

## 8.1. Cleaning

In [None]:
# Combining last rows:
input_folder = './1B_site_results/individual_sites/'

# List all files in the folder; sorted so the row order of the combined table
# does not depend on the filesystem's listing order.
file_list = os.listdir(input_folder)
csv_files = sorted(file for file in file_list if file.endswith('.csv'))

# Read the last row of each CSV file, add the 'site_id' column, and store it in a list
last_rows = []
for file in csv_files:
    file_path = os.path.join(input_folder, file)
    df = pd.read_csv(file_path)

    # If the DataFrame is empty, skip this file
    if df.empty:
        continue

    # Extract site_id from the file name (assuming site_id is the entire file name without the .csv extension)
    site_id = os.path.splitext(file)[0]

    # Copy before adding a field so the write never targets a view of `df`.
    last_row = df.iloc[-1].copy()
    last_row['site_id'] = site_id
    last_rows.append(last_row)

# Create a DataFrame from the list of last rows
# NOTE(review): if no file contributed a row this frame has no columns, and
# the cells below that read combined_df['time'] will raise KeyError.
combined_df = pd.DataFrame(last_rows)

In [None]:
# Filtering dataframe just for TODAY:
# Normalise 'time' to a 'YYYY-MM-DD' string in Australia/Sydney local time.
combined_df['time'] = pd.to_datetime(combined_df['time'], utc=True)
combined_df['time'] = combined_df['time'].dt.tz_convert('Australia/Sydney')
combined_df['time'] = combined_df['time'].dt.strftime('%Y-%m-%d')

# str() keeps the comparison valid even if `today` was rebound to a date
# object by a later cell and this cell is run again.
df_today_faults = combined_df[combined_df['time'] == str(today)]

# The aggregate folder must exist before the first write into it.
os.makedirs('./1B_site_results/aggregate/', exist_ok=True)
df_today_faults.to_csv(f'./1B_site_results/aggregate/{today}_today_sites_level1faults.csv', index=False)

df_today_faults

In [None]:
# Filtering dataframe for a week ago:

today_full = datetime.date.today()
week_ago = today_full - datetime.timedelta(days=7)

# Normalise 'time' to a calendar date in Australia/Sydney local time.
combined_df['time'] = pd.to_datetime(combined_df['time'], utc=True)
combined_df['time'] = combined_df['time'].dt.tz_convert('Australia/Sydney')
combined_df['time'] = combined_df['time'].dt.date

# Compare against the date object directly. The shared `today` string is left
# untouched so the "today" cell above still works when it is run again.
df_week_faults = combined_df[(combined_df['time'] >= week_ago) & (combined_df['time'] <= today_full)]

# Ensure the directory exists
output_dir = './1B_site_results/aggregate/'
os.makedirs(output_dir, exist_ok=True)

# Save the DataFrame to CSV (a date formats as 'YYYY-MM-DD' in an f-string)
df_week_faults.to_csv(f'{output_dir}{today_full}_lastWeekAggregate_sites_level1faults.csv', index=False)

df_week_faults

In [None]:
# NOTE(review): the string literal below is a disabled copy of the weekly
# aggregation code from the previous cell. As a bare string it executes
# nothing; consider deleting the cell once it is confirmed to be obsolete.
'''# Filtering dataframe for a week ago:

today_full = datetime.date.today()
week_ago = today_full - datetime.timedelta(days=7)

combined_df['time'] = pd.to_datetime(combined_df['time'], utc=True)
combined_df['time'] = combined_df['time'].dt.tz_convert('Australia/Sydney')
combined_df['time'] = combined_df['time'].dt.date

# Make sure that 'today' is a date object, not a string.
today = today_full  

df_week_faults = combined_df[(combined_df['time'] >= week_ago) & (combined_df['time'] <= today)]

# Ensure the directory exists
output_dir = './1B_site_results/aggregate/'
if not os.path.exists(output_dir):
    os.makedirs(output_dir)

# Save the DataFrame to CSV
df_week_faults.to_csv(f'{output_dir}{today}_lastWeekAggregate_sites_level1faults.csv', index=False)

df_week_faults'''