# Distributed Web Crawler

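This notebook implements a distributed web crawler with Apache Spark (PySpark) and Python. It demonstrates:
- Distributing crawl tasks across Spark partitions (local worker threads in Colab, executors on a cluster)
- Parallel page fetching for improved throughput
- Error handling, so that one failed URL does not abort the crawl
- Data extraction (titles, meta tags, links, images, scripts) and export to CSV and Parquet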

## 1. Setup and Installation

In [22]:
# Install Java and Spark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# Download Spark (skip this line if the archive is already present)
!wget -q https://archive.apache.org/dist/spark/spark-3.5.5/spark-3.5.5-bin-hadoop3.tgz
!tar -xzf spark-3.5.5-bin-hadoop3.tgz
!pip install -q findspark requests beautifulsoup4 pandas matplotlib

## 2. Initialize Spark Environment

In [23]:
import os
import findspark
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
findspark.init("spark-3.5.5-bin-hadoop3")

# Initialize Spark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("BigDistributedCrawler").getOrCreate()
sc = spark.sparkContext  # reuse the SparkContext owned by the session

print("✅ Spark initialized successfully!")

✅ Spark initialized successfully!


## 3. Define Crawler Functions

In [24]:
import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin, urlparse

def crawl_page(url, max_links=50, max_images=30, max_scripts=30):
    try:
        headers = {'User-Agent': 'Mozilla/5.0'}
        response = requests.get(url, headers=headers, timeout=10, allow_redirects=True)
        response.raise_for_status()

        soup = BeautifulSoup(response.text, 'html.parser')
        # soup.title.string is None for an empty <title> or one with nested tags
        title = soup.title.string.strip() if soup.title and soup.title.string else "No Title"

        # Meta tags
        meta_desc = soup.find("meta", attrs={"name": "description"})
        meta_keywords = soup.find("meta", attrs={"name": "keywords"})
        meta_desc = meta_desc.get("content", "") if meta_desc else ""
        meta_keywords = meta_keywords.get("content", "") if meta_keywords else ""

        # Links
        raw_links = [a['href'] for a in soup.find_all('a', href=True)]
        unique_links = list(set([
            urljoin(url, link) for link in raw_links
            if urlparse(urljoin(url, link)).scheme in ['http', 'https']
        ]))[:max_links]

        # Images
        raw_images = [img['src'] for img in soup.find_all('img', src=True)]
        unique_images = list(set([urljoin(url, img) for img in raw_images]))[:max_images]

        # Scripts
        raw_scripts = [script['src'] for script in soup.find_all('script', src=True)]
        unique_scripts = list(set([urljoin(url, script) for script in raw_scripts]))[:max_scripts]

        return {
            'original_url': url,
            'final_url': response.url,
            'status_code': response.status_code,
            'content_type': response.headers.get('Content-Type', ''),
            'title': title,
            'meta_description': meta_desc,
            'meta_keywords': meta_keywords,
            'link_count': len(unique_links),
            'links': unique_links,
            'image_count': len(unique_images),
            'images': unique_images,
            'script_count': len(unique_scripts),
            'scripts': unique_scripts
        }
    except Exception as e:
        # Use the same key as the success case so failures can be matched back to their seed URL
        return {'original_url': url, 'error': str(e)}
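
Because `set()` does not preserve insertion order, `list(set(...))[:max_links]` can keep a different subset of links from one run to the next. A minimal sketch of an order-preserving alternative is shown here. The helper name `normalize_links` is hypothetical and not part of the notebook. Dropping URL fragments (`#section`) is also an added assumption, so that anchors on the same page count as one link.

```python
from urllib.parse import urljoin, urldefrag, urlparse

def normalize_links(base_url, hrefs, limit=50):
    """Hypothetical helper: resolve, filter, and de-duplicate links in page order."""
    seen = {}
    for href in hrefs:
        # Resolve relative links, then strip any "#fragment" (assumption, see above)
        absolute, _fragment = urldefrag(urljoin(base_url, href))
        if urlparse(absolute).scheme in ("http", "https"):
            seen.setdefault(absolute, None)  # dicts keep insertion order
    return list(seen)[:limit]
```

Inside `crawl_page`, this could stand in for the `unique_links` expression, for example `unique_links = normalize_links(url, raw_links, max_links)`.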

## 4. Define Seed URLs and Start Crawling

In [25]:
# Seed URLs
seed_urls = [
    "https://www.wikipedia.org", "https://www.bbc.com", "https://edition.cnn.com",
    "https://www.python.org", "https://www.stackoverflow.com", "https://www.github.com",
    "https://www.nytimes.com", "https://www.microsoft.com", "https://www.oracle.com",
    "https://www.reddit.com", "https://www.mozilla.org", "https://www.nationalgeographic.com",
    "https://www.forbes.com", "https://www.ted.com", "https://www.medium.com",
    "https://www.theguardian.com", "https://www.bloomberg.com", "https://www.amazon.com",
    "https://www.apple.com", "https://www.linkedin.com"
]

def crawl_with_progress(url):
    result = crawl_page(url)
    print(f"{'Success' if 'error' not in result else 'Failed'}: {url}")
    return result

# Start distributed crawling
rdd = sc.parallelize(seed_urls, numSlices=4)
results = rdd.map(crawl_with_progress).collect()
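
`crawl_page` reports a failure as a dict with an `error` key, and the job above tries each URL only once, so a transient timeout becomes a permanent failure. One possible way to add retries is sketched below. The names `with_retries`, `attempts`, `base_delay`, and `sleep` are hypothetical, and exponential backoff is an assumption rather than something the notebook does.

```python
import time

def with_retries(fetch, url, attempts=3, base_delay=1.0, sleep=time.sleep):
    """Hypothetical helper: call fetch(url) until it returns a dict without 'error'."""
    result = {'original_url': url, 'error': 'no attempts made'}
    for attempt in range(attempts):
        result = fetch(url)
        if 'error' not in result:
            return result
        if attempt < attempts - 1:
            # Wait 1x, 2x, 4x, ... base_delay before the next try
            sleep(base_delay * 2 ** attempt)
    return result  # last failure, returned unchanged
```

With the notebook's objects this might be used as `results = rdd.map(lambda u: with_retries(crawl_page, u)).collect()`. The `sleep` parameter exists only so the delay can be replaced in tests.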

## 5. Process and Save Results

In [26]:
from pyspark.sql import Row
import datetime

# Separate successes and failures
successes = [r for r in results if 'error' not in r and r.get('links')]
failures = [r for r in results if 'error' in r]

# Convert to DataFrame
clean_rows = [
    Row(
        url=r['original_url'],
        final_url=r.get('final_url', ''),
        status_code=r.get('status_code'),  # None rather than '' keeps the column integer-typed
        content_type=r.get('content_type', ''),
        title=r.get('title', 'No Title'),
        meta_description=r.get('meta_description', ''),
        meta_keywords=r.get('meta_keywords', ''),
        link_count=r.get('link_count', 0),
        links=", ".join(r.get('links', [])),
        image_count=r.get('image_count', 0),
        images=", ".join(r.get('images', [])),
        script_count=r.get('script_count', 0),
        scripts=", ".join(r.get('scripts', []))
    )
    for r in successes
]

clean_df = spark.createDataFrame(clean_rows)

# Save results
def save_clean_df(df):
    import shutil
    output_csv_path = "cleaned_crawled_data_csv"
    output_parquet_path = "cleaned_crawled_data_parquet"

    shutil.rmtree(output_csv_path, ignore_errors=True)
    shutil.rmtree(output_parquet_path, ignore_errors=True)

    if df.count() > 0:
        df.write.mode("overwrite").option("header", True).csv(output_csv_path)
        df.write.mode("overwrite").parquet(output_parquet_path)
        df.limit(5).toPandas().to_csv("sample_cleaned_crawled_data.csv", index=False)
        print("✅ Cleaned crawl data saved to CSV, Parquet, and sample CSV.")
    else:
        print("⚠️ No data to save: clean_df is empty.")

# Save the cleaned DataFrame
save_clean_df(clean_df)

✅ Cleaned crawl data saved to CSV, Parquet, and sample CSV.
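
The `failures` list is built in the cell above but never inspected. A small sketch of how it might be summarized follows, assuming each failure dict carries the `error` string produced by `crawl_page`. The name `summarize_failures` is hypothetical, and grouping on the text before the first colon is only a rough heuristic.

```python
from collections import Counter

def summarize_failures(failures):
    """Hypothetical helper: count failures by the text before the first colon."""
    counts = Counter(
        f.get('error', 'unknown').split(':', 1)[0].strip() for f in failures
    )
    # Most frequent error prefixes first
    return counts.most_common()
```

Calling `summarize_failures(failures)` returns a list of `(error_prefix, count)` pairs that can be printed or logged next to the saved data.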


In [27]:
from google.colab import files
import zipfile
import os

# Zip a folder and trigger a browser download
def zip_and_download_folder(folder_path, zip_name):
    # The context manager closes the archive even if a write fails
    with zipfile.ZipFile(zip_name, 'w', zipfile.ZIP_DEFLATED) as zipf:
        for root, dirs, files_in_dir in os.walk(folder_path):
            for file in files_in_dir:
                full_path = os.path.join(root, file)
                relative_path = os.path.relpath(full_path, folder_path)
                zipf.write(full_path, os.path.join(os.path.basename(folder_path), relative_path))
    files.download(zip_name)

# Download zipped folders
zip_and_download_folder("cleaned_crawled_data_csv", "cleaned_crawled_data_csv.zip")
zip_and_download_folder("cleaned_crawled_data_parquet", "cleaned_crawled_data_parquet.zip")

# Download sample CSV file
files.download("sample_cleaned_crawled_data.csv")

## 6. Run the Scrapy Spider

In [99]:
!python3 my_spider.py

## 7. Download the Scrapy Data (JSON)

In [100]:
from google.colab import files
files.download("scraped_data.json")
