# Setup


In [None]:
import geopandas as gpd
import pandas as pd
import numpy as np
import csv as csv
import requests
from requests.exceptions import RequestException
from concurrent.futures import ThreadPoolExecutor, as_completed
from io import StringIO
import os
import time


## Data Preparation

In [None]:
# import prison boundaries as shapefile from Department of Homeland Security
prisonsRaw = gpd.read_file('https://opendata.arcgis.com/api/v3/datasets/2d6109d4127d458eaf0958e4c5296b67_0/downloads/data?format=geojson&spatialRefId=4326&where=1%3D1')

# Load the clean list of prisons from a CSV file, ensuring 'FACILITYID' is read as a string
prisonsClean = pd.read_csv('https://raw.githubusercontent.com/callmeufu/prison_environmental_justice/main/prison_datasets/state_fed_prisons.csv', dtype={'FACILITYID': str})
# prisonsClean = pd.read_csv('../prison_datasets/state_fed_prisons.csv', dtype={'FACILITYID': str})

# Ensure 'FACILITYID' in the raw prisons data is treated as a string
prisonsRaw['FACILITYID'] = prisonsRaw['FACILITYID'].astype(str)

# Filter the raw prisons data to include only those records with 'FACILITYID' present in the clean list
filtered_prisons = prisonsRaw[prisonsRaw['FACILITYID'].isin(prisonsClean['FACILITYID'])]

# Create a deep copy of the filtered prisons data for further processing
prisonsFinal = filtered_prisons.copy(deep=True)

In [None]:
# Reproject the dataset to the EPSG:2163 coordinate reference system
# Calculating centroids in a projected coordinate system ensures that 
# the centroids are calculated in a planar coordinate system, which is 
# more accurate than calculating them in a geographic coordinate system.

prisons_projected = prisonsFinal.to_crs(epsg=2163)

# Calculate the centroids of the reprojected geometries
centroids_projected = prisons_projected.geometry.centroid

# Create a new GeoDataFrame containing the original attributes and the computed centroids
prisons_centroids = gpd.GeoDataFrame(prisonsFinal[['FID', 'FACILITYID', 'NAME', 'ADDRESS', 'CITY', 'STATE', 'ZIP', 'ZIP4',
       'TELEPHONE', 'TYPE', 'STATUS', 'POPULATION', 'COUNTY', 'COUNTYFIPS',
       'COUNTRY', 'NAICS_CODE', 'NAICS_DESC', 'SOURCE', 'SOURCEDATE',
       'VAL_METHOD', 'VAL_DATE', 'WEBSITE', 'SECURELVL', 'CAPACITY',
       'SHAPE_Leng', 'GlobalID', 'CreationDate', 'Creator', 'EditDate',
       'Editor', 'SHAPE_Length', 'SHAPE_Area', 'geometry']], geometry=centroids_projected)

# Reproject the centroids GeoDataFrame back to WGS84 (EPSG 4326), the default CRS of daymet for coordinate queries 
prisons_centroids = prisons_centroids.to_crs(epsg=4326)

## Daymet Setup

In [None]:
# --- GLOBAL VARIABLES
# variables to be used to compute date range
first_day_summer = '-06-01'
last_day_summer = '-08-31'
begin_year = 1990
end_year = 2023

# variables that are used between functions
prisons_shapes = None
time_series = None
csv_writer = None

# --- FUNCTIONS

# Function to convert Celsius to Fahrenheit
# Input: single numerical value in Celsius
# Output: single numerical value in Fahrenheit
def celsius_to_fahrenheit(celsius):
   fahrenheit = (1.8 * celsius) + 32
   return fahrenheit

# Function to calculate the mean temperature of the day in Fahrenheit
# Input: day_tmin -> list of minimum daily temperatures in Celsius
#        day_tmax -> list of maximum daily temperatures in Celsius
# Output: mean daily temperature in Fahrenheit
def calc_mean(day_tmin, day_tmax):
    # Convert tmin and tmax to Fahrenheit and calculate the mean
    current_tmean = (celsius_to_fahrenheit(min(day_tmin[0])) + celsius_to_fahrenheit(min(day_tmax[0]))) / 2
    return current_tmean

# Function to estimate the number of extreme heat days
# Input: daily_stat -> single numerical value, daily temperature
#        threshold_temp -> single numerical value, threshold temperature
# Output: 1 if daily_stat is above threshold_temp, otherwise 0
def estimate_extreme_heat(daily_stat, threshold_temp):
    # Check if daily_stat is above threshold_temp
    above_threshold = 1 if daily_stat > threshold_temp else 0
    return above_threshold

# Function to calculate the number of heatwaves based on daily maximum temperatures
# heatwave is defined as a period where the daily maximum temperature exceeds the 
# 90th percentile of maximum temperatures for the given period. 
# Input: data -> DataFrame with daily temperature data, must include 'tmax' column
#        percentile_90th -> 90th percentile value of daily maximum temperatures
# Output: heatwave1 -> count of single-day heatwaves
#         heatwave2 -> count of two-day heatwaves
#         heatwave3 -> count of three-or-more-day heatwaves
def calculate_heatwaves(data, percentile_90th):
    # Determine which days have maximum temperatures above the 90th percentile
    above_90 = data['tmax'] > percentile_90th

    # Initialize counts for different heatwave lengths
    heatwave1, heatwave2, heatwave3 = 0, 0, 0
    current_streak = 0  # Counter for current streak of consecutive days above 90th percentile

    # Iterate over each day in the group
    for i in range(len(data)):
        if above_90.iloc[i]:  # If the temperature is above the 90th percentile
            current_streak += 1  # Increment the streak counter
        else:  # If the temperature is not above the 90th percentile
            # Count the completed heatwave based on its length
            if current_streak == 1:
                heatwave1 += 1
            elif current_streak == 2:
                heatwave2 += 1
            elif current_streak >= 3:
                heatwave3 += 1
            current_streak = 0  # Reset the streak counter

    # Handle the case where the last day(s) form a heatwave
    if current_streak == 1:
        heatwave1 += 1
    elif current_streak == 2:
        heatwave2 += 1
    elif current_streak >= 3:
        heatwave3 += 1

    return heatwave1, heatwave2, heatwave3

# Function to retrieve and process heat statistics for a given prison over a range of years
# Input: prison_index -> index of the prison in the GeoDataFrame
#        prisons_shapes -> GeoDataFrame containing prison geometries, in this case lat/lon
#        begin_year -> start year for the data retrieval
#        end_year -> end year for the data retrieval
#        max_retries -> maximum number of retries for HTTP requests
#        retry_delay -> delay between retries in seconds
# Output: DataFrame containing processed heat statistics for the specified range of years
def get_heat_stats(prison_index, prisons_shapes, begin_year, end_year, max_retries=15, retry_delay=20):
    all_year_results = []

    # Get the geometry (coordinates) of the specified prison
    coords = prisons_shapes.iloc[prison_index].geometry
    lat, lon = coords.y, coords.x

    # Loop over each year in the specified range
    for year in range(begin_year, end_year + 1):
        retries = 0  # Initialize retry counter
        while retries <= max_retries:
            
            # Define the start and end dates to iterate across
            start_date = f"{year}{first_day_summer}"
            end_date = f"{year}{last_day_summer}"
            
            # Construct the URL for the API request
            # For each prison and date, grab tmax, tmin, and vp
            url = f"https://daymet.ornl.gov/single-pixel/api/data?lat={lat}&lon={lon}&vars=tmax,tmin,vp&start={start_date}&end={end_date}"
            
            try:
                # Make the API request
                response = requests.get(url, timeout=20)

                if response.ok:
                    # Split the response text into lines
                    lines = response.text.split('\n')

                    # Find the index of the line where the CSV data starts (where the header 'year' is found)
                    start_line = next((i for i, line in enumerate(lines) if line.startswith('year')), None)

                    # Ensure that the start_line is valid and there are lines following the header
                    if start_line is not None and start_line < len(lines) - 1:
                        
                        # Extract the CSV data from the response starting from the header line
                        csv_data = '\n'.join(lines[start_line:])

                        # Read the CSV data into a DataFrame
                        data = pd.read_csv(StringIO(csv_data))

                        # Convert temperature values from Celsius to Fahrenheit
                        data['tmax'] = data['tmax (deg c)'].apply(celsius_to_fahrenheit)
                        data['tmin'] = data['tmin (deg c)'].apply(celsius_to_fahrenheit)

                        # Extract daily maximum temperatures and calculate daily mean temperatures
                        daily_maxs = data['tmax'].tolist()
                        daily_means = data[['tmax', 'tmin']].mean(axis=1).tolist()
                        
                        # Calculate summer statistics
                        summer_temp_mean = np.mean(daily_means)
                        summer_tmax_mean = np.mean(daily_maxs)
                        summer_90th_percent = np.percentile(daily_maxs, 90)
                        count_days_above_10F = sum(1 for mean in daily_means if estimate_extreme_heat(mean - summer_temp_mean, 10))
                        count_days_above_85F = sum(1 for mean in daily_means if estimate_extreme_heat(mean, 85))

                        # Calculate heatwaves using the precomputed 90th percentile
                        heatwave1, heatwave2, heatwave3 = calculate_heatwaves(data, summer_90th_percent)

                        all_year_results.append({
                            'FACILITYID': prisons_shapes.iloc[prison_index]['FACILITYID'],
                            'NAME': prisons_shapes.iloc[prison_index]['NAME'],
                            'STATE': prisons_shapes.iloc[prison_index]['STATE'],
                            'YEAR': year,
                            'SUMMER_TEMP_MEAN': summer_temp_mean,
                            'SUMMER_TMAX_MEAN': summer_tmax_mean,
                            'SUMMER_90TH_PERCENT': summer_90th_percent,
                            'COUNT_DAYS_ABOVE_10F': count_days_above_10F,
                            'COUNT_DAYS_ABOVE_85F': count_days_above_85F,
                            'HEATWAVE1': heatwave1,
                            'HEATWAVE2': heatwave2,
                            'HEATWAVE3': heatwave3,
                        })
                    else:
                        print(f"Unexpected data format for year {year} at index {prison_index}.")
                    break  # Successful request, break the retry loop
                else:
                    print(f"Request failed for {start_date} to {end_date} at index {prison_index} with status code {response.status_code}. URL: {url}")
                    break  # Non-retryable error (e.g., bad request), break the retry loop

            except RequestException as e:
                print(f"Timeout or RequestException for {year} at index {prison_index}: {e}.")
                retries += 1
                if retries > max_retries:
                    print("Max retries exceeded, moving to next year.")
                    break  # Max retries exceeded, move to the next year
                else:
                    print("Retrying...")
                    time.sleep(retry_delay)  # Wait before retrying

            except pd.errors.ParserError:
                print(f"ParserError: Error parsing CSV data for year {year} at index {prison_index}. URL: {url}")
                break  # No retry for parsing errors

            except Exception as e:
                print(f"Unexpected error processing data for year {year} at index {prison_index}: {e}. URL: {url}")
                break  # No retry for unexpected errors

    return pd.DataFrame(all_year_results)

## Data Processing

In [None]:
# Function to split a DataFrame into smaller chunks
# Input: df -> DataFrame to be split
#        chunk_size -> number of rows per chunk
# Output: List of DataFrame chunks
def split_dataframe(df, chunk_size):
    chunks = [df[i:i + chunk_size] for i in range(0, df.shape[0], chunk_size)]
    return chunks

# Function to process a dataset chunk and retrieve heat statistics
# Input: dataset -> DataFrame chunk to be processed
#        start_year -> start year for the data retrieval
#        end_year -> end year for the data retrieval
#        area_name -> name of the area being processed (for logging)
# Output: DataFrame with processed heat statistics for the chunk
def process_dataset(dataset, start_year, end_year, area_name):
    print(f"Processing chunk with {len(dataset)} records.")  # Log processing start of a chunk
    results = []
    with ThreadPoolExecutor(max_workers=4) as executor: # Use ThreadPoolExecutor to parallelize processing
        # Submit get_heat_stats tasks to the executor for each index in the dataset
        future_to_index = {executor.submit(get_heat_stats, idx, dataset, start_year, end_year): idx for idx in range(len(dataset))}
        for future in as_completed(future_to_index):
            result = future.result()
            if result is not None:
                results.append(result)
            else:
                # Handle None result if necessary
                pass
    # Concatenate results into a single DataFrame
    df = pd.concat(results, ignore_index=True)
    return df

# Split the prisons_centroids DataFrame into 10 chunks
prisons_centroids_chunks = split_dataframe(prisons_centroids, chunk_size=np.ceil(len(prisons_centroids) / 15).astype(int))

processed_chunks = []
for chunk_idx, chunk in enumerate(prisons_centroids_chunks):
    # Log processing of each chunk
    print(f"Processing chunk {chunk_idx+1}/{len(prisons_centroids_chunks)}")
    processed_chunk = process_dataset(chunk, begin_year, end_year, area_name="Complete_Dataset")
    processed_chunks.append(processed_chunk)

# Combine all processed chunks into a single DataFrame
combined_data = pd.concat(processed_chunks, ignore_index=True)
# Save the combined DataFrame to a CSV file
combined_data.to_csv('./Output/combined_processed_prisons.csv')
# combined_data.to_csv(os.path.join(PATH_MAIN,'Output/processed_prisons_1990_2023.csv'))

In [None]:
# Define states for the study
study_area = ['AZ', 'WA', 'MA', 'CA', 'FL']

# Filter the DataFrame to include only the rows where 'STATE' is in the study area list
filtered_data = combined_data[combined_data['STATE'].isin(study_area)]

# Save the filtered study area data to a new CSV
filtered_data.to_csv('./Output/study_area.csv', index=False)