## Readme

This notebook contains code that allows one to scrape data from the Dastex website. Following that is some text cleaning code. Also was implemented a structured JSON parser which attemps to create structured JSON from the websites. Embeddings are created from the website content and the different retrieval methods in Langchain are tested on the vector database.

In [44]:
from bs4 import BeautifulSoup
from usp.tree import sitemap_tree_for_homepage
import requests
import re
import json
import logging
logging.basicConfig()
logging.getLogger('langchain.retrievers.multi_query').setLevel(logging.INFO)
logging.getLogger('langchain.retrievers.self_query.base').setLevel(logging.INFO)

import platform
if platform.processor() != 'arm':
    from pysitemap import crawler
    from pysitemap.parsers.lxml_parser import Parser


import os

from pathlib import Path

from typing import Tuple
from pydantic import BaseModel
import lark

from langchain.document_loaders import AsyncHtmlLoader
from langchain.embeddings.base import Embeddings
from langchain.prompts import PromptTemplate
from langchain.chains import RetrievalQA, RetrievalQAWithSourcesChain
from langchain.vectorstores.base import VectorStore
from langchain.llms import VertexAI
from langchain.embeddings import VertexAIEmbeddings
from langchain.text_splitter import CharacterTextSplitter, RecursiveCharacterTextSplitter
from langchain.schema import Document
from langchain.indexes.vectorstore import VectorStoreIndexWrapper
from langchain.vectorstores.chroma import Chroma
from langchain.retrievers.multi_query import MultiQueryRetriever
from langchain.retrievers.self_query.base import SelfQueryRetriever
from langchain.chains.query_constructor.base import AttributeInfo
from vertexai.preview.language_models import TextGenerationModel
from usp.tree import sitemap_tree_for_homepage
from langchain import PromptTemplate
import pandas as pd
import requests
from bs4 import BeautifulSoup
import re
import nest_asyncio
from langchain.document_transformers import Html2TextTransformer
from langchain.prompts import (
    PromptTemplate,
    ChatPromptTemplate,
    HumanMessagePromptTemplate,
)
from langchain.output_parsers import PydanticOutputParser
from pydantic import BaseModel, Field, validator, root_validator
from typing import List, Optional
import asyncio
import glob
import os 
from google.cloud import storage


In [45]:
def upload_blob(bucket_name, source_file_name, destination_blob_name):
    """Uploads a file to the bucket."""
    # The ID of your GCS bucket
    # bucket_name = "your-bucket-name"
    # The path to your file to upload
    # source_file_name = "local/path/to/file"
    # The ID of your GCS object
    # destination_blob_name = "storage-object-name"

    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)

    # Optional: set a generation-match precondition to avoid potential race conditions
    # and data corruptions. The request to upload is aborted if the object's
    # generation number does not match your precondition. For a destination
    # object that does not yet exist, set the if_generation_match precondition to 0.
    # If the destination object already exists in your bucket, set instead a
    # generation-match precondition using its generation number.
    generation_match_precondition = 0

    blob.upload_from_filename(source_file_name, if_generation_match=generation_match_precondition)

    print(
        f"File {source_file_name} uploaded to {destination_blob_name}."
    )

def upload_from_directory(dest_bucket_name: str, directory_path: str, dest_blob_name: str):
    GCS_CLIENT = storage.Client()
    rel_paths = glob.glob(directory_path + '/**', recursive=True)
    bucket = GCS_CLIENT.get_bucket(dest_bucket_name)
    for local_file in rel_paths:
        remote_path = f'{dest_blob_name}/{"/".join(local_file.split(os.sep)[1:])}'
        if os.path.isfile(local_file):
            blob = bucket.blob(remote_path)
            blob.upload_from_filename(local_file)


## Some global variables that are useful

In [46]:
PROJECT_ID = "dastex-genai"
REGION = "eu-west1" 
EMBED_FOLDER = "embeddings_website"
JSON_FILE_NAME = "../data/scraped_website_text.json"
SCRAPED_TEXT_BUCKET = "dastex-scraped-text"
EMBEDDINGS_BUCKET = "dastex-chroma-embeddings"

## Functions for getting the website URLs

The get_sitemap function creates an xml sitemap from the website since this is not available

In [47]:
class WebTools:
    def __init__(self, root_url='https://www.dastex.de/produktportfolio/',
        out_file = '../data/sitemap.xml',
        exclude_urls = [".pdf", ".jpg", ".zip"]
    ):
        self.root_url = root_url
        self.out_file = out_file
        self.exclude_urls = exclude_urls

    def get_sitemap(self, websites):
        """
        Scrapes websites for URLs that match the given filter.

        Args:
            websites (list): A list of website URLs to scrape.
            filter (list): A list of keywords to filter the URLs.

        Returns:
            list: A list of URLs that match the filter.
        """
        crawler(
            self.root_url, out_file=self.out_file, exclude_urls=self.exclude_urls,
            http_request_options={"ssl": False}, parser=Parser
        )
    
    def get_urls(self, sitemap_path):
        urls = []

        with open(sitemap_path, 'r', encoding='utf-8') as f:
            xml_url = BeautifulSoup(f.read())

        for url in xml_url.find_all('loc'):
            urls.append(url.get_text())

        self.urls = urls

        return urls

    def split_urls(self):
        split_urls = [x.replace(self.root_url, '').split('/') for x in self.urls]
        return split_urls

    def group_stats(self):
        split_urls = self.split_urls()

        df = pd.DataFrame(split_urls, columns=['group_{}'.format(i) for i in range(5)]).drop(['group_4'], axis=1)
        df['url'] = self.urls
        df['group_3'] = df['group_3'].replace('', pd.NA)
        df['group_2'] = df['group_2'].replace('', pd.NA)
        df['group_1'] = df['group_1'].replace('', pd.NA)

        # First find the products that have the product name in group_3
        df_3 = df.dropna(subset=['group_3']).copy()
        g3 = list(df_3['group_0'].unique())
        counts_3 = df_3.groupby('group_0').count().drop(['group_2', 'group_3', 'url'], axis=1).rename({'group_1': 'count'}, axis=1)
        c3 = counts_3.merge(df_3[['group_0', 'group_1', 'group_2', 'group_3', 'url']], left_index=True, right_on='group_0')

        # Then find the products that have the product name in group_2
        df_2 = df[~(df['group_0'].isin(g3))]
        df_2 = df_2.dropna(subset=['group_2'])
        g2 = list(df_2['group_0'].unique())
        counts_2 = df_2.groupby('group_0').count().drop(['group_2', 'group_3', 'url'], axis=1).rename({'group_1': 'count'}, axis=1)
        c2 = counts_2.merge(df_2[['group_0', 'group_1', 'group_2', 'url']], left_index=True, right_on='group_0')

        # Finally the products that have the product name in group_1
        # It appears that these pages do not contain individual product information and instead each
        # Page contains information about multiple products
        df_1 = df[~(df['group_0'].isin(g2+g3))]
        df_1 = df_1.dropna(subset=['group_1'])
        g1 = list(df_1['group_0'].unique())
        counts_1 = df_1.groupby('group_0').count().drop(['group_2', 'group_3', 'url'], axis=1).rename({'group_1': 'count'}, axis=1)
        c1 = counts_1.merge(df_1[['group_0', 'group_1', 'url']], left_index=True, right_on='group_0')

        return c1, c2, c3

In [48]:
site = WebTools()
urls = site.get_urls('../data/sitemap.xml')



In [49]:
c1, c2, c3 = site.group_stats()

In [50]:
for entry in pd.concat([c1['group_0'], c2['group_0'], c3['group_0']]).unique():
    print(entry)

desinfektionsmittel
entsorgungssysteme-zubehoer
klebebaender-etiketten
mobiliar
papier-zubehoer
reinigung
spendersysteme
spezifische-produkte
staubbindematten
reinraumtuecher
schuhe-socken
zwischenbekleidung
einweg-schutzbekleidung
handschuhe-fingerlinge
oberbekleidung


In [51]:
product_urls = list(c1['url']) + list(c2['url']) + list(c3['url'])

# Original scraping method

https://bitbucket.org/niologic/frontend_pilot/src/notebooks/website_product_google_embeddings_search.md

In [45]:
combined_data = []

for url in urls:
    try:
        response = requests.get(url)
        response.raise_for_status()  # Raise an exception for HTTP errors
    except requests.exceptions.RequestException as e:
        print(f"Error: {e}")
        print(f"Skipping URL: {url}")
        continue  # Skip this URL and move to the next one

    soup = BeautifulSoup(response.text, "html.parser")
    text = soup.get_text()
    clean_text = re.sub(r'[^a-zA-Z0-9ßäöüAÄÖÜ ]', ' ', text)
    clean_text = re.sub(r'\s+', ' ', clean_text)
    clean_text = re.sub(r'\n+', ' ', clean_text)

    data = {"text": clean_text, "source": url}

    combined_data.append(data)


In [78]:
split_data = []
NUM_CHARACTERS = 1000

for data in combined_data:
    text = data['text']
    source = data['source']
   
    chunks = [text[i:i+NUM_CHARACTERS] for i in range(0, len(text), NUM_CHARACTERS)]

    split = source.split('/')
    index = split.index('produktportfolio')
    groups = {}
    for idx, x in enumerate(split[(index+1):(len(split)-1)]):
        groups['group_' + str(idx)] = re.sub(r'[^a-zA-Z0-9ßäöüAÄÖÜ ]', ' ', x)

    for chunk in chunks:
        # Create a new dictionary for each chunk with text, source, and extracted groups
        data = {'text': chunk, 'source': source}
        data.update(groups)
        split_data.append(data)

with open(JSON_FILE_NAME, "w", encoding="utf-8") as file:
    json.dump(split_data, file, ensure_ascii=False)


## Text Scraping Experimentation

I believe this gives cleaner text using the Langchain own `AsyncHtmlLoader()`

In [52]:
nest_asyncio.apply()

header_str = ' Ihre Newsletter Anmeldung Schließen Zum Hauptinhalt springen 49 7222 9696 60 info dastex com suchen Toggle navigation Produktportfolio Oberbekleidung Zwischenbekleidung Einweg Schutzbekleidung Handschuhe Fingerlinge Schuhe Socken Reinraumtücher Desinfektionsmittel Staubbindematten Reinigung Entsorgungssysteme Zubehör Spendersysteme Mobiliar Papier Zubehör Klebebänder Etiketten Spezifische Produkte Glossar Wissenswertes Overalls Kittel Jacken Hosen Hauben Textile Mundschutze Fußbekleidung Sonderlösungen Lagerware Oberbekleidung Gewebe der Oberbekleidung auf einen Blick Wissenswertes T Shirts Overalls Pullover Jacken und Hosen Lagerware Zwischenbekleidung Gewebe der Zwischenbekleidung auf einen Blick Wissenswertes Einweghauben Gesichtsschutz Schutzbrillen Einwegoveralls Einwegkittel Zubehör Einwegartikel für den Fußbereich Wissenswertes Einweghandschuhe und Fingerlinge Textile Handschuhe Wichtige Zertifizierungen und Tests für Reinraumhandschuhe Wissenswertes PU und TPE Clogs Berufsschuhe Sicherheitsschuhe Auswechselbare Einlegesohlen Reinraumsocken Wissenswertes Baumwolltücher Zellulosetücher Polyester Zellulosetücher Polyestertücher Reinraumtücher für besondere Anforderungen Sterile trockene Tücher Getränkte Tücher Alkoholgetränkte Wisch und Mopptücher Unsere Reinraumtücher auf einen Blick Wissenswertes Desinfektionsmittel auf Alkoholbasis Alkoholgetränkte Wisch und Mopptücher Desinfektionsmittel auf Basis nicht alkoholischer Wirkstoffe Biozide mit sporizider Wirkung Ergänzende Produkte Handhygiene Wissenswertes Permanent klebende Staubbindematten Abziehbare Folienstaubbindematten Bodenplatten Wissenswertes Moppsysteme Reinigungswagen und Zubehör Reinigungswerkzeuge Adhäsive Reinigungsprodukte Schwämme und Spezialtücher Reinigungstupfer Swabs Erfolgskontrolle und Training Reinigungsflüssigkeiten Wissenswertes Longopac Pactosafe Wissenswertes Spendersysteme Einwegartikel Spendersysteme Einwegüberziehschuhe Spendersysteme Desinfektionsmittel Wissenswertes Reinraumstühle und hocker Arbeitstische und Schreibtische Regalstecksysteme und Lagerlösungen Garderoben und Bänke Aufbewahrungs und Mehrzweckwagen Leitern und Tritte aus Edelstahl Wissenswertes Papier Ringbücher Blöcke Ordner Stifte Wissenswertes Klebebänder Abroller Etiketten Wissenswertes Zytostatika Schutzverpackungen Schutzunterlagen Aktuelles Downloads Produktanfrage Unternehmen Über uns ISO Zertifizierungen Forschung Entwicklung Verhaltenskodex Unternehmenspolitik Jobs und Karriere Kontakt Suche '

async def my_coroutine(urls):
    loader = AsyncHtmlLoader(urls)
    docs = loader.load()
    return docs

def clean_doc(document):
    text = document.page_content
    clean_text = re.sub(r'[^a-zA-Z0-9ßäöüAÄÖÜ ]', ' ', text)
    clean_text = re.sub(r'\s+', ' ', clean_text)
    clean_text = re.sub(r'\n+', ' ', clean_text)
    clean_text = clean_text.replace(header_str, ' ')
    document.page_content = clean_text
    split = document.metadata['source'].split('/')
    index = split.index('produktportfolio')
    groups = {}
    for idx, x in enumerate(split[(index+1):(len(split)-1)]):
        document.metadata['group_' + str(idx)] = re.sub(r'[^a-zA-Z0-9ßäöüAÄÖÜ ]', ' ', x)
    return document

In [53]:
loop = asyncio.get_event_loop()
docs = loop.run_until_complete(my_coroutine(urls))

html2text = Html2TextTransformer()
docs_transformed = html2text.transform_documents(docs)
clean_docs = [clean_doc(doc) for doc in docs_transformed]


Fetching pages:   0%|          | 0/371 [00:00<?, ?it/s]

Fetching pages: 100%|##########| 371/371 [03:57<00:00,  1.56it/s]


In [39]:
# Scrape only the product url's
loop = asyncio.get_event_loop()
p_docs = loop.run_until_complete(my_coroutine(product_urls))

html2text = Html2TextTransformer()
p_docs_transformed = html2text.transform_documents(p_docs)
p_clean_docs = [clean_doc(doc) for doc in p_docs_transformed]

Fetching pages: 100%|##########| 240/240 [00:38<00:00,  6.27it/s]


In [54]:
# This is just for storage purposes
docs_dict = [dict(doc) for doc in clean_docs]

# First write file to disk
with open(JSON_FILE_NAME, 'w') as f:
    json.dump(docs_dict, f)

# Upload to blob storage
upload_blob(SCRAPED_TEXT_BUCKET, JSON_FILE_NAME, 'scraped_website_text.json')

PreconditionFailed: 412 POST https://storage.googleapis.com/upload/storage/v1/b/dastex-scraped-text/o?uploadType=multipart&ifGenerationMatch=0: {
  "error": {
    "code": 412,
    "message": "At least one of the pre-conditions you specified did not hold.",
    "errors": [
      {
        "message": "At least one of the pre-conditions you specified did not hold.",
        "domain": "global",
        "reason": "conditionNotMet",
        "locationType": "header",
        "location": "If-Match"
      }
    ]
  }
}
: ('Request failed with status code', 412, 'Expected one of', <HTTPStatus.OK: 200>)

In [41]:
# This is just for storage purposes
p_docs_dict = [dict(doc) for doc in p_clean_docs]

# First write file to disk
with open('../data/scraped_website_text_products.json', 'w') as f:
    json.dump(p_docs_dict, f)

# Upload to blob storage
upload_blob(SCRAPED_TEXT_BUCKET, '../data/scraped_website_text_products.json', 'scraped_website_text_products.json')

File ../data/scraped_website_text_products.json uploaded to scraped_website_text_products.json.


## Structured output parsing

In [172]:
model = VertexAI(
    model_name="text-bison@001",
    max_output_tokens=1024,
    temperature=0.1,
    top_p=0.8,
    top_k=40,
)

# Define your desired data structure.

class Product(BaseModel):
    product_name: Optional[str] = Field(description="the names of the product found on the website")
    product_description: Optional[str] = Field(description="the descriptions of the products found on the website")
    product_sizes: Optional[List[str]] = Field(description="product sizes available")
    product_materials: Optional[List[str]] = Field(description="materials the products are made from")
    product_colours: Optional[List[str]] = Field(description="available product colours")

class Webpage(BaseModel):
    page_title: str = Field(description="website title")
    page_description: str = Field(description="website description")
    page_summary: str = Field(description="summary of the website content")
    page_products: List[Product] 

# Set up a parser + inject instructions into the prompt template.
parser = PydanticOutputParser(pydantic_object=Webpage)

prompt = PromptTemplate(
    template="Generate details of a product.\n{format_instructions}\nProduct description: {query}",
    input_variables=["query"],
    partial_variables={"format_instructions": parser.get_format_instructions()}
)

struct_json = []
errors = []

one = [x for x in clean_docs if x.metadata['source'] == 'https://www.dastex.de/produktportfolio/oberbekleidung/sonderloesungen/transporttaschen/']

for doc in one:
    # And a query intented to prompt a language model to populate the data structure.
    page_query = doc.page_content
    try:
        _input = prompt.format_prompt(query=page_query)

        output = model(_input.to_string())

        out = parser.parse(output)
        struct_json.append(
            {'source': doc.metadata['source'],
             'webpage': out}
        )
        print('Erfolgreich analysieren: {}'.format(doc.metadata['source']))
    except:
        print('Parse failed: {}'.format(doc.metadata['source']))
        errors.append({'source': doc.metadata['source']})


for parsed in struct_json:
    print('Titel der Seite: \n{}\n'.format(parsed['webpage'].page_title))
    print('Beschreibung der Seite: \n{}'.format(parsed['webpage'].page_description))
    print('\nErkannte Produkte:\n')
    for prod in parsed['webpage'].page_products:
        print('Name des Produkts: {}'.format(prod.product_name))
        print('Verfügbare Produktgrößen: {}'.format(prod.product_sizes))

Erfolgreich analysieren: https://www.dastex.de/produktportfolio/oberbekleidung/sonderloesungen/transporttaschen/


In [153]:
struct_json[123]['webpage'].page_products

[Product(product_name='Transporttasche mit Reißverschluss', product_description='Transporttasche mit Reißverschluss', product_sizes=['35 cm x 25 cm B x H'], product_materials=['Hypalon'], product_colours=['schwarz']),
 Product(product_name='Transporttasche mit 3 Druckknöpfen', product_description='Transporttasche mit 3 Druckknöpfen', product_sizes=['35,5 cm x 40 cm B x H'], product_materials=['Hypalon'], product_colours=['schwarz']),
 Product(product_name='Hüftbeutel mit integriertem Gürtel', product_description='Hüftbeutel mit integriertem Gürtel', product_sizes=['14 cm x 20 cm Höhe'], product_materials=['Hypalon'], product_colours=['schwarz']),
 Product(product_name='Gürteltasche', product_description='Gürteltasche', product_sizes=['22 cm x 15 cm B x H'], product_materials=['Hypalon'], product_colours=['schwarz']),
 Product(product_name='Tasche für Mobiltelefon', product_description='Tasche für Mobiltelefon', product_sizes=['50 x 140 x 25 mm B x H x T'], product_materials=['Hypalon']

In [154]:
struct_json[123]['source']

'https://www.dastex.de/produktportfolio/oberbekleidung/sonderloesungen/transporttaschen/'

In [121]:
products = []

for x in struct_json:
    d = getattr(x['webpage'], 'page_products')
    for i in d:
        products.append([getattr(i, 'product_name'), x['source'], getattr(i, 'product_sizes'), getattr(i, 'product_materials'), getattr(i, 'product_colours'), getattr(i, 'product_description')])

In [125]:
prod_df = pd.DataFrame(products)
prod_df.columns = ['product', 'link', 'sizes', 'materials', 'colours', 'description']

In [127]:
prod_df.to_csv('../data/scraped_json.csv')

In [51]:
excel_links = pd.read_json('../data/associations_excel.json', orient='index').reset_index()
excel_links.columns = ['product', 'link']

In [60]:
prod_df.merge(excel_links, on='link').to_csv('../data/merged_products.csv')

## Text Splitting

In [60]:
text_splitter = RecursiveCharacterTextSplitter(chunk_size=2000, length_function=len, chunk_overlap=0, add_start_index = True)
docs_spl_lis = text_splitter.split_documents(clean_docs)

In [61]:
model = TextGenerationModel.from_pretrained('text-bison@001')

# Embedding model
embedding = VertexAIEmbeddings(model_name="textembedding-gecko@001")

# LLM Model
llm = VertexAI(
    model_name="text-bison@001",
    max_output_tokens=1024,
    temperature=0.0,
    top_p=0.8,
    top_k=40,
)

llm

VertexAI(cache=None, verbose=False, callbacks=None, callback_manager=None, tags=None, metadata=None, client=<vertexai.language_models._language_models._PreviewTextGenerationModel object at 0x142edf0d0>, model_name='text-bison@001', temperature=0.0, max_output_tokens=1024, top_p=0.8, top_k=40, stop=None, project=None, location='us-central1', credentials=None, request_parallelism=5, max_retries=6, tuned_model_name=None)

## Create or Load Embeddings

In [62]:
def load_embedding(ebd: Embeddings, index_path: Path) -> Tuple[VectorStore, BaseModel]:
    """Load embedded data from its persisting path.

    Args:
        ebd (Embeddings): embedding model
        index_path (Path): persisting path of the embedded data

    Returns:
        Tuple[VectorStore, BaseModel]: Embedded data and the underlying vector store (database).
    """
    vectorstore = Chroma(embedding_function=ebd, persist_directory=str(index_path))
    index = VectorStoreIndexWrapper(vectorstore=vectorstore)

    return index, vectorstore


def create_index_old(ebd, json_fname):

    with open(JSON_FILE_NAME, "rt") as f:
        texts_lis = json.load(f)

    docs_lis = [
        Document(page_content=text["text"], metadata={key: value for key, value in text.items() if key != 'text'}) for text in texts_lis
    ]

    file_path = os.getcwd()
    index_path = os.path.join(file_path, "embeddings_website")

    if os.path.isdir(index_path):
        print("Load embedding")
        index, vectorstore = load_embedding(ebd, index_path)
    else:
        print("Create embedding")
        text_splitter = CharacterTextSplitter(chunk_size=2000, chunk_overlap=0)
        docs_spl_lis = text_splitter.split_documents(docs_lis)
        vectorstore = Chroma.from_documents(
            docs_spl_lis, embedding, persist_directory=str(index_path)
        )
        index = VectorStoreIndexWrapper(vectorstore=vectorstore)
        vectorstore.persist()

    return index, vectorstore

def create_index(ebd, docs, embedding_name="embeddings_website"):

    with open(JSON_FILE_NAME, "rt") as f:
        texts_lis = json.load(f)

    file_path = os.getcwd()
    index_path = os.path.join(file_path, embedding_name)

    if os.path.isdir(index_path):
        print("Load embedding")
        index, vectorstore = load_embedding(ebd, index_path)
    else:
        print("Create embedding")
        text_splitter = RecursiveCharacterTextSplitter(chunk_size=2000, length_function=len, chunk_overlap=0, add_start_index = True)
        docs_spl_lis = text_splitter.split_documents(docs)
        vectorstore = Chroma.from_documents(
            docs_spl_lis, embedding, persist_directory=str(index_path)
        )
        index = VectorStoreIndexWrapper(vectorstore=vectorstore)
        vectorstore.persist()

    return index, vectorstore

In [64]:
# Create the vectorstore for the whole scraped dataset
index, vectorstore = create_index(embedding, clean_docs)
upload_from_directory(EMBEDDINGS_BUCKET, 'embeddings_website', 'embeddings_website')

Create embedding


In [65]:
len(vectorstore.get()['documents'])

899

In [58]:
len(vectorstore.get()['documents'])

899

In [43]:
# Create the vectorstore just for the scraped product dataset
p_index, p_vectorstore = create_index(embedding, p_clean_docs, "embeddings_website_products")
upload_from_directory(EMBEDDINGS_BUCKET, 'embeddings_website_products', 'embeddings_website_products')

Create embedding
