## Targeted Marketing Agent

This notebook guides you through the steps of creating an AI Agent that will create targeted social media campaigns for hotels.

### Step 1: Create tools for AI Agent

Follow the instructions in each cell below to generate the tools - user-defined functions in this case - to be used by the agent to help it generate the targeted social media posts.

Each cell will contain instructions and sql or python code to execute. Ensure to execute the cells in order sequentially.


In [0]:
# Create Databricks widgets to store names and values to be used throughout this Notebook

# These are placeholder, please use the UI form fields at the top of this Notebook to enter in their values
dbutils.widgets.text("warehouse_id", "")
dbutils.widgets.text("catalog", "")
dbutils.widgets.text("database", "")
dbutils.widgets.text("hotel_to_promote", "")

dbutils.widgets.dropdown("hotel_class",
    "Resort", # Default value
    [
    "Resort",
    "Extended Stay",
    "Luxury",
    "Economy",
    "Airport"
])


In [0]:
catalog = dbutils.widgets.get("catalog")
database = dbutils.widgets.get("database")

# Set this for subsequent cells
spark.sql(f"USE CATALOG `{catalog}`")
spark.sql(f"USE DATABASE `{database}`")

DataFrame[]

In [0]:
%sql
-- This function takes a hotel class as an input and finds the lowest performing hotel in that class that has at least 3 customer reviews and has an above-average customer satisfaction rating

CREATE OR REPLACE FUNCTION get_hotel_to_promote (input_hotel_class STRING COMMENT 'Hotel class to filter by')
RETURNS TABLE (
  hotel_id STRING,
  hotel_name STRING,
  hotel_city STRING,
  hotel_country STRING,
  hotel_description STRING,
  hotel_class STRING,
  average_review_rating DOUBLE,
  review_count INT
)
LANGUAGE SQL
COMMENT 'This function takes a hotel class as an input and finds the lowest performing hotel in that class that has at least 3 customer reviews and has an above-average customer satisfaction rating'
RETURN
  (SELECT
    `HOTEL_ID`,
    `HOTEL_NAME`,
    `HOTEL_CITY`,
    `HOTEL_COUNTRY`,
    `HOTEL_DESCRIPTION`,
    `HOTEL_CLASS`,
    `AVERAGE_REVIEW_RATING`,
    `REVIEW_COUNT`
    FROM
      hotel_stats
    WHERE HOTEL_CLASS = input_hotel_class
    AND `REVIEW_COUNT` > 2
    AND `AVERAGE_REVIEW_RATING` > (
      SELECT
        AVG(`AVERAGE_REVIEW_RATING`)
      FROM
        hotel_stats
    )
    ORDER BY
      `TOTAL_BOOKINGS_COUNT` ASC
    LIMIT 1
  )

In [0]:
%sql
-- Test out the `get_hotel_to_promote` function

SELECT *
  FROM get_hotel_to_promote(:hotel_class);

-- Copy the `hotel_id` value and paste it into the "hotel_to_promote" parameter at the top of this page

-- If you get a "No rows returned" message, then change the hotel_class value to the one with the next lowest performance from your previous Genie prompt

In [0]:
%sql
-- This statement creates a function that takes a HOTEL_ID as input and generates a summary of the top 3 reasons why customers enjoyed their hotel stay.

CREATE OR REPLACE FUNCTION
summarize_customer_reviews(input_hotel_id STRING COMMENT 'ID of the hotel to be searched')
RETURNS STRING
LANGUAGE SQL
COMMENT 'This function takes a HOTEL_ID as input and generates a summary of the top 3 reasons why customers enjoyed their hotel stay'
RETURN (
  SELECT AI_GEN(
    SUBSTRING('Extract the top 3 reasons people like the hotels based on this list of reviews:' || ARRAY_JOIN(COLLECT_LIST(REVIEW_TEXT), ' - '), 1, 80000)
  ) AS all_reviews
  FROM denormalized_hotel_bookings
    WHERE `HOTEL_ID` = input_hotel_id
    -- Try to exclude negative reviews
    AND `REVIEW_RATING` >= 3
)

In [0]:
%sql
-- Try out this function to see the top 3 summary of customer reviews for a hotel

SELECT summarize_customer_reviews(:hotel_to_promote)

In [0]:
%sql
-- This function finds the top 10 customers who transacted the fewest bookings but showed the most interest (via page-views and page-clicks) for a given hotel class

CREATE OR REPLACE FUNCTION identify_target_customers (input_hotel_class STRING COMMENT 'Hotel class to filter by')
RETURNS TABLE (
  customer_email STRING,
  page_views INT,
  page_clicks INT,
  bookings INT
)
LANGUAGE SQL
COMMENT 'This function finds the top 10 customers who transacted the fewest bookings but showed the most interest (via page-views and page-clicks) for a given hotel class'
RETURN
  (
WITH filtered_clickstream AS (
  SELECT
    `CUSTOMER_EMAIL`,
    `ACTION`
  FROM
    `clickstream`
  WHERE
    `ACTION` IN ('page-view', 'page-click', 'booking-click')
),
filtered_dhb AS (
  SELECT
    `CUSTOMER_EMAIL`
  FROM
    `denormalized_hotel_bookings`
  WHERE
    `HOTEL_CLASS` = input_hotel_class
),
joined_table AS (
  SELECT
    a.`CUSTOMER_EMAIL`,
    a.`ACTION`
  FROM
    filtered_clickstream a
      JOIN filtered_dhb b
        ON a.`CUSTOMER_EMAIL` = b.`CUSTOMER_EMAIL`
),
ranked_customers AS (
  SELECT
    `CUSTOMER_EMAIL`,
    COUNT(
      CASE
        WHEN `ACTION` = 'page-view' THEN 1
      END
    ) AS `page_views`,
    COUNT(
      CASE
        WHEN `ACTION` = 'page-click' THEN 1
      END
    ) AS `page_clicks`,
    COUNT(
      CASE
        WHEN `ACTION` = 'booking-click' THEN 1
      END
    ) AS `bookings`,
    ROW_NUMBER() OVER (
        ORDER BY
          COUNT(
            CASE
              WHEN `ACTION` = 'booking-click' THEN 1
            END
          ) ASC,
          COUNT(
            CASE
              WHEN `ACTION` = 'page-view' THEN 1
            END
          ) DESC,
          COUNT(
            CASE
              WHEN `ACTION` = 'page-click' THEN 1
            END
          ) DESC
      ) AS `rank`
  FROM
    joined_table
  GROUP BY
    `CUSTOMER_EMAIL`
)
SELECT
  `CUSTOMER_EMAIL`,
  `page_views`,
  `page_clicks`,
  `bookings`
FROM
  ranked_customers
WHERE
  `rank` <= 10
ORDER BY
  `rank`
)

In [0]:
%sql
-- Test out the identify_target_customers function

SELECT * FROM identify_target_customers(:hotel_class);

## Step 2: Create the AI Agent

In this step you are going to combine these three cruicial parts of our agent:

1. Tools for the Agent to use (from step 1)
2. LLM to serve as the agent's "brains"
3. System prompt that defines guidelines for the agent's tasks

In [0]:
%pip install -U databricks-sdk==0.39.0 langchain-community==0.2.16 langchain-openai==0.1.19 mlflow==2.19.0

In [0]:
dbutils.library.restartPython()

In [0]:
from langchain_community.tools.databricks import UCFunctionToolkit
from langchain.tools import BaseTool
from databricks.sdk import WorkspaceClient
import pandas as pd
from typing import Type, Any
from pydantic import BaseModel, Field
import json

# Retrieve IDs from widget
warehouse_id = dbutils.widgets.get("warehouse_id")
catalog = dbutils.widgets.get("catalog")
db = dbutils.widgets.get("database")

# Custom tool classes that handle SQL quoting properly
class GetHotelToPromoteInput(BaseModel):
    input_hotel_class: str = Field(description="Hotel class to filter by")

class GetHotelToPromoteTool(BaseTool):
    name = "get_hotel_to_promote"
    description = "This function takes a hotel class as an input and finds the lowest performing hotel in that class that has at least 3 customer reviews and has an above-average customer satisfaction rating"
    args_schema: Type[BaseModel] = GetHotelToPromoteInput

    def _run(self, input_hotel_class: str) -> str:
        sql = f"SELECT * FROM `{catalog}`.`{db}`.get_hotel_to_promote('{input_hotel_class}')"
        result = spark.sql(sql).collect()
        if result:
            return json.dumps([row.asDict() for row in result])
        return "[]"

class SummarizeCustomerReviewsInput(BaseModel):
    input_hotel_id: str = Field(description="ID of the hotel to be searched")

class SummarizeCustomerReviewsTool(BaseTool):
    name = "summarize_customer_reviews"
    description = "This function takes a HOTEL_ID as input and generates a summary of the top 3 reasons why customers enjoyed their hotel stay"
    args_schema: Type[BaseModel] = SummarizeCustomerReviewsInput

    def _run(self, input_hotel_id: str) -> str:
        sql = f"SELECT `{catalog}`.`{db}`.summarize_customer_reviews('{input_hotel_id}') as summary"
        result = spark.sql(sql).collect()
        if result and result[0]['summary']:
            return result[0]['summary']
        return "No reviews found for this hotel."

class IdentifyTargetCustomersInput(BaseModel):
    input_hotel_class: str = Field(description="Hotel class to filter by")

class IdentifyTargetCustomersTool(BaseTool):
    name = "identify_target_customers"
    description = "This function finds the top 10 customers who transacted the fewest bookings but showed the most interest (via page-views and page-clicks) for a given hotel class"
    args_schema: Type[BaseModel] = IdentifyTargetCustomersInput

    def _run(self, input_hotel_class: str) -> str:
        sql = f"SELECT * FROM `{catalog}`.`{db}`.identify_target_customers('{input_hotel_class}')"
        result = spark.sql(sql).collect()
        if result:
            return json.dumps([row.asDict() for row in result])
        return "[]"

def get_tools():
    return [
        GetHotelToPromoteTool(),
        SummarizeCustomerReviewsTool(),
        IdentifyTargetCustomersTool()
    ]

In [0]:
from langchain_community.chat_models.databricks import ChatDatabricks

# We're going to use llama 3.3 because it's tool-enabled and is available. Keep temp at 0 to make it more deterministic.
llm = ChatDatabricks(endpoint="databricks-meta-llama-3-3-70b-instruct",
    temperature=0.0,
    streaming=False)

In [0]:
from langchain_core.prompts import ChatPromptTemplate
from langchain_community.chat_models import ChatDatabricks

#This defines our agent's system prompt. Here we can tell it what we expect it to do and guide it on using specific functions.

def get_prompt(history = [], prompt = None):
    if not prompt:
            prompt = """You are a helpful assistant for a global hotel company. Your task is to assist the marketing leadership in understanding and acting on their products and sales metrics. You can retrieve and analyze relevant data using specific functions.

            You have these three main tasks:

        1. Determine which the hotel to promote, let's call it the `selected hotel`, based on the result of the get_hotel_to_promote function based on the value of the hotel_class parameter.

        2. Use the customer review summary from the summarize_customer_reviews function and the hotel description to craft the content for a positive social marketing post to promote the `selected hotel`. Mention the hotel by its name, but do not mention that the hotel is not performing well nor mention any of its flaws.

        3. Create a list of potential customers to send the social marketing post to by using the identify_target_customers function.

        Format the results of all three tasks into a single cohesive output.

        Follow these guidelines:
        1. Call the appropriate function at each step and ensure results are retrieved before proceeding.
        2. Provide clear, coherent responses without mentioning the underlying functions.
        3. Do not reference the hotel by its ID.
        4. Mention the hotel's city and country.
        5. Answer only what the user asks for, no unnecessary information.
        6. If asked to generate Instagram posts, first determine what customers like most to ensure relevance.
        """
    return ChatPromptTemplate.from_messages([
            ("system", prompt),
            ("human", "{messages}"),
            ("placeholder", "{agent_scratchpad}"),
    ])

In [0]:
from langchain.agents import AgentExecutor, create_openai_tools_agent

prompt = get_prompt()
tools = get_tools()
agent = create_openai_tools_agent(llm, tools, prompt)

def model_parsing_error_handler(e):
    print(f"This error occurred during model parsing: {e}")

#Put the pieces together to create our Agent
agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True, handle_parsing_errors=model_parsing_error_handler)

In [0]:
from operator import itemgetter
from langchain.schema.runnable import RunnableLambda
from langchain_core.output_parsers import StrOutputParser

# Very basic chain that allows us to pass the input (messages) into the Agent and collect the (output) as a string

agent_str = ({ "messages": itemgetter("messages")} | agent_executor | itemgetter("output") | StrOutputParser())


In [0]:
import logging
# Uncomment the line below if you are experiencing errors
# logging.basicConfig(level=logging.DEBUG)

answer = ""

# Retrieve the hotel class from the widget
hotel_class = dbutils.widgets.get("hotel_class")

# Construct the message
message = f"Use the available tools to find the ideal hotel to promote from the {hotel_class} class, then create a positive social marketing post to promote it and lastly find a list of prospective customers to send the post to. Mention the hotel by its name and location."

# Invoke the agent with the constructed message
try:
    answer = agent_str.invoke({"messages": message})
except AssertionError as e:
    logging.error("AssertionError: Statement execution failed but no error message was provided.")
    raise e

display(answer)