In [1]:
# import Libraries
import os

import findspark
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.types import DateType

findspark.init()

In [2]:
# Start (or reuse) a local Spark session named "call-center".
# The MySQL connector jar is registered through spark.jars so the JDBC
# reads and writes further down can load the driver class.
spark = (
    SparkSession.builder
    .master("local")
    .appName("call-center")
    .config("spark.jars", "mysql-connector-java-5.1.47.jar")
    .getOrCreate()
)

In [29]:
# Load the raw call-center export; the first CSV line supplies the column names.
# No schema is supplied or inferred, so every column is read as a string.
call_center_df = spark.read.option("header", True).csv("data/CallCenter.csv")

In [30]:
# Show the first two rows of the raw data (converted to pandas for a readable table)
call_center_df.limit(2).toPandas()

Unnamed: 0,id,customer_name,sentiment,csat_score,call_timestamp,reason,city,state,channel,response_time,call duration in minutes,call_center
0,DKK-57076809-w-055481-fU,Analise Gairdner,Neutral,7.0,10/29/2020,Billing Question,Detroit,Michigan,Call-Center,Within SLA,17,Los Angeles/CA
1,QGK-72219678-w-102139-KY,Crichton Kidsley,Very Positive,,10/05/2020,Service Outage,Spartanburg,South Carolina,Chatbot,Within SLA,23,Baltimore/MD


In [31]:
# Print the schema of the raw data
call_center_df.printSchema()

root
 |-- id: string (nullable = true)
 |-- customer_name: string (nullable = true)
 |-- sentiment: string (nullable = true)
 |-- csat_score: string (nullable = true)
 |-- call_timestamp: string (nullable = true)
 |-- reason: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- channel: string (nullable = true)
 |-- response_time: string (nullable = true)
 |-- call duration in minutes: string (nullable = true)
 |-- call_center: string (nullable = true)



### Extract Customer Data 
* customer_id
* customer_name
* city
* state
* sentiment


In [32]:
# Build the customer table: the descriptive customer columns plus a generated key.
# monotonically_increasing_id() yields unique, increasing 64-bit ids; they are
# not guaranteed to be consecutive.
customer_data = call_center_df.select(
    'customer_name',
    'city',
    'state',
    'sentiment',
    monotonically_increasing_id().alias('customer_id'),
)
# Preview the first two customer rows
customer_data.limit(2).toPandas()

Unnamed: 0,customer_name,city,state,sentiment,customer_id
0,Analise Gairdner,Detroit,Michigan,Neutral,0
1,Crichton Kidsley,Spartanburg,South Carolina,Very Positive,1


In [33]:
# Count the rows in the customer data
customer_data.count()

32941

In [34]:
# Count the distinct customer names; comparing this with the total row count
# shows whether any name appears more than once
customer_data.select('customer_name').dropDuplicates().count()

32941

In [35]:
# Target customer schema:
# * customer_id   long (produced by monotonically_increasing_id, so not Int)
# * customer_name String
# * city          String
# * state         String
# * sentiment     String
# Put the columns in the target order
customer_data = customer_data.select([
    'customer_id',
    'customer_name',
    'city',
    'state',
    'sentiment',
])
# Check the resulting schema
customer_data.printSchema()

root
 |-- customer_id: long (nullable = false)
 |-- customer_name: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- sentiment: string (nullable = true)



### Extract Time Data 
* call_date
* day
* week
* weekday
* month 
* year

In [36]:
# Count how many different call_timestamp values the raw data contains
call_center_df.select("call_timestamp").dropDuplicates().count()

31

In [37]:
# Parse the distinct call dates into a timestamp column named 'date'.
# The source strings look like 10/29/2020 (month/day/four-digit year), so the
# pattern is 'MM/dd/yyyy'. to_timestamp parses straight to a timestamp type,
# whereas a unix_timestamp -> from_unixtime round trip would hand back a
# formatted string rather than a timestamp.
time_data = (
    call_center_df
    .select(
        'call_timestamp',
        F.to_timestamp('call_timestamp', 'MM/dd/yyyy').alias('date'),
    )
    .dropDuplicates()
)
# Preview the first two rows
time_data.limit(2).toPandas()

Unnamed: 0,call_timestamp,date
0,10/10/2020,2020-10-10 00:00:00
1,10/01/2020,2020-10-01 00:00:00


In [38]:
# Derive the calendar attributes of every call date from the parsed 'date' column.
# weekday uses the "E" pattern, i.e. the abbreviated day name (Sat, Thu, ...).
time_data = (
    time_data
    .withColumn("day", F.dayofmonth(F.col("date")))
    .withColumn("week", F.weekofyear(F.col("date")))
    .withColumn("weekday", F.date_format(F.col("date"), "E"))
    .withColumn("month", F.month(F.col("date")))
    .withColumn("year", F.year(F.col("date")))
    .withColumn("call_date", F.to_date(F.col("date")))
)
# Preview the first two rows
time_data.limit(2).toPandas()

Unnamed: 0,call_timestamp,date,day,week,weekday,month,year,call_date
0,10/10/2020,2020-10-10 00:00:00,10,41,Sat,10,2020,2020-10-10
1,10/01/2020,2020-10-01 00:00:00,1,40,Thu,10,2020,2020-10-01


In [39]:
# Target call_time schema:
# * call_date date
# * day       integer
# * week      integer
# * weekday   String
# * month     integer
# * year      integer
# Keep only the target columns, in the target order
time_data = time_data.select([
    'call_date',
    'day',
    'week',
    'weekday',
    'month',
    'year',
])
# Check the resulting schema
time_data.printSchema()

root
 |-- call_date: date (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- weekday: string (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)



### Extract Call Information Data
* call_id
* reason
* channel
* call_center
* response_time

In [40]:
# Pick the descriptive call columns, exposing the raw 'id' column as 'call_id'
call_info = call_center_df.select(
    F.col('id').alias('call_id'),
    'reason',
    'channel',
    'call_center',
    'response_time',
)
# Preview the first two rows
call_info.limit(2).toPandas()

Unnamed: 0,call_id,reason,channel,call_center,response_time
0,DKK-57076809-w-055481-fU,Billing Question,Call-Center,Los Angeles/CA,Within SLA
1,QGK-72219678-w-102139-KY,Service Outage,Chatbot,Baltimore/MD,Within SLA


In [41]:
# Target call_info schema:
# * call_id       string
# * reason        string
# * channel       string
# * call_center   string
# * response_time string
# Check the schema against the target above
call_info.printSchema()

root
 |-- call_id: string (nullable = true)
 |-- reason: string (nullable = true)
 |-- channel: string (nullable = true)
 |-- call_center: string (nullable = true)
 |-- response_time: string (nullable = true)



### Extract Call Data
* call_id
* customer_id 
* call_date
* csat_score
* call_in_minutes


In [43]:
# Print the raw schema again as a reference for the call columns selected in the next cell
call_center_df.printSchema()

root
 |-- id: string (nullable = true)
 |-- customer_name: string (nullable = true)
 |-- sentiment: string (nullable = true)
 |-- csat_score: string (nullable = true)
 |-- call_timestamp: string (nullable = true)
 |-- reason: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- channel: string (nullable = true)
 |-- response_time: string (nullable = true)
 |-- call duration in minutes: string (nullable = true)
 |-- call_center: string (nullable = true)



In [44]:
# Build the call fact rows with explicit data types.
# * call_date: parsed directly with to_date and the four-digit-year pattern
#   'MM/dd/yyyy', which matches source values such as 10/29/2020.
# * csat_score and the call duration: cast from the CSV strings to integers.
# NOTE(review): customer_id is generated here independently of customer_data;
# the two id sequences only line up if both are produced over the same rows
# with the same partitioning -- confirm, or carry one shared id column instead.
call_data = call_center_df.select(
    F.col('id').alias('call_id'),
    F.to_date('call_timestamp', 'MM/dd/yyyy').alias('call_date'),
    F.col('csat_score').cast('int').alias('csat_score'),
    F.col('call duration in minutes').cast('int').alias('call_in_minutes'),
).withColumn('customer_id', monotonically_increasing_id())
# Preview the first two rows
call_data.limit(2).toPandas()

Unnamed: 0,call_id,call_date,csat_score,call_in_minutes,customer_id
0,DKK-57076809-w-055481-fU,2020-10-29,7.0,17,0
1,QGK-72219678-w-102139-KY,2020-10-05,,23,1


In [45]:
# Count the rows whose csat_score is null
call_data.filter(F.col('csat_score').isNull()).count()

20670

In [46]:
# Replace null csat_score values with zero.
# The replacement is an integer literal so it matches the integer type of the
# column instead of relying on a string being cast implicitly.
call_data = call_data.fillna({'csat_score': 0})

In [47]:
# Re-count the null csat_score rows; the count should now be zero
call_data.filter(F.col('csat_score').isNull()).count()

0

In [48]:
# Target call_fact schema:
# * call_id         string
# * customer_id     long
# * call_date       date
# * csat_score      integer
# * call_in_minutes integer

# Put the columns in the target order.
# 'customer_id' is spelled in lower case, exactly as the column was created, so
# the lookup does not depend on case-insensitive column resolution and the
# output column name matches the customers table.
call_data = call_data.select('call_id', 'customer_id', 'call_date', 'csat_score', 'call_in_minutes')
# Check the resulting schema
call_data.printSchema()

root
 |-- call_id: string (nullable = true)
 |-- Customer_id: long (nullable = false)
 |-- call_date: date (nullable = true)
 |-- csat_score: integer (nullable = true)
 |-- call_in_minutes: integer (nullable = true)



# Load Data To DataBase

### 1 - Load Customer Data

In [22]:
# Load the customer data into the customers table.
# The database password is read from the CALL_CENTER_DB_PASSWORD environment
# variable rather than being stored in the notebook; if the variable is not
# set, os.environ raises KeyError before anything is written.
# mode("append") adds to the existing table, so running this cell again
# attempts to insert the same rows again.
(customer_data.write.format("jdbc")
    .option("url", "jdbc:mysql://127.0.0.1:3306/call_center")
    .option("driver", "com.mysql.jdbc.Driver")
    .option("dbtable", "customers")
    .option("user", "root")
    .option("password", os.environ["CALL_CENTER_DB_PASSWORD"])
    .mode("append")
    .save())

In [25]:
# Load the call time data into the call_time table.
# The database password is read from the CALL_CENTER_DB_PASSWORD environment
# variable rather than being stored in the notebook; if the variable is not
# set, os.environ raises KeyError before anything is written.
# mode("append") adds to the existing table, so running this cell again
# attempts to insert the same rows again.
(time_data.write.format("jdbc")
    .option("url", "jdbc:mysql://127.0.0.1:3306/call_center")
    .option("driver", "com.mysql.jdbc.Driver")
    .option("dbtable", "call_time")
    .option("user", "root")
    .option("password", os.environ["CALL_CENTER_DB_PASSWORD"])
    .mode("append")
    .save())

In [49]:
# Load the call fact data into the call_fact table.
# The database password is read from the CALL_CENTER_DB_PASSWORD environment
# variable rather than being stored in the notebook; if the variable is not
# set, os.environ raises KeyError before anything is written.
# mode("append") adds to the existing table, so running this cell again
# attempts to insert the same rows again.
(call_data.write.format("jdbc")
    .option("url", "jdbc:mysql://127.0.0.1:3306/call_center")
    .option("driver", "com.mysql.jdbc.Driver")
    .option("dbtable", "call_fact")
    .option("user", "root")
    .option("password", os.environ["CALL_CENTER_DB_PASSWORD"])
    .mode("append")
    .save())

In [50]:
# Load the call info data into the call_info table.
# The database password is read from the CALL_CENTER_DB_PASSWORD environment
# variable rather than being stored in the notebook; if the variable is not
# set, os.environ raises KeyError before anything is written.
# mode("append") adds to the existing table, so running this cell again
# attempts to insert the same rows again.
(call_info.write.format("jdbc")
    .option("url", "jdbc:mysql://127.0.0.1:3306/call_center")
    .option("driver", "com.mysql.jdbc.Driver")
    .option("dbtable", "call_info")
    .option("user", "root")
    .option("password", os.environ["CALL_CENTER_DB_PASSWORD"])
    .mode("append")
    .save())

# Data Quality Checks

In [56]:
# Read the call_info table back from MySQL as a DataFrame for the quality checks.
# The database password is read from the CALL_CENTER_DB_PASSWORD environment
# variable rather than being stored in the notebook; if the variable is not
# set, os.environ raises KeyError.
call_info_df = (
    spark.read.format("jdbc")
    .option("url", "jdbc:mysql://localhost:3306/call_center")
    .option("driver", "com.mysql.jdbc.Driver")
    .option("dbtable", "call_info")
    .option("user", "root")
    .option("password", os.environ["CALL_CENTER_DB_PASSWORD"])
    .load()
)

In [59]:
# Count the rows stored in call_info
call_info_df.count()

32941

In [60]:
# Show a five-row sample of call_info
call_info_df.limit(5).toPandas()

Unnamed: 0,call_id,reason,channel,call_center,response_time
0,AAA-03321706-1-866834-I1,Payments,Call-Center,Baltimore/MD,Within SLA
1,AAB-04923282-m-405308-yW,Service Outage,Web,Baltimore/MD,Below SLA
2,AAB-23102945-b-065985-xp,Billing Question,Call-Center,Chicago/IL,Above SLA
3,AAB-64454903-y-396859-bx,Service Outage,Web,Los Angeles/CA,Within SLA
4,AAB-68191584-X-296651-JM,Billing Question,Web,Chicago/IL,Within SLA


In [61]:
# Read the call_time table back from MySQL as a DataFrame for the quality checks.
# The database password is read from the CALL_CENTER_DB_PASSWORD environment
# variable rather than being stored in the notebook; if the variable is not
# set, os.environ raises KeyError.
call_time_df = (
    spark.read.format("jdbc")
    .option("url", "jdbc:mysql://localhost:3306/call_center")
    .option("driver", "com.mysql.jdbc.Driver")
    .option("dbtable", "call_time")
    .option("user", "root")
    .option("password", os.environ["CALL_CENTER_DB_PASSWORD"])
    .load()
)

In [62]:
# Count the rows stored in call_time
call_time_df.count()

31

In [63]:
# Show a five-row sample of call_time
call_time_df.limit(5).toPandas()

Unnamed: 0,call_date,day,week,weekday,month,year
0,2020-10-01,1,40,Thu,10,2020
1,2020-10-02,2,40,Fri,10,2020
2,2020-10-03,3,40,Sat,10,2020
3,2020-10-04,4,40,Sun,10,2020
4,2020-10-05,5,41,Mon,10,2020


In [64]:
# Read the customers table back from MySQL as a DataFrame for the quality checks.
# The database password is read from the CALL_CENTER_DB_PASSWORD environment
# variable rather than being stored in the notebook; if the variable is not
# set, os.environ raises KeyError.
customers_df = (
    spark.read.format("jdbc")
    .option("url", "jdbc:mysql://localhost:3306/call_center")
    .option("driver", "com.mysql.jdbc.Driver")
    .option("dbtable", "customers")
    .option("user", "root")
    .option("password", os.environ["CALL_CENTER_DB_PASSWORD"])
    .load()
)

In [65]:
# Count the rows stored in customers
customers_df.count()

32941

In [66]:
# Show a five-row sample of customers
customers_df.limit(5).toPandas()

Unnamed: 0,customer_id,customer_name,city,state,sentiment
0,0,Analise Gairdner,Detroit,Michigan,Neutral
1,1,Crichton Kidsley,Spartanburg,South Carolina,Very Positive
2,2,Averill Brundrett,Gainesville,Florida,Negative
3,3,Noreen Lafflina,Portland,Oregon,Very Negative
4,4,Toma Van der Beken,Fort Wayne,Indiana,Very Positive


In [67]:
# Read the call_fact table back from MySQL as a DataFrame for the quality checks.
# The database password is read from the CALL_CENTER_DB_PASSWORD environment
# variable rather than being stored in the notebook; if the variable is not
# set, os.environ raises KeyError.
call_fact_df = (
    spark.read.format("jdbc")
    .option("url", "jdbc:mysql://localhost:3306/call_center")
    .option("driver", "com.mysql.jdbc.Driver")
    .option("dbtable", "call_fact")
    .option("user", "root")
    .option("password", os.environ["CALL_CENTER_DB_PASSWORD"])
    .load()
)

In [68]:
# Count the rows stored in call_fact
call_fact_df.count()

32941

In [69]:
# Show a five-row sample of call_fact
call_fact_df.limit(5).toPandas()

Unnamed: 0,call_id,customer_id,call_date,csat_score,call_in_minutes
0,AAA-03321706-1-866834-I1,23666,2020-10-15,3,12
1,AAB-04923282-m-405308-yW,12662,2020-10-22,0,5
2,AAB-23102945-b-065985-xp,19408,2020-10-06,0,9
3,AAB-64454903-y-396859-bx,15934,2020-10-15,0,20
4,AAB-68191584-X-296651-JM,14913,2020-10-02,0,43


In [70]:
# Stop the Spark session
spark.stop()