In [1]:
import sys
import pathlib
import pandas as pd
import numpy as np
import swifter
import dask
from pandarallel import pandarallel

 #multiprocess works better within Jupyter Notebooks than multiprocessing package
from multiprocess import Pool
from multiprocess import cpu_count
from functools import partial


# Put the parent of the current working directory on the import path
# (presumably so the project-local `helpers` module can be imported).
ROOT = pathlib.Path.cwd().parent.as_posix()
if sys.path.count(ROOT) == 0:
    sys.path.append(ROOT)
    
from helpers import *

In [2]:

# npartitions : Integer. The number of partitions to distribute the data into for dask processing. Default: 2*cpu_count()

# dask_threshold : Float. The number of seconds to use for estimating whether to use dask or pandas apply. Default: 1 second

# scheduler : String. Whether to use threads or processes for the dask scheduler. Default: processes

# progress_bar : Boolean. Whether to turn the progress bar on or off. Default: True

# progress_bar_desc : String. Progress bar description. Default: None

# allow_dask_on_strings : Boolean. Allows the user to enable dask parallel processing on string data. Default: False

# force_parallel : Boolean. Allows the user to override the swifter algorithm and jump straight to using dask processing. Default: False


from swifter import set_defaults

# Set every swifter default explicitly so the configuration used by the
# `.swifter.apply` calls in this notebook is visible in one place.
set_defaults(**{
    "npartitions": None,
    "dask_threshold": 1,
    "scheduler": "processes",
    "progress_bar": True,
    "progress_bar_desc": None,
    "allow_dask_on_strings": False,
    "force_parallel": False,
})

In [3]:
# Load the points table; the first CSV column is used as the index.
points_df_filled = pd.read_csv(
    filepath_or_buffer=ROOT + '/Spikes/Dash/data/points_df.csv',
    index_col=0,
)

def parallelize(data, func, num_of_processes=cpu_count()):
    """Run ``func`` over chunks of ``data`` in a process pool and concatenate the results.

    Parameters
    ----------
    data
        Object to split; it is handed to ``np.array_split`` unchanged.
    func
        Callable applied to each chunk through ``Pool.map``. Its return
        values are combined with ``pd.concat``.
    num_of_processes : int
        Number of chunks to create and of worker processes to start.
        The default is ``cpu_count()``, evaluated once when the function
        is defined.

    Returns
    -------
    The ``pd.concat`` of the per-chunk results, in chunk order.
    """
    data_split = np.array_split(data, num_of_processes)
    # Use the pool as a context manager so the worker processes are always
    # torn down, even when ``func`` raises inside a worker. Previously an
    # exception in ``pool.map`` skipped close()/join() and leaked the pool.
    with Pool(num_of_processes) as pool:
        results = pool.map(func, data_split)
    return pd.concat(results)

def run_on_subset(func, data_subset):
    """Apply ``func`` to each row of ``data_subset`` and return the per-row results."""
    row_results = data_subset.apply(func, axis="columns")
    return row_results

# Parallel stand-in for a row-wise apply:
# df.apply(some_func, axis=1)  ->  parallelize_on_rows(df, some_func)
def parallelize_on_rows(data, func, num_of_processes=8):
    """Apply ``func`` to every row of ``data`` using ``num_of_processes`` workers."""
    apply_to_chunk = partial(run_on_subset, func)
    return parallelize(data, apply_to_chunk, num_of_processes)


def apply_aq_functions(points_df_filled):
    """Add one ``Value_<metric>`` column per air-quality metric to the frame.

    For every row, the metric's lookup function is called with the row's
    ``Latitude`` and ``Longitude``. For co, no2, o3 and so2 the result is
    multiplied by the gas's molar mass (to give g/m2, which presumably means
    the lookups return mol/m2 -- confirm against ``helpers``). The ai value
    is stored as returned.

    Parameters
    ----------
    points_df_filled : pandas.DataFrame
        Must contain ``Latitude`` and ``Longitude`` columns. It is modified
        in place.

    Returns
    -------
    pandas.DataFrame
        The same frame object, with ``Value_co``, ``Value_no2``,
        ``Value_o3``, ``Value_so2`` and ``Value_ai`` added.
    """
    # (column suffix, lookup function, molar mass or None when no conversion applies)
    aq_metrics = (
        ('co', co_function, 28.01),
        ('no2', no2_function, 46.0055),
        ('o3', o3_function, 48),
        ('so2', so2_function, 64.066),
        ('ai', ai_function, None),
    )

    # `.apply` below can be swapped for `.swifter.apply` or pandarallel's
    # `.parallel_apply` (after `pandarallel.initialize(...)`) to parallelise.
    for suffix, aq_function, molar_mass in aq_metrics:
        # Bind the loop variables as defaults so each row function keeps its own metric.
        def row_value(row, aq_function=aq_function, molar_mass=molar_mass):
            value = aq_function(row['Latitude'], row['Longitude'])
            return value if molar_mass is None else value * molar_mass

        #axis = 1, apply function to each row
        points_df_filled['Value_' + suffix] = points_df_filled.apply(row_value, axis=1)
        print(suffix + '_function complete')

    return points_df_filled

def normalise(points_df_filled):
    """Min-max scale each air-quality value column into a ``norm_<column>`` column.

    Each of ``Value_co``, ``Value_no2``, ``Value_o3``, ``Value_so2`` and
    ``Value_ai`` is rescaled so its minimum maps to 0 and its maximum to 1.
    A column whose values are all equal has no range to scale by; it is
    mapped to 0.0 instead of the NaN that 0/0 would produce, so that one
    constant metric does not turn every downstream score into NaN.

    Parameters
    ----------
    points_df_filled : pandas.DataFrame
        Must contain the five ``Value_*`` columns. It is modified in place.

    Returns
    -------
    pandas.DataFrame
        The same frame object, with the five ``norm_Value_*`` columns added.
    """
    norm_cols = ['Value_co', 'Value_no2', 'Value_o3', 'Value_so2', 'Value_ai']
    for col in norm_cols:
        values = points_df_filled[col]
        lowest = values.min()
        value_range = values.max() - lowest
        if value_range == 0:
            # Constant column: every value sits at the minimum, so it scales to 0.
            # (Missing values stay NaN because NaN - lowest is NaN.)
            points_df_filled['norm_' + col] = (values - lowest).astype('float64')
        else:
            points_df_filled['norm_' + col] = (values - lowest) / value_range

    return points_df_filled

def aqs_function(aq1, aq2, aq3, aq4, aq5):
    """Combine five air-quality metrics into one score, each weighted at 20%.

    A smaller score means better air quality.
    """
    weight = 20/100
    weighted_total = aq1 * weight + aq2 * weight + aq3 * weight + aq4 * weight + aq5 * weight
    return weighted_total

def apply_aqs_function(points_df_filled):
    """Add the ``AQ_score`` column and drop the normalised helper columns.

    Assumption: each of the five metrics is worth 20% of the score
    (100 / 5 metrics); the weighting itself lives in ``aqs_function``.

    Parameters
    ----------
    points_df_filled : pandas.DataFrame
        Must contain the five ``norm_Value_*`` columns. ``AQ_score`` is
        added to this frame in place.

    Returns
    -------
    pandas.DataFrame
        A new frame with ``AQ_score`` and without the ``norm_Value_*``
        columns. Callers must use the return value to see the drop.
    """
    norm_cols = ['norm_Value_co', 'norm_Value_no2', 'norm_Value_o3', 'norm_Value_so2', 'norm_Value_ai']

    # aqs_function is plain arithmetic, so it is given whole columns at once
    # instead of being called once per row through swifter.
    points_df_filled['AQ_score'] = aqs_function(*(points_df_filled[col] for col in norm_cols))

    #drop normalised columns (no longer needed once the score exists)
    return points_df_filled.drop(columns=norm_cols)

def apply_popd_function(points_df_filled):
    """Add a ``Pop_density`` column by calling ``popdensity_function`` on each row's coordinates.

    The frame is modified in place and the same object is returned.
    """
    def density_for_row(row):
        return popdensity_function(row['Latitude'], row['Longitude'])

    points_df_filled['Pop_density'] = points_df_filled.swifter.apply(density_for_row, axis=1)
    return points_df_filled

def greenspace_score_function(points_df_filled, aqs, pop_density, land_type):
    """Combine air quality, population density and land type into one greenspace score.

    Parameters
    ----------
    points_df_filled : pandas.DataFrame
        Frame for the whole area; its ``Pop_density`` and ``Land_type``
        columns are used to work out the area-wide greenspace per capita.
    aqs
        Air quality score of the point being scored.
    pop_density
        Population density of the point being scored.
    land_type
        Land type label of the point being scored.

    Returns
    -------
    The weighted score, or 0 for a land type that carries a full penalty.

    Notes
    -----
    NOTE(review): the previous body could not run (``popd_pct`` was read
    before assignment and ``penalty`` was unbound for most land types).
    The choices below follow the original comments, but please confirm:
    the 50/50 split between air quality and population density, the
    ``1 + diff / standard`` weighting, and applying the land-type penalty
    to the whole score rather than only to the population term.
    """
    #Share of the score given to each component (fractions that sum to 1)
    popd_pct = 50/100
    aqs_pct = 1 - popd_pct
    aqs_weight = 1

    #Population Density
    #50m2 per capita according to WHO standards or 100m2 (our resolution) per 2 people
    standard_gs_per_pop_m2 = 50
    resolution_m2 = 100
    sum_popd = points_df_filled['Pop_density'].sum()
    #count the greenspace points (not a sum over every column) and convert to area
    greenspace_points = (points_df_filled['Land_type'] == 'greenspace').sum()
    sum_greenspace_m2 = greenspace_points * resolution_m2

    if sum_popd > 0:
        gs_per_capita = sum_greenspace_m2 / sum_popd
        diff_standard_real = standard_gs_per_pop_m2 - gs_per_capita
        #Greenspace per capita BETTER than the standard (negative diff) pulls the
        #weight below 1; WORSE than the standard (positive diff) pushes it above 1;
        #no difference leaves it at exactly 1. Clamp at 0 so a large surplus
        #cannot produce a negative weight.
        popd_weight = max(0, 1 + diff_standard_real / standard_gs_per_pop_m2)
    else:
        #no population to compare against: keep the weighting neutral
        popd_weight = 1

    #Land Type
    #hospital land is fully penalised; every other land type is left as is
    penalty = 0 if land_type == 'hospital' else 1

    Greenspace_score = (((aqs_weight * aqs) * aqs_pct) + ((popd_weight * pop_density) * popd_pct)) * penalty
    return Greenspace_score
    
def apply_greenspace_score_function(points_df_filled):
    """Add a ``Greenspace_score`` column by scoring every row of the frame.

    Each row's ``AQ_score``, ``Pop_density`` and ``Land_type`` are passed to
    ``greenspace_score_function`` together with the whole frame, in the order
    that function's signature declares.

    Parameters
    ----------
    points_df_filled : pandas.DataFrame
        Must contain ``AQ_score``, ``Pop_density`` and ``Land_type`` columns.
        It is modified in place.

    Returns
    -------
    pandas.DataFrame
        The same frame object, so the result can be assigned like the other
        ``apply_*`` helpers.
    """
    # NOTE(review): the scorer recomputes area-wide totals from the full frame
    # on every row; consider hoisting them if this becomes slow.
    points_df_filled['Greenspace_score'] = points_df_filled.swifter.apply(
        lambda row: greenspace_score_function(
            points_df_filled,
            row['AQ_score'],
            row['Pop_density'],
            row['Land_type'],
        ),
        axis=1,
    )

    return points_df_filled

def fill_df(points_df_filled):
    """Run the air-quality steps in order and return the resulting frame.

    Steps: add the raw air-quality value columns, normalise them, then
    compute the air-quality score. A progress message is printed after
    each step. Use the return value: the last step returns a new frame.
    """
    steps = (
        (apply_aq_functions, 'apply_aq_functions complete'),
        (normalise, 'normalisation of aq values complete'),
        (apply_aqs_function, 'apply_aqs_function complete'),
    )
    for step, done_message in steps:
        points_df_filled = step(points_df_filled)
        print(done_message)

    # TODO: apply_popd_function and apply_greenspace_score_function are not
    # part of the pipeline yet.
    return points_df_filled

In [4]:
# Load the input points table; the first CSV column is used as the index.
points_df_filled = pd.read_csv(ROOT + '/Spikes/Dash/data/points_df.csv', index_col = 0)

# Keep the frame that fill_df returns. The normalised helper columns are
# dropped from a new frame, so the original object would still contain them
# when saved.
points_df_filled = fill_df(points_df_filled)

#save dataframe
points_df_filled.to_csv(ROOT + '/Spikes/Dash/data/final_csv.csv')

Dask Apply: 100%|██████████| 16/16 [05:50<00:00, 21.93s/it]  


co_function complete


Dask Apply: 100%|██████████| 16/16 [24:35<00:00, 92.24s/it] 


no2_function complete


Dask Apply: 100%|██████████| 16/16 [02:00<00:00,  7.53s/it]


o3_function complete


Dask Apply: 100%|██████████| 16/16 [01:52<00:00,  7.02s/it]


so2_function complete


Dask Apply: 100%|██████████| 16/16 [02:11<00:00,  8.24s/it]


ai_function complete
apply_aq_functions complete
normalisation of aq values complete
apply_aqs_function complete


Dask Apply:   0%|          | 0/16 [00:00<?, ?it/s]

KeyboardInterrupt: 

In [None]:
# Show the frame as the cell output (rendered as a truncated preview).
points_df_filled

Unnamed: 0,Latitude,Longitude,Value_co,Value_no2,Value_o3,Value_so2,Value_ai
0,51.737184,-0.620643,0.764264,0.004088,7.248996,0.032141,-1.031149
1,51.737184,-0.617012,0.764304,0.004140,7.250456,0.032148,-1.035486
2,51.737184,-0.613382,0.764350,0.004143,7.250998,0.032186,-1.037714
3,51.737183,-0.609751,0.764624,0.004142,7.251659,0.032087,-1.035978
4,51.737183,-0.606120,0.764288,0.004142,7.252279,0.031958,-1.039697
...,...,...,...,...,...,...,...
58243,51.238843,0.312049,0.764161,0.002717,7.310894,0.028964,-0.748909
58244,51.238815,0.315640,0.764524,0.002728,7.310550,0.029058,-0.745427
58245,51.238786,0.319231,0.764969,0.002732,7.310113,0.029139,-0.744029
58246,51.238757,0.322822,0.765131,0.002735,7.311109,0.029128,-0.740657
