# Production of indicators for the COVID19 Mobility Task Force

In this notebook we produce indicators for the [COVID19 Mobility Task Force](https://github.com/worldbank/covid-mobile-data).

[Flowminder](https://covid19.flowminder.org) indicators are produced to increase the availability of comparable datasets across countries, and have been copied without modification from the [Flowminder COVID-19 GitHub repository](https://github.com/Flowminder/COVID-19) (except for the start and end dates). These have been supplemented by a set of *priority* indicators, whose data is ingested into the dashboard in this repository.

In this notebook we produce indicators in the following three steps:

- **Import code**: The code for the aggregation is included in the `custom_aggregation` and `flowminder_aggregation` scripts.
- **Import data**: 
To set up the data import we need to place the CDR data files into the `data/new/CC/telco/` folder, where we replace `CC` with the country code and `telco` with the company abbreviation. 
We also need to place csv files with the tower-region mapping and distance matrices into the `data/support-data/CC/telco/geofiles` folder, and then modify `data/support-data/config_file.py` to specify:
    - *geofiles*: the names of the geofiles, 
    - *country_code*: country code and company abbreviation,
    - *telecom_alias*: the path to the `data` folder,
    - *data_paths*: the names of the subfolders in `data/new/CC/telco/` that hold the csv files. Simply change this to `[*]` if you didn't create subfolders and want to load all files.
    - *dates*: set the start and end date of the data you want to produce the indicators for.
    
For more information about the `config_file.py` settings, see the [GitHub page](https://github.com/worldbank/covid-mobile-data/tree/master/cdr-aggregation).
    
- **Run aggregations**: By default, we produce all Flowminder and priority indicators. We've included 4 re-tries in case of failure; in our experience this helps on Databricks but is probably irrelevant in other settings. Note that before you can re-run these aggregations, you need to move the csv outputs that previous runs saved in `data/results/CC/telco/` to another folder, otherwise these indicators will be skipped. This prevents you from accidentally overwriting previous results. It also means you can delete only the files for the indicators you want to re-produce; any indicator whose file is still present will be skipped.
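
As a rough illustration of the folder layout described in the steps above, the sketch below builds the expected paths for one country and operator. The helper name `expected_paths` is hypothetical and not part of the repository; the `standardized` folder is taken from the path printed further down in this notebook, and the example values (`zw`, `econet`) come from the configuration output shown there.

```python
import posixpath


def expected_paths(base, country_code, telco):
    """Return the folders and files this notebook refers to (hypothetical helper)."""
    sub = posixpath.join(country_code, telco)
    return {
        # Raw CDR csv files go here
        "raw_cdr": posixpath.join(base, "new", sub),
        # Tower-region mappings and distance matrices go here
        "geofiles": posixpath.join(base, "support-data", sub, "geofiles"),
        # Settings file that is edited before running the notebook
        "config": posixpath.join(base, "support-data", "config_file.py"),
        # Standardized parquet files are written here
        "standardized": posixpath.join(base, "standardized", sub),
        # Indicator csv outputs are written here
        "results": posixpath.join(base, "results", sub),
    }


paths = expected_paths("/home/jovyan/work/data", "zw", "econet")
for name, path in paths.items():
    print(f"{name:>12}: {path}")
```

Printing the paths before a run is a cheap way to confirm that files were placed where the configuration expects them.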

The outcome of this effort will be used to inform policy making using a [mobility indicator dashboard](https://github.com/worldbank/covid-mobile-data/tree/master/dashboard-dataviz).

# Import code

In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
from modules.setup import *

In [3]:
spark

# Import data

## Set up the configuration for data standardization

In [4]:
import os
home = os.environ['HOME']
data = os.path.join(home, 'work/data')
config_file = os.path.join(data, 'support-data/config_file.py')

In [5]:
# Run the config file in this namespace; it is expected to define `datasource_configs`
exec(open(config_file).read())

In [6]:
ds = DataSource(datasource_configs)
ds.show_config()


Basepath: /home/jovyan/work/data
Country and company path: zw/econet
Paths for datafiles: ['mar20/*.csv', 'feb20/*.csv']
Geofiles: {'tower_sites': 'zw_econet_sites.csv', 'admin2': 'zw_admin2_shapefile.csv', 'admin3': 'zw_admin3_shapefile.csv', 'voronoi': 'zw_voronoi_shapefile.csv', 'admin2_tower_map': 'zw_admin2_tower_map.csv', 'admin3_tower_map': 'zw_admin3_tower_map.csv', 'voronoi_tower_map': 'zw_voronoi_tower_map.csv', 'distances': 'zw_distances_pd_long.csv', 'admin2_incidence': 'zw_admin2_covid_incidence_march30.csv', 'admin2_weight': 'zw_admin2_weight.csv'}
Load options: {'seperator': ',', 'header': 'false', 'mode': 'PERMISSIVE', 'datemask': 'dd/MM/yyyy HH:mm:ss'}
Load schema: StructType(List(StructField(msisdn,IntegerType,true),StructField(call_datetime,StringType,true),StructField(location_id,StringType,true)))
Filenames: {'parquetfile': 'febmar20.parquet'}



## Standardize raw csv files

In [7]:
# ds.standardize_csv_files(show=True)
# ds.save_as_parquet()

In [8]:
#ds.load_standardized_parquet_file()

In [9]:
## Use this in case you want to sample the data and run the code on the sample

#ds.sample_and_save(number_of_ids=1000)
ds.load_sample('sample_feb_mar2020')
ds.parquet_df = ds.sample_df
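
The internals of `sample_and_save` are not shown in this notebook. Judging only from the `number_of_ids` argument, one plausible reading is that it samples subscribers rather than individual records, so that every sampled subscriber keeps a complete history. The sketch below illustrates that idea in plain Python; the function `sample_by_id` and the toy records are hypothetical, and only the `msisdn` field name is taken from the load schema above.

```python
import random


def sample_by_id(records, number_of_ids, seed=0):
    """Keep all records belonging to a random subset of subscriber ids."""
    ids = sorted({r["msisdn"] for r in records})
    rng = random.Random(seed)  # fixed seed so the sample is reproducible
    keep = set(rng.sample(ids, min(number_of_ids, len(ids))))
    return [r for r in records if r["msisdn"] in keep]


# Toy data: 5 subscribers with 2 records each (illustrative values only)
toy_records = [
    {"msisdn": subscriber, "location_id": f"tower_{event}"}
    for subscriber in range(5)
    for event in range(2)
]

sample = sample_by_id(toy_records, number_of_ids=2)
sampled_ids = {r["msisdn"] for r in sample}
print(len(sample), "records from", len(sampled_ids), "subscribers")
```

Sampling by id rather than by row matters for indicators such as home locations, which need all of a subscriber's records.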

## Load geo data

In [10]:
ds.load_geo_csvs()

In [11]:
## Use this in case you want to cluster the towers and create a distance matrix

# ds.create_gpds()
# from modules.tower_clustering import *
# clusterer = tower_clusterer(ds, 'admin2', 'ID_2')
# ds.admin2_tower_map, ds.distances = clusterer.cluster_towers()
# clusterer = tower_clusterer(ds, 'admin3', 'ADM3_PCODE')
# ds.admin3_tower_map, ds.distances  = clusterer.cluster_towers()
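
For readers without access to `modules.tower_clustering`, the following is a minimal sketch of how a long-format distance matrix between sites could be computed from coordinates using the haversine formula. It is not the repository's implementation: the function names, the Earth radius of 6371 km, and the two example sites are assumptions made for illustration.

```python
import math


def haversine_km(lat1, lon1, lat2, lon2, radius_km=6371.0):
    """Great-circle distance in km between two points given in degrees."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = phi2 - phi1
    dlmb = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlmb / 2) ** 2
    # Clamp to 1 to guard against floating point overshoot before asin
    return 2 * radius_km * math.asin(min(1.0, math.sqrt(a)))


def distance_matrix_long(sites):
    """sites: dict of site id -> (lat, lon). Returns rows of (origin, destination, km)."""
    return [
        (origin, destination, haversine_km(*sites[origin], *sites[destination]))
        for origin in sites
        for destination in sites
    ]


# Two made-up sites one degree of longitude apart on the equator
example_sites = {"A": (0.0, 0.0), "B": (0.0, 1.0)}
rows = distance_matrix_long(example_sites)
for origin, destination, km in rows:
    print(origin, destination, round(km, 2))
```

The long format (one row per origin-destination pair) is convenient for joining onto per-subscriber movements, at the cost of storing each pair twice.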

In [12]:
## Use this in case you want to create a voronoi tesselation

# from modules.voronoi import *
# voronoi = voronoi_maker(ds, 'admin3', 'ADM3_PCODE')
# ds.voronoi = voronoi.make_voronoi()

# Run aggregations

## Flowminder indicators for admin2

In [13]:
agg_flowminder = aggregator(result_stub = '/admin2/flowminder',
                            datasource = ds,
                            regions = 'admin2_tower_map')

agg_flowminder.attempt_aggregation()

Skipped: count_unique_subscribers_per_region_per_day
Caching: home_locations
Skipped: count_unique_active_residents_per_region_per_day
Skipped: count_unique_visitors_per_region_per_day
Skipped: count_unique_subscribers_per_region_per_week
Skipped: count_unique_active_residents_per_region_per_week
Skipped: count_unique_visitors_per_region_per_week
Skipped: regional_pair_connections_per_day
Skipped: directed_regional_pair_connections_per_day
Skipped: total_calls_per_region_per_day
Skipped: home_location_counts_per_region
Indicators saved.
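
The `Skipped:` lines above correspond to the behaviour described in the introduction: an indicator whose csv output already exists is not recomputed, and failed attempts are retried. The sketch below shows one way such logic could look. It is an assumption-laden illustration, not the code in the aggregation modules: `save_with_retries`, the `.csv` naming, and the flaky example function are all hypothetical.

```python
import os
import tempfile


def save_with_retries(name, compute, out_dir, retries=4):
    """Skip `name` if its csv exists; otherwise compute and save it,
    making one initial attempt plus up to `retries` re-tries."""
    out_path = os.path.join(out_dir, name + ".csv")
    if os.path.exists(out_path):
        print("Skipped:", name)
        return "skipped"
    for attempt in range(1 + retries):
        try:
            content = compute()  # may raise on a transient failure
            with open(out_path, "w") as f:
                f.write(content)
            return "saved"
        except Exception as err:
            print(f"Attempt {attempt + 1} for {name} failed: {err}")
    return "failed"


# Example: a computation that fails twice before succeeding
calls = {"n": 0}


def flaky_indicator():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient error")
    return "day,count\n2020-02-01,10\n"


with tempfile.TemporaryDirectory() as tmp:
    first = save_with_retries("total_calls_per_region_per_day", flaky_indicator, tmp)
    second = save_with_retries("total_calls_per_region_per_day", flaky_indicator, tmp)

print(first, second)
```

Checking for the output file before computing is what makes it safe to re-run the whole notebook: only indicators whose files were moved or deleted are produced again.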


## Flowminder indicators for admin3

In [14]:
agg_flowminder = aggregator(result_stub = '/admin3/flowminder',
                            datasource = ds,
                            regions = 'admin3_tower_map')

agg_flowminder.attempt_aggregation()

Skipped: count_unique_subscribers_per_region_per_day
Caching: home_locations
Skipped: count_unique_active_residents_per_region_per_day
Skipped: count_unique_visitors_per_region_per_day
Skipped: count_unique_subscribers_per_region_per_week
Skipped: count_unique_active_residents_per_region_per_week
Skipped: count_unique_visitors_per_region_per_week
Skipped: regional_pair_connections_per_day
Skipped: directed_regional_pair_connections_per_day
Skipped: total_calls_per_region_per_day
Skipped: home_location_counts_per_region
Indicators saved.


## Priority indicators for admin2

In [15]:
agg_custom = custom_aggregator(result_stub = '/admin2/custom',
                               datasource = ds,
                               regions = 'admin2_tower_map')

agg_custom.attempt_aggregation()

Skipped: unique_subscribers_per_day
Skipped: percent_of_all_subscribers_active_per_day
Skipped: origin_destination_connection_matrix_per_day
Skipped: mean_distance_per_day
Skipped: week_home_vs_day_location_per_day
Skipped: month_home_vs_day_location_per_day
Skipped: origin_destination_matrix_time_per_day
Skipped: transactions_per_hour
Skipped: unique_subscribers_per_hour
Skipped: unique_subscriber_home_locations_per_week
Skipped: mean_distance_per_week
Custom indicators saved.


## Priority indicators for admin3

In [21]:
os.path.join(agg_custom.datasource.standardize_path, agg_custom.datasource.parquetfile_vars + agg_custom.level + '.parquet')

'/home/jovyan/work/data/standardized/zw/econet/febmar20_vars_admin3.parquet'

In [17]:
agg_custom = custom_aggregator(result_stub = '/admin3/custom',
                               datasource = ds,
                               regions = 'admin3_tower_map')

agg_custom.attempt_aggregation()

--> File does not exist. Saving: unique_subscribers_per_day
--> File does not exist. Saving: percent_of_all_subscribers_active_per_day
--> File does not exist. Saving: origin_destination_connection_matrix_per_day
--> File does not exist. Saving: mean_distance_per_day
--> File does not exist. Saving: week_home_vs_day_location_per_day
--> File does not exist. Saving: month_home_vs_day_location_per_day
--> File does not exist. Saving: origin_destination_matrix_time_per_day
--> File does not exist. Saving: transactions_per_hour
--> File does not exist. Saving: unique_subscribers_per_hour
--> File does not exist. Saving: unique_subscriber_home_locations_per_week
--> File does not exist. Saving: mean_distance_per_week
--> Renaming: unique_subscribers_per_day
--> Renaming: percent_of_all_subscribers_active_per_day
--> Renaming: origin_destination_connection_matrix_per_day
--> Renaming: mean_distance_per_day
--> Renaming: week_home_vs_day_location_per_day
--> Renaming: month_home_vs_day_locati

In [17]:
agg_custom = scaled_aggregator(result_stub = '/admin2/custom',
                               datasource = ds,
                               regions = 'admin2_tower_map',
                               re_create_vars = False)

result = agg_custom.mean_distance(agg_custom.period_filter, 'day')
result_pd = result.toPandas()
result_pd

| | home_region | day | mean_distance | stdev_distance | mean_weighted_distance_population_scale | stdev_weighted_distance_population_scale | mean_weighted_distance_observed_scale | stdev_weighted_distance_observed_scale |
|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 2020-02-29 | 21 | 55 | 32 | 82 | 18 | 46 |
| 1 | 1 | 2020-03-05 | 20 | 56 | 30 | 83 | 17 | 47 |
| 2 | 49 | 2020-03-10 | 43 | 78 | 105 | 192 | 59 | 108 |
| 3 | 56 | 2020-03-07 | 32 | 75 | 43 | 100 | 24 | 56 |
| 4 | 36 | 2020-03-08 | 33 | 63 | 44 | 84 | 25 | 47 |
| ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 3475 | 54 | 2020-02-07 | 27 | 56 | 98 | 203 | 55 | 114 |
| 3476 | 29 | 2020-02-10 | 13 | 24 | 20 | 37 | 11 | 21 |
| 3477 | 20 | 2020-03-10 | 84 | 134 | 256 | 408 | 144 | 229 |
| 3478 | 45 | 2020-02-20 | 27 | 62 | 58 | 133 | 32 | 75 |


In [37]:
result_pd.to_csv('unique_subscribers_country.csv')

| | day | count | weighted_count_population_scale | weighted_count_observed_scale |
|---|---|---|---|---|
| 0 | 2020-03-06 | 7134 | 12582 | 7098 |
| 1 | 2020-03-19 | 7035 | 12424 | 7008 |
| 2 | 2020-03-24 | 7161 | 12707 | 7168 |
| 3 | 2020-03-11 | 7107 | 12634 | 7127 |
| 4 | 2020-02-11 | 6959 | 12284 | 6929 |
| 5 | 2020-03-13 | 7159 | 12683 | 7154 |
| 6 | 2020-02-03 | 7876 | 14025 | 7912 |
| 7 | 2020-03-20 | 7049 | 12509 | 7056 |
| 8 | 2020-02-13 | 7125 | 12569 | 7090 |
| 9 | 2020-02-15 | 7236 | 12790 | 7215 |

In [33]:
result_pd.sum()

region                            105,008
count                             493,009
weighted_count_population_scale   868,795
weighted_count_observed_scale     493,009
dtype: float64
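
In the totals above, `count` and `weighted_count_observed_scale` sum to the same value (493,009), while `weighted_count_population_scale` sums to a larger one. How `scaled_aggregator` derives its weights is not shown in this notebook. One way to obtain that property, offered purely as an assumption and not as the notebook's method, is to rescale population-scale weighted counts so that their total matches the observed total. The function name, counts, and weights below are hypothetical.

```python
def rescale_to_observed(counts, weights):
    """Return (population_scale, observed_scale) weighted counts.

    population_scale[i] = counts[i] * weights[i]
    observed_scale is population_scale multiplied by a single factor chosen
    so that its total equals the total of the unweighted counts.
    """
    population_scale = [c * w for c, w in zip(counts, weights)]
    factor = sum(counts) / sum(population_scale)
    observed_scale = [p * factor for p in population_scale]
    return population_scale, observed_scale


# Made-up observed counts and weights for three regions
region_counts = [100, 250, 50]
region_weights = [2.0, 1.5, 3.0]

pop_scale, obs_scale = rescale_to_observed(region_counts, region_weights)
print("observed total:        ", sum(region_counts))
print("population-scale total:", sum(pop_scale))
print("observed-scale total:  ", round(sum(obs_scale), 6))
```

Under this assumption the observed-scale values keep the relative re-weighting between regions while staying comparable in magnitude to the raw counts, which is consistent with individual daily values differing slightly from `count` even though the totals agree.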