In [4]:
# pip install pyspark

In [5]:
# Import necessary PySpark libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when
from pyspark.sql.types import StringType
from pyspark.sql.types import TimestampType
from pyspark.sql import functions as F
from pyspark.sql.functions import to_timestamp


In [6]:
# Initialize Spark session
spark = SparkSession.builder \
    .appName("707 Assessment2 ETL") \
    .getOrCreate()

# Check Spark session
print(spark)

# Print the application ID and name
print(f"Application ID: {spark.sparkContext.applicationId}")
print(f"Application Name: {spark.sparkContext.appName}")

24/08/01 21:25:45 WARN Utils: Your hostname, YiliadeMacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.20.8 instead (on interface en0)
24/08/01 21:25:45 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/08/01 21:25:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


<pyspark.sql.session.SparkSession object at 0x1324a3440>
Application ID: local-1722504346676
Application Name: 707 Assessment2 ETL


In [7]:
# load data
customer_df= spark.read.csv('customer_support.csv',header=True,inferSchema=True)
agent_df= spark.read.csv('agent_info.csv',header=True,inferSchema=True)
customer_df.show()
agent_df.show()

+--------------------+------------+-----------------+--------------------+--------------------+--------------------+----------------+-----------------+----------------+--------------------+-------------+----------------+----------+-----------------------+-------------------+
|           Unique id|channel_name|         category|        Sub-category|    Customer Remarks|            Order_id| order_date_time|Issue_reported at| issue_responded|Survey_response_Date|Customer_City|Product_category|Item_price|connected_handling_time|         Agent_name|
+--------------------+------------+-----------------+--------------------+--------------------+--------------------+----------------+-----------------+----------------+--------------------+-------------+----------------+----------+-----------------------+-------------------+
|7e9ae164-6a8b-452...|     Outcall|  Product Queries|      Life Insurance|                NULL|c27c9bb4-fa36-414...|            NULL| 01/08/2023 11:13|01/08/2023 11:47|    

In [8]:
customer_df.printSchema()
agent_df.printSchema()

root
 |-- Unique id: string (nullable = true)
 |-- channel_name: string (nullable = true)
 |-- category: string (nullable = true)
 |-- Sub-category: string (nullable = true)
 |-- Customer Remarks: string (nullable = true)
 |-- Order_id: string (nullable = true)
 |-- order_date_time: string (nullable = true)
 |-- Issue_reported at: string (nullable = true)
 |-- issue_responded: string (nullable = true)
 |-- Survey_response_Date: string (nullable = true)
 |-- Customer_City: string (nullable = true)
 |-- Product_category: string (nullable = true)
 |-- Item_price: string (nullable = true)
 |-- connected_handling_time: string (nullable = true)
 |-- Agent_name: string (nullable = true)

root
 |-- Agent_name: string (nullable = true)
 |-- Supervisor: string (nullable = true)
 |-- Manager: string (nullable = true)
 |-- Tenure Bucket: string (nullable = true)
 |-- Agent Shift: string (nullable = true)
 |-- CSAT Score: integer (nullable = true)



In [9]:
# data format
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")
customer_df = customer_df.withColumn("Issue_reported at", to_timestamp(col("Issue_reported at"), "dd/MM/yyyy HH:mm"))
customer_df = customer_df.withColumn("issue_responded", to_timestamp(col("issue_responded"), "dd/MM/yyyy HH:mm"))

# Display the converted timestamps
customer_df.select("Issue_reported at",'issue_responded').show()

+-------------------+-------------------+
|  Issue_reported at|    issue_responded|
+-------------------+-------------------+
|2023-08-01 11:13:00|2023-08-01 11:47:00|
|2023-08-01 12:52:00|2023-08-01 12:54:00|
|2023-08-01 20:16:00|2023-08-01 20:38:00|
|2023-08-01 20:56:00|2023-08-01 21:16:00|
|2023-08-01 10:30:00|2023-08-01 10:32:00|
|2023-08-01 15:13:00|2023-08-01 18:39:00|
|2023-08-01 15:31:00|2023-08-01 23:52:00|
|2023-08-01 16:17:00|2023-08-01 16:23:00|
|2023-08-01 21:03:00|2023-08-01 21:07:00|
|2023-08-01 23:31:00|2023-08-01 23:36:00|
|2023-08-02 18:14:00|2023-08-02 18:16:00|
|2023-08-02 10:44:00|2023-08-02 11:14:00|
|2023-08-01 10:09:00|2023-08-01 10:12:00|
|2023-08-01 10:15:00|2023-08-01 11:21:00|
|2023-08-02 11:26:00|2023-08-02 11:44:00|
|2023-08-02 19:56:00|2023-08-02 20:06:00|
|2023-08-01 09:01:00|2023-08-01 09:03:00|
|2023-08-01 10:00:00|2023-08-01 10:04:00|
|2023-08-01 21:05:00|2023-08-01 21:07:00|
|2023-08-02 20:03:00|2023-08-02 20:05:00|
+-------------------+-------------

In [10]:
# calculate connected_handling_time
customer_df = customer_df.withColumn(
    "connected_handling_time",
    (col("issue_responded").cast("long") - col("Issue_reported at").cast("long")))
customer_df = customer_df.withColumn(
    "connected_handling_time",
    F.expr("connected_handling_time / 60")) #show in minutes
customer_df.select("connected_handling_time").show()

+-----------------------+
|connected_handling_time|
+-----------------------+
|                   34.0|
|                    2.0|
|                   22.0|
|                   20.0|
|                    2.0|
|                  206.0|
|                  501.0|
|                    6.0|
|                    4.0|
|                    5.0|
|                    2.0|
|                   30.0|
|                    3.0|
|                   66.0|
|                   18.0|
|                   10.0|
|                    2.0|
|                    4.0|
|                    2.0|
|                    2.0|
+-----------------------+
only showing top 20 rows



In [11]:
# data cleaning
null_columns = [column for column in customer_df.columns if customer_df.filter(col(column).isNull()).count() > 0]
print(null_columns)

['Customer Remarks', 'Order_id', 'order_date_time', 'Issue_reported at', 'issue_responded', 'Survey_response_Date', 'Customer_City', 'Product_category', 'Item_price', 'connected_handling_time', 'Agent_name']


In [12]:
null_columns2 = [column for column in agent_df.columns if agent_df.filter(col(column).isNull()).count() > 0]
print(null_columns2)

[]


In [13]:
missing_agent_name = customer_df.filter(col("Agent_name").isNull())
missing_agent_name.show()

+--------------------+------------+-----------------+--------------------+--------------------+--------------------+--------------------+-------------------+-------------------+--------------------+----------------+-------------------+----------------+-----------------------+----------+
|           Unique id|channel_name|         category|        Sub-category|    Customer Remarks|            Order_id|     order_date_time|  Issue_reported at|    issue_responded|Survey_response_Date|   Customer_City|   Product_category|      Item_price|connected_handling_time|Agent_name|
+--------------------+------------+-----------------+--------------------+--------------------+--------------------+--------------------+-------------------+-------------------+--------------------+----------------+-------------------+----------------+-----------------------+----------+
|fd7e6d24-aa18-4c0...|     Inbound|          Returns|Reverse Pickup En...|"The shoe is diff...| now it is not ac...|                NULL

In [14]:
customer_df = customer_df.filter(col("Agent_name").isNotNull())

In [15]:
columns_to_drop = ['Order_id', 'order_date_time', 'Survey_response_Date', 'Customer_City', 'Product_category', 'Item_price']
customer_df = customer_df.drop(*columns_to_drop)
customer_df = customer_df.fillna({"Customer Remarks": "No Remarks"})

In [16]:
customer_df.show()

+--------------------+------------+-----------------+--------------------+--------------------+-------------------+-------------------+-----------------------+-------------------+
|           Unique id|channel_name|         category|        Sub-category|    Customer Remarks|  Issue_reported at|    issue_responded|connected_handling_time|         Agent_name|
+--------------------+------------+-----------------+--------------------+--------------------+-------------------+-------------------+-----------------------+-------------------+
|7e9ae164-6a8b-452...|     Outcall|  Product Queries|      Life Insurance|          No Remarks|2023-08-01 11:13:00|2023-08-01 11:47:00|                   34.0|   Richard Buchanan|
|b07ec1b0-f376-43b...|     Outcall|  Product Queries|Product Specific ...|          No Remarks|2023-08-01 12:52:00|2023-08-01 12:54:00|                    2.0|      Vicki Collins|
|200814dd-27c7-414...|     Inbound|    Order Related|   Installation/demo|          No Remarks|2023-

In [17]:
num_rows = customer_df.count()
num_columns = len(customer_df.columns)
shape = (num_rows, num_columns)
print(shape)

(85898, 9)


In [18]:
num_rows2 = agent_df.count()
num_columns2 = len(agent_df.columns)
shape2 = (num_rows2, num_columns2)
print(shape2)

(85907, 6)


In [19]:
# 聚合每个代理的Tenure Bucket和Agent Shift的唯一数量
tenure_shift_counts = agent_df.groupBy("Agent_name").agg(
    F.countDistinct("Tenure Bucket").alias("Unique_Tenure_Buckets"),
    F.countDistinct("Agent Shift").alias("Unique_Agent_Shifts"),
    F.countDistinct("Supervisor").alias("Unique_Supervisors"),
    F.countDistinct("Manager").alias("Unique_Managers"),
    F.countDistinct("CSAT Score").alias("Unique_CSAT_Scores")
)
# 显示结果
tenure_shift_counts.show()

+-----------------+---------------------+-------------------+------------------+---------------+------------------+
|       Agent_name|Unique_Tenure_Buckets|Unique_Agent_Shifts|Unique_Supervisors|Unique_Managers|Unique_CSAT_Scores|
+-----------------+---------------------+-------------------+------------------+---------------+------------------+
|      Jenna Rivas|                    1|                  1|                 1|              1|                 4|
|   Desiree Torres|                    1|                  1|                 1|              1|                 4|
|  Christine Myers|                    1|                  1|                 1|              1|                 5|
| Alexandra Harris|                    1|                  1|                 1|              1|                 5|
|        John Dean|                    1|                  1|                 1|              1|                 4|
|       John Wells|                    1|                  1|           

In [20]:
# data aggregation
aggregated_agents = agent_df.groupBy("Agent_name").agg(
    F.avg("CSAT Score").alias("Average_CSAT"),
    F.first("Supervisor").alias("Supervisor"),
    F.first("Manager").alias("Manager"),
    F.first("Tenure Bucket").alias("Tenure_Bucket"),
    F.first("Agent Shift").alias("Agent_Shift"))
aggregated_agents.show()

+------------------+------------------+----------------+---------------+---------------+-----------+
|        Agent_name|      Average_CSAT|      Supervisor|        Manager|  Tenure_Bucket|Agent_Shift|
+------------------+------------------+----------------+---------------+---------------+-----------+
|     Aaron Edwards| 4.397849462365591|       Mia Patel|     Emily Chen|          61-90|    Evening|
|      Aaron Romero|3.9491525423728815|     Mason Gupta|Jennifer Nguyen|On Job Training|    Morning|
|  Abigail Gonzalez|              4.08|      Jacob Sato|Jennifer Nguyen|On Job Training|    Morning|
|      Adam Barnett| 4.339285714285714|  Abigail Suzuki|Jennifer Nguyen|On Job Training|    Morning|
|      Adam Hammond| 4.733333333333333|   Olivia Suzuki|     John Smith|          31-60|    Morning|
|    Adam Henderson|3.8947368421052633|        Ava Wong|    William Kim|On Job Training|    Evening|
|    Adam Hernandez| 4.722222222222222|    Nathan Patel|     Emily Chen|           0-30|   

In [21]:
merged_df = customer_df.join(aggregated_agents, on="Agent_name", how="left")
merged_df.show()

+-------------------+--------------------+------------+-----------------+--------------------+--------------------+-------------------+-------------------+-----------------------+------------------+--------------+---------------+---------------+-----------+
|         Agent_name|           Unique id|channel_name|         category|        Sub-category|    Customer Remarks|  Issue_reported at|    issue_responded|connected_handling_time|      Average_CSAT|    Supervisor|        Manager|  Tenure_Bucket|Agent_Shift|
+-------------------+--------------------+------------+-----------------+--------------------+--------------------+-------------------+-------------------+-----------------------+------------------+--------------+---------------+---------------+-----------+
|   Richard Buchanan|7e9ae164-6a8b-452...|     Outcall|  Product Queries|      Life Insurance|          No Remarks|2023-08-01 11:13:00|2023-08-01 11:47:00|                   34.0| 4.285714285714286|   Mason Gupta|Jennifer Nguy

In [22]:
rows = merged_df.count()
columns = len(merged_df.columns)
shape_merged = (rows, columns)
print(shape_merged)

(85898, 14)


In [23]:
# Save the combined DataFrame to a new CSV file
merged_df.write.csv("merged_data.csv", header=True, mode="overwrite")
print(f"Combined data saved to merged_data.csv")

Combined data saved to merged_data.csv


In [24]:
# Stop Spark session
spark.stop()