In [23]:
# from routescience.studylib import datereported, get_mongo
from pymongo import MongoClient
from bson.raw_bson import RawBSONDocument
from pymongoarrow.api import Schema
from datetime import datetime
from pymongoarrow.monkey import patch_all
import pandas as pd
import pyarrow
import matplotlib.pyplot as plt
import plotly.express as px
import seaborn as sns
import numpy as np
import plotly.graph_objects as go
from matplotlib.cm import get_cmap
import os
import geopandas as gpd
from shapely.geometry import Point
from matplotlib.colors import LinearSegmentedColormap, to_hex
import geojson
import datetime

# Open the MongoDB connection used by the rest of the notebook.
# NOTE(review): the URI below has no host or credentials - presumably a redacted
# placeholder; fill it in before running.
client = MongoClient(f"mongodb://")
db = client["client"]  # handle to the database named "client"
collection = db["collection"]  # handle to the collection named "collection"
# Apply pymongoarrow's monkey patch; presumably this adds the Arrow/pandas query
# helpers to pymongo collections - confirm against the pymongoarrow docs.
patch_all()

# Functions For Graph Types

For this section, all you have to do is run the chunk of code. You can copy and paste it into your own notebook so that you are able to use these functions. The purpose of this is to keep the notebook looking clean! Another note: these functions should be usable for all data. For example, there is a bar chart function that is used throughout the collection.

Functions included in the code block below are for: Bar charts, line charts, scatterplots, double bar charts, horizontal bar charts, pie charts, vehicle profile charts, and heatmaps.

### Pie Chart

In [24]:
def create_pie_chart(data, column=None, title=None, threshold=0.00, width=800, height=800, labels=None):
    """
    Create and display a pie chart of value frequencies (DataFrame input) or of raw values (list input).

    Parameters:
    - data: DataFrame or list. For a DataFrame, the occurrences of each distinct value in `column`
      are counted; for a list, every element becomes its own slice, named 1..len(data).
    - column: str, the column whose values are counted (required when data is a DataFrame)
    - title: str, the title of the chart
    - threshold: float, minimum share of the total (between 0 and 1) a slice needs in order to be shown
    - width: int, the width of the chart
    - height: int, the height of the chart
    - labels: list of str, custom slice/legend labels (optional). Labels are matched to categories by
      position BEFORE the threshold filter is applied (value_counts order for a DataFrame, list order
      for a list); categories without a matching label keep their own name.

    Returns:
    - None (displays the chart)

    Raises:
    - ValueError: if data is neither a DataFrame with a column name nor a list.
    """
    if isinstance(data, pd.DataFrame) and column:
        value_counts = data[column].value_counts()
        category_df = pd.DataFrame({'Category': value_counts.index, 'Count': value_counts.values})
    elif isinstance(data, list):
        category_df = pd.DataFrame({'Category': range(1, len(data) + 1), 'Count': data})
    else:
        raise ValueError("Invalid input. Provide either a DataFrame with a specified column or a list of variables.")

    # Share of the total per category; guard against an all-zero input instead of dividing by zero.
    total = category_df['Count'].sum()
    category_df['Percentage'] = category_df['Count'] / total if total else 0.0

    # Attach the custom labels before filtering so that every label stays with its own category
    # even when the threshold removes slices from the middle of the list.
    if labels:
        category_df['Label'] = [
            labels[position] if position < len(labels) else category
            for position, category in enumerate(category_df['Category'].tolist())
        ]
        names_column = 'Label'
    else:
        names_column = 'Category'

    # Only show categories whose share reaches the threshold.
    filtered_df = category_df[category_df['Percentage'] >= threshold]

    fig = px.pie(filtered_df, values='Count', names=names_column, title=title, width=width, height=height)

    if labels:
        fig.update_layout(legend_title_text='Legend', legend_tracegroupgap=0)

    fig.show()

### Get Event Data Counts

In [25]:
def gather_data_count(data, column):
    """
    Count, per event 'Type', the non-null entries of every column after expanding the events.

    Parameters:
    - data: DataFrame with a "UniqueTripID" column and a list-valued `column`
    - column: str, name of the column holding the events of each trip

    Returns:
    - DataFrame indexed by 'Type' holding the per-column counts
    """
    trip_events = data[["UniqueTripID", column]]

    # One row per event; trips without any event drop out here
    one_event_per_row = trip_events.explode(column).dropna()

    # Spread the fields of every event into their own columns
    event_fields = one_event_per_row[column].apply(pd.Series)

    # Attach the event fields to the trip rows and keep complete rows only
    complete_rows = trip_events.join(event_fields).dropna()

    return complete_rows.groupby('Type').count()

### Get Event Data Counts with Stops

In [26]:
def gather_events_withstops(dataframe, event_column):
    """
    Break down all events into one row per event and attach the first stop of each trip.

    Parameters:
    - dataframe: DataFrame, the input data frame containing the event data. It must provide the
      columns "UniqueTripID", "Analysis_Stops", "Analysis_AccountID" and `event_column`.
      NOTE(review): the join below assumes the input index is unique - confirm for your data.
    - event_column: str, the name of the column containing the event data to be exploded and grouped

    Returns:
    - DataFrame with one row per event: the selected trip columns, one column per event field, and
      the 'Address', 'City', 'County' and 'Jurisdiction' of the trip's first stop (empty strings
      when the trip has no stops). Rows containing any missing value are dropped.
    """
    stop_fields = ['Address', 'City', 'County', 'Jurisdiction']

    event_col = dataframe[["UniqueTripID", "Analysis_Stops", event_column, 'Analysis_AccountID']]
    normalized_event_col = event_col.explode(event_column).dropna()
    event_fields = normalized_event_col[event_column].apply(pd.Series)
    grouped_events = event_col.join(event_fields).dropna()

    # Fields of the first stop of every row, in row order.
    first_stops = [
        [trip_stops[0][field] for field in stop_fields] if len(trip_stops) > 0 else [''] * len(stop_fields)
        for trip_stops in grouped_events["Analysis_Stops"]
    ]

    # Attach the stop fields positionally. grouped_events repeats the trip index once per event, so
    # an index-based join against a frame with the same repeated index would pair every event of a
    # trip with every other one and return k*k rows for a trip with k events.
    grouped_events_with_stops = grouped_events.copy()
    for position, field in enumerate(stop_fields):
        grouped_events_with_stops[field] = [row[position] for row in first_stops]

    return grouped_events_with_stops.dropna()

# Example usage:
# Planned_grouped_closures = gather_events_withstops(analysis_df, "Analysis_Planned_Safety_Closures")

### Get Event Counts

In [27]:
def gather_event_count(data, column):
    """
    Expand the events stored in `column` and group the resulting rows by event 'Type'.

    Parameters:
    - data: DataFrame with a "UniqueTripID" column and a list-valued `column`
    - column: str, name of the column holding the events of each trip

    Returns:
    - DataFrameGroupBy keyed on 'Type' (not aggregated, so the caller picks the aggregation)
    """
    selected = data[["UniqueTripID", column]]

    # Explode to one event per row, then turn every event into a row of fields
    per_event = selected.explode(column).dropna()
    fields = per_event[column].apply(pd.Series)

    # Join the fields back onto the trips; incomplete rows are discarded
    joined = selected.join(fields).dropna()

    return joined.groupby('Type')

### Process Event Data

In [28]:
def merge_event_data(planned_data, actual_data, computed_data):
    """
    Merge planned, actual, and computed closure data on 'Type' with outer joins.

    Columns shared by the inputs are told apart by suffix: '_pclose' (planned), '_aclose' (actual)
    and '_cclose' (computed). Missing values in the three 'DistToStop_*' columns are filled with 0.

    Parameters:
    - planned_data: DataFrame of planned events, keyed by 'Type'
    - actual_data: DataFrame of actual events, keyed by 'Type'
    - computed_data: DataFrame of computed events, keyed by 'Type'

    Returns:
    - DataFrame, the merged data (the resulting column names are also printed)
    """
    merged_closures = planned_data.merge(actual_data, on='Type', how='outer', suffixes=('_pclose', '_aclose'))

    # Columns present in both planned and actual data were just renamed to *_pclose / *_aclose, so
    # their bare names no longer exist in the merged frame. pandas only applies suffixes to
    # overlapping names, which means the computed columns would keep their bare name in the next
    # merge; give them the '_cclose' suffix explicitly.
    already_suffixed = (set(planned_data.columns) & set(actual_data.columns)) - {'Type'}
    computed_renamed = computed_data.rename(columns={col: f'{col}_cclose' for col in already_suffixed})

    merged_closures = merged_closures.merge(computed_renamed, on='Type', how='outer', suffixes=('', '_cclose'))

    closure_columns = ['DistToStop_pclose', 'DistToStop_aclose', 'DistToStop_cclose']
    for col in closure_columns:
        if col in merged_closures.columns:
            merged_closures[col] = merged_closures[col].fillna(0)

    print("Columns after merge:", merged_closures.columns)  # To inspect column names

    return merged_closures

### Process RTR Data

In [29]:
def process_column_by_type(df, column_name):
    """
    Sum 'DistMiles' and 'DistMilesBuiltUp' per 'Class' over all entries of a list-valued column.

    Parameters:
    - df: DataFrame with a "UniqueTripID" column and a list-valued `column_name`
    - column_name: str, name of the column whose entries provide the 'Class', 'DistMiles' and
      'DistMilesBuiltUp' fields

    Returns:
    - DataFrame with the columns 'Class', 'DistMiles' and 'DistMilesBuiltUp', one row per class
    """
    # One row per entry; rows without a trip ID or without entries are dropped.
    exploded = df[["UniqueTripID", column_name]].explode(column_name).dropna()

    # Spread the fields of every entry into their own columns.
    expanded_df = exploded[column_name].apply(pd.Series)

    # The previous per-trip percentage computation joined two frames sharing the same repeated
    # index (k*k rows for a trip with k entries) and its result was never returned, so it is gone.
    return expanded_df.groupby('Class').agg({'DistMiles': 'sum', 'DistMilesBuiltUp': 'sum'}).reset_index()

# Example usage:
# result_df = process_column_by_type(analysis_df, 'Analysis_Actual_RoadTypeReport')


### Bar Chart - Simple

In [30]:
def create_bar_chart_horizontal(df, y_column, title, n=5, y_label=None, x_label="Count"):
    """
    Draw a horizontal bar chart of the N most frequent values found in a column.

    Parameters:
    - df: DataFrame, the data to plot
    - y_column: str, the column whose values are counted
    - title: str, the chart title
    - n: int, how many of the most frequent values to show (default is 5)
    - y_label: str, y-axis label; falls back to the capitalized column name (default is None)
    - x_label: str, x-axis label (default is "Count")
    """
    plt.figure(figsize=(10, 6))

    # Frequency of every value, most frequent first, limited to the first n entries
    top_counts = df[y_column].value_counts().sort_values(ascending=False).head(n)

    # Bar names are the values themselves, rendered as strings
    category_names = top_counts.index.astype(str)
    drawn_bars = plt.barh(category_names, top_counts)

    plt.ylabel(y_label if y_label else y_column.capitalize())
    plt.xlabel(x_label)
    plt.title(title)

    # Vertical grid lines along the count axis
    axes = plt.gca()
    axes.grid(which='both', axis='x', color='gray', linestyle='-', linewidth=0.5, zorder=0)

    # Write the count at the end of every bar
    for rect in drawn_bars:
        count = rect.get_width()
        middle = rect.get_y() + rect.get_height()/2
        plt.text(count, middle, f"{int(count)}", ha='left', va='center')

    plt.show()

### Bar Chart

In [31]:
def bar_chart(df, type_column, state_column=None, title=None, top_n_states=10, types=None, figure_width=18,
              x_label="Count", y_label="States", orientation='horizontal'):
    """
    Plot a horizontal or vertical countplot of types, optionally broken down by the top N states.

    Parameters:
    - df: DataFrame, the input data containing the values to be plotted
    - type_column: str, the column representing the event types
    - state_column: str or None, the column representing the states. When given, bars are grouped by
      state and coloured by type; when None, one bar per type is drawn (default is None)
    - title: str or None, the title of the plot (default is None)
    - top_n_states: int, number of most frequent states to display (default is 10)
    - types: str or list of specific type values to include in the plot (default is None, which includes all)
    - figure_width: int, the width of the figure (default is 18)
    - x_label: str, the label for the x-axis (default is "Count")
    - y_label: str, the label for the y-axis (default is "States")
    - orientation: str, optional, either 'horizontal' (default) or 'vertical'

    Returns:
    - None

    Raises:
    - ValueError: if orientation is neither 'horizontal' nor 'vertical'
    """
    # Validate before anything is drawn so a bad argument does not leave an empty figure behind.
    if orientation not in ('horizontal', 'vertical'):
        raise ValueError("Invalid value for 'orientation'. It should be 'horizontal' or 'vertical'.")
    horizontal = orientation == 'horizontal'

    plt.figure(figsize=(figure_width, 8))

    # If types are specified, filter rows accordingly
    if types is not None:
        if isinstance(types, str):
            types = [types]  # Convert a single string to a list
        df = df[df[type_column].isin(types)]

    if state_column is not None:
        # Keep only the N states with the most rows; bars are grouped by state and coloured by type
        top_states = df[state_column].value_counts().nlargest(top_n_states).index
        plot_df = df[df[state_column].isin(top_states)]
        category_column, hue_column = state_column, type_column
    else:
        # Simple countplot: one bar per type, no grouping
        plot_df = df
        category_column, hue_column = type_column, None

    if horizontal:
        ax = sns.countplot(y=category_column, hue=hue_column, data=plot_df, orient='h')
    else:
        ax = sns.countplot(x=category_column, hue=hue_column, data=plot_df, orient='v')

    # Add the count at the end of every bar
    for patch in ax.patches:
        extent = patch.get_width() if horizontal else patch.get_height()
        # Skip empty or NaN-sized patches (missing state/type combinations); `not >` is also
        # True for NaN, which would otherwise break the int() conversion below.
        if not extent > 0:
            continue
        count_text = f'{int(round(extent))}'
        if horizontal:
            ax.annotate(count_text, (extent, patch.get_y() + patch.get_height() / 2.),
                        va='center', ha='left', xytext=(5, 0), textcoords='offset points')
        else:
            ax.annotate(count_text, (patch.get_x() + patch.get_width() / 2., extent),
                        va='bottom', ha='center', xytext=(0, 5), textcoords='offset points')

    # Grid lines along the count axis
    ax.grid(axis='x' if horizontal else 'y', linestyle='--', linewidth=0.5)

    plt.title(title)
    if hue_column is not None:
        plt.legend(title=type_column, bbox_to_anchor=(1.05, 1), loc='upper left')
    plt.xlabel(x_label)
    plt.ylabel(y_label)
    plt.tight_layout()
    plt.show()

### Bar Chart - Events

In [32]:
def event_bar_chart(merged_event, base_column, legend_labels, title, include_types=None, types=None, figsize=(12, 9), legend_loc='upper right', colors=None):
    """
    Plot a horizontal bar graph for multiple event types from a merged event DataFrame.

    The plotted series are the columns '<base_column>_pclose', '<base_column>_aclose' and
    '<base_column>_cclose'; series whose column is missing are skipped. Rows are ordered by the sum
    of the plotted columns. The input DataFrame is not modified.

    Parameters:
    - merged_event: DataFrame with one row per event type and the suffixed value columns described above.
    - base_column: Base column name prefix (e.g., 'DistToStop', 'Count', etc.).
    - legend_labels: List of labels for the legend. A list of three labels is read as
      [pclose, aclose, cclose]; any other length is matched in order to the series actually plotted.
      Series without a label fall back to the capitalized suffix.
    - title: Title of the plot.
    - include_types: List of 'Type' values to include, matched against the 'Type' column (or against
      the index when there is no such column). Default is None, which includes all.
    - types: str or list of values to include, matched against the index (default is None, which includes all).
    - figsize: Size of the figure.
    - legend_loc: Location of the legend in the plot.
    - colors: Dictionary mapping event types ('pclose', 'aclose', 'cclose') to colors.

    Returns:
    - None (displays the plot, or prints a message when none of the suffixed columns exist).
    """
    all_event_types = ['pclose', 'aclose', 'cclose']

    # If include_types is specified, filter rows accordingly
    if include_types is not None:
        type_values = merged_event['Type'] if 'Type' in merged_event.columns else merged_event.index
        merged_event = merged_event[type_values.isin(include_types)]

    # If types is specified, filter rows accordingly
    if types is not None:
        if isinstance(types, str):
            types = [types]  # Convert a single string to a list
        merged_event = merged_event[merged_event.index.isin(types)]

    # Keep only the event types whose column exists in the DataFrame
    valid_event_types = [etype for etype in all_event_types if f'{base_column}_{etype}' in merged_event.columns]

    if not valid_event_types:
        print("No valid event types found in the DataFrame.")
        return

    value_columns = [f'{base_column}_{etype}' for etype in valid_event_types]

    # Work on a copy: adding 'Total' must not leak into the caller's DataFrame. Outer merges leave
    # NaN where an event type has no data; treat those as 0 so they can be drawn and labelled.
    merged_event = merged_event.copy()
    merged_event[value_columns] = merged_event[value_columns].fillna(0)
    merged_event['Total'] = merged_event[value_columns].sum(axis=1)
    merged_event = merged_event.sort_values(by='Total', ascending=True)

    # Resolve the legend label per series so a missing column cannot shift the labels
    legend_labels = list(legend_labels) if legend_labels else []
    label_keys = all_event_types if len(legend_labels) == len(all_event_types) else valid_event_types
    label_for = dict(zip(label_keys, legend_labels))

    fig, ax = plt.subplots(figsize=figsize)

    bar_width = 0.2  # Adjust the width as needed
    row_count = len(merged_event)

    for i, etype in enumerate(valid_event_types):
        counts = merged_event[f'{base_column}_{etype}']

        # Shift each series by one bar width so the bars of a row sit next to each other
        positions = [row + i * bar_width for row in range(row_count)]

        # Use custom colors if provided, otherwise use matplotlib's default colors
        color = colors.get(etype, None) if colors else None

        ax.barh(positions, counts, height=bar_width,
                label=label_for.get(etype, etype.capitalize()), color=color)

        # Annotate bars with count values
        for position, count in zip(positions, counts):
            ax.text(count, position, str(int(count)), ha='left', va='center', color='black')

    # Put each tick in the middle of its group of bars
    group_offset = bar_width * (len(valid_event_types) - 1) / 2
    ax.set_xlabel('Count')
    ax.set_ylabel('Event Type')
    ax.set_yticks([row + group_offset for row in range(row_count)])
    ax.set_yticklabels(merged_event.index)
    ax.set_title(title)

    ax.legend(title='Event Types', loc=legend_loc)

    # Add grid lines along the x-axis
    ax.grid(axis='x', linestyle='--', linewidth=0.5)

    plt.show()

### Bar Chart - Individual Route Events

In [33]:
def plot_route_event_bar_chart(filtered_merged_closures, suffix, title):
    """
    Plot the 'DistToStop' bars of a single event type via event_bar_chart.

    Parameters:
    - filtered_merged_closures: DataFrame of merged closures; only the columns ending in `suffix`
      (plus 'Total', if present) are passed on
    - suffix: str, the event-type suffix to plot, with or without a leading underscore
      (e.g. 'pclose' or '_pclose')
    - title: str, the title of the plot

    Returns:
    - None (displays the plot)
    """
    columns_to_include = [col for col in filtered_merged_closures.columns if col.endswith(suffix) or col == 'Total']

    # Copy the selection: event_bar_chart adds a 'Total' column, which should not be written into
    # a slice of the caller's DataFrame.
    filtered_df = filtered_merged_closures[columns_to_include].copy()
    legend_labels = [suffix.replace('_', ' ').title().strip()]

    # event_bar_chart looks colors up by the bare event type ('pclose'), so drop any leading
    # underscore from the suffix; otherwise the color is silently ignored.
    event_type = suffix.lstrip('_')

    event_bar_chart(
        filtered_df,
        base_column='DistToStop',
        legend_labels=legend_labels,
        title=title,
        include_types=None,
        types=None,
        figsize=(12, 9),
        legend_loc='lower right',
        colors={event_type: 'red'}
    )

### Bar Chart - RTR

In [34]:
def plot_dist_miles(dist_miles_actual, dist_miles_planned, dist_miles_computed):
    """
    Plot actual, planned and computed distances per class as grouped horizontal bars.

    Parameters:
    - dist_miles_actual: Series of actual distances, indexed by class
    - dist_miles_planned: Series of planned distances, indexed by class
    - dist_miles_computed: Series of computed distances, indexed by class

    The three series are aligned on their index, so they may list the classes in different orders
    or cover different classes; a class missing from a series is drawn with a distance of 0.

    Returns:
    - None (displays the plot)
    """
    # Build the frame from the Series themselves (not from .values) so pandas aligns them by class
    # instead of by position.
    aligned = pd.DataFrame({
        'DistMiles_actual': dist_miles_actual,
        'DistMiles_planned': dist_miles_planned,
        'DistMiles_computed': dist_miles_computed,
    }).fillna(0)
    dist_miles_df = aligned.reset_index(drop=True)
    dist_miles_df.insert(0, 'Class', list(aligned.index))

    # Plotting the side-by-side bar graph
    fig, ax = plt.subplots(figsize=(12, 8))

    # Define the width of the bars and the positions
    bar_width = 0.25
    bar_positions = np.arange(len(dist_miles_df['Class']))

    # One bar per source for every class, offset by one bar width each
    bars_actual = ax.barh(bar_positions - bar_width, dist_miles_df['DistMiles_actual'], height=bar_width, color='blue', label='Actual')
    bars_planned = ax.barh(bar_positions, dist_miles_df['DistMiles_planned'], height=bar_width, color='red', label='Planned')
    bars_computed = ax.barh(bar_positions + bar_width, dist_miles_df['DistMiles_computed'], height=bar_width, color='green', label='Computed')

    # Adding labels and title
    ax.set_ylabel('Class')
    ax.set_xlabel('Total DistMiles')
    ax.set_title('Total DistMiles per Class')
    ax.set_yticks(bar_positions)
    ax.set_yticklabels(dist_miles_df['Class'])
    ax.legend()

    # Write the value at the end of each bar
    for bars in [bars_actual, bars_planned, bars_computed]:
        for bar in bars:
            width = bar.get_width()
            ax.text(width, bar.get_y() + bar.get_height() / 2, f'{width:.2f}', ha='left', va='center')

    # Display the plot
    plt.xticks(rotation=45)
    plt.show()

# Example usage:
# plot_dist_miles(dist_miles_actual, dist_miles_planned, dist_miles_computed)

### Dataset Comparisons

In [35]:
def compare(main, *datasets, type_filter=None, custom_title=None, colors=None, dataset_names=None, orientation='horizontal', x_label='Count of Trips', y_label=None):
    """
    Plot the count of unique trip IDs in the main dataset and additional datasets.

    Parameters:
    - main: DataFrame or scalar, the main dataset containing a 'UniqueTripID' column, or a ready-made
      count (Python or NumPy number)
    - *datasets: Variable number of DataFrames (or ready-made counts) to compare with the main dataset
    - type_filter: str or None, optional, keep only rows whose 'Type' equals this value in the
      additional DataFrames (the main dataset is not filtered)
    - custom_title: str or None, optional, a custom title for the plot
    - colors: list of str or None, optional, colors for each dataset bar
    - dataset_names: list of str or None, optional, names for each dataset, main dataset first
    - orientation: str, optional, either 'horizontal' (default) or 'vertical'
    - x_label: str, optional, label for the count axis (the x-axis when horizontal, the y-axis when vertical)
    - y_label: str or None, optional, label for the dataset axis

    Returns:
    - None (displays the plot)

    Raises:
    - ValueError: if main is neither a DataFrame nor a number, if orientation is invalid, or if
      dataset_names does not provide exactly one name per bar
    """
    import numbers  # local import; numbers.Real also matches NumPy scalars such as np.int64

    # The main count is either derived from the DataFrame or passed in directly
    if isinstance(main, pd.DataFrame):
        main_count = main['UniqueTripID'].nunique()
    elif isinstance(main, numbers.Real):
        main_count = main
    else:
        raise ValueError("Invalid type for 'main'. It should be a DataFrame or a scalar value.")

    # Validate before a figure is created
    if orientation not in ('horizontal', 'vertical'):
        raise ValueError("Invalid value for 'orientation'. It should be 'horizontal' or 'vertical'.")

    # Filter datasets based on the 'Type' column if a type_filter is provided
    if type_filter is not None:
        datasets = [d[d['Type'] == type_filter] if isinstance(d, pd.DataFrame) else d for d in datasets]

    counts = [d['UniqueTripID'].nunique() if isinstance(d, pd.DataFrame) else d for d in datasets]

    # Round the count values to the nearest tenth
    main_count = round(main_count, 1)
    counts = [round(count, 1) if isinstance(count, numbers.Real) else count for count in counts]
    values = [main_count] + counts

    # Use default colors and dataset names if not provided
    if colors is None:
        colors = ['gray', 'red', 'blue', 'green']

    if dataset_names is None:
        dataset_names = ['Main'] + [f'Dataset {i}' for i in range(1, len(counts) + 1)]
    elif len(dataset_names) != len(values):
        raise ValueError(
            f"'dataset_names' must contain {len(values)} names (main dataset first), got {len(dataset_names)}."
        )

    # Create a bar graph with colored bars
    fig, ax = plt.subplots()
    bar_colors = colors[:len(dataset_names)]

    if orientation == 'horizontal':
        bars = ax.barh(dataset_names, values, color=bar_colors)
        for bar, count in zip(bars, values):
            ax.text(bar.get_width(), bar.get_y() + bar.get_height()/2, f'{count}', va='center', ha='left')
        ax.grid(axis='x', linestyle='--', alpha=0.6)
        ax.set_xlabel(x_label)
        if y_label is not None:
            ax.set_ylabel(y_label)
    else:
        bars = ax.bar(dataset_names, values, color=bar_colors)
        for bar, count in zip(bars, values):
            ax.text(bar.get_x() + bar.get_width()/2, bar.get_height(), f'{count}', va='bottom', ha='center')
        ax.grid(axis='y', linestyle='--', alpha=0.6)
        # The count axis is vertical here, so the labels swap axes
        ax.set_ylabel(x_label)
        if y_label is not None:
            ax.set_xlabel(y_label)

    # Set the title based on the custom_title parameter
    if custom_title is not None:
        ax.set_title(custom_title)
    else:
        default_title = f'Count of Unique IDs in Each Dataset{" Filtered by Type: " + type_filter if type_filter else ""}'
        ax.set_title(default_title)

    plt.show()


### Stacked Bar Graph

In [36]:
def create_stacked_bar_chart(data, type_columns, jurisdiction_column, top_n=5, title='Stacked Bar Chart', x_label=None, y_label=None, type_labels=None, graph_titles=None, cmap='tab10', value_color='black', font_size=8, distance_from_bar=0.02):
    """
    Create a stacked bar chart with one bar per jurisdiction and one stacked segment per type column.

    Parameters:
    - data: DataFrame, the input data frame containing information for the chart
    - type_columns: list, the columns whose per-jurisdiction sums form the stacked segments
    - jurisdiction_column: str, the column representing jurisdictions (one bar each)
    - top_n: int, the number of jurisdictions to display, chosen by number of rows (default is 5)
    - title: str, the title of the chart (default is 'Stacked Bar Chart')
    - x_label: str or None, optional, the label for the x-axis (defaults to jurisdiction_column)
    - y_label: str or None, optional, the label for the y-axis (defaults to 'Count')
    - type_labels: dict or None, optional, a dictionary mapping type column names to legend labels
      (defaults to the capitalized column names)
    - graph_titles: dict or None, optional; accepted for compatibility but not used by this function
    - cmap: str, name of the matplotlib colormap the segment colors are taken from (default is 'tab10')
    - value_color: str, the color of the displayed values (default is 'black')
    - font_size: int, the font size for displayed values (default is 8)
    - distance_from_bar: float, the distance from the bar for displaying values (default is 0.02)

    Returns:
    - None (displays the plot)
    """

    # Select the top N jurisdictions based on counts
    top_jurisdictions = data[jurisdiction_column].value_counts().nlargest(top_n).index

    # Filter the data for the top jurisdictions
    data_top_jurisdictions = data[data[jurisdiction_column].isin(top_jurisdictions)]

    # Set default type labels if not specified
    if type_labels is None:
        type_labels = {col: col.capitalize() for col in type_columns}

    # One distinct color per type column. plt.get_cmap is used instead of matplotlib.cm.get_cmap,
    # which is deprecated and no longer available in recent matplotlib releases.
    color_map = plt.get_cmap(cmap, len(type_columns))
    type_color_mapping = {col: color_map(i) for i, col in enumerate(type_columns)}

    fig, ax = plt.subplots(figsize=(10, 6))

    # Sum every type column per jurisdiction
    grouped_data = data_top_jurisdictions.groupby(jurisdiction_column)[type_columns].sum()

    # Plot the stacked bar chart with distinct colors for each type column
    ax = grouped_data.plot(kind='bar', stacked=True, ax=ax, color=[type_color_mapping[col] for col in type_columns])

    # Set labels and title
    ax.set_xlabel(x_label or jurisdiction_column)
    ax.set_ylabel(y_label or 'Count')
    ax.set_title(title, color='black')

    # Show legend with the configured labels
    ax.legend(title='Type', labels=[type_labels.get(col, col) for col in type_columns], loc='upper right')

    # Write the value of each segment (excluding 0 values) beside the bars
    for idx, p in enumerate(ax.patches):
        width, height = p.get_width(), p.get_height()
        x, y = p.get_xy()

        # Alternate the annotation side by patch index
        annotation_side = 'left' if idx % 2 == 0 else 'right'

        # Vertically centered on the segment
        annotation_y = y + height / 2

        # Horizontal position just outside the chosen side of the bar
        annotation_x = x - distance_from_bar if annotation_side == 'left' else x + width + distance_from_bar

        # Add annotation only if the height is greater than 0
        if height > 0:
            ax.annotate(f'{height}', (annotation_x, annotation_y),
                        ha='center', va='center', color=value_color, rotation=0,
                        xycoords='data', textcoords='offset points', xytext=(5, 0) if annotation_side == 'left' else (-5, 0),
                        fontsize=font_size)

    # Adjust layout
    plt.tight_layout()

    # Show the plot
    plt.show()

# Example usage:
# create_stacked_bar_chart(grouped_data, type_columns=['NumRoutes', 'OORCount'], jurisdiction_column='LaneID', title='Custom Title', font_size=9, distance_from_bar=0.1)


### Stacked Bar/Table

In [37]:
def create_stacked_bar_chart_table(data, type_columns, jurisdiction_column, top_n=5, title='Stacked Bar Chart', x_label=None, y_label=None, type_labels=None, graph_titles=None, color_gradients=None, value_color='black', font_size=8, distance_from_bar=0.02, x_label_rotation=0, show_counts_threshold=0, table_colors=None, wspace=0.4, table_space=0.8):
    """
    Draw a stacked bar chart of row counts per jurisdiction next to a table of the same counts.

    For every column in type_columns and every distinct non-zero value in that column, one bar
    segment per jurisdiction is stacked, sized by the number of rows holding that value. A table
    with one row per (type column, value) pair and one column per jurisdiction is drawn beside it.

    Parameters:
    - data: DataFrame, must contain jurisdiction_column and every column listed in type_columns
    - type_columns: list of str, the columns whose distinct values become the stacked segments
    - jurisdiction_column: str, the column whose values form the x-axis categories
    - top_n: int, only the top_n most frequent values of jurisdiction_column (by row count) are plotted
    - title: str, the title of the bar chart axes
    - x_label: str or None, x-axis label (defaults to jurisdiction_column)
    - y_label: str or None, y-axis label (defaults to 'Count')
    - type_labels: dict or None, maps a type column to the label used in the legend (defaults to the capitalized column name)
    - graph_titles: dict or None, a default is built when None, but the value is not used anywhere else in this function
    - color_gradients: dict or None, maps a type column to a colormap callable; missing columns fall back to a 'tab10' colormap (the passed dict is filled in place)
    - value_color: str, the color of the count annotations
    - font_size: int, the font size of the count annotations
    - distance_from_bar: float, horizontal gap (in x data units) between a bar edge and its annotation
    - x_label_rotation: accepted but not used anywhere in this function
    - show_counts_threshold: number, a segment is annotated only when its height is greater than this value
    - table_colors: dict or None, maps every type column to a colormap callable used to shade the table cells; no shading when None
    - wspace: float, horizontal spacing passed to fig.subplots_adjust
    - table_space: float, used both as the `right` margin of the figure and as the width of the table bbox

    Raises:
    - ValueError, if any entry of type_columns is not a column of data

    Returns:
    - None (displays the plot)
    """
    # Ensure that type_columns exist in the DataFrame
    invalid_columns = set(type_columns) - set(data.columns)
    if invalid_columns:
        raise ValueError(f"Invalid columns specified in type_columns: {invalid_columns}")

    # Select the top N jurisdictions based on counts
    top_jurisdictions = data[jurisdiction_column].value_counts().nlargest(top_n).index
    
    # Filter the data for the top jurisdictions
    data_top_jurisdictions = data[data[jurisdiction_column].isin(top_jurisdictions)]

    # Set default type labels if not specified
    if type_labels is None:
        type_labels = {col: col.capitalize() for col in type_columns}
    
    # Set default graph titles if not specified
    # NOTE(review): graph_titles is never read after this point - confirm whether it can be dropped
    if graph_titles is None:
        graph_titles = {col: f'{title} for {type_labels.get(col, col)}' for col in type_columns}
    
    # Get a colormap with distinct colors for each unique value within all type columns
    default_color_map = plt.get_cmap('tab10', len(data[type_columns].stack().unique()))
    
    # Create a mapping of type column value to color gradient
    if color_gradients is None:
        color_gradients = {col: default_color_map for col in type_columns}
    else:
        # NOTE(review): this writes the defaults into the caller's dict (in-place mutation)
        for col in type_columns:
            if col not in color_gradients:
                color_gradients[col] = default_color_map

    # Plot a single bar for each jurisdiction with stacking for type columns
    fig, (ax, ax_table) = plt.subplots(1, 2, gridspec_kw={'width_ratios': [3, 1]}, figsize=(12, 6))

    # Manually adjust the layout to move the table closer to the graph
    fig.subplots_adjust(wspace=wspace, right=table_space)

    # Plot the stacked bar chart with distinct colors for each value within each type column
    # bottom_values accumulates the stacked height per jurisdiction across all columns and values
    bottom_values = None
    for col in type_columns:
        # Get the color gradient for the current type column
        color_gradient = color_gradients[col]

        # Iterate through unique values in the current type column
        for i, value in enumerate(data_top_jurisdictions[col].unique()):
            # Filter the data for the specific type column value
            # The `!= 0` clause drops the value 0 from the bars; the table further down still lists it
            type_data = data_top_jurisdictions[(data_top_jurisdictions[col] == value) & (data_top_jurisdictions[col] != 0)]

            # Count occurrences for each jurisdiction
            jurisdiction_counts = type_data[jurisdiction_column].value_counts().reindex(top_jurisdictions, fill_value=0)

            # Only add to the table if there are non-zero counts
            if jurisdiction_counts.sum() > 0:
                # Calculate the color based on the color gradient
                # The value's position among the column's unique values (0 <= fraction < 1) picks the shade
                color = to_hex(color_gradient(i / len(data_top_jurisdictions[col].unique())))

                # Plot a bar for each jurisdiction
                bars = ax.bar(jurisdiction_counts.index, jurisdiction_counts, label=f'{type_labels.get(col, col)} - {value}', color=color, bottom=bottom_values)

                if bottom_values is None:
                    bottom_values = jurisdiction_counts
                else:
                    # NOTE(review): `+=` on a Series updates the first plotted jurisdiction_counts
                    # object in place; harmless here because that object is not read again - confirm
                    bottom_values += jurisdiction_counts

                # Add counts on top of each bar to the side of the bars
                for bar, count in zip(bars, jurisdiction_counts):
                    width, height = bar.get_width(), bar.get_height()
                    x, y = bar.get_xy()

                    # Determine the side for annotation based on the width
                    # NOTE(review): bar widths are presumably always positive, so this is always 'left' - verify
                    annotation_side = 'left' if width > 0 else 'right'

                    # Adjust the y-position of annotations to ensure they are within the graph boundaries
                    annotation_y = y + height / 2

                    # Adjust the x-position for the side placement
                    annotation_x = x - distance_from_bar if annotation_side == 'left' else x + width + distance_from_bar

                    # Attempt to move the annotation horizontally to avoid overlap
                    # NOTE(review): this offset is added to a data coordinate (xycoords='data' below),
                    # so it shifts the anchor by 10 x-axis units, not 10 points - confirm intent
                    x_offset = 10 if annotation_side == 'left' else -10
                    annotation_x += x_offset

                    # Add annotation only if the height is greater than the threshold
                    if height > show_counts_threshold:
                        # Add annotation
                        ax.annotate(f'{count}', (annotation_x, annotation_y),
                                    ha='center', va='center', color=value_color, rotation=0, 
                                    xycoords='data', textcoords='offset points', xytext=(5, 0) if annotation_side == 'left' else (-5, 0),
                                    fontsize=font_size)

    # Set labels and title
    ax.set_xlabel(x_label or jurisdiction_column)
    ax.set_ylabel(y_label or 'Count')
    ax.set_title(title, color='black')

    # Customize x-axis ticks
    ax.set_xticks(data_top_jurisdictions[jurisdiction_column].unique())

    # Show legend with corrected titles
    ax.legend(title='Type', bbox_to_anchor=(1.05, 1), loc='upper left')

    # Add a table next to the graph
    table_data = []

    # Iterate through unique values in the type columns
    for col in type_columns:
        for value in data_top_jurisdictions[col].unique():
            row_data = []
            for jurisdiction in top_jurisdictions:
                # Filter the data for the specific type column value, jurisdiction, and count occurrences
                type_data = data_top_jurisdictions[(data_top_jurisdictions[col] == value) & (data_top_jurisdictions[jurisdiction_column] == jurisdiction)]
                count = type_data[col].count()

                # Add the count to the row data
                row_data.append(count)

            # Add the row data to the table data
            table_data.append(row_data)

    # Use unique values in the type columns as rowLabels
    # Row labels are generated in the same (column, value) order as table_data above
    table = ax_table.table(cellText=table_data, rowLabels=[f'{col} - {value}' for col in type_columns for value in data_top_jurisdictions[col].unique()], colLabels=[f'{jurisdiction}' for jurisdiction in top_jurisdictions], loc='center', bbox=[0.1, 0, table_space, 1])

    # Add cell colors based on the provided table_colors for table_cells
    if table_colors is not None:
        for col_idx, col in enumerate(type_columns):
            unique_values = data_top_jurisdictions[col].unique()
            for j, value in enumerate(unique_values):
                for k, jurisdiction in enumerate(top_jurisdictions):
                    # Get the color gradient for the current type column
                    # A type column missing from table_colors raises KeyError here
                    color_gradient = table_colors[col]

                    # Use the proportion of the shifted row index to determine the color
                    cell_color = to_hex(color_gradient(j / len(unique_values)))

                    # Ensure the index is not negative and is within the range of the table
                    # NOTE(review): the offset len(unique_values) * col_idx assumes every earlier type
                    # column has as many unique values as the current one; the +1 presumably skips the
                    # column-label header row - verify against table_data's row order
                    row_index = j + len(unique_values) * col_idx + 1
                    # NOTE(review): get_celld() presumably holds every cell (not just rows), so this
                    # bound is looser than the number of table rows - confirm
                    if 0 <= row_index < len(table.get_celld()):
                        table[row_index, k].set_facecolor(cell_color)

    # Hide the axes for the table
    ax_table.axis('off')

    # Adjust layout
    # NOTE(review): tight_layout runs after subplots_adjust above and may override wspace/right - confirm
    plt.tight_layout()

    # Show the plot
    plt.show()


### Histogram/Boxplot

In [38]:
def histogram_boxplot(analysisn, feature, title=None, event_types=None, figsize=(7, 7), kde=False, bins=None, boxplot_color="#f1f1f6", histogram_color="royalblue", threshold=None, legend_position='right'):
    """
    Create a combined histogram and boxplot for a given feature in the dataset.

    The boxplot is drawn above the histogram on a shared x-axis. Mean and median
    lines are added to the histogram, and the number of unique trip IDs and the
    sum of the feature are shown in text boxes to the right of the histogram.

    Parameters:
    - analysisn: DataFrame, the input data frame containing the analysis data; must contain a 'UniqueTripID' column, and a 'Type' column when event_types is given
    - feature: str, the column name for the feature to be analyzed
    - title: str or None, optional, the title of the plot (None for default title)
    - event_types: list or None, optional, a list of event types to include in the analysis (None for all event types)
    - figsize: tuple, optional, the size of the resulting plot (default is (7, 7))
    - kde: bool, optional, whether to overlay a kernel density estimate on the histogram (default is False)
    - bins: optional, bin specification forwarded to seaborn's histplot; when None (or otherwise falsy) seaborn's default binning is used
    - boxplot_color: str, optional, the color of the boxplot (default is "#f1f1f6")
    - histogram_color: str, optional, the color of the histogram (default is "royalblue")
    - threshold: float or None, optional, a threshold value to filter out data points beyond the threshold (None for no filtering)
    - legend_position: str, optional, the position of the legend ('right' or 'left') (default is 'right')

    Raises:
    - ValueError, if legend_position is neither 'left' nor 'right'
    """
    # Resolve the legend placement before any drawing so that an invalid
    # argument fails fast instead of leaving a half-built figure behind.
    legend_placements = {
        'right': ('upper left', (1.0, 0.85)),
        'left': ('upper right', (-0.3, 1.0)),
    }
    if legend_position not in legend_placements:
        raise ValueError("Invalid legend position. Use 'left' or 'right'.")
    legend_loc, bbox_anchor = legend_placements[legend_position]

    if event_types:
        analysisn = analysisn[analysisn['Type'].isin(event_types)]

    if threshold is not None:
        # Filter out data points beyond the threshold
        analysisn = analysisn[analysisn[feature] <= threshold]

    f2, (ax_box2, ax_hist2) = plt.subplots(
        nrows=2,
        sharex=True,
        gridspec_kw={"height_ratios": (0.25, 0.75)},
        figsize=figsize,
    )

    sns.boxplot(
        data=analysisn, x=feature, ax=ax_box2, showmeans=True, color=boxplot_color
    ).set(title=f"Distribution of {title}" if title else f"Distribution of {feature}")

    # Only forward `bins` when the caller supplied one, so seaborn keeps its own default otherwise
    hist_kwargs = {"bins": bins} if bins else {}
    sns.histplot(
        data=analysisn, x=feature, kde=kde, ax=ax_hist2, color=histogram_color, **hist_kwargs
    )

    values = analysisn[feature]
    mean_value = values.mean()
    median_value = values.median()
    sum_value = values.sum()

    ax_hist2.axvline(
        mean_value, color="green", linestyle="--", label=f"Mean ({mean_value:.2f})"
    )
    ax_hist2.axvline(
        median_value, color="black", linestyle="-", label=f"Median ({median_value:.2f})"
    )

    total_ids = analysisn['UniqueTripID'].nunique()

    # Summary boxes sit just outside the right edge of the histogram axes
    # (x = 1.05 in axes coordinates), stacked from the top.
    box_style = dict(facecolor='#f1f1f6', edgecolor='gray', boxstyle='round')
    summary_boxes = (
        (1.0, f"Total IDs: {total_ids}"),
        (0.90, f"Sum: {sum_value:.2f}"),
    )
    for y_pos, text in summary_boxes:
        ax_hist2.text(
            1.05,
            y_pos,
            text,
            transform=ax_hist2.transAxes,
            horizontalalignment='left',
            verticalalignment='top',
            bbox=box_style,
        )

    # Place the legend outside the axes on the requested side
    ax_hist2.legend(loc=legend_loc, bbox_to_anchor=bbox_anchor)

# Example usage with a threshold of 100 and legend on the left
# histogram_boxplot(analysisn, 'your_feature', threshold=100, legend_position='left')


### Process Vehicle Combinations and Identify Custom Profiles

In [39]:
def process_and_summarize_vehicle_profiles(data_frame, combination_names=None):
    """
    Process vehicle combinations in a DataFrame and summarize custom vehicle profiles.

    Each row is labelled by looking up its (Height, Length, Weight, Width) tuple in
    combination_names; rows without a match are labelled 'custom'. The labels are
    assigned by row position, so data frames with duplicate index labels are
    handled correctly.

    Parameters:
    - data_frame: DataFrame, the input data frame containing information about vehicles, including columns 'Height', 'Length', 'Weight', and 'Width'; it is modified in place
    - combination_names: dict or None, optional, maps a (Height, Length, Weight, Width) tuple to a profile name; when None, a built-in mapping with a single 'Sample' profile is used

    Returns:
    - Tuple, containing two DataFrames:
        1. Updated input data frame with an additional column 'combination_name'
        2. Summary of custom vehicle combinations (columns 'Height', 'Length', 'Width', 'Weight', 'count'), sorted by count in descending order
    """
    if combination_names is None:
        # Keys are (Height, Length, Weight, Width)
        combination_names = {
            (16200, 63600, 8000, 10200): 'Sample',
            # Add more combinations and names as needed
        }

    # Walk the four dimension columns in lockstep (positionally) rather than
    # writing back through index labels, which mislabels rows when labels repeat.
    dimension_keys = zip(
        data_frame['Height'],
        data_frame['Length'],
        data_frame['Weight'],
        data_frame['Width'],
    )
    data_frame['combination_name'] = [
        combination_names.get(key, 'custom') for key in dimension_keys
    ]

    # Keep only the dimension columns of the rows that matched no known profile
    summary_columns = ['Height', 'Length', 'Width', 'Weight']
    is_custom = data_frame['combination_name'] == 'custom'
    custom_vehicles = data_frame.loc[is_custom, summary_columns]

    # Get the count of each combination
    combinations = custom_vehicles.groupby(summary_columns).size().reset_index(name='count')

    # Return the updated DataFrame and the resulting combinations
    return data_frame, combinations.sort_values('count', ascending=False)

# Example usage:
# updated_data_frame, custom_combinations_summary = process_and_summarize_vehicle_profiles(original_data_frame)


In [40]:
def create_heatmap(df, columns=None, title="Heatmap", x_title=None, y_title=None):
    """
    Create a heatmap of per-column value counts from a DataFrame.

    Each selected column is reduced to the counts of its distinct values; the
    heatmap has one row per distinct value and one column per DataFrame column,
    with 0 where a value does not occur in a column.

    Parameters:
    - df: DataFrame, the data frame containing the data
    - columns: list or None, optional, a list of column names to be used for the heatmap; if None, all columns will be used
    - title: str, the title of the heatmap (default is "Heatmap")
    - x_title: str or None, optional, the title for the x-axis
    - y_title: str or None, optional, the title for the y-axis

    Returns:
    - None (displays the plot)
    """

    # If specific columns are provided, select only those columns
    if columns is not None:
        df = df[columns]

    # Dictionaries are unhashable and cannot be counted, so convert them to strings.
    # Series.map is used per column instead of the deprecated DataFrame.applymap.
    df = df.apply(
        lambda col: col.map(lambda x: str(x) if isinstance(x, dict) else x)
    )

    # Series.value_counts replaces the deprecated top-level pd.value_counts
    counts = df.apply(pd.Series.value_counts).fillna(0)

    # Create the figure only once the data is ready, so a failure above
    # does not leave an empty figure behind.
    plt.figure(figsize=(10, 6))
    sns.heatmap(counts, annot=True, fmt='g', cmap='Blues')

    # Set x and y titles if provided
    if x_title is not None:
        plt.xlabel(x_title)
    if y_title is not None:
        plt.ylabel(y_title)

    plt.title(title)
    plt.show()


### Remove Outliers

In [41]:
def outliers(c_df_filtered, column):
    """
    Drop the rows whose value in `column` falls outside the 1.5 * IQR fences.

    Parameters:
    - c_df_filtered: DataFrame, the data frame to filter
    - column: str, the column whose values are tested against the fences

    Returns:
    - DataFrame, the rows whose value lies within
      [Q1 - 1.5 * IQR, Q3 + 1.5 * IQR] (both ends inclusive)
    """
    series = c_df_filtered[column]

    first_quartile = series.quantile(0.25)
    third_quartile = series.quantile(0.75)
    spread = third_quartile - first_quartile

    lower_fence = first_quartile - 1.5 * spread
    upper_fence = third_quartile + 1.5 * spread

    # between() is inclusive on both ends by default
    within_fences = series.between(lower_fence, upper_fence)
    return c_df_filtered[within_fences]

### Plot Average Distance To Stop

In [42]:
def plot_average_dist_to_stop(dataset1, dataset2, dataset3, num_states=None, title=None, type=None):
    """
    Plot the average 'DistToStop' for the top states across three datasets.

    States are ranked by their mean 'DistToStop' over the three datasets combined;
    for each selected state three bars are drawn (labelled 'Planned', 'Actual' and
    'Computed' for dataset1, dataset2 and dataset3). A state missing from a dataset
    is drawn with a value of 0.

    Parameters:
    - dataset1, dataset2, dataset3: DataFrames, the three datasets to be compared; each must contain 'Jurisdiction' and 'DistToStop' columns
    - num_states: int or None, the number of top states to include in the plot (None includes every state)
    - title: str or None, optional, the title of the plot
    - type: str or None, optional, a specific type to filter the data (e.g., 'Planned', 'Actual', 'Computed'); requires a 'Type' column

    Returns:
    - None (displays the plot)
    """
    datasets = (dataset1, dataset2, dataset3)

    # Combine all datasets
    all_datasets = pd.concat(datasets, ignore_index=True)

    # Filter the data based on the specified type variable
    # NOTE(review): the filter only affects which states are ranked as "top";
    # the per-dataset averages plotted below are computed on unfiltered data - confirm intent
    if type:
        all_datasets = all_datasets[all_datasets['Type'] == type]

    # Calculate the average 'DistToStop' for each state across all datasets
    overall_means = all_datasets.groupby('Jurisdiction')['DistToStop'].mean()

    # nlargest() needs an integer, so the default num_states=None is mapped to "all states"
    n_top = len(overall_means) if num_states is None else num_states
    top_states = overall_means.nlargest(n_top).index

    # Calculate the average 'DistToStop' for each dataset and state
    per_dataset_means = [
        dataset.groupby('Jurisdiction')['DistToStop'].mean() for dataset in datasets
    ]

    # States present in any dataset, restricted to the top states
    states_union = per_dataset_means[0].index
    for means in per_dataset_means[1:]:
        states_union = states_union.union(means.index)
    states_union = states_union.intersection(top_states)

    # Set the figure size
    plt.figure(figsize=(15, 6))

    # Bars of one state sit side by side, one step apart per dataset
    bar_width = 0.15
    space_width = 0.1
    step = bar_width + space_width
    base_positions = range(len(states_union))
    series_styles = (('Planned', 'red'), ('Actual', 'blue'), ('Computed', 'green'))

    for series_idx, (means, (label, color)) in enumerate(zip(per_dataset_means, series_styles)):
        positions = [pos + series_idx * step for pos in base_positions]
        values = [means.get(state, 0) for state in states_union]

        plt.bar(positions, values, width=bar_width, label=label, color=color)

        # Display values on top of bars
        for pos, value in zip(positions, values):
            plt.text(pos + bar_width / 2 - 0.1, value, f'{value:.2f}', ha='center', va='bottom')

    if not title:
        # Build the default title without printing "None" for arguments that were omitted
        scope = 'All' if num_states is None else f'Top {num_states}'
        type_suffix = f' ({type} Type)' if type else ''
        title = f'Average DistToStop for {scope} States{type_suffix}'

    plt.xlabel('State')
    plt.ylabel('Average DistToStop')
    plt.title(title)
    # Tick labels are centred under the middle (second) bar of each group
    plt.xticks([pos + step for pos in base_positions], states_union)
    plt.legend()
    plt.show()