# 0. Setup

In [0]:
%pip install openai

In [0]:
import pandas as pd
from datetime import date, datetime, timedelta
import re
import functools as func
from pyspark.sql.functions import *
import pyspark.sql.functions as f 
from delta.tables import DeltaTable
from delta.tables import *
from pyspark.sql.window import Window
from pyspark.sql.types import StructType,StructField,StringType,IntegerType,DoubleType,TimestampType,ArrayType,DateType,BooleanType,NullType,LongType
import json


# 1. Data cleaning

In [0]:
# Root location (ADLS Gen2, abfss://) of the Cognigy v4 chatbot chat-history data
# that is read with spark.read.load in the next cell.
P_CHATHIST = (
    "abfss://standardized@odpweursensadls004p.dfs.core.windows.net"
    "/lhg/customer/cognigy_v4/chatbot_chathistory/"
)

In [0]:
# Load the chat history for a fixed create_dt window (both ends inclusive).
CHAT_START_DT = "2024-10-01"
CHAT_END_DT = "2024-10-07"
# Channels used only for testing; every session seen on them is excluded.
TEST_CHANNELS = ["e2eteam4", "adminconsole"]

df_his_raw = (spark.read.load(P_CHATHIST)
              .filter(f.col("create_dt").between(CHAT_START_DT, CHAT_END_DT))
)

# Remove session_ids with test and mock data **********************************************************
# A session counts as test/mock data if any of its rows comes from a test channel
# or has input_text "enablemock" (case-insensitive).
# The ids are taken from the raw rows because "channel" is not among the analysis
# columns selected below; filtering on a column that select() has already dropped
# is fragile.
df_test_ids = (df_his_raw
               .filter(f.col("channel").isin(TEST_CHANNELS)
                       | (f.lower(f.col("input_text")) == "enablemock"))
               .select("session_id")
)
print(df_test_ids.select(f.countDistinct("session_id")).collect()[0][0], " : MOCK SESSIONS")

# Keep only the analysis columns and drop every row belonging to a test session.
df_his_all = (df_his_raw
              .select("session_id", "timestamp", "source", "input_text", "input_data", "flow_name")
              .join(df_test_ids, "session_id", "leftanti")
)
# ******************************************************************************************************

df_his_all.cache()
print(df_his_all.select(f.countDistinct("session_id")).collect()[0][0], " : REMAINING DISTINCT SESSIONS")

In [0]:
# Sanity check: the actual range of the "timestamp" column in the loaded
# chat history (compare it against the create_dt window used when loading).
# f.min / f.max are used explicitly so that Python's built-in min/max are not
# what the reader has to assume is being called.
min_max_dates = df_his_all.agg(
    f.min("timestamp").alias("min_timestamp"),
    f.max("timestamp").alias("max_timestamp"),
).collect()[0]

min_date = min_max_dates["min_timestamp"]
max_date = min_max_dates["max_timestamp"]

print(f"Min timestamp: {min_date}")
print(f"Max timestamp: {max_date}")

In [0]:
# Handover sessions: sessions in which at least one row has
# '{"handoverStarted":true}' in input_data. First collect their distinct ids ...
df_his_ho = (df_his_all
             .where(f.col("input_data").contains('{"handoverStarted":true}'))
             .select("session_id")
             .dropDuplicates(["session_id"])
)
print("handover sessions            :", df_his_ho.count())
# ... then bring back every row of those sessions.
df_his_ho = df_his_ho.join(df_his_all, "session_id", "left")

In [0]:
# Only part of each conversation is relevant for the analysis:
#   start: the bot line "Would you please briefly describe your concern?" (input_text)
#   end:   the line containing '{"handoverStarted":false}' (input_data)
# This cell numbers the handovers. Each start line opens a new segment, identified
# by session_handover_id = "<session_id>-h<n>". Rows before the first start line get n = 0.
df_ho_start = df_his_ho.withColumn(
    "has_handover",
    f.when(f.col("input_text") == "Would you please briefly describe your concern?", 1).otherwise(0),
)

# Running count of start lines seen so far in the session, in timestamp order,
# current row included. NOTE: rows with identical timestamps are ordered arbitrarily.
window_spec = (Window
               .partitionBy("session_id")
               .orderBy("timestamp")
               .rowsBetween(Window.unboundedPreceding, Window.currentRow))

df_ho_start = (df_ho_start
               .withColumn("handover_num", f.sum(f.col("has_handover")).over(window_spec))
               .withColumn("session_handover_id",
                           f.concat_ws("-h", f.col("session_id"), f.col("handover_num")))
)
#df_ho_start.orderBy("session_id","timestamp").display()

In [0]:
# Hand-picked sessions for spot-checking the handover logic;
# the number of handovers and the handover provider are noted per session.
SAMPLE_SESSIONS = [
    # 3 handovers, {"handoverProvider":"cla"}
    "session-100187137356.33586",
    # 1 handover, {"handoverProvider":"genesys"}
    "session-1005182475952.2295",
    # 1 handover, {"handoverProvider":"genesys"}
    "session-1005632292306.0394",
]

In [0]:
# Debug switch: set to True to run the handover segmentation below on
# SAMPLE_SESSIONS only (instead of commenting code in and out).
USE_SAMPLE_SESSIONS = False
df_ho_start1 = (df_ho_start.filter(f.col("session_id").isin(SAMPLE_SESSIONS))
                if USE_SAMPLE_SESSIONS
                else df_ho_start)

In [0]:
# Add a column to identify end of Live Chat
# Flag the row that ends a live-chat handover: 1 when input_data contains
# '{"handoverStarted":false}', otherwise 0.
df_ho_start1 = df_ho_start1.withColumn(
    "handover_end",
    f.when(f.col("input_data").contains('{"handoverStarted":false}'), 1).otherwise(0),
)

In [0]:
# Define window specification
# Running count of end markers within each handover segment, in timestamp order
# and including the current row: 0 before the first '{"handoverStarted":false}'
# row, >= 1 from that row onwards.
window_spec1 = (Window
                .partitionBy("session_handover_id")
                .orderBy("timestamp")
                .rowsBetween(Window.unboundedPreceding, Window.currentRow))

df_ho_start1 = df_ho_start1.withColumn(
    "handover_ended",
    f.sum(f.col("handover_end")).over(window_spec1),
)

In [0]:
# Keep only the handover part of each segment:
#   handover_num > 0    -> drops the lines before the first
#                          "Would you please briefly describe your concern?"
#   handover_ended == 0 -> drops the '{"handoverStarted":false}' line itself
#                          and every line after it
# and then remove the helper columns.
df_ho_start1 = (df_ho_start1
                .where((f.col("handover_num") > 0) & (f.col("handover_ended") == 0))
                .drop("has_handover", "handover_num", "handover_end", "handover_ended")
)

In [0]:
# Drop rows without usable conversation text: null input_text, lines containing
# "cIntent:", lines starting with "START_" and the literal "GET_STARTED".
df_ho_start2 = (df_ho_start1
                .filter(f.col("input_text").isNotNull())
                .filter(~f.col("input_text").contains("cIntent:"))
                .filter(~f.col("input_text").startswith("START_"))
                .filter(f.col("input_text") != "GET_STARTED")
)

# Normalise the text:
#  - collapse every whitespace run (spaces, tabs, line breaks) into one space
#  - remove the "[TRANSL]" marker, then trim
#  - replace '"' with "'", and ';' / ':' with ',' so these characters cannot be
#    confused with separators (': ' separates source and text in conversation_line)
#  - drop lines that end up empty
# Regex patterns are raw strings so the backslashes reach the regex engine as-is
# (non-raw "\s" / "\[" are invalid Python escapes and warn on newer Pythons).
df_ho_start2 = (df_ho_start2
                .withColumn("input_text", f.regexp_replace("input_text", r"\s+", " "))
                .withColumn("input_text", f.regexp_replace("input_text", r"\[TRANSL\]", ""))
                .withColumn("input_text", f.trim(f.col("input_text")))
                .withColumn("input_text", f.regexp_replace("input_text", '"', "'"))
                .withColumn("input_text", f.regexp_replace("input_text", ";", ","))
                .withColumn("input_text", f.regexp_replace("input_text", ":", ","))
                .filter(f.col("input_text") != "")
                # NOTE: this sort is not preserved through a later groupBy/collect_list;
                # the line order has to be re-established there.
                .orderBy("session_handover_id", "timestamp")
)

In [0]:
# One display line per message, formatted "<source>: <input_text>".
df_ho_start2 = df_ho_start2.withColumn(
    "conversation_line",
    f.concat_ws(": ", f.col("source"), f.col("input_text")),
)


In [0]:
# Assemble one conversation text per handover segment:
#   full_conversation_list  - the segment's conversation_line values in timestamp order
#   agent_interaction_count - number of lines in the segment (all sources, not only the agent)
#   full_conversation       - the lines joined with ". ", with runs of periods collapsed
#                             and a period directly after punctuation removed
# groupBy + collect_list does not keep the row order of an earlier orderBy, so the
# lines are collected together with their timestamp and sorted inside each group
# (equal timestamps are tie-broken by the line text, which keeps the output deterministic).
df_conv = (df_ho_start2
           .groupBy("session_handover_id")
           .agg(f.sort_array(f.collect_list(f.struct("timestamp", "conversation_line")))
                 .getField("conversation_line")
                 .alias("full_conversation_list"),
                f.count("session_handover_id").alias("agent_interaction_count"))
           .withColumn("full_conversation", f.concat_ws(". ", f.col("full_conversation_list")))
           # collapse consecutive periods
           .withColumn("full_conversation", f.regexp_replace("full_conversation", r"\.{2,}", "."))
           # a punctuation mark followed by a period: drop the period
           .withColumn("full_conversation", f.regexp_replace("full_conversation", r"([!?,;:])\.", "$1"))
)

print(df_conv.count(), " : HANDOVER CONVERSATIONS")
df_conv.orderBy(f.col("agent_interaction_count").desc()).display()

In [0]:
# Sample of handover conversations for downstream use:
#   - only conversations with at most MAX_CONVERSATION_LINES lines
#   - at most SAMPLE_SIZE conversations
# The filter runs before select() because agent_interaction_count is not among the
# selected columns. Ordering by session_handover_id before limit() makes the sample
# reproducible; limit() on an unordered DataFrame can return different rows per run.
MAX_CONVERSATION_LINES = 5000
SAMPLE_SIZE = 5000

df_conv1 = (df_conv
            .filter(f.col("agent_interaction_count").between(0, MAX_CONVERSATION_LINES))
            .select("session_handover_id", "full_conversation")
            .orderBy("session_handover_id")
            .limit(SAMPLE_SIZE)
)
print(df_conv1.count())
df_conv1.display()

In [0]:
### TODO: save the full data (df_conv) or the sample (df_conv1) here