In [None]:
from datetime import datetime, timedelta

import pyspark
import pyspark.sql.functions as f
import pyspark.sql.functions as F
import pyspark.sql.types as t
from IPython.core.interactiveshell import InteractiveShell
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.window import Window

In [None]:
# Make IPython display the value of every top-level expression in a cell, not only the last one.
InteractiveShell.ast_node_interactivity = "all"

In [None]:
%%time
# Spark configuration for the 'all-domain-weekly-agg' application, followed by creation of the
# SparkContext (`sc`) and SparkSession (`spark`) used by the rest of the notebook.
conf = (SparkConf()
        # NOTE(review): the 'yarn-client' master string is a legacy spelling — confirm it is still
        # accepted by the Spark version on the cluster.
        .setMaster('yarn-client')
        .setAppName('all-domain-weekly-agg')
        .set("spark.driver.maxResultSize", "10g")
        .set("spark.driver.memory", "16g")
        # NOTE(review): the driver overhead is set here and again below under the
        # "spark.yarn.driver.memoryOverhead" key, both to 4096.
        .set("spark.driver.memoryOverhead", "4096")
        # Dynamic allocation: start with 1 executor, scale between 1 and 25.
        .set("spark.dynamicAllocation.enabled", "true")
        .set("spark.dynamicAllocation.initialExecutors", "1")
        .set("spark.dynamicAllocation.maxExecutors", "25")
        .set("spark.dynamicAllocation.minExecutors", "1")
        # Each executor gets 4 cores and 16g of memory.
        .set("spark.executor.cores", "4")
        .set("spark.executor.memory", "16g")
        .set("spark.hadoop.fs.permissions.umask-mode", "002")
        .set("spark.kryoserializer.buffer.max", "512m")
        .set("spark.shuffle.service.enabled", "true")
        .set("spark.sql.broadcastTimeout", "1000")
        .set("spark.sql.hive.convertMetastoreParquet", "false")
        .set("spark.sql.parquet.compression.codec", "snappy")
        .set("spark.sql.shuffle.partitions", "1000")
        .set("spark.sql.sources.partitionOverwriteMode", "dynamic")
        .set("spark.yarn.driver.memoryOverhead", "4096")
        .set("spark.yarn.executor.memoryOverhead", "4096")
        .set("spark.yarn.maxAppAttempts", "2")
        .set("spark.yarn.queue", "root.hue_dmp")
        .set("yarn.nodemanager.vmem-check-enabled", "false")
        )
# getOrCreate returns an already-running SparkContext when one exists.
# NOTE(review): in that case `conf` is presumably not re-applied — confirm on a warm kernel.
sc = SparkContext.getOrCreate(conf=conf)
spark = SparkSession(sc)

In [None]:
from utils import filter_latest_data, next_week_start_day


# Partition Size

In [None]:
# Named partition counts, smallest to largest, available as `partition_num` arguments
# for the weekly aggregation functions defined in this notebook.
(
    partition_size_0,
    partition_size_1,
    partition_size_2,
    partition_size_3,
    partition_size_4,
    partition_size_5,
    partition_size_6,
    partition_size_7,
    partition_size_8,
    partition_size_9,
    partition_size_10,
) = (1, 100, 200, 300, 500, 1000, 2000, 5000, 10000, 20000, 50000)

# Airtime Loan

In [None]:
# Pattern used to parse the `timestamp` columns of the airtime loan takers and payment data.
timestamp_format = 'yyyy-MM-dd HH:mm:ss'


def fea_calc_airtime_loan_count_features(takers_df: pyspark.sql.DataFrame) -> pyspark.sql.DataFrame:
    """
    Adds the weekly airtime loan count to every loan-taker record.

    :param takers_df: Airtime loan takers data with ``msisdn``, ``campaignid`` and a ``timestamp``
        column formatted as ``timestamp_format``
    :return: takers_df extended with ``trx_date``, ``weekstart`` and ``tot_airtime_loan_count``
        (number of rows with a non-null ``campaignid`` per msisdn and weekstart)
    """
    # from_unixtime / unix_timestamp are only in scope through the `f` alias;
    # calling them as bare names raised NameError.
    takers_df_weekstart = (takers_df
                           .withColumn("trx_date",
                                       f.from_unixtime(f.unix_timestamp(f.col('timestamp'), timestamp_format)))
                           .withColumn("weekstart", next_week_start_day("trx_date"))
                           )

    # f.when without otherwise yields null for non-matching rows, so count() only counts
    # rows whose campaignid is present.
    takers_with_count_df = (takers_df_weekstart
                            .groupBy("msisdn", "weekstart")
                            .agg(f.count(f.when(f.col("campaignid").isNotNull(), True)).alias("tot_airtime_loan_count"))
                            )

    # Left join keeps the record-level rows and attaches the weekly count to each of them.
    return takers_df_weekstart.join(takers_with_count_df, ["msisdn", "weekstart"], "left")


def compute_dpd_per_weeekstart(takers_df: pyspark.sql.DataFrame,
                               payment_df: pyspark.sql.DataFrame) -> pyspark.sql.DataFrame:
    """
    Computes weekly repayment-duration ("days past due") features per msisdn.

    :param takers_df: Loan takers data carrying ``msisdn``, ``weekstart``, ``trx_date``,
        ``campaignid``, ``month_campaign``, ``timestamp`` and ``tot_airtime_loan_count``
    :param payment_df: Loan payment data carrying ``msisdn``, ``campaignid``, ``month_campaign``
        and ``timestamp``
    :return: One row per (msisdn, weekstart) with ``tot_airtime_loan_count``, the maximum
        ``repayment_duration`` and the ``2/10/15/20/25_dpd_count`` columns. The dpd counts are
        null when no loan of that week has a matched repayment.
    """
    dpd_thresholds = (2, 10, 15, 20, 25)

    def _dpd_count(threshold: int) -> pyspark.sql.Column:
        # Count loans repaid more than `threshold` days after being taken; null when the whole
        # group has no repayment duration at all.
        return (
            f.when(f.max(f.col("repayment_duration")).isNull(), None)
            .otherwise(f.count(f.when(f.col("repayment_duration") > threshold, True)))
            .alias("{}_dpd_count".format(threshold))
        )

    # Left join takers and payment on msisdn, campaign id fragment and month_campaign.
    # NOTE(review): the substr offsets (22 vs 24) presumably select the same 4-character token in
    # two differently prefixed campaign id layouts — confirm against the source tables.
    takers_payment_joined = takers_df.alias('takers_df').join(
        payment_df.alias('payment_df'),
        (f.col('takers_df.msisdn') == f.col('payment_df.msisdn'))
        & (f.col('takers_df.campaignid').substr(22, 4) == f.col('payment_df.campaignid').substr(24, 4))
        & (f.col('takers_df.month_campaign') == f.col('payment_df.month_campaign')),
        'left_outer'
    )

    # Convert timestamp columns for takers and payment.
    # from_unixtime / unix_timestamp must be reached through the `f` alias; the bare names the
    # original used were never imported and raised NameError.
    takers_payment_joined_cleaned = takers_payment_joined.select(
        'takers_df.msisdn',
        'takers_df.weekstart',
        'takers_df.trx_date',
        f.col('takers_df.tot_airtime_loan_count'),
        f.from_unixtime(f.unix_timestamp(f.col('takers_df.timestamp'), timestamp_format)).alias('taker_timestamp'),
        f.from_unixtime(f.unix_timestamp(f.col('payment_df.timestamp'), timestamp_format)).alias('repayment_timestamp')
    )

    # Compute duration between dates (excluding start, including end). Unpaid loans are kept on
    # purpose: their repayment_timestamp is null, so their duration is null as well.
    paid_takers_df = takers_payment_joined_cleaned.withColumn(
        'repayment_duration',
        f.datediff(f.col('repayment_timestamp'), f.col('taker_timestamp'))
    )

    paid_takers_df_weekly = paid_takers_df.groupBy("msisdn", "weekstart").agg(
        f.first(f.col("tot_airtime_loan_count")).alias("tot_airtime_loan_count"),
        f.max(f.col("repayment_duration")).alias("repayment_duration"),
        *[_dpd_count(threshold) for threshold in dpd_thresholds]
    )

    return paid_takers_df_weekly


def create_weekly_airtime_loan_table(
    df_airtime_loan_takers: pyspark.sql.DataFrame,
    df_airtime_loan_payment: pyspark.sql.DataFrame,
    partition_num: int,
) -> pyspark.sql.DataFrame:
    """
    Builds the weekly airtime loan table from the takers and payment data.

    :param df_airtime_loan_takers: Airtime loan takers DataFrame
    :param df_airtime_loan_payment: Airtime loan payment DataFrame
    :param partition_num: Number of Partition
    :return: Weekly airtime loan DataFrame, repartitioned into partition_num partitions
    """
    # Step 1: loan counts per msisdn/week; step 2: repayment-duration features on top of them.
    weekly_dpd = compute_dpd_per_weeekstart(
        fea_calc_airtime_loan_count_features(df_airtime_loan_takers),
        df_airtime_loan_payment,
    )
    return weekly_dpd.repartition(numPartitions=partition_num)


# App Internet Usage

In [None]:
def create_bcp_usage_weekly(
    df_smy_bcp_usage_dd: pyspark.sql.DataFrame,
    partition_num: int,
) -> pyspark.sql.DataFrame:
    """
    Rolls the daily BCP usage up to one row per msisdn, weekstart, accessed_app and component

    :param df_smy_bcp_usage_dd: BCP Daily Usage DataFrame (``trx_date`` in yyyyMMdd form)

    :param partition_num: Number of Partition

    :return: BCP Weekly Usage DataFrame with summed volumes and the first / last trx_date seen
    """
    weekly_keys = ["msisdn", "weekstart", "accessed_app", "component"]

    # Parse the yyyyMMdd date first, then derive the week it belongs to.
    parsed_trx_date = f.to_date(f.col("trx_date").cast(t.StringType()), "yyyyMMdd")
    daily_with_week = df_smy_bcp_usage_dd.withColumn("trx_date", parsed_trx_date)
    daily_with_week = daily_with_week.withColumn(
        "weekstart", next_week_start_day(f.col("trx_date"))
    )

    # Missing volumes count as 0 in the weekly totals.
    weekly_aggregations = [
        f.sum(f.coalesce("volume_in", f.lit(0))).alias("volume_in"),
        f.sum(f.coalesce("volume_out", f.lit(0))).alias("volume_out"),
        f.min("trx_date").alias("min_trx_date"),
        f.max("trx_date").alias("max_trx_date"),
    ]
    weekly_usage = daily_with_week.groupBy(*weekly_keys).agg(*weekly_aggregations)

    return weekly_usage.repartition(numPartitions=partition_num)


In [None]:
def create_bcp_weekly_feature_aggregation(
    df_bcp_usage_weekly: pyspark.sql.DataFrame,
    df_bcp_feature_mapping: pyspark.sql.DataFrame,
    partition_num: int,
) -> pyspark.sql.DataFrame:
    """
    Aggregates the weekly BCP usage to category level for the mapped apps

    :param df_bcp_usage_weekly: BCP Weekly Usage DataFrame per accessed_app and component

    :param df_bcp_feature_mapping: BCP Mapping from (accessed_app, component) to category

    :param partition_num: Number of Partition

    :return: BCP Weekly Usage DataFrame per msisdn, weekstart and category
    """
    # Distinct (app, component, category) triples; broadcast into the join below.
    category_lookup = (
        df_bcp_feature_mapping
        .select("accessed_app", "component", "category")
        .distinct()
    )

    # Inner join: usage rows without a mapped category are dropped.
    usage_with_category = df_bcp_usage_weekly.join(
        f.broadcast(category_lookup), ["accessed_app", "component"], how="inner"
    )

    category_aggregations = [
        f.sum(f.coalesce("volume_in", f.lit(0))).alias("volume_in"),
        f.sum(f.coalesce("volume_out", f.lit(0))).alias("volume_out"),
        f.collect_set(f.col("accessed_app")).alias("accessed_apps_01w"),
        f.min("min_trx_date").alias("min_trx_date"),
        f.max("max_trx_date").alias("max_trx_date"),
    ]
    weekly_by_category = (
        usage_with_category
        .groupBy("msisdn", "weekstart", "category")
        .agg(*category_aggregations)
    )

    return weekly_by_category.repartition(numPartitions=partition_num)


# Customer Profile

In [None]:
def agg_customer_profile_to_weekly(
    cb_multidim_df: pyspark.sql.DataFrame, partition_num: int,
) -> pyspark.sql.DataFrame:
    """
    Keeps the Sunday rows of the multidim table and exposes the customer profile attributes
    as prefixed feature columns
    :param cb_multidim_df: Multidim table (``trx_date`` in yyyy-MM-dd form)
    :param partition_num: Number of Partition
    :return: DataFrame with msisdn, weekstart and the renamed profile features
    """
    # Source columns that are exposed as "fea_custprof_<name>", in output order.
    profile_attributes = [
        "lte_usim_user_flag",
        "area_sales",
        "region_sales",
        "bill_responsibility_type",
        "segment_data_user",
        "los",
        "status",
        "brand",
        "price_plan",
        "cust_type_desc",
        "cust_subtype_desc",
        "segment_hvc_mtd",
        "segment_hvc_m1",
        "loyalty_tier",
        "nik_gender",
        "nik_age",
        "bill_cycle",
        "persona_los",
        "prsna_quadrant",
        "arpu_segment_name",
    ]
    feature_columns = [
        f.col(attribute).alias("fea_custprof_" + attribute)
        for attribute in profile_attributes
    ]
    # NOTE(review): this alias uses the prefix "fe_custprofa_" unlike every other feature. It looks
    # like a typo but is kept unchanged because the name is part of the output schema.
    feature_columns.append(
        f.col("mytsel_user_flag").alias("fe_custprofa_mytsel_user_flag")
    )

    parsed_trx_date = f.to_date(f.col("trx_date").cast(t.StringType()), "yyyy-MM-dd")
    # Only the snapshot taken on a Sunday is kept for each week.
    sunday_snapshots = cb_multidim_df.withColumn("trx_date", parsed_trx_date).filter(
        f.date_format(f.col("trx_date"), "EEE") == "Sun"
    )

    weekly_profile = sunday_snapshots.select(
        f.col("msisdn"),
        next_week_start_day(f.col("trx_date")).alias("weekstart"),
        *feature_columns
    )
    return weekly_profile.repartition(numPartitions=partition_num)


# Handset 1

In [None]:
def create_handset_lookup_data(
    df_handset_dim: pyspark.sql.DataFrame, partition_num: int
) -> pyspark.sql.DataFrame:
    """
    Reduces the handset dimension to a single device row per TAC
    :param df_handset_dim: Lookup static table of devices, keyed by ``tac``
    :param partition_num: Number of Partition
    :return df_device_lookup: One row (device) per TAC
    """
    lookup_columns = [
        "tac",
        "manufacturer",
        "market_name",
        "device_type",
        "band",
        "gprs_capable_flag",
        "edge_capable_flag",
        "umts_capable_flag",
        "hsdpa_capable_flag",
        # Renamed with a "dim_" prefix in the lookup output.
        f.col("lte_capable_flag").alias("dim_lte_capable_flag"),
        "highest_network",
        "data_capable_flag",
        "wlan_capable_flag",
        "bluetooth_capable_flag",
        "dual_sim_flag",
        "nfc_capable_flag",
        "form_factor",
        "os_version",
        "os_name",
        f.col("source_name").alias("dim_source_name"),
        "allocation_date",
        "otap_capable_flag",
        "volte_capable_flag",
    ]

    # Rank the devices of each TAC by device_type and keep the first one.
    per_tac_by_device_type = Window.partitionBy("tac").orderBy("device_type")
    ranked_devices = df_handset_dim.withColumn(
        "row_number", f.row_number().over(per_tac_by_device_type)
    )
    first_device_per_tac = ranked_devices.filter(f.col("row_number") == 1)

    return first_device_per_tac.select(*lookup_columns).repartition(
        numPartitions=partition_num
    )


# Handset 2

In [None]:
def create_handset_weekly_data(
    df_handset_dd: pyspark.sql.DataFrame,
    df_handset_lookup: pyspark.sql.DataFrame,
    partition_num: int,
) -> pyspark.sql.DataFrame:
    """
    Aggregates the daily handset table to msisdn, weekstart
    :param df_handset_dd: Handset daily level data
    :param df_handset_lookup: Handset Lookup data (one row per TAC)
    :param partition_num: Number of Partition
    :return df_handset_weekly: Weekly Aggregated Device data, each attribute as a set of values
    """
    # Attributes gathered into sets. The output name is the source name plus "s",
    # except "manufacturer" which keeps its name.
    collected_attributes = [
        "manufacturer",
        "imei",
        "device_type",
        "market_name",
        "band",
        "gprs_capable_flag",
        "edge_capable_flag",
        "umts_capable_flag",
        "hsdpa_capable_flag",
        "lte_capable_flag",
        "highest_network",
        "data_capable_flag",
        "wlan_capable_flag",
        "bluetooth_capable_flag",
        "dual_sim_flag",
        "nfc_capable_flag",
        "form_factor",
        "os_version",
        "os_name",
        "source_name",
        "allocation_date",
        "otap_capable_flag",
        "volte_capable_flag",
    ]
    set_aggregations = [
        f.collect_set(attribute).alias(
            attribute if attribute == "manufacturer" else attribute + "s"
        )
        for attribute in collected_attributes
    ]

    # The first 8 characters of the IMEI are matched against the lookup's tac.
    tac_matches = df_handset_dd["imei"].substr(1, 8) == df_handset_lookup["tac"]
    daily_with_device = df_handset_dd.join(df_handset_lookup, tac_matches, how="inner")
    daily_with_device = daily_with_device.withColumn(
        "trx_date", f.to_date(f.col("event_date").cast(t.StringType()), "yyyy-MM-dd")
    )
    daily_with_device = daily_with_device.withColumn(
        "weekstart", next_week_start_day(f.col("trx_date"))
    )

    weekly_devices = daily_with_device.groupBy("msisdn", "weekstart").agg(
        *set_aggregations
    )

    return weekly_devices.repartition(numPartitions=partition_num)


# Handset 3

In [None]:
def calculate_first_2_handset(
    df_handset_mm: pyspark.sql.DataFrame,
    df_handset_lookup: pyspark.sql.DataFrame,
    partition_num: int,
) -> pyspark.sql.DataFrame:
    """
    Resolves the manufacturer of the first and second IMEI per msisdn and month
    :param df_handset_mm: Handset monthly data
    :param df_handset_lookup: Lookup static table of devices, keyed by ``tac``
    :param partition_num: Number of Partition
    :return df_first_2_handset : first & second handset manufacturer for each msisdn and month
    """

    # Latest record (by event_date) for each msisdn and mm_date.
    latest_first = Window.partitionBy("msisdn", "mm_date").orderBy(
        f.col("event_date").desc()
    )
    ranked_monthly = df_handset_mm.withColumn(
        "row_number", f.row_number().over(latest_first)
    )
    latest_monthly = ranked_monthly.filter(f.col("row_number") == 1).select(
        ["msisdn", "mm_date", "first_imei", "second_imei"]
    )

    # First IMEI: inner join, so rows whose first IMEI has no lookup entry are dropped.
    first_tac_matches = (
        latest_monthly["first_imei"].substr(1, 8) == df_handset_lookup["tac"]
    )
    with_first_manufacturer = latest_monthly.join(
        df_handset_lookup, first_tac_matches, how="inner"
    ).select(
        "msisdn",
        "mm_date",
        f.col("manufacturer").alias("first_handset_manufacturer"),
        "second_imei",
    )

    # Second IMEI: left join, so a missing or unknown second IMEI yields a null manufacturer.
    second_tac_matches = (
        with_first_manufacturer["second_imei"].substr(1, 8) == df_handset_lookup["tac"]
    )
    with_both_manufacturers = with_first_manufacturer.join(
        df_handset_lookup, second_tac_matches, how="left"
    ).select(
        "msisdn",
        "mm_date",
        "first_handset_manufacturer",
        f.col("manufacturer").alias("second_handset_manufacturer"),
    )

    # Replace mm_date by the first day of its month.
    monthly_manufacturers = with_both_manufacturers.select(
        "msisdn",
        f.trunc("mm_date", "month").alias("month"),
        "first_handset_manufacturer",
        "second_handset_manufacturer",
    )

    return monthly_manufacturers.repartition(numPartitions=partition_num)


# Internet Usage

In [None]:
def preprocess_to_abt_format(df: pyspark.sql.DataFrame) -> pyspark.sql.DataFrame:
    """
    Aggregates internet usage to one row per msisdn and trx_date.

    :param df: Internet usage data with ``msisdn``, ``trx_date``, ``access_type``,
        ``bucket_usage`` and ``trx``
    :return: Daily DataFrame with ``tot_kb``, ``tot_trx``, the 2G/3G/4G volume split,
        the weekday/weekend split of ``tot_kb`` and the ``weekstart`` of each day
    """
    weekdays = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri']
    weekends = ['Sat', 'Sun']

    def _volume_for(access_type: str, alias: str) -> pyspark.sql.Column:
        # access_type is a grouping key of the first aggregation, so it can gate the group's sum.
        return (
            F.when(F.col("access_type") == access_type, F.sum(F.col("bucket_usage")))
            .otherwise(F.lit(0))
            .alias(alias)
        )

    # groupBy already returns its keys. Listing them again inside agg(), as the original did,
    # produced duplicate msisdn / trx_date columns in the result.
    per_access_type = (df
                       .groupBy("msisdn", "trx_date", "access_type")
                       .agg(
                           F.sum("bucket_usage").alias("tot_kb"),
                           F.sum("trx").alias("tot_trx"),
                           _volume_for("2", "vol_data_2g_kb"),
                           _volume_for("1", "vol_data_3g_kb"),
                           _volume_for("6", "vol_data_4g_kb"),
                       ))

    # Collapse the access types into a single row per msisdn and day.
    per_day = (per_access_type
               .groupBy("msisdn", "trx_date")
               .agg(
                   F.sum(F.col("tot_kb")).alias("tot_kb"),
                   F.sum(F.col("tot_trx")).alias("tot_trx"),
                   F.sum(F.col("vol_data_2g_kb")).alias("vol_data_2g_kb"),
                   F.sum(F.col("vol_data_3g_kb")).alias("vol_data_3g_kb"),
                   F.sum(F.col("vol_data_4g_kb")).alias("vol_data_4g_kb"),
               ))

    day_name = F.date_format(F.col("trx_date"), "E")
    out = (per_day
           .withColumn("tot_kb_weekday",
                       F.when(day_name.isin(weekdays), F.col("tot_kb")).otherwise(F.lit(0)))
           .withColumn("tot_kb_weekend",
                       F.when(day_name.isin(weekends), F.col("tot_kb")).otherwise(F.lit(0)))
           .withColumn("weekstart", next_week_start_day("trx_date"))
           )

    return out


def calculate_max_day_of_week(df: pyspark.sql.DataFrame) -> pyspark.sql.DataFrame:
    """
    On a weekly basis, sum the usage per day of week. Then on a rolling window basis (90 days),
    find the day of week with the highest usage.

    Relies on ``get_rolling_window`` being defined elsewhere in the notebook.

    :param df: daily dataframe with msisdn, trx_date, weekstart and tot_kb
    :return: df with one rolling usage column per day of week ("Mon" .. "Sun") and
        ``fea_max_usage_day_90d``, which is null on rows whose tot_kb is not positive
    """
    # Local import: `chain` is not imported at the top of the notebook.
    from itertools import chain

    weekday_columns = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"]

    # Pivoting on an explicit value list guarantees all seven day columns exist even when a day
    # never occurs in the data. The original added missing columns by hand but tested `df.columns`
    # for Tue and Wed, which always overwrote those two days with 0.
    pivot_df = (df
                .withColumn("day_of_week", F.date_format(F.col("trx_date"), 'E'))
                .groupBy("msisdn", "weekstart")
                .pivot("day_of_week", weekday_columns)
                .agg(
                    F.coalesce(F.sum("tot_kb"), F.lit(0)).alias("daily_sum_tot_kb")
                ))

    # Name of the map key with the highest value.
    argmax_udf = F.udf(lambda m: max(m, key=m.get), t.StringType())
    rolling_window = get_rolling_window(90)

    # Days without usage in a week come back as null from the pivot; treat them as 0.
    output_df = df.join(pivot_df, ["msisdn", "weekstart"]).fillna(0, weekday_columns)
    for day in weekday_columns:
        output_df = output_df.withColumn(day, F.sum(F.col(day)).over(rolling_window))

    # create_map takes alternating key / value columns: lit("Mon"), col("Mon"), lit("Tue"), ...
    usage_by_day = F.create_map(list(
        chain(*[(F.lit(c), F.col(c)) for c in weekday_columns])))
    output_df = output_df.withColumn(
        "fea_max_usage_day_90d",
        F.when(F.col("tot_kb") > 0, argmax_udf(usage_by_day)).otherwise(None)
    )

    return output_df


def week_grouping(df: pyspark.sql.DataFrame) -> pyspark.sql.DataFrame:
    """
    Aggregates the daily internet usage rows to one row per msisdn and weekstart.

    :param df: daily dataframe with the usage totals and ``fea_max_usage_day_90d``
    :return: weekly dataframe with summed usage columns and
        ``fea_int_usage_max_usage_day_90d`` (first value seen in the week)
    """
    # msisdn is already returned as a grouping key. Passing F.col("msisdn") to agg() as well,
    # as the original did, produced a second msisdn column in the output.
    return df.groupBy("msisdn", "weekstart").agg(
        F.first(F.col("fea_max_usage_day_90d")).alias("fea_int_usage_max_usage_day_90d"),
        F.sum(F.col("tot_kb")).alias("tot_kb"),
        F.sum(F.col("tot_trx")).alias("tot_trx"),
        F.sum(F.col("tot_kb_weekday")).alias("tot_kb_weekday"),
        F.sum(F.col("tot_kb_weekend")).alias("tot_kb_weekend"),
        F.sum(F.col("vol_data_2g_kb")).alias("vol_data_2g_kb"),
        F.sum(F.col("vol_data_3g_kb")).alias("vol_data_3g_kb"),
        F.sum(F.col("vol_data_4g_kb")).alias("vol_data_4g_kb"),
    )


def create_weekly_internet_usage_table(
    df_internet_usage: pyspark.sql.DataFrame,
    partition_num: int
) -> pyspark.sql.DataFrame:
    """
    Builds the weekly internet usage table from the raw usage data.

    :param df_internet_usage: Internet usage DataFrame
    :param partition_num: Number of Partition
    :return: Weekly internet usage DataFrame, repartitioned into partition_num partitions
    """
    # Pipeline: daily format -> max usage day of week -> weekly roll-up.
    weekly_usage = week_grouping(
        calculate_max_day_of_week(
            preprocess_to_abt_format(df_internet_usage)
        )
    )
    return weekly_usage.repartition(numPartitions=partition_num)


# Recharge 1

In [None]:
def create_recharge_weekly(
    df_recharge_daily: pyspark.sql.DataFrame, partition_num: int
) -> pyspark.sql.DataFrame:
    """
    :param df_recharge_daily: Daily Recharge Data (``trx_date`` in yyyy-MM-dd form)
    :param partition_num: Number of Partition
    :return: df_agg: Weekly aggregated recharge table with summed tot_amt and tot_trx
    """
    parsed_trx_date = f.to_date(f.col("trx_date").cast(t.StringType()), "yyyy-MM-dd")

    daily_with_week = df_recharge_daily.withColumn("trx_date", parsed_trx_date)
    daily_with_week = daily_with_week.withColumn(
        "weekstart", next_week_start_day(f.col("trx_date"))
    )

    weekly_totals = [
        f.sum(measure).alias(measure) for measure in ("tot_amt", "tot_trx")
    ]
    weekly_recharge = daily_with_week.groupBy("msisdn", "weekstart").agg(*weekly_totals)

    return weekly_recharge.repartition(numPartitions=partition_num)


# Recharge 2

In [None]:
def create_weekly_account_balance(
    df_bal: pyspark.sql.DataFrame, partition_num: int
) -> pyspark.sql.DataFrame:
    """
    :param df_bal: Balance Dataframe containing balance at daily level
    :param partition_num: Number of Partition
    :return: Dataframe with the weekly count of rows with a zero or negative balance per MSISDN
    """
    # 1 for a row with a balance of zero or below, 0 otherwise (including a null balance).
    zero_or_negative_flag = F.when((F.col("account_balance") <= 0), 1).otherwise(0)

    weekly_counts = (
        df_bal
        .withColumn("weekstart", next_week_start_day("event_date"))
        .groupBy("msisdn", "weekstart")
        .agg(F.sum(zero_or_negative_flag).alias("zero_or_neg_count"))
    )

    return weekly_counts.repartition(numPartitions=partition_num)


# Recharge 3

In [None]:
def create_package_purchase_weekly(
    df_package_purchase_daily: pyspark.sql.DataFrame,
    partition_num: int,
) -> pyspark.sql.DataFrame:
    """
    :param df_package_purchase_daily: Package Purchase Daily Data (``trx_date`` in yyyy-MM-dd form)
    :param partition_num: Number of Partition
    :return: Weekly package purchase counts and revenues, each summed and cast to long
    """
    # Measures summed per week, in output order.
    purchase_measures = [
        "trx_pkg_prchse",
        "rev_pkg_prchse",
        "trx_voice_pkg_prchs",
        "trx_sms_pkg_prchs",
        "trx_data_pkg_prchs",
        "trx_roam_pkg_prchs",
        "rev_voice_pkg_prchs",
        "rev_sms_pkg_prchs",
        "rev_data_pkg_prchs",
        "rev_roam_pkg_prchs",
    ]

    parsed_trx_date = f.to_date(f.col("trx_date").cast(t.StringType()), "yyyy-MM-dd")
    daily_with_week = df_package_purchase_daily.withColumn("trx_date", parsed_trx_date)
    daily_with_week = daily_with_week.withColumn(
        "weekstart", next_week_start_day(f.col("trx_date"))
    )
    daily_measures = daily_with_week.select("msisdn", "weekstart", *purchase_measures)

    weekly_sums = [
        f.sum(measure).cast(t.LongType()).alias(measure)
        for measure in purchase_measures
    ]
    weekly_purchases = daily_measures.groupBy("msisdn", "weekstart").agg(*weekly_sums)

    return weekly_purchases.repartition(numPartitions=partition_num)


# Revenue

In [None]:
def create_weekly_revenue_table(
    df_rech_mkios_dd: pyspark.sql.DataFrame, df_rech_urp_dd: pyspark.sql.DataFrame,
    partition_num: int,
) -> pyspark.sql.DataFrame:
    """Creates weekly revenue table by:
        1. Unions rech_mkios_dd and rech_urp_dd
        2. Aggregates voice, data and sms revenue to weekly level for each msisdn,
           in total and for weekend days only

    Args:
        df_rech_mkios_dd: rech_mkios_dd daily level table (rev_voice, rev_data, rev_sms)
        df_rech_urp_dd: rech_urp_dd daily level table
            (rev_voice_pkg_prchs, rev_data_pkg_prchs, rev_sms_pkg_prchs)
        partition_num: Number of Partition

    Returns:
        df_agg: Weekly aggregated revenue table
    """

    def _prepare(
        df: pyspark.sql.DataFrame, voice_col: str, data_col: str, sms_col: str
    ) -> pyspark.sql.DataFrame:
        # Bring one daily source to the common (msisdn, trx_date, weekstart, rev_*, is_weekend) layout.
        return (
            df.withColumn("trx_date", f.to_date("trx_date", "yyyyMMdd"))
            .withColumn("weekstart", next_week_start_day(f.col("trx_date")))
            # dayofweek must be reached through the `f` alias; the bare name raised NameError.
            # Spark numbers Sunday as 1 and Saturday as 7.
            .withColumn("is_weekend", f.dayofweek("trx_date").isin([1, 7]).cast("int"))
            .select(
                "msisdn",
                "trx_date",
                "weekstart",
                f.col(voice_col).alias("rev_voice"),
                f.col(data_col).alias("rev_data"),
                f.col(sms_col).alias("rev_sms"),
                "is_weekend",
            )
        )

    mkios_daily = _prepare(df_rech_mkios_dd, "rev_voice", "rev_data", "rev_sms")
    # union() matches columns by position, so the URP package revenues are renamed to the same
    # column names up front to make that alignment explicit.
    urp_daily = _prepare(
        df_rech_urp_dd, "rev_voice_pkg_prchs", "rev_data_pkg_prchs", "rev_sms_pkg_prchs"
    )

    df = mkios_daily.union(urp_daily)

    df_agg = df.groupBy("msisdn", "weekstart").agg(
        f.sum("rev_voice").alias("rev_voice"),
        f.sum("rev_data").alias("rev_data"),
        f.sum("rev_sms").alias("rev_sms"),
        f.sum(f.when(f.col("is_weekend") == 1, f.col("rev_voice"))).alias(
            "rev_voice_weekend"
        ),
        f.sum(f.when(f.col("is_weekend") == 1, f.col("rev_data"))).alias(
            "rev_data_weekend"
        ),
        f.sum(f.when(f.col("is_weekend") == 1, f.col("rev_sms"))).alias(
            "rev_sms_weekend"
        ),
    )

    return df_agg.repartition(numPartitions=partition_num)


# Text Messaging

In [None]:
def create_text_messaging_weekly(
    df_abt_usage_mss_dd: pyspark.sql.DataFrame,
    partition_num: int
) -> pyspark.sql.DataFrame:
    """
    Creates weekly SMS and voice transaction counts per MSISDN

    :param df_abt_usage_mss_dd: Daily Text Messaging Usage DataFrame

    :param partition_num: Number of Partition

    :return: Text Messaging Weekly DataFrame
    """
    call_type = F.col("calltype")
    is_sms_in = call_type == "05_sms_in"
    is_sms_out = call_type == "04_sms_out"
    is_call_in = call_type == "02_call_in"
    is_call_out = call_type == "01_call_out"

    def total_trx_where(condition):
        # Sum of total_trx over the rows matching `condition`; other rows contribute 0.
        return F.sum(F.when(condition, F.col("total_trx")).otherwise(0))

    daily_usage = df_abt_usage_mss_dd.withColumn(
        "trx_date", F.to_date("trx_date", "yyyy-MM-dd")
    )
    daily_usage = daily_usage.withColumn("weekstart", next_week_start_day("trx_date"))
    # The A-number is the subscriber the features are computed for.
    daily_usage = daily_usage.withColumn("msisdn", F.col("anumber"))

    weekly_counts = [
        total_trx_where(is_sms_in).alias("count_txt_msg_incoming"),
        total_trx_where(is_sms_out).alias("count_txt_msg_outgoing"),
        total_trx_where(is_sms_out | is_sms_in).alias("count_txt_msg_all"),
        total_trx_where(is_call_in).alias("count_voice_incoming"),
        total_trx_where(is_call_out).alias("count_voice_outgoing"),
        total_trx_where(is_call_in | is_call_out).alias("count_voice_all"),
    ]
    weekly_usage = daily_usage.groupBy("msisdn", "weekstart").agg(*weekly_counts)

    return weekly_usage.repartition(numPartitions=partition_num)


# Commercial Text Message

In [None]:
def convert_hexa_to_text(df):
    """
    Adds ``bnumber_decoded``: the B-number decoded from its hex form into text.

    :param df: DataFrame with a ``bnumber`` column
    :return: df with the additional ``bnumber_decoded`` column
    """
    raw_bnumber = F.col("bnumber")

    # Step 1: an odd-length value is trimmed via the substring expression below so that
    # the remainder has an even number of hex digits.
    even_length_hex = F.when(
        F.length("bnumber") % 2 != 0,
        F.expr("substring(bnumber, 0, length(bnumber)-1)"),
    ).otherwise(raw_bnumber)

    # Step 2: values not starting with "628" are hex-decoded as US-ASCII after mapping the
    # characters ":;<=>?" to the hex digits "ABCDEF"; the others keep the raw bnumber.
    decoded_sender = F.when(
        ~raw_bnumber.startswith("628"),
        F.decode(
            F.unhex(F.translate(even_length_hex, ":;<=>?", "ABCDEF")),
            "US-ASCII",
        ),
    ).otherwise(raw_bnumber)

    # Step 3: normalise the sender name "krediv" (any casing) to "kredivo".
    normalised_sender = F.when(
        F.lower(decoded_sender) == "krediv", "kredivo"
    ).otherwise(decoded_sender)

    return df.withColumn("bnumber_decoded", normalised_sender)


def create_commercial_text_messaging_weekly(
    df_abt_usage_mss_dd: pyspark.sql.DataFrame,
    df_comm_text_mapping: pyspark.sql.DataFrame,
    partition_num: int,
) -> pyspark.sql.DataFrame:
    """
    Creates weekly commercial SMS counts per MSISDN and sender category

    :param df_abt_usage_mss_dd: Daily Commercial Text Messaging Usage DataFrame

    :param df_comm_text_mapping: Mapping between Sender and Category

    :param partition_num: Number of Partition

    :return: Text Messaging Weekly DataFrame
    """
    tax_office_comments = {
        "bureau of taxation and taxation as a sub-ordinate of the financial administration - public sector",
        "tax office",
    }
    is_incoming_sms = F.col("calltype") == "05_sms_in"

    def total_trx_where(condition):
        # Sum of total_trx over the rows matching `condition`; other rows contribute 0.
        return F.sum(F.when(condition, F.col("total_trx")).otherwise(0))

    # NOTE(review): weekstart is derived from trx_date BEFORE trx_date is re-parsed as yyyyMMdd,
    # unlike the other weekly functions in this notebook. The order is kept as-is; confirm which
    # format the raw column has.
    daily_usage = df_abt_usage_mss_dd.withColumn(
        "weekstart", next_week_start_day(F.col("trx_date"))
    )
    daily_usage = daily_usage.withColumn(
        "trx_date", F.to_date(F.col("trx_date").cast(t.StringType()), "yyyyMMdd")
    )
    daily_usage = daily_usage.withColumn("msisdn", F.col("anumber"))
    daily_usage = convert_hexa_to_text(daily_usage)

    # Case-insensitive match of the decoded B-number against the mapped senders;
    # unmapped senders are dropped by the inner join.
    sender_matches = F.lower(daily_usage.bnumber_decoded) == F.lower(
        df_comm_text_mapping.sender
    )
    commercial_usage = daily_usage.join(
        F.broadcast(df_comm_text_mapping), sender_matches, how="inner"
    )

    is_tax_sender = F.col("evalueserve_comments").isin(tax_office_comments)
    weekly_usage = commercial_usage.groupBy("msisdn", "weekstart", "category").agg(
        F.collect_set("sender").alias("senders_07d"),
        total_trx_where(is_incoming_sms).alias("incoming_count_07d"),
        total_trx_where(is_tax_sender & is_incoming_sms).alias(
            "count_txt_msg_incoming_government_tax"
        ),
    )

    return weekly_usage.repartition(numPartitions=partition_num)


# Voice Calling

In [None]:
def aggregate_voice_calling_to_weekly(
    df_voice: pyspark.sql.DataFrame, partition_num: int,
) -> pyspark.sql.DataFrame:
    """
    Aggregates voice calling data to weekly level (msisdn, weekstart)

    ``trx_date`` is cast to string and parsed as ``yyyy-MM-dd``; ``weekstart``
    is then derived from it with ``next_week_start_day``. If the input has
    ``anumber`` but no ``msisdn`` column, ``anumber`` is renamed to ``msisdn``.

    :param df_voice: Input dataframe for voice calling
    :param partition_num: Number of partitions of the returned DataFrame
    :return df_voice_agg: Weekly aggregated voice calling dataFrame with the
        call durations per direction, the early-morning duration and the
        lists of counterpart numbers per direction
    """
    is_incoming = f.col("calltype") == "02_call_in"
    is_outgoing = f.col("calltype") == "01_call_out"
    is_either_direction = f.col("calltype").isin(["02_call_in", "01_call_out"])

    def _duration_when(condition):
        # Total call duration over matching rows; non-matching rows add 0.
        return f.sum(f.when(condition, f.col("total_duration")).otherwise(0))

    def _bnumbers_when(condition):
        # No otherwise(): non-matching rows yield null, which collect_list skips.
        return f.collect_list(f.when(condition, f.col("bnumber")))

    has_anumber_only = ("anumber" in df_voice.columns) and (
        "msisdn" not in df_voice.columns
    )
    if has_anumber_only:
        df_voice = df_voice.withColumnRenamed("anumber", "msisdn")

    df_dated = df_voice.withColumn(
        "trx_date", f.to_date(f.col("trx_date").cast(t.StringType()), "yyyy-MM-dd"),
    )
    df_keyed = df_dated.withColumn(
        "weekstart", next_week_start_day(f.col("trx_date"))
    )

    weekly_metrics = [
        _duration_when(is_incoming).alias("inc_call_dur"),
        _duration_when(is_outgoing).alias("out_call_dur"),
        f.sum(f.col("duration_early_morning")).alias("total_dur_12_5_am"),
        _bnumbers_when(is_incoming).alias("in_call_nums"),
        _bnumbers_when(is_outgoing).alias("out_call_nums"),
        _bnumbers_when(is_either_direction).alias("inc_out_call_nums"),
    ]
    output_columns = [
        "msisdn",
        "weekstart",
        "inc_call_dur",
        "out_call_dur",
        "total_dur_12_5_am",
        "in_call_nums",
        "out_call_nums",
        "inc_out_call_nums",
    ]

    df_voice_agg = (
        df_keyed.groupBy("msisdn", "weekstart")
        .agg(*weekly_metrics)
        .select(*output_columns)
    )

    return df_voice_agg.repartition(numPartitions=partition_num)


# Voice Calls Old

In [None]:
def aggregate_voice_calling_to_weekly_old(
    df_voice: pyspark.sql.DataFrame, partition_num: int,
) -> pyspark.sql.DataFrame:
    """
    Aggregates voice calling data to weekly level (msisdn, weekstart),
    reading the transaction date from the ``day`` column.

    ``day`` is cast to string and parsed as ``yyyyMMdd`` into ``trx_date``;
    ``weekstart`` is then derived from it with ``next_week_start_day``. If the
    input has ``anumber`` but no ``msisdn`` column, ``anumber`` is renamed to
    ``msisdn``.

    :param df_voice: Input dataframe for voice calling
    :param partition_num: Number of partitions of the returned DataFrame
    :return df_voice_agg: Weekly aggregated voice calling dataFrame keyed by
        (msisdn, weekstart)
    """
    if ("anumber" in df_voice.columns) and ("msisdn" not in df_voice.columns):
        df_voice = df_voice.withColumnRenamed("anumber", "msisdn")

    df_voice_agg = (
        df_voice.withColumn(
            "trx_date", f.to_date(f.col("day").cast(t.StringType()), "yyyyMMdd"),
        )
        .withColumn("weekstart", next_week_start_day(f.col("trx_date")))
        .groupBy("msisdn", "weekstart")
        .agg(
            f.sum(
                f.when(
                    f.col("calltype") == "02_call_in", f.col("total_duration")
                ).otherwise(0)
            ).alias("inc_call_dur"),
            f.sum(
                f.when(
                    f.col("calltype") == "01_call_out", f.col("total_duration")
                ).otherwise(0)
            ).alias("out_call_dur"),
            f.sum(f.col("duration_early_morning")).alias("total_dur_12_5_am"),
            # when() without otherwise() yields null for non-matching rows,
            # and collect_list skips nulls.
            f.collect_list(
                f.when(f.col("calltype") == "02_call_in", f.col("bnumber"))
            ).alias("in_call_nums"),
            f.collect_list(
                f.when(f.col("calltype") == "01_call_out", f.col("bnumber"))
            ).alias("out_call_nums"),
            f.collect_list(
                f.when(
                    f.col("calltype").isin(["02_call_in", "01_call_out"]),
                    f.col("bnumber"),
                )
            ).alias("inc_out_call_nums"),
        )
        .select(
            "msisdn",
            # weekstart is part of the grouping key; without it rows of
            # different weeks cannot be told apart and the result cannot be
            # written partitioned by weekstart.
            "weekstart",
            "inc_call_dur",
            "out_call_dur",
            "total_dur_12_5_am",
            "in_call_nums",
            "out_call_nums",
            "inc_out_call_nums",
        )
    )

    return df_voice_agg.repartition(numPartitions=partition_num)


# Weekly Aggregation

In [None]:
def weekly_aggregation(pipeline, start_date='20190101', end_date=None, date_col_format='%Y-%m-%d'):
    """
    Runs one weekly aggregation pipeline, week by week, over a date range.

    The range is widened to whole Monday-Sunday weeks: ``start_date`` is moved
    back to its Monday and the week containing ``end_date`` is the last one
    processed. For every week the seven daily values (formatted with
    ``date_col_format``) are printed and used to filter the pipeline's daily
    source. The weekly result is written as parquet partitioned by
    ``weekstart``. Inside the loop ``weekstart`` is the Monday that follows
    the seven days, formatted as ``%Y-%m-%d``.

    :param pipeline: Pipeline name, one of 'bcp-1', 'bcp-2', 'cust_prof',
        'handset-1', 'internet_usage', 'recharge-1', 'recharge-2',
        'recharge-3', 'revenue', 'sms-1', 'sms-2', 'voice-new', 'voice-old'
    :param start_date: First day of the range as 'YYYYMMDD'
    :param end_date: Last day of the range as 'YYYYMMDD'; ``None`` (default)
        means today, resolved when the function is called
    :param date_col_format: strftime format of the date column that the
        pipeline's source is filtered on
    :raises ValueError: if ``pipeline`` is not one of the names above
    """
    if end_date is None:
        # Resolved per call: a default evaluated in the signature would be
        # frozen at the moment the cell defining this function was run.
        end_date = datetime.today().strftime('%Y%m%d')

    start_date = datetime.strptime(start_date, '%Y%m%d')
    start_date = start_date - timedelta(days=start_date.weekday())
    end_date = datetime.strptime(end_date, '%Y%m%d')
    # Exclusive upper bound: the Monday after the week containing end_date.
    end_date = end_date + timedelta(days=(7 - end_date.weekday()))

    def write_weekly(week_df, path):
        # NOTE(review): overwriting only the weeks present in week_df relies on
        # spark.sql.sources.partitionOverwriteMode=dynamic in the session conf.
        week_df.write.partitionBy('weekstart').mode('overwrite').parquet(path)

    while start_date < end_date:
        week_days = [start_date + timedelta(days=offset) for offset in range(7)]
        dates = [day.strftime(date_col_format) for day in week_days]
        start_date += timedelta(days=7)
        # start_date now points at the following Monday.
        weekstart = start_date.strftime('%Y-%m-%d')

        print(dates)
        if 'bcp-1' == pipeline:
            bcp_weekly_path = "hdfs:///data/landing/gx_pnt/mck_dmp_pipeline/01_aggregation/internet_apps_usage/bcp_usage_weekly.parquet"
            df = spark.read.table("mig.smy_bcp_usage_dd").filter(f.col("trx_date").isin(dates))
            week_df = create_bcp_usage_weekly(df, partition_size_5)
            write_weekly(week_df, bcp_weekly_path)
        elif 'bcp-2' == pipeline:
            bcp_weekly_path_1 = "hdfs:///data/landing/gx_pnt/mck_dmp_pipeline/01_aggregation/internet_apps_usage/bcp_usage_weekly.parquet/weekstart={weekstart}"
            bcp_feature_mapping_path = "hdfs:///data/landing/gx_pnt/mck_dmp_pipeline/01_aggregation/internet_apps_usage/bcp_feature_category_mapping.parquet"
            bcp_weekly_path_2 = "hdfs:///data/landing/gx_pnt/mck_dmp_pipeline/01_aggregation/internet_apps_usage/bcp_feature_weekly_aggregation.parquet"
            df_bcp_feature_mapping = spark.read.parquet(bcp_feature_mapping_path)
            df = spark.read.parquet(bcp_weekly_path_1.format(weekstart=weekstart))
            # Reading a single partition directory loses the partition column,
            # so it is added back as a literal.
            df = df.withColumn("weekstart", f.lit(weekstart))
            week_df = create_bcp_weekly_feature_aggregation(df, df_bcp_feature_mapping, partition_size_5)
            write_weekly(week_df, bcp_weekly_path_2)
        elif "cust_prof" == pipeline:
            print("weekstart={weekstart}".format(weekstart=weekstart))
            print("event_date={event_date}".format(event_date=dates[0]))
            # Only the first day (Monday) of the week is read.
            df = spark.read.table("cb.cb_multidim").filter(f.col("event_date").isin(dates[0]))
            week_df = agg_customer_profile_to_weekly(df, partition_size_3)
            cust_prof_path = "hdfs:///data/landing/gx_pnt/mck_dmp_pipeline/01_aggregation/customer_profile/customer_profile_weekly.parquet"
            write_weekly(week_df, cust_prof_path)
        elif 'handset-1' == pipeline:
            # The device lookup is rebuilt, written and re-read on every week.
            mapp_data = spark.read.table("dim.device_dim")
            mapp_df = create_handset_lookup_data(mapp_data, partition_size_0)
            mapp_path = "hdfs:///data/landing/gx_pnt/mck_dmp_pipeline/01_aggregation/handset/device_dim_single_rec_per_imei.parquet"
            mapp_df.write.mode('overwrite').parquet(mapp_path)
            df = spark.read.table("smy.device_dd").filter(f.col("event_date").isin(dates))
            mapp_df = spark.read.parquet(mapp_path)
            week_df = create_handset_weekly_data(df, mapp_df, partition_size_5)
            handset_1_weekly_path = "hdfs:///data/landing/gx_pnt/mck_dmp_pipeline/01_aggregation/handset/device_data_weekly_2.parquet"
            write_weekly(week_df, handset_1_weekly_path)
        elif "internet_usage" == pipeline:
            df = spark.read.table("smy.usage_upcc_dd").filter(f.col("event_date").isin(dates))
            week_df = preprocess_to_abt_format(df, partition_size_4)
            int_usage_path = "hdfs:///data/landing/gx_pnt/mck_dmp_pipeline/01_aggregation/internet_usage/internet_usage_weekly.parquet"
            write_weekly(week_df, int_usage_path)
        elif 'recharge-1' == pipeline:
            df = spark.read.table("abt.rech_daily_abt_dd").filter(f.col("event_date").isin(dates))
            week_df = create_recharge_weekly(df, partition_size_3)
            recharge_1_weekly_path = "hdfs:///data/landing/gx_pnt/mck_dmp_pipeline/01_aggregation/recharge/rech_weekly.parquet"
            write_weekly(week_df, recharge_1_weekly_path)
        elif 'recharge-2' == pipeline:
            df = spark.read.table("base.ocs_bal").filter(f.col("event_date").isin(dates))
            week_df = create_weekly_account_balance(df, partition_size_5)
            recharge_2_weekly_path = "hdfs:///data/landing/gx_pnt/mck_dmp_pipeline/01_aggregation/recharge/acc_bal_weekly.parquet"
            write_weekly(week_df, recharge_2_weekly_path)
        elif 'recharge-3' == pipeline:
            # NOTE(review): this filter ignores `dates`; every iteration reads
            # all rows from 2019-06-03 onwards. Confirm this is intended.
            df = spark.read.table("abt.usage_chg_pkg_prchse_abt_dd").filter(f.col("trx_date") >= "2019-06-03")
            week_df = create_package_purchase_weekly(df, partition_size_2)
            recharge_3_weekly_path = "hdfs:///data/landing/gx_pnt/mck_dmp_pipeline/01_aggregation/recharge/chg_pkg_prchse_weekly.parquet"
            write_weekly(week_df, recharge_3_weekly_path)
        elif "revenue" == pipeline:
            df = spark.read.table("smy.usage_upcc_dd").filter(f.col("event_date").isin(dates))
            week_df = create_weekly_revenue_table(df, partition_size_5)
            int_usage_path = "hdfs:///data/landing/gx_pnt/mck_dmp_pipeline/01_aggregation/revenue/revenue_weekly.parquet"
            write_weekly(week_df, int_usage_path)
        elif "sms-1" == pipeline:
            df = spark.read.table("abt.usage_mss_abt_dd").filter(f.col("event_date").isin(dates))
            week_df = create_text_messaging_weekly(df, partition_size_1)
            int_usage_path = "hdfs:///data/landing/gx_pnt/mck_dmp_pipeline/01_aggregation/sms/text_messaging_weekly.parquet"
            write_weekly(week_df, int_usage_path)
        elif "sms-2" == pipeline:
            df = spark.read.table("abt.usage_mss_abt_dd").filter(f.col("event_date").isin(dates))
            mapp_df = spark.read.parquet("hdfs:///data/landing/gx_pnt/mck_dmp_common/kedro/02_primary/text_messaging_mapping/sms_sender_mapping.parquet")
            week_df = create_commercial_text_messaging_weekly(df, mapp_df, partition_size_1)
            int_usage_path = "hdfs:///data/landing/gx_pnt/mck_dmp_pipeline/01_aggregation/sms/commercial_text_messaging_weekly.parquet"
            write_weekly(week_df, int_usage_path)
        elif 'voice-new' == pipeline:
            # ISO-formatted dates compare correctly as strings. Weeks up to
            # 2019-05-27 are left to the 'voice-old' pipeline.
            if weekstart > '2019-05-27':
                df = spark.read.table("abt.usage_mss_abt_dd").filter(f.col("event_date").isin(dates))
                week_df = aggregate_voice_calling_to_weekly(df, partition_size_3)
                voice_path = "hdfs:///data/landing/gx_pnt/mck_dmp_pipeline/01_aggregation/voice_calls/voice_calls_weekly_agg_2.parquet"
                write_weekly(week_df, voice_path)
        elif 'voice-old' == pipeline:
            if weekstart <= '2019-05-27':
                # NOTE(review): the history table is assumed to carry the date
                # as integer yyyyMMdd in `day`, which is what
                # aggregate_voice_calling_to_weekly_old parses - confirm
                # against the table schema.
                # The keys are built from the datetimes so this branch does not
                # depend on date_col_format.
                day_keys = [int(day.strftime('%Y%m%d')) for day in week_days]
                df = spark.read.table("mck.t_mss_call_full_hist").filter(f.col("day").isin(day_keys))
                week_df = aggregate_voice_calling_to_weekly_old(df, partition_size_3)
                # Make sure the partition column is present and matches the
                # week being processed before writing.
                week_df = week_df.withColumn("weekstart", f.lit(weekstart))
                voice_path = "hdfs:///data/landing/gx_pnt/mck_dmp_pipeline/01_aggregation/voice_calls/voice_calls_weekly_agg_2.parquet"
                write_weekly(week_df, voice_path)
        else:
            # A mistyped name used to loop over every week and do nothing.
            raise ValueError("Unknown pipeline: {pipeline!r}".format(pipeline=pipeline))


# Weekly Aggregation Calls

In [None]:

# weekly_aggregation("cust_prof", start_date="20200203", date_col_format='%Y-%m-%d')
# weekly_aggregation("handset-1", start_date="20200120", date_col_format='%Y-%m-%d')
# weekly_aggregation("bcp", start_date="20200127", date_col_format='%Y%m%d')
# weekly_aggregation("internet_usage", start_date="20200113", date_col_format='%Y-%m-%d')
# weekly_aggregation("recharge-1", start_date="20200127", date_col_format='%Y-%m-%d')
# weekly_aggregation("recharge-2", start_date="20200127", date_col_format='%Y-%m-%d')
# weekly_aggregation("recharge-3", start_date="20190603", end_date="20190604", date_col_format='%Y-%m-%d')

# voice_old_weekly_aggregation(start_date='20190128', end_date='20190203')
# voice_old_weekly_aggregation(start_date='20190121', end_date='20190127')
# voice_old_weekly_aggregation(start_date='20190114', end_date='20190120')

# weekly_aggregation("sms-1", start_date="20200203", date_col_format='%Y-%m-%d')
# weekly_aggregation("sms-2", start_date="20200203", date_col_format='%Y-%m-%d')

# weekly_aggregation("bcp-2", start_date="20200127", end_date="20200202", date_col_format='%Y%m%d')
# weekly_aggregation("bcp-1", start_date="20200203", date_col_format='%Y%m%d')
# weekly_aggregation("bcp-2", start_date="20200203", date_col_format='%Y-%m-%d')

# weekly_aggregation("handset-1", start_date="20200120", date_col_format='%Y-%m-%d')
# weekly_aggregation("handset-1", start_date="20190101", end_date="20190203", date_col_format='%Y-%m-%d')

# weekly_aggregation("cust_prof", start_date="20200203", end_date="20200215", date_col_format='%Y-%m-%d')
# weekly_aggregation("cust_prof", start_date="20190401", end_date="20190526", date_col_format='%Y-%m-%d')

# weekly_aggregation("internet_usage", start_date="20200113", date_col_format='%Y-%m-%d')

# weekly_aggregation("recharge-1", start_date="20190101", date_col_format='%Y-%m-%d')
# weekly_aggregation("recharge-2", start_date="20200127", date_col_format='%Y-%m-%d')
# weekly_aggregation("recharge-3", start_date="20200203", date_col_format='%Y-%m-%d')
