## s1 environment setup

In [0]:
### libraries
import pyspark
import os

import re
import numpy as np

from pyspark import sql 
from pyspark.sql import Window
from pyspark.sql import functions as f

### utility functions

In [0]:
%run "../utility_functions/spkdf_utils"

In [0]:
%run "../utility_functions/utility_functions"

In [0]:
%run "../utility_functions/misc"

### directories

In [0]:
# Root of the feature-store mount. Two sub-roots hang off it:
#   * dev_shared                  - shared area (the raw layer is read from here)
#   * dev_users/dev_sc/2025q1_bfs - user/project area (all other layers live here)
dir_data_parent = "/mnt/feature-store-dev/feature-store-business"
dir_data_parent_shared, dir_data_parent_users = [
    os.path.join(dir_data_parent, sub_root)
    for sub_root in ("dev_shared", "dev_users/dev_sc/2025q1_bfs")
]

In [0]:
# Raw inputs are read from the shared area; every other data layer sits under
# the user/project area. Layer folders carry a numeric prefix (d000, d100, ...).
# NOTE(review): staging and intermediate both use the "d200" prefix - confirm
# this is intended and not a numbering slip.
dir_data_raw = os.path.join(dir_data_parent_shared, 'd100_raw')

(
    dir_data_meta,
    dir_data_stg,
    dir_data_int,
    dir_data_prm,
    dir_data_fea,
    dir_data_mvmt,
    dir_data_serv,
    dir_data_tmp,
) = [
    os.path.join(dir_data_parent_users, layer_dir)
    for layer_dir in (
        'd000_meta',
        'd200_staging',
        'd200_intermediate',
        'd300_primary',
        'd400_feature',
        'd500_movement',
        'd600_serving',
        'd999_tmp',
    )
]

## s2 data import

In [0]:
# Load the metadata tables (global cycle calendar, FSR field metadata) and the
# two staging snapshots (customer, service); all four are Delta tables.
# NOTE(review): df_fsr_field_meta is not used in the cells visible here - drop
# it if nothing downstream needs it.
df_global_calendar_meta, df_fsr_field_meta, df_stg_cust, df_stg_srvc = [
    spark.read.format("delta").load(os.path.join(base_dir, table_dir))
    for base_dir, table_dir in (
        (dir_data_meta, "d001_global_cycle_calendar"),
        (dir_data_meta, "d004_fsr_meta/fsr_field_meta"),
        (dir_data_stg, "staging_customer"),
        (dir_data_stg, "staging_service"),
    )
]

In [0]:
# Eyeball the first 10 rows of each input before any processing.
display(df_stg_cust.limit(num=10))
display(df_stg_srvc.limit(num=10))
display(df_global_calendar_meta.limit(num=10))

## s3 data processing

In [0]:
# Reporting window as 'YYYY-MM-DD' strings; the output is stamped with the last
# day of the window as its reporting date.
# NOTE(review): these strings are compared against record_*_date_time columns.
# If those columns are timestamps, Spark reads '2025-04-06' as 2025-04-06 00:00:00,
# so records that start later on the end date are excluded - confirm intended.
vt_param_ssc_start_date, vt_param_ssc_end_date = '2025-03-24', '2025-04-06'
vt_param_ssc_reporting_date = vt_param_ssc_end_date
vt_param_ssc_reporting_cycle_type = 'rolling cycle'

In [0]:
# Customer base for the reporting window: one row per customer_id, taken from the
# most recent staging record that overlaps [start_date, end_date] and whose
# converged_status is not 'Services Inactive'.
# NOTE: `!= 'Services Inactive'` also drops rows where converged_status is NULL
# (NULL != x evaluates to NULL, which filter treats as false).
df_base_stag_cust_curr = (
    df_stg_cust
    .filter(
        (f.col("record_start_date_time") <= vt_param_ssc_end_date)
        & (f.col("record_end_date_time") >= vt_param_ssc_start_date)
        & (f.col('converged_status') != 'Services Inactive')
    )
    # rank each customer's records newest-first (start, then end, then update time)
    .withColumn(
        'row_index',
        f.row_number().over(
            Window.partitionBy('customer_id').orderBy(
                f.col('record_start_date_time').desc(),
                f.col('record_end_date_time').desc(),
                f.col('record_update_date_time').desc(),
            )
        ),
    )
    # qualifying records per customer minus one = status changes inside the window
    .withColumn(
        'cust_status_change_cnt',
        f.count('*').over(Window.partitionBy("customer_id")) - 1,
    )
    # keep only the latest record per customer
    .filter(f.col('row_index') == 1)
    .drop('row_index', 'customer_status_name')
    # align key / timestamp names with the feature-store conventions
    .withColumnRenamed('customer_id', 'fs_cust_id')
    .withColumnRenamed('account_source_id', 'sf_id')
    .withColumnRenamed('record_start_date_time', 'cust_status_start_date_time')
    .withColumnRenamed('record_end_date_time', 'cust_status_end_date_time')
    .withColumnRenamed('record_update_date_time', 'cust_status_update_date_time')
)

In [0]:
# Service base for the reporting window: one row per service_id, taken from the
# most recent staging record that overlaps [start_date, end_date] and whose
# plan_status is neither 'Inactive' nor 'Deleted'.
# NOTE: the `!=` comparisons also drop rows where plan_status is NULL
# (NULL != x evaluates to NULL, which filter treats as false).
df_base_stag_srvc_curr = (
    df_stg_srvc
    # identify the service base within the required period
    .filter(
        (f.col("record_start_date_time") <= vt_param_ssc_end_date)
        & (f.col("record_end_date_time") >= vt_param_ssc_start_date)
        & (f.col("plan_status") != 'Inactive')
        & (f.col("plan_status") != 'Deleted')
    )
    # rank records newest-first per service_id; row 1 is the latest record in the period
    .withColumn(
        'row_index'
        , f.row_number().over(
            Window
            .partitionBy('service_id')
            .orderBy(
                f.desc('record_start_date_time')
                , f.desc('record_end_date_time')
                , f.desc('record_update_date_time')
            )
        )
    )
    # qualifying records per service minus one = status changes of this service
    # inside the window. Partitioned by service_id (same grain as row_index);
    # partitioning by customer_id would count every service record of the
    # customer instead of this service's own history.
    .withColumn(
        'srvc_status_change_cnt'
        , f.count('*').over(Window.partitionBy("service_id")) - 1
    )
    # keep only the latest record per service
    .filter(f.col('row_index') == 1)
    .drop('row_index')
    # align key / timestamp names with the feature-store conventions
    .withColumnRenamed('service_id', 'fs_srvc_id')
    .withColumnRenamed('customer_id', 'fs_cust_id')
    .withColumnRenamed('billing_account_number', 'fs_acct_id')
    .withColumnRenamed('record_start_date_time', 'srvc_status_start_date_time')
    .withColumnRenamed('record_end_date_time', 'srvc_status_end_date_time')
    .withColumnRenamed('record_update_date_time', 'srvc_status_update_date_time')
)

In [0]:
# Grain check: after keeping row_index == 1 per key, the row count is expected to
# match the distinct key count in each frame.
display(
    df_base_stag_srvc_curr.agg(
        f.count('*'),
        f.countDistinct('fs_srvc_id'),
    )
)

display(
    df_base_stag_cust_curr.agg(
        f.count('*'),
        f.countDistinct('fs_cust_id'),
        f.countDistinct('sf_id'),
    )
)

In [0]:
# Attach each qualifying customer's attributes to each of its qualifying services
# (inner join: a service survives only if its customer is in the customer base),
# then stamp the reporting parameters and load-audit columns.
# NOTE(review): 'data_udpate_dttm' looks like a typo for 'data_update_dttm'. It is
# left unchanged because renaming it changes the schema of the Delta table
# written below, and a Delta overwrite with a different schema is rejected unless
# overwriteSchema is set - coordinate with consumers before fixing.
df_output_curr = (
    df_base_stag_cust_curr
    .join(df_base_stag_srvc_curr, on='fs_cust_id', how='inner')
    .withColumn('reporting_date', f.lit(vt_param_ssc_reporting_date))
    .withColumn('reporting_cycle_type', f.lit(vt_param_ssc_reporting_cycle_type))
    .withColumn('data_update_date', f.current_date())
    .withColumn('data_udpate_dttm', f.current_timestamp())
)

In [0]:
# Preview a handful of joined output rows.
display(df_output_curr.limit(num=10))

In [0]:
# Output volume: one row per qualifying service is expected, so count(*) should
# match the distinct fs_srvc_id count; the other columns show the fan-in.
display(
    df_output_curr.agg(
        f.count('*'),
        *[f.countDistinct(key) for key in ('sf_id', 'fs_cust_id', 'fs_acct_id', 'fs_srvc_id')]
    )
)

In [0]:
# Persist the output as a Delta table partitioned by reporting_date.
# NOTE(review): mode("overwrite") with no replaceWhere option replaces the whole
# table, including partitions for other reporting dates. If this table should
# accumulate one partition per run, consider
#   .option("replaceWhere", f"reporting_date = '{vt_param_ssc_reporting_date}'")
(
    df_output_curr.write
    .format("delta")
    .mode("overwrite")
    .partitionBy("reporting_date")
    .save(path=os.path.join(dir_data_int, 'int_scc_service'))
)

In [0]:
# Read back the table just written. The path is built from the same dir_data_int
# constant used by the write (instead of a hard-coded absolute copy), so the
# read cannot silently point elsewhere if the sandbox root changes.
df_test = spark.read.format('delta').load(os.path.join(dir_data_int, 'int_scc_service'))

In [0]:
# Round-trip check: the re-read table is expected to show the same counts as
# df_output_curr, plus a sample of persisted rows.
display(
    df_test.agg(
        f.count('*'),
        *[f.countDistinct(key) for key in ('sf_id', 'fs_cust_id', 'fs_acct_id', 'fs_srvc_id')]
    )
)

display(df_test.limit(num=10))

In [0]:
def show_group_counts(df, group_cols, id_cols):
    """
    Display a frequency profile of ``df`` for every column in ``group_cols``.

    For each grouping column, one table is displayed with one row per distinct value:
      - total_count     : number of rows in the group
      - distinct_<id>   : number of distinct values of each column in id_cols
      - pct, pct%       : the group's share of all rows (fraction and percentage)
    Tables are ordered by total_count, largest first.
    """
    count_exprs = [f.count("*").alias("total_count")]
    count_exprs.extend(
        f.countDistinct(id_col).alias(f"distinct_{id_col}") for id_col in id_cols
    )
    # An empty partitionBy() spans every group, so this sums to the grand total.
    grand_total = f.sum("total_count").over(Window.partitionBy())

    for group_col in group_cols:
        print(f"▶︎ grouping by: {group_col}")
        profile = (
            df.groupBy(group_col)
            .agg(*count_exprs)
            .withColumn("pct", f.col("total_count") / grand_total)
            .withColumn('pct%', f.col("pct") * 100)
            .orderBy(f.col("total_count").desc())
        )
        display(profile)

# attributes to profile
group_columns = [
    "customer_type",
    "customer_mkt_segment",
    "sales_segment_new",
    "service_segment",
    "source_system_code",
    "service_type_name",
    "sf_ent_account_status_name",
    "converged_status",
    "paymt_meth_cd",
    "proposition_name",
    "plan_status",
    "contract_term",
    "number_of_employees",
    "owner_email",
    "plan_name",
]

# keys whose distinct counts are reported for every group
id_columns = ["sf_id", "fs_cust_id", "fs_acct_id", "fs_srvc_id"]

show_group_counts(df_test, group_columns, id_columns)


In [0]:
# Same profile as the previous cell, but excluding one Salesforce account.
# The previous cell's show_group_counts, group_columns and id_columns are reused
# on a pre-filtered frame. Redefining show_group_counts here with a hard-coded
# filter would silently replace the helper for every later call in the notebook.
# NOTE(review): the reason this sf_id is excluded is not recorded - TODO document it.
# NOTE: `!=` also drops rows where sf_id is NULL (NULL != x is NULL -> filtered out).
vt_param_excluded_sf_id = '001w0000014sdZsAAI'

show_group_counts(
    df_test.filter(f.col('sf_id') != vt_param_excluded_sf_id),
    group_columns,
    id_columns,
)


In [0]:
# Which proposition / plan combinations carry plan_status 'Unknown', and how many rows each.
display(
    df_test
    .where(f.col('plan_status') == 'Unknown')
    .groupBy('proposition_name', 'plan_name')
    .agg(f.count('*'))
)

# Sample rows for one specific number_of_employees value.
# NOTE(review): compared as the string '7882817' - confirm the column's type.
display(
    df_test
    .where(f.col('number_of_employees') == '7882817')
    .limit(10)
)

In [0]:
# Row counts of converged_status by source system, for rows whose
# source_system_code is not 'SBL' (rows with a NULL source_system_code are also
# excluded by the `!=` comparison). Capped at 100 rows for display.
display(
    df_test
    .where(f.col('source_system_code') != 'SBL')
    .groupBy('converged_status', 'source_system_code')
    .agg(f.count('*'))
    .limit(100)
)