In [33]:
import json
import os
from datetime import datetime

import cml.data_v1 as cmldata
from pyspark import SparkContext
from openai import OpenAI


# --- Configuration read from the environment (None when a variable is unset) ---
SPARK_DATA_LAKE_CONNECTION = os.environ.get("SPARK_DATA_LAKE_CONNECTION")
DEMO_DATABASE_NAME = os.environ.get("DEMO_DATABASE_NAME")
DEMO_TABLE_NAME = os.environ.get("DEMO_TABLE_NAME")

# --- Data lake connection handle, resolved by its configured name ---
data_lake_connection = cmldata.get_connection(SPARK_DATA_LAKE_CONNECTION)

# --- Spark session: the master property is set before the session is requested ---
SparkContext.setSystemProperty("spark.master", "local")
spark = data_lake_connection.get_spark_session()

# --- OpenAI client, keyed from the OPENAI_API_KEY environment variable ---
client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))

def retrieveCustomerInfo(name, dob, address):
    """Look up one customer by name, date of birth and address.

    The name and address comparisons are case-insensitive; the date of birth
    is compared as given. The filter values are passed to Spark as literals
    through the DataFrame API rather than being spliced into SQL text, so
    quotes or SQL fragments inside a value (e.g. "O'Brien") cannot alter the
    query.

    Example: retrieveCustomerInfo('Alex', '1990-03-13', 'Sample Street 1')

    Args:
        name: Customer name to match.
        dob: Date of birth to match, e.g. '1990-03-13'.
        address: Customer address to match.

    Returns:
        A dict mapping the output column names ('name', 'Customer_ID',
        'Current_Product', 'Churn_Risk', 'Customer_Since', 'Date_of_birth',
        'Address', 'Preappoved_for_discount') to the values of the first
        matching row, or False when no row matches.
    """
    # Function-scope import: pyspark is already a dependency of this notebook,
    # and keeping the import here leaves the shared import cell untouched.
    from pyspark.sql import functions as F

    customers = spark.table(f"{DEMO_DATABASE_NAME}.{DEMO_TABLE_NAME}")

    matching_rows = (
        customers
        # Filter on the source columns before they are renamed below.
        .where(F.lower(F.col("name")) == F.lower(F.lit(name)))
        .where(F.col("date_of_birth") == F.lit(dob))
        .where(F.lower(F.col("address")) == F.lower(F.lit(address)))
        .select(
            F.col("name"),
            F.col("customer_id").alias("Customer_ID"),
            F.col("current_product").alias("Current_Product"),
            F.col("churn_risk").alias("Churn_Risk"),
            F.col("customer_since").alias("Customer_Since"),
            F.col("date_of_birth").alias("Date_of_birth"),
            F.col("address").alias("Address"),
            # The misspelled key is kept: consumers of the result read it as-is.
            F.col("preapproved_for_discount").alias("Preappoved_for_discount"),
        )
        # Only the first match is ever used, so do not pull more to the driver.
        .limit(1)
        .collect()
    )

    if not matching_rows:
        return False
    return matching_rows[0].asDict()


def is_valid_date(date_str):
    """Return True if ``date_str`` parses as a calendar date in %Y-%m-%d form.

    Args:
        date_str: Candidate date, normally a string such as '1990-03-13'.

    Returns:
        True when ``datetime.strptime(date_str, '%Y-%m-%d')`` succeeds,
        otherwise False. Non-string inputs (None, numbers, containers) are
        reported as invalid instead of raising.
    """
    try:
        datetime.strptime(date_str, '%Y-%m-%d')
    except (ValueError, TypeError):
        # ValueError: a string that is not a real date in this format.
        # TypeError: strptime was handed something that is not a string.
        return False
    return True


# Product catalogue shared by the "ai_help" and "summarize" system prompts.
# The leading newline and the indentation are part of the prompt text.
_PRODUCT_CATALOG = """
                AirSpeed Advanced
                High Speed Wireless Broadband

                Special Offer € 45 per month
                Cost is € 45 p/m for the first 3 months and € 60 thereafter
                12 Month contract
                Up to 70 Mbps / 7 Mbps

                FREE Fritzbox Router
                Installation fee of € 150
                Optional Home Phone Service

                and

                AirSpeed Plus
                High Speed Wireless Broadband

                Special Offer € 35 per month
                Cost is € 35 p/m for the first 3 months and € 50 thereafter
                12 Month contract
                Up to 50 Mbps / 5 Mbps

                FREE Fritzbox Router
                Installation fee of € 150
                Optional Home Phone Service

                and

                AirSpeed Home
                High Speed Wireless Broadband

                Special Offer € 25 per month
                Cost is € 25 p/m for the first 3 months and € 40 thereafter
                12 Month contract
                Up to 30 Mbps / 3 Mbps

                FREE Fritzbox Router
                Installation fee of €150
                Optional Home Phone Service"""

# System prompt for task "ai_help": intro + catalogue + discount policy.
# The whitespace padding in the tail pieces is carried over from the prompt
# as it was previously embedded, so the text sent to the model is unchanged.
_AI_HELP_SYSTEM_PROMPT = (
    "You are helping a call center worker for a telco company called airwave. You will receive the user text content, which will include the call center worker and the customers spoken words converted to text. You are called upon when teh call center agent needs some sort of help, like details about the available products or suggestions for troubleshooting etc. It is your job to provide helpful suggestions. Make sure they are short so the call center worker can easily look at them and read them out to the customer. Do not include multiple suggestions, just one with enough information that the call center agent can use. If the call center agend mentions they are looking for some information, or the customer sais they need additional information, provide that information (if available) in your response. You may receive additional information about the customer such as name and currently used products. The company has currently has 3 products: "
    + _PRODUCT_CATALOG
    + "  \n\n"
    + "                There are currently no active promotions other than the one's mentioned above. There is an option to give customers a discount of 5 percent but only if they sign up for a 2 year contract. This should only be offered to customers that are at risk of churning (churn_risk 1 or 2) or have very negative conversations."
    + "         \n"
    + "                "
)

# System prompt for task "summarize": intro + catalogue + closing instruction.
_SUMMARIZE_SYSTEM_PROMPT = (
    "You are helping a call center worker for a telco company called airwave. You will receive the user text content, which will include the call center worker and the customers spoken words converted to text. You are called at the end of a conversation to summarize the call. Make sure the summary is as short as possible but always includes all relevant parts of the conversation. You may receive information about the customer such as name and currently used products. The company has currently has 3 products: "
    + _PRODUCT_CATALOG
    + "\n\n"
    + "                Summarize the converastion short but include all important information."
    + "             \n"
    + "                "
)

# System prompt for task "getCustomerInfo" (JSON-mode extraction).
# NOTE(review): the JSON example inside the prompt has no closing brace; it is
# left as-is here because editing prompt wording changes model behaviour.
_CUSTOMER_INFO_SYSTEM_PROMPT = 'You are a helpful assistant for call center agents designed to analyze text from a conversation and figure out information about the customer who is calling outputing a JSON. Provide your answer in JSON structure like this {"name": "<The name of the customer>", "address": "<The street of the customer> <The house number of the customer as number, not string>", "dob": "<The date of birth of the customer as YYYY-MM-DD>". You will be given the entire conversation so far, do not worry if the information is not complete yet, just fill out whatever you can identify.'

# Fields that must all be present before a customer lookup is attempted.
_REQUIRED_CUSTOMER_FIELDS = ("name", "address", "dob")


def _chat(model, system_prompt, text, **create_kwargs):
    """Send one system + user message pair and return the reply text."""
    completion = client.chat.completions.create(
        model=model,
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": text},
        ],
        **create_kwargs,
    )
    return completion.choices[0].message.content


def _customer_info_is_complete(info):
    """Return True only if every required field is a non-blank string and dob is a valid date."""
    if not isinstance(info, dict):
        return False
    for field in _REQUIRED_CUSTOMER_FIELDS:
        value = info.get(field)
        if not isinstance(value, str) or not value.strip():
            return False
    return is_valid_date(info["dob"])


def _get_customer_info(text):
    """Extract customer details from the conversation and look the customer up."""
    raw_reply = _chat(
        "gpt-3.5-turbo-1106",
        _CUSTOMER_INFO_SYSTEM_PROMPT,
        text,
        response_format={"type": "json_object"},
    )
    info = json.loads(raw_reply)

    output = {"recommendationText": raw_reply}
    if not _customer_info_is_complete(info):
        # Not enough validated details yet: return the raw extraction only.
        return output

    customer_info_from_db = retrieveCustomerInfo(info["name"], info["dob"], info["address"])
    if not customer_info_from_db:
        output["foundCustomer"] = 0
    else:
        output["foundCustomer"] = 1
        output["customerInfo"] = customer_info_from_db
    return output


def predict(data: dict[str, str]) -> dict:
    """Run one assistant task on a call transcript.

    Args:
        data: Request payload with two string entries:
            'text' - the conversation transcript so far.
            'task' - one of 'ai_help', 'summarize' or 'getCustomerInfo'.

    Returns:
        A dict that always contains 'recommendationText' (the model's reply).
        For 'getCustomerInfo', when name, address and a valid dob were all
        extracted, it also contains 'foundCustomer' (1 or 0) and, when the
        customer was found, 'customerInfo' with the database record.

    Raises:
        TypeError: if ``data`` is not a dict, lacks 'text' or 'task', or
            either value is not a string.
        ValueError: if 'task' is not one of the supported task names.
    """
    if not isinstance(data, dict):
        raise TypeError("data must be a dictionary")
    if "text" not in data:
        raise TypeError("data must contain a key of 'text'")
    if "task" not in data:
        raise TypeError("data must contain a key of 'task'")

    task = data["task"]
    if not isinstance(task, str):
        raise TypeError("task must be a string")

    text = data["text"]
    if not isinstance(text, str):
        raise TypeError("text must be a string")

    if task == "ai_help":
        return {"recommendationText": _chat("gpt-3.5-turbo", _AI_HELP_SYSTEM_PROMPT, text)}
    if task == "summarize":
        return {"recommendationText": _chat("gpt-3.5-turbo", _SUMMARIZE_SYSTEM_PROMPT, text)}
    if task == "getCustomerInfo":
        return _get_customer_info(text)

    # Reject unsupported tasks explicitly instead of failing on an unset result.
    raise ValueError(
        f"unknown task {task!r}; expected 'ai_help', 'summarize' or 'getCustomerInfo'"
    )

Spark Application Id:spark-application-1730977332207


In [34]:
# Happy path: the transcript carries name, address and date of birth, so the
# lookup branch of predict() is exercised end to end.
predict(
    dict(
        text="Here is the entire conversation: name Alex address Sample Street 1 date of birth 13 March 1990 .",
        task="getCustomerInfo",
    )
)

{'recommendationText': '{\n  "name": "Alex",\n  "address": "Sample Street 1",\n  "dob": "1990-03-13"\n}',
 'foundCustomer': 1,
 'customerInfo': {'name': 'Alex',
  'Customer_ID': 13,
  'Current_Product': 'AirSpeed Home',
  'Churn_Risk': 1,
  'Customer_Since': '2022-01-01',
  'Date_of_birth': '1990-03-13',
  'Address': 'Sample Street 1',
  'Preappoved_for_discount': 'No'}}

In [35]:
# good case: step through the extraction part of the 'getCustomerInfo' task by hand
text = "Here is the entire conversation: name Alex address Sample Street 1 date of birth 13 March 1990 ."

completion = client.chat.completions.create(
    model="gpt-3.5-turbo-1106",
    response_format={"type": "json_object"},
    messages=[
        {"role": "system", "content": 'You are a helpful assistant for call center agents designed to analyze text from a conversation and figure out information about the customer who is calling outputing a JSON. Provide your answer in JSON structure like this {"name": "<The name of the customer>", "address": "<The street of the customer> <The house number of the customer as number, not string>", "dob": "<The date of birth of the customer as YYYY-MM-DD>". You will be given the entire conversation so far, do not worry if the information is not complete yet, just fill out whatever you can identify.'},
        {"role": "user", "content": f"{text}"}
    ]
)

info = json.loads(completion.choices[0].message.content)

# The extraction is complete only when EVERY required field is a non-empty
# string and the date of birth parses. A flag that is reassigned per field
# would end up reflecting the last field alone.
needed_information = ['name', 'address', 'dob']
info_complete = (
    all(isinstance(info.get(field), str) and info[field] for field in needed_information)
    and is_valid_date(info['dob'])
)

info

{'name': 'Alex', 'address': 'Sample Street 1', 'dob': '1990-03-13'}

In [36]:
# Build the response payload from the extraction result of the previous cell.
if not info_complete:
    # Some detail is missing or invalid: return the raw extraction only.
    output = dict(recommendationText=completion.choices[0].message.content)
else:
    customer_info_from_db = retrieveCustomerInfo(
        info['name'],
        info['dob'],
        info['address'],
    )
    if customer_info_from_db == False:
        # The lookup signals "no such customer" with False.
        output = dict(
            recommendationText=completion.choices[0].message.content,
            foundCustomer=0,
        )
    else:
        output = dict(
            recommendationText=completion.choices[0].message.content,
            foundCustomer=1,
            customerInfo=customer_info_from_db,
        )

In [37]:
# Display the lookup result from the previous cell. The name is only assigned
# there when info_complete was True, so run that cell on a complete
# extraction first.
customer_info_from_db

{'name': 'Alex',
 'Customer_ID': 13,
 'Current_Product': 'AirSpeed Home',
 'Churn_Risk': 1,
 'Customer_Since': '2022-01-01',
 'Date_of_birth': '1990-03-13',
 'Address': 'Sample Street 1',
 'Preappoved_for_discount': 'No'}

In [42]:
# Good case: all three values match the demo record, so a dict comes back.
retrieveCustomerInfo(
    name="Alex",
    dob="1990-03-13",
    address="Sample Street 1",
)

{'name': 'Alex',
 'Customer_ID': 13,
 'Current_Product': 'AirSpeed Home',
 'Churn_Risk': 1,
 'Customer_Since': '2022-01-01',
 'Date_of_birth': '1990-03-13',
 'Address': 'Sample Street 1',
 'Preappoved_for_discount': 'No'}

In [40]:
# Bad case: the date of birth does not match any row, so the lookup returns False.
retrieveCustomerInfo(
    name="Alex",
    dob="foo",
    address="Sample Street 1",
)

False