# Production of indicators for the COVID19 Mobility Task Force (Modified for running on HDP/Hive environment)

This notebook is an extension of cdr-aggregation under [worldbank/covid-mobile-data](https://github.com/worldbank/covid-mobile-data/tree/master/cdr-aggregation). It is specifically customized to compute the indicators on data residing in Hive tables under the Hortonworks Data Platform (HDP).

   
The settings for this notebook are read from `config_file_hive.py`; see that file for more information about the available settings.
    


# Import code

In [None]:
# Enable IPython's autoreload extension in mode 2: imported modules are
# reloaded before each cell executes, so code edits take effect without
# restarting the kernel.
%load_ext autoreload
%autoreload 2

In [None]:
from modules.DataSource import *

In [None]:
# Relative path of the Hive-specific configuration script that is read and
# executed in a later cell.
config_file = '../config_file_hive.py'

In [None]:
# Execute the configuration script in this notebook's namespace so that the
# names it defines are available to later cells (presumably including
# `datasource_configs`, which is passed to DataSource afterwards).
# The `with` block guarantees the file handle is closed once the script text
# has been read, instead of leaving it open until garbage collection.
# NOTE(review): exec() runs arbitrary code -- only point `config_file` at a
# trusted, locally controlled file.
with open(config_file) as config_handle:
    exec(config_handle.read())

In [None]:
# Create the DataSource used by every later cell. `datasource_configs` is not
# defined in this notebook; presumably it comes from the executed config file.
ds = DataSource(datasource_configs)
# Call show_config() to inspect the settings the DataSource ended up with.
ds.show_config()

In [None]:
# Run `show tables` through the DataSource's Spark session and print the result.
ds.spark.sql("show tables").show()

In [None]:
from modules.setup import *

# Import data

## Load CDR data

### Process/standardize raw data, save as parquet, and then load it

In [None]:
# ds.standardize_csv_files(show=True)
# ds.save_as_parquet()

In [None]:
#ds.load_standardized_parquet_file()

In [None]:
# Smoke test for the connection to the Spark service under HDP/Hive.
# Replace 'calls_100k' with the name of your CDR data table.
ds.spark.sql("select * from calls_100k limit 10").show()

### Specify and load hive table

In [None]:
# Specify and load the Hive data.
# The source column names and the table name are looked up in ds.hive_vars and
# substituted, in order, into the query template; the selected columns are
# aliased to msisdn, call_datetime and location_id.
ds.parquet_df = ds.spark.sql(
    """SELECT {} AS msisdn, 
                                        {} AS call_datetime, 
                                        {} AS location_id FROM {}""".format(
        *(ds.hive_vars[key]
          for key in ('msisdn', 'call_datetime', 'location_id', 'calls'))
    )
)

In [None]:
##  Saurav
# Evaluate the DataFrame loaded from Hive; as the only expression in the cell,
# its representation is what the notebook renders.
ds.parquet_df
# Saving a parquet copy is left disabled here.
#ds.save_as_parquet()

## Load geo data

In [None]:
# Load the geo CSV inputs into the DataSource; show=False is passed,
# presumably to skip displaying the loaded tables.
ds.load_geo_csvs(show=False)

In [None]:
## Use this in case you want to cluster the towers and create a distance matrix

# create_gpds() is called first (presumably building the geo data frames the
# clusterer needs).
ds.create_gpds()
from modules.tower_clustering import *
# Cluster towers for the 'admin2' level, identified by the 'ID_2' column.
clusterer = tower_clusterer(ds, 'admin2', 'ID_2')
# cluster_towers() returns two values, stored on the DataSource as the admin2
# tower map and the distance matrix.
ds.admin2_tower_map, ds.distances = clusterer.cluster_towers()


In [None]:
# Inspect the clustering outputs. A notebook cell only renders its last
# expression, so ds.distances is what gets displayed; the first line merely
# checks that the attribute exists.
ds.admin2_tower_map
ds.distances

In [None]:
# Repeat the tower clustering for the 'admin3' level ('ID_3' column).
# The two setup lines below are commented out, so this cell relies on
# ds.create_gpds() and the tower_clustering import having run in an earlier cell.
#ds.create_gpds()
#from modules.tower_clustering import *
clusterer = tower_clusterer(ds, 'admin3', 'ID_3')
# NOTE(review): ds.distances is reassigned here, replacing the value stored by
# the admin2 clustering -- confirm this is intended.
ds.admin3_tower_map, ds.distances  = clusterer.cluster_towers()

In [None]:
## Use this in case you want to create a Voronoi tessellation

from modules.voronoi import *
# Build the tessellation for the 'admin3' level ('ID_3' column) and store the
# result on the DataSource as ds.voronoi; a later aggregation cell refers to
# it by name (regions = 'voronoi').
voronoi = voronoi_maker(ds, 'admin3', 'ID_3')
ds.voronoi = voronoi.make_voronoi()

In [None]:
# Touch the region attributes that the aggregations refer to by name: a missing
# attribute raises AttributeError here. Only the voronoi object is printed.
ds.admin2_tower_map
ds.admin3_tower_map
print(ds.voronoi)

# Print every attribute name available on the DataSource, one per line.
for attribute_name in dir(ds):
    print(attribute_name)

# Run aggregations

## Flowminder indicators for admin2

In [None]:
# Flowminder aggregator for admin2: `regions` names the DataSource attribute
# assigned by the admin2 tower clustering, and `result_stub` is the
# '/admin2/flowminder' stub (presumably the output sub-path).
agg_flowminder_admin2 = flowminder_aggregator(
    result_stub='/admin2/flowminder',
    datasource=ds,
    regions='admin2_tower_map',
)



In [None]:
# Run the admin2 Flowminder aggregation; no arguments are passed, so the
# aggregator's own defaults decide which indicators are produced.
agg_flowminder_admin2.attempt_aggregation()

## Flowminder indicators for admin3

In [None]:
# Flowminder aggregator for admin3: `regions` names the DataSource attribute
# assigned by the admin3 tower clustering, and `result_stub` is the
# '/admin3/flowminder' stub (presumably the output sub-path).
agg_flowminder_admin3 = flowminder_aggregator(
    result_stub='/admin3/flowminder',
    datasource=ds,
    regions='admin3_tower_map',
)

In [None]:


# Run the admin3 Flowminder aggregation; no arguments are passed, so the
# aggregator's own defaults decide which indicators are produced.
agg_flowminder_admin3.attempt_aggregation()

## Priority indicators for admin2

In [None]:
# Priority-indicator aggregator for the admin2 regions.
agg_priority_admin2 = priority_aggregator(result_stub = '/admin2/priority',
                               datasource = ds,
                               regions = 'admin2_tower_map')

# Each entry maps a label to [indicator name, frequency].
# Fix: both home_vs_day_location entries used the key
# 'home_vs_day_location_per_day'. A dict literal keeps only the last value for
# a repeated key, so the ['day', 'week'] variant was silently dropped and only
# ['day', 'month'] was ever requested. The ['day', 'month'] entry keeps the
# original key (the behavior that already ran); the ['day', 'week'] entry gets
# its own key so that both variants are requested.
# NOTE(review): this assumes priority_aggregator treats the dict keys as
# free-form labels -- confirm before relying on the new key name.
agg_priority_admin2.attempt_aggregation(indicators_to_produce = {
    'unique_subscribers_per_day' : ['unique_subscribers', 'day'],
    'percent_of_all_subscribers_active_per_day' : ['percent_of_all_subscribers_active', 'day'],
    'origin_destination_connection_matrix_per_day' : ['origin_destination_connection_matrix', 'day'],
    'mean_distance_per_day' : ['mean_distance', 'day'],
    'mean_distance_per_week' : ['mean_distance', 'week'],
    'origin_destination_matrix_time_per_day' : ['origin_destination_matrix_time', 'day'],
    'home_vs_day_location_per_day_week' : ['home_vs_day_location_per_day', ['day','week']],
    'home_vs_day_location_per_day' : ['home_vs_day_location_per_day', ['day','month']],
})

## Priority indicators for admin3

In [None]:
# Priority-indicator aggregator for the admin3 regions.
agg_priority_admin3 = priority_aggregator(result_stub = '/admin3/priority',
                            datasource = ds,
                            regions = 'admin3_tower_map')

# Only the hourly transactions indicator is requested at admin3 level.
# The dict literal previously listed the identical 'transactions_per_hour'
# entry twice; a repeated key collapses to a single entry, so dropping the
# duplicate leaves the resulting dict unchanged.
agg_priority_admin3.attempt_aggregation(indicators_to_produce = {
    'transactions_per_hour' : ['transactions', 'hour'],
})

## Scaled priority indicators for admin2

In [None]:
# Scaled-indicator aggregator for the admin2 regions, run immediately with no
# arguments (the aggregator's defaults decide which indicators are produced).
agg_scaled_admin2 = scaled_aggregator(
    result_stub='/admin2/scaled',
    datasource=ds,
    regions='admin2_tower_map',
)

agg_scaled_admin2.attempt_aggregation()

## Priority indicators for tower-cluster

In [None]:
# Priority-indicator aggregator on the Voronoi regions (ds.voronoi).
# An earlier variant of this cell used regions = 'voronoi_tower_map' instead.
agg_priority_tower = priority_aggregator(
    result_stub='/voronoi/priority',
    datasource=ds,
    regions='voronoi',
)

# Request hourly unique subscribers plus daily and weekly mean distance.
agg_priority_tower.attempt_aggregation(
    indicators_to_produce={
        'unique_subscribers_per_hour': ['unique_subscribers', 'hour'],
        'mean_distance_per_day': ['mean_distance', 'day'],
        'mean_distance_per_week': ['mean_distance', 'week'],
    }
)

In [None]:
# Daily origin-destination connection matrix on the Harare tower regions.
# NOTE(review): 'voronoi_tower_map_harare' is not assigned on `ds` anywhere in
# the visible notebook -- confirm it is provided elsewhere before running.
agg_priority_tower_harare = priority_aggregator(
    result_stub='/voronoi/priority/harare',
    datasource=ds,
    regions='voronoi_tower_map_harare',
)

agg_priority_tower_harare.attempt_aggregation(
    indicators_to_produce={
        'origin_destination_connection_matrix_per_day': ['origin_destination_connection_matrix', 'day'],
    }
)

In [None]:
# Daily origin-destination connection matrix on the Bulawayo tower regions.
# NOTE(review): 'voronoi_tower_map_bulawayo' is not assigned on `ds` anywhere in
# the visible notebook -- confirm it is provided elsewhere before running.
agg_priority_tower_bulawayo = priority_aggregator(
    result_stub='/voronoi/priority/bulawayo',
    datasource=ds,
    regions='voronoi_tower_map_bulawayo',
)

agg_priority_tower_bulawayo.attempt_aggregation(
    indicators_to_produce={
        'origin_destination_connection_matrix_per_day': ['origin_destination_connection_matrix', 'day'],
    }
)

# Produce script

In [None]:
# Shell command (IPython `!` syntax): convert every notebook in the current
# working directory to a script with nbconvert.
!jupyter nbconvert --to script *.ipynb

In [None]:
# Scratch pandas frame (2 rows x 3 columns) with a NaN in the top-left cell.
# Note that the next cell rebinds `test` to a Spark DataFrame.
test = pd.DataFrame([[np.nan, 1, 2], [0, 1, 2]])

In [None]:
# Scratch Spark DataFrame with two rows of four values and a null in the first
# cell; no schema is given, and later cells address the columns as _1 .. _4.
test = ds.spark.createDataFrame([[None, 1, 1, 2], [2, 2, 2, 2]])

In [None]:
# Convert the scratch frame to pandas so the notebook renders it as a table.
test.toPandas()

In [None]:
# Group the scratch frame by column '_4', sum the remaining columns, and
# render the result via pandas.
test.groupby('_4').sum().toPandas()

In [None]:
# Add a column 'f' computed as _1 + _2 and render the result via pandas.
# `F` is not imported in the visible cells; presumably it is
# pyspark.sql.functions brought in by one of the star imports -- confirm.
test.withColumn('f', F.col('_1') + F.col('_2')).toPandas()