In [1]:
import pandas as pd
import numpy as np
import itertools
import unidecode
import asyncio
import aiohttp
import random
import codecs
import time
import re
import os

from contextlib import contextmanager
from pyspark.sql import SparkSession
from pyspark.dbutils import DBUtils
from bs4 import BeautifulSoup
from tqdm import tqdm

## Databricks file

In [None]:
spark = SparkSession.builder.getOrCreate()
dbutils = DBUtils(spark)

In [None]:
def id_fetcher_per_sector(country=None):
    if 'DATABRICKS_RUNTIME_VERSION' in os.environ:
        files = [x[0] for x in dbutils.fs.ls("abfss://landingzone@storagetiltdevelop.dfs.core.windows.net/tiltEP/") if f"/{country}" in x[0]]
    else:
        files = [x[0] for x in dbutils.fs.ls("abfss:/mnt/indicatorBefore/tiltEP/") if f"/{country}" in x[0]]

    out = f"../input_data/{country}_ids.csv"
    complete_ids = []
    for i in range(len(files)):
        x = spark.read.csv(files[i],header=True, inferSchema=True, sep=";").toPandas()
        ids = list(x.drop_duplicates(subset="id")["id"])
        complete_ids.append(ids)

    complete_ids = list(set([item for sublist in complete_ids for item in sublist]))
    complete_ids = pd.DataFrame({'id': complete_ids})
    complete_ids.to_csv(out)
    return complete_ids



In [None]:
id_fetcher_per_sector("italy")

Unnamed: 0,id
0,s-3-srl_seac000145150-001
1,dia-costruzioni-srl_seac005437117-001
2,guerini-paolo_seac000824090-001
3,studio-tecnico-associato-stf-faccin-arch-a-fac...
4,2-emme-impianti-srl_seac006235054-001
...,...
224533,studio-tecnico-navale-ansaldo-snc_seac00279190...
224534,agrosun-srl_seac002116524-001
224535,sole-edilizia-societa-a-responsabilita-limitat...
224536,savini-stefano_seac006037585-001


In [None]:
id_fetcher_per_sector("united-kingdom")

Unnamed: 0,id
0,design-build-modern-stained-glass_000000044107...
1,brent-taxis-cabs_00000005453797-798182001
2,hi-performance-feeds_00000004299041-001
3,maxiflow-drains_00000004249310-001
4,simply-frozen_00000004147330-001
...,...
123960,nwt-distribution-ltd_00000004196560-001
123961,jkx-executive-travel-ltd_00000004287528-001
123962,d-herbert-ltd_00000004401371-001
123963,h-wright-sons_00000004349558-001


## AIOHTTP Scraping

In [2]:
# Read your proxy list from .txt file
with open("../input_data/proxies_10.txt") as f:
    lines = [line.strip() for line in f.readlines()]
    proxy_list = ["http://{}:{}@{}:{}".format(line.split(":")[2], line.split(":")[3], line.split(":")[0], line.split(":")[1]) for line in lines]

In [3]:
proxies_in_use = []

@contextmanager
def allocate_proxy():
    """Get a free proxy and hold it as in use"""
    available_proxies = [p for p in proxy_list if p not in proxies_in_use] # Select proxies that are not in use
    if available_proxies:
        proxy = random.choice(available_proxies) 
    else:
        proxy = random.choice(proxy_list) # If there is not an available proxy, we resort to the random.choice method for the entire list.
    try:
        proxies_in_use.append(proxy)
        yield proxy
    finally:
        proxies_in_use.remove(proxy)

In [4]:
async def fetch(url, session):
    async with session.get(url,timeout=5000) as response: # not using proxy at the moment
        company_id = url.split("/")[-2].lower()+ "_" + url.split("/")[-1].split(".")[0].lower()
        body = await response.text()
        soup = BeautifulSoup(body, 'html.parser')
        activities = []
        size_keywords = 0
        # check if you can there is extra information that can be scraped
        try:
            backend_script = soup.find("script", string=re.compile(r"^window\.__NUXT__"))
            # Extract the information between "keywords: [ ]"
            keywords = re.findall(r'keywords:\s*\[(.*?)\]', '\n'.join(backend_script), re.DOTALL)
            # Remove leading and trailing whitespaces from each keyword
            keywords = [keyword.strip() for keyword in keywords]
            matches = re.findall(r'id:"keyword-(\d+)"', keywords[1])
            indexes = [int(match) for match in matches]
            size_keywords = len(indexes)
            # Extract the words between "name:" and "}"
            words = [unidecode.unidecode(word.lower()).strip() for word in re.findall(r'name:"(.*?)"', keywords[1])]
            activities.append(words)
        except Exception as e:
            pass
        try:
            activities.append([codecs.decode(unidecode.unidecode(activity.text.lower()), 'unicode_escape').strip() for activity in soup.find("ul", class_="ep-keywords__list pl-0").find_all("li")])
            flattened_activities = list(set(itertools.chain.from_iterable(activities)))
            obtained_size = len(flattened_activities)
            # merge the list together separate by |
            activities = [' | '.join(flattened_activities)]
        except Exception as e:
            pass

        # Print the extracted words
        if activities == []:
            return {"id": company_id, "activities": activities, "coverage": 0}
        else:
            return {"id": company_id, "activities": activities, "obtained_size": obtained_size, "size_keywords": size_keywords, "coverage": (obtained_size/size_keywords)*100 if size_keywords > 0 else 100}
        

In [5]:
async def bound_fetch(sem, url, session, pbar):
    # Getter function with semaphore.
    async with sem:
        result = await fetch(url, session)
        pbar.update(1)
        return result

In [15]:
async def run(input_file):
    base_url = "https://www.europages.co.uk/{}/{}.html"

    company_ids = pd.read_csv(input_file)
    companies_t = company_ids["id"].tolist()
    companies = [company.upper() for company in companies_t]
    links = np.array([base_url.format(*company.split("_")) for company in companies])
    tasks = []
    # create instance of Semaphore
    sem = asyncio.Semaphore(1000)
    
    # Create client session that will ensure we dont open new connection
    # per each request.
    async with aiohttp.ClientSession() as session:
        # start timer
        start_time = time.time()
        pbar = tqdm(total=len(links), desc='Scraping EuroPages')    
        for link in links:
            # pass Semaphore and session to every GET request
            task = asyncio.ensure_future(bound_fetch(sem, link, session, pbar))
            tasks.append(task)
        responses = await asyncio.gather(*tasks)
    end_time = time.time()
    duration = end_time - start_time
    # report final message
    print('\nAll done. Duration: {}s'.format(duration))
    return responses

In [16]:
uk_data = await run("../input_data/united-kingdom_ids.csv")

Scraping EuroPages: 100%|██████████| 123965/123965 [56:40<00:00, 36.46it/s] 


All done. Duration: 3400.2306847572327s





In [24]:
uk_df = pd.DataFrame(uk_data, columns=['id', 'activities'])
uk_df.to_csv('../output_data/uk_data.csv', index=False)

In [17]:
italy_data = await run("../input_data/italy_ids.csv")

Scraping EuroPages: 100%|██████████| 224538/224538 [1:53:48<00:00, 32.88it/s]  


All done. Duration: 6828.539457082748s





In [25]:
italy_df = pd.DataFrame(italy_data, columns=['id', 'activities'])
italy_df.to_csv('../output_data/italy_data.csv', index=False)