In [1]:
from bs4 import BeautifulSoup
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from collections import OrderedDict
from concurrent.futures import ProcessPoolExecutor
from furniture_terms import furniture_terms_non_plural as furniture_terms 
import Levenshtein
import requests as rs
import numpy as np
import pandas as pd
import re
import random

In [2]:
# Load the CSV file containing URLs
csv_file_path = 'URL_list.csv'
data = pd.read_csv(csv_file_path)

# Get the first 200 URLs for training (the 'max(page)' column holds the URL strings)
urls_for_training = data['max(page)'][:200].tolist()

# Get the remaining URLs for testing
urls_for_testing = data['max(page)'][200:].tolist()

In [3]:
import aiohttp
import asyncio
import nest_asyncio

nest_asyncio.apply()

async def fetch_url(session, url):
    """Return ``url`` if a GET through ``session`` answers with HTTP 200, otherwise None.

    Any exception raised while requesting (or while leaving the response context)
    is swallowed and reported as None, so one bad URL never aborts a batch.
    """
    try:
        async with session.get(url) as response:
            is_ok = response.status == 200
    except Exception:
        return None
    return url if is_ok else None

async def fetch_all_urls(urls):
    """Probe all ``urls`` concurrently over one shared aiohttp session.

    Returns the URLs for which ``fetch_url`` returned a value (i.e. not None),
    in the same order as the input.
    """
    async with aiohttp.ClientSession() as http_session:
        outcomes = await asyncio.gather(
            *(fetch_url(http_session, candidate) for candidate in urls)
        )
    reachable = []
    for outcome in outcomes:
        if outcome is not None:
            reachable.append(outcome)
    return reachable

# Keep only the training URLs that currently answer with HTTP 200, then display them.
new_urls_for_training = await fetch_all_urls(urls_for_training)
new_urls_for_training

['https://www.factorybuys.com.au/products/euro-top-mattress-king',
 'https://dunlin.com.au/products/beadlight-cirrus',
 'https://themodern.net.au/products/hamar-plant-stand-ash',
 'https://furniturefetish.com.au/products/oslo-office-chair-white',
 'https://hemisphereliving.com.au/products/',
 'https://interiorsonline.com.au/products/interiors-online-gift-card',
 'https://livingedge.com.au/products/tables/dining',
 'https://vastinterior.com.au/products/samson-daybed-single-2',
 'https://www.hudsonfurniture.com.au/products/string-weave-timber-stool',
 'https://dhfonline.com/products/gift-card',
 'https://www.tandemarbor.com/products/kaiser-box-bed-blush-plush-velvet',
 'https://www.perchfurniture.com/products/hoyt-chair',
 'https://4-chairs.com/products/mason-chair',
 'https://www.theinside.com/products/x-bench-onyx-austin-stripe-by-old-world-weavers/PTM_XBench_OnyxAustinStripeByOldWorldWeavers',
 'https://pinchdesign.com/products/yves-desk',
 'https://www.do-shop.com/products/gift-card'

In [4]:
# Same reachability filter for the held-out testing URLs, then display them.
new_urls_for_testing = await fetch_all_urls(urls_for_testing)
new_urls_for_testing

['https://www.castlery.com/products/spot-shelf',
 'https://homestreethome.ie/products/zinc-tray',
 'https://www.myconcept.com.hk/products/moo',
 'https://vauntdesign.com/products/forna-plant-stand-small',
 'https://asianteakfurniture.com/products/bali-teak-bench-atf388',
 'https://furnitica-vinova.myshopify.com/products/enim-donec-pede',
 'https://mikazahome.ca/products/page/4/',
 'https://barnabylane.com.au/products/spensley-tan',
 'https://www.modernfurniture.com.au/collections/on-sale/products/vision-suspended-sunbed',
 'https://teak-furniture-singapore.com/products/sweden-side-table',
 'https://teak-warehouse-sale.com/products/tws889lt-000-ta-lp',
 'https://www.modishstore.com/products/twos-company-sunburst-antiqued-gold-wall-mirror',
 'https://loft-theme-demo-nashville.myshopify.com/products/black-chair',
 'https://teakco.com/products/amsterdam-2-drawer-bedside-table-tek168bs-002-ta',
 'https://big-sale-furniture.com/products/amsterdam-bench-150-x-35-be-150-35-ta',
 'https://taktc

In [4]:
# Exploratory scrape of a single page; `data` ends up as the list of cleaned text fragments.
link = urls_for_training[13]
print(link)

# A timeout keeps a stalled server from hanging the notebook.
response = rs.get(link, timeout=30) #'https://livingedge.com.au/tables/dining-tables/'
print(response.status_code)
soup = BeautifulSoup(response.content, 'html.parser')

# Remove scripts, styles, and other specific tags
for script in soup(["script", "style", "iframe", "sup"]):
    script.decompose()

# Extract text from the parsed HTML; ' ~ ' marks the boundary between text nodes
text = soup.get_text(separator=' ~ ', strip=True)

# Clean text: remove special characters, extra spaces, etc.
cleaned_text = re.sub(r'\s+', ' ', text)  # Replace multiple spaces/newlines with a single space
cleaned_text = cleaned_text.replace("&", "and")
cleaned_text = re.sub(r'[^A-Za-z0-9\s~]', '', cleaned_text)  # Remove non-alphanumeric characters (the ~ markers are kept)
cleaned_text = re.sub(r'\s+', ' ', cleaned_text).strip()  # Collapse the gaps left by removed characters

# Split on the ~ markers. The previous split("  ") could never match because all
# whitespace runs have already been collapsed to single spaces, which left `data`
# as a one-element list. Empty fragments (text nodes that were pure punctuation)
# are dropped.
data = [fragment for fragment in re.split(r'\s*~\s*', cleaned_text) if fragment]


https://www.knoll.com/design-plan/products/by-designer/knoll
200


In [3]:
# Example link
# link = new_urls_for_training[19]
# print(link)

def get_data(link, verbose=True, timeout=30):
    """Download a page and return its visible text as a list of cleaned fragments.

    Parameters
    ----------
    link : str
        URL fetched with ``requests.get``.
    verbose : bool, default True
        When true, print the HTTP status code of the response.
    timeout : float or tuple or None, default 30
        Forwarded to ``requests.get`` so that an unresponsive server cannot block
        the notebook forever. Pass ``None`` to wait indefinitely (the previous
        behaviour).

    Returns
    -------
    list[str]
        One entry per distinct text found in ``<p>``, ``<h1>``-``<h6>`` and ``<a>``
        elements, in document order. Each entry has "&" spelled out as "and",
        every character outside ``[A-Za-z0-9]`` and whitespace removed, and
        whitespace collapsed. Entries that end up empty are omitted.

    Raises
    ------
    requests.exceptions.RequestException
        Propagated unchanged from ``requests.get`` (connection errors, timeouts).
    """
    response = rs.get(link, timeout=timeout)
    if verbose:
        print(response.status_code)

    soup = BeautifulSoup(response.content, 'html.parser')

    # Drop elements whose text is never meant to be read as page content.
    for tag in soup(["script", "style", "iframe", "sup"]):
        tag.decompose()

    # NOTE(review): 'alt' is an attribute name rather than an element name, so it
    # presumably never matches anything here; kept unchanged to avoid altering
    # what gets extracted.
    block_elements = soup.find_all(['p', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'alt', 'a'])

    # dict.fromkeys removes repeated texts while keeping first-seen order.
    extracted_text = list(dict.fromkeys(
        element.get_text(separator=' ', strip=True) for element in block_elements
    ))

    # Join with a ~ marker so the cleaning regexes can run once over the whole page.
    combined_text = ' ~ '.join(extracted_text)

    cleaned_text = re.sub(r'\s+', ' ', combined_text)
    cleaned_text = cleaned_text.replace("&", "and")
    cleaned_text = re.sub(r'[^A-Za-z0-9\s~]', '', cleaned_text)  # keeps the ~ markers
    cleaned_text = re.sub(r'\s+', ' ', cleaned_text).strip()

    # Split on the markers with any surrounding whitespace. An element whose text was
    # empty or pure punctuation leaves two adjacent markers ("a ~ ~ b"); splitting on
    # the literal " ~ " turned that into a stray "~ b" fragment, whereas this split
    # yields an empty string that is simply dropped.
    return [fragment for fragment in re.split(r'\s*~\s*', cleaned_text) if fragment]

# data = get_data(link)

### Test funcs to format data

In [10]:
# Compile regex patterns for each keyword
# One case-insensitive pattern per furniture term, anchored with \b on both sides;
# re.escape keeps any regex metacharacters inside a term literal.
patterns = [re.compile(r'\b' + re.escape(keyword) + r'\b', re.IGNORECASE) for keyword in furniture_terms]

# Function to filter out irrelevant content using regex
def filter_text(text):
    """Return True as soon as any compiled furniture pattern is found in ``text``."""
    for furniture_pattern in patterns:
        if furniture_pattern.search(text):
            return True
    return False
    
# Function to remove duplicates and irrelevant content
def clean_data(data):
    """Return the distinct entries of ``data`` that mention a furniture term.

    Entries are kept in first-seen order. De-duplicating through ``set`` made the
    output order change from run to run (string hashing is randomised per process),
    so ``dict.fromkeys`` is used instead.
    """
    unique_data = dict.fromkeys(data)
    # Keep only the texts accepted by filter_text.
    return [text for text in unique_data if filter_text(text)]

# Apply the cleaning function
# NOTE(review): `data` is whatever an earlier cell last bound to that name (it is used
# both for the URL DataFrame and for scraped fragments), so the result depends on run order.
cleaned_data = clean_data(data)

# Display the cleaned data
print("Cleaned Data:")
for item in cleaned_data:
    print(item)

Cleaned Data:


In [55]:
from collections import OrderedDict

# List of irrelevant terms to exclude (case-insensitive)
irrelevant_terms = [
    'sale', 'discount', 'free shipping', 'delivery', 'buy now', 'clearance', 
    'special offer', 'limited time', 'bestseller', 'featured', 'new arrivals'
]

# Compile regex patterns for each keyword
# Both lists hold one whole-word (\b...\b), case-insensitive pattern per term.
patterns = [re.compile(r'\b' + re.escape(keyword) + r'\b', re.IGNORECASE) for keyword in furniture_terms]
irrelevant_patterns = [re.compile(r'\b' + re.escape(term) + r'\b', re.IGNORECASE) for term in irrelevant_terms]

# Function to filter out irrelevant content and check for complete product names
def is_relevant(text):
    """Decide whether ``text`` looks like a product name.

    Rejected when it contains any irrelevant (promotional) term or no furniture
    term at all; otherwise accepted only if it has at least two whitespace-separated
    words.
    """
    for blocked in irrelevant_patterns:
        if blocked.search(text):
            return False

    mentions_furniture = False
    for wanted in patterns:
        if wanted.search(text):
            mentions_furniture = True
            break
    if not mentions_furniture:
        return False

    word_count = len(text.split())
    return word_count >= 2

# Function to remove duplicates and irrelevant content
def clean_data(data):
    """Return the distinct, relevant entries of ``data`` in first-seen order."""
    kept = []
    # OrderedDict.fromkeys drops repeats without disturbing the original order.
    for text in OrderedDict.fromkeys(data):
        if is_relevant(text):
            kept.append(text)
    return kept

# Apply the cleaning function
# NOTE(review): relies on `data` holding scraped text fragments from an earlier cell.
cleaned_data = clean_data(data)

# Display the cleaned data
print("Cleaned Data:")
for item in cleaned_data:
    print(item)

Cleaned Data:
BED FRAMES FROM 96
Bed Frames
LED Bed Frames
Queen Bed Frames
Double Bed Frames
King Bed Frames
King Single Bed Frames
Single Bed Frames
Chair Mats
Sofa Covers
Kids Table  Chair Sets
Kids Bed Safety Rails
Bed Headboards
Table Top Christmas Trees
Black Friday Bed Frames
Bed Frame Deals
Sofa Beds
Bed Base
Boucl Bed Frames
Boucl Dining Chair
Black Bed Frame
White Bed Frame
Bed Frames by Style
4 Poster Bed Frame
Fabric Bed Frames
Upholstered Bed Frames
Metal Bed Frames
Timber Wooden Bed Frame
Leather Bed Frames
Velvet Bed Frame
House Bed Frames
Coastal Bed Frames
Low Bed Frames
Scandi Bed Frames
Wishbone Dining Chair
Dining Chair
Bed Frames By Size
Bed Frames with Storage
Gas Lift Bed Frames
Bed Frames with Drawers
Bed Frames By Style
Wooden Bed Frames
4 Poster Bed Frames
Linen Bed Frames
Velvet Bed Frames
Bed Bases
Queen Bed Bases
Double Bed Bases
King Bed Bases
King Single Bed Bases
Single Bed Bases
Ensemble Bed Base
Wooden Bed Base
Bed Base with Storage
Kids Bed Frames
Ens

### Final func to format data

In [5]:
# List of irrelevant terms (written in lowercase; matching is made case-insensitive
# by re.IGNORECASE below, no conversion happens here)
irrelevant_terms = [
    'sale', 'discount', 'free shipping', 'delivery', 'buy now', 'clearance', 
    'special offer', 'limited time', 'bestseller', 'featured', 'new arrivals'
]

# Compile regex pattern for irrelevant terms
# A single alternation of all escaped terms, matched as whole words.
# NOTE(review): the only use of this pattern in this cell is commented out inside is_relevant.
irrelevant_pattern = re.compile(r'\b(' + '|'.join(map(re.escape, irrelevant_terms)) + r')\b', re.IGNORECASE)

# Create singular terms by excluding any plural word from the list
# singular_furniture_terms = [term for term in furniture_terms if not re.match(r'.*[^s]s$', term.lower())]

# Function to detect valid plural forms (avoid words like 'mattress')
def is_plural_word(word):
    """Return True when ``word`` ends in 's' and the word minus its last character is a furniture term.

    The comparison is done in lowercase. Words such as 'mattress' are not treated as
    plural unless 'mattres' happens to be listed in ``furniture_terms``.
    """
    if not word.lower().endswith('s'):
        return False
    singular_candidate = word[:-1].lower()
    return singular_candidate in furniture_terms

# Function to check if the entire text contains any plural form
def contains_plural(text):
    """Return True if any whitespace-separated word of ``text`` passes ``is_plural_word``."""
    for candidate in text.split():
        if is_plural_word(candidate):
            return True
    return False

# Lazily-built cache for the combined furniture-term regex (see _furniture_term_pattern).
_FURNITURE_TERM_PATTERN = None


def _furniture_term_pattern():
    """Return one compiled, case-insensitive whole-word pattern covering every furniture term."""
    global _FURNITURE_TERM_PATTERN
    if _FURNITURE_TERM_PATTERN is None:
        # Built on first use and then reused: the alternation spans the whole term list,
        # so re-joining it for every candidate string is wasted work.
        _FURNITURE_TERM_PATTERN = re.compile(
            r'\b(' + '|'.join(map(re.escape, furniture_terms)) + r')\b', re.IGNORECASE
        )
    return _FURNITURE_TERM_PATTERN


# Function to filter out irrelevant content and check for complete product names
def is_relevant(text):
    """Return True when ``text`` looks like a singular, multi-word furniture product name.

    A text is accepted when all of the following hold:
    * no word is a plural of a furniture term (``contains_plural``),
    * it has at least two whitespace-separated words,
    * it contains a furniture term as a whole word (case-insensitive).

    Always returns a real ``bool`` (previously ``None`` could be returned when no
    term matched); truthiness for every input is unchanged.
    """
    # Exclude irrelevant content
    # if irrelevant_pattern.search(text):
    #     return False
    # Exclude text that contains any plural forms
    if contains_plural(text):
        return False
    # Include only if singular furniture terms match and there are at least 2 words
    if len(text.split()) < 2:
        return False
    return _furniture_term_pattern().search(text) is not None

# Function to remove duplicates and irrelevant content
def clean_data(data):
    """Sequential filter: de-duplicate ``data`` (order preserved) and keep what ``is_relevant`` accepts."""
    first_seen = OrderedDict()
    for text in data:
        # setdefault leaves the position of an already-seen text untouched.
        first_seen.setdefault(text, None)
    return [candidate for candidate in first_seen if is_relevant(candidate)]

# Parallel version 
def clean_data_parallel(data, max_workers=None, chunksize=64):
    """De-duplicate ``data`` and keep the entries ``is_relevant`` accepts, checking in worker processes.

    Parameters
    ----------
    data : iterable of str
        Candidate text fragments.
    max_workers : int or None, default None
        Forwarded to ``ProcessPoolExecutor``; None lets it pick its own default.
    chunksize : int, default 64
        Number of strings sent to a worker per task. The executor default of 1
        pays one inter-process round trip per string, which dominates a check
        this cheap.

    Returns
    -------
    list[str]
        Accepted entries in first-seen order.
    """
    # Remove duplicates first
    unique_data = list(OrderedDict.fromkeys(data))

    # Nothing to check: avoid spinning up worker processes at all.
    if not unique_data:
        return []

    # Apply parallel processing for relevance check; map() preserves input order.
    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        results = list(executor.map(is_relevant, unique_data, chunksize=chunksize))

    # Return filtered data
    return [text for text, keep in zip(unique_data, results) if keep]

# # Apply the cleaning function
# cleaned_data = clean_data_parallel(data)  # For large datasets; otherwise, use clean_data(data)

# # Display the cleaned data
# print("Cleaned Data:")
# for item in cleaned_data:
#     print(item)

In [181]:
# Run scraping + filtering on the first reachable training URL and print each kept
# fragment as a quoted, comma-terminated line.
link = new_urls_for_training[0]
print(link)
data = get_data(link)
cleaned_data = clean_data_parallel(data)
# Display the cleaned data
print("Cleaned Data:")
for item in cleaned_data:
        print(f'"{item}",')
    # print(f'{item},')

https://www.factorybuys.com.au/products/euro-top-mattress-king
200
Cleaned Data:
"BED FRAMES FROM 96",
"Bed Frames",
"LED Bed Frames",
"Queen Bed Frames",
"Double Mattress",
"King Mattress",
"King Single Mattress",
"Single Mattress",
"Super King Mattress",
"Double Bed Frames",
"Queen Mattress",
"King Bed Frames",
"King Single Bed Frames",
"Single Bed Frames",
"Hammock Range",
"Mattress Protector",
"Mattress Toppers",
"Chair Mats",
"Curtain Range",
"Sofa Covers",
"Kids Table and Chair Sets",
"Kids Bed Safety Rails",
"Christmas Light Displays",
"Weights Bench",
"Round Mirror",
"Mattress Protectors",
"Pillow Cases",
"Garage Shelving",
"Black Friday Bed Frames",
"Black Friday Mattress Sale",
"Black Friday Bed Frame Sale",
"Mattress Deals",
"Bed Frame Deals",
"Sofa Sale",
"Boucl Bed Frames",
"Dining Chair",
"Boucl Dining Chair",
"Medium Firm Mattress",
"Plush Mattress",
"Soft Mattress",
"Extra Firm Mattress",
"Firm Mattress",
"Memory Foam Mattress",
"Euro Top Mattress",
"Shop by Mattress Sl

In [176]:
import json

# Persist the label list as JSON.
# NOTE(review): `labels` is not assigned in any cell visible in this notebook — it must
# come from kernel state; confirm where it is defined before relying on a fresh run.
with open('labels.json', 'w') as file:
    json.dump(labels, file)

In [189]:
# Scrape every reachable training URL and pool the kept fragments into one flat list.
# Each iteration also rebinds the notebook-level names `data` and `cleaned_data`.
prepared_text_data = list()
for url in new_urls_for_training:
    data = get_data(url, verbose=False)
    cleaned_data = clean_data_parallel(data)
    prepared_text_data  += cleaned_data

Some characters could not be decoded, and were replaced with REPLACEMENT CHARACTER.


In [191]:
# Save the pooled fragments, one per line.
with open('prepared_text_data.txt', 'w') as file:
    file.writelines(string + '\n' for string in prepared_text_data)

In [194]:
import matplotlib.pyplot as plt

### CreateDataset class

In [3]:
class CreateDataset:
    """Hold sentences with per-token BIO tags and derive train/test datasets from them.

    ``data`` is an indexable sequence of sentence strings. ``labels`` (optional) is a
    parallel sequence with one list of tag strings per sentence. The tags produced and
    counted by this class are 'B-PROD' (first token of a product mention), 'I-PROD'
    (continuation token) and 'O' (everything else).
    """

    def __init__(self, data, labels = None):
        self.data = data
        self.labels = labels
        # Set by split_data_balanced() / split_data_using_sklearn().
        self.train_tokens = None
        self.test_tokens = None
        self.train_labels = None
        self.test_labels = None
        # Set by create_datasets_for_trainig().
        self.train_dataset = None
        self.test_dataset = None

    def __len__(self):
        """Number of sentences in ``data``."""
        return len(self.data)

    def train_len(self):
        """Number of rows in the flattened training dataset (tokens plus separators)."""
        return len(self.train_dataset)

    def test_len(self):
        """Number of rows in the flattened test dataset (tokens plus separators)."""
        return len(self.test_dataset)

    def __getitem__(self, index):
        """Return the sentence at ``index``."""
        return self.data[index]

    def get_nr_of_products(self):
        """Total number of 'B-PROD' tags over all sentences (one per labelled mention)."""
        return sum(token.count('B-PROD') for token in self.labels)

    def get_nr_of_non_products(self):
        """Number of sentences whose tag list contains no 'B-PROD' at all."""
        return sum('B-PROD' not in token for token in self.labels)

    def sentences_length(self):
        """Whitespace-token count of every sentence, in order."""
        return [len(sentence.split()) for sentence in self.data]

    def sentence_distribution_histogram(self):
        """Plot a histogram of sentence lengths annotated with their mean and standard deviation."""
        sentence_lengths = self.sentences_length()
        # One unit-wide bin per length. The edges have to run up to max + 1: with edges
        # ending at max, the last bin is closed on both sides and swallowed the counts of
        # both max - 1 and max (and a single distinct length produced no bin at all).
        bin_edges = range(min(sentence_lengths), max(sentence_lengths) + 2, 1)
        plt.hist(sentence_lengths, bins=bin_edges, edgecolor='black')
        plt.xlabel('Sentence Length')
        plt.ylabel('Frequency')
        plt.title('Histogram of Sentence Lengths')

        mean_length = np.mean(sentence_lengths)
        std_length = np.std(sentence_lengths)
        annotation_text = f"Mean: {mean_length:.2f}\nStd: {std_length:.2f}"
        plt.annotate(annotation_text, xy=(0.95, 0.95), xycoords='axes fraction', ha='right', va='top')

        plt.show()

    def classes_balance_plot(self):
        """Bar chart comparing get_nr_of_products() with get_nr_of_non_products()."""
        categories = ['Products', 'Non-Products']
        counts = [self.get_nr_of_products(), self.get_nr_of_non_products()]

        plt.bar(categories, counts, color=['blue', 'green'])
        plt.xlabel('Categories')
        plt.ylabel('Number of Samples')
        plt.title('Number of Products vs. Number of Non-Products')
        plt.show()

    def label_data_using_lavenshtein(self, target_group, similarity_threshold=0.8):
        """Tag product mentions by fuzzy-matching every product name against every sentence.

        For each product in ``target_group`` a window as wide as the product (in tokens)
        slides over each lower-cased sentence. When ``Levenshtein.ratio`` between the
        window and the product reaches ``similarity_threshold``, the first window token
        is tagged 'B-PROD' and each following window token that also occurs in the
        product is tagged 'I-PROD'.

        Overwrites ``self.labels`` and returns ``(self.data, self.labels)``.
        """
        # Tokenise the products once instead of once per sentence. Products without any
        # token are skipped: a zero-width window matches everywhere and used to index
        # one position past the end of the tag list.
        products = []
        for prod in target_group:
            prod_tokens = prod.lower().split()
            if prod_tokens:
                products.append((prod_tokens, ' '.join(prod_tokens)))

        self.labels = [['O'] * len(sample.split()) for sample in self.data]
        for nsample, sample in enumerate(self.data):
            sample_tokens = sample.lower().split()
            tags = self.labels[nsample]
            for prod_tokens, prod_str in products:
                width = len(prod_tokens)
                # Every start position is examined. The former `element_idx += len(prod)`
                # at the end of the loop body had no effect inside a `for` loop and has
                # been removed; matching behaviour is unchanged.
                for start in range(len(sample_tokens) - width + 1):
                    window = ' '.join(sample_tokens[start:start + width])
                    if Levenshtein.ratio(window, prod_str) >= similarity_threshold:
                        tags[start] = 'B-PROD'
                        for offset in range(1, width):
                            if sample_tokens[start + offset] in prod_tokens:
                                tags[start + offset] = 'I-PROD'
        return self.data, self.labels

    def nr_of_labeled_entities(self, entitie = 'B-PROD'):
        """Count occurrences of the tag ``entitie`` over all sentences."""
        return sum(row.count(entitie) for row in self.labels)

    def get_indices_by_label(self, target_label = 'B-PROD'):
        """Integer array with the indices of sentences whose tags include ``target_label``."""
        return np.asarray([idx for idx, label in enumerate(self.labels) if target_label in label]).astype(int)

    def shuffle_data(self, l1, l2, rng = None):
        """Shuffle two parallel lists with the same permutation and return them as new lists.

        ``rng`` may be a ``random.Random`` instance for reproducible shuffles; by default
        the module-level ``random`` generator is used. Empty input yields ``([], [])``
        (unpacking ``zip(*[])`` used to raise ValueError).
        """
        combined_lists = list(zip(l1, l2))
        (rng if rng is not None else random).shuffle(combined_lists)
        if not combined_lists:
            return [], []

        shuffled_l1, shuffled_l2 = zip(*combined_lists)
        return list(shuffled_l1), list(shuffled_l2)

    def data_to_dataset(self, text, tags):
        """Flatten tokenised sentences into (token, tag) rows separated by ('\\n', '') rows.

        ``text`` and ``tags`` are parallel lists of per-sentence token/tag lists. No
        separator follows the last sentence.
        """
        dataset_final = []
        for elem in range(len(text)):
            dataset_final.extend(list(zip(text[elem], tags[elem])))
            dataset_final.append(('\n',''))
        return dataset_final[:-1]

    def split_data_balanced(self, split_percent = 0.7, seed = None):
        """Split into train/test so both keep the same share of product / non-product sentences.

        A sentence counts as "product" when its tags contain 'B-PROD'. Each group is
        shuffled, its first ``round(split_percent * size)`` sentences go to training and
        the rest to testing, and the merged sets are shuffled once more. Results are
        stored in ``train_tokens``/``train_labels`` and ``test_tokens``/``test_labels``.

        ``seed`` (optional) makes the shuffles reproducible; with the default None the
        global ``random`` state is used as before.
        """
        rng = random.Random(seed) if seed is not None else None
        data = list(self.data)
        product_positions = set(self.get_indices_by_label().tolist())

        product_data, product_labels = [], []
        non_product_data, non_product_labels = [], []
        for idx in range(len(data)):
            if idx in product_positions:
                product_data.append(data[idx])
                product_labels.append(self.labels[idx])
            else:
                non_product_data.append(data[idx])
                non_product_labels.append(self.labels[idx])

        train_product_nr = round(split_percent * len(product_data))
        train_non_product_nr = round(split_percent * len(non_product_data))

        product_data, product_labels = self.shuffle_data(product_data, product_labels, rng)
        non_product_data, non_product_labels = self.shuffle_data(non_product_data, non_product_labels, rng)

        train_data = product_data[:train_product_nr] + non_product_data[:train_non_product_nr]
        test_data = product_data[train_product_nr:] + non_product_data[train_non_product_nr:]
        train_labels = product_labels[:train_product_nr] + non_product_labels[:train_non_product_nr]
        test_labels = product_labels[train_product_nr:] + non_product_labels[train_non_product_nr:]

        self.train_tokens, self.train_labels = self.shuffle_data(train_data, train_labels, rng)
        self.test_tokens, self.test_labels = self.shuffle_data(test_data, test_labels, rng)

    def split_data_using_sklearn(self, test_size=0.3, random_state=None):
        """Random train/test split of ``data`` and ``labels`` via ``train_test_split``.

        NOTE(review): ``train_test_split`` is not imported by any visible cell of this
        notebook; presumably ``from sklearn.model_selection import train_test_split``
        is expected to have been run — confirm, otherwise this raises NameError.
        """
        self.train_tokens, self.test_tokens, self.train_labels, self.test_labels = train_test_split(self.data, self.labels, test_size=test_size, random_state=random_state)

    def create_datasets_for_trainig(self):
        """Tokenise the split sentences on whitespace and build the flattened train/test datasets."""
        train_tokens = [item.split() for item in self.train_tokens]
        test_tokens = [item.split() for item in self.test_tokens]
        self.train_dataset = self.data_to_dataset(train_tokens, self.train_labels)
        self.test_dataset = self.data_to_dataset(test_tokens, self.test_labels)

### Making Dataset

In [193]:
# Wrap the scraped fragments and tag product mentions by fuzzy-matching against `labels`.
# NOTE(review): `labels` is not assigned in any visible cell — it must come from kernel
# state (the same name is dumped to labels.json above); confirm its source.
dataset = CreateDataset(prepared_text_data)
print(dataset.label_data_using_lavenshtein(labels))

(['BED FRAMES FROM 96', 'Bed Frames', 'LED Bed Frames', 'Queen Bed Frames', 'Double Mattress', 'King Mattress', 'King Single Mattress', 'Single Mattress', 'Super King Mattress', 'Double Bed Frames', 'Queen Mattress', 'King Bed Frames', 'King Single Bed Frames', 'Single Bed Frames', 'Hammock Range', 'Mattress Protector', 'Mattress Toppers', 'Chair Mats', 'Curtain Range', 'Sofa Covers', 'Kids Table and Chair Sets', 'Kids Bed Safety Rails', 'Christmas Light Displays', 'Weights Bench', 'Round Mirror', 'Mattress Protectors', 'Pillow Cases', 'Garage Shelving', 'Black Friday Bed Frames', 'Black Friday Mattress Sale', 'Black Friday Bed Frame Sale', 'Mattress Deals', 'Bed Frame Deals', 'Sofa Sale', 'Boucl Bed Frames', 'Dining Chair', 'Boucl Dining Chair', 'Medium Firm Mattress', 'Plush Mattress', 'Soft Mattress', 'Extra Firm Mattress', 'Firm Mattress', 'Memory Foam Mattress', 'Euro Top Mattress', 'Shop by Mattress Sleep Style', 'Mattress for Side Sleepers', 'Mattress for Back Sleepers', 'Mattre

In [202]:
# Split with the default 70 % training share, then flatten both parts into (token, tag) rows.
dataset.split_data_balanced()
dataset.create_datasets_for_trainig()

In [204]:
def write_to_text(data, filename):
    """Write ``data`` to ``filename`` (UTF-8), one row per line with cells joined by tabs.

    Every cell is converted with ``str`` before joining; each row is terminated
    by a newline.
    """
    rendered_rows = ("\t".join(str(cell) for cell in row) + "\n" for row in data)
    with open(filename, "w", encoding="utf-8") as text_file:
        text_file.writelines(rendered_rows)
            
# Write both flattened datasets into the current working directory.
# NOTE(review): the loading cells further down read 'train_data/train_dataset.txt' and
# 'test_data/test_dataset.txt', so the files are presumably moved by hand — confirm.
write_to_text(dataset.train_dataset, "train_dataset.txt")
write_to_text(dataset.test_dataset, "test_dataset.txt")

### NER using Spark NLP

#### Data prep for training

In [2]:
def load_data(file_path):
    """Read a ``token<TAB>label`` file into a DataFrame with one row per sentence.

    Lines that are empty after stripping separate sentences. Every other line must
    consist of exactly one token and one label separated by a tab.

    Parameters
    ----------
    file_path : str
        Path of the UTF-8 text file (the encoding ``write_to_text`` uses).

    Returns
    -------
    pandas.DataFrame
        Columns ``text`` (tokens joined by spaces) and ``labels`` (labels joined by
        spaces), in file order.

    Raises
    ------
    ValueError
        If a non-blank line does not split into exactly two tab-separated fields.
    """
    sentences = []
    tokens = []
    tags = []

    def _flush():
        # Close the sentence collected so far, if any.
        if tokens and tags:
            sentences.append((' '.join(tokens), ' '.join(tags)))
            tokens.clear()
            tags.clear()

    with open(file_path, 'r', encoding='utf-8') as f:
        for line in f:
            stripped = line.strip()
            if stripped == '':
                _flush()
            else:
                token, label = stripped.split('\t')
                tokens.append(token)
                tags.append(label)

    # The last sentence is not followed by a blank line (data_to_dataset drops the
    # trailing separator), so it has to be flushed here or it would be lost.
    _flush()

    return pd.DataFrame(sentences, columns=['text', 'labels'])

# Load training and testing data
# NOTE(review): these paths live in 'train_data/' and 'test_data/', whereas the writing
# cell saves 'train_dataset.txt' / 'test_dataset.txt' without a directory — confirm how
# the files get here.
train_df = load_data('train_data/train_dataset.txt')
test_df = load_data('test_data/test_dataset.txt')

In [30]:
# Parse the token<TAB>label file into one row per token, tagged with a running sentence id.
with open('test_data/test_dataset.txt', 'r', encoding='utf-8') as f:
    data = []
    sent_id = 1
    # Set by blank line(s), consumed by the next token line. Any run of blank lines
    # counts as ONE sentence boundary; the earlier flag toggle only worked when every
    # boundary consisted of exactly two blank lines.
    boundary_pending = False
    for line in f:
        stripped = line.strip()
        if stripped == '':
            # Blank lines before the first token do not open a new sentence.
            boundary_pending = bool(data)
            continue
        if boundary_pending:
            sent_id += 1
            boundary_pending = False
        token, label = stripped.split('\t')
        data.append([sent_id, token, label])

test_df = pd.DataFrame(data, columns=['sent_id', 'tokens', 'labels'])

In [31]:
# Give every token the same placeholder part-of-speech tag 'NN'; the CoNLL rows written
# in the next cell need POS columns. This mutates test_df in place.
test_df['pos']='NN'

In [32]:
# Assemble the CoNLL text: the document header, then one "token pos pos label" row per
# token, with a blank line in front of every new sentence.
conll_parts = ["-DOCSTART- -X- -X- -O-\n\n"]
# Sentence id of the previous row. Ids start at 1, so 0 guarantees that the first
# sentence is also preceded by a blank line (same output as before).
save = 0

for sent, token, pos, label in zip(test_df['sent_id'], test_df['tokens'], test_df['pos'], test_df['labels']):
    # If the sentence ID has changed, that means we are starting a new sentence. We have to add an empty line.
    if save != sent:
        conll_parts.append('\n')

    # Save the conll line
    conll_parts.append("{} {} {} {}\n".format(token, pos, pos, label))

    save = sent

# Joining once avoids rebuilding an ever-growing string on every row.
conll_lines = ''.join(conll_parts)

# Now write all of the lines to a text file in a single call (iterating over the string
# wrote it one character at a time). The `with` block closes the file, so no explicit
# close() is needed.
with open('test_data/test_dataset_final', 'w') as txtfile:
    txtfile.write(conll_lines)

In [33]:
# Show the first 25 lines of the generated CoNLL file as a sanity check.
with open('test_data/test_dataset_final','r') as f:
    lines = f.readlines()[:25]
lines

['-DOCSTART- -X- -X- -O-\n',
 '\n',
 '\n',
 'A NN NN O\n',
 'slanted NN NN O\n',
 'headboard NN NN O\n',
 'makes NN NN O\n',
 'early NN NN O\n',
 'morning NN NN O\n',
 'reading NN NN O\n',
 'or NN NN O\n',
 'late NN NN O\n',
 'night NN NN O\n',
 'movie NN NN O\n',
 'watching NN NN O\n',
 'all NN NN O\n',
 'the NN NN O\n',
 'more NN NN O\n',
 'comfortable NN NN O\n',
 'in NN NN O\n',
 'our NN NN O\n',
 'Windsor NN NN B-PROD\n',
 'bed NN NN I-PROD\n',
 '\n',
 'Blues NN NN B-PROD\n']

#### Training

In [3]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from sparknlp.annotator import *
from sparknlp.common import *
from sparknlp.base import *
import sparknlp

# Create the Spark session through sparknlp.start() and display it.
spark = sparknlp.start()

spark

In [5]:
from sparknlp.training import CoNLL

# Read the CoNLL-formatted training file into a Spark DataFrame.
# NOTE(review): the visible cells only write 'test_data/test_dataset_final'; the training
# file is presumably produced by re-running the same conversion on the train split — confirm.
training_data = CoNLL().readDataset(spark, './train_data/train_dataset_final')

# Observe the first 3 rows of the Dataframe
training_data.show(3)

+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|                text|            document|            sentence|               token|                 pos|               label|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|Polygon Pedestal ...|[{document, 0, 33...|[{document, 0, 33...|[{token, 0, 6, Po...|[{pos, 0, 6, NN, ...|[{named_entity, 0...|
|Dallas Modern Cof...|[{document, 0, 25...|[{document, 0, 25...|[{token, 0, 5, Da...|[{pos, 0, 5, NN, ...|[{named_entity, 0...|
|Chloe Bed Frame M...|[{document, 0, 58...|[{document, 0, 58...|[{token, 0, 4, Ch...|[{pos, 0, 4, NN, ...|[{named_entity, 0...|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
only showing top 3 rows



In [6]:
import pyspark.sql.functions as F

# Label distribution: pair each token with its label, explode to one row per token,
# then count rows per label and show the most frequent first.
training_data.select(F.explode(F.arrays_zip(training_data.token.result,
                                            training_data.label.result)).alias("cols")) \
             .select(F.expr("cols['0']").alias("token"),
                     F.expr("cols['1']").alias("ground_truth")).groupBy('ground_truth').count().orderBy('count', ascending=False).show(100,truncate=False)

+------------+-----+
|ground_truth|count|
+------------+-----+
|O           |3021 |
|I-PROD      |928  |
|B-PROD      |330  |
+------------+-----+



In [4]:
# Pretrained English 'bert_base_cased' embeddings: reads the 'sentence' and 'token'
# columns and writes its annotations to 'bert'.
bert = BertEmbeddings.pretrained('bert_base_cased', 'en') \
    .setInputCols(["sentence", "token"]) \
    .setOutputCol("bert")

bert_base_cased download started this may take some time.
Approximate size to download 384.9 MB
[OK!]


In [5]:
# Read the CoNLL test file and attach BERT embeddings to it, so it can be saved and
# handed to the NER trainer as its evaluation set.
test_data = CoNLL().readDataset(spark, 'test_data/test_dataset_final')

test_data = bert.transform(test_data)

test_data.show(3)

+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|                text|            document|            sentence|               token|                 pos|               label|                bert|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|A slanted headboa...|[{document, 0, 11...|[{document, 0, 11...|[{token, 0, 0, A,...|[{pos, 0, 0, NN, ...|[{named_entity, 0...|[{word_embeddings...|
| Blues Counter Stool|[{document, 0, 18...|[{document, 0, 18...|[{token, 0, 4, Bl...|[{pos, 0, 4, NN, ...|[{named_entity, 0...|[{word_embeddings...|
|Donatello Side Table|[{document, 0, 19...|[{document, 0, 19...|[{token, 0, 8, Do...|[{pos, 0, 8, NN, ...|[{named_entity, 0...|[{word_embeddings...|
+--------------------+--------------------+--------------------+--------------------+--------------------+

In [11]:
# Overwrite on re-run: the default save mode raises once the path exists, which breaks
# "Restart & Run All". This also matches the overwrite() used when saving the models.
test_data.write.mode("overwrite").parquet("test_withEmbeds.parquet")

In [12]:
# Inspect tokens, their embedding vectors and the gold labels side by side.
test_data.select("bert.result","bert.embeddings",'label.result').show()

+--------------------+--------------------+--------------------+
|              result|          embeddings|              result|
+--------------------+--------------------+--------------------+
|[A, slanted, head...|[[-0.028735025, -...|[O, O, O, O, O, O...|
|[Blues, Counter, ...|[[0.113111205, -0...|[B-PROD, I-PROD, ...|
|[Donatello, Side,...|[[-0.2197286, -0....|[B-PROD, I-PROD, ...|
|[Florence, Knoll,...|[[-0.5486287, -0....|[B-PROD, I-PROD, ...|
|[Ultimate, comfor...|[[-0.05448273, -0...|[O, O, O, O, O, O...|
|[Storage, and, sh...|[[-0.15164343, 0....|           [O, O, O]|
|        [Sofa, Sale]|[[0.3907959, -0.3...|              [O, O]|
|[CORTONA, SOFA, C...|[[0.13743053, -0....|           [O, O, O]|
|[Coastal, Bed, Fr...|[[-0.093407124, -...|           [O, O, O]|
|     [Coffee, Table]|[[-0.010024222, -...|              [O, O]|
|[NEW, ARRIVAL, SO...|[[-0.1236439, -0....|[O, O, B-PROD, I-...|
|[by, Pawleys, Isl...|[[0.31112942, -0....|           [O, O, O]|
|[Phantom, Lamp, E...|[[-

In [15]:
# NER Model with additional hyperparameters
# Trains on the 'label' column using the 'bert' embeddings and writes predictions to 'ner'.
# setTestDataset points at the parquet file written above, which already contains the
# BERT embeddings for the test sentences.
# NOTE(review): setLr / setPo are presumably the learning rate and its decay coefficient —
# confirm against the NerDLApproach documentation before tuning.
nerTagger = NerDLApproach()\
    .setInputCols(["sentence", "token", "bert"])\
    .setLabelColumn("label")\
    .setOutputCol("ner")\
    .setMaxEpochs(50)\
    .setLr(0.0005)\
    .setPo(0.01)\
    .setBatchSize(128)\
    .setRandomSeed(42)\
    .setVerbose(1)\
    .setEvaluationLogExtended(True)\
    .setEnableOutputLogs(True)\
    .setIncludeConfidence(True)\
    .setTestDataset("test_withEmbeds.parquet")  # Test dataset

# Create the pipeline
# Stage order matters: embeddings first, then the tagger that consumes them.
pipeline = Pipeline(stages=[
    bert,
    nerTagger
])

In [16]:
%%time

# Fit the embeddings + NER pipeline on the CoNLL training DataFrame.
ner_model = pipeline.fit(training_data)

CPU times: total: 31.2 ms
Wall time: 2min 31s


In [17]:
# Save only the fitted NER stage (index 1; index 0 is the BERT embeddings stage),
# replacing any previous copy at './NER_bert'.
ner_model.stages[1].write().overwrite().save('./NER_bert')

In [20]:
# Same steps as the earlier test-data cell: re-read the CoNLL test file and re-attach
# BERT embeddings, rebinding `test_data`.
test_data = CoNLL().readDataset(spark, 'test_data/test_dataset_final')

test_data = bert.transform(test_data)

test_data.show(3)

+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|                text|            document|            sentence|               token|                 pos|               label|                bert|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|A slanted headboa...|[{document, 0, 11...|[{document, 0, 11...|[{token, 0, 0, A,...|[{pos, 0, 0, NN, ...|[{named_entity, 0...|[{word_embeddings...|
| Blues Counter Stool|[{document, 0, 18...|[{document, 0, 18...|[{token, 0, 4, Bl...|[{pos, 0, 4, NN, ...|[{named_entity, 0...|[{word_embeddings...|
|Donatello Side Table|[{document, 0, 19...|[{document, 0, 19...|[{token, 0, 8, Do...|[{pos, 0, 8, NN, ...|[{named_entity, 0...|[{word_embeddings...|
+--------------------+--------------------+--------------------+--------------------+--------------------+

In [29]:
# Compare tokens, gold labels and predicted tags per sentence.
# NOTE(review): `predictions` is not assigned in any visible cell; presumably it is the
# trained NER stage applied to `test_data` — confirm and restore the missing cell.
predictions.select('token.result','label.result','ner.result').show(truncate=40)

+----------------------------------------+----------------------------------------+----------------------------------------+
|                                  result|                                  result|                                  result|
+----------------------------------------+----------------------------------------+----------------------------------------+
|[A, slanted, headboard, makes, early,...|[O, O, O, O, O, O, O, O, O, O, O, O, ...|[O, O, O, O, O, O, O, O, O, O, O, O, ...|
|                 [Blues, Counter, Stool]|                [B-PROD, I-PROD, I-PROD]|                [B-PROD, I-PROD, I-PROD]|
|                [Donatello, Side, Table]|                [B-PROD, I-PROD, I-PROD]|                [B-PROD, I-PROD, I-PROD]|
|[Florence, Knoll, Hairpin, Stacking, ...|[B-PROD, I-PROD, I-PROD, I-PROD, I-PROD]|[B-PROD, I-PROD, I-PROD, I-PROD, I-PROD]|
|[Ultimate, comfort, in, a, compact, s...|[O, O, O, O, O, O, O, O, O, O, O, O, ...|[O, O, O, O, O, O, O, O, O, O, O, O, ...|


### Prediction Pipeline

In [7]:
from pyspark.ml import Pipeline

# Inference pipeline for raw text: each stage reads the columns named in setInputCol(s)
# and writes the column named in setOutputCol.

# Raw 'text' column -> 'document' annotation.
document = DocumentAssembler()\
    .setInputCol("text")\
    .setOutputCol("document")

# 'document' -> 'sentence'.
sentence = SentenceDetector()\
    .setInputCols(['document'])\
    .setOutputCol('sentence')

# 'sentence' -> 'token'.
token = Tokenizer()\
    .setInputCols(['sentence'])\
    .setOutputCol('token')

# Same pretrained embeddings as used for training; this rebinds the name `bert`.
bert = BertEmbeddings.pretrained('bert_base_cased', 'en') \
    .setInputCols(["sentence", "token"]) \
    .setOutputCol("bert")

# The NER stage saved after training, loaded back from disk.
loaded_ner_model = NerDLModel.load("NER_bert")\
 .setInputCols(["sentence", "token", "bert"])\
 .setOutputCol("ner")

# Turns the per-token tags in 'ner' into 'ner_span' annotations.
converter = NerConverter()\
  .setInputCols(["document", "token", "ner"])\
  .setOutputCol("ner_span")

ner_prediction_pipeline = Pipeline(
    stages = [
        document,
        sentence,
        token,
        bert,
        loaded_ner_model,
        converter])

bert_base_cased download started this may take some time.
Approximate size to download 384.9 MB
[OK!]


In [8]:
# Fit on a one-row DataFrame holding an empty string to obtain a PipelineModel.
# NOTE(review): presumably nothing is learned here because every stage is rule-based or
# already trained — confirm.
empty_data = spark.createDataFrame([['']]).toDF("text")
prediction_model = ner_prediction_pipeline.fit(empty_data)

In [9]:
# Persist the complete inference pipeline, replacing any earlier copy at "pipeline_model".
prediction_model.write().overwrite().save("pipeline_model")

In [11]:
from pyspark.ml import PipelineModel

# Load the saved pipeline model
# (rebinds `prediction_model` to the copy read back from "pipeline_model")
prediction_model = PipelineModel.load("pipeline_model")

In [12]:
# Sample text data as a list of strings
text = ["Amsterdam Bench 150 x 35 cm BE-150-35-TA ( Chocolate Colour )", 
        "White Polar Bear Lounge Swivel Chair", "Coffee tables and glass mirrors on stock!!!"]

# Convert the list of strings to a list of tuples
# (one single-element tuple per row, so each string becomes one DataFrame row)
text_tuples = [(t,) for t in text]

# Create a Spark DataFrame from the list of tuples
# with its single column named "text", the input column of the DocumentAssembler stage
sample_data = spark.createDataFrame(text_tuples).toDF("text")

# Use the prediction model to make predictions
preds = prediction_model.transform(sample_data)

In [13]:
# Collect all results from the 'ner_span.result' column
# (collect() brings every row to the driver; fine for this three-row sample)
all_results = preds.select('ner_span.result').collect()

# Display the results
# Each row prints the list of span texts found for one input string.
for row in all_results:
    print(row['result'])

['Amsterdam Bench 150 x 35 cm']
['White', 'Polar Bear Lounge Swivel Chair']
[]


----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 57424)
Traceback (most recent call last):
  File "c:\Users\danik\anaconda3\Lib\socketserver.py", line 317, in _handle_request_noblock
    self.process_request(request, client_address)
  File "c:\Users\danik\anaconda3\Lib\socketserver.py", line 348, in process_request
    self.finish_request(request, client_address)
  File "c:\Users\danik\anaconda3\Lib\socketserver.py", line 361, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "c:\Users\danik\anaconda3\Lib\socketserver.py", line 755, in __init__
    self.handle()
  File "c:\Users\danik\anaconda3\Lib\site-packages\pyspark\accumulators.py", line 295, in handle
    poll(accum_updates)
  File "c:\Users\danik\anaconda3\Lib\site-packages\pyspark\accumulators.py", line 267, in poll
    if self.rfile in r and func():
                           ^^^^^^
  File "c:\Users\danik\anaconda3\Lib\site-packages\p

In [6]:
# NOTE(review): the output saved under this cell lists the same URLs as the training
# output near the top of the notebook, not the testing URLs shown earlier — it looks
# stale; re-run from a fresh kernel to confirm.
new_urls_for_testing

['https://www.factorybuys.com.au/products/euro-top-mattress-king',
 'https://dunlin.com.au/products/beadlight-cirrus',
 'https://themodern.net.au/products/hamar-plant-stand-ash',
 'https://furniturefetish.com.au/products/oslo-office-chair-white',
 'https://hemisphereliving.com.au/products/',
 'https://interiorsonline.com.au/products/interiors-online-gift-card',
 'https://livingedge.com.au/products/tables/dining',
 'https://vastinterior.com.au/products/samson-daybed-single-2',
 'https://www.hudsonfurniture.com.au/products/string-weave-timber-stool',
 'https://dhfonline.com/products/gift-card',
 'https://www.tandemarbor.com/products/kaiser-box-bed-blush-plush-velvet',
 'https://www.perchfurniture.com/products/hoyt-chair',
 'https://4-chairs.com/products/mason-chair',
 'https://www.theinside.com/products/x-bench-onyx-austin-stripe-by-old-world-weavers/PTM_XBench_OnyxAustinStripeByOldWorldWeavers',
 'https://pinchdesign.com/products/yves-desk',
 'https://www.do-shop.com/products/gift-card'