In [37]:
import asyncio
import datetime
import os
import re
import tempfile
import traceback
from typing import Any, Dict, List, Optional, Tuple
from urllib.parse import urljoin

import cloudscraper
import httpx
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from langchain.prompts import ChatPromptTemplate
from langchain.schema import Document
from langchain.vectorstores import Qdrant
from langchain_openai import ChatOpenAI
from langchain_openai.embeddings import OpenAIEmbeddings
from markitdown import MarkItDown
from pydantic import BaseModel, ConfigDict, Field
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams

load_dotenv()


In [None]:
# Endpoint handed to MarkItDown as `docintel_endpoint` by FPTCrawler.
# The value below is a placeholder and must be replaced with a real endpoint.
DOCINTEL_ENDPOINT = "<document_intelligence_endpoint>"
# Site root that FPTCrawler uses to resolve relative links and image paths.
BASE_URL = "https://fptshop.com.vn"
class FPTCrawler:
    """
    Fetches FPT Shop product pages, strips page chrome, and converts the
    remaining HTML to Markdown files.

    The typical entry point is ``convert_url_to_markdown``; the other methods
    are the individual stages it chains together (fetch -> clean -> format).
    """

    # Per-request timeout (seconds) shared by the httpx and cloudscraper fetchers.
    REQUEST_TIMEOUT_SECONDS = 3

    def __init__(
        self,
        base_url: Optional[str] = None,
        docintel_endpoint: Optional[str] = None,
        output_dir: str = "markdown_dir",
    ):
        """
        Initialize the crawler.

        Args:
            base_url: The base URL for resolving relative URLs.
                Defaults to the module-level ``BASE_URL``.
            docintel_endpoint: The endpoint passed to MarkItDown.
                Defaults to the module-level ``DOCINTEL_ENDPOINT``.
            output_dir: Directory to save processed markdown files.
        """
        # The module constants are looked up lazily so that explicit arguments
        # never require them to exist.
        self.base_url = BASE_URL if base_url is None else base_url
        self.docintel_endpoint = (
            DOCINTEL_ENDPOINT if docintel_endpoint is None else docintel_endpoint
        )
        self.output_dir = output_dir

    async def fetch_html(self, url: str) -> Optional[str]:
        """
        Fetches HTML content by running both httpx and cloudscraper methods concurrently.
        Returns the result from whichever method completes successfully first,
        or None when both fail.
        """
        pending_tasks = {
            asyncio.create_task(self.fetch_html_httpx(url)),
            asyncio.create_task(self.fetch_html_cloudscraper(url)),
        }
        html_content = None

        while pending_tasks and html_content is None:
            done_tasks, pending_tasks = await asyncio.wait(
                pending_tasks,
                return_when=asyncio.FIRST_COMPLETED
            )

            for task in done_tasks:
                try:
                    result, method = task.result()
                except Exception as e:
                    print(f"[Error] Task failed: {e}")
                    continue
                if result is not None:
                    html_content = result
                    print(f"[Success] Got content using {method}")
                    break

        # Cancel the loser and wait for it to unwind so it does not linger as a
        # half-cancelled task after this coroutine returns.
        for pending_task in pending_tasks:
            pending_task.cancel()
        if pending_tasks:
            await asyncio.gather(*pending_tasks, return_exceptions=True)

        if html_content is None:
            print("[Error] Unable to fetch content from URL using any method.")

        return html_content

    def fetch_raw_html_from_url(self, url: str) -> Optional[str]:
        """
        Fetches raw HTML content from a URL using cloudscraper (blocking).
        Returns HTML string or None if there's an error.
        """
        try:
            scraper = cloudscraper.create_scraper(
                browser={
                    'browser': 'chrome',
                    'platform': 'windows',
                    'mobile': False
                }
            )
            response = scraper.get(url, timeout=self.REQUEST_TIMEOUT_SECONDS)
            response.raise_for_status()
            return response.text
        except Exception as e:
            print(f"[cloudscraper] Failed to fetch: {e}")
            return None

    async def fetch_html_httpx(self, url: str) -> Tuple[Optional[str], str]:
        """
        Fetches HTML content using httpx async client.
        Returns tuple of (HTML string or None if there's an error, method name).
        """
        method_name = "httpx"
        try:
            async with httpx.AsyncClient(
                timeout=self.REQUEST_TIMEOUT_SECONDS,
                limits=httpx.Limits(max_connections=5),
            ) as client:
                response = await client.get(url)
                response.raise_for_status()
                return response.text, method_name
        except Exception as e:
            print(f"[{method_name}] Failed to fetch: {e}")
            return None, method_name

    async def fetch_html_cloudscraper(self, url: str) -> Tuple[Optional[str], str]:
        """
        Fetches HTML content using cloudscraper in an executor to not block the event loop.
        Returns tuple of (HTML string or None if there's an error, method name).
        """
        method_name = "cloudscraper"
        try:
            # get_running_loop() is the supported call inside a coroutine;
            # get_event_loop() is deprecated for this use.
            loop = asyncio.get_running_loop()
            result = await loop.run_in_executor(None, self.fetch_raw_html_from_url, url)
            return result, method_name
        except Exception as e:
            print(f"[{method_name}] Failed to fetch: {e}")
            return None, method_name

    def clean_html_content(self, html_content: str) -> Tuple[str, List[str]]:
        """
        Cleans HTML content by removing headers, navs, footers, etc.,
        collects image URLs (og:image meta tags and <img> tags ending in
        .png/.jpg) and rewrites relative links to absolute ones.

        Returns (cleaned_html, extracted_images) where extracted_images is a
        list of Markdown image snippets with de-duplicated absolute URLs.
        """
        soup = BeautifulSoup(html_content, 'html.parser')

        elements_to_remove = [
            "header", "nav", ".header", ".header--wrapper", ".navbar",
            "footer", ".footer", ".footer--container",
            ".navigation", ".nav-bar", ".menu", ".main-menu",
            ".sidebar", ".ads", ".advertisement", ".cookie-notice",
            ".social-media", ".share-buttons", ".comments",
            # FPTShop specific selectors
            ".logo-wrapper", ".logo", ".cart-wrapper", ".main-categories",
            "a[href*='gio-hang']",  # Remove cart link
            # Hot-key div with search suggestions
            ".hot-key"
        ]
        # Every entry is a CSS selector, so use select(): find_all() would treat
        # "a[href*='gio-hang']" as a literal tag name and never match it.
        for selector in elements_to_remove:
            for element in soup.select(selector):
                # An element nested inside an already-removed one is gone too.
                if getattr(element, "decomposed", False):
                    continue
                element.decompose()

        extracted_images: List[str] = []
        seen_image_urls = set()

        def _collect_image(src: Optional[str]) -> None:
            """Record src as a Markdown image if it is a new .png/.jpg URL."""
            if not src or not src.endswith((".png", ".jpg")):
                return
            # De-duplicate on the resolved URL so a relative and an absolute
            # reference to the same image are only listed once.
            full_url = urljoin(self.base_url, src)
            if full_url in seen_image_urls:
                return
            seen_image_urls.add(full_url)
            extracted_images.append(f"![Images]({full_url})")

        for tag in soup.find_all("meta", property="og:image"):
            _collect_image(tag.get("content", ""))

        for img_tag in soup.find_all("img"):
            _collect_image(img_tag.get("src", ""))

        # Make site-relative links absolute so they survive outside the page.
        for a_tag in soup.find_all('a'):
            if a_tag.has_attr('href') and not a_tag['href'].startswith(('http://', 'https://', 'data:', '#', 'javascript:')):
                a_tag['href'] = urljoin(self.base_url, a_tag['href'])

        return str(soup), extracted_images

    def format_markdown_content(self, markdown_text: str, extracted_images: List[str]) -> str:
        """
        Formats the markdown content by removing duplicates and formatting.

        Blank lines, lines starting with "loading" (case-insensitive) and lines
        whose stripped text was already seen are dropped; the first occurrence
        of each line is kept with its original indentation.

        Args:
            markdown_text: The raw markdown text.
            extracted_images: List of extracted image URLs formatted as markdown.

        Returns:
            Formatted markdown content with a "### CONTENT" section and, when
            images are given, a trailing "### IMAGE" section.
        """
        seen = set()
        result_lines = []

        for line in markdown_text.splitlines():
            stripped = line.strip()
            if stripped.lower().startswith("loading") or not stripped:
                continue
            if stripped not in seen:
                seen.add(stripped)
                result_lines.append(line)

        # Add main content
        content = "### CONTENT\n" + "\n".join(result_lines)

        # Append extracted images
        if extracted_images:
            content += "\n\n### IMAGE\n" + "\n".join(extracted_images)

        return content

    async def convert_url_to_markdown(self, url: str) -> Optional[Tuple[str, str]]:
        """
        Converts HTML content from URL to Markdown after cleaning the HTML.
        Uses concurrent fetching to get the fastest successful response.

        The cleaned HTML is written to a temporary file for MarkItDown, the
        formatted Markdown is saved under ``self.output_dir``, and the
        temporary file is always removed afterwards.

        Args:
            url: The URL to fetch and convert.

        Returns:
            Tuple of (formatted_markdown, url) or None if failed.
        """
        temp_html_file_path = None

        try:
            html_content = await self.fetch_html(url)

            if html_content is None:
                print("[Error] Unable to fetch content from URL.")
                return None

            cleaned_html, extracted_images = self.clean_html_content(html_content)

            # delete=False: the file must outlive the `with` so MarkItDown can
            # reopen it by path; it is removed in the `finally` clause below.
            with tempfile.NamedTemporaryFile(mode='w+', suffix='.html', delete=False, encoding='utf-8') as temp_f:
                temp_f.write(cleaned_html)
                temp_html_file_path = temp_f.name
                print(f"Saved cleaned HTML to temporary file: {temp_html_file_path}")

            md = MarkItDown(docintel_endpoint=self.docintel_endpoint)
            result = md.convert(temp_html_file_path)

            formatted_markdown = self.format_markdown_content(result.markdown, extracted_images)

            os.makedirs(self.output_dir, exist_ok=True)
            # NOTE(review): names are truncated to 50 characters, so two URLs
            # sharing a 50-character prefix overwrite each other's file.
            safe_filename = re.sub(r'\W+', '_', url.strip())[:50] + ".md"
            output_path = os.path.join(self.output_dir, safe_filename)

            with open(output_path, 'w', encoding='utf-8') as f_out:
                f_out.write(formatted_markdown)

            print(f"Successfully converted to Markdown and saved to {output_path}")
            return formatted_markdown, url

        except Exception as e:
            print(f"An error occurred during conversion: {e}")
            traceback.print_exc()
            return None
        finally:
            if temp_html_file_path and os.path.exists(temp_html_file_path):
                try:
                    os.remove(temp_html_file_path)
                    print(f"Deleted temporary file: {temp_html_file_path}")
                except OSError as e:
                    print(f"Error when deleting temporary file {temp_html_file_path}: {e}")

In [39]:
from pydantic import BaseModel, ConfigDict
from typing import Annotated, List,Literal

class FPTData(BaseModel):
    """Details of an electronic device including specifications, promotions, and warranty."""

    # Field descriptions are wrapped in Field(description=...): a bare string
    # inside Annotated[...] is ignored by pydantic and never reaches the JSON
    # schema that the structured-output LLM call is built from.
    model_config = ConfigDict(extra='forbid')

    device_name: Annotated[
        str,
        Field(description="Name of the electronic device. Example: 'iPhone 15'")
    ]

    storage: Annotated[
        List[str],
        Field(description="Storage capacitíe in GB. Example: ['256', '512']")
    ]
    battery: Annotated[
        str,
        Field(description="Information about batter .")
    ]
    cpu: Annotated[
        str,
        Field(description="CPU of the computer/laptop. Example: 'Ryzen 5'")
    ]
    card: Annotated[
        str,
        Field(description="graphic card of the laptop.Example: 'AMD Radeon Graphics'")
    ]
    screen: Annotated[
        str,
        Field(description="Information about screen.Example: '14 inch'")
    ]
    sale_price: Annotated[
        int,
        Field(description="Current discounted price in VND. Digits only, no symbols or separators. Example: 4990000")
    ]
    original_price: Annotated[
        Optional[int],
        Field(description="Original price before discount (if available). Digits only. Example: 6490000")
    ]
    discount_percent: Annotated[
        Optional[int],
        Field(description="Discount percentage without % symbol. Example: 23")
    ]
    installment_price: Annotated[
        Optional[int],
        Field(description="Monthly installment amount in VND, digits only. Example: 972750")
    ]
    bonus_points: Annotated[
        Optional[int],
        Field(description="Promotional bonus points awarded. Digits only. Example: 1247")
    ]
    # The description must name the exact Literal values ('adults', not
    # 'adult'), otherwise the model is told to emit a value validation rejects.
    # NOTE(review): the two thresholds below (1000000 vs 10000000) leave a gap
    # and differ from the rule in the extraction system prompt - confirm intent.
    suitable_for: Annotated[
        Optional[Literal[
            "students", "adults"
        ]],
        Field(description="what type of customer is suitable for this device based on sale_price. follow strictly this rule: if sale_price less than 1000000 then suitable_for is 'students' and if sale_price is more than 10000000 then suitable_for is 'adults'")
    ] = None

    colors: Annotated[
        List[str],
        Field(description="Available colors of the device. Example: ['black', 'blue']")
    ]

    sales_perks: Annotated[
        str,
        Field(description="All detailed text (TRY to find in \"Quà tặng và ưu đãi khác\" and under \"Khuyến mãi được hưởng\") regarding gifts, other sales perks, extra offers, interest-free installment plans. Include exact text from the website with full promotional wording.")
    ]

    guarantee_program: Annotated[
        str,
        Field(description="ALL Warranty or guarantee program associated with the device. Look for sections like \"Bảo hành mở rộng\" containing information about extended warranties and care programs.")
    ]

    payment_perks: Annotated[
        str,
        Field(description="All detailed text (Try to find in \"Khuyến mãi thanh toán\") regarding promotions, perks, bonus programs, installment options, and payment-related discounts. Include full promotional terms   ")
    ]

    image_link: Annotated[
        str,
        Field(description="ALL URL links to the device's featured image in the ### IMAGE section")
    ]

    # Optional because the default is None; the value is filled in by the
    # caller after extraction rather than by the model.
    source: Annotated[
        Optional[str],
        Field(description="Source URL of the product page")
    ] = None


In [None]:
import os

In [None]:
# Credentials and connection settings are read from the environment;
# each value is None when its variable is unset.
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")

QDRANT_URL = os.environ.get("QDRANT_URL")
QDRANT_API_KEY = os.environ.get("QDRANT_API_KEY")
# The Qdrant collection name comes from the STORAGE variable.
QDRANT_COLLECTION_NAME = os.environ.get("STORAGE")
def ai_model():
    """Return a new ChatOpenAI client for gpt-4o-mini with temperature 0."""
    model_settings = {
        "openai_api_key": OPENAI_API_KEY,
        "model": "gpt-4o-mini",
        "temperature": 0,
    }
    return ChatOpenAI(**model_settings)


In [41]:
def connect_and_create_collection(vector_size: int = 1536):
    """
    Connect to Qdrant and make sure the configured collection exists.

    Args:
        vector_size: Dimension of the vectors stored in the collection. It is
            only used when the collection has to be created and must match the
            embedding model that writes to it.

    Returns:
        Tuple of (QdrantClient, collection name).

    Raises:
        ValueError: If no collection name is configured (STORAGE is unset).
    """
    collection_name = QDRANT_COLLECTION_NAME
    # Fail early with a clear message instead of an obscure client error.
    if not collection_name:
        raise ValueError(
            "Qdrant collection name is not configured (environment variable STORAGE is empty)."
        )

    client = QdrantClient(url=QDRANT_URL, api_key=QDRANT_API_KEY)
    if not client.collection_exists(collection_name):
        client.create_collection(
            collection_name=collection_name,
            vectors_config=VectorParams(size=vector_size, distance=Distance.COSINE)
        )
        print(f"Created collection '{collection_name}'")
    else:
        print(f"Collection '{collection_name}' already exists.")

    return client, collection_name

In [42]:
def apply_payload_schema(client, collection_name, field_prefix: str = ""):
    """
    Create a payload index for every known product field.

    A failure on one field is printed and does not stop the remaining fields
    from being indexed.

    Args:
        client: Object exposing ``create_payload_index(collection_name=...,
            field_name=..., field_schema=...)`` (a QdrantClient in this notebook).
        collection_name: Name of the collection to index.
        field_prefix: Optional prefix prepended to every field name, for
            payloads that nest these fields under a parent key (for example
            ``"metadata."``). Defaults to no prefix.
            NOTE(review): LangChain's Qdrant wrapper appears to store document
            metadata under a nested "metadata" payload key by default - confirm,
            and pass ``field_prefix="metadata."`` if so, otherwise these
            indexes do not cover the stored fields.
    """
    payload_schema = {
        "device_name": {"type": "text"},
        "storage": {"type": "keyword"},
        "battery": {"type": "text"},
        "colors": {"type": "keyword"},
        "cpu": {"type": "text"},
        "card": {"type": "text"},
        "screen": {"type": "text"},
        "suitable_for": {"type": "text"},
        "sales_perks": {"type": "text"},
        "payment_perks": {"type": "text"},
        "guarantee_program": {"type": "text"},
        "source": {"type": "text"},
        "image_link": {"type": "text"},
        "sale_price": {"type": "integer"},
        "original_price": {"type": "integer"},
        "discount_percent": {"type": "integer"},
        "installment_price": {"type": "integer"},
        "bonus_points": {"type": "integer"},
    }

    for field_name, field_config in payload_schema.items():
        indexed_field = f"{field_prefix}{field_name}"
        try:
            client.create_payload_index(
                collection_name=collection_name,
                field_name=indexed_field,
                field_schema=field_config
            )
            print(f"Index created for field: {indexed_field}")
        except Exception as e:
            # Best effort: report and carry on with the remaining fields.
            print(f"Failed to create index for {indexed_field}: {e}")


In [43]:
# Prompt template for the product-description extraction step.
# `{input}` is the single template variable; `description` fills it with the
# page content through ChatPromptTemplate.from_template.
text_extract_prompt = """
    You are tasked with extracting specific information from the "## Mô tả sản phẩm" section of a product description for FPT Shop. Your focus is on the following categories:
    **FIND INFORMATION ABOUT**
    - Design & Materials
    - Performance: RAM, 
    - Camera & Photography Features
    - Video Recording Capabilities
    - Performance & Hardware
    - Battery & Charging
    - AI Features
    - Comparison with Other Devices
    **YOU MUST**
    - Locate and extract all relevant information pertaining to the categories listed above.
    - Ensure to keep the images links if it goes with the text, (Preserve the original format)
    The text to extract from will be provided in the placeholder {input}.
"""

async def description(content: str) -> str:
    """
    Extract the product-description details from page content with the LLM.

    Args:
        content: Markdown text of the product page.

    Returns:
        The text content of the model's reply.
    """
    prompt = ChatPromptTemplate.from_template(text_extract_prompt)
    chain = prompt | ai_model()
    # ainvoke keeps the event loop free while waiting on the model; the
    # synchronous invoke would block every other coroutine in the meantime.
    extraction = await chain.ainvoke({"input": content})
    return extraction.content

In [44]:
async def extract_metadata_from_context(context: str, source_url: Optional[str] = None) -> dict:
    """
    Extracts structured metadata from FPT shop product content using an AI model.

    Args:
        context: Text of the product page to extract from.
        source_url: When given, overrides the "source" entry of the result.

    Returns:
        The FPTData fields as a JSON-compatible dict, plus a "time_update"
        entry holding today's date formatted as YYYY-MM-DD.

    Raises:
        Exception: Any error from the model call or validation is printed
            and re-raised unchanged.
    """
    prompt = ChatPromptTemplate.from_messages([
        ("system", """You are a specialized extractor for FPT Shop product data. Your job is to extract specific fields from product pages exactly as they appear.
        **IMPORTANT**:
        
        1. For sales_perks: Look for sections labeled "Quà tặng và ưu đãi khác" AND "Khuyến mãi được hưởng" or "Chính sách sản phẩm" - include ALL perks, gifts, offers, installment plans, B2B deals with exact wording.
        2. For guarantee_program: Look for ALL warranty information, especially under "Bảo hành mở rộng" including extended warranties, care programs, and their prices.
        3. For payment_perks: Look for section labeled "Khuyến mãi thanh toán" and extract ALL payment-related promotions, discounts, and installment options with full details and conditions.
        4. Make sure the 'suitable_for' field CAN NOT be null, based on price , if price less than 1000000 VND, suitble_for = 'students' else 'adults' 
        RULES
        - Extract text EXACTLY as it appears in the source
        - If a section is not found, return an empty string for that field"""),
        ("human", "Extract the metadata from the following FPT Shop product page text:\n\n{context}")
    ])

    structured_llm = ai_model().with_structured_output(schema=FPTData)
    chain = prompt | structured_llm

    try:
        # ainvoke keeps the event loop free while waiting on the model; the
        # synchronous invoke would block every other coroutine in the meantime.
        result: FPTData = await chain.ainvoke({"context": context})
        metadata = result.model_dump(mode='json')
        if source_url:
            metadata["source"] = source_url
        metadata['time_update'] = datetime.datetime.now().strftime("%Y-%m-%d")
        return metadata
    except Exception as e:
        print(f"Error extracting metadata: {e}")
        raise

In [45]:
async def process_data(content: str, source_url: Optional[str] = None) -> Tuple[str, Dict[str, Any]]:
    """
    Run description extraction and metadata extraction for one product page.

    Both coroutines are scheduled together through asyncio.gather and the
    function waits for the pair to finish.

    Args:
        content: The product page content text
        source_url: Optional URL source of the content

    Returns:
        Tuple containing the summary and metadata dictionary
    """
    summary, metadata = await asyncio.gather(
        description(content),
        extract_metadata_from_context(content, source_url),
    )
    return summary, metadata

In [46]:
async def pipeline(paths):
    """
    Crawl each URL, convert it to markdown, and store the result in Qdrant.

    The crawler, Qdrant connection, payload indexes and vector store are set
    up once; URLs are then processed one after another. A failure on one URL
    is recorded in the result and does not stop the remaining URLs.

    Args:
        paths: List of URLs to process

    Returns:
        Dictionary mapping each URL to a status message, or a dictionary with
        a single "error" key when the setup itself fails.
    """
    results = {}

    try:
        crawler = FPTCrawler()

        # One shared connection for every URL.
        client, collection_name = connect_and_create_collection()
        if not (client and collection_name):
            return {"error": "Error in connecting to Qdrant or creating collection."}

        apply_payload_schema(client, collection_name)

        vectorstore = Qdrant(
            client=client,
            collection_name=collection_name,
            embeddings=OpenAIEmbeddings(openai_api_key=OPENAI_API_KEY, model="text-embedding-3-small")
        )

        for path in paths:
            try:
                converted = await crawler.convert_url_to_markdown(path)
                if not converted:
                    results[path] = "Error in converting URL to markdown."
                    continue

                markdown_text, page_url = converted

                # `summary` rather than `description`, so the local does not
                # shadow the module-level description() coroutine.
                summary, metadata = await process_data(markdown_text, page_url)

                vectorstore.add_documents(
                    [Document(page_content=summary, metadata=metadata)]
                )

                results[path] = "Document added to Qdrant successfully."

            except Exception as e:
                print(f"An error occurred processing {path}: {e}")
                traceback.print_exc()
                results[path] = f"Error: {str(e)}"

        return results

    except Exception as e:
        print(f"An error occurred in the pipeline: {e}")
        traceback.print_exc()
        return {"error": f"General pipeline error: {str(e)}"}

In [48]:
# Driver cell: runs the whole crawl -> extract -> store pipeline.
# nest_asyncio is applied first so that asyncio.run() below can be called
# from inside the notebook kernel, which already has an event loop running.
import nest_asyncio
nest_asyncio.apply()

# List of URLs to process
urls = [
    "https://fptshop.com.vn/dien-thoai/samsung-galaxy-a56",
    "https://fptshop.com.vn/dien-thoai/samsung-galaxy-a36",
    "https://fptshop.com.vn/dien-thoai/samsung-galaxy-s25-ultra",
    "https://fptshop.com.vn/dien-thoai/iphone-13",
    "https://fptshop.com.vn/dien-thoai/iphone-15-plus",
    "https://fptshop.com.vn/dien-thoai/iphone-14",
    "https://fptshop.com.vn/phu-kien/tai-nghe-bluetooth-alpha-works-curve300",
    "https://fptshop.com.vn/phu-kien/tai-nghe-bluetooth-nhet-tai-defunc-true-go-slim-den",
    "https://fptshop.com.vn/phu-kien/tai-nghe-bluetooth-choang-dau-co-mic-havit-h628bt",
]

# `results` maps each URL to a status message returned by pipeline().
results = asyncio.run(pipeline(urls))
print(results)

Collection 'FPT_SHOP' already exists.
Index created for field: device_name
Index created for field: storage
Index created for field: battery
Index created for field: colors
Index created for field: cpu
Index created for field: card
Index created for field: screen
Index created for field: suitable_for
Index created for field: sales_perks
Index created for field: payment_perks
Index created for field: guarantee_program
Index created for field: source
Index created for field: image_link
Index created for field: sale_price
Index created for field: original_price
Index created for field: discount_percent
Index created for field: installment_price
Index created for field: bonus_points
[Success] Got content using cloudscraper
Saved cleaned HTML to temporary file: C:\Users\Admin\AppData\Local\Temp\tmpc8dryy6k.html
Successfully converted to Markdown and saved to markdown_dir\https_fptshop_com_vn_dien_thoai_samsung_galaxy_a56.md
Deleted temporary file: C:\Users\Admin\AppData\Local\Temp\tmpc8dryy