<a href="https://colab.research.google.com/github/Echo9k/WebScrapping/blob/main/WebScrapping_Crawler.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Set up

In [None]:
#@title Install libraries
#@markdown use only in colab [Dismissed temporarly]
!rm sample_data -r
# !pip install w3lib
# !pip install selenium
# !apt-get update # to update ubuntu to correctly run apt install
# !apt install chromium-chromedriver
# !cp /usr/lib/chromium-browser/chromedriver /usr/bin

In [None]:
#@title Set up
#@markdown Loading dependencies...
import os
import re
import scipy
import numpy as np
import pandas as pd
from tqdm import tqdm
from datetime import datetime

# HTML
import requests
from bs4 import BeautifulSoup as bs
from requests.exceptions import HTTPError
from IPython.core.display import display, HTML
from urllib.parse import unquote
# import mechanize

# Selenium for JS support
import sys
sys.path.insert(0,'/usr/lib/chromium-browser/chromedriver')
# from selenium import webdriver

In [None]:
#@title Headless
#@markdown As: headles_driver
# chrome_options = webdriver.ChromeOptions()
# chrome_options.add_argument('--headless')
# chrome_options.add_argument('--no-sandbox')
# chrome_options.add_argument('--disable-dev-shm-usage')

# #@markdown As: headles_driver
# headles_driver = webdriver.Chrome('chromedriver',chrome_options=chrome_options)

In [None]:
#@title PhantomJS
#@markdown As: phantom_driver
# from selenium import webdriver
# !wget https://bitbucket.org/ariya/phantomjs/downloads/phantomjs-2.1.1-linux-x86_64.tar.bz2
# !tar xvjf phantomjs-2.1.1-linux-x86_64.tar.bz2
# !cp phantomjs-2.1.1-linux-x86_64/bin/phantomjs /usr/local/bin
# !ls -al
# phantom_driver = webdriver.PhantomJS()

# Explore with tags
One option for obtaining a product's details from a URL is to look for the labels and attributes that usually accompany each value and read the text around them. Regular expressions (RegEx) speed this process up.

In [None]:
#@markdown Regex strings defined under: *variable*_regex
# Each pattern is a non-capturing alternation of label spellings, a negative
# lookahead that rejects a following "&", and (for most patterns) one capturing
# group holding the character(s) right after the label.
brand_regex = r"(?:brand|brandname|vendor|manufacturer|product-brand)(?![&])(.)"
crumb_regex = r"(?:category|categories|category path|breadcrumbs|breadcrum|crumb|navbar|Product Category)(?![&])(.)"
# NOTE(review): sku_regex and model_regex are the same pattern.
sku_regex = r"(?:sku|model|model id|model no|item number|itemid|article no|product number|style number|product id|item code|mfr no|data-product)(?![&])(.)"
model_regex = '(?:sku|model|model id|model no|item number|itemid|article no|product number|style number|product id|item code|mfr no|data-product)(?![&])(.)'
# NOTE(review): upc_regex has no capturing group, unlike the other patterns.
upc_regex = r'(?:"UPC"|"GTIN"|"EAN"|"upc"|"upccode"|"product_upc"|"product:upc"|"gtin"|"ean"|"barcode")'
# part_regex rejects a following "&" or "g" and captures three characters.
part_regex = r"(?:PN|P/N|part no|part number|part|part #|mpn)(?![&g])(...)"
color_regex = r"(?:color|color_name|shade|finish|shade description)(?![&])(.)"
size_regex = r"(?:selected size|available size|choose a size|product size|attribute pa size)(?![&])(.)"
mfr_regex = r"(?:manufacturer|mfr|mfg|manufacturer logo|manufacturer name|label|producer|fabricante|fabrikant|Hersteller)(?![&])(.)"
price_regex = r"(?:MSRP|MRP|Recommended Customer Price|USD MSRP|List Price|reseller price may vary)(?![&])(.)"
ct_regex = r"(?:count|pieces|ct|pc|combo|per pack|contains)(?![&])(.)"
pk_regex = r"(?:packs|packs of|pk|package|combo|carton|carton pack)(?![&])(.)"
description_regex = r"(?:Product Details|Specification|Tech specs|Technical specification|Details|see more features|Product Description|Description|About the product|ingredients|Where to use|How to use)(?![&])(.)"
#@markdown The RegEx strings are stored in the dictionary: _labels_ <br> <br>
# Maps the human-readable attribute name to its search pattern.
labels= {
    "Brand Name":brand_regex,
    "Category Name":crumb_regex,
    "SKU":sku_regex,
    "Model Name":model_regex,
    "UPC":upc_regex,
    "Part Number":part_regex,
    "Color name":color_regex,
    "Size Name":size_regex,
    "Manufacturer Name":mfr_regex,
    "List Price":price_regex,
    "Item Count":ct_regex,
    "Item Package Quantity":pk_regex,
    "Product Description":description_regex
    }

#@markdown ### Functions
#@markdown * Search tag
#@markdown * Finder
def search_tag(tag, string):
    """Print the text that follows ``tag=`` inside ``string``.

    Parameters
    ----------
    tag : str
        Literal parameter name to look for (e.g. ``"variant"``).
    string : str
        Text to search, typically a URL or query string.

    Returns
    -------
    None
        The result is only printed. Nothing is printed when ``tag=`` does
        not occur in ``string``.
    """
    # Escape the tag so characters such as "." or "+" are matched literally.
    pieces = re.split(r"(?:" + re.escape(tag) + "=)", string)
    # The tag may appear in the text without a trailing "="; then there is
    # no value to report (the unguarded [1] lookup used to raise IndexError).
    if len(pieces) < 2:
        return
    split_1 = pieces[1].replace('%20', ' ')
    print(f"{tag}: found in text\n"
        f"contains:{split_1}")

def finder(regex:str, text:str,*,
           look_before:int=10,
           look_ahead:int=250,
           extra_dots=1) -> "tuple[str, str] | None":
    """Return the first label hit for ``regex`` plus the text around it.

    The search is case-insensitive and multi-line. Only the first match is
    used; it is also printed as ``Match: <n> <matched text>``.

    Parameters
    ----------
    regex : str
        Pattern to search for. If it has a capturing group, the context
        window is anchored on group 1; otherwise on the whole match.
    text : str
        Text to search.
    look_before : int
        Number of characters to include before the anchor.
    look_ahead : int
        Number of characters to include after the anchor.
    extra_dots : int
        Number of trailing characters to drop from the matched text
        (the patterns in this notebook end with one captured character).

    Returns
    -------
    tuple[str, str] | None
        ``(matched_text_without_trailing_chars, context_window)``, or
        ``None`` when the pattern does not match.
    """
    flags = re.MULTILINE | re.IGNORECASE | re.UNICODE

    for matchNum, match in enumerate(re.finditer(regex, text, flags), start=1):
        print(f"Match: {matchNum} {match.group()}")

        # Patterns without a capturing group used to fall through and return
        # None even on a hit; anchor on the whole match (group 0) instead.
        groupNum = 1 if match.groups() else 0

        found = match.group()
        # found[:-0] would be an empty string, so only trim when asked to.
        if extra_dots > 0:
            found = found[:-extra_dots]

        # Clamp at 0: a negative start would wrap around to the end of text.
        window_start = max(match.start(groupNum) - look_before, 0)
        window_stop = match.end(groupNum) + look_ahead
        return found, text[window_start:window_stop]

    return None

In [None]:
#@title Read URL
# Functions
def render(html_contents):
    """Show a raw HTML string as rich output in the notebook."""
    return display(HTML(html_contents))

# Attributes
web_driver = False #@param {type:"boolean"}
URL = "https://www.fultonperformance.com/products.aspx/trailer-accessories/trailer-fenders/trailer-fender/WV5KcOu1jGUOChEob70bwOTxorgeYPz1iJvSEqIJ0V0%3d" #@param {type:"string"}
show = True #@param {type:"boolean"}

# Retrieve URL (skipped when the form field is left empty)
if URL:
    response = requests.get(URL)
    soup = bs(response.text)
    pretty_soup = soup.prettify()

    # NOTE(review): `wd` is not defined in the visible notebook — confirm a
    # web driver is created before ticking `web_driver`.
    if web_driver:
        wd.get(URL)

    if show:
        render(response.text)

In [None]:
try:
    find = "Manufacturer Name" #@param ['Brand Name', 'Category Name', 'Model Name', 'UPC', 'Part Number', 'Color name', 'Size Name', 'Manufacturer Name', 'List Price', 'Item Count', 'Item Package Quantity', 'Product Description']
    # An expression inside `try` is not echoed by the notebook, so print the
    # (label, context) pair explicitly.
    found = finder(labels.get(find), soup.text, look_ahead=10)
    print(found)
except Exception as error:
    # A bare string expression does nothing; report what actually failed.
    print(f"something happened: {error!r}")

In [None]:
#@title By id and attribute
id = "ProductPrice" #@param {type:"string"}
attribute = "itemprop" #@param {type:"string"}
try:
    # Pass the attribute name positionally: the `attribute=` keyword raised a
    # TypeError that the old bare except swallowed.
    # NOTE(review): `wd` is not defined in the visible notebook — confirm a
    # web driver exists before relying on this cell.
    print(wd.find_element_by_id(id).get_attribute(attribute))
except Exception as error:
    print(f"something happened: {error!r}")

Most of this can be done through the libraries: soup, and BeautifulSoup4.

For sites that rely heavily on JavaScript, it can also be useful to use PhantomJS or the headless driver to access specific attributes.
```
def get_attribute(id, attribute):
    return headles_driver.find_element_by_id(id).get_attribute(attribute)
```
These two last options are slower to load and often you can find a workaround using soup.decode.




# Recursive search
A different paradigm is to take a list of URLs, retrieve them all and save their soups at once, which reduces the load on the server and the processing time.

In [None]:
#@title Additional set up

#@markdown * Import libraries
import json
import time
import unicodedata
# from w3lib.html import replace_entities
import pandas as pd
from tqdm import tqdm
from copy import deepcopy
from google.colab import data_table

In [None]:
#@title Introducing a Class: RetrivePage

class RetrivePage:
    """Download a page once and keep its soup together with URL metadata.

    Attributes
    ----------
    url : the URL exactly as given.
    soup : BeautifulSoup object built from the response text.
    parsed_url : the URL after percent-decoding (``urllib.parse.unquote``).
    variant : ``np.squeeze`` of everything after the first ``=`` in the URL
        (a numpy array; empty when the URL has no ``=``).
    headless_driver, phantom_driver : ``None`` unless requested.
    """
    def __init__(self, url, headless=False, phantom=False):
        self.url = url
        self.soup = bs(requests.get(url, headers={'User-Agent': 'Mozilla/5.0'}).text)
        self.parsed_url = unquote(url)
        self.headless_driver = None
        self.phantom_driver = None
        # Greedy capture: takes everything after the first "=" in the URL.
        self.variant = np.squeeze(re.findall(r"=(.*)", self.url))
        # NOTE(review): get_headless_driver / get_phantom_driver are not
        # defined in the visible notebook (only commented-out methods exist),
        # so enabling either flag presumably raises NameError — confirm.
        if headless:
            self.headless_driver = get_headless_driver()
        if phantom:
            self.phantom_driver = get_phantom_driver()

    def apply(self, function):
        """Call ``function`` with this page object and return its result.

        The previous body read ``self.page``, an attribute that is never
        set, so every call raised AttributeError.
        """
        return function(self)
    
    # # Headless driver
    # def get_headless_driver(self):
    #     return headles_driver.get(self.url)
    # def build_headless(self):
    #     self.headless_driver = self.get_headless_driver()

    # # Phantom driver
    # def build_phantom(self):
    #     self.phantom_driver = self.get_phantom_driver()
    # def get_phantom_driver(self):
    #     return phantom_driver.get(self.url)

In [None]:
#@title Functions
#@markdown * Retriving data
#Functions
def load_file(file_name, usecols=None, **kwargs):
    """Read a CSV/text or Excel file into a DataFrame.

    Parameters
    ----------
    file_name : str
        Path to the file. The extension selects the reader: ``csv``,
        ``text`` and ``txt`` use ``pd.read_csv``; ``xls`` and ``xlsx`` use
        ``pd.read_excel``.
    usecols : list of str, optional
        Columns to keep. If pandas rejects the selection (for example a
        column is missing) the whole file is read and a notice is printed.
    **kwargs
        Forwarded to the pandas reader.

    Returns
    -------
    pandas.DataFrame

    Raises
    ------
    ValueError
        If the extension is not one of the supported formats.
    """
    def _read(reader):
        # `usecols` must be passed by keyword: as the second positional
        # argument pandas treats it as the separator / sheet name.
        try:
            return reader(file_name, usecols=usecols, **kwargs)
        except ValueError:
            print("Dismissed: useful_cols")
            return reader(file_name, **kwargs)

    extension = file_name.rsplit('.', 1)[-1].lower()
    if extension in ['csv', 'text', 'txt']:
        print('text')
        return _read(pd.read_csv)
    elif extension in ['xls', 'xlsx']:
        print('Excel')
        return _read(pd.read_excel)
    # Returning None here would only surface later as an obscure error.
    raise ValueError(f"unknown format: {file_name!r}")
#@markdown List of useful columns
#["url","brand_name","manufacturer_name","product_description","model","color_name","item_package_quantity", "unit_count"]
all_columns=["title","bread_crumb1","bread_crumb2","bread_crumb3","brand_name",
             "manufacturer_name","model","upc","color_name","size_name",
             "item_package_quantity","part_number","list_price","unit_count",
             "product_description"]
useful_cols =  ["url","brand_name","manufacturer_name","product_description","model","color_name","item_package_quantity", "unit_count"]#@param {type:"raw"}

In [None]:
#@title Get the Websites
wait_time =  0 #@param {type:"integer"}
def load_URLs(URL):
    """Fetch ``URL`` three ways: requests + soup, phantom driver, headless driver.

    Sleeps ``wait_time`` seconds after the plain HTTP request.
    NOTE(review): `phantom_driver` and `headles_driver` are only created in
    commented-out cells in the visible notebook — confirm they exist.
    """
    reply = requests.request('GET', URL)
    time.sleep(wait_time)
    soup_result = bs(reply.text)
    phantom_result = phantom_driver.get(URL)
    headless_result = headles_driver.get(URL)
    return soup_result, phantom_result, headless_result

file_name = "/content/nakamichicaraudio.csv" #@param {type:"string"}
kwargs  =  {} #@param {type:"raw"}
#@markdown Data loaded as: data
data = load_file(file_name, usecols=useful_cols)

## separating URLs: an "=" in the URL marks a variant link
has_variant = data.url.apply(lambda link: "=" in link)
url_variant = data.url[has_variant]
not_variant = data.url[~has_variant]

summary = (f"Not variant URLs: {len(not_variant)}\n"
           f"URL with variant: {len(url_variant)}")
print(summary)

#@markdown Create a new pd.Series to store the web scrapped RetrivePage objects.


In [None]:
#@markdown ...loading
start_time = time.time()
# Seconds elapsed since `start_time`; the argument is accepted but ignored.
stopwatch = lambda x=None: time.time() - start_time

# Download the pages in N_CHUNKS batches and report progress after each one.
# Batch boundaries come from the data length instead of fixed 40-row slices,
# so the percentages are right for any number of URLs.
N_CHUNKS = 5
total_rows = len(data)
chunk_size = max(1, -(-total_rows // N_CHUNKS))  # ceiling division

page_chunks = []
for chunk_start in range(0, total_rows, chunk_size):
    chunk_stop = min(chunk_start + chunk_size, total_rows)
    page_chunks.append(data.url.iloc[chunk_start:chunk_stop].apply(RetrivePage))
    print(f"{round(100 * chunk_stop / total_rows)}%\t", f"time {stopwatch(start_time)}")

# pd.concat rejects an empty list, so fall back to an empty selection.
pages = pd.concat(page_chunks) if page_chunks else data.url.iloc[:0]
print("Completed%", "time: ", stopwatch(start_time))

In [None]:
#@markdown ### **Start** ► URL & title
def _get_url(retrieved):
    """Return the URL stored on a retrieved page object."""
    return retrieved.url

urls = pages.apply(_get_url)
result = {"url":urls} #@param {type:"raw"}
#@markdown include titles?
boolean = False #@param {'type':'boolean'}
if boolean:
    def _get_title(retrieved):
        """Return the <title> text without leading/trailing newlines."""
        return retrieved.soup.title.text.strip('\n')

    titles = pages.apply(_get_title)
    result["title"] = titles

In [None]:
#@markdown Progress
show_progress = True #@param {type:"boolean"}
df_result = pd.DataFrame(result)
if show_progress:
    display(data_table.DataTable(df_result))

export_file = False #@param {type:"boolean"}
# Strip only the final extension: split('.')[0] cut the path at the first dot,
# which breaks names such as "./data/file.v2.csv".
__export_name = file_name.rsplit('.', 1)[0] + "_reviwed.csv"
if export_file:
    df_result.to_csv(__export_name)

In [None]:
#@markdown Find url' index
# Map every URL to its position in `urls`; for duplicates the last position wins.
urls_dict = dict(zip(urls, range(len(urls))))
url = "https://nakamichicaraudio.com/products/nakamichi-na-md1?variant=29234205032501" #@param {type:"string"}
urls_dict.get(url)

In [None]:
#@markdown Example page: example
index =   12 #@param {type:"integer"}
page = pages[index]
soup = page.soup
url = page.url
variant = page.variant
# Bundle of everything known about the example page.
example = {'index':index,
           'page':page,
           'soup':soup,
           'url':url,
           'variant':variant}
print(f"url: {page.url}")
show = False #@param {type:"boolean"}
if show:
    # Render the example page itself. `response` belongs to the earlier
    # "Read URL" cell and holds a different page.
    render(str(soup))

## Finding additional attributes

### Done

In [None]:
# `titles` only exists if a cell that builds it has already run. Skip instead
# of raising NameError on a fresh Restart & Run All.
if "titles" in globals():
    result.update({"title":titles})

In [None]:
# variants = pages.apply(lambda x: x.variant)
# For every page keep what follows the "?" in the decoded URL. np.squeeze turns
# the list of pieces into a numpy array, so each element is an ndarray, not a str.
parsed_urls = pages.apply(lambda page:np.squeeze((page.parsed_url.split('?')[1:])))

In [None]:
pattern = re.compile(r"\d+")
def __get_uc(parsed_url):
    """Return every run of digits found in a parsed-URL entry.

    ``parsed_url`` may be a plain string or the numpy array produced by
    ``np.squeeze``. Arrays are flattened to text first, because ``re``
    rejects ndarray input (the old bare ``except`` then handed the input
    back unchanged).
    """
    query_text = " ".join(np.atleast_1d(parsed_url).astype(str))
    return re.findall(pattern, query_text)
unit_counts = parsed_urls.apply(__get_uc)

In [None]:
#@markdown Breadcrumbs
def __get_crumbs(page):
    """Return the NFKD-normalized breadcrumb text, or None if the nav is missing."""
    crumb = page.soup.find("nav", {"class":"woocommerce-breadcrumb"})
    # An explicit check replaces the bare `except` that hid every error.
    if crumb is None:
        return None
    return unicodedata.normalize("NFKD", crumb.text)
crumbs = pages.apply(__get_crumbs)
result.update({"crumbs":crumbs})

In [None]:
#@markdown Titles
def __get_titles(page):
    """Return the NFKD-normalized product title, or None if the <h1> is missing."""
    title = page.soup.find("h1", {"class":"product_title entry-title"})
    # An explicit check replaces the bare `except` that hid every error.
    if title is None:
        return None
    return unicodedata.normalize("NFKD", title.text)
titles = pages.apply(__get_titles)
# NOTE(review): other cells store titles under the key "title"; this one uses
# "titles" — confirm which column name is wanted.
result.update({"titles":titles})

In [None]:
# Text of the tabbed product section of the example page. The assignment was
# commented out, which left `text` undefined on a fresh run.
tabs = page.soup.find("div", {'class':"woocommerce-tabs wc-tabs-wrapper"})
text = tabs.text if tabs is not None else ""
unicodedata.normalize('NFKC', text)





						Description					



						Directions					



						Reviews (5)					



						Partner Assets					



Yard Odor Eliminator Plus Citronella Spray is for your lawn and yard. Do not spray directly on pets.
Yard Odor Eliminator Plus Citronella helps eliminate stool and urine odors. Spray on any outdoor surface. For use on grass, plants, shrubs, patios, patio furniture, kennels, dog runs, swing sets, fences, block walls or any other surface where odors arise due to pets. If spraying on fabric, test product on a very small inconspicuous surface area before using.
Caution:
Keep out of the reach of children. Do not spray directly on pets. Avoid contact with eyes or accidental ingestion. Keep children and pets from sprayed from sprayed area until dry. In case of allergic reaction or accidental ingestion consult a health professional immediately. Do not spray around fish ponds or where run-off will drain into ponds. To avoid run-off or puddling, do not over-spray product.


Directions:
Shake well before using. Connect sprayer to garden hose. Turn on water. To begin spraying, point nozzle in the direction you want to spray. Turn plastic knob to “ON” position. Spray evenly over area. To stop spraying turn knob to “OFF” position. Turn off water and disconnect sprayer from hose. Yard Odor Eliminator Plus quickly eliminates pet odors from your yard due to stool and urine, leaving your yard with a pleasant citronella scent. 





			5 reviews for Yard Odor Eliminator Plus Citronella Spray 
Disclaimer: These testimonials are for informational purposes only. The information is not a substitute for expert veterinary care. Testimonials are written by actual customers and represent their own observations. These observations are not guaranteed, are not medically substantiated, and may not be typical for other pets.





Rated 5 out of 5

Angus Tillson 
– May 26, 2016

This product helped get rid of those “dog” smells that were always wafting inside our house from the backyard. Now I get a nice citronella smell. Sure beats the odors I was smelling! Great product, it does what it says 🙂








Rated 5 out of 5

Cindy 
– July 16, 2016

Does what it says but smell is only gone about a week. Still a bottle does 3 doses for our area and that’s great. Also keeps the mosquitoes and insects away from the house a bit too. Smells great, citronella smell is not really strong. Definitely reordering regularly.









Jenifer L Jordan 
– September 2, 2019

What are the ingredients?









NaturVet 
– September 4, 2019

Primarily deionized water, and citronella oil.











Christine 
– September 1, 2020

How often do I need to reapply?









NaturVet 
– September 2, 2020

Hi Christine.  Thank you for your product question.  The Yard Odor Eliminator will eliminate the odors instantly however they will return once the area is re-contaminated.  You can use the product as many times as necessary.











Matt 
– September 20, 2020

Can the Yard Odor Eliminator Refill be used in a hand-pump sprayer?   I don’t have a water source near the area that needs spraying, so the hose hookup bottle won’t work for me.  Does it need to be diluted with water...and if so, what is the ratio?









NaturVet 
– September 21, 2020

Hi Matt.  Thank you for your product question.  You can dilute the product 50/50 to apply it from a hand-pump sprayer.











Add a review Cancel replyYour email address will not be published. Required fields are marked *Your rating *
Rate...
Perfect
Good
Average
Not that bad
Very poor
Your review *Name *
Email *
 Save my name, email, and website in this browser for the next time I comment.
 

 






[partner-assets]



### Work In Progress

#### From scripts
Pages often rely on a data structure embedded in a script to hold the information for their different variants.

In [None]:
# @title Metadata
# @markdown **var_name** From where we're going to obtain the values.
var_name = "SUBParams = " #@param {'type':'string'}
def get_meta(page):
    """Extract the JSON object assigned to ``var_name`` in a page script.

    Scans the ``<script>`` tags of ``page.soup`` for the first one whose
    text contains ``var_name`` and decodes the JSON value that follows it.
    The result is also stored on the page as ``page.SUBParams``.

    Returns
    -------
    The decoded JSON value, or ``None`` (with ``page.SUBParams = None``)
    when no script contains ``var_name``.

    Raises
    ------
    json.JSONDecodeError
        If the text after ``var_name`` is not valid JSON.
    """
    decoder = json.JSONDecoder()
    for script in page.soup.find_all('script'):
        source = script.text
        marker = source.find(var_name)
        if marker == -1:
            continue
        # raw_decode reads exactly one JSON value and ignores what follows,
        # so it does not depend on the object being terminated by "};".
        payload = source[marker + len(var_name):].lstrip()
        json_meta, _ = decoder.raw_decode(payload)
        page.SUBParams = json_meta  # add metadata to the RetrivePage
        return json_meta
    # No matching script. The old code reached print(count) here and raised
    # NameError, because `count` is never defined.
    page.SUBParams = None
    return None
# Run get_meta on every page; the Series is aligned with `pages`.
metadata = pages.apply(get_meta)
# @markdown **result_key** Name under which the results are stored in the `result` dictionary.
result_key = "metadata" #@param {type:"string"}
result.update({result_key:metadata}) 

In [None]:
def _dig(meta, *keys):
    """Follow ``keys`` through nested dicts; return None if any level is missing."""
    for key in keys:
        if not isinstance(meta, dict):
            return None
        meta = meta.get(key)
    return meta

#@markdown vendor
# Pages without metadata (None) or without the key yield None, not an exception.
_get_vendor = lambda m: _dig(m, 'product', 'vendor')
vendor = metadata.apply(_get_vendor)
result.update({"vendor":vendor})

#@markdown resourceId
_get_resourceID = lambda m: _dig(m, 'page', 'resourceId')
resourceId = metadata.apply(_get_resourceID)
result.update({"resourceId":resourceId})

In [None]:
#@title Variant Info
def __variant_info(page):
    """Pick the variant record that matches ``page.variant``.

    Reads ``page.SUBParams['product']['variants']``. A product with a
    single variant returns that variant; otherwise the variant whose
    ``id`` equals ``page.variant`` (compared as text) is returned. The
    choice is stored on ``page.vSUBParams`` and returned; both are
    ``None`` when there is no metadata or no matching variant.
    """
    page.vSUBParams = None
    params = getattr(page, "SUBParams", None)
    if not isinstance(params, dict):
        return None

    var_ls = (params.get('product') or {}).get('variants') or []
    if len(var_ls) == 1:
        page.vSUBParams = var_ls[0]
    else:
        # ids are compared as strings because page.variant comes from the URL
        # (possibly wrapped in a numpy array) while the ids come from JSON.
        wanted = str(page.variant)
        for candidate in var_ls:
            if str(candidate.get('id')) == wanted:
                page.vSUBParams = candidate
                break  # keep the first hit; later variants must not overwrite it
    return page.vSUBParams

In [None]:
variant_info = pages.apply(__variant_info)
def __get_barcode(b):
    """Return the variant's barcode as text, or ``b`` unchanged if it has none."""
    if isinstance(b, dict) and 'barcode' in b:
        return str(b['barcode'])
    return b
# The barcodes live on the variant records; `barcode` was never defined.
barcodes = variant_info.apply(__get_barcode)
result.update({"barcodes":barcodes})

In [None]:
# parsed_urls = pages.apply(lambda page: page.parsed_url)
# Find "<digit>pc" tokens (piece counts) in every parsed URL.
pattern = r"\dpc"
# str() is needed because the entries of parsed_urls are numpy arrays, which
# re.finditer rejects.
[i for parsed_url in parsed_urls for i in re.finditer(pattern, str(parsed_url), re.MULTILINE)]

In [None]:
#@markdown Progress: df
show_sample = True #@param {type:"boolean"}
# Build the frame and drop the bulky raw-metadata column in one step.
# `columns=` replaces the positional axis argument, and errors="ignore" keeps
# the cell re-runnable when the column is absent.
df = pd.DataFrame(result).drop(columns=["metadata"], errors="ignore")
if show_sample:
    display(data_table.DataTable(df))

# Store ids as text so Excel does not reformat long numbers. Item assignment
# always creates or updates the column.
df["resourceId"] = resourceId.apply(str)
# `index` expects a boolean; the string "urls" only worked by being truthy.
df.to_excel("Rukket.xlsx", index=True)

In [None]:
#@title Description
def __get_descriptions(page):
    """Build one description string from the tabbed panels of a page.

    Every tab named in the ``TabbedPanelsTabGroup`` list is paired by
    position with a ``TabbedPanelsContent`` div. Tabs listed in ``ignore``
    are skipped. Short lines mentioning "arranty" are dropped and a few
    mis-decoded characters are repaired.

    Returns the cleaned description, or ``None`` (after printing the page
    URL) when the tab group is missing or has more tabs than panels.
    """
    soup = page.soup
    content = soup.find_all('div', {"class":"TabbedPanelsContent"})
    tab_group = soup.find("ul", attrs={"class":"TabbedPanelsTabGroup"})
    # Pages without the tab list used to raise AttributeError on None.text,
    # which the IndexError handler below did not catch.
    if tab_group is None:
        print(page.url)
        return None
    description_tabs = tab_group.text.strip('\n').split('\n')

    description = ''
    ignore = ['Optional Accessories', 'Impeller Kits']

    def clean_text(text_to_clean):
        # Trim newlines, then non-breaking spaces, then newlines again.
        return text_to_clean.strip('\n').strip('\xa0').strip('\n')

    def __post_process_description(description):
        # Drop short warranty headings.
        # NOTE(review): kept lines are joined without a separator, exactly as
        # before — confirm the line breaks are meant to be removed.
        kept_lines = [line for line in description.split('\n')
                      if not (len(line) < 15 and "arranty" in line)]
        return ''.join(kept_lines)\
                            .rstrip('\n')\
                            .replace('â\x80¢', '•')\
                            .replace('Â°', '°')\
                            .replace('âs',"'s")

    try:
        for i, tab_name in enumerate(description_tabs):
            if tab_name not in ignore:
                info = clean_text(content[i].text)
                description += tab_name +  '\n' + info + '\n'
        return __post_process_description(description)
    except IndexError:
        # More tab names than content panels: report the page and give up.
        print(page.url)
descriptions = pages.apply(__get_descriptions)
# df = df.join(descriptions)

### Done

### json

In [None]:
#@markdown Find Attributes
# NOTE(review): `s`, `driver` and `wd` are not defined at module level in the
# visible notebook (`s` is only a loop variable inside get_meta) — confirm
# where they come from before running this cell.
# finder()[0] is the matched text minus its last character, i.e.
# 'var meta = {...}'; removing the prefix leaves the JSON object.
dc = json.loads(finder(r'(?:var meta = )(.*)(?:};)', s.text)[0].replace('var meta = ',''))
# Choose the variant named in the URL, falling back to the first variant when
# the URL has no "variant=" part or no id matches (both raise IndexError).
try:
    variant_num = URL.split('variant=')[1]
    variant = [i for i in dc['product']['variants'] if i['id']==int(variant_num)][0]
except IndexError:
    variant = dc['product']['variants'][0]
#@markdown * title
title = driver.find_element_by_class_name("standard-single").text

#@markdown * Public_title  | _has the size values_
public_title = variant['public_title'] 

#@markdown * brand
# finder()[1] is the context window around the hit; the text after the first
# ":" with the double quotes removed is taken as the brand.
brand = finder(r"(?:\"brand\")(.)", s.text, look_ahead=20)[1].split(":")[1].replace('"','')

#@markdown * manufacturer
# Prefer the decoded metadata; on any error fall back to searching the script text.
try:
    label = dc['product']['label']
except:
    label = finder(r"(?:\"label\")(.)", s.text, look_ahead=20)[1].split(":")[1].replace('"','')

#@markdown * Price
# Note: this stores the element returned by the driver, not its text.
price = wd.find_element_by_id('ProductPrice')

# Crawler
Finally, another paradigm is to define the page structure up front and let a crawler use it to obtain the values.

In [None]:
#@title Imports
import requests
from bs4 import BeautifulSoup

In [None]:
#@title Legacy sites
import json

# Where each attribute lives on the legacy site.
_demeter_format = {
    "Title": "title",
    "BarcodeTag": "sku",
    "Description": "<div>",
}
# Serialized form first, then the decoded [domain, format] pair.
formats_json = json.dumps(["demeterfragrance.com", _demeter_format])
legacy_sites = json.loads(formats_json)

In [None]:
# Attribute name -> retrieval function applied to a page
kwargs = {'title':(lambda p: p.title)}

# Class: Website
class Website:
    """
    Plain record describing one website: its url plus any extra
    attributes supplied as keyword arguments.
    """
    def __init__(self, url, **kwargs):
        self.url = url
        # Every keyword argument becomes an attribute of the instance.
        for attr_name, attr_value in kwargs.items():
            setattr(self, attr_name, attr_value)

# Initialize the website as an empty entity
# kwargs_n = {keys:None for keys in kwargs.keys()}
# w = Website(page.url, kwargs_n)

In [None]:
class Crawler:
    """Extract a fixed set of attributes from pages.

    ``attrs`` maps an attribute name to either a CSS selector string or a
    callable that receives the page object and returns the value.
    """
    def __init__(self, attrs:dict):
        self.attrs=attrs

    def getPage(self, url):
        """Download ``url`` and return its soup, or None if the request fails."""
        try:
            req = requests.get(url)
        except requests.exceptions.RequestException:
            return None
        return BeautifulSoup(req.text, 'html.parser')

    def safeGet(self, pageObj, selector):
        """
        Return the text of every element matching ``selector``, joined by
        newlines, or the string 'empty' when nothing matches.

        ``pageObj`` may be a page wrapper exposing ``.soup`` or a
        Beautiful Soup object itself (which is what getPage returns).
        """
        soup = getattr(pageObj, 'soup', None)
        if soup is None:
            soup = pageObj
        selectedElems = soup.select(selector)
        if selectedElems:
            return '\n'.join(elem.get_text() for elem in selectedElems)
        return 'empty'

    def ifer(self, page, sfun):
        """Resolve one attribute: call ``sfun`` if callable, else use it as a selector."""
        if callable(sfun):
            return sfun(page)
        return self.safeGet(page, sfun)

    def parser(self, pageObj=None, url=None):
        """
        Extract the configured attributes from a page.

        Pass an already loaded ``pageObj``, a ``url`` to download, or both.
        Returns a Website, or None when the download fails.
        Raises ValueError when neither argument is given.
        """
        if pageObj is None:
            if url is None:
                raise ValueError("You need to pass one of pageObj/url")
            pageObj = self.getPage(url)
            if pageObj is None:
                return None
        else:
            # Prefer the page's own url when it really is a string. A soup
            # answers `.url` with a tag lookup rather than an address.
            page_url = getattr(pageObj, 'url', None)
            if isinstance(page_url, str):
                url = page_url

        attrs = {key:self.ifer(pageObj, sfun)
                for key, sfun in self.attrs.items()
                }
        return Website(url, page=pageObj, **attrs)

In [None]:
# Build the crawler from the attribute -> retrieval-function mapping. It was
# commented out, which left `c` undefined.
c = Crawler(kwargs)
w=c.parser(page.soup, page.url)
page.url