In [16]:
import datetime as dt
import sqlalchemy as sa
import os
import sys
import urllib.parse as url
import pandas as pd
import pandas_market_calendars as mcal

In [17]:
# Generate the date range of 3 years back and 6 years forward for the calendar
start_date, end_date = get_dates_for_years(3, 6)

In [19]:
def get_market_calendar(exchange, s_date, e_date, timezone):

    """
    Fetch the market calendar schedule for a given exchange.

    Args:
        exchange (str): The exchange name (e.g., 'NYSE').
        s_date (str): Start date in 'YYYY-MM-DD' format.
        e_date (str): End date in 'YYYY-MM-DD' format.
        timezone (str): Timezone for the calendar.

    Returns:
        DataFrame: DataFrame with columns Date, Open_Time, and Close_Time.
    """
    
    try:
        # Get the calendar for the specified exchange
        nyse = mcal.get_calendar(exchange)
        # Fetch the schedule for the date range
        df_tmp = nyse.schedule(start_date=s_date, end_date=e_date, tz=timezone)
        
        # Extract date, open time, and close time from market_open and market_close
        df_tmp['Date'] = pd.to_datetime(df_tmp['market_open']).dt.date
        df_tmp['Open_Time'] = pd.to_datetime(df_tmp['market_open']).dt.time
        df_tmp['Close_Time'] = pd.to_datetime(df_tmp['market_close']).dt.time
        
        # Select only relevant columns
        df_tmp = df_tmp[['Date', 'Open_Time', 'Close_Time']]
        
        return df_tmp
    
    except Exception as ex:
        print(f"Error fetching calendar data: {ex}")
        raise

In [20]:
# Fetch the calendar data for the NYSE
df_dates = get_calendar('NYSE', start_date, end_date, 'America/New_York')

In [21]:
# Setup connection parameters
comp = os.environ["COMPUTERNAME"]  # Get the computer name from environment variables
dbase = "Financial_Securities"     # Define the name of the database

username = os.getlogin()
external_folder_path = 'C:/Users/' + username + '/Documents/Projects/Financial_Securities/Custom_Python_Functions'
sys.path.append(external_folder_path)
from custom_python_functions import create_connection, clear_table, get_dates_for_years


# Create a connection to the database
s, e = create_connection(comp, dbase, "", "")
s1 = s()  # Instantiate a session object


In [22]:
Base = sa.orm.declarative_base()

class Market_Calendar(Base):
    
    """
    SQLAlchemy ORM class representing the 'Market_Calendar' table in the 'Equities' schema.

    Attributes:
        __tablename__ (str): The name of the table.
        __table_args__ (dict): Additional arguments for the table, including schema name.
        Country (Column): Country or exchange code.
        Date (Column): The date of the market schedule.
        Open_Time (Column): Market open time.
        Close_Time (Column): Market close time.
    """    
    
    __tablename__ = 'Market_Calendar'
    __table_args__ = {'schema': 'Equities'}
    Country = sa.Column(sa.String, primary_key=True)
    Date = sa.Column(sa.Date, primary_key=True)
    Open_Time = sa.Column('Open_Time', sa.Time)
    Close_Time = sa.Column('Close_Time', sa.Time)

In [23]:
country = "United States" # Set the country to be United States

# Process each row in the DataFrame
try:
    for index, row in df_dates.iterrows():
        
        # Query the 'Market_Calendar' table to find records where the 'Country" column matches country variable and the 
        # 'Date' column matches the value in the DataFrame's 'Date' row
        q1 = s1.query(Market_Calendar).filter(Market_Calendar.Country == country, Market_Calendar.Date == row['Date'])
        
        # Check if any records were found with the specified 'Country' and 'Date'
        if (q1.count() >= 1):
            # If one or more records are found, get the first matching record
            q1 = s1.query(Market_Calendar).filter(Market_Calendar.Country == country, Market_Calendar.Date == row['Date']).first()
            # Update existing record
            q1.Open_Time = row['Open_Time']
            q1.Close_Time = row['Close_Time']
        else:
            # Create new record
            q1 = Market_Calendar(
                Country=country,
                Date=row['Date'],
                Open_Time=row['Open_Time'],
                Close_Time=row['Close_Time']
            )
            s1.add(q1)

# Handle SQLAlchemy errors
except sa.exc.SQLAlchemyError as e:
    message = f"Issue with updating Market_Calendar database table for Date: {row['Date']}. Error: {e}"
    print(message)
    s1.rollback()  # Rollback changes on error
    raise

# Commit the changes to the database
s1.commit()

print("Database data load is complete")


In [24]:
# Execute SQL query to count the number of records in the Market_Calendar table
sql_stat = """SELECT COUNT(*) FROM [Financial_Securities].[Equities].[Market_Calendar]"""
try:
    result = e.execute(sql_stat)
    cnt_recs = result.scalar()
    
# Handle errors querying the Market_Calendar table
except sa.exc.SQLAlchemyError as e:
    print(f"Issue querying Market_Calendar database table for count! Error: {e}")
    
# Compare the number of records in the database with the number of records in the DataFrame
if cnt_recs < len(df_dates):
    print(f"Only {cnt_recs} of {len(df_dates)} records were loaded into Market_Calendar database table!")
else:
    print(f"All {cnt_recs} records were loaded into Market_Calendar database table!")  

All 2511 records were loaded into Market_Calendar database table!


In [None]:
s1.close()  # Close the session