In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
import os
os.environ['JAVA_HOME'] = '/Library/Java/JavaVirtualMachines/openjdk-17.jdk/Contents/Home'
os.environ['PATH'] = f"{os.environ['JAVA_HOME']}/bin:{os.environ['PATH']}"

In [None]:
# Imports
import sys
import pandas as pd
from pathlib import Path
import geopandas as gpd

from cider.utils import get_spark_session
from cider.featurizer.dependencies import (
    filter_to_datetime,
    get_spammers_from_cdr_data,
    get_outlier_days_from_cdr_data, 
    get_static_diagnostic_statistics,
    get_timeseries_diagnostic_statistics,
    identify_daytime,
    identify_weekend, 
    swap_caller_and_recipient,
    identify_and_tag_conversations,
    identify_mobile_money_transaction_direction)
from cider.featurizer.plotting import plot_timeseries_diagnostics
from cider.featurizer.core import (
    get_active_days,
    get_number_of_contacts_per_caller,
    get_call_duration_stats,
    get_percentage_of_nocturnal_interactions,
    get_percentage_of_initiated_conversations,
    get_percentage_of_initiated_calls,
    get_text_response_time_delay_stats,
    get_text_response_rate,
    get_entropy_of_interactions_per_caller,
    get_outgoing_interaction_fraction_stats,
    get_interaction_stats_per_caller,
    get_inter_event_time_stats,
    get_pareto_principle_interaction_stats,
    get_pareto_principle_call_duration_stats,
    get_number_of_interactions_per_user,
    get_number_of_antennas,
    get_entropy_of_antennas_per_caller,
    get_radius_of_gyration,
    get_pareto_principle_antennas,
    get_average_num_of_interactions_from_home_antennas,
    get_international_interaction_statistics,
    get_mobile_data_stats,
    get_mobile_money_amount_stats,
    get_mobile_money_transaction_stats,
    get_mobile_money_balance_stats,
    get_recharge_amount_stats,
    get_caller_counts_per_region)


sys.path.insert(0, '..')
from deprecated.datastore import DataStore
from pyspark.sql.functions import col, date_trunc, to_timestamp

In [4]:
# Get spark session
timezone = "UTC"
spark = get_spark_session()

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
26/01/23 20:58:09 WARN Utils: Your hostname, Poornimas-MacBook-Air.local, resolves to a loopback address: 127.0.0.1, but we couldn't find any external IP address!
26/01/23 20:58:09 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/01/23 20:58:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
# Load data
DATA_DIR = Path('../synthetic_data/')

cdr = pd.read_csv(DATA_DIR / 'cdr.csv')
cdr.rename(columns={
    "caller_antenna": "caller_antenna_id",
    "recipient_antenna": "recipient_antenna_id",
    "txn_type": "transaction_type",
    "international": "transaction_scope"}, 
    inplace=True)
cdr = cdr[~cdr.caller_antenna_id.isna()]
cdr["timestamp"] = pd.to_datetime(cdr["timestamp"])
cdr["caller_id"] = cdr["caller_id"].astype(str)
cdr["recipient_id"] = cdr["recipient_id"].astype(str)
cdr["recipient_antenna_id"] = cdr["recipient_antenna_id"].astype(str)


antennas = pd.read_csv(DATA_DIR / 'antennas.csv').dropna()
antennas = gpd.GeoDataFrame(antennas, geometry=gpd.points_from_xy(antennas.longitude, antennas.latitude))
antennas.set_crs(epsg=4326, inplace=True)

shapefile = gpd.read_file(DATA_DIR / 'prefectures.geojson')
antennas_merged_shp = gpd.sjoin(antennas, shapefile, how='left', predicate='within')[['antenna_id', 'region']]
antennas_merged_shp.region.fillna('Unknown', inplace=True)

recharges = pd.read_csv(DATA_DIR / 'recharges.csv')
recharges["caller_id"] = recharges["caller_id"].astype(str)


mobile_money = pd.read_csv(DATA_DIR / 'mobilemoney.csv')
mobile_money["caller_id"] = mobile_money["caller_id"].astype(str)
mobile_money["recipient_id"] = mobile_money["recipient_id"].astype(str)
mobile_money.rename(columns={"txn_type": "transaction_type",
                             "sender_balance_before": "caller_balance_before",
                             "sender_balance_after": "caller_balance_after"}, inplace=True)
mobile_money.loc[mobile_money.recipient_id == 'nan', 'recipient_id'] = None
mobile_money['caller_balance_before'] = pd.to_numeric(mobile_money['caller_balance_before'], errors='coerce')
mobile_money['caller_balance_after'] = pd.to_numeric(mobile_money['caller_balance_after'], errors='coerce')
mobile_money.drop(mobile_money[mobile_money.caller_balance_before.isna() | mobile_money.caller_balance_after.isna()].index, inplace=True)
mobile_money.loc[
    mobile_money.transaction_type.isin(['cashin', 'cashout']), 
    ['recipient_id', 'recipient_balance_before', 'recipient_balance_after']] = None



mobile_data = pd.read_csv(DATA_DIR / 'mobiledata.csv')
mobile_data["caller_id"] = mobile_data["caller_id"].astype(str)

# Get deprecated code stuff
config_file = '../configs/config_new.yml'
datastore = DataStore(config_file)
datastore.cdr = spark.createDataFrame(cdr)
datastore.recharges = spark.createDataFrame(recharges)
datastore.mobiledata = spark.createDataFrame(mobile_data)
datastore.mobilemoney = spark.createDataFrame(mobile_money)

# featurizer = Featurizer(datastore)


The behavior will change in pandas 3.0. This inplace method will never work because the intermediate object on which we are setting values always behaves as a copy.

For example, when doing 'df[col].method(value, inplace=True)', try using 'df.method({col: value}, inplace=True)' or df[col] = df[col].method(value) instead, to perform the operation inplace on the original object.


  antennas_merged_shp.region.fillna('Unknown', inplace=True)
26/01/23 20:58:12 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [6]:
def get_datastore_datasets(datastore_instance: DataStore):
    return (
        datastore_instance.cdr.toPandas(),
        datastore_instance.recharges.toPandas(),
        datastore_instance.mobiledata.toPandas(),
        datastore_instance.mobilemoney.toPandas()
        )


In [7]:
# Filter to date range with new code
filter_start_date = "2020-01-05"
filter_end_date = "2020-02-01"

# Filter data to date range with old code
datastore.filter_dates(filter_start_date, filter_end_date)
(
    old_cdr_filtered, 
    old_recharges_filtered, 
    old_mobile_data_filtered, 
    old_mobile_money_filtered) = get_datastore_datasets(datastore)

# Filter data to date range with new code
filter_start_date = pd.to_datetime(filter_start_date)
filter_end_date = pd.to_datetime(filter_end_date)

cdr_filtered = filter_to_datetime(cdr, filter_start_date, filter_end_date)
recharges_filtered = filter_to_datetime(recharges, filter_start_date, filter_end_date)
mobile_money_filtered = filter_to_datetime(mobile_money, filter_start_date, filter_end_date)
mobile_data_filtered = filter_to_datetime(mobile_data, filter_start_date, filter_end_date)

# Check that there are no differences:
for title, old, new in zip(
    ["CDR", "Recharges", "Mobile Data", "Mobile Money"],
    [old_cdr_filtered, old_recharges_filtered, old_mobile_data_filtered, old_mobile_money_filtered],
    [cdr_filtered, recharges_filtered, mobile_data_filtered, mobile_money_filtered]):
    print(title)
    print(old.shape[0], new.shape[0])
    print(pd.concat([old["caller_id"], new["caller_id"]]).drop_duplicates(keep=False).shape[0])
    print("\n")

                                                                                

CDR
41736 41736
0


Recharges
4698 4698
0


Mobile Data
4540 4540
0


Mobile Money
4670 4670
0




In [None]:
# Remove spammers
spammer_threshold = 1.75

# Get spammers with old code
datastore.cdr = datastore.cdr.withColumn("timestamp", to_timestamp("timestamp", "yyyy-MM-dd HH:mm:ss")).withColumn("day", date_trunc("day", col("timestamp"))).withColumnRenamed("transaction_type", "txn_type")
old_spammer_ids = datastore.remove_spammers(spammer_threshold)


(
    old_cdr_filtered, 
    old_recharges_filtered, 
    old_mobile_data_filtered, 
    old_mobile_money_filtered
) = get_datastore_datasets(datastore)


# Get spammers with new code
spammer_ids = get_spammers_from_cdr_data(
    cdr_filtered, 
    threshold_of_calls_per_day=spammer_threshold)

# Check old and new spammer IDs match
assert set(old_spammer_ids) == set(spammer_ids)

cdr_filtered = cdr_filtered[
    ~cdr_filtered.caller_id.isin(spammer_ids) & 
    ~cdr_filtered.recipient_id.isin(spammer_ids)]
recharges_filtered = recharges_filtered[~recharges_filtered.caller_id.isin(spammer_ids)]
mobile_money_filtered = mobile_money_filtered[
    ~mobile_money_filtered.caller_id.isin(spammer_ids) &
    ~mobile_money_filtered.recipient_id.isin(spammer_ids)]
mobile_data_filtered = mobile_data_filtered[~mobile_data_filtered.caller_id.isin(spammer_ids)]

# Mobile money data does not match because of the inclusion of non-recipient transactions in the new data
for title, old, new in zip(
    ["CDR", "Recharges", "Mobile Data", "Mobile Money"],
    [old_cdr_filtered, old_recharges_filtered, old_mobile_data_filtered, old_mobile_money_filtered],
    [cdr_filtered, recharges_filtered, mobile_data_filtered, mobile_money_filtered]):
    print(title)
    print(old.shape[0], new.shape[0])
    print(pd.concat([old["caller_id"], new["caller_id"]]).drop_duplicates(keep=False).shape[0])
    print("\n")


Number of spammers identified: 85


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  cdr_data.loc[:, "day"] = cdr_data["timestamp"].dt.date


CDR
34556 34556
0


Recharges
4283 4283
0


Mobile Data
4164 4164
0


Mobile Money
2544 4264
1




In [None]:
# Remove outlier days
z_score_threshold = 2.0
outlier_days = get_outlier_days_from_cdr_data(cdr_filtered, z_score_threshold)

cdr_filtered = cdr_filtered[~cdr_filtered.timestamp.dt.date.isin(outlier_days)]
recharges_filtered = recharges_filtered[~recharges_filtered.timestamp.dt.date.isin(outlier_days)]
mobile_money_filtered = mobile_money_filtered[~mobile_money_filtered.timestamp.dt.date.isin(outlier_days)]
mobile_data_filtered = mobile_data_filtered[~mobile_data_filtered.timestamp.dt.date.isin(outlier_days)]

In [None]:
# Get static diagnostics
cdr_diagnostics = get_static_diagnostic_statistics(cdr_filtered)
recharges_diagnostics = get_static_diagnostic_statistics(recharges_filtered)
mobile_money_diagnostics = get_static_diagnostic_statistics(mobile_money_filtered)
mobile_data_diagnostics = get_static_diagnostic_statistics(mobile_data_filtered)

In [None]:
# Get timeseries diagnostics
cdr_timeseries_diagnostics = get_timeseries_diagnostic_statistics(cdr_filtered)
recharges_timeseries_diagnostics = get_timeseries_diagnostic_statistics(recharges_filtered)
mobile_money_timeseries_diagnostics = get_timeseries_diagnostic_statistics(mobile_money_filtered)
mobile_data_timeseries_diagnostics = get_timeseries_diagnostic_statistics(mobile_data_filtered)

In [None]:
# Plot diagnostics
fig = plot_timeseries_diagnostics(
    cdr_timeseries_diagnostics,
    value_column="num_unique_callers",
    groupby_column="transaction_type",
    plot_title="CDR: Number of Unique Callers by Transaction Type"
    )
fig

In [None]:
# Identify daytime and weekend calls
cdr_spark = spark.createDataFrame(cdr_filtered) 
cdr_spark_with_daytime = identify_daytime(cdr_spark)
cdr_spark_with_weekend = identify_weekend(cdr_spark_with_daytime)

# Swap caller and recipient to get recipient-centric view
cdr_swapped_caller_recipient = swap_caller_and_recipient(cdr_spark_with_weekend) 

# Identify and tag conversations
cdr_tagged_conversations = identify_and_tag_conversations(cdr_swapped_caller_recipient, max_wait=3600)

In [None]:
cdr_tagged_conversations.toPandas().head()

In [None]:
# Featurize data

In [None]:
# Get number of active days
cdr_active_days = get_active_days(cdr_tagged_conversations)

In [None]:
cdr_active_days.toPandas().head()

In [None]:
# Get number of contacts per caller
cdr_number_of_contacts_per_caller = get_number_of_contacts_per_caller(cdr_tagged_conversations)
cdr_number_of_contacts_per_caller.toPandas().head()

In [None]:
# Get call duration stats
cdr_call_duration_stats = get_call_duration_stats(cdr_tagged_conversations)
cdr_call_duration_stats.toPandas().head()

In [None]:
# Get percentage of nocturnal calls
cdr_nocturnal_calls = get_percentage_of_nocturnal_interactions(cdr_tagged_conversations)
cdr_nocturnal_calls.toPandas().head()

In [None]:
# Get percentage of initiated conversations
cdr_percentage_initiated_conversations = get_percentage_of_initiated_conversations(cdr_tagged_conversations)
cdr_percentage_initiated_conversations.toPandas().head()

In [None]:
# Get percentage of initiated calls
cdr_percentage_initiated_calls = get_percentage_of_initiated_calls(cdr_tagged_conversations)
cdr_percentage_initiated_calls.toPandas().head()

In [None]:
# Get text response time statistics
cdr_text_response_time_delay_stats = get_text_response_time_delay_stats(cdr_tagged_conversations)
cdr_text_response_time_delay_stats.toPandas().head()

In [None]:
# Get text response rate
cdr_text_response_rate = get_text_response_rate(cdr_tagged_conversations)
cdr_text_response_rate.toPandas().head()

In [None]:
# Get entropy of interactions per caller
cdr_entropy_of_interactions = get_entropy_of_interactions_per_caller(cdr_tagged_conversations)
cdr_entropy_of_interactions.toPandas().head()

In [None]:
# Get fraction of outgoing interactions
cdr_fraction_of_outgoing_interactions = get_outgoing_interaction_fraction_stats(cdr_tagged_conversations)
cdr_fraction_of_outgoing_interactions.toPandas().head()

In [None]:
# Get interaction stats per caller
cdr_interaction_stats_per_caller = get_interaction_stats_per_caller(cdr_tagged_conversations)
cdr_interaction_stats_per_caller.toPandas().head()

In [None]:
# Get inter-event time statistics
cdr_inter_event_time_stats = get_inter_event_time_stats(cdr_tagged_conversations)
cdr_inter_event_time_stats.toPandas().head()

In [None]:
# Get pareto principle interaction stats
cdr_pareto_stats = get_pareto_principle_interaction_stats(cdr_tagged_conversations, percentage_threshold=0.8)
cdr_pareto_stats.toPandas().head()

In [None]:
# Get pareto principle statistics for call duration
cdr_pareto_call_stats = get_pareto_principle_call_duration_stats(cdr_tagged_conversations, percentage_threshold=0.8)
cdr_pareto_call_stats.toPandas().head()

In [None]:
# Get number of interactions per user
cdr_number_of_interactions = get_number_of_interactions_per_user(cdr_tagged_conversations)
cdr_number_of_interactions.toPandas().head()

In [None]:
# Get number of antennas per caller
cdr_number_of_antennas = get_number_of_antennas(cdr_tagged_conversations)
cdr_number_of_antennas.toPandas().head()

In [None]:
# Get entropy of antennas per caller
cdr_entropy_of_antennas = get_entropy_of_antennas_per_caller(cdr_tagged_conversations)
cdr_entropy_of_antennas.toPandas().head()

In [None]:
# Get radius of gyration: the number of callers' typical movement range
antennas.rename(columns={"antenna_id": "caller_antenna_id"}, inplace=True)
# antennas.drop(columns=['geometry'], inplace=True)
spark_antennas = spark.createDataFrame(antennas)
cdr_radius_of_gyration = get_radius_of_gyration(cdr_tagged_conversations, spark_antennas)
cdr_radius_of_gyration.toPandas().head()

In [None]:
# Get pareto principle statistics for antennas
cdr_pareto_antennas = get_pareto_principle_antennas(cdr_tagged_conversations, percentage_threshold=0.8)
cdr_pareto_antennas.toPandas().head()

In [None]:
# Get average number of interactions from home antennas; 
# home antenna is defined as the antenna with the most interactions for a caller
cdr_home_antenna_interactions = get_average_num_of_interactions_from_home_antennas(cdr_tagged_conversations)
cdr_home_antenna_interactions.toPandas().head()

In [None]:
# Get international interaction statistics: 
# number of interactions, num unique recipients, total call duration and num unique days
# disaggregated by transaction type
cdr_international_stats = get_international_interaction_statistics(cdr_tagged_conversations)
cdr_international_stats.toPandas().head()

In [None]:
# Get CDR antenna location features
antennas_merged_shp.rename(columns={"antenna_id": "caller_antenna_id"}, inplace=True)
antennas_merged = antennas_merged_shp.merge(antennas, on="caller_antenna_id", how="inner")

spark_antenna_data = spark.createDataFrame(antennas_merged)
cdr_antenna_region_counts = get_caller_counts_per_region(cdr_tagged_conversations, spark_antenna_data)
cdr_antenna_region_counts.toPandas().head()

In [None]:
# Featurize mobile data

In [None]:
spark_mobile_data = spark.createDataFrame(mobile_data_filtered)
mobile_data_stats = get_mobile_data_stats(spark_mobile_data)
mobile_data_stats.toPandas().head()

In [None]:
# Featurize mobile money data

In [None]:
pd_mobile_money = mobile_money_filtered.copy()
spark_mobile_money = spark.createDataFrame(pd_mobile_money)
spark_mobile_money_direction = identify_mobile_money_transaction_direction(spark_mobile_money)
spark_mobile_money_direction.toPandas().head()

In [None]:
# Get mobile money amount stats
mobile_money_amount_stats = get_mobile_money_amount_stats(spark_mobile_money_direction)
mobile_money_amount_stats.toPandas().head()

In [None]:
# Get mobile money transaction stats
mobile_money_transaction_stats = get_mobile_money_transaction_stats(spark_mobile_money_direction)
mobile_money_transaction_stats.toPandas().head()

In [None]:
# Get mobile money balance stats
mobile_money_balance_stats = get_mobile_money_balance_stats(spark_mobile_money_direction)
mobile_money_balance_stats.toPandas().head()

In [None]:
# Featurize recharge data

In [None]:
# Get recharge amount stats
spark_recharges = spark.createDataFrame(recharges_filtered)
recharge_amount_stats = get_recharge_amount_stats(spark_recharges)
recharge_amount_stats.toPandas().head()

In [None]:
recharge_amount_stats.toPandas().shape