In [1]:
from geopy.distance import geodesic
import os
import geopandas as gpd
import pandas as pd
from shapely import wkt
from shapely.geometry import Point
from typing import List
from tqdm.auto import tqdm
import pickle
from multiprocessing import Pool, cpu_count
from joblib import parallel_backend

    I want to merge your df DataFrame, which contains information about charging stations, with the all_features_gdf GeoDataFrame that contains information about different kinds of amenities for different years. The goal is to enrich our charging station data with information about the surrounding amenities for each year.

    Here is a strategy to do this:

    1. Compute the distance between each charging station and all the amenities for a specific year.
    2. Compute the descriptive statistics (total count, average distance, minimum distance, maximum distance, etc.) for each type of amenity.
    3. Add these statistics as new columns to your charging stations DataFrame (df) for each year.

In [2]:
def load_gdfs():
    # Get the current working directory
    cwd = os.getcwd()

    # Define the relative paths to the data files
    all_features_gdf_path = os.path.join(cwd, '..', 'data', 'interim', 'all_features_gdf.csv')
    df_path = os.path.join(cwd, '..', 'data', 'interim', 'train_gdf_forward_geocoded.csv')

    # Load the dataframes
    all_features_df = pd.read_csv(all_features_gdf_path)
    df = pd.read_csv(df_path)

    # Convert 'geometry' column to geometry type
    all_features_df['geometry'] = all_features_df['geometry'].apply(wkt.loads)
    df['geometry'] = df['geometry'].apply(wkt.loads)

    # Convert the pandas DataFrames to GeoDataFrames
    all_features_gdf = gpd.GeoDataFrame(all_features_df, geometry='geometry')
    gdf = gpd.GeoDataFrame(df, geometry='geometry')
    
    return all_features_gdf, gdf

all_features_gdf, gdf = load_gdfs()


In [3]:
def load_gdfs():
    # Get the current working directory
    cwd = os.getcwd()

    # Define the relative paths to the data files
    df_path = os.path.join(cwd, '..', 'data', 'interim', 'train_gdf_forward_geocoded.csv')

    # Load the dataframes
    df = pd.read_csv(df_path)

    # Convert 'geometry' column to geometry type
    df['geometry'] = df['geometry'].apply(wkt.loads)

    # Convert the pandas DataFrames to GeoDataFrames
    gdf = gpd.GeoDataFrame(df, geometry='geometry')
    
    return gdf

gdf = load_gdfs()

In [4]:
all_features_gdf.head()

Unnamed: 0.1,Unnamed: 0,geometry,osm_year,fuel_station,parking_station,busstation_station,trainstation_station,mall_station,supermarket_station,restaurant_station,hotel_station,school_station,university_station,cinema_station,theatre_station
0,0,POINT (-4.34409 55.93436),2016,1.0,,,,,,,,,,,
1,1,POINT (-3.72159 56.69738),2016,1.0,,,,,,,,,,,
2,2,POINT (-3.16965 56.19748),2016,1.0,,,,,,,,,,,
3,3,POINT (-3.29724 55.89943),2016,1.0,,,,,,,,,,,
4,4,POINT (-3.28774 55.92326),2016,1.0,,,,,,,,,,,


In [5]:
gdf.head()

Unnamed: 0.2,Unnamed: 0.1,Unnamed: 0,CP ID,Connector,Total kWh,Site,Model,End DateTime,Site_encoded,Model_encoded,Total kWh.1,geometry
0,0,0,50994,1,2.084,Leslie Street Car Park,APT Triple Rapid Charger,2016-01-09 07:27:00,0.0,3.0,2.084,POINT (-3.33802 56.59132)
1,1,1,50281,2,3.87,"Rie-Achan Road Car Park, Pitlochry",APT 22kW Dual Outlet,2016-01-09 09:01:00,19.0,0.0,3.87,POINT (-3.73882 56.70342)
2,2,2,50285,1,13.93,Broxden Park & Ride,APT 22kW Dual Outlet,2016-01-09 14:32:00,3.0,0.0,13.93,POINT (-3.47775 56.38661)
3,3,3,50281,1,10.38,"Rie-Achan Road Car Park, Pitlochry",APT 22kW Dual Outlet,2016-01-09 16:37:00,19.0,0.0,10.38,POINT (-3.73882 56.70342)
4,4,4,50745,2,3.58,Kinross Park and Ride,APT Triple Rapid Charger,2016-01-09 09:37:00,13.0,3.0,3.58,POINT (-3.43295 56.20673)


First, define a function to compute distance between two geographical points (coordinates are in (latitude, longitude) format). Note that we'll use geopy library for this

In [6]:
def calculate_amenity_distances(gdf: gpd.GeoDataFrame, amenity_gdf: gpd.GeoDataFrame, amenity: str, year: int, index: int) -> pd.DataFrame:
    # Compute distances
    station_coord = gdf.loc[index, 'geometry']
    distances = amenity_gdf['geometry'].apply(
        lambda x: geodesic((station_coord.y, station_coord.x), (x.y, x.x)).km
    )

    # Compute statistics
    distances = distances.astype(float)
    stats = {
        f'{amenity}_count_{year}': len(distances),
        f'{amenity}_avg_distance_{year}': distances.mean(),
        f'{amenity}_min_distance_{year}': distances.min(),
        f'{amenity}_max_distance_{year}': distances.max()
    }

    return pd.DataFrame(stats, index=[index])

Then, also we need to iterate over all rows in the df DataFrame and for each row (charging station), we compute the summary statistics for each amenity in the all_features_gdf GeoDataFrame.

In [7]:
from joblib import Parallel, delayed

def calculate_row_amenity_stats(index: int, row: pd.Series, all_features_gdf: gpd.GeoDataFrame, amenities: list) -> pd.DataFrame:
    all_stats = []
    # Iterate over all years
    for year in range(2016, 2020):  # adjust this as per your data
        # print('year, :', year)
        # Extract amenities for the year
        year_amenities_gdf = all_features_gdf[all_features_gdf['osm_year'] == year]

        # Iterate over all types of amenities
        for amenity in amenities:
            # Extract the GeoDataFrame for the specific amenity
            amenity_gdf = year_amenities_gdf.dropna(subset=[amenity])

            # Calculate distances and statistics for the amenity
            stats = calculate_amenity_distances(row.to_frame().T, amenity_gdf, amenity, year, index)
            all_stats.append(stats)

    # Concatenate all stats dataframes
    return pd.concat(all_stats, axis=0)

# # Next, we need to create a function to compute the distances of a point to all amenities and return the summary statistics:
# def calculate_amenity_stats(gdf: gpd.GeoDataFrame, all_features_gdf: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
#     # Define all types of amenities
#     amenities = ['fuel_station', 'parking_station', 'busstation_station', 'trainstation_station', 
#                     'mall_station', 'supermarket_station', 'restaurant_station', 'hotel_station', 
#                     'school_station', 'university_station', 'cinema_station', 'theatre_station']

#     # Use joblib to parallelize the calculation with tqdm for progress update
#     results = Parallel(n_jobs=-1)(delayed(calculate_row_amenity_stats)(index, row, all_features_gdf, amenities) for index, row in tqdm(gdf.iterrows(), total=gdf.shape[0]))

#     # Join the results with the original geodataframe
#     gdf = pd.concat([gdf, pd.concat(results, axis=0)], axis=1)

#     return gdf

In [8]:
# def worker(args):
#     index, row, all_features_gdf, amenities = args
#     return calculate_row_amenity_stats(index, row, all_features_gdf, amenities)

# def calculate_amenity_stats(gdf: gpd.GeoDataFrame, all_features_gdf: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
#     amenities = ['fuel_station', 'parking_station', 'busstation_station', 'trainstation_station', 
#                     'mall_station', 'supermarket_station', 'restaurant_station', 'hotel_station', 
#                     'school_station', 'university_station', 'cinema_station', 'theatre_station']

#     start_index = load_last_saved_index()
#     checkpoint_interval = calculate_checkpoint_interval(gdf)

#     results = []
#     pool = Pool(processes=cpu_count())
#     for index, result in tqdm(enumerate(pool.imap(worker, [(index, row, all_features_gdf, amenities) for index, row in gdf.iterrows()]), start=start_index), total=gdf.shape[0]):
#         results.append(result)

#         if is_checkpoint(index, checkpoint_interval):
#             save_checkpoint(gdf, results, index)

#     pool.close()
#     pool.join()

#     return join_results(gdf, results)


# def load_last_saved_index():
#     cwd = os.getcwd()
#     index_file_path = os.path.join(cwd, '..', 'data', 'interim', 'distance', 'last_saved_index.pkl')
#     if os.path.exists(index_file_path):
#         with open(index_file_path, 'rb') as f:
#             return pickle.load(f)
#     return 0


# def calculate_checkpoint_interval(gdf):
#     return gdf.shape[0] // 100


# def is_checkpoint(index, interval):
#     return (index + 1) % interval == 0


# def save_checkpoint(gdf, results, index):
#     cwd = os.getcwd()
#     gdf_partial = pd.concat([gdf.loc[:index], pd.concat(results, axis=0)], axis=1)
#     gdf_partial.to_pickle(os.path.join(cwd, '..', 'data', 'interim', 'distance', f'gdf_distance_save_{index + 1}.pkl'))
#     save_last_saved_index(index)
#     results.clear()


# def save_last_saved_index(index):
#     cwd = os.getcwd()
#     with open(os.path.join(cwd, '..', 'data', 'interim', 'distance', 'last_saved_index.pkl'), 'wb') as f:
#         pickle.dump(index, f)


# def join_results(gdf, results):
#     return pd.concat([gdf, pd.concat(results, axis=0)], axis=1)

In [9]:
# import time

# start_time = time.time()

# from joblib import parallel_backend

# with parallel_backend('loky'):
#     gdf = calculate_amenity_stats(gdf, all_features_gdf)

# end_time = time.time()

# print(f"Total time: {end_time - start_time} seconds")

In [10]:
# This function calculates the amenity stats for a given row
# It is called in parallel by the multiprocessing pool
def worker(args):
    index, row, all_features_gdf, amenities = args
    result = calculate_row_amenity_stats(index, row, all_features_gdf, amenities)
    # Reset the index of the result before returning it
    result = result.reset_index(drop=True)
    return result

# This function calculates amenity stats for the entire GeoDataFrame
# It uses multiprocessing to speed up the process
def calculate_amenity_stats(gdf: gpd.GeoDataFrame, all_features_gdf: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
    # Define the amenities we are interested in
    amenities = ['fuel_station', 'parking_station', 'busstation_station', 'trainstation_station', 
                    'mall_station', 'supermarket_station', 'restaurant_station', 'hotel_station', 
                    'school_station', 'university_station', 'cinema_station', 'theatre_station']

    # Load the last saved index to resume work in case of interruption
    start_index = load_last_saved_index()
    # Calculate the checkpoint interval for saving progress
    checkpoint_interval = calculate_checkpoint_interval(gdf)

    # Initialize an empty list to store the results
    results = []
    # Create a multiprocessing pool
    pool = Pool(processes=cpu_count())
    # Use the pool to process the data in parallel
    for index, result in tqdm(enumerate(pool.imap(worker, [(index, row, all_features_gdf, amenities) for index, row in gdf.iterrows()]), start=start_index), total=gdf.shape[0]):
        # Append the result to the results list
        results.append(result)

        # Save a checkpoint if necessary
        if is_checkpoint(index, checkpoint_interval):
            save_checkpoint(gdf, results, index)

    # Close the pool and wait for all processes to finish
    pool.close()
    pool.join()

    # Combine the results with the original GeoDataFrame
    return join_results(gdf, results)

# This function loads the last saved index from a pickle file
def load_last_saved_index():
    # Define the path to the pickle file
    cwd = os.getcwd()
    index_file_path = os.path.join(cwd, '..', 'data', 'interim', 'distance', 'last_saved_index.pkl')
    # If the file exists, load the index from it
    if os.path.exists(index_file_path):
        with open(index_file_path, 'rb') as f:
            return pickle.load(f)
    # If the file does not exist, return 0
    return 0

# This function calculates the interval at which checkpoints should be saved
# It is set to 1% of the total number of rows in the GeoDataFrame
def calculate_checkpoint_interval(gdf):
    return gdf.shape[0] // 100

# This function checks if a checkpoint should be saved at the current index
def is_checkpoint(index, interval):
    return (index + 1) % interval == 0

# This function saves a checkpoint by saving the current state of the GeoDataFrame and the results to a pickle file
def save_checkpoint(gdf, results, index):
    cwd = os.getcwd()
    # Combine the current state of the GeoDataFrame with the results so far
    gdf_partial = pd.concat([gdf.loc[:index], pd.concat(results, axis=0)], axis=1)
    # Save the combined data to a pickle file
    gdf_partial.to_pickle(os.path.join(cwd, '..', 'data', 'interim', 'distances', f'gdf_distance_save_{index + 1}.pkl'))
    # Save the current index to a separate pickle file
    save_last_saved_index(index)
    # Clear the results list to free up memory
    results.clear()

# This function saves the last saved index to a pickle file
def save_last_saved_index(index):
    cwd = os.getcwd()
    with open(os.path.join(cwd, '..', 'data', 'interim', 'distances', 'last_saved_index.pkl'), 'wb') as f:
        pickle.dump(index, f)

# This function combines the results with the original GeoDataFrame
def join_results(gdf, results):
    return pd.concat([gdf, pd.concat(results, axis=0)], axis=1)

# Main execution
if __name__ == "__main__":
    import time

    # Record the start time
    start_time = time.time()

    # Reset the index of the GeoDataFrame
    gdf = gdf.reset_index(drop=True)

    # Use the 'loky' backend for joblib
    with parallel_backend('loky'):
        # Calculate the amenity stats for the GeoDataFrame
        gdf = calculate_amenity_stats(gdf, all_features_gdf)

    # Record the end time
    end_time = time.time()

    # Print the total execution time
    print(f"Total time: {end_time - start_time} seconds")

  0%|          | 0/66664 [00:00<?, ?it/s]

InvalidIndexError: Reindexing only valid with uniquely valued Index objects