## Imports

In [178]:
import numpy as np
import random
import threading
from collections import OrderedDict
import pandas as pd
import time
import os
import geopandas as gpd
from pathlib import Path
import concurrent.futures
import matplotlib.pyplot as plt
import pickle
from multiprocessing import cpu_count
from typing import Any, Dict, List, Tuple
from libpysal.weights import Kernel
from collections import defaultdict
import random


## Modules

In [179]:
def linear_combinations(step: float) -> list[list[Any]]:
    """Enumerate weight triples ``[i, j, k]`` on a ``step`` grid that sum to 1.

    ``i`` and ``j`` take the values ``0, step, 2*step, ...`` while ``i + j``
    does not exceed 1; ``k`` is the remainder ``1 - i - j``. Every component is
    rounded to two decimals, and a triple is kept only when its rounded
    components still add up to 1.

    Args:
        step (float): Grid spacing for the first two weights.

    Returns:
        list[list[Any]]: ``[i, j, k]`` triples, ordered by ``i`` and then ``j``.
    """
    # Walk the grid with integer step counts instead of accumulating floats,
    # so the loop bounds cannot gain or lose a point to rounding error.
    whole_steps = int(1.0 / step + 1e-9)

    combinations = []
    for i_steps in range(whole_steps + 1):
        for j_steps in range(whole_steps - i_steps + 1):
            i_rounded = round(i_steps * step, 2)
            j_rounded = round(j_steps * step, 2)
            k_rounded = round(1.0 - (i_steps + j_steps) * step, 2)
            if k_rounded == 0:
                # A tiny negative remainder rounds to -0.0; store a clean zero.
                k_rounded = 0.0

            # Compare with a tolerance: an exact ``== 1.0`` on a float sum can
            # reject a valid triple because of binary representation error.
            total = i_rounded + j_rounded + k_rounded
            if 0 <= k_rounded <= 1 and abs(total - 1.0) < 1e-9:
                combinations.append([i_rounded, j_rounded, k_rounded])

    return combinations

In [180]:
def order_results(simulator_results):
    """Split a ``{weights: value}`` mapping into four parallel lists.

    Args:
        simulator_results: Mapping whose keys are indexable weight triples and
            whose values are the measurement recorded for that triple.

    Returns:
        tuple: ``(x, y, z, values)`` where ``x``, ``y`` and ``z`` hold the
        first, second and third component of every key and ``values`` holds
        the mapped values, all in the mapping's iteration order.
    """
    weight_keys = list(simulator_results.keys())
    x = [triple[0] for triple in weight_keys]
    y = [triple[1] for triple in weight_keys]
    z = [triple[2] for triple in weight_keys]
    values = [simulator_results[triple] for triple in weight_keys]
    return x, y, z, values


In [181]:
def calculate_results(simulator_results):
    """Return the weight triple with the largest recorded value.

    Args:
        simulator_results: Mapping of weight triples to a numeric value, in
            the form accepted by ``order_results``.

    Returns:
        tuple: ``(optimal_weights, max_free_requests)`` where
        ``optimal_weights`` is the ``(x, y, z)`` triple whose value is largest
        and ``max_free_requests`` is that value.
    """
    x, y, z, values = order_results(simulator_results)

    # Position of the largest value; ties resolve to the first occurrence.
    best = np.argmax(values)

    return (x[best], y[best], z[best]), values[best]


## Classes

In [182]:
class CombinationCache:
    """Cache combining a capacity limit with a per-entry countdown.

    Each entry stores ``(key, remaining)``. Every call to ``put`` first
    decrements the counter of all stored entries (dropping those that run
    out) and then inserts or refreshes the given keys, evicting from the
    front of the ordering when the cache is full.
    """

    # Keys used for prepopulation are drawn from range(PREPOPULATE_KEY_SPACE).
    PREPOPULATE_KEY_SPACE = 886

    def __init__(self, capacity: int, expiration_time=10, prepopulate=False):
        """Create the cache.

        Args:
            capacity (int): Maximum number of entries kept at once.
            expiration_time (int): Number of ``put`` calls an entry survives
                without being refreshed.
            prepopulate (bool): When true, fill the cache with random keys.
        """
        self.cache = OrderedDict()
        self.capacity = capacity
        self.expiration_time = expiration_time
        self.lock = threading.Lock()
        if prepopulate:
            self.prepopulate_cache()

    def get(self, key: int) -> int:
        """Return the cached value for ``key``, or ``-1`` when it is absent."""
        # Single lookup: a separate membership test followed by indexing could
        # fail if another thread removes the key in between.
        entry = self.cache.get(key)
        return -1 if entry is None else entry[0]

    def put(self, keys: List[int]) -> None:
        """Age all entries by one tick, then insert or refresh ``keys``."""
        with self.lock:
            # Entries whose counter would reach zero on this tick are dropped;
            # every other entry has its counter decremented.
            keys_to_delete = []
            for k, (value, count) in self.cache.items():
                if count <= 1:
                    keys_to_delete.append(k)
                else:
                    self.cache[k] = (value, count - 1)

            for k in keys_to_delete:
                del self.cache[k]

            for key in keys:
                if key in self.cache:
                    # Refreshing an existing key must not evict anything: the
                    # entry count does not grow.
                    self.cache[key] = (key, self.expiration_time)
                    self.cache.move_to_end(key)
                    continue

                if self.capacity <= 0:
                    # A cache with no room stores nothing.
                    continue

                # Make room by removing entries from the front of the ordering.
                while len(self.cache) >= self.capacity:
                    self.cache.popitem(last=False)

                self.cache[key] = (key, self.expiration_time)

    def prepopulate_cache(self) -> None:
        """Fill the cache with distinct random keys, each with a full counter."""
        # random.sample raises if asked for more items than the population
        # holds, so cap the request at the key-space size.
        count = max(0, min(self.capacity, self.PREPOPULATE_KEY_SPACE))
        keys = random.sample(range(self.PREPOPULATE_KEY_SPACE), count)
        for key in keys:
            self.cache[key] = (key, self.expiration_time)

    def current_state(self) -> list[Any]:
        """Return the cached keys in their current order (oldest first)."""
        return list(self.cache.keys())


In [183]:
class TimeCache:
    """Unbounded cache whose entries expire after a number of ``put`` calls.

    Each entry stores ``(key, remaining)``; ``remaining`` starts at
    ``expiration_time`` and drops by one on every later ``put``.
    """

    def __init__(self, expiration_time):
        self.cache = OrderedDict()
        self.expiration_time = expiration_time

    def get(self, key: int) -> int:
        """Return the cached value for ``key``, or ``-1`` when it is absent."""
        if key in self.cache:
            stored_value, _remaining = self.cache[key]
            return stored_value
        return -1

    def put(self, keys: List[int]) -> None:
        """Age every entry by one tick, then store ``keys`` with a full counter."""
        # Entries on their last tick are removed ...
        expired = [k for k, (_value, ttl) in self.cache.items() if ttl <= 1]
        for stale_key in expired:
            del self.cache[stale_key]

        # ... and the survivors lose one tick each.
        for live_key, (value, ttl) in list(self.cache.items()):
            self.cache[live_key] = (value, ttl - 1)

        # New or refreshed keys start again from the full expiration time.
        fresh_ttl = self.expiration_time
        for key in keys:
            self.cache[key] = (key, fresh_ttl)

    def current_state(self) -> list[Any]:
        """Return the cached keys in insertion order."""
        return list(self.cache)


In [184]:
class LRUCache:
    """Fixed-capacity cache that evicts the least recently ``put`` key.

    Recency is updated by ``put`` only; ``get`` does not reorder entries.
    """

    # Keys used for prepopulation are drawn from range(PREPOPULATE_KEY_SPACE).
    PREPOPULATE_KEY_SPACE = 886

    def __init__(self, capacity: int, prepopulate=False):
        """Create the cache.

        Args:
            capacity (int): Maximum number of keys kept at once.
            prepopulate (bool): When true, fill the cache with random keys.
        """
        self.cache = OrderedDict()
        self.capacity = capacity
        if prepopulate:
            self.prepopulate_cache()

    def get(self, key: int) -> int:
        """Return the cached value for ``key``, or ``-1`` when it is absent."""
        return self.cache.get(key, -1)

    def put(self, keys: List[int]) -> None:
        """Insert ``keys`` one by one, marking each as most recently used."""
        for key in keys:
            if key in self.cache:
                # Refreshing an existing key must not evict anything: the
                # entry count does not grow.
                self.cache.move_to_end(key)
                continue

            if self.capacity <= 0:
                # A cache with no room stores nothing.
                continue

            # Make room by removing the least recently used entries.
            while len(self.cache) >= self.capacity:
                self.cache.popitem(last=False)

            self.cache[key] = key

    def prepopulate_cache(self) -> None:
        """Fill the cache with distinct random keys."""
        # random.sample raises if asked for more items than the population
        # holds, so cap the request at the key-space size.
        count = max(0, min(self.capacity, self.PREPOPULATE_KEY_SPACE))
        keys = random.sample(range(self.PREPOPULATE_KEY_SPACE), count)
        for key in keys:
            self.cache[key] = key

    def current_state(self) -> list[Any]:
        """Return the cached keys ordered from least to most recently used."""
        return list(self.cache.keys())

In [185]:
class MonteCarloSimulation:
    """Monte Carlo simulation of requests served from a hot-layer cache.

    Each simulated request picks a scale (regions, states or counties), then a
    feature at that scale, and checks how many of the feature's scenes are
    already cached before adding them to the cache.

    NOTE(review): a single cache object is created in ``__init__`` and used by
    every call to ``run_sim``, including the concurrent calls submitted by
    ``monte_carlo_simulation``. Runs therefore share cache state — confirm
    that this is intended.
    """

    def __init__(
        self,
        weights: list[float],
        num: int,
        cache_type,
        param,
        counties_gdf,
        states_gdf,
        divisions_gdf,
        prepopulate_cache: bool = True,
        return_type: str = "requests",
    ) -> None:
        """Configure the simulation and load its input files.

        Args:
            weights (list[float]): Probabilities of picking the "regions",
                "states" and "counties" scale, in that order.
            num (int): Number of requests simulated per run.
            cache_type (str): "LRUCache", "TimeCache" or "CombinationCache".
            param: Passed to the cache constructor as its first argument
                (capacity for LRUCache/CombinationCache, expiration time for
                TimeCache).
            counties_gdf: Frame passed to ``kernelweights`` for counties.
            states_gdf: Frame passed to ``kernelweights`` for states.
            divisions_gdf: Frame passed to ``kernelweights`` for divisions.
            prepopulate_cache (bool): Forwarded to caches that support
                prepopulation. Defaults to True.
            return_type (str): "requests", "scenes" or "ratio"; selects the
                first element returned by ``run_sim``.

        Raises:
            ValueError: If ``cache_type`` is not one of the supported names.
        """
        self.states_gdf = states_gdf
        self.divisions_gdf = divisions_gdf
        self.counties_gdf = counties_gdf

        self.weights = weights
        self.num = num
        self.return_type = return_type
        if cache_type == "LRUCache":
            self.cache = LRUCache(param, prepopulate_cache)
        elif cache_type == "TimeCache":
            self.cache = TimeCache(param)
        elif cache_type == "CombinationCache":
            self.cache = CombinationCache(param, prepopulate=prepopulate_cache)
        else:
            raise ValueError(
                "Invalid cache type. Use 'LRUCache', 'TimeCache' or 'CombinationCache'."
            )
        self.load_data()
        self.load_stochastic_probabilities()

    def load_dict_from_file(self, filename: str) -> Dict:
        """Load a pickled dictionary from ``<cwd>/dictionaries/<filename>``."""
        data_dicts = (Path.cwd() / "dictionaries").resolve()
        # NOTE: unpickling can execute arbitrary code; only load trusted files.
        with (data_dicts / filename).open("rb") as f:
            return pickle.load(f)

    def load_csv_from_file(self, filename):
        """Read ``<cwd>/stochastic_paths/<filename>`` into a DataFrame."""
        stochastic_paths = (Path.cwd() / "stochastic_paths").resolve()
        with (stochastic_paths / filename).open(mode="rb") as f:
            return pd.read_csv(f)

    def load_data(self) -> None:
        """Load the feature-to-scenes dictionaries from their pickle files."""
        self.regions_data = self.load_dict_from_file("divisions_mapping.pkl")
        self.states_data = self.load_dict_from_file("states_mapping.pkl")
        self.counties_data = self.load_dict_from_file("counties_mapping.pkl")

    def load_stochastic_probabilities(self):
        """Load the per-request probability tables from their CSV files."""
        self.counties_stochastic = self.load_csv_from_file("counties_stochastic.csv")
        self.states_stochastic = self.load_csv_from_file("states_stochastic.csv")
        self.divisions_stochastic = self.load_csv_from_file("divisions_stochastic.csv")

    def fetch_data(self, data: dict, count: int) -> dict[Any, Any]:
        """Fetch a subset of items from a dictionary.

        Args:
            data (dict): Source dictionary.
            count (int): Maximum number of items to retrieve.

        Returns:
            dict: The first ``count`` items of ``data`` in iteration order.
        """
        return dict(list(data.items())[:count])

    def monte_carlo_simulation(self, num_runs: int) -> list:
        """Run ``run_sim`` ``num_runs`` times on a thread pool.

        Args:
            num_runs (int): Number of simulation runs to execute.

        Returns:
            list: The first element of each run's result (selected by
            ``return_type``), in completion order rather than submission order.
        """
        with concurrent.futures.ThreadPoolExecutor(max_workers=cpu_count()) as executor:
            futures = [executor.submit(self.run_sim) for _ in range(num_runs)]

            results = []
            for f in concurrent.futures.as_completed(futures):
                outcome, _ = f.result()  # The cache history is not needed here
                results.append(outcome)

        return results

    def kernelweights(self, gdf, k=15, steps=100, min_lag=3, max_lag=10):
        """Generate a random ``(len(gdf), steps)`` matrix of selection probabilities.

        At every step one entry of ``gdf`` is drawn uniformly at random and its
        kernel-weight row is added to the step's probabilities. The same row,
        divided by a random lag, is also added to each of the following
        ``lag`` steps. Every column is normalised to sum to 1.

        Args:
            gdf: Frame handed to ``Kernel.from_dataframe``.
            k (int): Forwarded to ``Kernel.from_dataframe`` as ``k``.
            steps (int): Number of columns (time steps) to generate.
            min_lag (int): Smallest lag, inclusive.
            max_lag (int): Largest lag, inclusive.

        Returns:
            numpy.ndarray: Probability matrix with one row per entry of
            ``gdf`` and one column per step.
        """
        kernel_weights = Kernel.from_dataframe(
            gdf, k=k, fixed=False, function="gaussian"
        )
        num_entries = len(gdf)
        base_prob = 1e-6
        prob_matrix = np.full((num_entries, steps), base_prob)
        # Step index -> influence carried over from earlier steps.
        influence_decay = defaultdict(lambda: np.zeros(num_entries))

        for t in range(steps):
            # Start from the base probability each step
            current_weights = np.full(num_entries, base_prob)

            if t in influence_decay:
                # Add whatever earlier steps scheduled for this step
                current_weights += influence_decay[t]

            selected_index = np.random.randint(num_entries)
            weights = kernel_weights.sparse[selected_index, :].toarray().flatten()
            weights += base_prob  # Ensure no zero probabilities

            # Schedule the carry-over for future steps; only the excess over
            # the base probability is spread across the lag window.
            lag_duration = random.randint(min_lag, max_lag)
            decay_amount = (weights - base_prob) / lag_duration

            for future_t in range(1, lag_duration + 1):
                if t + future_t < steps:
                    influence_decay[t + future_t] += decay_amount

            # Combine carried-over and fresh weights, then normalise to sum to 1
            prob_matrix[:, t] = current_weights + weights
            prob_matrix[:, t] /= prob_matrix[:, t].sum()

        return prob_matrix

    def run_sim(self) -> Tuple[float, List[Any]]:
        """Execute one simulation run of ``self.num`` requests.

        Request ``i`` reads column ``i`` of the loaded probability table for
        the chosen scale, so each table needs at least ``self.num`` columns.

        Returns:
            Tuple[float, List[Any]]: The metric selected by ``return_type``
            ("requests": requests whose scenes were all cached, "scenes":
            cached scenes seen, "ratio": cached scenes / all scenes) and the
            cache state recorded after every request.

        Raises:
            ValueError: If ``return_type`` is not a supported value.
        """
        # Fail before doing any work rather than after the whole run.
        if self.return_type not in ("ratio", "requests", "scenes"):
            raise ValueError("Invalid return type specified")

        free_scenes = 0
        total_scenes = 0
        free_requests = 0
        history: List[Any] = []

        # NOTE(review): this method previously called ``self.kernelweights``
        # for counties, states and divisions on every run but never read the
        # results; sampling has always used the tables loaded from CSV. The
        # unused calls were removed. If freshly generated matrices were the
        # intent, index them as ``matrix[:, i]`` in place of the CSV tables.

        # Scale name -> (feature-to-scenes mapping, probability table)
        sources = {
            "regions": (
                self.fetch_data(self.regions_data, 9),
                self.divisions_stochastic,
            ),
            "states": (
                self.fetch_data(self.states_data, 49),
                self.states_stochastic,
            ),
            "counties": (
                self.fetch_data(self.counties_data, 4437),
                self.counties_stochastic,
            ),
        }

        for i in range(self.num):
            scale = np.random.choice(["regions", "states", "counties"], p=self.weights)
            data, stochastic = sources[str(scale)]
            probabilities = stochastic.iloc[:, i].values

            # NOTE(review): assumes the dictionary's key order matches the row
            # order of the probability table — confirm against the data files.
            feature_ids = list(data.keys())
            # Truncate in case the table has more rows than features
            feature_probabilities = probabilities[: len(feature_ids)]

            # Re-normalise so the (possibly truncated) probabilities sum to 1
            feature_probabilities = feature_probabilities / feature_probabilities.sum()

            feature_id = np.random.choice(feature_ids, p=feature_probabilities)
            landsat_scenes = data[feature_id]
            moved_to_hot = False

            for scene in landsat_scenes:
                total_scenes += 1
                if self.cache.get(scene) != -1:  # already in the cache
                    free_scenes += 1
                else:  # at least one scene had to be fetched
                    moved_to_hot = True

            if not moved_to_hot:
                free_requests += 1
            self.cache.put(landsat_scenes)
            history.append(self.cache.current_state())

        if self.return_type == "ratio":
            free_ratio = free_scenes / total_scenes if total_scenes > 0 else 0
            return free_ratio, history
        if self.return_type == "requests":
            return free_requests, history
        return free_scenes, history

## More Modules

In [186]:
def run_simulation(
    num_requests,
    weights,
    cache_type,
    parameters_list,
    num_runs,
    prepopulate_cache,
    return_type,
):
    """Run the Monte Carlo simulation once per cache parameter.

    Shapefiles are read from ``<cwd>/data`` and handed to every
    ``MonteCarloSimulation`` instance.

    Args:
        num_requests: Number of requests per simulation run.
        weights: Scale-selection probabilities passed to the simulator.
        cache_type: Cache class name passed to the simulator.
        parameters_list: Cache parameters to sweep; each is truncated to int.
        num_runs: Number of runs averaged for each parameter.
        prepopulate_cache: Passed to the simulator.
        return_type: Passed to the simulator.

    Returns:
        dict: ``{parameter: [mean, standard error of the mean]}`` of the
        per-run results.
    """

    def _as_geoframe(fips, geometry):
        # Keep only an identifier column and the geometry for one layer.
        frame = pd.DataFrame()
        frame["FIPS"] = fips
        frame["geometry"] = geometry
        geoframe = gpd.GeoDataFrame(frame, geometry="geometry")
        # NOTE(review): this labels the layer as EPSG:4326 without
        # reprojecting; confirm the shapefiles are already in that CRS.
        geoframe.crs = "EPSG:4326"
        return geoframe

    # Base directory where the data is located
    current_dir = Path.cwd()
    print(current_dir)
    data = (current_dir / "data").resolve()

    # Read the shapefiles
    divisions = gpd.read_file(data / "USA_Divisions" / "usa_divisions.shp")
    counties = gpd.read_file(data / "USA_Counties" / "usa_counties.shp")
    states = gpd.read_file(data / "USA_States" / "usa_states.shp")

    counties_gdf = _as_geoframe(counties["FIPS"], counties["geometry"])
    states_gdf = _as_geoframe(states["STATEFP"], states["geometry"])
    # Divisions are identified by their row index.
    divisions_gdf = _as_geoframe(divisions.index, divisions["geometry"])

    parameter_results = {}
    for raw_parameter in parameters_list:
        parameter = int(raw_parameter)
        wstart_time = time.time()
        simulator = MonteCarloSimulation(
            num=num_requests,
            weights=weights,
            cache_type=cache_type,
            param=parameter,
            counties_gdf=counties_gdf,
            states_gdf=states_gdf,
            divisions_gdf=divisions_gdf,
            prepopulate_cache=prepopulate_cache,
            return_type=return_type,
        )
        results = simulator.monte_carlo_simulation(num_runs)
        average_free_requests = sum(results) / num_runs
        free_request_std = np.std(np.array(results))
        free_request_sem = free_request_std / np.sqrt(num_runs)
        parameter_results[parameter] = [average_free_requests, free_request_sem]

        print(
            f"      Parameter Analysis {parameter} completed in {(time.time() - wstart_time):.2f} seconds"
        )

        print(f"      Mean requests {average_free_requests}")
        # The value printed here is the standard deviation, not the variance.
        print(f"      Standard deviation of requests {free_request_std}")
        print("------------------------------------------\n")

    print("------------------------------------------\n")

    return parameter_results

In [187]:
def run_analysis(weights) -> dict:
    """Sweep LRU cache sizes for one set of scale weights.

    Runs ``run_simulation`` with 100 requests per run, 100 runs per
    parameter, a prepopulated ``LRUCache`` and 20 evenly spaced parameters
    between 1 and 886, then prints the elapsed wall-clock time.

    Args:
        weights: Scale-selection probabilities forwarded to the simulation.

    Returns:
        dict: Whatever ``run_simulation`` returns for this configuration.
    """
    settings = {
        "num_requests": 100,
        "weights": weights,
        "cache_type": "LRUCache",
        "parameters_list": np.linspace(1, 886, 20),
        "num_runs": 100,
        "prepopulate_cache": True,
        "return_type": "requests",
    }

    started_at = time.time()
    simulator_results = run_simulation(**settings)
    print(f"Analysis completed in {(time.time() - started_at):.2f} seconds")

    return simulator_results  # type: ignore


In [188]:
%matplotlib inline

In [189]:
def plot_single_sim(weights):
    """Run the analysis for ``weights`` and plot free requests against cache size.

    The plot shows the mean result per cache size with its standard error as
    error bars, a degree-2 polynomial fit and the fit's R^2.

    Args:
        weights: Scale-selection probabilities forwarded to ``run_analysis``.
    """
    results = run_analysis(weights)

    # Express each cache size as a percentage of the full key space.
    # NOTE(review): the original divisor was 8.6, which put the largest swept
    # size (886) at ~103%. 886 is the key-space size used elsewhere in this
    # file, so 8.6 is assumed to be a typo for 8.86 — confirm.
    cold_layer_size = 886
    x = np.array(list(results.keys())) * 100.0 / cold_layer_size
    values = list(results.values())
    y = np.array([val[0] for val in values])
    yerr = np.array([val[1] for val in values])

    plt.figure(figsize=(10, 6))
    plt.errorbar(
        x,
        y,
        yerr=yerr,
        fmt="o",
        color="b",
        ecolor="lightgray",
        elinewidth=3,
        capsize=5,
        capthick=2,
        markersize=4,
    )
    # Fit a polynomial of degree 2 (quadratic) to the means
    coefficients = np.polyfit(x, y, 2)
    polynomial = np.poly1d(coefficients)

    # Evaluate the fit at the measured x-values and draw it as a solid line
    y_fit = polynomial(x)
    plt.plot(x, y_fit, "-", color="red")

    # Coefficient of determination of the fit
    y_mean = np.mean(y)
    ss_tot = np.sum((y - y_mean) ** 2)  # Total sum of squares
    ss_res = np.sum((y - y_fit) ** 2)  # Residual sum of squares
    r_squared = 1 - (ss_res / ss_tot)

    # Add R^2 as text on the plot
    plt.text(
        0.05,
        0.95,
        f"$R^2 = {r_squared:.3f}$",
        transform=plt.gca().transAxes,
        fontsize=12,
    )
    plt.xticks(range(0, 100, 5))
    plt.yticks(range(0, 100, 5))
    plt.grid(True, which="both", linestyle="--", linewidth=0.5)
    plt.title("Even Weight with Order 2 Line of Best Fit")
    plt.ylabel("Percent of Free Requests")
    plt.xlabel("Percent of Hot Layer in Relation to Cold Layer")
    plt.show()


In [190]:
# Probabilities of a request targeting the regions, states and counties scale
# (in that order); they are passed to np.random.choice and must sum to 1.
weights = [0.33, 0.33, 0.34]
# Runs the full parameter sweep and shows the resulting plot.
plot_single_sim(weights)

c:\Users\jackr\dev\Hot-Cold-Simulation
