In [None]:
import os
import holidays
import pandas as pd

from config import LLM_MODEL_SUBNAME, EXP_YEARLY_PATH, YEARLY_FILE_TEMPLATE, \
    WEEKEND_BY_COUNTRY, COUNTRY_CODE, COUNTRY_CODE_TO_NAME, COUNTRIES, \
    START_TIME, END_TIME, TIME_STEP

print(f"LLM Model Selected in Config.py is {LLM_MODEL_SUBNAME}")

# Gregorian Date Dataframe Based on Duration

### Functions

In [2]:
def generate_time_dataframe(start_datetime, end_datetime, time_step='h', output_filename='time_data'):
    """
    Generates a DataFrame with a time range and additional columns for temporal and holiday information.
    
    Parameters:
    - start_datetime: Start date and time
    - end_datetime: End date and time
    - time_step: The frequency of time steps ('h' for hours, 'MS' for month start, '2MS' for every 2 months, etc.)
    - output_filename: The name of the CSV file to save the DataFrame to
    """
    
    # Ensure start_datetime is less than end_datetime
    if pd.to_datetime(start_datetime) >= pd.to_datetime(end_datetime):
        raise ValueError("start_datetime must be less than end_datetime")
   
    # Generate datetime range using pandas date_range for variable frequency
    datetimes = pd.date_range(start=start_datetime, end=end_datetime, freq=time_step)
    
    # Create DataFrame
    df = pd.DataFrame({'datetime': datetimes})
    
    # Extract date components for additional columns
    df['year'] = df['datetime'].dt.year
    df['quarter'] = df['datetime'].dt.quarter
    df['month'] = df['datetime'].dt.month
    df['week'] = df['datetime'].dt.isocalendar().week
    df['day'] = df['datetime'].dt.day
    df['day_name'] = df['datetime'].dt.strftime('%a').str.upper()
    
    df['hour'] = df['datetime'].dt.hour
    # df['minute'] = df['datetime'].dt.minute
    # df['second'] = df['datetime'].dt.second

    # Dictionary to map day names to numbers (e.g., MON=1, TUE=2, etc.)
    day_name_to_number = {
        'MON': 1, 'TUE': 2, 'WED': 3, 'THU': 4, 'FRI': 5, 'SAT': 6, 'SUN': 7
    }

    # Add the day number column
    df['day_number'] = df['day_name'].map(day_name_to_number)

    def get_season(day, month, hemisphere='northern'):
        if (month == 12 and day >= 21) or (month in [1, 2]) or (month == 3 and day < 20):
            return 'Winter' if hemisphere == 'northern' else 'Summer'
        elif (month == 3 and day >= 20) or (month in [4, 5]) or (month == 6 and day < 21):
            return 'Spring' if hemisphere == 'northern' else 'Autumn'
        elif (month == 6 and day >= 21) or (month in [7, 8]) or (month == 9 and day < 23):
            return 'Summer' if hemisphere == 'northern' else 'Winter'
        elif (month == 9 and day >= 23) or (month in [10, 11]) or (month == 12 and day < 21):
            return 'Autumn' if hemisphere == 'northern' else 'Spring'
        else:
            raise ValueError("Invalid date or hemisphere. Ensure 'day' and 'month' are valid.")
    
    # Apply the function to determine the season
    df['season_northern'] = df.apply(lambda row: get_season(row['day'], row['month'],'northern'), axis=1)
    df['season_southern'] = df.apply(lambda row: get_season(row['day'], row['month'],'southern'), axis=1)

    # Save to CSV without modifying the datetime column
    df.to_csv(f'{output_filename}_base.csv', index=False, date_format='%Y-%m-%d %H:%M:%S')
    print(f"CSV file '{output_filename}_base.csv' has been created.")

    return df


def show_holidays_by_day(df):
    # Filter rows with holidays
    holidays_df = df[df['is_holiday'] != 0].copy()  # Create a copy to avoid the warning
    
    # Add a column for the date without time
    holidays_df.loc[:, 'date_only'] = holidays_df['datetime'].dt.date
    
    # Group by 'date_only' and take the first occurrence of the holiday per day
    holidays_per_day = holidays_df.groupby('date_only').first().reset_index()
    
    # Drop the temporary 'date_only' column after grouping
    holidays_per_day.drop(columns=['date_only'], inplace=True)
    
    return holidays_per_day


def display_temporal_stats(df, type='All'):
    if type == 'Gr':
        # Gregorian Calendar temporal columns
        temporal_columns = ['year', 'quarter', 'month', 'week', 'day', 'day_name', 'is_holiday']
    elif type == 'T':
        # Gregorian Calendar temporal columns
        temporal_columns = ['hour', 'minute', 'second']
    elif type == 'All':
        # All temporal columns
        temporal_columns = [col for col in df.columns if col not in ['datetime', 'holiday_desc']]
    else:
        raise ValueError("Invalid calendar type specified. Choose 'Gr' for Gregorian, 'T' for time, or 'All' for all columns.")

    # Loop through each temporal column to calculate and display stats
    for col in temporal_columns:
        value_counts = df[col].value_counts().sort_index()

        # Create a DataFrame for displaying unique values and their counts
        stats_df = pd.DataFrame({
            'Values': value_counts.index,
            'Counts': value_counts.values
        })

        # Display the table for each column
        print(f"\nStats for {col}:")
        display(stats_df.T)


def get_weekend_for_country_and_date(country_code, weekend_days_by_country, date):
    """
    This function returns the weekend days based on the given country code and date.
    It checks the weekend rules for the country and applies the correct weekend based on the date.
    If the country is not found in the dictionary, it defaults to ['SAT', 'SUN'] as the weekend.
    """
    # Retrieve weekend rules for the given country code
    weekend_rules = weekend_days_by_country.get(country_code.upper())
    
    # If no weekend rules are found for the given country, use the default ['SAT', 'SUN']
    if not weekend_rules:
        return ['SAT', 'SUN']
    
    # Find the correct weekend based on the date ranges
    for rule in weekend_rules:
        if rule['start'] <= date <= rule['end']:
            return rule['weekend']
    
    # Fallback to SAT-SUN if no specific date range matches
    return ['SAT', 'SUN']


def check_holidays(greg_date, country_code, language='en_US'):
    """
    Checks if a given date is a holiday for the specified country.
    """
    try:
        country_holidays = getattr(holidays, country_code.upper())(years=greg_date.year, language=language)
        if greg_date in country_holidays:
            return country_holidays[greg_date]
    except AttributeError:
        return "No Holiday"
    return "No Holiday"


def process_country_data(df, country_code, weekend_days_by_country, language='en_US', output_filename='time_data'):
    """
    Processes the DataFrame for a specific country by adding weekend and holiday information.
    Returns a master DataFrame and a country-specific DataFrame.
    """
    df = df.copy()
    
    if country_code == 'BR':
        df['season'] = df['season_southern']
    else:
        df['season'] = df['season_northern']
    
    df['is_weekend'] = df.apply(
        lambda row: 1 if row['day_name'] in get_weekend_for_country_and_date(country_code, weekend_days_by_country, 
                                                                             row['datetime']) else 0, axis=1
                                                                             )
    
    df['date_only'] = df['datetime'].dt.date
    
    # Optimize the holiday check by checking only unique dates
    unique_dates = df['date_only'].drop_duplicates()
    holiday_desc_dict = {date: check_holidays(pd.Timestamp(date), country_code) for date in unique_dates}
    holiday_status_dict = {date: (0 if desc == "No Holiday" else 1) for date, desc in holiday_desc_dict.items()}
    
    # Map the holiday description and status back to the full DataFrame
    df['is_holiday'] = df['date_only'].map(holiday_status_dict)
    df['holiday_desc'] = df['date_only'].map(holiday_desc_dict)

    df.drop(columns=['date_only', 'season_southern', 'season_northern'], inplace=True)
    
    # Save to CSV without modifying the datetime column
    df.to_csv(f'{output_filename}_{country_code}.csv', index=False, date_format='%Y-%m-%d %H:%M:%S')
    print(110*'-')
    print(f"CSV file '{output_filename}_{country_code}.csv' has been created.")

    return df

## Hour Time Step

In [None]:
if not os.path.exists(EXP_YEARLY_PATH):
    os.makedirs(EXP_YEARLY_PATH)

print(110*'-')
df = generate_time_dataframe(start_datetime=START_TIME, end_datetime=END_TIME, time_step=TIME_STEP, output_filename=YEARLY_FILE_TEMPLATE)
# df

In [None]:
country_processed = None

for country_code in COUNTRY_CODE:
    if COUNTRY_CODE_TO_NAME.get(country_code) in COUNTRIES:
        country_processed = COUNTRY_CODE_TO_NAME[country_code]
        print(f"\nProcessing data for {country_processed} ({country_code})")
        # Display the resulting DataFrame with holidays and weekends for each country
        specific_df = process_country_data(df, country_code=country_code, weekend_days_by_country=WEEKEND_BY_COUNTRY, output_filename=YEARLY_FILE_TEMPLATE)
        # display(specific_df)

        holidays_per_day_df = show_holidays_by_day(specific_df)
        # display(holidays_per_day_df)

        # Display stats based on selected temporal columns 'Gr', 'T', 'All'
        # display_temporal_stats(specific_df, type='All')
        # break

## Sample (Last Output)

In [None]:
if country_processed is None:
    raise ValueError("No valid country found in the COUNTRY_CODE_TO_NAME mapping.")

print(f"\n[Sample] Country-Specific DataFrame for {country_processed}:")
display(specific_df)
display(holidays_per_day_df)