In [1]:
from pyspark.sql import SparkSession
from pandas import set_option

# Show every column when a pandas DataFrame preview is rendered (None = no limit).
# Uses the canonical option name; 'display.max.columns' only resolved through
# pandas' regex fallback for option lookups.
set_option('display.max_columns', None)

# Reuse the active Spark session if one exists, otherwise create one.
# Every cell below reads and writes through this `spark` handle.
spark = SparkSession.builder.getOrCreate()

In [3]:
from pyspark.sql import Column
from pyspark.sql.functions import col, udf, when
from typing import Optional

import phonenumbers
from phonenumbers import PhoneNumberFormat


@udf(returnType='string')
def parsePhoneNumber(number: str, region: str='US') -> Optional[str]:
    """Format a raw phone number in national style, e.g. (987) 654-3210.

    Registered as a Spark UDF that returns a string column.

    Args:
        number: raw phone number text; may be null.
        region: region code handed to ``phonenumbers.parse`` (defaults to 'US').

    Returns:
        The number formatted with ``PhoneNumberFormat.NATIONAL``, or None when
        the input is null or cannot be parsed.
    """
    if number is None:
        return None
    try:
        phoneNumber = phonenumbers.parse(number, region)
        return phonenumbers.format_number(phoneNumber, PhoneNumberFormat.NATIONAL)
    except Exception:
        # Best-effort cleansing: an unparseable value becomes null instead of
        # failing the Spark job. Catching Exception (not a bare except) lets
        # KeyboardInterrupt / SystemExit propagate.
        return None


def matchPattern(colName: str, pattern: str) -> Column:
    """Keep the value of column `colName` only where it matches `pattern`.

    Builds `when(<column> rlike pattern, <column>)` without an `otherwise`
    branch, so rows that do not match yield null.
    """
    return when(col(colName).rlike(pattern), col(colName))

In [4]:
import os

# Root directory for the cleaned ("silver") parquet tables; each section below
# writes to os.path.join(SILVER, <table name>).
SILVER = '/tmp/phone-calls/silver'

### Employees

In [38]:
%%time

# Silver "employees" table: rename the raw columns, lower-case the email,
# persist as parquet, then expose the persisted copy as DataFrame + temp view.
path = os.path.join(SILVER, 'employees')

employee_columns = [
    'ID as EMPLOYEE_ID',
    'FullName as EMPLOYEE_NAME',
    'PrimaryRoleName as ROLE',
    'lower(Email) as EMAIL',
    'ManagerName as MANAGER_NAME',
    'PodName as TEAM',
]

employees_raw = spark.read.parquet('/tmp/phone-calls/Employees')
employees_raw.selectExpr(*employee_columns).write.parquet(path, mode='overwrite')

# Read back what was just written so later cells work from the parquet copy.
employees = spark.read.parquet(path)
employees.createOrReplaceTempView('employees')
employees.limit(5).toPandas()

CPU times: user 14.5 ms, sys: 2.99 ms, total: 17.5 ms
Wall time: 619 ms


Unnamed: 0,EMPLOYEE_ID,EMPLOYEE_NAME,ROLE,EMAIL,MANAGER_NAME,TEAM
0,194728,Andi Prins,Audit Supervisor,aprins@spendmend.com,Travis Wheeler,Gold
1,194729,Bob VanGoor,Audit Supervisor,bvangoor@spendmend.com,Dan Hutchins,Red
2,194730,Colleen Kretowicz,Audit Supervisor,ckretowicz@spendmend.com,Travis Wheeler,Gold
3,194731,Cindy Allen,WNC Auditor,callen@spendmend.com,,
4,194732,Dan Hutchins,Audit Manager,dhutchins@spendmend.com,Dan Hutchins,Red


### Contacts

In [39]:
%%time
from pyspark.sql.functions import col, lower, regexp_replace, when

# Silver "contacts" table: clean the email and phone columns of the raw vendor
# contacts, persist as parquet, then expose the persisted copy as
# DataFrame + temp view.
pattern = r'\(\d+\) \d+-\d+' # (987) 654-3210

column = when(lower('Email').contains('@'), lower('Email')) # email must have @

path = os.path.join(SILVER, 'contacts')

(
    spark
    .read
    .parquet('/tmp/phone-calls/VendorContacts')
    .withColumn('Email', column)
    # strip any leading run of non-digit characters
    .withColumn('Phone', regexp_replace('Phone', r'^([^0-9]+)', ''))
    # drop everything from the first whitespace that is followed by a non-digit
    # (raw string: '\s' is an invalid escape in a normal Python string literal)
    .withColumn('Phone', regexp_replace('Phone', r'\s+(?=[^0-9])(.*)$', ''))
    # normalise to national format; unparseable numbers become null
    .withColumn('Phone', parsePhoneNumber('Phone'))
    # keep only values containing the (ddd) ddd-dddd shape; the rest become null
    .withColumn('Phone', matchPattern('Phone', pattern))
    .selectExpr(
        'ObjectID as CONTACT_ID',
        # 'CustVendorNo as VENDOR_NUMBER',
        'CustomerName as CUSTOMER_NAME',
        'FullName as CONTACT_NAME',
        'Phone as PHONE',
        'Email as EMAIL'
    )
    .write
    .mode('overwrite')
    .parquet(path)
)

contacts = spark.read.parquet(path)
contacts.createOrReplaceTempView('contacts')
contacts.limit(5).toPandas()

CPU times: user 33.7 ms, sys: 7.8 ms, total: 41.5 ms
Wall time: 1min 6s


Unnamed: 0,CONTACT_ID,CUSTOMER_NAME,CONTACT_NAME,PHONE,EMAIL
0,1016896,Stanly Regional,,,invoiceinquiries@premierinc.com
1,1016899,Stanly Regional,,,tabbie.alvarado@henryschein.com
2,1016843,Stanly Regional,,(888) 882-9942,theracomar@icsconnect.com
3,1017021,Sentara Healthcare,Lynne Hanrahan,(757) 217-1381,lynne.hanrahan@esi.net
4,1017024,Sentara Healthcare,,(804) 347-8839,scoleman@ajccpas.com


### Activities

In [118]:
%%time
from pyspark.sql.functions import col, create_map, date_trunc, lit, when

# Silver "activities" table: call-related activity records, tagged with the
# employee who logged them and an outgoing/incoming flag.
path = os.path.join(SILVER, 'activities')

# Activity type -> IS_OUTGOING flag.
is_outgoing_by_type = create_map(
    lit('Called Vendor'), lit(True),
    lit('Received Call / Email'), lit(False),
)

# 'N/A' contact types become null.
# NOTE(review): ContactType is cleaned here but is not among the selected
# output columns below — confirm whether it was meant to be exported.
contact_type = when(col('ContactType') != 'N/A', col('ContactType'))

employee_keys = employees.select('EMPLOYEE_ID', 'EMPLOYEE_NAME')

activity_columns = [
    'ObjectID as ACTIVITY_ID',
    'ReferenceNumber as REFERENCE_ID',
    'VendorContactObjectID as CONTACT_ID',
    'StatementRequestObjectID as REQUEST_ID',
    'EMPLOYEE_ID',
    'CreatedDate as ACTIVITY_DATE',
    'ActivityType as IS_OUTGOING',
    'JobNumber as JOB_NUMBER',
    'JobName as JOB_NAME',
    'CustomerVendorName as VENDOR_NAME',
    'cast(ReferenceNumber as string) as REFERENCE_NUMBER',
    'Outcome as OUTCOME',
]

activity_records = (
    spark.read.parquet('/tmp/phone-calls/StatementRequestActivityRecords')
    .where(col('ActivityType').isin('Called Vendor', 'Received Call / Email'))
    .withColumn('ActivityType', is_outgoing_by_type[col('ActivityType')])
    .withColumn('ContactType', contact_type)
    .withColumn('CreatedDate', date_trunc('second', 'CreatedDate'))
    .withColumnRenamed('ActivityUser', 'EMPLOYEE_NAME')
)

# Inner join on the employee's name: activities logged by a user with no
# matching employee row are dropped.
(
    activity_records
    .join(employee_keys, on='EMPLOYEE_NAME', how='inner')
    .selectExpr(*activity_columns)
    .write
    .parquet(path, mode='overwrite')
)


activities = spark.read.parquet(path)
activities.createOrReplaceTempView('activities')
activities.limit(5).toPandas()

CPU times: user 38.6 ms, sys: 6.17 ms, total: 44.8 ms
Wall time: 2.58 s


Unnamed: 0,ACTIVITY_ID,REFERENCE_ID,CONTACT_ID,REQUEST_ID,EMPLOYEE_ID,ACTIVITY_DATE,IS_OUTGOING,JOB_NUMBER,JOB_NAME,VENDOR_NAME,REFERENCE_NUMBER,OUTCOME
0,36228229,1306574,33991530,34427094,33107601,2021-08-20 07:52:35,False,3766,Cooper University Health Care - 3766,STERLING INFOSYSTEMS INC,1306574,Sent Authorization Letter
1,36229304,1305099,33740838,34061321,33107601,2021-08-20 08:11:01,True,3815,SwedishAmerican Hospital - 3815,MARINA MEDICAL INSTRUMENTS,1305099,Sent Authorization Letter
2,36229442,1281479,29551031,30714692,35385742,2021-08-20 08:13:25,True,3319,Catholic Health Initiatives - 3319,TIERPOINT LLC,1281479,Sent Authorization Letter
3,36229553,1242989,31401519,30686048,35385742,2021-08-20 08:15:04,True,3319,Catholic Health Initiatives - 3319,WCP SOLUTIONS,1242989,Left Voicemail
4,36229838,1243349,29553618,29126627,35385742,2021-08-20 08:19:41,True,3319,Catholic Health Initiatives - 3319,BIOPTICS INC,1243349,Left Voicemail


### Calls

In [123]:
from pyspark.sql.functions import col

# Register the raw call log as temp view 'calls', with non-positive durations
# replaced by null.
positive_duration = when(col('duration') > 0, col('duration'))

call_log = spark.read.parquet('/tmp/phone-calls/AccountCallLogs')
call_log.withColumn('duration', positive_duration).createOrReplaceTempView('calls')

In [124]:
# Outbound calls: rows of the 'calls' view with direction = 'outbound' and a
# non-null callee_number, matched to employees by caller_name = EMPLOYEE_NAME.
# PHONE is the callee's number; CALL_ID is abs(xxhash64(id)); IS_OUTGOING is
# the constant true.
# NOTE(review): this expects the raw call-log schema behind the 'calls' view.
# A later cell re-registers 'calls' with the silver schema, so re-running this
# cell after that one will presumably fail — confirm before re-running out of order.
query = """
SELECT
    abs(xxhash64(id)) as CALL_ID,
    EMPLOYEE_ID,
    date_time as CALL_DATE,
    true as IS_OUTGOING,
    call_type as CALL_TYPE,
    result as CALL_RESULT,
    duration as CALL_DURATION,
    callee_number as PHONE
FROM
    calls
        INNER JOIN
            employees ON
                calls.caller_name = employees.EMPLOYEE_NAME
                AND direction = 'outbound'
                AND callee_number IS NOT NULL
"""

outbound = spark.sql(query)

In [125]:
# Inbound calls: rows of the 'calls' view with direction = 'inbound' and a
# non-null caller_number, matched to employees by callee_name = EMPLOYEE_NAME.
# PHONE is the caller's number; CALL_ID is abs(xxhash64(id)); IS_OUTGOING is
# the constant false. Column order mirrors the outbound query so the two
# results can be unioned by position.
# NOTE(review): this expects the raw call-log schema behind the 'calls' view.
# A later cell re-registers 'calls' with the silver schema, so re-running this
# cell after that one will presumably fail — confirm before re-running out of order.
query = """
SELECT
    abs(xxhash64(id)) as CALL_ID,
    EMPLOYEE_ID,
    date_time as CALL_DATE,
    false as IS_OUTGOING,
    call_type as CALL_TYPE,
    result as CALL_RESULT,
    duration as CALL_DURATION,
    caller_number as PHONE
FROM
    calls
        INNER JOIN
            employees ON
                calls.callee_name = employees.EMPLOYEE_NAME
                AND direction = 'inbound'
                AND caller_number IS NOT NULL
"""

inbound = spark.sql(query)

In [126]:
%%time

# Silver "calls" table: outbound rows followed by inbound rows (positional
# union), persisted as parquet and read back.
path = os.path.join(SILVER, 'calls')

all_calls = outbound.union(inbound)
all_calls.write.parquet(path, mode='overwrite')

# From here on the temp view 'calls' points at the silver table, replacing the
# raw call-log view registered earlier under the same name.
calls = spark.read.parquet(path)
calls.createOrReplaceTempView('calls')
calls.limit(5).toPandas()

CPU times: user 21.1 ms, sys: 3.89 ms, total: 25 ms
Wall time: 646 ms


Unnamed: 0,CALL_ID,EMPLOYEE_ID,CALL_DATE,IS_OUTGOING,CALL_TYPE,CALL_RESULT,CALL_DURATION,PHONE
0,7362212912651137723,23561006,2022-05-11 19:36:35,True,pstn,Call Cancel,,(828) 737-7552
1,591728025720393858,8406307,2022-05-11 18:09:01,True,pstn,Call connected,12.0,(800) 417-3747
2,1445124709734510472,8406307,2022-05-11 18:05:58,True,pstn,Call connected,102.0,(602) 288-0031
3,701486195797491244,8406307,2022-05-11 18:05:20,True,pstn,Call connected,8.0,(602) 288-0060
4,8326658045653164631,34745413,2022-05-11 18:04:48,True,pstn,Call connected,8.0,(800) 431-1055


In [127]:
# Candidate activity-to-call matches, outgoing side. An activity is paired
# with every call that has the same EMPLOYEE_ID, falls on the same calendar
# date, and whose first two space-separated PHONE parts equal those of the
# activity's contact phone; both the activity and the call must be flagged
# IS_OUTGOING. _seconds is the absolute gap between the two timestamps.
# NOTE(review): this rebinds `outbound`, which the cell writing the silver
# calls table also reads — re-running that cell after this one would union
# these match candidates instead of call rows.
query = """
SELECT
    activities.ACTIVITY_ID,
    calls.CALL_ID,
    abs(
        cast(ACTIVITY_DATE as long)
        - cast(CALL_DATE as long)
    ) as _seconds
FROM
    activities
        INNER JOIN
            contacts ON
                activities.CONTACT_ID = contacts.CONTACT_ID
        INNER JOIN
            calls ON
                activities.EMPLOYEE_ID = calls.EMPLOYEE_ID
                AND cast(ACTIVITY_DATE as date) = cast(CALL_DATE as date)
                AND split(contacts.PHONE, ' ')[0] = split(calls.PHONE, ' ')[0]
                AND split(contacts.PHONE, ' ')[1] = split(calls.PHONE, ' ')[1]
                AND activities.IS_OUTGOING
                AND calls.IS_OUTGOING
"""

outbound = spark.sql(query)

In [128]:
# Candidate activity-to-call matches, incoming side. Same join as the outgoing
# candidates (same EMPLOYEE_ID, same calendar date, same first two
# space-separated PHONE parts), but both the activity and the call must have
# IS_OUTGOING = false. _seconds is the absolute gap between the two timestamps.
# NOTE(review): this rebinds `inbound`, which the cell writing the silver
# calls table also reads — re-running that cell after this one would union
# these match candidates instead of call rows.
query = """
SELECT
    activities.ACTIVITY_ID,
    calls.CALL_ID,
    abs(
        cast(ACTIVITY_DATE as long)
        - cast(CALL_DATE as long)
    ) as _seconds
FROM
    activities
        INNER JOIN
            contacts ON
                activities.CONTACT_ID = contacts.CONTACT_ID
        INNER JOIN
            calls ON
                activities.EMPLOYEE_ID = calls.EMPLOYEE_ID
                AND cast(ACTIVITY_DATE as date) = cast(CALL_DATE as date)
                AND split(contacts.PHONE, ' ')[0] = split(calls.PHONE, ' ')[0]
                AND split(contacts.PHONE, ' ')[1] = split(calls.PHONE, ' ')[1]
                AND NOT activities.IS_OUTGOING
                AND NOT calls.IS_OUTGOING
"""

inbound = spark.sql(query)

In [129]:
%%time
from pyspark.sql import Window
from pyspark.sql.functions import row_number

# Silver "bridge" table: for each call keep the single candidate activity with
# the smallest time gap, and only if that gap is under 600 seconds.
path = os.path.join(SILVER, 'bridge')

# Rank the candidates of each call by ascending time gap.
closest_first = Window.partitionBy('CALL_ID').orderBy('_seconds')

candidates = outbound.union(inbound)

best_matches = (
    candidates
    .withColumn('_row_number', row_number().over(closest_first))
    .where('_row_number = 1')
    .where(col('_seconds') < 600)
    .drop('_seconds', '_row_number')
)
best_matches.write.parquet(path, mode='overwrite')


bridge = spark.read.parquet(path)
bridge.createOrReplaceTempView('bridge')
bridge.limit(5).toPandas()

CPU times: user 14.6 ms, sys: 3.84 ms, total: 18.5 ms
Wall time: 5.23 s


Unnamed: 0,ACTIVITY_ID,CALL_ID
0,47838048,1051802932520424510
1,47857699,1743345290262729629
2,47860359,2602406100287577751
3,47858148,3583402434295487432
4,47835280,4267368663090694063


### Statements

In [137]:
%%time
# Silver "statements" table: one row per raw statement record whose reference
# also appears in `activities`.
#
# A previously commented-out experiment is not applied: it ranked rows per
# (ReferenceNumber, statement date, EmailMessageID) by CreatedDate and kept only
# rows at least 600 seconds after the previous one.
path = os.path.join(SILVER, 'statements')

statement_columns = [
    'ReferenceNumber as REFERENCE_ID',
    'abs(xxhash64(EmailMessageID)) as EMAIL_ID',
    'CreatedDate as STATEMENT_DATE',
]

statement_rows = (
    spark.read.parquet('/tmp/phone-calls/Statements')
    .withColumn('CreatedDate', date_trunc('second', 'CreatedDate'))
    .selectExpr(*statement_columns)
)

# left_semi: keep statements with a matching REFERENCE_ID in activities,
# without adding any activity columns.
(
    statement_rows
    .join(activities, on='REFERENCE_ID', how='left_semi')
    .write
    .parquet(path, mode='overwrite')
)


statements = spark.read.parquet(path)
statements.createOrReplaceTempView('statements')
statements.limit(5).toPandas()

CPU times: user 22.1 ms, sys: 2.18 ms, total: 24.2 ms
Wall time: 1.36 s


Unnamed: 0,REFERENCE_ID,EMAIL_ID,STATEMENT_DATE
0,1224005,8479961916226615153,2019-08-14 11:18:01
1,1247505,4632731259035230330,2019-08-14 11:19:08
2,1248087,5187333399925037547,2019-08-14 11:19:43
3,1224005,8479961916226615153,2019-08-14 11:17:48
4,1244129,1320498900618370180,2019-08-14 09:40:30


### Requests

In [132]:
%%time
from pyspark.sql.functions import col, create_map, lit, lower

# Silver "requests" table: statement requests tagged with the requesting
# employee's id, with date columns cast to date and WNC mapped to a boolean.
path = os.path.join(SILVER, 'requests')

# WNC flag 1/0 -> True/False; any other value yields null from the map lookup.
wnc_flag = create_map(
    lit(1), lit(True),
    lit(0), lit(False),
)

# Lower-cased contact kept only when it contains '@'.
# NOTE(review): Contact is cleaned here but is not among the selected output
# columns below — confirm whether it was meant to be exported.
contact_email = when(lower('Contact').contains('@'), lower('Contact'))

employee_keys = employees.select('EMPLOYEE_ID', 'EMPLOYEE_NAME')

request_columns = [
    'ObjectID as REQUEST_ID',
    'EMPLOYEE_ID',
    'cast(RequestDate as date) as REQUEST_DATE',
    'Status as REQUEST_STATUS',
    'RequestMethod as REQUEST_METHOD',
    'RequestType as REQUEST_TYPE',
    'cast(LastActivityDate as date) as LAST_ACTIVITY_DATE',
    'cast(LastStatementReceivedDate as date) as LAST_RECEIVED_DATE',
    'WNC as WILL_NOT_COMPLY',
]

request_rows = (
    spark.read.parquet('/tmp/phone-calls/StatementRequests')
    .withColumn('WNC', wnc_flag[col('WNC')])
    .withColumn('Contact', contact_email)
    .withColumnRenamed('RequesterFullName', 'EMPLOYEE_NAME')
)

# Inner join on the requester's name: requests whose requester has no matching
# employee row are dropped.
(
    request_rows
    .join(employee_keys, on='EMPLOYEE_NAME', how='inner')
    .selectExpr(*request_columns)
    .write
    .parquet(path, mode='overwrite')
)


requests = spark.read.parquet(path)
requests.createOrReplaceTempView('requests')
requests.limit(5).toPandas()

CPU times: user 27.4 ms, sys: 5.96 ms, total: 33.3 ms
Wall time: 2.32 s


Unnamed: 0,REQUEST_ID,EMPLOYEE_ID,REQUEST_DATE,REQUEST_STATUS,REQUEST_METHOD,REQUEST_TYPE,LAST_ACTIVITY_DATE,LAST_RECEIVED_DATE,WILL_NOT_COMPLY
0,26681846,8398642,2020-07-24,Superceded,MassEmail,,2020-07-24,,
1,26819776,8398642,2020-07-31,Superceded,MassEmail,Mass 1,2020-07-31,,
2,30690711,8398642,2021-01-19,Superceded,MassEmail,Caller,2020-07-31,,
3,36365526,194731,2021-08-19,Superceded,MassEmail,Mass 1,2021-08-23,,
4,36846273,194731,2021-09-03,No Receipt,MassEmail,Caller,2021-08-23,,


### Emails

In [155]:
%%time
from pyspark.sql import Window
from pyspark.sql.functions import regexp_replace, row_number

# Silver "emails" table: email records with a non-null MAILMessageID whose
# (REFERENCE_ID, EMAIL_ID) pair also appears in `statements`.
#
# A previously commented-out step is not applied: it kept only the earliest
# row (by DateCreated) per (ReferenceNumber, MAILMessageID).
path = os.path.join(SILVER, 'emails')

email_columns = [
    'ReferenceNumber as REFERENCE_ID',
    'abs(xxhash64(MAILMessageID)) as EMAIL_ID',
    'MAILDateTime as EMAIL_DATE',
]

email_rows = (
    spark.read.parquet('/tmp/phone-calls/StatementEmailDocs')
    .withColumnRenamed('S-Ref#', 'ReferenceNumber')
    .where('MAILMessageID is not null')
    .withColumn('MAILDateTime', date_trunc('second', 'MAILDateTime'))
    .selectExpr(*email_columns)
)

# left_semi: keep emails that have a matching statement, without adding any
# statement columns.
(
    email_rows
    .join(statements, on=['REFERENCE_ID', 'EMAIL_ID'], how='left_semi')
    .write
    .parquet(path, mode='overwrite')
)


emails = spark.read.parquet(path)
emails.createOrReplaceTempView('emails')
emails.limit(5).toPandas()

CPU times: user 18.1 ms, sys: 4.29 ms, total: 22.4 ms
Wall time: 1.13 s


Unnamed: 0,REFERENCE_ID,EMAIL_ID,EMAIL_DATE
0,1244129,1320498900618370180,2019-08-13 16:06:15
1,1224005,8479961916226615153,2019-08-13 16:33:10
2,1248087,5187333399925037547,2019-08-13 16:45:53
3,1247505,4632731259035230330,2019-08-13 16:45:50
4,1247949,8348275066776007683,2019-08-13 17:05:51


### Analysis

In [170]:
# Total connected outbound talk time per employee per day.
#   SECONDS   - floor of the summed CALL_DURATION, as int
#   CALL_TIME - SECONDS rendered as HH:mm:ss (null when SECONDS is null)
#   CALLS     - number of connected outbound calls in the group
#
# CALL_TIME is built arithmetically. The previous from_unixtime /
# to_utc_timestamp('America/New_York') round trip was only correct when the
# Spark session time zone was America/New_York, and wrapped at 24 hours.
query = """
SELECT
    EMPLOYEE_NAME,
    CALL_DATE,
    TEAM,
    TOTAL_SECONDS as SECONDS,
    CASE
        WHEN TOTAL_SECONDS IS NOT NULL THEN
            format_string(
                '%02d:%02d:%02d',
                floor(TOTAL_SECONDS / 3600),
                floor((TOTAL_SECONDS % 3600) / 60),
                TOTAL_SECONDS % 60
            )
    END as CALL_TIME,
    CALLS
FROM
    (
        SELECT
            employees.EMPLOYEE_NAME,
            cast(CALL_DATE as date) as CALL_DATE,
            employees.TEAM,
            cast(floor(sum(CALL_DURATION)) as int) as TOTAL_SECONDS,
            count(*) as CALLS
        FROM
            calls
                INNER JOIN
                    employees ON
                        calls.EMPLOYEE_ID = employees.EMPLOYEE_ID
                        AND CALL_RESULT = 'Call connected'
                        AND IS_OUTGOING
        GROUP BY
            employees.EMPLOYEE_NAME,
            cast(CALL_DATE as date),
            employees.TEAM
    ) as daily
ORDER BY
    TEAM ASC,
    CALL_DATE ASC,
    CALL_TIME DESC,
    EMPLOYEE_NAME ASC
"""

spark.sql(query).limit(5).toPandas()

Unnamed: 0,EMPLOYEE_NAME,CALL_DATE,TEAM,SECONDS,CALL_TIME,CALLS
0,Michelle Regnier,2022-05-09,,5159,01:25:59,61
1,Zachary Markham,2022-05-09,,2697,00:44:57,4
2,Beth Rinehart,2022-05-09,,2037,00:33:57,1
3,Becky Williams,2022-05-09,,255,00:04:15,4
4,Nicole Thompson,2022-05-09,,103,00:01:43,3


In [147]:
# Export the full (un-limited) result of the current `query` to CSV.
# NOTE(review): `query` is whichever query string was assigned most recently
# in the kernel; presumably the outbound talk-time query is intended — run
# this cell directly after that one.
outbound_report = spark.sql(query).toPandas()
outbound_report.to_csv('/tmp/outbound.csv', index=False)

In [173]:
# Mean connected inbound call duration per employee per day.
#   SECONDS   - floor of mean(CALL_DURATION), as int
#   CALL_TIME - SECONDS rendered as mm:ss (null when SECONDS is null)
#
# CALL_TIME is built arithmetically. The previous from_unixtime(..., 'mm:ss')
# wrapped back to 00:00 at one hour and depended on the Spark session time
# zone for zones with a non-whole-hour offset.
query = """
SELECT
    EMPLOYEE_NAME,
    CALL_DATE,
    TEAM,
    MEAN_SECONDS as SECONDS,
    CASE
        WHEN MEAN_SECONDS IS NOT NULL THEN
            format_string(
                '%02d:%02d',
                floor(MEAN_SECONDS / 60),
                MEAN_SECONDS % 60
            )
    END as CALL_TIME
FROM
    (
        SELECT
            employees.EMPLOYEE_NAME,
            cast(CALL_DATE as date) as CALL_DATE,
            employees.TEAM,
            cast(floor(mean(CALL_DURATION)) as int) as MEAN_SECONDS
        FROM
            calls
                INNER JOIN
                    employees ON
                        calls.EMPLOYEE_ID = employees.EMPLOYEE_ID
                        AND CALL_RESULT = 'Call connected'
                        AND NOT IS_OUTGOING
        GROUP BY
            employees.EMPLOYEE_NAME,
            cast(CALL_DATE as date),
            employees.TEAM
    ) as daily
ORDER BY
    TEAM ASC,
    CALL_DATE ASC,
    CALL_TIME DESC,
    EMPLOYEE_NAME ASC
"""

spark.sql(query).limit(5).toPandas()

Unnamed: 0,EMPLOYEE_NAME,CALL_DATE,TEAM,SECONDS,CALL_TIME
0,Beth Rinehart,2022-05-09,,264,04:24
1,Rebekah Dykema,2022-05-09,,259,04:19
2,Zachary Markham,2022-05-09,,94,01:34
3,LeAnne Hoekstra,2022-05-09,,67,01:07
4,Cindy Allen,2022-05-09,,42,00:42


In [190]:
# Statements and emails per employee per activity date: activities are joined
# to employees, then LEFT JOINed to statements on REFERENCE_ID and to emails
# on (REFERENCE_ID, EMAIL_ID); the non-null STATEMENT_DATE / EMAIL_DATE values
# are counted per (EMPLOYEE_NAME, activity date).
# NOTE(review): the counts are taken after both joins, so a statement row is
# presumably counted once per matching activity and email row — verify whether
# this fan-out is intended.
# NOTE(review): the *_DATE columns are written as timestamps upstream; if the
# bound '2022-05-11' is interpreted as midnight, BETWEEN excludes nearly all of
# 2022-05-11 — confirm whether the last day should be included.
query = """
SELECT
    employees.EMPLOYEE_NAME,
    cast(activities.ACTIVITY_DATE as date) as ACTIVITY_DATE,
    count(statements.STATEMENT_DATE) as STATEMENTS,
    count(emails.EMAIL_DATE) as EMAILS
FROM
    activities
        INNER JOIN
            employees ON
                activities.EMPLOYEE_ID = employees.EMPLOYEE_ID
                -- AND IS_OUTGOING
                AND activities.ACTIVITY_DATE BETWEEN
                    '2022-05-09' AND '2022-05-11'
        LEFT JOIN
            statements ON
                activities.REFERENCE_ID = statements.REFERENCE_ID
                AND statements.STATEMENT_DATE BETWEEN
                    '2022-05-09' AND '2022-05-11'
        LEFT JOIN
            emails ON
                statements.REFERENCE_ID = emails.REFERENCE_ID
                AND statements.EMAIL_ID = emails.EMAIL_ID
                AND emails.EMAIL_DATE BETWEEN
                    '2022-05-09' AND '2022-05-11'
GROUP BY
    employees.EMPLOYEE_NAME,
    cast(activities.ACTIVITY_DATE as date)
"""

# Run the current `query`, sorted by activity date, then by statement count
# (highest first), then by employee name.
# `col(...).desc()` replaces `desc(...)`: `desc` is never imported in this
# notebook, so the cell raised NameError on a fresh kernel.
(
    spark
    .sql(query)
    .orderBy('ACTIVITY_DATE', col('STATEMENTS').desc(), 'EMPLOYEE_NAME')
    .toPandas()
)

Unnamed: 0,EMPLOYEE_NAME,ACTIVITY_DATE,STATEMENTS,EMAILS
0,Ashton Sower,2022-05-09,18,18
1,Carla Roark,2022-05-09,17,7
2,Cierra Kilpatrick,2022-05-09,11,11
3,Josephine Dagher,2022-05-09,10,6
4,Toni Engle,2022-05-09,7,7
5,Jamie Spurlock,2022-05-09,3,3
6,Briana Louck,2022-05-09,2,2
7,Chenoa Marklevitz,2022-05-09,2,0
8,Xavier Baron,2022-05-09,1,1
9,Becky Williams,2022-05-09,0,0


In [191]:
# Variant of the statements/emails count that pre-aggregates before joining:
# statements and emails are each counted per (REFERENCE_ID, EMAIL_ID) within
# the date window, joined to the activities, and the per-pair counts are
# summed per (EMPLOYEE_NAME, activity date); coalesce turns a missing sum into 0.
# NOTE(review): an activity's REFERENCE_ID can presumably still match several
# statement groups, and several activities can share a REFERENCE_ID, so the
# sums may count the same statement more than once — verify.
# NOTE(review): the *_DATE columns are written as timestamps upstream; if the
# bound '2022-05-11' is interpreted as midnight, BETWEEN excludes nearly all of
# 2022-05-11 — confirm whether the last day should be included.
query = """
SELECT
    employees.EMPLOYEE_NAME,
    cast(activities.ACTIVITY_DATE as date) as ACTIVITY_DATE,
    coalesce(sum(statement.STATEMENTS), 0) as STATEMENTS,
    coalesce(sum(email.EMAILS), 0) as EMAILS
FROM
    activities
        INNER JOIN
            employees ON
                activities.EMPLOYEE_ID = employees.EMPLOYEE_ID
                -- AND IS_OUTGOING
                AND activities.ACTIVITY_DATE BETWEEN
                    '2022-05-09' AND '2022-05-11'
        LEFT JOIN
            (
                SELECT
                    REFERENCE_ID,
                    EMAIL_ID,
                    count(STATEMENT_DATE) as STATEMENTS
                FROM
                    statements
                WHERE
                    STATEMENT_DATE BETWEEN
                        '2022-05-09' AND '2022-05-11'
                GROUP BY
                    REFERENCE_ID,
                    EMAIL_ID
            ) as statement ON
                activities.REFERENCE_ID = statement.REFERENCE_ID
        LEFT JOIN
            (
                SELECT
                    REFERENCE_ID,
                    EMAIL_ID,
                    count(EMAIL_DATE) as EMAILS
                FROM
                    emails
                WHERE
                    EMAIL_DATE BETWEEN
                        '2022-05-09' AND '2022-05-11'
                GROUP BY
                    REFERENCE_ID,
                    EMAIL_ID
            ) as email ON
                statement.REFERENCE_ID = email.REFERENCE_ID
                AND statement.EMAIL_ID = email.EMAIL_ID
GROUP BY
    employees.EMPLOYEE_NAME,
    cast(activities.ACTIVITY_DATE as date)
"""

# Run the current `query`, sorted by activity date, then by statement count
# (highest first), then by employee name.
# `col(...).desc()` replaces `desc(...)`: `desc` is never imported in this
# notebook, so the cell raised NameError on a fresh kernel.
(
    spark
    .sql(query)
    .orderBy('ACTIVITY_DATE', col('STATEMENTS').desc(), 'EMPLOYEE_NAME')
    .toPandas()
)

Unnamed: 0,EMPLOYEE_NAME,ACTIVITY_DATE,STATEMENTS,EMAILS
0,Ashton Sower,2022-05-09,18,11
1,Carla Roark,2022-05-09,17,5
2,Cierra Kilpatrick,2022-05-09,11,2
3,Josephine Dagher,2022-05-09,10,5
4,Toni Engle,2022-05-09,7,2
5,Jamie Spurlock,2022-05-09,3,2
6,Briana Louck,2022-05-09,2,2
7,Chenoa Marklevitz,2022-05-09,2,0
8,Xavier Baron,2022-05-09,1,1
9,Becky Williams,2022-05-09,0,0


In [None]:
# Shut down the Spark session; cells above cannot be re-run until a new
# session is created.
spark.stop()