In [1]:
import os
import csv
import datetime
import re

def extract_storm_info(dat_file_path):
    """
    Summarise one .dat file as (storm name, first date, last date).

    Each line is split on whitespace:
    - token 3 carries the timestamp (YYYYMMDDHH once non-digits are removed);
    - token 28 carries a candidate storm name (trailing commas removed).

    The name returned is the candidate that was *first seen latest* in the
    file (i.e. the last key inserted in the name dictionary), or
    "DISTURBANCE" when no candidate exists. Dates are returned as
    "YYYY-MM-DD HH:MM:SS" strings, or None when no timestamp could be parsed.
    """
    name_counts = {}   # candidate name -> number of lines carrying it
    parsed_dates = []

    with open(dat_file_path, 'r') as handle:
        for raw_line in handle:
            tokens = raw_line.split()

            # Candidate storm name lives in the 28th whitespace token.
            if len(tokens) >= 28:
                candidate = tokens[27].strip().rstrip(',')
                if candidate:
                    name_counts[candidate] = name_counts.get(candidate, 0) + 1

            # Timestamp lives in the 3rd token; keep digits only.
            if len(tokens) < 3:
                continue
            digits = re.sub(r'[^0-9]', '', tokens[2].strip())
            if len(digits) != 10:
                continue
            try:
                parsed_dates.append(datetime.datetime.strptime(digits, "%Y%m%d%H"))
            except ValueError:
                print(f"Error parsing date: {digits} in file {dat_file_path}")

    print(f"Storm Name Dictionary: {name_counts}")  # Debugging output

    # Ignore bare-comma entries, then take the most recently introduced name.
    usable_names = [key for key in name_counts if key not in [',']]
    storm_name = usable_names[-1] if usable_names else "DISTURBANCE"

    if not parsed_dates:
        return storm_name, None, None

    stamp_format = "%Y-%m-%d %H:%M:%S"
    earliest, latest = min(parsed_dates), max(parsed_dates)
    return storm_name, earliest.strftime(stamp_format), latest.strftime(stamp_format)

def process_dat_files(input_path, output_csv):
    """
    Build a storm directory CSV from every ``.dat`` file in ``input_path``.

    For each file the basin (characters 2-3 of the file name) and storm number
    (characters 4-5) are combined with the name and date range reported by
    ``extract_storm_info``. The CSV at ``output_csv`` is overwritten.
    """
    header = ['Basin', 'Storm Number', 'Storm Name', 'Storm Start Date', 'Storm End Date']
    records = []

    for entry in os.listdir(input_path):
        if not entry.endswith('.dat'):
            continue
        # File names look like "a" + basin(2) + storm number(2) + ...
        name, first_date, last_date = extract_storm_info(os.path.join(input_path, entry))
        records.append([entry[1:3], entry[3:5], name, first_date, last_date])

    with open(output_csv, 'w', newline='') as out_handle:
        sheet = csv.writer(out_handle)
        sheet.writerow(header)
        sheet.writerows(records)

    print(f"CSV file {output_csv} has been created successfully.")

In [64]:
# Example usage: scan a folder of .dat files and write one summary row per file.
# The input path is relative to the notebook's working directory.
input_directory = "./test_forecast_data/"  # Replace with the actual path to the .dat files
# The output CSV is overwritten on every run.
output_csv_file = "storm_adeck_directory.csv"
process_dat_files(input_directory, output_csv_file)

Storm Name Dictionary: {'INVEST': 12, 'ONE': 6, 'ARLENE': 33}
Storm Name Dictionary: {'INVEST': 21, 'ONE': 12, 'ALBERTO': 39}
Storm Name Dictionary: {'INVEST': 12, 'ANDREA': 42}
Storm Name Dictionary: {'INVEST': 6, 'ARTHUR': 18}
Storm Name Dictionary: {'INVEST': 24, 'ONE': 15}
Storm Name Dictionary: {'INVEST': 54, 'ONE': 3, 'ALEX': 72}
Storm Name Dictionary: {'INVEST': 28, 'ARLENE': 70}
Storm Name Dictionary: {'INVEST': 14, 'ALBERTO': 77}
Storm Name Dictionary: {'INVEST': 56, 'ANDREA': 84}
Storm Name Dictionary: {'INVEST': 70, 'ONE': 21, 'ARTHUR': 112}
Storm Name Dictionary: {}
Storm Name Dictionary: {'INVEST': 21, 'FOUR': 15, 'DEAN': 102}
Storm Name Dictionary: {'INVEST': 87, 'DOLLY': 54}
Storm Name Dictionary: {'INVEST': 3, 'FOUR': 6, 'CLAUDETTE': 9}
Storm Name Dictionary: {'INVEST': 18, 'FOUR': 9, 'COLIN': 66}
Storm Name Dictionary: {'INVEST': 91, 'FOUR': 7, 'DON': 70}
Storm Name Dictionary: {'INVEST': 35, 'FIVE': 14, 'ERIN': 98}
Storm Name Dictionary: {}
Storm Name Dictionary: {'IN

In [66]:
import os
import csv
import datetime
import re

def extract_storm_info_by_carq(dat_file_path):
    """
    Extract storm name, start date, and end date from a .dat file, taking the
    name from the "CARQ" row whose timestamp is closest to the storm's end date.

    Lines are split on whitespace, so every token normally still carries its
    trailing comma (e.g. ``"CARQ,"``); commas are stripped before comparing.

    Parameters:
    - dat_file_path: str, path to the .dat file.

    Returns:
    - (storm_name, storm_start_date, storm_end_date)
      * storm_name is "DISTURBANCE" when no CARQ row with a non-empty name exists.
      * The dates are "YYYY-MM-DD HH:MM:SS" strings, or None when the file
        contains no parseable timestamp.
    """
    dates = []
    carq_rows = []  # (timestamp, storm name) for each named CARQ row

    with open(dat_file_path, 'r') as file:
        for line in file:
            columns = line.split()
            if len(columns) < 3:
                continue

            # Third token is the timestamp; drop the trailing comma / stray characters.
            date_str = re.sub(r'[^0-9]', '', columns[2])
            if len(date_str) != 10:  # Must be exactly YYYYMMDDHH
                continue
            try:
                date = datetime.datetime.strptime(date_str, "%Y%m%d%H")
            except ValueError:
                print(f"Error parsing date: {date_str} in file {dat_file_path}")
                continue
            dates.append(date)

            if len(columns) >= 28:
                # Whitespace splitting leaves "CARQ," - strip the comma or the
                # comparison can never succeed.
                technique = columns[4].strip().rstrip(',')
                candidate_name = columns[27].strip().rstrip(',')
                if technique == "CARQ" and candidate_name:
                    carq_rows.append((date, candidate_name))

    if not dates:
        return "DISTURBANCE", None, None

    start_datetime = min(dates)
    end_datetime = max(dates)

    if carq_rows:
        # On ties min() keeps the first row encountered in the file.
        closest_carq_row = min(carq_rows, key=lambda row: abs(row[0] - end_datetime))
        storm_name = closest_carq_row[1]
    else:
        # Keep the known date range even when no CARQ row names the storm.
        storm_name = "DISTURBANCE"

    date_format = "%Y-%m-%d %H:%M:%S"
    return storm_name, start_datetime.strftime(date_format), end_datetime.strftime(date_format)

def process_dat_files(input_path, output_csv):
    """
    Summarise every ``.dat`` file under ``input_path`` into ``output_csv``.

    Basin and storm number come from fixed positions in the file name; the
    storm name and date range come from ``extract_storm_info_by_carq``.
    Any existing file at ``output_csv`` is replaced.
    """
    dat_names = [candidate for candidate in os.listdir(input_path)
                 if candidate.endswith('.dat')]

    table = []
    for dat_name in dat_names:
        info = extract_storm_info_by_carq(os.path.join(input_path, dat_name))
        label, first_seen, last_seen = info
        # dat_name[1:3] -> basin letters, dat_name[3:5] -> storm number digits
        table.append([dat_name[1:3], dat_name[3:5], label, first_seen, last_seen])

    column_titles = ['Basin', 'Storm Number', 'Storm Name', 'Storm Start Date', 'Storm End Date']
    with open(output_csv, 'w', newline='') as target:
        out = csv.writer(target)
        out.writerow(column_titles)
        out.writerows(table)

    print(f"CSV file {output_csv} has been created successfully.")

In [67]:
# Example usage: same folder scan as before, written to a separate CSV.
# NOTE: this calls whichever `process_dat_files` was defined most recently in the kernel.
input_directory = "./test_forecast_data/"  # Replace with the actual path to the .dat files
# The output CSV is overwritten on every run.
output_csv_file = "storm_adeck_directory_carq.csv"
process_dat_files(input_directory, output_csv_file)

CSV file storm_adeck_directory_carq.csv has been created successfully.


In [None]:
import os
import csv
import datetime
import re
import pandas as pd

def extract_storm_info(dat_file_path):
    """
    Extract storm name, start date, end date, and year from a .dat file.

    Each line is split on whitespace. The 28th token (upper-cased, trailing
    commas removed) is a candidate storm name; the 3rd token is the timestamp
    (YYYYMMDDHH once non-digits are removed).

    Name selection:
    1. Discard candidates that are basin codes, contain digits, or are
       number words (ONE ... TWENTY).
    2. No candidate left -> 'DISTURBANCE'; only 'INVEST' left -> 'INVEST'.
    3. Otherwise ignore 'INVEST'. If several names remain, strip a trailing
       basin code from each; if that collapses them to one name use it,
       otherwise use the name that first appeared latest in the file.

    Parameters:
    - dat_file_path: str, path to the .dat file.

    Returns:
    - (storm_name, storm_start_date, storm_end_date, storm_year) where
      storm_name is title-cased, the dates are "YYYY-MM-DD HH:MM:SS" strings
      and storm_year is the year of the start date. The last three are None
      when no timestamp could be parsed.
    """
    storm_name = None
    dates = []
    storm_name_dict = {}  # candidate name -> occurrence count (insertion-ordered)

    # List of known basin codes
    basin_codes = ["AL", "EP", "CP", "WP", "IO", "SH"]

    # Set of number words up to 20
    number_words_set = {
        'ONE', 'TWO', 'THREE', 'FOUR', 'FIVE', 'SIX', 'SEVEN', 'EIGHT', 'NINE', 'TEN',
        'ELEVEN', 'TWELVE', 'THIRTEEN', 'FOURTEEN', 'FIFTEEN', 'SIXTEEN', 'SEVENTEEN',
        'EIGHTEEN', 'NINETEEN', 'TWENTY'
    }

    with open(dat_file_path, 'r') as file:
        for line in file:
            columns = line.strip().split()

            if len(columns) >= 28:
                potential_name = columns[27].strip().rstrip(',').upper()
                if potential_name:
                    storm_name_dict[potential_name] = storm_name_dict.get(potential_name, 0) + 1

            if len(columns) >= 3:
                date_str = re.sub(r'[^0-9]', '', columns[2].strip())
                if len(date_str) == 10:
                    try:
                        # `datetime` is the module here, so the class must be
                        # reached as datetime.datetime.
                        date = datetime.datetime.strptime(date_str, "%Y%m%d%H")
                        dates.append(date)
                    except ValueError:
                        print(f"Error parsing date: {date_str} in file {dat_file_path}")

    # Drop basin codes, anything containing a digit, and number words.
    storm_names = [
        name for name in storm_name_dict
        if name not in basin_codes
        and not any(char.isdigit() for char in name)
        and name not in number_words_set
    ]

    # Determine the storm name based on the cleaned list
    if len(storm_names) == 0:
        storm_name = 'DISTURBANCE'
    elif storm_names == ['INVEST']:
        storm_name = 'INVEST'
    else:
        # A real name outranks the generic 'INVEST' placeholder.
        if 'INVEST' in storm_names:
            storm_names.remove('INVEST')

        if len(storm_names) == 1:
            storm_name = storm_names[0]
        else:
            # Multiple names remain: try stripping a basin code glued to the end.
            cleaned_names = set()
            for name in storm_names:
                cleaned_name = name
                for basin_code in basin_codes:
                    if name.endswith(basin_code):
                        name_without_basin = name[:-len(basin_code)].strip()
                        # Only accept the stripped form if it is non-empty and alphabetic.
                        if name_without_basin.isalpha():
                            cleaned_name = name_without_basin
                            break  # Stop checking after removing basin code
                cleaned_names.add(cleaned_name)

            if len(cleaned_names) == 1:
                storm_name = cleaned_names.pop()
            else:
                # Names don't match, take the last one
                storm_name = storm_names[-1]

    # Deduce the start and end dates and extract the year
    if dates:
        storm_start_datetime = min(dates)
        storm_end_datetime = max(dates)
        storm_start_date = storm_start_datetime.strftime("%Y-%m-%d %H:%M:%S")
        storm_end_date = storm_end_datetime.strftime("%Y-%m-%d %H:%M:%S")
        storm_year = storm_start_datetime.year
    else:
        storm_start_date = None
        storm_end_date = None
        storm_year = None

    return storm_name.title(), storm_start_date, storm_end_date, storm_year

def download_and_update_storm_data(file_url, save_path, csv_path):
    """
    Download the .dat file from the given URL, extract storm information,
    and insert or update the matching row of the storm directory CSV.

    A storm is identified by (Basin, Storm Number, Year). Basin and storm
    number are taken from fixed positions in the saved file's name.

    Parameters:
    - file_url: str, URL of the .dat file (passed to `download_file`).
    - save_path: str, local path where the file is stored.
    - csv_path: str, path of the CSV to create or update.

    Returns:
    - None. Returns early, without touching the CSV, when no year could be
      derived from the file.
    """
    # Download the data
    download_file(file_url, save_path)

    # Process the .dat file to extract storm info
    storm_name, storm_start_date, storm_end_date, storm_year = extract_storm_info(save_path)

    # Extract Basin and Storm Number from the file name
    file_name = os.path.basename(save_path)
    basin = str(file_name[1:3])  # Adjust based on actual file naming convention
    storm_number = str(file_name[3:5])  # Adjust based on actual file naming convention

    if storm_year is None:
        print(f"Storm year not found for {file_name}. Skipping update.")
        return
    storm_year = int(storm_year)

    # Read existing CSV or create a new DataFrame
    if os.path.exists(csv_path):
        # Force the key columns to text: by default pandas would parse "01"
        # as the integer 1, and the lookup below would never find the storm,
        # appending a duplicate row on every run.
        df = pd.read_csv(csv_path, dtype={'Basin': str, 'Storm Number': str})
        if 'Storm Number' in df.columns:
            # Repair rows saved earlier with the leading zero already lost.
            df['Storm Number'] = df['Storm Number'].str.zfill(2)
    else:
        df = pd.DataFrame(columns=['Basin', 'Storm Number', 'Year', 'Storm Name', 'Storm Start Date', 'Storm End Date'])

    # Rows describing this storm (Basin + Storm Number + Year)
    same_storm = (df['Basin'] == basin) & (df['Storm Number'] == storm_number) & (df['Year'] == storm_year)

    if same_storm.any():
        # Update the existing storm information
        df.loc[same_storm, ['Storm Name', 'Storm Start Date', 'Storm End Date']] = [
            storm_name, storm_start_date, storm_end_date
        ]
        print(f"Updated existing storm data for {storm_name} ({basin}{storm_number}, {storm_year}) in {csv_path}.")
    else:
        # Append the new storm information
        new_row = {
            'Basin': basin,
            'Storm Number': storm_number,
            'Year': storm_year,
            'Storm Name': storm_name,
            'Storm Start Date': storm_start_date,
            'Storm End Date': storm_end_date
        }
        df = pd.concat([df, pd.DataFrame([new_row])], ignore_index=True)
        print(f"Added new storm data for {storm_name} ({basin}{storm_number}, {storm_year}) to {csv_path}.")

    # Save the updated CSV
    df.to_csv(csv_path, index=False)
    print(f"Storm data has been updated in {csv_path}.")



TEST MAP MAKING

In [2]:
import pandas as pd
import plotly.graph_objects as go
import datetime
import os

def create_forecast_map(dat_file_path, selected_forecast_datetime):
    """
    Reads the .dat file and creates a Plotly map of the forecast tracks
    issued at one forecast date/time.

    Parameters:
    - dat_file_path: str, path to the comma-separated .dat file.
    - selected_forecast_datetime: str (or int), the forecast date and time in
      YYYYMMDDHH format; compared as text against the file's third field.

    Returns:
    - fig: Plotly figure object, or None when the file is missing, unreadable,
      empty, or has no usable rows for the selected date/time (a message is
      printed in each case).
    """
    # Check if the file exists
    if not os.path.exists(dat_file_path):
        print(f"File not found: {dat_file_path}")
        return None

    # Positional labels for the comma-separated fields.
    # NOTE(review): only the first ten labels are used below; the later ones
    # do not appear to follow the official ATCF a-deck column order - verify
    # before relying on them.
    columns = [
        'Basin', 'CycloneNumber', 'DateTime', 'ModelNumber', 'ModelName', 'ForecastHour',
        'Latitude', 'Longitude', 'MaxWindSpeed', 'MinPressure',
        'WindRad1', 'WindRad2', 'WindRad3', 'WindRad4', 'StormType',
        'Quadrant1', 'Quadrant2', 'Quadrant3', 'Quadrant4',
        'Radius1', 'Radius2', 'Radius3', 'Radius4',
        'StormName', 'Unused1', 'Unused2'
    ]
    data = []

    # Read the .dat file line by line
    line_number = 0  # defined up front so the error message works even if open() fails
    try:
        with open(dat_file_path, 'r') as file:
            for line_number, line in enumerate(file, start=1):
                line = line.strip()
                # Skip empty lines
                if not line:
                    continue
                fields = [field.strip() for field in line.split(',')]
                # Label known positions; lines may be shorter than `columns`.
                record = dict(zip(columns, fields))
                # Extra trailing fields become 'ExtraField1', 'ExtraField2', ...
                for extra_index, extra_value in enumerate(fields[len(columns):], start=1):
                    record[f'ExtraField{extra_index}'] = extra_value
                data.append(record)
    except Exception as e:
        print(f"Error reading .dat file at line {line_number}: {e}")
        return None

    # An empty file would yield a frame without a 'DateTime' column.
    if not data:
        print("No data found for the selected forecast date and time.")
        return None

    # Convert the list of dictionaries to a DataFrame
    df = pd.DataFrame(data)
    if 'DateTime' not in df.columns:
        print("No data found for the selected forecast date and time.")
        return None

    # Filter on the selected forecast cycle; copy so later column assignments
    # act on an independent frame rather than a view of the full table.
    selected = str(selected_forecast_datetime).strip()
    df = df[df['DateTime'] == selected].copy()

    if df.empty:
        print("No data found for the selected forecast date and time.")
        return None

    # Convert columns to appropriate data types
    df['ForecastHour'] = pd.to_numeric(df['ForecastHour'], errors='coerce')
    df['MaxWindSpeed'] = pd.to_numeric(df['MaxWindSpeed'], errors='coerce')
    df['MaxWindSpeed_mph'] = df['MaxWindSpeed'] * 1.15078  # Knots to miles per hour
    df['Latitude'] = df['Latitude'].apply(parse_lat_lon)
    df['Longitude'] = df['Longitude'].apply(parse_lat_lon)
    df['DateTime'] = pd.to_datetime(df['DateTime'], format='%Y%m%d%H', errors='coerce')

    # Drop rows with missing data
    before_dropna = len(df)
    df = df.dropna(subset=['Latitude', 'Longitude', 'MaxWindSpeed', 'DateTime', 'ForecastHour'])
    dropped_na = before_dropna - len(df)
    if dropped_na > 0:
        print(f"Dropped {dropped_na} rows due to missing data.")

    if df.empty:
        print("All data has been dropped after removing rows with missing values.")
        return None

    # Remove any rows where Latitude or Longitude is exactly 0
    before_zero_filter = len(df)
    df = df[(df['Latitude'] != 0) & (df['Longitude'] != 0)].copy()
    removed_zero = before_zero_filter - len(df)
    if removed_zero > 0:
        print(f"Removed {removed_zero} rows with Latitude or Longitude equal to 0.")

    if df.empty:
        print("All data has been dropped after removing rows with Latitude or Longitude equal to 0.")
        return None

    # Create 'ValidTime' as DateTime + ForecastHour
    df['ValidTime'] = df['DateTime'] + pd.to_timedelta(df['ForecastHour'], unit='h')

    # Categorize intensity into storm categories
    df['Category'] = df['MaxWindSpeed'].apply(wind_speed_to_category)

    # Marker colour per category.
    # NOTE(review): 'Category 2' and 'Category 4' share 'orange', so they are
    # indistinguishable on the map - confirm whether that is intended.
    category_colors = {
        'Tropical Depression': 'green',
        'Tropical Storm': 'blue',
        'Category 1': 'yellow',
        'Category 2': 'orange',
        'Category 3': 'red',
        'Category 4': 'orange',
        'Category 5': 'magenta',
        'Unknown': 'gray'
    }

    # Define the desired order for the legend
    category_order = [
        'Tropical Depression',
        'Tropical Storm',
        'Category 1',
        'Category 2',
        'Category 3',
        'Category 4',
        'Category 5',
        'Unknown'
    ]

    # Sort the DataFrame by ModelName and ValidTime to ensure lines connect correctly per model
    df = df.sort_values(['ModelName', 'ValidTime'])

    # Initialize the Plotly figure
    fig = go.Figure()

    # Add a line trace for each model (white lines connecting points of the same model)
    models = df['ModelName'].unique()
    for model in models:
        model_df = df[df['ModelName'] == model].sort_values('ValidTime')
        # A missing model name (NaN) never equals itself, giving an empty selection.
        if model_df.empty:
            continue
        fig.add_trace(go.Scattermapbox(
            lat=model_df['Latitude'],
            lon=model_df['Longitude'],
            mode='lines',
            line=dict(color='white', width=2),
            hoverinfo='none',  # No hover info for the lines
            showlegend=False    # Do not show lines in the legend
        ))

    # Group data by Category in the specified order and plot the markers
    for category in category_order:
        category_df = df[df['Category'] == category]
        if not category_df.empty:
            fig.add_trace(go.Scattermapbox(
                lat=category_df['Latitude'],
                lon=category_df['Longitude'],
                mode='markers',
                name=category,
                marker=dict(
                    size=8,
                    color=category_colors[category],  # Single color per category
                ),
                text=category_df.apply(lambda row: f"Time: {row['ValidTime']:%Y-%m-%d %H:%M UTC}<br>"
                                               f"Wind Speed (mph): {row['MaxWindSpeed_mph']:.1f}<br>"
                                               f"Central Pressure (mb): {row['MinPressure']}<br>"
                                               f"Category: {row['Category']}<br>"
                                               f"Model: {row['ModelName']}", axis=1),
                hoverinfo='text'
            ))

    # Set up the map layout with a dark style, centred on the mean track position
    fig.update_layout(
        mapbox_style='carto-darkmatter',  # Use a dark basemap
        mapbox_zoom=4,
        mapbox_center={"lat": df['Latitude'].mean(), "lon": df['Longitude'].mean()},
        margin={"r": 0, "t": 0, "l": 0, "b": 0},
        legend_title_text='Storm Category',
        legend=dict(
            itemsizing='constant'
        )
    )

    return fig

# Helper function to parse latitude and longitude
def parse_lat_lon(value):
    """
    Convert a latitude/longitude field expressed in tenths of a degree
    (optionally suffixed with N/S/E/W) into signed decimal degrees.

    South and West are returned as negative values. Anything that is not a
    string, is blank, or cannot be converted yields None (conversion failures
    also print a message).
    """
    if not isinstance(value, str):
        return None

    text = value.strip()
    if not text:
        return None

    hemisphere = text[-1]
    if hemisphere in ('N', 'S', 'E', 'W'):
        try:
            magnitude = float(text[:-1]) / 10.0  # tenths of a degree -> degrees
        except ValueError:
            print(f"Error parsing degrees from value: {text}")
            return None
        return -magnitude if hemisphere in ('S', 'W') else magnitude

    # No hemisphere letter: treat the whole field as a number of tenths.
    try:
        return float(text) / 10.0
    except ValueError:
        print(f"Error parsing lat/lon value '{text}': cannot convert to float.")
        return None

# Helper function to categorize wind speed
def wind_speed_to_category(wind_speed):
    """
    Converts wind speed in knots to a storm category label.

    Thresholds are contiguous, so fractional speeds (e.g. 63.5 kt) are
    classified instead of falling between two integer ranges:
        < 34  Tropical Depression
        < 64  Tropical Storm
        < 83  Category 1
        < 96  Category 2
        < 113 Category 3
        < 137 Category 4
        >= 137 Category 5

    Returns 'Unknown' for values that compare false against every threshold
    (e.g. NaN).
    """
    # (exclusive upper bound in knots, label), in ascending order
    upper_bounds = (
        (34, 'Tropical Depression'),
        (64, 'Tropical Storm'),
        (83, 'Category 1'),
        (96, 'Category 2'),
        (113, 'Category 3'),
        (137, 'Category 4'),
    )
    for limit, label in upper_bounds:
        if wind_speed < limit:
            return label
    # Explicit comparison (rather than a bare else) keeps NaN out of Category 5.
    if wind_speed >= 137:
        return 'Category 5'
    return 'Unknown'

# Helper function to map categories to colors
def category_to_color(category):
    """
    Look up the display colour for a storm category label.

    Labels that are not in the palette map to 'gray'.
    """
    labels = (
        'Tropical Depression', 'Tropical Storm',
        'Category 1', 'Category 2', 'Category 3', 'Category 4', 'Category 5',
        'Unknown',
    )
    shades = (
        'green', 'blue',
        'yellow', 'orange', 'red', 'orange', 'magenta',
        'gray',
    )
    palette = dict(zip(labels, shades))
    if category in palette:
        return palette[category]
    return 'gray'

In [3]:
# Build the forecast map for one file and one forecast cycle (YYYYMMDDHH).
# NOTE(review): the path is an absolute, machine-specific location; `fig` is None
# whenever create_forecast_map cannot produce a figure.
fig=create_forecast_map("z:\\Event_Monitor\\forecast_data\\aal142024.dat","2024100718")

Removed 34 rows with Latitude or Longitude equal to 0.


In [6]:
import os
import re
import tempfile
import win32com.client as win32
import plotly.graph_objects as go

def send_map_via_email(fig, recipients, subject, body, cc=None, bcc=None, sender=None, send_as=False):
    """
    Sends a Plotly map as an HTML attachment via Outlook email.

    Parameters:
    - fig (plotly.graph_objects.Figure): The Plotly figure to send.
    - recipients (list or str): List of recipient email addresses or a single email address.
    - subject (str): Subject of the email.
    - body (str): Body content of the email. Supports HTML.
    - cc (list or str, optional): List of CC email addresses or a single email address.
    - bcc (list or str, optional): List of BCC email addresses or a single email address.
    - sender (str, optional): Email address to send on behalf of. Must have permissions.
    - send_as (bool, optional): If True, attempts to set the 'From' property instead of 'SentOnBehalfOfName'.

    Returns:
    - None

    Raises:
    - ValueError: If no valid recipients are provided, an address is malformed,
      an address field has an unsupported type, or Outlook cannot resolve a recipient.
    - Exception: If Outlook is not installed or an error occurs during email sending.

    Every error is printed and then re-raised.
    """
    # Outlook OlMailRecipientType values. Recipients.Add creates a "To"
    # recipient unless Type is changed afterwards.
    OL_TO, OL_CC, OL_BCC = 1, 2, 3

    try:
        # Function to validate email addresses using a simple regex
        def is_valid_email(email):
            regex = r'^[\w\.-]+@[\w\.-]+\.\w+$'
            return re.match(regex, email) is not None

        # Helper function to normalise an address field to a list of
        # stripped, non-empty strings.
        def ensure_list(input_field):
            if input_field is None:
                return []
            if isinstance(input_field, str):
                input_field = input_field.strip()
                return [input_field] if input_field else []
            if isinstance(input_field, (list, tuple)):
                if not all(isinstance(email, str) for email in input_field):
                    raise ValueError("Email fields must be either a string or a list of strings.")
                return [email.strip() for email in input_field if email.strip()]
            raise ValueError("Email fields must be either a string or a list of strings.")

        # Convert recipients, cc, bcc to lists
        recipients = ensure_list(recipients)
        cc = ensure_list(cc)
        bcc = ensure_list(bcc)

        # Combine all recipients to ensure at least one is present
        all_recipients = recipients + cc + bcc

        if not all_recipients:
            raise ValueError("At least one recipient must be provided in To, Cc, or Bcc.")

        # Validate all email addresses
        invalid_emails = [email for email in all_recipients if not is_valid_email(email)]
        if invalid_emails:
            raise ValueError(f"The following email addresses are invalid: {invalid_emails}")

        # The attachment lives in a temporary directory that is removed when
        # the block exits, so the message is sent inside it.
        with tempfile.TemporaryDirectory() as tmpdirname:
            html_file_path = os.path.join(tmpdirname, 'forecast_map.html')

            # Save the Plotly figure as an HTML file
            fig.write_html(html_file_path, full_html=True)

            # Initialize Outlook application
            outlook = win32.Dispatch('outlook.application')
            mail = outlook.CreateItem(0)  # 0: olMailItem

            # Set email parameters. Only HTMLBody is assigned: the original
            # plain-text Body assignment was immediately overwritten by it.
            mail.Subject = subject
            mail.HTMLBody = body

            # Set sender if specified
            if sender:
                try:
                    if send_as:
                        # NOTE(review): confirm that the Outlook MailItem exposes a
                        # writable 'From' property; 'SendUsingAccount' may be needed.
                        mail.From = sender  # Requires 'Send As' permissions
                    else:
                        mail.SentOnBehalfOfName = sender  # Requires 'Send on Behalf Of' permissions
                except Exception as e:
                    print(f"Error setting sender to '{sender}': {e}")
                    raise

            # Add every address with its proper recipient type so CC/BCC
            # entries are not delivered as plain "To" recipients.
            for addresses, recipient_type in ((recipients, OL_TO), (cc, OL_CC), (bcc, OL_BCC)):
                for address in addresses:
                    entry = mail.Recipients.Add(address)
                    entry.Type = recipient_type

            # Resolve all recipients
            if not mail.Recipients.ResolveAll():
                unresolved = [rec.Name for rec in mail.Recipients if not rec.Resolved]
                raise ValueError(f"Some recipients could not be resolved: {unresolved}")

            # Attach the HTML file
            mail.Attachments.Add(Source=html_file_path)

            # Send the email
            mail.Send()

            print("Email sent successfully.")

    except ValueError as ve:
        print(f"ValueError: {ve}")
        raise
    except Exception as e:
        print(f"An error occurred while sending the email: {e}")
        raise

In [7]:


# Update the layout to include mapbox settings.
# The Mapbox token is read from the MAPBOX_ACCESS_TOKEN environment variable
# instead of being committed in the notebook; the token that used to be
# hard-coded here should be treated as leaked and rotated.
mapbox_settings = dict(
    style='carto-darkmatter',
    center=dict(lat=0, lon=0),
    zoom=1
)
mapbox_token = os.environ.get('MAPBOX_ACCESS_TOKEN')
if mapbox_token:
    mapbox_settings['accesstoken'] = mapbox_token

# NOTE(review): this replaces the storm-centred view and legend configured in
# create_forecast_map with a world view and no legend - confirm that is intended.
fig.update_layout(
    mapbox=mapbox_settings,
    showlegend=False
)

# Define email parameters
recipients = ['swyatt11@gmail.com','alvaro.farias@lockton.com']

subject = 'Forecast Map'
body = """
<html>
<head></head>
<body>
    <p>Dear Team,</p>
    <p>Please find the attached forecast map.</p>
    <p>Best regards,<br>Lockton Storm Monitor</p>
</body>
</html>
"""

# Optional parameters
cc = ['alvaro.farias.velasco@gmail.com']
bcc = ['']  # blank entries are discarded by send_map_via_email
sender = 'alvaro.farias@lockton.com'  # Optional: specify if needed

# Send the email
send_map_via_email(fig, recipients, subject, body, cc=cc, bcc=bcc, sender=sender)

Email sent successfully.
