In [21]:
from pyspark.sql import SparkSession

# Reuse the active Spark session if there is one; otherwise start one named 'weserve'.
session_builder = SparkSession.builder.appName('weserve')
spark = session_builder.getOrCreate()

In [22]:
# Load the raw call-log CSV, using its first row as the column names.
# No schema inference is requested, so every column is read as a string.
df_call_logs = spark.read.option('header', True).csv('call log.csv')

# Preview the first five rows.
df_call_logs.show(n=5)

+---+--------+-------+--------------------+----------+--------+-------------------------+
| id|callerID|agentID|      complaintTopic|assignedTo|  status|resolutionDurationInHours|
+---+--------+-------+--------------------+----------+--------+-------------------------+
|  1|CALLER_1|    163| Billing discrepancy|       122|  CLOSED|                       33|
|  2|CALLER_2|    133|Difficulty reachi...|      null|     new|                     null|
|  3|CALLER_3|    153| Wrong item received|       122|resolved|                     null|
|  4|CALLER_4|    161|Unauthorized charges|       151| pEnding|                     null|
|  5|CALLER_5|    148|Unprofessional be...|      null|     new|                     null|
+---+--------+-------+--------------------+----------+--------+-------------------------+
only showing top 5 rows



In [23]:
# Drop rows that are exact duplicates across every column.
df_call_logs = df_call_logs.distinct()

## Data Exploration 

In [24]:
# Columns and data types.
# The CSV was read without schema inference, so every column is reported as string.
df_call_logs.printSchema()

root
 |-- id: string (nullable = true)
 |-- callerID: string (nullable = true)
 |-- agentID: string (nullable = true)
 |-- complaintTopic: string (nullable = true)
 |-- assignedTo: string (nullable = true)
 |-- status: string (nullable = true)
 |-- resolutionDurationInHours: string (nullable = true)



In [25]:
# Statistical information: count, mean, stddev, min and max for each column.
# All columns are still strings here, so min/max are lexicographic
# (which is why the maximum 'id' is shown as 99).
df_call_logs.describe().show()

+-------+-----------------+---------+------------------+-------------------+------------------+--------+-------------------------+
|summary|               id| callerID|           agentID|     complaintTopic|        assignedTo|  status|resolutionDurationInHours|
+-------+-----------------+---------+------------------+-------------------+------------------+--------+-------------------------+
|  count|              600|      600|               600|                600|               372|     600|                      187|
|   mean|            300.5|     null|151.85166666666666|               null| 150.7983870967742|    null|       26.780748663101605|
| stddev|173.3493582335971|     null|17.966384783604724|               null|17.255014430535844|    null|       13.058596372559972|
|    min|                1| CALLER_1|               121|Billing discrepancy|               121|  CLOSED|                       10|
|    max|               99|CALLER_99|               181|Wrong item received|       

## Tasks to perform
1. Rename the columns for easy readability
2. Fill all null records with 'N/A'
3. Transform all values in 'status' column to sentence-case format
4. Change data type of 'resolutionDurationInHours' to integer type

In [26]:
# 1. Rename columns from camelCase to snake_case ('id' and 'status' keep their names)
renamed_columns = dict(
    callerID='caller_id',
    agentID='agent_id',
    complaintTopic='complaint_topic',
    assignedTo='assigned_to',
    resolutionDurationInHours='resolution_duration_in_hours',
)

df_call_logs = df_call_logs.withColumnsRenamed(renamed_columns)

# Quick check that the new column names are in place.
df_call_logs.show(n=3)

+---+----------+--------+--------------------+-----------+-------+----------------------------+
| id| caller_id|agent_id|     complaint_topic|assigned_to| status|resolution_duration_in_hours|
+---+----------+--------+--------------------+-----------+-------+----------------------------+
|181|CALLER_181|     126|   Incorrect charges|       null|pEnding|                        null|
|338|CALLER_338|     164| Wrong item received|       null|pEnding|                        null|
|462|CALLER_462|     166|Difficulty gettin...|       null| closed|                           9|
+---+----------+--------+--------------------+-----------+-------+----------------------------+
only showing top 3 rows



In [27]:
# 2. Filling null records with 'N/A'
# Only 'assigned_to' is filled here. 'resolution_duration_in_hours' keeps its
# nulls: it is cast to an integer column in a later cell.

df_call_logs = df_call_logs.fillna({'assigned_to': 'N/A'})

In [28]:
# Re-run the summary to confirm the fill: the 'assigned_to' count should now
# equal the row count, while 'resolution_duration_in_hours' still has nulls.
df_call_logs.describe().show()

+-------+-----------------+---------+------------------+-------------------+------------------+--------+----------------------------+
|summary|               id|caller_id|          agent_id|    complaint_topic|       assigned_to|  status|resolution_duration_in_hours|
+-------+-----------------+---------+------------------+-------------------+------------------+--------+----------------------------+
|  count|              600|      600|               600|                600|               600|     600|                         187|
|   mean|            300.5|     null|151.85166666666666|               null| 150.7983870967742|    null|          26.780748663101605|
| stddev|173.3493582335971|     null|17.966384783604724|               null|17.255014430535844|    null|          13.058596372559972|
|    min|                1| CALLER_1|               121|Billing discrepancy|               121|  CLOSED|                          10|
|    max|               99|CALLER_99|               181|Wrong 

In [29]:

from pyspark.sql.types import IntegerType, StringType

from pyspark.sql.functions import col, when, lower

In [30]:
# 3. Changing data type of 'resolution_duration_in_hours' to int
#    (this is task 4 in the list above; here it runs before the status clean-up)
duration_as_int = col('resolution_duration_in_hours').cast('int')
df_call_logs = df_call_logs.withColumn('resolution_duration_in_hours', duration_as_int)

In [31]:
# 4. Transforming the values in the 'status' column to sentence case
#    (this is task 3 in the list above)
# The comparison is done on the lower-cased value, so any letter-case variant of
# the four known statuses is normalised. Anything else (including null) is kept as-is.
status_lower = lower(col('status'))

status_sentence_case = (
    when(status_lower == 'closed', 'Closed')
    .when(status_lower == 'new', 'New')
    .when(status_lower == 'resolved', 'Resolved')
    .when(status_lower == 'pending', 'Pending')
    .otherwise(col('status'))
)

df_call_logs = df_call_logs.withColumn('status', status_sentence_case)


In [32]:
# Preview the cleaned call logs: 'assigned_to' nulls are filled with 'N/A' and
# 'status' values are in sentence case.
df_call_logs.show(5)

+---+----------+--------+--------------------+-----------+-------+----------------------------+
| id| caller_id|agent_id|     complaint_topic|assigned_to| status|resolution_duration_in_hours|
+---+----------+--------+--------------------+-----------+-------+----------------------------+
|181|CALLER_181|     126|   Incorrect charges|        N/A|Pending|                        null|
|338|CALLER_338|     164| Wrong item received|        N/A|Pending|                        null|
|462|CALLER_462|     166|Difficulty gettin...|        N/A| Closed|                           9|
|593|CALLER_593|     171|Inadequate produc...|        144|    New|                        null|
| 32| CALLER_32|     157|   Lack of follow-up|        N/A|    New|                        null|
+---+----------+--------+--------------------+-----------+-------+----------------------------+
only showing top 5 rows



In [33]:
# Write the cleaned call logs to CSV, with a header row, under 'transformed_call_logs'.
# mode('overwrite') replaces the output of a previous run. With the default save
# mode the write raises if the target path already exists, which makes the
# notebook fail the second time it is run top to bottom.
df_call_logs.write.mode('overwrite').csv('transformed_call_logs', header=True)

## Second dataset: Call details

In [34]:
# Load the raw call-details CSV, using its first row as the column names.
# No schema inference is requested, so every column is read as a string.
df_call_details = spark.read.option('header', True).csv("call details.csv")

# Preview the first five rows.
df_call_details.show(n=5)

+-----------------+---------------------+----------------+--------+----------------+
|           callID|callDurationInSeconds|agentsGradeLevel|callType|callEndedByAgent|
+-----------------+---------------------+----------------+--------+----------------+
|ageentsGradeLevel|                   97|               A| Inbound|           FALSE|
|                2|                   64|               E|in-bound|            TRUE|
|                3|                  100|               C|in-bound|           FALSE|
|                4|                   69|               D|Outbound|            TRUE|
|                5|                   96|               D| Inbound|            TRUE|
+-----------------+---------------------+----------------+--------+----------------+
only showing top 5 rows



## Data exploration

In [35]:
# Columns and data types of the call-details frame (all strings straight after the CSV load).
df_call_details.printSchema()

root
 |-- callID: string (nullable = true)
 |-- callDurationInSeconds: string (nullable = true)
 |-- agentsGradeLevel: string (nullable = true)
 |-- callType: string (nullable = true)
 |-- callEndedByAgent: string (nullable = true)



In [36]:
# Summary statistics for every column. All columns are strings at this point, so
# min/max are lexicographic; the max callID 'ageentsGradeLevel' exposes a bad value.
df_call_details.describe().show()

+-------+------------------+---------------------+----------------+--------+----------------+
|summary|            callID|callDurationInSeconds|agentsGradeLevel|callType|callEndedByAgent|
+-------+------------------+---------------------+----------------+--------+----------------+
|  count|               600|                  600|             600|     600|             600|
|   mean|             301.0|              104.165|            null|    null|            null|
| stddev|173.06068299876782|   25.746700681705267|            null|    null|            null|
|    min|                10|                  100|               A| Inbound|           FALSE|
|    max| ageentsGradeLevel|                   99|               F|in-bound|            TRUE|
+-------+------------------+---------------------+----------------+--------+----------------+



In [37]:
# Drop rows that are exact duplicates across every column.
df_call_details = df_call_details.distinct()

## Tasks to perform
1. Rename columns for easy readability
2. Convert 'callDurationInSeconds' column to IntegerType
3. Transform values in 'callType' and 'callEndedByAgent' columns to sentence case format
4. Clean wrong input in callID column 

In [38]:
# 1. Rename columns from camelCase to snake_case
renamed_columns2 = dict(
    callID='call_id',
    callDurationInSeconds='call_duration_in_seconds',
    agentsGradeLevel='agents_grade_level',
    callType='call_type',
    callEndedByAgent='call_ended_by_agent',
)

df_call_details = df_call_details.withColumnsRenamed(renamed_columns2)

In [39]:
# 2. Convert the call duration column (now named 'call_duration_in_seconds') to integer
call_seconds_as_int = col('call_duration_in_seconds').cast('int')
df_call_details = df_call_details.withColumn('call_duration_in_seconds', call_seconds_as_int)

In [40]:
# 3a. Normalise 'call_type': 'inbound' and 'in-bound' in any letter case become 'Inbound'.
# Every other value is left unchanged.
# NOTE(review): outbound spellings are not normalised here; confirm none need it.
is_inbound = lower(col('call_type')).isin('inbound', 'in-bound')

df_call_details = df_call_details.withColumn(
    'call_type',
    when(is_inbound, 'Inbound').otherwise(col('call_type')),
)



In [41]:
# 3b. Sentence-case 'call_ended_by_agent': any letter-case variant of 'true' / 'false'
# becomes 'True' / 'False'. Other values are kept as they are.
ended_flag_lower = lower(col('call_ended_by_agent'))

df_call_details = df_call_details.withColumn(
    'call_ended_by_agent',
    when(ended_flag_lower == 'true', 'True')
    .when(ended_flag_lower == 'false', 'False')
    .otherwise(col('call_ended_by_agent')),
)


In [42]:
# 4. Clean wrong input in the call ID column
# The raw preview shows one row whose call ID holds the text 'ageentsGradeLevel'
# instead of a number; it is replaced with '1'.
# `subset` limits the replacement to 'call_id'. Without it, replace() rewrites
# the value in every string column, not only the one being cleaned.

df_call_details = df_call_details.na.replace('ageentsGradeLevel', '1', subset=['call_id'])

In [43]:
# Preview the cleaned call details with the renamed columns and normalised values.
df_call_details.show(5)

+-------+------------------------+------------------+---------+-------------------+
|call_id|call_duration_in_seconds|agents_grade_level|call_type|call_ended_by_agent|
+-------+------------------------+------------------+---------+-------------------+
|    242|                      77|                 C| Outbound|              False|
|    382|                     139|                 C|  Inbound|               True|
|    520|                     130|                 A|  Inbound|              False|
|    337|                     120|                 C| Outbound|              False|
|    522|                     111|                 F| Outbound|               True|
+-------+------------------------+------------------+---------+-------------------+
only showing top 5 rows



In [44]:
# Write the cleaned call details to CSV, with a header row, under 'transformed_call_details'.
# mode('overwrite') replaces the output of a previous run. With the default save
# mode the write raises if the target path already exists, which makes the
# notebook fail the second time it is run top to bottom.
df_call_details.write.mode('overwrite').csv('transformed_call_details', header=True)