In [1]:
from bs4 import BeautifulSoup, Comment
import re
import requests, time, logging, json
import uuid
import tldextract
from openai import OpenAI
from dotenv import load_dotenv
import os
from typing import Dict, Any

load_dotenv("config.env")
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

def get_openai_response(messages, response_format="text"):
    try:
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=[
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "text",
                            "text": messages
                        }
                    ]
                }
            ],
            response_format={
                "type": response_format
            }
        )
        return response.choices[0].message.content
    except Exception as e:
        print(f"Error calling OpenAI API: {e}")
        return None

class Website_agent:
    def __init__(self, domain, existing_urls=None):
        self.crawl_key = os.getenv("CRAWLBASE_KEY")
        if not existing_urls:
            self.existing_urls = [domain]
        else:
            self.existing_urls = existing_urls

        self.main_domain = domain
        if not domain.startswith(('http://', 'https://')):
            domain = 'https://' + domain
        self.domain = re.sub(r'/$', '', domain)

        self.logger = logging.getLogger(__name__)

    def generate_request_id(self):
        random_uuid = uuid.uuid4()
        return random_uuid

    def fetch_html_by_js_token(self, url, max_retry=2):
        api_url = f'https://api.crawlbase.com/?token={self.crawl_key}&url={url}'
        for i in range(max_retry):
            try:
                print(f"Fetching page: {url} (Attempt {i + 1})")
                response = requests.get(api_url)
                if response.status_code == 200:
                    print("Page successfully fetched")
                    return response.text
                else:
                    print(f"Request failed, status code: {response.status_code}, retrying...")
                    if i < max_retry - 1:
                        time.sleep(1)
            except requests.exceptions.RequestException as e:
                print(f"Network request error: {str(e)}, retrying...")
                if i < max_retry - 1:
                    time.sleep(1)
                else:
                    raise
        raise requests.exceptions.RequestException(f"Max retries exceeded, URL fetching failed: {url}")

    def extract_head_info(self, html):
        soup = BeautifulSoup(html, 'html.parser')
        head_tag = soup.head

        title = head_tag.title.string if head_tag.title else None

        description = head_tag.find('meta', attrs={'name': 'description'}).get('content') if head_tag.find('meta',
                                                                                                           attrs={
                                                                                                               'name': 'description'}) else None

        return title, description

    def extract_body_tags(self, html):
        soup = BeautifulSoup(html, 'html.parser')
        body_tag = soup.body

        if not body_tag:
            return False

        for script_tag in body_tag.find_all('script'):
            script_tag.extract()

        for noscript_tag in body_tag.find_all('noscript'):
            noscript_tag.extract()

        for style_tag in body_tag.find_all('style'):
            style_tag.extract()

        for link_tag in body_tag.find_all('link'):
            link_tag.extract()

        for img_tag in body_tag.find_all('img'):
            img_tag.attrs = {'alt': img_tag.get('alt', '')}

        for tag in body_tag.find_all(True):
            tag.attrs = {attr: value for attr, value in tag.attrs.items() if attr.lower() != 'style'}

        for svg_tag in body_tag.find_all('svg'):
            svg_tag.extract()

        for tag in body_tag.find_all(True):
            if tag.name == 'a':
                tag.attrs = {attr: value for attr, value in tag.attrs.items() if attr.lower() == 'href'}
            else:
                tag.attrs = {}

        for comment in soup.find_all(string=lambda text: isinstance(text, Comment)):
            comment.extract()

        body_tags = ''.join(str(tag) for tag in body_tag.contents)
        body_tags = body_tags.replace("\n", "")

        return body_tags

    def extract_and_process_values(self, text):
        status_pattern = r'<status>(.*?)</status>'
        url_pattern = r'<url>(.*?)</url>'
        answer_pattern = r'<answer>(.*?)</answer>'

        status_matches = re.findall(status_pattern, text, re.DOTALL)
        url_matches = re.findall(url_pattern, text)
        answer_matches = re.findall(answer_pattern, text, re.DOTALL)

        statuses = [match.strip() for match in status_matches]
        urls = []
        contacts = []
        for match in url_matches:
            match = match.strip()
            if match:
                if match.startswith('tel:') or match.startswith('mailto:'):
                    contacts.append(match)
                else:
                    if not match.startswith('http://') and not match.startswith('https://'):
                        if match.startswith('/'):
                            url_value = self.domain.rstrip('/') + match
                        else:
                            url_value = match
                    else:
                        url_value = match
                    url_value = re.sub(r'/$', '', url_value)
                    if url_value not in self.existing_urls:
                        match_domain = extract_main_domain(url_value)
                        if match_domain == self.main_domain:
                            urls.append(url_value)
                        else:
                            print(f"Skipping external domain - Current domain:{self.main_domain}, Matched domain:{match_domain}")

        urls = list(set(urls))
        answers = [match.strip() for match in answer_matches]

        result = {
            'status': 0,
            'answer': '',
            'urls': [],
            'contacts': []
        }

        if 'Fully meets the query requirement' in statuses:
            result['status'] = 1
        elif 'Partially meets the query requirement' in statuses:
            result['status'] = 2
        elif 'Unable to meet the query requirement' in statuses:
            result['status'] = 3

        if answers:
            result['answer'] = ' '.join(answers)

        if urls:
            result['urls'] = urls

        if contacts:
            result['contacts'] = contacts

        return result

    def process_website(self, htmlObj, url, question):
        print(f"\nProcessing website: {url}")
        if 'body' not in htmlObj or not htmlObj['body']:
            return {"status": 4, "answer": "", "urls": [], "error": "Page content is empty"}

        if 'title' not in htmlObj:
            htmlObj['title'] = ''
        if 'description' not in htmlObj:
            htmlObj['description'] = ''

        try:
            prompt = f"""###
            The URL to be crawled:{url}
            Title:{htmlObj['title']}
            Description:{htmlObj['description']}
            Information contained in the website's HTML body tags:
            {htmlObj['body']}
            ###

            Instructions:
            1. The content enclosed by ### above describes website content. When responding to a Question, provide answers based on this website content.
            2. There are three statuses for responding to questions (the provided URL is just an example, and the answer tag should only contain the response):
               a). <status>Fully meets the query requirement</status> <answer>The answer</answer>
               b). <status>Partially meets the query requirement</status>, with the potential for more detailed answers by crawling <urls><url>website1</url><url>website2</url></urls>, <answer>The answer</answer>
               c). <status>Unable to meet the query requirement</status>, if there are recommended URLs to crawl, please specify them in the format: <urls><url>website address</url><url>website address</url></urls> for the answer
            3. The suggested URLs for crawling should not be identical to the URL being crawled: {url}.
            4. When the question requires providing contact information, include email, phone number, social media, etc.
            5. If there is data in the website content that is relevant to the question, it's best to reflect that relevant data in the answer.    

            Question:
            {question}
            """
            print("Analyzing page content...")
            response = get_openai_response(prompt)
            print("Page analysis completed, extracting results...")
            result = self.extract_and_process_values(response)
            return result
        except Exception as e:
            print(f"Error processing website: {str(e)}")
            return {"status": 0, "answer": "", "urls": [], "error": str(e)}

    def fetch_html_and_process_website(self, url, question):
        try:
            print(f"\nFetching and processing page: {url}")
            html_content = self.fetch_html_by_js_token(url)
            title, description = self.extract_head_info(html_content)
            body_tags = self.extract_body_tags(html_content)

            if body_tags is False:
                return {"status": 4, "error": "Page content is empty"}

            prompt = f"""###
            The URL to be crawled:{url}
            Title:{title}
            Description:{description}
            Information contained in the website's HTML body tags:
            {body_tags}
            ###

            Instructions:
            1. The content enclosed by ### above describes website content. When responding to a Question, provide answers based on this website content.
            2. There are three statuses for responding to questions (the provided URL is just an example, and the answer tag should only contain the response):
               a). <status>Fully meets the query requirement</status> <answer>The answer</answer>
               b). <status>Partially meets the query requirement</status>, with the potential for more detailed answers by crawling <urls><url>website1</url><url>website2</url></urls>, <answer>The answer</answer>
               c). <status>Unable to meet the query requirement</status>, if there are recommended URLs to crawl, please specify them in the format: <urls><url>website address</url><url>website address</url></urls> for the answer
            3. The suggested URLs for crawling should not be identical to the URL being crawled: {url}.
            4. When the question requires providing contact information, include email, phone number, social media, etc.
            5. If there is data in the website content that is relevant to the question, it's best to reflect that relevant data in the answer.    

            Question:
            {question}
            """

            response = get_openai_response(prompt)
            result = self.extract_and_process_values(response)

            return result

        except Exception as e:
            print(f"Error processing website: {str(e)}")
            raise

    def process_website_url(self, htmlObj, url, question, request_id=None):
        print(f"\nExtracting page URL: {url}")
        if 'body' not in htmlObj or not htmlObj['body']:
            return {"status": 4, "error": "Page content is empty"}

        if 'title' not in htmlObj:
            htmlObj['title'] = ''
        if 'description' not in htmlObj:
            htmlObj['description'] = ''

        try:
            prompt = f"""###
            The URL to be crawled:{url}
            Title:{htmlObj['title']}
            Description:{htmlObj['description']}
            Information contained in the website's HTML body tags:
            {htmlObj['body']}
            ###
            You are a data analysis engineer.

            Question: {question}

            1. Analyze the HTML and extract all valid URLs and description information.
            2. Based on the extracted information, list URLs related to the question using <url></url> tags.
            3. Irrelevant ones should not be tagged.
            """

            response = get_openai_response(prompt)
            url_pattern = r'<url>(.*?)</url>'
            url_matches = re.findall(url_pattern, response)
            urls = []
            contacts = []
            for match in url_matches:
                match = match.strip()
                url_value = ""
                if match:
                    if match.startswith('tel:') or match.startswith('mailto:'):
                        contacts.append(match)
                    else:
                        if not match.startswith('http://') and not match.startswith('https://'):
                            if match.startswith('/'):
                                url_value = self.domain.rstrip('/') + match
                        else:
                            match_domain = extract_main_domain(match)
                            if match_domain == self.main_domain:
                                url_value = match
                            else:
                                print(f"Skipping external domain - Current domain:{self.main_domain}, Matched domain:{match_domain}")
                        if url_value:
                            url_value = re.sub(r'/$', '', url_value)
                            if url_value not in self.existing_urls:
                                urls.append(url_value)

            urls = list(set(urls))

            if not request_id:
                request_id = self.generate_request_id()

            return urls

        except Exception as e:
            print(f"Error processing page URL: {str(e)}")
            return {"status": 0, "error": str(e)}

    def test_process_website(self, url, question, max_depth=3):
        request_id = self.generate_request_id()
        print(f"\n=== Starting website analysis ===")
        print(f"Initial URL: {url}")
        print(f"Max exploration depth: {max_depth}")
        
        try:
            print("\nStep 1: Fetching page content")
            html_content = self.fetch_html_by_js_token(url)
            title, description = self.extract_head_info(html_content)
            body_tags = self.extract_body_tags(html_content)

            if not body_tags:
                print("Error: Page content is empty")
                return {"request_id": request_id, "status": 4, "error": "Page content is empty"}

            print("\nStep 2: Analyzing page content")
            htmlObj = {'title': title, 'description': description, 'body': body_tags}
            result = self.process_website(htmlObj, url, question)
            
            print(f"\nStep 3: Processing analysis result")
            print(f"Status code: {result['status']}")
            if result['status'] == 1:
                print("✓ Complete answer found")
                return {'status': result['status'], 'answer': result['answer']}
            
            elif result['status'] == 2:
                print("→ Partial answer found, further analysis needed...")
                if max_depth > 0:
                    print("\nStep 4: Analyzing related pages")
                    return self._process_additional_urls(result, htmlObj, url, question, request_id, max_depth)
                else:
                    print("Max depth reached, returning current results")
                    return {'status': 2, 'answer': result['answer']}
                    
            elif result['status'] == 3:
                print("→ No answer found, trying other pages...")
                if max_depth > 0:
                    print("\nStep 4: Analyzing other related pages")
                    return self._process_additional_urls(result, htmlObj, url, question, request_id, max_depth)
                else:
                    print("Max depth reached, unable to find answer")
                    return {'status': 3}
            
            return result

        except Exception as e:
            print(f"\nError during processing: {str(e)}")
            return {"status": 0, "error": str(e)}

    def _process_additional_urls(self, initial_result, htmlObj, url, question, request_id, max_depth):
        urls = initial_result['urls'] or self.process_website_url(htmlObj, url, question, request_id)
        if urls:
            print(f"Found {len(urls)} related links")
        answers = [initial_result['answer']] if initial_result['answer'] else []
        
        for i, url_value in enumerate(urls, 1):
            if url_value not in self.existing_urls:
                print(f"\nAnalyzing related link {i}/{len(urls)}: {url_value}")
                self.existing_urls.append(url_value)
                sub_result = self.fetch_html_and_process_website(url_value, question)
                
                if sub_result['status'] == 1:
                    print("✓ Complete answer found in related page")
                    return sub_result
                elif sub_result['status'] == 2:
                    print("→ Partial related information found")
                    answers.append(sub_result['answer'])

        if answers:
            print("\nConsolidating all found information...")
            return {'status': 2, 'answer': '\n'.join(answers)}
        
        print("\nNo related answers found")
        return {'status': 3}

def extract_main_domain(url):
    if not url.startswith(('http://', 'https://')):
        url = 'https://' + url
    ext = tldextract.extract(url)
    return ".".join(part for part in (ext.domain, ext.suffix) if part)


In [2]:
# Testing code
crawl_url = "https://www.percent.cn"
existing_urls = [crawl_url]
main_domain = extract_main_domain(crawl_url)
question = "Main members and contact information"

website_bot = Website_agent(main_domain, existing_urls)
result = website_bot.test_process_website(crawl_url, question)

print("Crawl result:")
print(f"Status code: {result['status']}")

if result['status'] == 0:
    print("Error information:")
else:
    if result['status'] == 2 or result['status'] == 1:
        print(f"Answer: {result['answer']}")
    if result['status'] == 3:
        print("Unable to provide an answer")
    if result['status'] == 4:
        print("Unable to provide an answer")


=== Starting website analysis ===
Initial URL: https://www.percent.cn
Max exploration depth: 3

Step 1: Fetching page content
Fetching page: https://www.percent.cn (Attempt 1)
Page successfully fetched

Step 2: Analyzing page content

Processing website: https://www.percent.cn
Analyzing page content...
Page analysis completed, extracting results...

Step 3: Processing analysis result
Status code: 2
→ Partial answer found, further analysis needed...

Step 4: Analyzing related pages

Extracting page URL: https://www.percent.cn
Found 2 related links

Analyzing related link 1/2: https://percent.cn/Contact.html

Fetching and processing page: https://percent.cn/Contact.html
Fetching page: https://percent.cn/Contact.html (Attempt 1)
Page successfully fetched
→ Partial related information found

Analyzing related link 2/2: https://percent.cn/Company.html

Fetching and processing page: https://percent.cn/Company.html
Fetching page: https://percent.cn/Company.html (Attempt 1)
Page successfully 

In [3]:
serper_api_key = os.getenv("SERPER_KEY")

def process_query(query: str) -> Dict[str, Any]:
    """
    Analyze the user's query and execute the corresponding skill
    Args:
        query: The user's query content
    Returns:
        A dictionary with the execution result
    """
    print(f"\nStarting query processing: {query}")
    
    prompt = f"""You are an AI web search Q&A assistant, your main job is to determine which skill to trigger next based on the question.

Skill Types:
Search Official Website (type_code: search)
Use Case: When the user inquires about a product, company, or service but does not provide the official URL.
You must generate a search keyword for finding the official website.
Return Format: {{"type_code": "search", "keyword": "company name official website","question":"user's question"}}

Website Crawling (type_code: crawl)
Use Case: The user directly provides the target URL.
Return Format: {{"type_code": "crawl", "url": "user-provided URL","question":"user's question"}}

Direct Answer (type_code: other)
Use Case: For general Q&A or chit-chat.
Return Format: {{"type_code": "other", "answer": "content of the answer"}}

Strictly return results in the JSON format described above without any additional explanatory text.

User Question: {query}
"""

    # Get GPT analysis result
    print("Analyzing query type...")
    analysis_result = json.loads(get_openai_response(prompt, "json_object"))
    print(f"Analysis Result: {json.dumps(analysis_result, ensure_ascii=False, indent=2)}")

    # Execute corresponding skills based on analysis result
    if analysis_result['type_code'] == 'search':
        print("\nExecuting search skill")
        print(f"Search keyword: {analysis_result['keyword']}")
        return search_and_process(analysis_result['question'])
            
    elif analysis_result['type_code'] == 'crawl':
        print("\nExecuting website crawling")
        crawl_url = analysis_result['url']
        existing_urls = [crawl_url]
        main_domain = extract_main_domain(crawl_url)
        website_bot = Website_agent(main_domain, existing_urls)
        result = website_bot.test_process_website(crawl_url, analysis_result['question'])
        
        print("Crawl result:")
        print(f"Status code: {result['status']}")
        if result['status'] == 0:
            print("Error information:")
        else:
            if result['status'] == 2 or result['status'] == 1:
                print(f"Answer: {result['answer']}")
            if result['status'] == 3:
                print("Unable to provide an answer")
            if result['status'] == 4:
                print("Unable to provide an answer")
                
        return result
        
    else:  # type_code == 'other'
        print("\nExecuting direct answer")
        return analysis_result

def build_official_site_prompt(search_results: list) -> str:
    """
    Build prompt for determining official website
    Args:
        search_results: List of Google search results
    """
    print("\n=== Building prompt for official site identification ===")
    
    prompt = """You are a website classification expert. Your task is to identify the most likely official website from search results.
    
Rules:
1. Official websites usually have the following characteristics:
   - The domain name usually contains the company/brand name
   - URL structure is simple, typically the root domain
   - Website title usually includes the company/brand name
   - Generally not links to social media, news sites, or third-party platforms

2. Factors to consider (in order of importance):
   a) Matching of the domain name with the company/brand name
   b) Whether it's the brand's main domain (not a subdomain)
   c) Credibility of the link
   d) Official nature of the website description

Analyze the following search results and return the most likely official website URL in JSON format.

Search Results:
"""
    # Add search results to the prompt
    print(f"\nAnalyzing the following search results:")
    for idx, result in enumerate(search_results, 1):
        url = result.get('link', '')
        title = result.get('title', '')
        snippet = result.get('snippet', '')
        
        prompt += f"""
{idx}. Website Info:
   URL: {url}
   Title: {title}
   Description: {snippet}
"""
        print(f"\n{idx}. {url}")
        print(f"   Title: {title}")

    prompt += """
Return the analysis result in the following JSON format:
{
    "official_url": "Most likely official website URL",
    "confidence": "High/Medium/Low",
    "reason": "Brief explanation of why this choice was made"
}

Note:
- If no obvious official website is found, set confidence to "Low"
- If multiple possible official websites are found, choose the most likely one
"""
    print("\nPrompt construction completed")
    return prompt

def process_search_results(search_results: dict, question: str) -> dict:
    """
    Process Google search results to find the official website
    Args:
        search_results: Full search results from Google Serper
        question: Original question
    Returns:
        dict: Result containing the official website URL
    """
    print("\n=== Beginning to process search results ===")
    try:
        if 'organic' not in search_results:
            print("❌ Error: Search results format error")
            return {"error": "Search results format error"}

        print(f"Found {len(search_results['organic'])} search results")
        
        # Build analysis prompt
        print("\nStep 1: Building analysis prompt")
        prompt = build_official_site_prompt(search_results['organic'][:5])  # Only analyze the top 5 results
        
        # Analyze results
        print("\nStep 2: Using AI to analyze search results")
        try:
            result = json.loads(get_openai_response(prompt, "json_object"))
            
            print(f"\n=== AI Analysis Result ===")
            print(f"✓ Identified Official Website: {result['official_url']}")
            print(f"✓ Confidence: {result['confidence']}")
            print(f"✓ Reason for Choice: {result['reason']}")
            
            if result['official_url']:
                if result['confidence'] == "Low":
                    print("\nWarning: Low confidence in the official website identification, but proceeding anyway")
                    
                print(f"\nStep 3: Starting website crawling")
                print(f"Target URL: {result['official_url']}")
                
                # Instantiate Website_agent and perform crawling
                crawl_url = result['official_url']
                existing_urls = [crawl_url]
                main_domain = extract_main_domain(crawl_url)
                
                agent = Website_agent(main_domain, existing_urls)
                return agent.test_process_website(
                    url=crawl_url,
                    question=question
                )
            else:
                print("\n❌ Error: No credible official website found")
                return {"error": "No credible official website found"}
                
        except Exception as e:
            print("\n❌ Error: Analysis process failed")
            print(f"Error information: {str(e)}")
            return {"error": f"Error occurred during analysis: {str(e)}"}
            
    except Exception as e:
        print("\n❌ Error: Failed to process search results")
        print(f"Error information: {str(e)}")
        return {"error": f"Error occurred while processing search results: {str(e)}"}

def search_and_process(query: str) -> dict:
    """Search and process results"""
    print("\n=== Starting search process ===")
    search_keyword = f"{query.strip()} official website"
    print(f"Search keyword: {search_keyword}")
    
    headers = {
        'X-API-KEY': serper_api_key,
        'Content-Type': 'application/json'
    }
    payload = {
        'q': search_keyword,
        'gl': 'us',
        'page': 0,
        'num': 5
    }
    
    try:
        print("\nStep 1: Calling search API")
        response = requests.post(
            "https://google.serper.dev/search",
            headers=headers,
            json=payload
        )
        
        if response.status_code == 200:
            print("✓ Search request successful")
            search_results = response.json()
            
            print("\nStep 2: Processing search results")
            return process_search_results(search_results, query)
        else:
            print(f"\n❌ Error: Search request failed (Status code: {response.status_code})")
            return {"error": f"Search request failed: {response.status_code}"}
            
    except Exception as e:
        print("\n❌ Error: Search process failed")
        print(f"Error information: {str(e)}")
        return {"error": f"Error occurred during the search process: {str(e)}"}

In [7]:
#query2 = "https://www.percent.cn/ Percent company's main members and contact information"
query2 = "Percent company's main members and contact information"
# query2 = "What's the weather like today"
result2 = process_query(query2)
print(f"Query question: {query2}")
print("\nFinal result:")
print(result2)


Starting query processing: Percent company's main members and contact information
Analyzing query type...
Analysis Result: {
  "type_code": "search",
  "keyword": "Percent company official website",
  "question": "Percent company's main members and contact information"
}

Executing search skill
Search keyword: Percent company official website

=== Starting search process ===
Search keyword: Percent company's main members and contact information official website

Step 1: Calling search API
✓ Search request successful

Step 2: Processing search results

=== Beginning to process search results ===
Found 5 search results

Step 1: Building analysis prompt

=== Building prompt for official site identification ===

Analyzing the following search results:

1. https://percent.com/contact/
   Title: Contact - Percent

2. https://percent.com/
   Title: Percent — Private credit investing. Simplified.

3. https://poweredbypercent.com/legal/
   Title: Legal - Percent

4. https://leadiq.com/c/percen