In [1]:
from datetime import datetime, timedelta
import calendar

In [2]:
def is_dst(dt, timezone):
    """Report whether US daylight saving time touches the calendar day of ``dt``.

    Only the markets listed below are treated as observing DST; any other
    value of ``timezone`` (for example ``'MISO'``) always yields ``False``.

    The check works at day granularity: DST runs from the second Sunday in
    March through the first Sunday in November, and both transition days
    count as DST days because part of each is on daylight time.  The 02:00
    switch-over itself is not modelled.

    Args:
        dt: Naive ``datetime`` to test.
        timezone: Market/ISO code such as ``'PJM'``.

    Returns:
        bool: ``True`` if ``dt`` falls on or between the two transition days
        of its year for a DST-observing market, otherwise ``False``.
    """
    if timezone not in ('PJM', 'ERCOT', 'SPP', 'NYISO', 'WECC', 'CAISO'):
        return False

    def first_sunday(month):
        # weekday() is 0 for Monday ... 6 for Sunday, so (6 - weekday) % 7 is
        # the number of days from the 1st of the month to its first Sunday.
        return 1 + (6 - datetime(dt.year, month, 1).weekday()) % 7

    dst_start = datetime(dt.year, 3, first_sunday(3) + 7)  # second Sunday in March
    # Exclusive upper bound: midnight after the first Sunday in November, so
    # the fall-back day itself is still reported as a DST day.
    dst_end_exclusive = datetime(dt.year, 11, first_sunday(11) + 1)
    return dst_start <= dt < dst_end_exclusive

In [3]:
def calculate_holidays(year):
    """Return the NERC holiday dates of ``year`` as ``'MM-DD'`` strings.

    The list contains, in this order: New Year's Day, Independence Day,
    Christmas Day, Memorial Day (last Monday in May), Labor Day (first Monday
    in September) and Thanksgiving (fourth Thursday in November).  When one of
    the fixed-date holidays falls on a Sunday, the following Monday is
    appended as well, because NERC observes the holiday on that Monday.  A
    holiday falling on a Saturday is not shifted.

    Args:
        year: Calendar year as an integer.

    Returns:
        list[str]: Zero-padded ``'MM-DD'`` strings.
    """
    def weekday_dates(month, weekday):
        # Every day-of-month in `month` that falls on `weekday`.  Built from
        # calendar.weekday() so the result does not depend on the global
        # calendar.setfirstweekday() setting.
        last_day = calendar.monthrange(year, month)[1]
        return [day for day in range(1, last_day + 1)
                if calendar.weekday(year, month, day) == weekday]

    holidays = []
    observed_mondays = []
    for month, day in ((1, 1), (7, 4), (12, 25)):
        holidays.append(f"{month:02d}-{day:02d}")
        if calendar.weekday(year, month, day) == calendar.SUNDAY:
            # day + 1 is always a valid date for these three holidays.
            observed_mondays.append(f"{month:02d}-{day + 1:02d}")

    # Memorial Day: last Monday in May
    holidays.append(f"05-{weekday_dates(5, calendar.MONDAY)[-1]:02d}")

    # Labor Day: first Monday in September
    holidays.append(f"09-{weekday_dates(9, calendar.MONDAY)[0]:02d}")

    # Thanksgiving: fourth Thursday in November
    holidays.append(f"11-{weekday_dates(11, calendar.THURSDAY)[3]:02d}")

    # Observed Mondays go last so the leading entries keep their positions.
    holidays.extend(observed_mondays)
    return holidays

In [4]:
def parse_period(period):
    """Convert a period label into its first and last calendar day.

    Recognised formats:
        * daily      -- ``'YYYY-MM-DD'``  (e.g. ``'2025-01-15'``)
        * monthly    -- ``'YYYYMon'``     (e.g. ``'2025Mar'``; parsed with
          ``%b``, so the month abbreviation follows the active locale)
        * quarterly  -- ``'YYYYQn'``      (e.g. ``'2025Q3'``)
        * annual     -- ``'YYYYA'``       (e.g. ``'2025A'``)

    Args:
        period: Period label as a string.

    Returns:
        tuple[datetime, datetime]: ``(start_date, end_date)``, both at
        midnight; ``end_date`` is the last day of the period (inclusive).

    Raises:
        ValueError: If the label matches none of the formats, including empty
            or too-short strings, or carries an invalid date or quarter.
    """
    if '-' in period:  # Daily
        day = datetime.strptime(period, "%Y-%m-%d")
        return day, day

    if len(period) == 7:  # Monthly
        start_date = datetime.strptime(period, "%Y%b")
        last_day = calendar.monthrange(start_date.year, start_date.month)[1]
        return start_date, start_date.replace(day=last_day)

    # The length guards keep short or empty labels from raising IndexError on
    # the negative/positional indexing below; they fall through to ValueError.
    if len(period) >= 6 and period[-2] == 'Q':  # Quarterly
        year = int(period[:4])
        quarter = int(period[5])
        if not 1 <= quarter <= 4:
            raise ValueError(f"Invalid quarter in period: {period!r}")
        start_month = (quarter - 1) * 3 + 1
        end_month = start_month + 2
        last_day = calendar.monthrange(year, end_month)[1]
        return datetime(year, start_month, 1), datetime(year, end_month, last_day)

    if len(period) >= 5 and period[-1] == 'A':  # Annually
        year = int(period[:4])
        return datetime(year, 1, 1), datetime(year, 12, 31)

    raise ValueError("Invalid period format")

In [5]:
def get_hours(iso, peak_type, period):
    """Count the hours of a peak-type block for one ISO over a period.

    Hours of a day are numbered 1-24 (presumably hour-ending).  Two daily
    profiles are used: PJM and NYISO have on-peak hours 8-23, every other
    supported ISO has on-peak hours 7-22; the remaining hours are off-peak.

    Per-day rules:
        * ``'onpeak'``  -- on-peak hours on regular days; nothing on weekends
          and holidays.
        * ``'offpeak'`` -- off-peak hours on regular days; all 24 hours on
          weekends and holidays.
        * ``'2x16H'``   -- on-peak hours on weekends and holidays only.
        * ``'7x8'``     -- off-peak hours every day.
        * ``'flat'``    -- all 24 hours every day.
        * For CAISO, a Saturday keeps the regular-day on/off-peak split.
        * On the second Sunday in March hour 3 is dropped, and on the first
          Sunday in November hour 2 is counted twice, when ``is_dst`` reports
          DST for that day and ISO.

    Args:
        iso: One of ``'PJM'``, ``'MISO'``, ``'ERCOT'``, ``'SPP'``,
            ``'NYISO'``, ``'WECC'``, ``'CAISO'``.
        peak_type: One of ``'onpeak'``, ``'offpeak'``, ``'flat'``,
            ``'2x16H'``, ``'7x8'`` (case-sensitive).
        period: Period label understood by ``parse_period``.

    Returns:
        dict: ``iso``, ``peak_type`` (upper-cased), ``startdate`` and
        ``enddate`` (``'YYYY-MM-DD'``) and ``num_hours``.

    Raises:
        ValueError: If ``iso`` or ``peak_type`` is not recognised, or if
            ``parse_period`` rejects ``period``.
    """
    all_day = list(range(1, 25))

    def blocks_for(first_on, last_on):
        # Build the five named hour blocks for one daily profile.  The lists
        # are shared between keys but never mutated below.
        on = list(range(first_on, last_on + 1))
        off = [h for h in all_day if h not in on]
        return {'onpeak': on, 'offpeak': off, 'flat': all_day, '2x16H': on, '7x8': off}

    peak_hours = {
        'PJM': blocks_for(8, 23),
        'MISO': blocks_for(7, 22),
        'ERCOT': blocks_for(7, 22),
        'SPP': blocks_for(7, 22),
        'NYISO': blocks_for(8, 23),
        'WECC': blocks_for(7, 22),
        'CAISO': blocks_for(7, 22),
    }

    # Validate before doing any work so bad arguments surface as the intended
    # ValueError instead of a KeyError from the table lookup.
    if iso not in peak_hours:
        raise ValueError("ISO not recognized")
    blocks = peak_hours[iso]
    if peak_type not in blocks:
        raise ValueError("Peak type not recognized")

    # Parse the period
    start_date, end_date = parse_period(period)

    nerc_holidays = calculate_holidays(start_date.year)

    num_hours = 0
    current_date = start_date

    while current_date <= end_date:
        weekday = current_date.weekday()
        off_day = weekday >= 5 or current_date.strftime("%m-%d") in nerc_holidays
        # NOTE(review): only CAISO gets the Saturday exception here; confirm
        # whether other western markets should be handled the same way.
        caiso_saturday = iso == 'CAISO' and weekday == 5
        treat_as_off = off_day and not caiso_saturday

        if peak_type == 'onpeak':
            hours = [] if treat_as_off else blocks['onpeak']
        elif peak_type == 'offpeak':
            hours = blocks['flat'] if treat_as_off else blocks['offpeak']
        elif peak_type == '2x16H':
            hours = blocks['2x16H'] if off_day else []
        else:  # 'flat' and '7x8' apply unchanged to every day
            hours = blocks[peak_type]

        # Handle DST adjustments (transition days are always Sundays)
        if hours and weekday == 6 and is_dst(current_date, iso):
            if current_date.month == 3 and 8 <= current_date.day <= 14:
                # Start of DST (skip hour)
                hours = [h for h in hours if h != 3]
            elif current_date.month == 11 and 1 <= current_date.day <= 7:
                # End of DST (repeat hour)
                if 2 in hours:
                    hours = hours + [2]

        num_hours += len(hours)
        current_date += timedelta(days=1)

    return {
        'iso': iso,
        'peak_type': peak_type.upper(),
        'startdate': start_date.strftime("%Y-%m-%d"),
        'enddate': end_date.strftime("%Y-%m-%d"),
        'num_hours': num_hours
    }

In [6]:
# Example: on-peak hour count for WECC over calendar year 2025.
results = get_hours(iso="WECC", peak_type="onpeak", period="2025A")
results

{'iso': 'WECC',
 'peak_type': 'ONPEAK',
 'startdate': '2025-01-01',
 'enddate': '2025-12-31',
 'num_hours': 4080}