In [1]:
import sys
import pandas as pd
sys.path.insert(0, '..')
from featurizer import *
from helpers.utils import *

# Path to the YAML configuration file that is handed to DataStore in the next cell.
config_file = '../configs/config_new.yml'

### Load from datastore

In [2]:
# Build the datastore from the configuration file path defined above.
datastore = DataStore(config_file)

In [3]:
# Keep a handle on the Spark session exposed by the datastore.
spark = datastore.spark
# The "Loading ..." messages appear in this cell's output, so data loading is
# presumably triggered by constructing the Featurizer — TODO confirm.
featurizer = Featurizer(datastore)

Loading CDR...
Loading recharges...
Loading mobile data...
Loading mobile data...
Loading antennas...


### Deduplicate

In [4]:
# Reference row counts, read straight from the synthetic CSV files with pandas.
cdr = pd.read_csv('../synthetic_data/cdr.csv')
recharges = pd.read_csv('../synthetic_data/recharges.csv')

# NOTE(review): the recorded output of this cell is
# "AttributeError: 'Featurizer' object has no attribute 'cdr'", so the dataset
# attributes used throughout this cell no longer exist on Featurizer.
# Presumably they now live on the datastore — confirm and update this cell.
assert featurizer.cdr.count() == len(cdr)

# Union each dataset with itself so that every row appears exactly twice.
featurizer.cdr = featurizer.cdr.union(featurizer.cdr)
featurizer.recharges = featurizer.recharges.union(featurizer.recharges)
assert featurizer.cdr.count() == 2*len(cdr)
assert featurizer.recharges.count() == 2*len(recharges)

# After deduplication the row counts must be back to the CSV row counts.
featurizer.deduplicate()
assert featurizer.cdr.count() == len(cdr)
assert featurizer.recharges.count() == len(recharges)

AttributeError: 'Featurizer' object has no attribute 'cdr'

### Filter Dates

In [None]:
# Check that every dataset spans the full synthetic date range before
# filtering and exactly the requested window afterwards.
# `config` is never defined in this notebook; the Featurizer is built from the
# datastore, as in the setup cell.
# NOTE(review): the Deduplicate cell's recorded output shows that Featurizer
# has no `cdr` attribute, so the dataset attributes read below presumably
# live elsewhere now (e.g. on the datastore) — confirm.
featurizer = Featurizer(datastore)

for df in [featurizer.cdr, featurizer.recharges, featurizer.mobiledata, featurizer.mobilemoney]:
    assert df.agg({'day':'min'}).collect()[0][0] == pd.to_datetime('2020-01-01')
    assert df.agg({'day':'max'}).collect()[0][0] == pd.to_datetime('2020-02-29')

featurizer.filter_dates('2020-01-05', '2020-02-01')
for df in [featurizer.cdr, featurizer.recharges, featurizer.mobiledata, featurizer.mobilemoney]:
    assert df.agg({'day':'min'}).collect()[0][0] == pd.to_datetime('2020-01-05')
    assert df.agg({'day':'max'}).collect()[0][0] == pd.to_datetime('2020-02-01')

### Remove Spammers

In [None]:
# Test for number of spammers identified

# The pandas reference table does not depend on the threshold, so it is built
# once: per (caller, transaction type), the number of records and the number
# of distinct days on which they occurred.
cdr = pd.read_csv('../synthetic_data/cdr.csv')
cdr['timestamp'] = pd.to_datetime(cdr['timestamp'])
cdr['day'] = cdr['timestamp'].dt.date
grouped = cdr.groupby(['caller_id', 'txn_type']).agg({'day':['count', 'nunique']}).reset_index()

for spammer_threshold in [0, 1.5, 2]:

    # `config` is never defined in this notebook; build the Featurizer from
    # the datastore, as in the setup cell. A new instance is created per
    # threshold, presumably so that each run starts from freshly loaded data.
    featurizer = Featurizer(datastore)

    spammers = len(featurizer.remove_spammers(spammer_threshold=spammer_threshold))

    # A caller counts as a spammer when, for any transaction type, the record
    # count exceeds threshold * number of active days.
    is_spammer = grouped['day']['count'] > spammer_threshold*grouped['day']['nunique']
    assert len(grouped[is_spammer]['caller_id'].unique()) == spammers

In [None]:
# Test for removal of spammers

spammer_threshold = 1

# `config` is never defined in this notebook; build the Featurizer from the
# datastore, as in the setup cell.
# NOTE(review): the Deduplicate cell's recorded output shows that Featurizer
# has no `cdr` attribute, so the dataset attributes read below presumably
# live elsewhere now (e.g. on the datastore) — confirm.
featurizer = Featurizer(datastore)

spammers = featurizer.remove_spammers(spammer_threshold=spammer_threshold)
spammer_ids = set(spammers)

for df in [featurizer.cdr, featurizer.recharges, featurizer.mobiledata, featurizer.mobilemoney]:

    # Check the dataset currently being iterated (previously every iteration
    # re-checked featurizer.cdr, so the other datasets were never verified).
    caller_ids = {row[0] for row in df.select('caller_id').collect()}
    assert spammer_ids.isdisjoint(caller_ids)

    # Datasets with a recipient column must not contain spammers as
    # recipients either (previously this branch selected 'caller_id' again).
    if 'recipient_id' in df.columns:
        recipient_ids = {row[0] for row in df.select('recipient_id').collect()}
        assert spammer_ids.isdisjoint(recipient_ids)

### Filter Outlier Days

In [None]:
# Test for number of days identified

# The pandas reference statistics do not depend on num_sds, so they are
# computed once: number of records per calendar day, and the mean and standard
# deviation of those daily counts.
cdr = pd.read_csv('../synthetic_data/cdr.csv')
cdr['timestamp'] = pd.to_datetime(cdr['timestamp'])
cdr['day'] = cdr['timestamp'].dt.floor('d')
grouped = cdr.groupby('day', as_index=False).agg('count')
daily_mean = grouped['txn_type'].mean()
daily_std = grouped['txn_type'].std()

for num_sds in [1, 1.5, 2]:
    # `config` is never defined in this notebook; build the Featurizer from
    # the datastore, as in the setup cell.
    featurizer = Featurizer(datastore)

    # Normalize the returned days to timezone-naive pandas timestamps.
    outliers = [pd.to_datetime(item).tz_localize(None) for item in featurizer.filter_outlier_days(num_sds)]

    # A day is an outlier when its count lies more than num_sds standard
    # deviations away from the mean daily count.
    upper = daily_mean + num_sds*daily_std
    lower = daily_mean - num_sds*daily_std
    outliers_manual = grouped[(grouped['txn_type'] < lower) | (grouped['txn_type'] > upper)]['day'].astype('object')
    assert list(outliers_manual.unique()) == outliers

In [None]:
# Test for removal of days

num_sds = 2

# `config` is never defined in this notebook; build the Featurizer from the
# datastore, as in the setup cell.
# NOTE(review): the Deduplicate cell's recorded output shows that Featurizer
# has no `cdr` attribute, so the dataset attributes read below presumably
# live elsewhere now (e.g. on the datastore) — confirm.
featurizer = Featurizer(datastore)

# Number of distinct days in each dataset before filtering.
original_counts = []
for df in [featurizer.cdr, featurizer.recharges, featurizer.mobiledata, featurizer.mobilemoney]:
    original_counts.append(df.select('day').distinct().count())

num_outliers = len(featurizer.filter_outlier_days(num_sds))

# Each dataset is expected to lose exactly one distinct day per outlier day.
# The attributes are re-read here because filtering may have replaced them.
filtered = [featurizer.cdr, featurizer.recharges, featurizer.mobiledata, featurizer.mobilemoney]
for original_count, df in zip(original_counts, filtered):
    assert df.select('day').distinct().count() == original_count - num_outliers

### Diagnostic Statistics

In [None]:
# Compare the featurizer's diagnostic statistics with the same quantities
# computed directly from the synthetic CSV using pandas.
# `config` is never defined in this notebook; build the Featurizer from the
# datastore, as in the setup cell.
featurizer = Featurizer(datastore)

statistics = featurizer.diagnostic_statistics('test_output')

# (CSV file stem, key in the statistics dictionary); only CDR is checked here.
for fname, name in [('cdr', 'CDR')]:

    df = pd.read_csv('../synthetic_data/' + fname + '.csv')
    df['timestamp'] = pd.to_datetime(df['timestamp'])

    # Inclusive number of days between the first and last record.
    assert (df['timestamp'].max() - df['timestamp'].min()).days + 1 == statistics[name]['Days']
    assert len(df) == statistics[name]['Transactions']
    assert len(df['caller_id'].unique()) == statistics[name]['Subscribers']
    if 'recipient_id' in df.columns:
        assert len(df['recipient_id'].unique()) == statistics[name]['Recipients']

### Diagnostic Plots

In [None]:
# Smoke test: the diagnostic plots should be produced without raising.
# `config` is never defined in this notebook; build the Featurizer from the
# datastore, as in the setup cell.
featurizer = Featurizer(datastore)

featurizer.diagnostic_plots('test_output')

### CDR Features

In [None]:
# Smoke test: location features alone should compute without raising.
# `config` is never defined in this notebook; build the Featurizer from the
# datastore, as in the setup cell.
featurizer = Featurizer(datastore)
featurizer.location_features()

In [None]:
# Compute every feature family on one featurizer; the cells below read the
# results from featurizer.features.
# `config` is never defined in this notebook; build the Featurizer from the
# datastore, as in the setup cell.
featurizer = Featurizer(datastore)
featurizer.cdr_features()
featurizer.international_features()
featurizer.location_features()
featurizer.recharges_features()
featurizer.mobiledata_features()
featurizer.mobilemoney_features()
featurizer.all_features()

In [None]:
# Total number of records
cdr = pd.read_csv('../synthetic_data/cdr.csv')
# Count records per subscriber once as caller and once as recipient, then
# stack both tables and add the two counts up per subscriber.
outgoing = cdr.groupby('caller_id').count()[['txn_type']]
incoming = cdr.groupby('recipient_id').count()[['txn_type']]
txns = pd.concat([outgoing, incoming])
txns['caller_id'] = txns.index
txn_count = txns.groupby('caller_id').sum()
txn_count['name'] = txn_count.index
record_col = 'cdr_reporting__number_of_records'
feats = featurizer.features['cdr'].toPandas()[['name', record_col]]
feats[record_col] = feats[record_col].astype('float')
# Outer merge: a subscriber missing on either side yields NaN and is therefore
# reported as a mismatch.
merged = txn_count.merge(feats, on='name', how='outer')
mismatches = merged[merged['txn_type'] != merged[record_col]]
assert len(mismatches) == 0

In [None]:
# Mean call time
cdr = pd.read_csv('../synthetic_data/cdr.csv')
# Every record contributes its duration to both the caller and the recipient.
as_caller = cdr[['caller_id', 'duration']].rename(columns={'caller_id':'name'})
as_recipient = cdr[['recipient_id', 'duration']].rename(columns={'recipient_id':'name'})
txns = pd.concat([as_caller, as_recipient])
txns = txns.groupby('name', as_index=False).mean()
mean_col = 'cdr_call_duration__allweek__allday__call__mean'
feats = featurizer.features['cdr'].toPandas()
feats[mean_col] = feats[mean_col].astype('float')
merged = feats[['name', mean_col]].merge(txns, on='name')
# Means are compared after truncation to integers.
mismatches = merged[merged['duration'].astype('int') != merged[mean_col].astype('int')]
assert len(mismatches) == 0

In [None]:
# Number of contacts
cdr = pd.read_csv('../synthetic_data/cdr.csv')
txns = cdr[cdr['txn_type'] == 'call']
# View each call from both ends: the other party is the "contact".
from_caller = txns[['caller_id', 'recipient_id', 'duration']].rename(
    columns={'caller_id':'name', 'recipient_id':'contact'})
from_recipient = txns[['caller_id','recipient_id', 'duration']].rename(
    columns={'recipient_id':'name', 'caller_id':'contact'})
txns = pd.concat([from_caller, from_recipient])
contacts = txns.groupby('name')['contact'].nunique().to_frame()
contacts_col = 'cdr_number_of_contacts__allweek__allday__call'
feats = featurizer.features['cdr'].toPandas()[['name', contacts_col]]
feats[contacts_col] = feats[contacts_col].astype('int')
# Outer merge: a subscriber missing on either side shows up as a mismatch.
merged = feats.merge(contacts, on='name', how='outer')
mismatches = merged[merged['contact'] != merged[contacts_col]]
assert len(mismatches) == 0

### International Features

In [None]:
# Number of outgoing transactions
cdr = pd.read_csv('../synthetic_data/cdr.csv')
cdr = cdr[cdr['international'] == 'international']
feats = featurizer.features['international'].toPandas()
# Per caller, the number of international records in the raw CSV.
inter = cdr.groupby('caller_id', as_index=False).count()
inter = inter[['caller_id', 'txn_type']].rename(columns={'caller_id':'name'})
merged = feats.merge(inter, on='name')
expected = merged['txn_type'].astype('int').tolist()
actual = merged['international_all__recipient_id__count'].astype('int').tolist()
assert actual == expected

In [None]:
# Total call duration
cdr = pd.read_csv('../synthetic_data/cdr.csv')
cdr = cdr[cdr['international'] == 'international']
feats = featurizer.features['international'].toPandas()
# Per caller, the summed duration of international records in the raw CSV.
inter = cdr.groupby('caller_id', as_index=False).sum()
inter = inter[['caller_id', 'duration']].rename(columns={'caller_id':'name'})
merged = feats.merge(inter, on='name')
# A missing feature value is treated as a total duration of zero.
actual = merged['international_call__duration__sum'].fillna(0).astype('float').tolist()
expected = merged['duration'].astype('float').tolist()
assert actual == expected

### Mobile Data Features

In [None]:
# Number of unique days
feats = featurizer.features['mobiledata'].toPandas()
data = pd.read_csv('../synthetic_data/mobiledata.csv')
# The first ten characters of the timestamp string are used as the day key.
data['day'] = data['timestamp'].map(lambda ts: ts[:10])
days = data.groupby('caller_id')['day'].nunique().to_frame()
days['name'] = days.index
merged = days.merge(feats, on='name')
mismatches = merged[merged['day'] != merged['mobiledata_num_days']]
assert len(mismatches) == 0

In [None]:
# Max volume
feats = featurizer.features['mobiledata'].toPandas()
data = pd.read_csv('../synthetic_data/mobiledata.csv')
# Column-wise maximum per caller; only 'volume' is compared below.
data = data.groupby('caller_id', as_index=False).max()
data = data.rename(columns={'caller_id':'name'})
merged = data.merge(feats, on='name')
expected = merged['volume'].astype('int').tolist()
actual = merged['mobiledata_max_volume'].astype('int').tolist()
assert expected == actual

### Mobile Money Features

In [None]:
# Minimum outgoing p2p volume
feats = featurizer.features['mobilemoney'].toPandas()
mm = pd.read_csv('../synthetic_data/mobilemoney.csv')
mm = mm[mm['txn_type'] == 'p2p']
# Column-wise minimum per sender; only 'amount' is compared below.
mm = mm.groupby('caller_id', as_index=False).min()
mm = mm.rename(columns={'caller_id':'name'})
# Outer merge with NaNs replaced by 0, so a subscriber missing on one side
# only matches if the other side's value rounds to 0.
merged = mm.merge(feats, on='name', how='outer').fillna(0)
actual = merged['mobilemoney_outgoing_p2p_amount_min'].astype('float').round(1)
expected = merged['amount'].astype('float').round(1)
assert (actual == expected).all()

In [None]:
# Mean balance before the transaction, over all transactions a subscriber takes part in
feats = featurizer.features['mobilemoney'].toPandas()
mm = pd.read_csv('../synthetic_data/mobilemoney.csv')
# Senders contribute their own pre-transaction balance, recipients theirs.
outgoing = mm[['caller_id', 'sender_balance_before']].rename(
    columns={'caller_id':'name', 'sender_balance_before':'balance'})
incoming = mm[['recipient_id', 'recipient_balance_before']].rename(
    columns={'recipient_id':'name', 'recipient_balance_before':'balance'})
mm = pd.concat([outgoing, incoming])
mm = mm.groupby('name', as_index=False).mean()
# Outer merge with NaNs replaced by 0, so a subscriber missing on one side
# only matches if the other side's value rounds to 0.
merged = mm.merge(feats, on='name', how='outer').fillna(0)
actual = merged['mobilemoney_all_all_balance_before_mean'].astype('float').round(1)
expected = merged['balance'].astype('float').round(1)
assert (actual == expected).all()

### Recharges Features

In [None]:
# Total recharge
feats = featurizer.features['recharges'].toPandas()
recharges = pd.read_csv('../synthetic_data/recharges.csv')
recharges = recharges.rename(columns={'caller_id':'name'})
# Column-wise sum per subscriber; only 'amount' is compared below.
recharges = recharges.groupby('name', as_index=False).sum()
merged = recharges.merge(feats, on='name')
expected = merged['amount'].astype('float')
actual = merged['recharges_sum'].astype('float')
assert (expected == actual).all()

### Location Features

In [None]:
# Build the pandas/geopandas reference data for the location feature checks:
# `feats` holds the computed location features and `cdr` ends up with one row
# per (subscriber, antenna region) observation. Both are used by the next cells.
# NOTE(review): `gpd` is not imported explicitly in this notebook; it presumably
# comes in through one of the star imports at the top — confirm.
feats = featurizer.features['location'].toPandas()
cdr = pd.read_csv('../synthetic_data/cdr.csv')
antennas = pd.read_csv('../synthetic_data/antennas.csv')
# Turn the antenna table into point geometries from longitude/latitude.
antennas = gpd.GeoDataFrame(antennas, geometry=gpd.points_from_xy(antennas['longitude'], antennas['latitude']))
# NOTE(review): the {"init": ...} CRS form and the `op=` keyword of sjoin below
# are flagged as deprecated by newer geopandas/pyproj releases — verify against
# the pinned geopandas version before changing.
antennas.crs = {"init":"epsg:4326"}
prefectures = gpd.read_file('../synthetic_data/prefectures.geojson')
# Left spatial join: antennas that fall within no prefecture keep a missing
# region, which is replaced by 'Unknown' on the next line.
antennas = gpd.sjoin(antennas, prefectures, op='within', how='left')[['antenna_id', 'region']]
antennas['region'] = antennas['region'].fillna('Unknown')
# Attach a region to both ends of every record: the caller via caller_antenna ...
outgoing = cdr[['caller_id', 'caller_antenna']]\
    .rename({'caller_id':'name', 'caller_antenna':'antenna_id'}, axis=1)\
    .merge(antennas, on='antenna_id', how='left')
# ... and the recipient via recipient_antenna.
incoming = cdr[['recipient_id', 'recipient_antenna']]\
    .rename({'recipient_id':'name', 'recipient_antenna':'antenna_id'}, axis=1)\
    .merge(antennas, on='antenna_id', how='left')
cdr = pd.concat([outgoing, incoming])
# Records whose antenna has no match in the antenna table are also 'Unknown'.
cdr['region'] = cdr['region'].fillna('Unknown')

In [None]:
# Unique prefectures
# Number of distinct regions per subscriber in the reference data.
unique = cdr.groupby('name')['region'].nunique().to_frame()
count_col = 'location_count(prefectures)'
merged = unique.merge(feats[['name', count_col]], on='name')
expected = merged['region'].astype('int')
actual = merged[count_col].astype('int')
assert (expected == actual).all()

In [None]:
# Count per prefecture
# A constant marker column, so that counting it gives the number of records.
cdr['call'] = 1
per_region = cdr.groupby(['name', 'region'], as_index=False).count()
per_region = per_region.rename(columns={'call':'count'})
# One row per subscriber, one column per region; absent pairs count as 0.
counts = per_region.pivot(index='name', columns='region', values='count').fillna(0)
merged = counts.merge(feats, on='name', how='inner')
for prefecture in counts.columns:
    expected = merged[prefecture].astype('float')
    actual = merged['location_prefectures_' + prefecture].astype('float')
    # The prefecture name is the assertion message, to identify a failing column.
    assert (expected == actual).all(), prefecture