In [1]:
# Point Spark's scratch space at a dedicated directory.
# `!export ...` only sets the variable inside a throwaway subshell, so neither
# this kernel nor the JVM it launches later would ever see it. Writing to
# os.environ changes the kernel's own environment, which child processes inherit.
# This must run before the Spark session is created.
import os
os.environ["SPARK_LOCAL_DIRS"] = "/data/togo_anon/tmp/conner"

In [2]:
import os
import pandas as pd
from pathlib import Path

In [3]:
# Flat-file inputs, kept as plain strings.
# NOTE(review): survey_data_path is not referenced by any cell visible in this notebook.
survey_data_path = '/data/togo_anon/surveys/endline/endline_demographic.csv'
treatment_assignment_path = '/data/togo_anon/novissi/gd/whitelist/oldsims_p1_anon_info.csv'
registered_path = '/data/togo_anon/novissi/gd/registrations/2021-01-11.csv'

# Source directories as Path objects so per-month / per-provider file paths
# can be composed with the `/` operator further down.
call_dir = Path('/data/togo_anon/cdr/voice')
sms_dir = Path('/data/togo_anon/cdr/sms')
mobile_money_dir = Path('/data/togo_anon/mobilemoney/clean_data')
mobile_data_dir = Path('/data/togo_anon/mobiledata/')

In [4]:
# Month keys in 'YYYY/M' form (months are not zero-padded); each one is turned
# into a '<dir>/YYYY/M.csv' path when the per-month file lists are built.
months = ['2020/11', '2020/12', '2021/1', '2021/2', '2021/3', '2021/4']
# Provider names; each becomes '<provider>.csv' inside a month's mobile-money folder.
providers = ['moov', 'togocom']

In [5]:
# One CSV per month for each CDR stream, e.g. <call_dir>/2020/11.csv.
call_paths = [call_dir / f'{month}.csv' for month in months]
sms_paths = [sms_dir / f'{month}.csv' for month in months]

# Mobile-money files are nested month -> provider: one inner list per month,
# holding one path per provider.
mobile_money_paths = [
    [mobile_money_dir / month / f'{name}.csv' for name in providers]
    for month in months
]

# Every entry currently present in the mobile-data directory, in the order
# os.listdir returns them.
mobile_data_paths = [mobile_data_dir / entry for entry in os.listdir(mobile_data_dir)]

# Find list of people

In [6]:
from cider.datastore import DataStore
from cider.featurizer import Featurizer
from cider.featurizer import get_spark_session

In [7]:
# Build the cider DataStore from the YAML config, then obtain a Spark session
# from the configuration the DataStore loaded (ds.cfg).
cfg_file = './config.yml'
ds = DataStore(cfg_file)
spark = get_spark_session(ds.cfg)

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/05/05 12:24:37 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/05/05 12:24:39 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [8]:
# Bare expression: have the notebook render the session object as a quick
# check that it was created.
spark

In [9]:
# Read every monthly voice and SMS CSV (header row present, all columns left
# as strings) into its own Spark DataFrame: voice months first, then SMS.
cdr_dfs = [
    spark.read.option('header', 'true').csv(str(csv_path))
    for csv_path in [*call_paths, *sms_paths]
]

In [10]:
# Load the treatment-assignment whitelist and the registration snapshot
# (both CSVs carry a header row).
treatment_assignment_df = spark.read.csv(treatment_assignment_path, header=True)
registered_df = spark.read.csv(registered_path, header=True)

# Inner join on the shared key: keep registrations that also appear in the
# treatment-assignment file.
joined = registered_df.join(
    treatment_assignment_df,
    on='phone_number',
)

In [12]:
# Survey outcome file; unlike the other reads, column types are inferred here.
label_fpath = '/data/togo_anon/surveys/endline/fsec.csv'
label_df = spark.read.options(inferSchema=True, header=True).csv(label_fpath)

# Peek at the first few rows.
label_df.show(n=5)

                                                                                

+----------------+------+----------+------+-------+--------+------------------+----+--------------------+---------+---+
|    phone_number|1meals|2preferred|3limit|4reduce|5reducec|           6mealsc|7buy|          fsec_index|treatment|uid|
+----------------+------+----------+------+-------+--------+------------------+----+--------------------+---------+---+
|o7zAGNlD6X4rVQKa|   0.0|      0.25|  0.25|    0.0|     0.0|0.3333333333333333| 0.0| -0.7257207768326157|        1|  0|
|pKy2r1NKw99mv0oD|   0.0|      0.25|  0.25|    0.0|     0.0|0.3333333333333333| 0.0| -0.7257207768326157|        0|  1|
|YRV0m8W0gonr3E4g|   0.0|       1.0|  0.25|   0.25|    null|              null| 0.0|-0.42853823493545007|        1|  2|
|5Kowry7neRKGYReA|   0.0|      0.75|  0.75|   0.75|     1.0|0.3333333333333333| 1.0|   0.639351452963377|        0|  3|
|6wd0G5YAVQWGj3ZV|   0.0|      0.25|  0.25|    0.0|     0.0|               0.0| 0.0| -0.7257207768326157|        1|  4|
+----------------+------+----------+----

In [14]:
# Row count of the registration x treatment-assignment join.
joined.count()

                                                                                

53934

In [15]:
# Restrict the subscriber table to people who also have a survey label.
# Rebuilt from the source frames instead of `joined = joined.join(...)`, so
# re-running this cell cannot join the labels a second time (which would
# duplicate the label columns and could multiply rows).
joined = (
    registered_df
    .join(treatment_assignment_df, on='phone_number')
    .join(label_df, on='phone_number')
)

In [16]:
# Row count again, now that the survey labels have been joined in.
joined.count()

9982

In [14]:
# cdr_dfs[0].show()

In [15]:
# cdr_dfs[0].select('caller_msisdn').show()

In [17]:
def select_subset(df, subscribers=None):
    """Keep only the CDR rows whose caller is one of the given subscribers.

    Args:
        df: Spark DataFrame of call/SMS records containing at least the
            columns listed in ``cols`` below.
        subscribers: Spark DataFrame with a ``phone_number`` column. Defaults
            to the notebook-level ``joined`` frame, so existing one-argument
            calls behave as before.

    Returns:
        A DataFrame with exactly the columns in ``cols``, restricted to rows
        whose ``caller_msisdn`` appears in ``subscribers['phone_number']``.
        Only the caller side is matched; ``recipient_msisdn`` is not checked.
    """
    cols = ['interaction', 'caller_msisdn',
            'recipient_msisdn', 'datetime',
            'duration', 'antenna_id', 'recipient_type']
    if subscribers is None:
        subscribers = joined

    records = df.select(cols)
    # Only the key column is needed from the subscriber table.
    keys = subscribers.select('phone_number')

    # A left-semi join acts as a pure filter: each CDR row is emitted at most
    # once, even when a phone number occurs several times in `subscribers`
    # (an inner join would repeat the row once per match), and no subscriber
    # columns are added to the result.
    return records.join(
        keys,
        on=(records['caller_msisdn'] == keys['phone_number']),
        how='left_semi',
    )

# Apply select_subset to each monthly CDR frame, keeping the list order.
cdr_dfs = [select_subset(frame) for frame in cdr_dfs]

In [18]:
from functools import reduce
from pyspark.sql import DataFrame

# Stack all filtered monthly frames into a single DataFrame. unionAll pairs
# columns by position, which works here because every frame was produced by
# select_subset with the same column list.
cdr_df_master = reduce(lambda stacked, nxt: stacked.unionAll(nxt), cdr_dfs)

# Value Cleaning

Replace `sms` with `text` and `voice` with `call`

Replace `shortcode` with `other`

In [19]:
# Rename raw category values. No `subset` is given, so the mapping is applied
# to matching values in every column of the frame.
cdr_df_master = cdr_df_master.replace(
    {
        'sms': 'text',
        'voice': 'call',
        'shortcode': 'other',
    }
)

# Export

In [20]:
# Show the schema of the combined frame before it is exported.
cdr_df_master.schema

StructType(List(StructField(interaction,StringType,true),StructField(caller_msisdn,StringType,true),StructField(recipient_msisdn,StringType,true),StructField(datetime,StringType,true),StructField(duration,StringType,true),StructField(antenna_id,StringType,true),StructField(recipient_type,StringType,true)))

In [21]:
# Output locations: the full filtered CDR export and a small sample export
# (see the write cells below).
cdr_master_path = '/data/tmp/cdr_impact_eval/filtered_cdr.csv'
cdr_master_filepath_test = '/data/tmp/cdr_impact_eval/filtered_cdr_test.csv'

In [22]:
import shutil
# Remove a previous export so the write that follows starts from a clean path.
# Guarded with an existence check: an unconditional rmtree raises
# FileNotFoundError on a first run, when there is nothing to delete yet.
if os.path.isdir(cdr_master_path):
    shutil.rmtree(cdr_master_path)

In [23]:
# Write the full filtered CDR table as CSV with a header row.
cdr_df_master.write.option('header', True).csv(cdr_master_path)

                                                                                

In [None]:
# Write a 1000-row sample for quick downstream testing.
# mode='overwrite' is required for re-runs: nothing in the notebook clears the
# test path, and the writer's default mode fails when the output already exists.
cdr_df_master.limit(1000).write.csv(
    cdr_master_filepath_test,
    header=True,
    mode='overwrite',
)

[Stage 79:(488 + 48) / 702][Stage 80:> (0 + 0) / 686][Stage 81:> (0 + 0) / 246] 

In [25]:
# Print a completion marker once the cells above have finished.
print('done')

done
