In [1]:
%idle_timeout 2880
%glue_version 4.0
%worker_type G.1X
%number_of_workers 5


import sys
import os
import json
import boto3
import logging
import functools
import numpy as np
import pandas as pd
from datetime import datetime
from awsglue.transforms import *
from pyspark.storagelevel import StorageLevel
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, FloatType, DateType
# Build the Spark/Glue entry points first so every later statement only uses
# names that already exist on a fresh kernel (the session handle used to be
# referenced one statement before it was assigned).
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

# Send the requester-pays header on S3 access; this is needed for permissions.
glueContext._jsc.hadoopConfiguration().set("fs.s3.useRequesterPaysHeader", "true")
spark._jsc.hadoopConfiguration().set("fs.s3.useRequesterPaysHeader", "true")

# Drop anything cached earlier in this session before starting the run.
spark.catalog.clearCache()
job = Job(glueContext)

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.7 
Current idle_timeout is None minutes.
idle_timeout has been set to 2880 minutes.
Setting Glue version to: 4.0
Previous worker type: None
Setting new worker type to: G.1X
Previous number of workers: None
Setting new number of workers to: 5
Trying to create a Glue session for the kernel.
Session Type: glueetl
Worker Type: G.1X
Number of Workers: 5
Idle Timeout: 2880
Session ID: 06ce504c-353f-459c-8752-70629e3ddef2
Applying the following default arguments:
--glue_kernel_version 1.0.7
--enable-glue-datacatalog true
Waiting for session 06ce504c-353f-459c-8752-70629e3ddef2 to get into ready status...
Session 06ce504c-353f-459c-8752-70629e3ddef2 ha

In [2]:
# -------- AWS S3 Configuration -------- #
# Initialize the S3 client for data access.
# No credentials or region are passed here, so boto3 falls back to its
# default configuration for this session.
s3_client = boto3.client('s3')




In [3]:
# Location of the containment-metric config JSON: bucket name and object key.
# Both values are passed to s3_client.get_object when the config is loaded.
bucket_name = 'cci-dig-aicoe-data-sb'
file_key = 'processed/containment_metric/containment_metric_config_json/containment_metric.json'




In [4]:
# Fetch the config object from S3
read_json = s3_client.get_object(Bucket=bucket_name, Key=file_key)

# Read the body once, decode it as UTF-8 and parse it into config_data
config_bytes = read_json['Body'].read()
config_text = config_bytes.decode('utf-8')
config_data = json.loads(config_text)
print(config_data)

{'features': [{'feature_name': 'SMART HELP', 'feature_id': '15', 'metrics': [{'Primary_intent': 'Equipment Support', 'Primary_intent_detail': 'SmartHelp', 'cont_display_metric_name': 'Smarthelp_containment_rate', 'cont_display_metric_seq': '1'}, {'Primary_intent': 'Equipment Support', 'Primary_intent_detail': 'PnP', 'cont_display_metric_name': 'PnP_containment_rate', 'cont_display_metric_seq': '2'}]}, {'feature_name': 'BIL LING', 'feature_id': '1', 'metrics': [{'Primary_intent': 'Billing', 'Primary_intent_detail': 'Billing Concern', 'cont_display_metric_name': 'Billing_containment_rate', 'cont_display_metric_seq': '1'}, {'Primary_intent': 'Billing', 'Primary_intent_detail': 'Billing General', 'cont_display_metric_name': 'Billing_containment_rate', 'cont_display_metric_seq': '1'}, {'Primary_intent': 'Billing', 'Primary_intent_detail': 'Billing Preferences', 'cont_display_metric_name': 'Billing_containment_rate', 'cont_display_metric_seq': '1'}, {'Primary_intent': 'Billing', 'Primary_int

In [5]:
def create_dataframe_from_json(data):
    """Flatten the containment-metric config into one row per (feature, metric).

    Parameters
    ----------
    data : dict
        Parsed config with a top-level "features" list. Each feature holds
        "feature_name", "feature_id" and a "metrics" list whose entries hold
        "Primary_intent", "Primary_intent_detail", "cont_display_metric_name"
        and "cont_display_metric_seq".

    Returns
    -------
    pandas.DataFrame
        Columns, in order: id, hs_feature_name, hs_feature_id, primary_intent,
        primary_intent_detail, cont_display_metric_name,
        cont_display_metric_seq, create_dt. ``id`` is a 1-based running number
        in config order and ``create_dt`` is today's date. Values read from
        the config are passed through unchanged (no type conversion here).
        A config without any metrics yields an empty frame with these columns.

    Raises
    ------
    KeyError
        If "features" or one of the expected feature/metric keys is missing.
    """
    value_columns = [
        "hs_feature_name",
        "hs_feature_id",
        "primary_intent",
        "primary_intent_detail",
        "cont_display_metric_name",
        "cont_display_metric_seq",
    ]

    # Flatten the nested JSON into a list of row dictionaries
    flattened_data = []
    for feature in data["features"]:
        feature_name = feature["feature_name"]
        feature_id = feature["feature_id"]

        for metric in feature["metrics"]:
            flattened_data.append({
                "hs_feature_name": feature_name,
                "hs_feature_id": feature_id,
                "primary_intent": metric["Primary_intent"],
                "primary_intent_detail": metric["Primary_intent_detail"],
                "cont_display_metric_name": metric["cont_display_metric_name"],
                "cont_display_metric_seq": metric["cont_display_metric_seq"]
            })

    # Naming the columns explicitly keeps the schema intact when there are no
    # rows; without it the column reorder below fails with a KeyError.
    df = pd.DataFrame(flattened_data, columns=value_columns)

    # Add the 'id' column as a sequence number and stamp the creation date
    df["id"] = range(1, len(df) + 1)
    df['create_dt'] = pd.to_datetime('today').normalize().date()

    # Put 'id' first and 'create_dt' last
    df = df[["id"] + value_columns + ["create_dt"]]

    return df




In [6]:
# Flatten the loaded config into a pandas DataFrame
df = create_dataframe_from_json(config_data)

# Widen the pandas display limits, then print the first five rows.
# set_option changes are global, so they also apply to later cells.
pd.set_option('display.max_columns', None)  # Show all columns
pd.set_option('display.max_rows', 500)      # Show up to 500 rows
print(df.head())

   id hs_feature_name hs_feature_id     primary_intent primary_intent_detail  \
0   1      SMART HELP            15  Equipment Support             SmartHelp   
1   2      SMART HELP            15  Equipment Support                   PnP   
2   3        BIL LING             1            Billing       Billing Concern   
3   4        BIL LING             1            Billing       Billing General   
4   5        BIL LING             1            Billing   Billing Preferences   

     cont_display_metric_name cont_display_metric_seq   create_dt  
0  Smarthelp_containment_rate                       1  2025-02-27  
1        PnP_containment_rate                       2  2025-02-27  
2    Billing_containment_rate                       1  2025-02-27  
3    Billing_containment_rate                       1  2025-02-27  
4    Billing_containment_rate                       1  2025-02-27


In [7]:
# Convert the pandas config table to a Spark DataFrame (no explicit schema is
# given, so Spark infers the column types) and preview it.
first_table_spark_df = spark.createDataFrame(df)
first_table_spark_df.show()

+---+---------------+-------------+-----------------+---------------------+------------------------+-----------------------+----------+
| id|hs_feature_name|hs_feature_id|   primary_intent|primary_intent_detail|cont_display_metric_name|cont_display_metric_seq| create_dt|
+---+---------------+-------------+-----------------+---------------------+------------------------+-----------------------+----------+
|  1|     SMART HELP|           15|Equipment Support|            SmartHelp|    Smarthelp_contain...|                      1|2025-02-27|
|  2|     SMART HELP|           15|Equipment Support|                  PnP|    PnP_containment_rate|                      2|2025-02-27|
|  3|       BIL LING|            1|          Billing|      Billing Concern|    Billing_containme...|                      1|2025-02-27|
|  4|       BIL LING|            1|          Billing|      Billing General|    Billing_containme...|                      1|2025-02-27|
|  5|       BIL LING|            1|          Bil

In [8]:
# Inspect the column types Spark inferred for the config table
first_table_spark_df.printSchema()

root
 |-- id: long (nullable = true)
 |-- hs_feature_name: string (nullable = true)
 |-- hs_feature_id: string (nullable = true)
 |-- primary_intent: string (nullable = true)
 |-- primary_intent_detail: string (nullable = true)
 |-- cont_display_metric_name: string (nullable = true)
 |-- cont_display_metric_seq: string (nullable = true)
 |-- create_dt: date (nullable = true)


In [9]:
# Row count of the flattened config table (one row per feature/metric entry)
first_table_spark_df.count()

174


In [10]:
from pyspark.sql.functions import col

# Columns of the config table that are cast to integer
integer_columns = ("id", "hs_feature_id", "cont_display_metric_seq")

# Apply the casts one column at a time, starting from the untouched frame
first_table_spark_df_transformed = first_table_spark_df
for column_name in integer_columns:
    first_table_spark_df_transformed = first_table_spark_df_transformed.withColumn(
        column_name, col(column_name).cast("int")
    )

# Print the schema to confirm the new column types
first_table_spark_df_transformed.printSchema()


root
 |-- id: integer (nullable = true)
 |-- hs_feature_name: string (nullable = true)
 |-- hs_feature_id: integer (nullable = true)
 |-- primary_intent: string (nullable = true)
 |-- primary_intent_detail: string (nullable = true)
 |-- cont_display_metric_name: string (nullable = true)
 |-- cont_display_metric_seq: integer (nullable = true)
 |-- create_dt: date (nullable = true)


In [12]:
first_output_path = "s3://cci-dig-aicoe-data-sb/processed/containment_metric/containment_metric_master_new/"
# Write the config table as Parquet from a single partition, in append mode.
# NOTE(review): append adds another copy of the table on every run - confirm
# that accumulating snapshots is intended.
# The writer call returns None, so firsttable_writedf is always None.
firsttable_writedf = first_table_spark_df_transformed.coalesce(1).write.parquet(first_output_path, mode="append")




In [11]:
# Source table and look-back window for the raw extract. Keeping them as named
# values means the table name is written once and the window can be tuned
# without editing the SQL text.
OMNI_FACT_TABLE = "ota_data_assets_temp.omni_intent_cntct_fact"
RAW_LOOKBACK_DAYS = 90

# Select contact-level rows for initial_channel = 'CoxApp' and lob = 'R'
# whose contact_dt lies within RAW_LOOKBACK_DAYS days of the latest contact_dt
# in the table. The window is anchored to the table's own max date, not to
# the date the notebook is run.
omni_raw_table_query = f"""
    SELECT
        primary_intent,
        primary_intent_detail,
        sub_contact_id,
        contact_id,
        selfservice_containment,
        initial_channel,
        lob,
        contact_dt
    FROM
        {OMNI_FACT_TABLE}
    WHERE
        CAST(contact_dt AS DATE) BETWEEN
            date_add((SELECT max(CAST(contact_dt AS DATE)) FROM {OMNI_FACT_TABLE}), -{RAW_LOOKBACK_DAYS})
            AND (SELECT max(CAST(contact_dt AS DATE)) FROM {OMNI_FACT_TABLE})
        AND initial_channel = 'CoxApp'
        AND lob = 'R'
    """




In [12]:
# Build the raw-extract DataFrame through Spark SQL. This only defines the
# query; the data is read when an action (show/count/write) runs on it.
omni_raw_table_df = spark.sql(omni_raw_table_query)




In [13]:
# Preview the raw extract; show() is an action, so this executes the query
omni_raw_table_df.show()

+-----------------+---------------------+--------------------+--------------------+-----------------------+---------------+---+----------+
|   primary_intent|primary_intent_detail|      sub_contact_id|          contact_id|selfservice_containment|initial_channel|lob|contact_dt|
+-----------------+---------------------+--------------------+--------------------+-----------------------+---------------+---+----------+
|          Billing|             Pay Bill|02ee7603a58240f19...|02ee7603a58240f19...|                      1|         CoxApp|  R|2024-10-13|
|        No Intent|                 None|033199356d914396a...|033199356d914396a...|                      1|         CoxApp|  R|2024-10-13|
|        No Intent|                 None|036487a2cf5c43208...|036487a2cf5c43208...|                      1|         CoxApp|  R|2024-10-13|
|        No Intent|                 None|044099498a994f75b...|044099498a994f75b...|                      1|         CoxApp|  R|2024-10-13|
|Equipment Support|        

In [14]:
# Check the column types returned by the raw-extract query
omni_raw_table_df.printSchema()

root
 |-- primary_intent: string (nullable = true)
 |-- primary_intent_detail: string (nullable = true)
 |-- sub_contact_id: string (nullable = true)
 |-- contact_id: string (nullable = true)
 |-- selfservice_containment: integer (nullable = true)
 |-- initial_channel: string (nullable = true)
 |-- lob: string (nullable = true)
 |-- contact_dt: string (nullable = true)


In [15]:
# Row count of the raw extract. The DataFrame is not cached, so every count()
# in this notebook re-runs the underlying query.
omni_raw_table_df.count()

18696841


In [17]:
from pyspark.sql.functions import lit, current_date

# Stamp every row with today's date ("create_dt") and an "is_active" flag set
# to True, as one chained expression.
omni_raw_table_df = (
    omni_raw_table_df
    .withColumn("create_dt", current_date())
    .withColumn("is_active", lit(True))
)




In [18]:
# Confirm that create_dt and is_active were added to the schema
omni_raw_table_df.printSchema()

root
 |-- primary_intent: string (nullable = true)
 |-- primary_intent_detail: string (nullable = true)
 |-- sub_contact_id: string (nullable = true)
 |-- contact_id: string (nullable = true)
 |-- selfservice_containment: integer (nullable = true)
 |-- initial_channel: string (nullable = true)
 |-- lob: string (nullable = true)
 |-- contact_dt: string (nullable = true)
 |-- create_dt: date (nullable = false)
 |-- is_active: boolean (nullable = false)


In [19]:
# Preview the extract with the two added columns
omni_raw_table_df.show()

+-----------------+---------------------+--------------------+--------------------+-----------------------+---------------+---+----------+----------+---------+
|   primary_intent|primary_intent_detail|      sub_contact_id|          contact_id|selfservice_containment|initial_channel|lob|contact_dt| create_dt|is_active|
+-----------------+---------------------+--------------------+--------------------+-----------------------+---------------+---+----------+----------+---------+
|          Billing|             Pay Bill|02ee7603a58240f19...|02ee7603a58240f19...|                      1|         CoxApp|  R|2024-10-13|2025-02-27|     true|
|        No Intent|                 None|033199356d914396a...|033199356d914396a...|                      1|         CoxApp|  R|2024-10-13|2025-02-27|     true|
|        No Intent|                 None|036487a2cf5c43208...|036487a2cf5c43208...|                      1|         CoxApp|  R|2024-10-13|2025-02-27|     true|
|        No Intent|                 None

In [20]:
# Re-count after adding columns (a separate Spark action over the source query)
omni_raw_table_df.count()

18696841


In [21]:
from pyspark.sql.functions import to_date

# Parse contact_dt with the 'yyyy-MM-dd' pattern and replace the column in place
contact_dt_as_date = to_date(omni_raw_table_df['contact_dt'], 'yyyy-MM-dd')
omni_raw_table_df = omni_raw_table_df.withColumn('contact_dt', contact_dt_as_date)




In [22]:
# Confirm that contact_dt is now a date column
omni_raw_table_df.printSchema()

root
 |-- primary_intent: string (nullable = true)
 |-- primary_intent_detail: string (nullable = true)
 |-- sub_contact_id: string (nullable = true)
 |-- contact_id: string (nullable = true)
 |-- selfservice_containment: integer (nullable = true)
 |-- initial_channel: string (nullable = true)
 |-- lob: string (nullable = true)
 |-- contact_dt: date (nullable = true)
 |-- create_dt: date (nullable = false)
 |-- is_active: boolean (nullable = false)


In [23]:
# Preview the extract after the contact_dt conversion
omni_raw_table_df.show()

+-----------------+---------------------+--------------------+--------------------+-----------------------+---------------+---+----------+----------+---------+
|   primary_intent|primary_intent_detail|      sub_contact_id|          contact_id|selfservice_containment|initial_channel|lob|contact_dt| create_dt|is_active|
+-----------------+---------------------+--------------------+--------------------+-----------------------+---------------+---+----------+----------+---------+
|          Billing|             Pay Bill|02ee7603a58240f19...|02ee7603a58240f19...|                      1|         CoxApp|  R|2024-10-13|2025-02-27|     true|
|        No Intent|                 None|033199356d914396a...|033199356d914396a...|                      1|         CoxApp|  R|2024-10-13|2025-02-27|     true|
|        No Intent|                 None|036487a2cf5c43208...|036487a2cf5c43208...|                      1|         CoxApp|  R|2024-10-13|2025-02-27|     true|
|        No Intent|                 None

In [24]:
# Re-count after the date conversion (a separate Spark action over the source query)
omni_raw_table_df.count()

18696841


In [25]:
omni_raw_output_path = "s3://cci-dig-aicoe-data-sb/processed/containment_metric/containment_metric_raw_data/"
# Write the raw extract as Parquet from a single partition, in append mode.
# NOTE(review): coalesce(1) sends the whole extract through one partition -
# confirm a single output file is really required for a table of this size.
# NOTE(review): append adds the full extract again on every run.
# The writer call returns None, so omnitable_writedf is always None.
omnitable_writedf = omni_raw_table_df.coalesce(1).write.parquet(omni_raw_output_path, mode="append")




In [13]:
from pyspark.sql.functions import lit  # Import the lit function




In [14]:
def _escape_sql_literal(value):
    """Escape a string for use inside a single-quoted Spark SQL literal."""
    # Backslash first, then the quote, so the added escapes are not re-escaped.
    # Relies on Spark's default backslash-escape handling of string literals.
    return value.replace("\\", "\\\\").replace("'", "\\'")


# Create an empty list to store the per-metric result DataFrames
result_dfs = []

# Run one aggregation query per unique cont_display_metric_name in the config
for metric_name in df['cont_display_metric_name'].unique():
    # Config rows that belong to the current metric
    filtered_df = df[df['cont_display_metric_name'] == metric_name]

    # Unique primary_intent and primary_intent_detail values for this metric
    primary_intent_values = filtered_df['primary_intent'].unique()
    primary_intent_detail_values = filtered_df['primary_intent_detail'].unique()

    # Format the values for the SQL IN clauses. Each value is escaped so that a
    # quote inside a config value cannot end the literal early and alter the query.
    primary_intent_str = "', '".join(_escape_sql_literal(v) for v in primary_intent_values)
    primary_intent_detail_str = "', '".join(_escape_sql_literal(v) for v in primary_intent_detail_values)

    # Daily aggregate over the 60 days up to the latest contact_dt in the table.
    # containment_rate divides distinct contained sub-contacts by distinct
    # sub-contacts, i.e. the same two quantities that are reported in the
    # selfservice_containment and sub_contact_id columns. (It used to divide a
    # row count by a distinct count, which can exceed 100% whenever a
    # sub_contact_id appears on more than one row.)
    # NOTE(review): the two IN filters are independent, so any listed intent
    # combined with any listed detail matches - confirm exact
    # (intent, detail) pairs are not required.
    second_table_query = f"""
    SELECT 
        initial_channel, lob,  
        CAST(contact_dt AS DATE) AS contact_dt, 
        COUNT(DISTINCT sub_contact_id) AS sub_contact_id, 
        COUNT(DISTINCT CASE WHEN selfservice_containment = 1 THEN sub_contact_id END) AS selfservice_containment,
        CASE 
            WHEN COUNT(DISTINCT sub_contact_id) > 0 THEN
                ROUND(CAST(COUNT(DISTINCT CASE WHEN selfservice_containment = 1 THEN sub_contact_id END) AS DOUBLE) 
                / COUNT(DISTINCT sub_contact_id) * 100, 2)
            ELSE
                0
        END AS containment_rate
    FROM 
        ota_data_assets_temp.omni_intent_cntct_fact 
    WHERE 
        CAST(contact_dt AS DATE) BETWEEN 
            date_add((SELECT max(CAST(contact_dt AS DATE)) FROM ota_data_assets_temp.omni_intent_cntct_fact), -60) 
            AND (SELECT max(CAST(contact_dt AS DATE)) FROM ota_data_assets_temp.omni_intent_cntct_fact)
        AND primary_intent IN ('{primary_intent_str}')
        AND initial_channel = 'CoxApp'
        AND lob = 'R'
        AND primary_intent_detail IN ('{primary_intent_detail_str}')
    GROUP BY 
        contact_dt, initial_channel, lob
    ORDER BY 
        contact_dt DESC
    """

    # Execute the query with Spark SQL
    second_table_df = spark.sql(second_table_query)

    # Tag every result row with the metric it belongs to
    second_table_df = second_table_df.withColumn("cont_display_metric_name",
                                                 lit(metric_name))

    # Collect the aggregate to pandas (toPandas triggers execution of the query)
    result_dfs.append(second_table_df.toPandas())

# Combine all per-metric results into a single DataFrame
final_df = pd.concat(result_dfs, ignore_index=True)

# Display the final result
final_df.head()


  initial_channel lob  contact_dt  sub_contact_id  selfservice_containment  \
0          CoxApp   R  2024-11-02            1495                     1386   
1          CoxApp   R  2024-11-01            1546                     1440   
2          CoxApp   R  2024-10-31            1446                     1344   
3          CoxApp   R  2024-10-30            1408                     1323   
4          CoxApp   R  2024-10-29            1452                     1372   

   containment_rate    cont_display_metric_name  
0             92.71  Smarthelp_containment_rate  
1             93.14  Smarthelp_containment_rate  
2             92.95  Smarthelp_containment_rate  
3             93.96  Smarthelp_containment_rate  
4             94.49  Smarthelp_containment_rate


In [15]:
# (rows, columns) of the combined per-metric daily results
final_df.shape

(790, 7)


In [16]:
# One row per cont_display_metric_name; when a metric name appears on several
# config rows, the first occurrence (and its hs_feature_name) is kept.
lookup_columns = ['hs_feature_name', 'cont_display_metric_name', 'create_dt']
first_table_unique_df = df[lookup_columns].drop_duplicates(subset='cont_display_metric_name')




In [17]:
# Preview the de-duplicated metric lookup table
first_table_unique_df.head()

   hs_feature_name    cont_display_metric_name   create_dt
0       SMART HELP  Smarthelp_containment_rate  2025-02-24
1       SMART HELP        PnP_containment_rate  2025-02-24
2         BIL LING    Billing_containment_rate  2025-02-24
13      DATA USAGE  Datausage_containment_rate  2025-02-24
14          LOG IN      Login_containment_rate  2025-02-24


In [18]:
# (rows, columns) of the lookup table; rows = number of distinct metric names
first_table_unique_df.shape

(13, 3)


In [19]:
# Widen the pandas display limits and list up to 20 rows of the lookup table
pd.set_option('display.max_columns', None)  # Show all columns
pd.set_option('display.max_rows', 300)      # Show up to 300 rows
first_table_unique_df.head(20)

          hs_feature_name       cont_display_metric_name   create_dt
0              SMART HELP     Smarthelp_containment_rate  2025-02-24
1              SMART HELP           PnP_containment_rate  2025-02-24
2                BIL LING       Billing_containment_rate  2025-02-24
13             DATA USAGE     Datausage_containment_rate  2025-02-24
14                 LOG IN         Login_containment_rate  2025-02-24
20             E COMMERCE     Ecommerce_containment_rate  2025-02-24
32               S UPPORT       Support_containment_rate  2025-02-24
80          EASY CON NECT   Easyconnect_containment_rate  2025-02-24
83            RESET MODEM    Resetmodem_containment_rate  2025-02-24
84           RESET TV BOX    Resettvbox_containment_rate  2025-02-24
85          SERVICE ALERT  Servicealert_containment_rate  2025-02-24
86  TWO-STEP VERIFICATION           TSV_containment_rate  2025-02-24
87               OVER ALL       Overall_containment_rate  2025-02-24


In [20]:
# Left-join hs_feature_name and create_dt onto every daily result row,
# matching on 'cont_display_metric_name'
combined_df = final_df.merge(first_table_unique_df, how='left', on='cont_display_metric_name')
combined_df.head()

  initial_channel lob  contact_dt  sub_contact_id  selfservice_containment  \
0          CoxApp   R  2024-11-02            1495                     1386   
1          CoxApp   R  2024-11-01            1546                     1440   
2          CoxApp   R  2024-10-31            1446                     1344   
3          CoxApp   R  2024-10-30            1408                     1323   
4          CoxApp   R  2024-10-29            1452                     1372   

   containment_rate    cont_display_metric_name hs_feature_name   create_dt  
0             92.71  Smarthelp_containment_rate      SMART HELP  2025-02-24  
1             93.14  Smarthelp_containment_rate      SMART HELP  2025-02-24  
2             92.95  Smarthelp_containment_rate      SMART HELP  2025-02-24  
3             93.96  Smarthelp_containment_rate      SMART HELP  2025-02-24  
4             94.49  Smarthelp_containment_rate      SMART HELP  2025-02-24


In [21]:
# (rows, columns) after the merge
combined_df.shape

(790, 9)


In [22]:
# List the merged column names before selecting the output columns
combined_df.columns

Index(['initial_channel', 'lob', 'contact_dt', 'sub_contact_id',
       'selfservice_containment', 'containment_rate',
       'cont_display_metric_name', 'hs_feature_name', 'create_dt'],
      dtype='object')


In [25]:
# Output column order for the second table
second_table_columns = [
    'sub_contact_id',
    'selfservice_containment',
    'initial_channel',
    'lob',
    'contact_dt',
    'containment_rate',
    'hs_feature_name',
    'cont_display_metric_name',
    'create_dt',
]
second_tabledf = combined_df.loc[:, second_table_columns]
second_tabledf.head()

   sub_contact_id  selfservice_containment initial_channel lob  contact_dt  \
0            1495                     1386          CoxApp   R  2024-11-02   
1            1546                     1440          CoxApp   R  2024-11-01   
2            1446                     1344          CoxApp   R  2024-10-31   
3            1408                     1323          CoxApp   R  2024-10-30   
4            1452                     1372          CoxApp   R  2024-10-29   

   containment_rate hs_feature_name    cont_display_metric_name   create_dt  
0             92.71      SMART HELP  Smarthelp_containment_rate  2025-02-24  
1             93.14      SMART HELP  Smarthelp_containment_rate  2025-02-24  
2             92.95      SMART HELP  Smarthelp_containment_rate  2025-02-24  
3             93.96      SMART HELP  Smarthelp_containment_rate  2025-02-24  
4             94.49      SMART HELP  Smarthelp_containment_rate  2025-02-24


In [26]:
# Convert the pandas result table to a Spark DataFrame (no explicit schema is
# given, so Spark infers the column types) and preview it.
second_table_spark_df = spark.createDataFrame(second_tabledf)
second_table_spark_df.show()

+--------------+-----------------------+---------------+---+----------+----------------+---------------+------------------------+----------+
|sub_contact_id|selfservice_containment|initial_channel|lob|contact_dt|containment_rate|hs_feature_name|cont_display_metric_name| create_dt|
+--------------+-----------------------+---------------+---+----------+----------------+---------------+------------------------+----------+
|          1495|                   1386|         CoxApp|  R|2024-11-02|           92.71|     SMART HELP|    Smarthelp_contain...|2025-02-24|
|          1546|                   1440|         CoxApp|  R|2024-11-01|           93.14|     SMART HELP|    Smarthelp_contain...|2025-02-24|
|          1446|                   1344|         CoxApp|  R|2024-10-31|           92.95|     SMART HELP|    Smarthelp_contain...|2025-02-24|
|          1408|                   1323|         CoxApp|  R|2024-10-30|           93.96|     SMART HELP|    Smarthelp_contain...|2025-02-24|
|          14

In [27]:
# Inspect the column types Spark inferred for the result table
second_table_spark_df.printSchema()

root
 |-- sub_contact_id: long (nullable = true)
 |-- selfservice_containment: long (nullable = true)
 |-- initial_channel: string (nullable = true)
 |-- lob: string (nullable = true)
 |-- contact_dt: date (nullable = true)
 |-- containment_rate: double (nullable = true)
 |-- hs_feature_name: string (nullable = true)
 |-- cont_display_metric_name: string (nullable = true)
 |-- create_dt: date (nullable = true)


In [28]:
# Row count of the result table (one row per metric and contact date)
second_table_spark_df.count()

790


In [29]:
from pyspark.sql import functions as F

# Cast the two count columns to int; every other column passes through unchanged
second_table_transformed_df = second_table_spark_df
for count_column in ("sub_contact_id", "selfservice_containment"):
    second_table_transformed_df = second_table_transformed_df.withColumn(
        count_column, F.col(count_column).cast("int")
    )

# Preview the transformed DataFrame
second_table_transformed_df.show()


+--------------+-----------------------+---------------+---+----------+----------------+---------------+------------------------+----------+
|sub_contact_id|selfservice_containment|initial_channel|lob|contact_dt|containment_rate|hs_feature_name|cont_display_metric_name| create_dt|
+--------------+-----------------------+---------------+---+----------+----------------+---------------+------------------------+----------+
|          1495|                   1386|         CoxApp|  R|2024-11-02|           92.71|     SMART HELP|    Smarthelp_contain...|2025-02-24|
|          1546|                   1440|         CoxApp|  R|2024-11-01|           93.14|     SMART HELP|    Smarthelp_contain...|2025-02-24|
|          1446|                   1344|         CoxApp|  R|2024-10-31|           92.95|     SMART HELP|    Smarthelp_contain...|2025-02-24|
|          1408|                   1323|         CoxApp|  R|2024-10-30|           93.96|     SMART HELP|    Smarthelp_contain...|2025-02-24|
|          14

In [30]:
# Confirm the two count columns are now integer
second_table_transformed_df.printSchema()

root
 |-- sub_contact_id: integer (nullable = true)
 |-- selfservice_containment: integer (nullable = true)
 |-- initial_channel: string (nullable = true)
 |-- lob: string (nullable = true)
 |-- contact_dt: date (nullable = true)
 |-- containment_rate: double (nullable = true)
 |-- hs_feature_name: string (nullable = true)
 |-- cont_display_metric_name: string (nullable = true)
 |-- create_dt: date (nullable = true)


In [32]:
second_output_path = "s3://cci-dig-aicoe-data-sb/processed/containment_metric/containment_metric_data_new/"
# Write the daily containment table as Parquet from a single partition, in append mode.
# NOTE(review): append adds the whole window again on every run, so dates that
# overlap between runs end up stored more than once - confirm this is intended.
# The writer call returns None, so secondtable_writedf is always None.
secondtable_writedf = second_table_transformed_df.coalesce(1).write.parquet(second_output_path, mode="append")




In [33]:
# Keep only the three columns the pivot needs: metric name, date and rate
third_table_columns = [
    'cont_display_metric_name',
    'contact_dt',
    'containment_rate',
]
third_tabledf = second_tabledf.loc[:, third_table_columns]
third_tabledf.head()

     cont_display_metric_name  contact_dt  containment_rate
0  Smarthelp_containment_rate  2024-11-02             92.71
1  Smarthelp_containment_rate  2024-11-01             93.14
2  Smarthelp_containment_rate  2024-10-31             92.95
3  Smarthelp_containment_rate  2024-10-30             93.96
4  Smarthelp_containment_rate  2024-10-29             94.49


In [34]:
# Pivot the data
df_pivot = third_tabledf.pivot_table(index=['cont_display_metric_name'],
                          columns='contact_dt',
                          values='containment_rate',
                          aggfunc='first').reset_index()

# Display the result
pd.set_option('display.max_columns', None)  # Show all columns
pd.set_option('display.max_rows', 200)      # Show 100 rows
df_pivot.head()

contact_dt      cont_display_metric_name  2024-09-03  2024-09-04  2024-09-05  \
0               Billing_containment_rate       98.48       98.61       98.42   
1             Datausage_containment_rate       98.85       99.04       98.94   
2           Easyconnect_containment_rate       90.05       91.11       86.89   
3             Ecommerce_containment_rate       88.35       88.94       89.39   
4                 Login_containment_rate       97.84       97.77       97.33   

contact_dt  2024-09-06  2024-09-07  2024-09-08  2024-09-09  2024-09-10  \
0                98.72       99.38       99.50       98.43       98.63   
1                99.12       99.65       99.61       98.76       99.04   
2                88.11       89.56       88.41       90.91       86.02   
3                89.62       93.51       95.65       87.95       88.82   
4                98.34       98.81       99.53       97.74       97.71   

contact_dt  2024-09-11  2024-09-12  2024-09-13  2024-09-14  2024-09-15  \


In [40]:
# Number of pivot columns: the metric-name column plus one per contact date
print(len(df_pivot.columns))

62


In [41]:
# (rows, columns) of the pivot: one row per metric name
df_pivot.shape

(13, 62)


In [42]:
# Check the dtype of each pivot column
df_pivot.dtypes

contact_dt
cont_display_metric_name     object
2024-09-03                  float64
2024-09-04                  float64
2024-09-05                  float64
2024-09-06                  float64
2024-09-07                  float64
2024-09-08                  float64
2024-09-09                  float64
2024-09-10                  float64
2024-09-11                  float64
2024-09-12                  float64
2024-09-13                  float64
2024-09-14                  float64
2024-09-15                  float64
2024-09-16                  float64
2024-09-17                  float64
2024-09-18                  float64
2024-09-19                  float64
2024-09-20                  float64
2024-09-21                  float64
2024-09-22                  float64
2024-09-23                  float64
2024-09-24                  float64
2024-09-25                  float64
2024-09-26                  float64
2024-09-27                  float64
2024-09-28                  float64
2024-09-29       

In [43]:
# Widen the pandas display limits and list up to 20 rows of the pivot
pd.set_option('display.max_columns', None)  # Show all columns
pd.set_option('display.max_rows', 300)      # Show up to 300 rows
df_pivot.head(20)

contact_dt       cont_display_metric_name  2024-09-03  2024-09-04  2024-09-05  \
0                Billing_containment_rate       98.48       98.61       98.42   
1              Datausage_containment_rate       98.85       99.04       98.94   
2            Easyconnect_containment_rate       90.05       91.11       86.89   
3              Ecommerce_containment_rate       88.35       88.94       89.39   
4                  Login_containment_rate       97.84       97.77       97.33   
5                Overall_containment_rate       97.50       97.82       97.66   
6                    PnP_containment_rate       89.20       87.94       89.02   
7             Resetmodem_containment_rate       96.88       97.21       97.45   
8             Resettvbox_containment_rate       96.88       97.21       97.45   
9           Servicealert_containment_rate       96.33       97.47       96.77   
10             Smarthelp_containment_rate       93.17       93.33       92.69   
11               Support_con

In [44]:
import numpy as np
import pandas as pd

def calculate_last7_and_30_days(df):
    """Add 'Yesterday', trailing-average and percent-change columns to a date-pivoted frame.

    Every column whose label parses as a ``%Y-%m-%d`` date is treated as one
    day of metric values. The date columns are ordered chronologically (not by
    their position in ``df``); the most recent one becomes 'Yesterday', and the
    7 / 30 dates immediately before it feed 'last_7_days' / 'last_30_days'.

    Note:
        ``df`` is modified in place: the latest-date column and the window
        columns are coerced to numeric (unparseable values become NaN), the
        derived columns are added, and the very same object is returned.

    Args:
        df: pandas DataFrame with one column per date plus any number of
            non-date columns (which are left untouched).

    Returns:
        ``df`` itself, with these columns added:
        'Yesterday', 'last_7_days', 'last_30_days',
        '% Change Last 7 Days', '% Change Last 30 Days'.
        A trailing average of 0 or NaN is stored as NaN and yields a
        percent change of 0.

    Raises:
        ValueError: if no column label can be parsed as a date.
    """
    print("Columns in df:", df.columns)

    # Parse each label once and keep (timestamp, original label) pairs so the
    # original labels can still be used to index the frame.
    dated_columns = []
    for col in df.columns:
        try:
            parsed = pd.to_datetime(col, format='%Y-%m-%d', errors='raise')
        except (ValueError, TypeError):
            # Not a date label (e.g. the metric-name column).
            continue
        if not isinstance(parsed, pd.Timestamp):
            # None / NaT labels parse without raising but are not usable dates.
            continue
        dated_columns.append((parsed, col))

    # Order by actual date so the windows below never depend on column position.
    dated_columns.sort(key=lambda pair: pair[0])
    datecolumn = [col for _, col in dated_columns]
    print("Date columns detected:", datecolumn)

    if not datecolumn:
        raise ValueError("No valid date columns found in the dataframe.")

    date_objects = [parsed for parsed, _ in dated_columns]

    # The windows end just before the latest date, which is reported separately.
    latest_date_column = datecolumn[-1]
    last_7_columns = datecolumn[-8:-1]
    last_30_columns = datecolumn[-31:-1]
    print("Last 7 columns:", last_7_columns)
    print("Last 30 columns:", last_30_columns)
    print("Date objects:", date_objects)

    # The 7-day window is always contained in the 30-day window, so coercing
    # the 30-day window plus the latest column covers everything used below.
    numeric_columns = last_30_columns + [latest_date_column]
    df[numeric_columns] = df[numeric_columns].apply(pd.to_numeric, errors='coerce')

    df['Yesterday'] = df[latest_date_column]

    # A mean of exactly 0 is stored as NaN so it is never used as a divisor.
    df['last_7_days'] = df[last_7_columns].mean(axis=1).replace(0, np.nan)
    df['last_30_days'] = df[last_30_columns].mean(axis=1).replace(0, np.nan)

    for baseline_col, change_col in (
        ('last_7_days', '% Change Last 7 Days'),
        ('last_30_days', '% Change Last 30 Days'),
    ):
        baseline = df[baseline_col]
        # Missing baseline -> report 0% change instead of NaN/inf.
        change = np.where(
            baseline.isna(), 0,
            (df['Yesterday'] - baseline) / baseline * 100
        )
        df[change_col] = pd.Series(change, index=df.index).round(1)

    return df




In [45]:
# calculate_last7_and_30_days adds its derived columns to the frame it is given
# and returns that same object, so after this call df_pivot and
# last_7_and_30_days_df refer to one (mutated) DataFrame.
last_7_and_30_days_df = calculate_last7_and_30_days(df_pivot)
print(last_7_and_30_days_df)

Columns in df: Index(['cont_display_metric_name',                 2024-09-03,
                       2024-09-04,                 2024-09-05,
                       2024-09-06,                 2024-09-07,
                       2024-09-08,                 2024-09-09,
                       2024-09-10,                 2024-09-11,
                       2024-09-12,                 2024-09-13,
                       2024-09-14,                 2024-09-15,
                       2024-09-16,                 2024-09-17,
                       2024-09-18,                 2024-09-19,
                       2024-09-20,                 2024-09-21,
                       2024-09-22,                 2024-09-23,
                       2024-09-24,                 2024-09-25,
                       2024-09-26,                 2024-09-27,
                       2024-09-28,                 2024-09-29,
                       2024-09-30,                 2024-10-01,
                       2024-10-02,      

In [46]:
# Preview the frame returned by calculate_last7_and_30_days.
last_7_and_30_days_df.head()

contact_dt      cont_display_metric_name  2024-09-03  2024-09-04  2024-09-05  \
0               Billing_containment_rate       98.48       98.61       98.42   
1             Datausage_containment_rate       98.85       99.04       98.94   
2           Easyconnect_containment_rate       90.05       91.11       86.89   
3             Ecommerce_containment_rate       88.35       88.94       89.39   
4                 Login_containment_rate       97.84       97.77       97.33   

contact_dt  2024-09-06  2024-09-07  2024-09-08  2024-09-09  2024-09-10  \
0                98.72       99.38       99.50       98.43       98.63   
1                99.12       99.65       99.61       98.76       99.04   
2                88.11       89.56       88.41       90.91       86.02   
3                89.62       93.51       95.65       87.95       88.82   
4                98.34       98.81       99.53       97.74       97.71   

contact_dt  2024-09-11  2024-09-12  2024-09-13  2024-09-14  2024-09-15  \


In [47]:
# Keep only the metric name and the derived summary columns; this rebinds the
# name to a six-column selection and drops the per-date columns from it.
last_7_and_30_days_df = last_7_and_30_days_df[['cont_display_metric_name', 'Yesterday', 'last_7_days', 'last_30_days', '% Change Last 7 Days', '% Change Last 30 Days']]
pd.set_option('display.max_columns', None)  # Show all columns
pd.set_option('display.max_rows', 100)  # Show up to 100 rows
last_7_and_30_days_df.head()

contact_dt      cont_display_metric_name  Yesterday  last_7_days  \
0               Billing_containment_rate      99.58    98.977143   
1             Datausage_containment_rate      99.62    99.138571   
2           Easyconnect_containment_rate      86.32    88.837143   
3             Ecommerce_containment_rate      92.64    89.585714   
4                 Login_containment_rate      98.99    98.234286   

contact_dt  last_30_days  % Change Last 7 Days  % Change Last 30 Days  
0              98.994667                   0.6                    0.6  
1              99.159000                   0.5                    0.5  
2              88.966000                  -2.8                   -3.0  
3              90.678000                   3.4                    2.2  
4              98.234000                   0.8                    0.8


In [48]:
# (rows, columns) of the summary frame that is the left side of the merge below.
last_7_and_30_days_df.shape

(13, 6)


In [49]:
# (rows, columns) of the frame that is the right side of the merge below.
first_table_unique_df.shape

(13, 3)


In [50]:
# Left-join the summary onto first_table_unique_df by metric name, keeping
# every summary row even when no matching row exists on the right.
result_df = last_7_and_30_days_df.merge(
    first_table_unique_df,
    how="left",
    on="cont_display_metric_name",
)
result_df.head()

       cont_display_metric_name  Yesterday  last_7_days  last_30_days  \
0      Billing_containment_rate      99.58    98.977143     98.994667   
1    Datausage_containment_rate      99.62    99.138571     99.159000   
2  Easyconnect_containment_rate      86.32    88.837143     88.966000   
3    Ecommerce_containment_rate      92.64    89.585714     90.678000   
4        Login_containment_rate      98.99    98.234286     98.234000   

   % Change Last 7 Days  % Change Last 30 Days hs_feature_name   create_dt  
0                   0.6                    0.6        BIL LING  2025-02-24  
1                   0.5                    0.5      DATA USAGE  2025-02-24  
2                  -2.8                   -3.0   EASY CON NECT  2025-02-24  
3                   3.4                    2.2      E COMMERCE  2025-02-24  
4                   0.8                    0.8          LOG IN  2025-02-24


In [51]:
# Shape after the left join; the row count stays equal to the left frame's
# only if the join key is unique on the right — NOTE(review): confirm.
result_df.shape

(13, 8)


In [52]:
# Display up to 20 rows of the merged result.
result_df.head(20)

         cont_display_metric_name  Yesterday  last_7_days  last_30_days  \
0        Billing_containment_rate      99.58    98.977143     98.994667   
1      Datausage_containment_rate      99.62    99.138571     99.159000   
2    Easyconnect_containment_rate      86.32    88.837143     88.966000   
3      Ecommerce_containment_rate      92.64    89.585714     90.678000   
4          Login_containment_rate      98.99    98.234286     98.234000   
5        Overall_containment_rate      98.54    98.121429     98.166333   
6            PnP_containment_rate      85.00    86.857143     87.695667   
7     Resetmodem_containment_rate      97.60    97.542857     97.633000   
8     Resettvbox_containment_rate      97.60    97.542857     97.633000   
9   Servicealert_containment_rate      96.60    96.771429     97.018667   
10     Smarthelp_containment_rate      92.71    94.124286     94.232667   
11       Support_containment_rate      95.44    95.778571     96.009333   
12           TSV_containm

In [53]:
# Put the columns in output order: feature name first, create_dt last.
result_df = result_df[['hs_feature_name', 'cont_display_metric_name', 'Yesterday', 'last_7_days', 'last_30_days', '% Change Last 7 Days', '% Change Last 30 Days', 'create_dt']]
result_df.head()

  hs_feature_name      cont_display_metric_name  Yesterday  last_7_days  \
0        BIL LING      Billing_containment_rate      99.58    98.977143   
1      DATA USAGE    Datausage_containment_rate      99.62    99.138571   
2   EASY CON NECT  Easyconnect_containment_rate      86.32    88.837143   
3      E COMMERCE    Ecommerce_containment_rate      92.64    89.585714   
4          LOG IN        Login_containment_rate      98.99    98.234286   

   last_30_days  % Change Last 7 Days  % Change Last 30 Days   create_dt  
0     98.994667                   0.6                    0.6  2025-02-24  
1     99.159000                   0.5                    0.5  2025-02-24  
2     88.966000                  -2.8                   -3.0  2025-02-24  
3     90.678000                   3.4                    2.2  2025-02-24  
4     98.234000                   0.8                    0.8  2025-02-24


In [54]:
# Round the trailing averages to 3 decimals. DataFrame.round with a per-column
# mapping returns a new frame and leaves every other column untouched, which
# avoids assigning into a frame that was itself produced by column selection
# (a pattern that can raise SettingWithCopyWarning).
result_df = result_df.round({'last_7_days': 3, 'last_30_days': 3})




In [55]:
# Preview the result after rounding the trailing averages.
result_df.head()

  hs_feature_name      cont_display_metric_name  Yesterday  last_7_days  \
0        BIL LING      Billing_containment_rate      99.58       98.977   
1      DATA USAGE    Datausage_containment_rate      99.62       99.139   
2   EASY CON NECT  Easyconnect_containment_rate      86.32       88.837   
3      E COMMERCE    Ecommerce_containment_rate      92.64       89.586   
4          LOG IN        Login_containment_rate      98.99       98.234   

   last_30_days  % Change Last 7 Days  % Change Last 30 Days   create_dt  
0        98.995                   0.6                    0.6  2025-02-24  
1        99.159                   0.5                    0.5  2025-02-24  
2        88.966                  -2.8                   -3.0  2025-02-24  
3        90.678                   3.4                    2.2  2025-02-24  
4        98.234                   0.8                    0.8  2025-02-24


In [56]:
# Convert the pandas result into a Spark DataFrame (the form the Spark writer
# below operates on) and display its rows.
third_table_spark_df = spark.createDataFrame(result_df)
third_table_spark_df.show()

+--------------------+------------------------+---------+-----------+------------+--------------------+---------------------+----------+
|     hs_feature_name|cont_display_metric_name|Yesterday|last_7_days|last_30_days|% Change Last 7 Days|% Change Last 30 Days| create_dt|
+--------------------+------------------------+---------+-----------+------------+--------------------+---------------------+----------+
|            BIL LING|    Billing_containme...|    99.58|     98.977|      98.995|                 0.6|                  0.6|2025-02-24|
|          DATA USAGE|    Datausage_contain...|    99.62|     99.139|      99.159|                 0.5|                  0.5|2025-02-24|
|       EASY CON NECT|    Easyconnect_conta...|    86.32|     88.837|      88.966|                -2.8|                 -3.0|2025-02-24|
|          E COMMERCE|    Ecommerce_contain...|    92.64|     89.586|      90.678|                 3.4|                  2.2|2025-02-24|
|              LOG IN|    Login_containme

In [57]:
# Inspect the column types Spark assigned when converting from pandas.
third_table_spark_df.printSchema()

root
 |-- hs_feature_name: string (nullable = true)
 |-- cont_display_metric_name: string (nullable = true)
 |-- Yesterday: double (nullable = true)
 |-- last_7_days: double (nullable = true)
 |-- last_30_days: double (nullable = true)
 |-- % Change Last 7 Days: double (nullable = true)
 |-- % Change Last 30 Days: double (nullable = true)
 |-- create_dt: date (nullable = true)


In [58]:
# Row count of the Spark DataFrame about to be written.
third_table_spark_df.count()

13


In [59]:
# NOTE(review): repeats the printSchema() call made two cells earlier on the
# same, unchanged DataFrame — candidate for removal.
third_table_spark_df.printSchema()

root
 |-- hs_feature_name: string (nullable = true)
 |-- cont_display_metric_name: string (nullable = true)
 |-- Yesterday: double (nullable = true)
 |-- last_7_days: double (nullable = true)
 |-- last_30_days: double (nullable = true)
 |-- % Change Last 7 Days: double (nullable = true)
 |-- % Change Last 30 Days: double (nullable = true)
 |-- create_dt: date (nullable = true)


In [60]:
# S3 prefix that receives the summary table.
third_output_path = "s3://cci-dig-aicoe-data-sb/processed/containment_metric/containment_metric_calculation_new/"
# Collapse to a single partition, then write parquet in append mode.
# NOTE(review): with mode("append"), re-running this cell writes the same rows
# again under the same prefix — confirm that is intended.
# NOTE(review): DataFrameWriter.save() returns None, so
# third_df_table_write_df does not hold a DataFrame after this line.
third_df_table_write_df = third_table_spark_df.coalesce(1).write.format("parquet").mode("append").save(third_output_path)


