In [1]:
import pandas as pd
import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin, urlparse
from tqdm import tqdm

In [2]:
PATH_URLS = "datahuck_sample.csv"

urls_df = pd.read_csv(PATH_URLS).drop_duplicates(ignore_index=True)
urls_df["seed_url"] = urls_df["seed_url"].apply(lambda l: l.replace("https", "http"))
urls_df = urls_df.set_index("seed_url")

In [3]:
class Crawler:
    def __init__(self, output_path="output.csv"):
        self.output_path = output_path
        self.errors_path = f"errors_{output_path}"

        self.headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3'}

        self.num_crawled = 0 # crawled is defined as succesfully extracted relevant information
        self.num_errors = 0
    
    def __str__(self):
        return f'Crawler has so far processed [{self.num_crawled}] urls and failed on [{self.num_errors}] urls'
    
    def process_url(self, url):
        url = url.replace("https", "http")
        try:
            response = requests.get(url, headers=self.headers)

            response.raise_for_status()  # Check if the request was successful

            soup = BeautifulSoup(response.content, 'html.parser')
            # Find all <a> tags with an href attribute
            links = soup.find_all('a', href=True)
            
            # Extract the href attribute from each <a> tag and store it in a list
            hrefs = [link['href'] for link in links]
            hrefs = [urljoin(url, link['href']) for link in links]
            hrefs = [link.replace(urlparse(link).scheme + ':', 'http:') for link in hrefs]

            # To see all internal links
            # print([l for l in hrefs if url in l])

            hrefs = [urlparse(link).path.strip("/") for link in hrefs if url in link]
            hrefs = list(set([l for l in hrefs if len(l) > 0]))

            self.num_crawled += 1

            return {"url": url, "results": hrefs, "error?": False}

        except requests.RequestException as e:
            self.num_errors += 1
            return {"url": url, "results": e, "error?": True}

    def batch_process(self, urls, batch_size=10, alias="ALL"):
        """
        Process a urls in batches

        urls: pd.DataFrame of urls
        batch_size: size of batch
        alias: When parallel processing give each instance a unique identitifier so intermediate results don't clash
        """
        num_batches = len(urls) // batch_size + (len(urls) % batch_size > 0)
        processed_data = []
        print(f'Processing {alias} with batch size: {batch_size} and number of batches: {num_batches}')

        for i in tqdm(range(num_batches)):
            batch = urls.iloc[i * batch_size:(i + 1) * batch_size]
            for seed_url in batch.index:
                processed_data.append(self.process_url(seed_url))
                
            # Save intermediate results
            intermediate_df = pd.DataFrame(processed_data)
            intermediate_df.to_csv(f'{alias}_intermediate_results.csv', index=False)

        # Save final results
        final_df = pd.DataFrame(processed_data)
        final_df.to_csv(f'{alias}_{self.output_path}', index=False)
        print(self)
        return final_df

In [4]:
import os

num_cores = os.cpu_count()
print(f"Number of CPU cores: {num_cores}")

def get_batch_start_end_indices(total_rows, n_batches):
    batch_size = total_rows // n_batches
    indices = []
    for i in range(n_batches):
        start = i * batch_size
        # For the last batch, ensure it goes to the end of the total rows
        end = start + batch_size if i != n_batches - 1 else total_rows
        indices.append((start, end))
    return indices


total_rows = len(urls_df)

# Ideally the number of CPU cores
n_batches = num_cores - 2

start_end_indices = get_batch_start_end_indices(total_rows, n_batches)
segments = [urls_df.iloc[start: end] for start, end in start_end_indices]
segments
    

Number of CPU cores: 12


[Empty DataFrame
 Columns: []
 Index: [http://www.roarkproductions.com, http://tapron.co.uk, http://the-shoguns-dojo.myshopify.com, http://dobsons.com.au, http://www.qtandco.com, http://leggingspromo.com, http://www.brittaylormusic.com, http://www.chumbak.com, http://www.wrp-timber-mouldings.co.uk, http://www.yourdollaryourdiscount.com, http://www.thehouseoftimber.com, http://www.vingolf.com, http://instockps.bigcartel.com, http://khajoorstudio.com, http://pinas-sadya.com, http://detoxboxdelivered.com, http://africanbutterfly.com, http://directkeys.nl, http://kevinmiller.co, http://www.profoundoutdoors.com, http://www.jaygalorehair.com, http://heracloset.com, http://stlwomensmarch.com, http://myweighmaster.com, http://kleosmx.com, http://cloudrf.com, http://bdfingerboards.bigcartel.com, http://lovediorhairco.myshopify.com, http://www.al-barakah.co.uk, http://www.synofit.de, http://www.ineffablegoodsonline.com, http://jjessentials.bigcartel.com, http://www.sachasiblends.com, http://dipp

In [6]:
import concurrent.futures

# segments = p1, p2  = urls_df.iloc[0:100], urls_df.iloc[100: 200]

# Create instances of Crawler for each DataFrame
crawler_instances = [Crawler() for _ in segments]

# Function to wrap batch_process method
def process_with_crawler(crawler, df, alias):
    return crawler.batch_process(df, alias=alias)

with concurrent.futures.ThreadPoolExecutor(max_workers=len(segments)) as executor:
    futures = [executor.submit(process_with_crawler, crawler, df, f"DF{i+1}") for i, (crawler, df) in enumerate(zip(crawler_instances, segments))]
    
    # Wait for all futures to complete (optional)
    for future in concurrent.futures.as_completed(futures):
        pass  # We are ignoring the results

Processing DF1 with batch size: 10 and number of batches: 344
Processing DF2 with batch size: 10 and number of batches: 344
Processing DF3 with batch size: 10 and number of batches: 344
Processing DF4 with batch size: 10 and number of batches: 344
Processing DF5 with batch size: 10 and number of batches: 344
Processing DF6 with batch size: 10 and number of batches: 344
Processing DF7 with batch size: 10 and number of batches: 344
Processing DF8 with batch size: 10 and number of batches: 344
Processing DF9 with batch size: 10 and number of batches: 344
Processing DF10 with batch size: 10 and number of batches: 345


  0%|          | 0/344 [00:00<?, ?it/s]





[A[A[A[A[A[A
[A



[A[A[A[A




[A[A[A[A[A


[A[A[A

[A[A






[A[A[A[A[A[A[A







[A[A[A[A[A[A[A[A

[A[A


[A[A[A




[A[A[A[A[A
[A







[A[A[A[A[A[A[A[A






  0%|          | 1/344 [00:10<58:29, 10.23s/it]



[A[A[A[A






  1%|          | 2/344 [00:15<41:54,  7.35s/it]


[A[A[A







[A[A[A[A[A[A[A[A

[A[A





[A[A[A[A[A[A



[A[A[A[A




[A[A[A[A[A






[A[A[A[A[A[A[A


[A[A[A
  1%|          | 3/344 [00:26<52:06,  9.17s/it]







[A[A[A[A[A[A[A[A






[A[A[A[A[A[A[A

[A[A


  1%|          | 4/344 [00:33<45:25,  8.02s/it]

[A[A




[A[A[A[A[A
[A







  1%|▏         | 5/344 [00:37<38:12,  6.76s/it]




[A[A[A[A[A



[A[A[A[A






[A[A[A[A[A[A[A
[A


[A[A[A

[A[A




[A[A[A[A[A
[A



  2%|▏         | 6/344 [00:51<52:15,  9.28s/it]


[A[A[A







[A[A[A[A[A[A[A[A






In [5]:
# insert number of splits you had 

all_dfs = pd.concat([pd.read_csv(f"DF{i}_intermediate_results.csv") for i in range(1, 10 + 1)], ignore_index=True)

In [22]:

def subtract_dataframes(df1, df2, key_column):
    """
    Subtracts the rows of df2 from df1 based on a key column.

    Parameters:
    df1 (pd.DataFrame): The original DataFrame.
    df2 (pd.DataFrame): The DataFrame containing rows to subtract from df1.
    key_column (str): The column name to use as the key for subtraction.

    Returns:
    pd.DataFrame: The resulting DataFrame after subtraction.
    """
    result_df = df1.merge(df2, on=key_column, how='left', indicator=True)
    result_df = result_df[result_df['_merge'] == 'left_only']
    result_df = result_df.drop(columns=['_merge'])
    return result_df

# Example usage
df1 = pd.DataFrame({"url": ["a.com", "b.com", "c.com"]})
df2 = pd.DataFrame({"url": ["a.com"]})

result_df = subtract_dataframes(df1, df2, key_column='url')
print(result_df)

     url
1  b.com
2  c.com


In [26]:
subtract_dataframes(urls_df, all_dfs.rename(columns={"url":"seed_url"})[["seed_url"]], key_column="seed_url").to_csv("remaining.csv", index=False)

In [18]:
all_dfs = pd.read_csv("output_30k.csv")

In [19]:
all_dfs

Unnamed: 0,url,results,error?
0,http://nataliehuggins.com,"['lessons', 'cart', 'contact', 'epk', 'about-1...",False
1,http://fauxynaturalhaircare.com,"['cart', 'products/deep-conditioner', 'product...",False
2,http://winkeyless.kr,"['product/b-mini-ex-x2-pcb', 'cart', 'product/...",False
3,http://anna-goodman.com,['cdn/shop/products/ANNAGOODMANBLEUMARINEtexti...,False
4,http://darrenbooth.com,"['googleportraits', 'cart', 'work', 'portraits...",False
...,...,...,...
29723,http://labikineria.com.mx,"['cart', 'collections/lo-nuevo', 'pages/guia-d...",False
29724,http://ohbeauty.com,"['products/isdinceutics-k-ox-eyes', 'collectio...",False
29725,http://belovedforever.us,['admin'],False
29726,http://www.popyourself.ch,[],False
