In [13]:
# import libraries
import os
import re
import requests
import tiktoken  # for counting tokens
import pandas as pd  # for storing text and embeddings data
import string  # for removing punctuation
import unicodedata  # for normalizing text
import urllib.request  # for downloading html
from bs4 import BeautifulSoup  # for parsing html
from collections import deque
from html.parser import HTMLParser
from openai import OpenAI  # for calling the OpenAI API
from scipy import spatial  # for calculating vector similarities for search
from tenacity import (
    retry,
    wait_random_exponential,
    stop_after_attempt,
)  # for retrying API calls
from urllib.parse import urlparse
from utils.embeddings_utils import cosine_similarity

# models
EMBEDDING_MODEL = "text-embedding-3-small"
GPT_MODEL = "gpt-4o"

In [14]:
# constants
domain = "www.popchill.com"
full_url = "https://www.popchill.com"
client = OpenAI(api_key=os.environ["OPENAI_API_KEY"])

# Crawl Web Pages

Given a list of URLs, crawl each page and return the content of the page.


In [15]:
# Regex pattern to match a URL
HTTP_URL_PATTERN = r"^http[s]*://.+"


# Create a class to parse the HTML and get the hyperlinks
class HyperlinkParser(HTMLParser):
    def __init__(self):
        super().__init__()
        # Create a list to store the hyperlinks
        self.hyperlinks = []

    # Override the HTMLParser's handle_starttag method to get the hyperlinks
    def handle_starttag(self, tag, attrs):
        attrs = dict(attrs)

        # If the tag is an anchor tag and it has an href attribute, add the href attribute to the list of hyperlinks
        if tag == "a" and "href" in attrs:
            self.hyperlinks.append(attrs["href"])


# Function to get the hyperlinks from a URL
def get_hyperlinks(url):

    user_agent = "Mozilla/5.0"
    headers = {"User-Agent": user_agent}

    # Try to open the URL and read the HTML
    try:
        request = urllib.request.Request(url=url, headers=headers)
        # Open the URL and read the HTML
        with urllib.request.urlopen(request) as response:

            # If the response is not HTML, return an empty list
            if not response.info().get("Content-Type").startswith("text/html"):
                return []

            # Decode the HTML
            html = response.read().decode("utf-8")
    except Exception as e:
        print(e)
        return []

    # Create the HTML Parser and then Parse the HTML to get hyperlinks
    parser = HyperlinkParser()
    parser.feed(html)

    return parser.hyperlinks


# Function to get the hyperlinks from a URL that are within the same domain
def get_domain_hyperlinks(local_domain, url):
    clean_links = []
    for link in set(get_hyperlinks(url)):
        clean_link = None

        # If the link is a URL, check if it is within the same domain
        if re.search(HTTP_URL_PATTERN, link):
            # Parse the URL and check if the domain is the same
            url_obj = urlparse(link)
            if url_obj.netloc == local_domain:
                clean_link = link

        # If the link is not a URL, check if it is a relative link
        else:
            if link.startswith("/"):
                link = link[1:]
            elif link.startswith("#") or link.startswith("mailto:"):
                continue
            clean_link = "https://" + local_domain + "/" + link

        if clean_link is not None:
            if clean_link.endswith("/"):
                clean_link = clean_link[:-1]
            clean_links.append(clean_link)

    # Return the list of hyperlinks that are within the same domain
    return list(set(clean_links))


def get_content(soup):
    try:
        name = soup.find("h2", class_="chakra-text").text
        description = soup.find("div", class_="css-icjp7f").text
        data = soup.find("div", class_="css-4cxybv")
        tags = [
            span.get_text().lower()
            for span in data.find_all("span", class_="css-1ny2kle")
        ]
        return name, description, tags
    except:
        return None


def crawl(url):
    # Parse the URL and get the domain
    local_domain = urlparse(url).netloc

    # Create a queue to store the URLs to crawl
    queue = deque([url])

    # Create a set to store the URLs that have already been seen (no duplicates)
    seen = set([url])

    # Create a directory to store the text files
    if not os.path.exists("text/"):
        os.mkdir("text/")

    if not os.path.exists("text/" + local_domain + "/"):
        os.mkdir("text/" + local_domain + "/")

    # Create a directory to store the csv files
    if not os.path.exists("processed"):
        os.mkdir("processed")

    # While the queue is not empty, continue crawling
    while queue:

        # Get the next URL from the queue
        url = queue.pop()
        print(url)  # for debugging and to see the progress

        # Save text from the url to a <url>.txt file
        with open(
            "text/" + local_domain + "/" + url[8:].replace("/", "_") + ".txt", "w"
        ) as f:

            # Get the text from the URL using BeautifulSoup
            soup = BeautifulSoup(requests.get(url).text, "html.parser")

            # Get the content from the URL
            text = soup.get_text()

            # If the crawler gets to a page that requires JavaScript, it will stop the crawl
            if "You need to enable JavaScript to run this app." in text:
                print(
                    "Unable to parse page " + url + " due to JavaScript being required"
                )

            # Write the data to the file
            f.write(text)

        # Get the hyperlinks from the URL and add them to the queue
        for link in get_domain_hyperlinks(local_domain, url):
            if link not in seen:
                queue.append(link)
                seen.add(link)

In [None]:
crawl(full_url)

# Data Cleaning


In [17]:
def remove_newlines(serie):
    """Remove newlines from a pandas series for better processing."""
    serie = serie.str.replace("\n", " ")
    serie = serie.str.replace("\\n", " ")
    serie = serie.str.replace("  ", " ")
    serie = serie.str.replace("  ", " ")
    return serie

In [19]:
# Create a list to store the data records
data = []

# Get all the text files in the text directory
for file in os.listdir("text/" + domain + "/"):

    # Open the file and read the text
    with open("text/" + domain + "/" + file, "r") as f:
        text = f.read()

        # Omit the first 11 lines and the last 4 lines, then replace -, _, and #update with spaces.
        data.append(
            (
                file[len(domain) + 7 : -4]
                .replace("-", " ")
                .replace("_", " ")
                .replace("#update", ""),
                text,
            )
        )
# Create a dataframe from the list of texts
df = pd.DataFrame(data, columns=["fname", "text"])
df["text"] = df.fname + ". " + remove_newlines(df.text)

if not os.path.exists("processed/" + domain + "/"):
    os.mkdir("processed/" + domain + "/")
df.to_csv(f"processed/{domain}/data.csv")

In [20]:
# Retry up to 6 times with exponential backoff, starting at 1 second and maxing out at 20 seconds delay
@retry(wait=wait_random_exponential(min=1, max=20), stop=stop_after_attempt(6))
def get_embedding(text: str, model=EMBEDDING_MODEL):
    # replace newlines, which can negatively affect performance.
    text = text.replace("\n", " ")

    response = client.embeddings.create(input=[text], model=model)

    return response.data[0].embedding

In [21]:
df["embedding"] = df["text"].apply(lambda x: get_embedding(x))
df.to_csv(f"processed/{domain}/embeddings.csv")
df.head()

Unnamed: 0,fname,text,embedding
0,product 24011057169632,product 24011057169632. PopChill x 滙豐信用卡持卡人最紅優...,"[0.024357043206691742, 0.015573929063975811, -..."
1,event 5stars?t=pc menu,event 5stars?t=pc menu. 五星好店｜PopChill 拍拍圈搜尋商品或...,"[0.023113323375582695, 0.010650375857949257, -..."
2,brand series 83,brand series 83. 二手ChloéDrew包包優惠！2024 05月人氣推薦好...,"[0.05937468633055687, -0.006741405464708805, 0..."
3,user nicebuyintw,user nicebuyintw. 拉堤二手名牌(@nicebuyintw) 的衣櫥｜Pop...,"[0.016361327841877937, -0.020826730877161026, ..."
4,search?q=%E6%98%A5%E5%A4%A9,search?q=%E6%98%A5%E5%A4%A9. 春天 價格優惠！2024 05月人...,"[0.03858361393213272, 0.011575084179639816, 0...."


In [26]:
def search_relatedness(
    query: str,
    df: pd.DataFrame,
    relatedness_fn=lambda x, y: cosine_similarity(x, y),
    top_n: int = 100,
):
    # Get the embedding for the query
    query_embedding = get_embedding(query)

    strings_and_relatednesses = [
        (row["text"], relatedness_fn(query_embedding, row["embedding"]))
        for i, row in df.iterrows()
    ]

    strings_and_relatednesses.sort(key=lambda x: x[1], reverse=True)
    strings, relatednesses = zip(*strings_and_relatednesses)
    return strings[:top_n], relatednesses[:top_n]

# Ask


In [34]:
def num_tokens(text: str, model: str = GPT_MODEL) -> int:
    """Return the number of tokens in a string."""
    encoding = tiktoken.encoding_for_model(model)
    return len(encoding.encode(text))


def query_message(query: str, df: pd.DataFrame, model: str, token_budget: int) -> str:
    """Return a message for GPT, with relevant source texts pulled from a dataframe."""
    strings, relatedness = search_relatedness(query, df)
    introduction = '你是 PopChill 的客服，你根據資料回答 "PopChill" 的問題。'
    question = f"\n\nQuestion: {query}"
    message = introduction
    for string in strings:
        data = f'\n\nData:\n"""\n{string}\n"""'
        if num_tokens(message + data + question, model=model) > token_budget:
            break
        else:
            message += data
    return message + question


def ask(
    query: str,
    df: pd.DataFrame = df,
    model: str = GPT_MODEL,
    token_budget: int = 4096 - 500,
    print_message: bool = False,
) -> str:
    """Answers a query using GPT and a dataframe of relevant texts and embeddings."""
    message = query_message(query, df, model=model, token_budget=token_budget)
    if print_message:
        print(message)
    messages = [
        {
            "role": "system",
            "content": "你回答任何關於 PopChill 的問題",
        },
        {"role": "user", "content": message},
    ]
    response = client.chat.completions.create(
        model=model, messages=messages, temperature=0
    )
    response_message = response.choices[0].message.content
    return response_message

### Examples


In [35]:
ask("PopChill 是什麼？")

'PopChill 是一個提供中間驗證服務的 C2C 二手精品交易平台。它專注於二手名牌商品的買賣，並通過 Entrupy 及 LegitApp 等美國第三方服務對商品進行正品鑑定。賣家將商品寄至 PopChill 進行鑑定，確認為正品後再出貨給買家。PopChill 旨在提供安心購物的體驗，並且與平台上銷售的品牌方無任何關聯或從屬關係。PopChill 通過經濟部中小企業處小型企業創新研發計畫 (SBIR) 補助。'

In [41]:
ask("最多人買的商品是什麼？")

'根據提供的資料，最受歡迎的商品是 "product 23092840179685"。這款商品已經售出，並且提供了多種無卡分期付款選項，吸引了許多新客和舊客。'

In [42]:
ask("台灣站都賣哪些商品？")

'在 PopChill 台灣站上，您可以找到以下類型的商品：\n\n1. **側 / 肩背包**\n2. **皮夾**\n3. **手提包**\n4. **後背包**\n\n這些商品涵蓋了多個知名品牌，包括但不限於：\n- Coach\n- Gucci\n- Louis Vuitton\n- Tory Burch\n- Chanel\n- BURBERRY\n- Prada\n\n這些商品主要來自於多個賣家，包括拉堤二手名牌、台中米蘭站、香榭國際精品、流行工廠名牌二手店等。所有商品都經過嚴格的驗證，確保正品。'

In [43]:
ask("哪一個用戶賣得最好？")

'根據提供的資料，PopChill 並沒有公開具體用戶的銷售數據或排名。因此，我無法確定哪一個用戶賣得最好。如果您有其他問題或需要進一步的幫助，請隨時告訴我。您也可以通過 PopChill 的客服信箱 support@popchill.com 聯絡我們的客服團隊。'

In [44]:
ask("有 Line 官方帳號嗎？")

'根據提供的資料，PopChill 並未提及有 Line 官方帳號。如果您有其他問題或需要更多幫助，請隨時聯絡我們的客服信箱：support@popchill.com。'