# fit_us_data-digest.ipynb

Key parts of the notebook `fit_us_data.ipynb` broken out for the purpose of demonstrating the use of Ray.

In [None]:
# Initialization boilerplate
import os
import json
import pandas as pd
import numpy as np
import scipy.optimize
from sklearn import metrics
import time
#from typing import List

import ipywidgets
import IPython.display

import text_extensions_for_pandas as tp

# Local file of utility functions
import util

# What precision of floating-point to use.
# Consider 32-bit if using GPU-accelerated solvers. Otherwise, 64-bit
# floating point is better because it reduces the chance of divergence.
fp_type = np.float64

# Allow environment variables to override data file locations.
_OUTPUTS_DIR = os.getenv("COVID_OUTPUTS_DIR", "outputs")
util.ensure_dir_exists(_OUTPUTS_DIR)  # create if necessary


_NUM_COUNTIES = 3081


# Code for displaying progress bars
def make_progress_bar():
    progress_bar = ipywidgets.IntProgress(
        0, 0, _NUM_COUNTIES, description="Fitting curves...",
        layout=ipywidgets.Layout(width="100%"),
        style={"description_width": "15%"})
    IPython.display.display(progress_bar)
    return progress_bar


def update_progress_bar(progress_bar: ipywidgets.IntProgress, num_done: int):
    progress_bar.value = num_done
    progress_bar.description = f"{num_done} of {_NUM_COUNTIES} counties"

# Load data

In [None]:
####################################################################################
# Load data.
dates_file = os.path.join(_OUTPUTS_DIR, "dates.feather")
cases_file = os.path.join(_OUTPUTS_DIR, "us_counties_clean.feather")
cases = pd.read_feather(cases_file).set_index("FIPS")
dates = pd.read_feather(dates_file)["date"].to_numpy()

####################################################################################
# Drop time series that are subject to aliasing.
alias_threshold = 100
ts_col_name = "Confirmed"

# We also cut off the sections at the beginning of the time 
# series where every time series' value is below this threshold.

# Find what point in the time series at least one county went above
# the threshold.
first_time_above_min = np.argmax(np.max(cases[ts_col_name].array, axis=0) >= alias_threshold)
print(f"Dropping the first {first_time_above_min} elements of each time series.")

# Find which counties have at least one time series value above the 
# threshold.
counties_mask = np.max(cases[ts_col_name].array, axis=1) >= alias_threshold

# Filter rows
filtered = cases[counties_mask].copy(deep=True)

# Truncate time series to just the times when at least one county
# was above our threshold.
filtered[ts_col_name] = filtered[ts_col_name].array[:,first_time_above_min:]

# Also filter the outlier masks
outlier_col_name = ts_col_name + "_Outlier"
filtered[outlier_col_name] = filtered[outlier_col_name].array[:,first_time_above_min:]

filtered_dates = dates[first_time_above_min:]

# Drop time series columns other than the one we analyze
series_to_keep = [ts_col_name, outlier_col_name]
metadata_cols = []
to_drop = []
for colname in filtered.columns:
    if not isinstance(filtered[colname].dtype, tp.TensorDtype):
        metadata_cols.append(colname)
    elif colname not in series_to_keep:
        to_drop.append(colname)

filtered = filtered.drop(columns=to_drop)

# Figure out how many values remain
series_len = filtered.iloc[0]["Confirmed"].to_numpy().shape[0]

filtered

# Define the parameters of the curve-fitting operation

In [None]:
def logistic_fn(x, var_values):
    max_, rate, offset = var_values
    # Y = max / (1 + e^(-rate *(X - offset))
    return max_ / (1.0 + np.exp(-rate * (x - offset)))


x_values = np.linspace(0, series_len - 1, series_len, dtype=fp_type)
def logistic_objective(var_values, y):
    return np.sum((y - (logistic_fn(x_values, var_values))) ** 2)

param_names = ("Max", "Rate", "Offset")
initial_guess = (1000.0, 0.1, 1.0)
logistic_bounds = ((0.0, 1e6), (0.0, 1.0), (0.0, float(len(filtered_dates))))

# Fit the logistic function to 3081 time series, *without* Ray

In [None]:
def fit_logistic(points: np.array):
    return scipy.optimize.minimize(
        logistic_objective, initial_guess,
        args=(points), bounds=logistic_bounds)


progress_bar = make_progress_bar()
start_time = time.time()

results = []
for time_series in filtered["Confirmed"]:
    results.append(fit_logistic(time_series.to_numpy()))
    update_progress_bar(progress_bar, len(results))

print(f"Fit {len(results)} time series in {time.time() - start_time:0.1f} seconds")

# Fit the logistic function to 3081 time series, *with* Ray¶

In [None]:
import ray
ray.init(num_cpus=32, resources={"ram": 64})

In [None]:
@ray.remote(num_cpus=4, resources={"ram": 8})
def fit_logistic(points: np.array):
    return scipy.optimize.minimize(
        logistic_objective, initial_guess,
        args=(points), bounds=logistic_bounds)

progress_bar = make_progress_bar()
start_time = time.time()

results = []
futures = []
for time_series in filtered["Confirmed"]:
    futures.append(fit_logistic.remote(time_series.to_numpy()))
    
while len(results) < _NUM_COUNTIES:
    results.append(ray.wait(futures, num_returns=1, timeout=None))
    update_progress_bar(progress_bar, len(results))

print(f"Fit {len(results)} time series in {time.time() - start_time:0.1f} seconds")

In [None]:
ray.shutdown()