In [73]:
# URL canonicalization part of the project

from urllib.parse import urlparse, urlunparse

def canonicalize_url(url):
    """Return a canonical form of ``url`` so equivalent URLs compare equal.

    Normalizations applied:
      * scheme and network location are lower-cased;
      * a leading ``www.`` is removed from the network location;
      * the scheme's default port (``:80`` for http, ``:443`` for https) is dropped;
      * an empty path on a URL that has a host becomes ``/``, so
        ``https://example.com`` and ``https://example.com/`` are the same page;
      * a trailing slash is removed from every path except the root ``/``;
      * path parameters (``;params``) and the fragment (``#section``) are dropped.

    The query string is kept unchanged.

    Args:
        url: URL string to normalize.

    Returns:
        The canonical URL string.
    """
    parsed = urlparse(url)

    # Convert scheme + domain to lowercase
    scheme = parsed.scheme.lower()
    netloc = parsed.netloc.lower()

    # Remove "www." prefix if present
    if netloc.startswith("www."):
        netloc = netloc[4:]

    # Drop an explicit default port: "example.com:443" over https is the same origin
    default_port = {"http": ":80", "https": ":443"}.get(scheme)
    if default_port and netloc.endswith(default_port):
        netloc = netloc[:-len(default_port)]

    path = parsed.path
    query = parsed.query  # We keep query parameters for now

    if netloc and not path:
        # "https://example.com" and "https://example.com/" address the same resource
        path = "/"
    elif path != "/" and path.endswith("/"):
        # Remove trailing slash (except for root "/")
        path = path[:-1]

    # Rebuild the URL without params and without the fragment
    return urlunparse((scheme, netloc, path, "", query, ""))


In [None]:
import requests
from datetime import datetime

def download_page(url):
    """Fetch ``url`` (after canonicalization) and describe the outcome as a dict.

    Args:
        url: URL to download; it is canonicalized before the request is sent.

    Returns:
        A dict with the keys ``url`` (canonical URL), ``status_code``,
        ``html_content`` and ``accessed_time``. When the request raises
        ``requests.RequestException`` the error is printed and the dict carries
        ``status_code=None`` and an empty ``html_content``.
    """
    canonical_url = canonicalize_url(url)  # the canonical form is what gets requested and reported
    request_headers = {'User-Agent': 'MyCrawler/1.0'}

    try:
        response = requests.get(canonical_url, timeout=10, headers=request_headers)
        fetched_at = datetime.utcnow()
        page = {
            'url': canonical_url,
            'status_code': response.status_code,
            'html_content': response.text,
            'accessed_time': fetched_at,
        }
        return page
    except requests.RequestException as e:
        print(f"Error downloading {url}: {e}")
        failure = {
            'url': canonical_url,  # same key set as the success case
            'status_code': None,
            'html_content': '',
            'accessed_time': datetime.utcnow(),
        }
        return failure


In [15]:
# Manual smoke test: download a single page and print the returned dict
# (keys: url, status_code, html_content, accessed_time).
result = download_page("https://sl.wikipedia.org/wiki/Tone_Pav%C4%8Dek")
print(result)


{'url': 'https://sl.wikipedia.org/wiki/Tone_Pav%C4%8Dek', 'status_code': 200, 'html_content': '<!DOCTYPE html>\n<html class="client-nojs vector-feature-language-in-header-enabled vector-feature-language-in-main-page-header-disabled vector-feature-page-tools-pinned-disabled vector-feature-toc-pinned-clientpref-1 vector-feature-main-menu-pinned-disabled vector-feature-limited-width-clientpref-1 vector-feature-limited-width-content-enabled vector-feature-custom-font-size-clientpref-1 vector-feature-appearance-pinned-clientpref-1 vector-feature-night-mode-enabled skin-theme-clientpref-day vector-sticky-header-enabled vector-toc-available" lang="sl" dir="ltr">\n<head>\n<meta charset="UTF-8">\n<title>Tone Pavček - Wikipedija, prosta enciklopedija</title>\n<script>(function(){var className="client-js vector-feature-language-in-header-enabled vector-feature-language-in-main-page-header-disabled vector-feature-page-tools-pinned-disabled vector-feature-toc-pinned-clientpref-1 vector-feature-main

In [None]:
import psycopg2

# Connection settings are read from the environment so that credentials do not
# have to be stored in the notebook. The fallbacks are the values of the local
# development database, so the cell behaves as before when nothing is set.
import os

conn = psycopg2.connect(
    dbname=os.environ.get('WIER_DB_NAME', 'wier'),
    user=os.environ.get('WIER_DB_USER', 'user'),
    password=os.environ.get('WIER_DB_PASSWORD', 'SecretPassword'),
    host=os.environ.get('WIER_DB_HOST', 'localhost'),
    port=os.environ.get('WIER_DB_PORT', '5433')
)
conn.autocommit = True  # every statement is committed immediately
cursor = conn.cursor()  # module-level cursor used by the helper functions below

# Make the crawldb schema the current one
cursor.execute("SET search_path TO crawldb;")


In [None]:
from urllib.parse import urlparse

def get_or_create_site_id(url):
    """Return the ``site.id`` for the domain of ``url``, inserting the site if missing.

    The domain is the lower-cased network location of ``url`` with a leading
    ``www.`` removed. Uses the module-level ``cursor``.

    Args:
        url: URL whose domain identifies the site.

    Returns:
        The id of the existing or newly inserted ``site`` row.
    """
    host = urlparse(url).netloc.lower()
    host = host[4:] if host.startswith("www.") else host

    cursor.execute("SELECT id FROM site WHERE domain = %s", (host,))
    existing = cursor.fetchone()
    if existing:
        return existing[0]

    # No row for this domain yet: create one and hand back its generated id
    cursor.execute(
        "INSERT INTO site (domain) VALUES (%s) RETURNING id", (host,)
    )
    created = cursor.fetchone()
    return created[0]


In [None]:
def store_page_data(page_data):
    """Insert a downloaded page into the ``page`` table as an ``HTML`` page.

    Args:
        page_data: dict with the keys ``url``, ``html_content``, ``status_code``
            and ``accessed_time`` (the shape returned by ``download_page``).

    The URL is canonicalized before it is used to resolve the site and before
    it is stored. Uses the module-level ``cursor``.
    """
    page_url = canonicalize_url(page_data['url'])
    owner_site_id = get_or_create_site_id(page_url)

    insert_page_sql = """
        INSERT INTO page (site_id, page_type_code, url, html_content, http_status_code, accessed_time)
        VALUES (%s, %s, %s, %s, %s, %s)
    """
    row = (
        owner_site_id,
        'HTML',
        page_url,
        page_data['html_content'],
        page_data['status_code'],
        page_data['accessed_time'],
    )
    cursor.execute(insert_page_sql, row)


In [23]:
# Manual test: download one page and insert it into the page table.
result = download_page("https://slo-tech.com/")
store_page_data(result)


In [25]:
from bs4 import BeautifulSoup

def extract_page_data(page_id, html_content, cursor):
    """Store the title, meta description and meta keywords of a page in ``page_data``.

    Args:
        page_id: id of the page the extracted values belong to.
        html_content: HTML source to parse.
        cursor: database cursor used for the inserts.

    One row is inserted per value that is present and non-empty, with the
    data type codes ``TITLE``, ``DESCRIPTION`` and ``KEYWORDS``. Values are
    stripped of surrounding whitespace before being stored.
    """
    soup = BeautifulSoup(html_content, 'html.parser')
    insert_sql = """
            INSERT INTO page_data (page_id, data_type_code, data)
            VALUES (%s, %s, %s)
        """

    def _save(type_code, text):
        # Single place that writes a page_data row
        cursor.execute(insert_sql, (page_id, type_code, text.strip()))

    # <title>: .string may be None, which is skipped just like an empty title
    title_text = soup.title.string if soup.title else ''
    if title_text:
        _save('TITLE', title_text)

    # <meta name="description"> and <meta name="keywords">, in that order
    for meta_name, type_code in (('description', 'DESCRIPTION'), ('keywords', 'KEYWORDS')):
        meta_tag = soup.find('meta', attrs={'name': meta_name})
        if meta_tag and meta_tag.get('content'):
            _save(type_code, meta_tag['content'])


In [77]:
from urllib.parse import urljoin, urlparse

def extract_links(page_id, base_url, html_content, cursor):
    """Record links from a page to pages that are already stored in ``page``.

    Args:
        page_id: id of the page the links originate from.
        base_url: URL used to resolve relative ``href`` values.
        html_content: HTML source to parse.
        cursor: database cursor used for the lookups and inserts.

    Every ``<a href>`` is resolved against ``base_url``, stripped of its
    fragment and canonicalized. A ``link`` row is inserted only when a page
    with that URL exists; conflicting inserts are ignored.
    """
    link_sql = """
                INSERT INTO link (from_page, to_page)
                VALUES (%s, %s)
                ON CONFLICT DO NOTHING
            """
    soup = BeautifulSoup(html_content, 'html.parser')

    for anchor in soup.find_all('a', href=True):
        absolute = urljoin(base_url, anchor['href'])
        without_fragment = absolute.split('#')[0]
        target_url = canonicalize_url(without_fragment)

        # Only link to pages we already know about
        cursor.execute("SELECT id FROM page WHERE url = %s", (target_url,))
        match = cursor.fetchone()
        if not match:
            continue

        cursor.execute(link_sql, (page_id, match[0]))


In [89]:
# The string literal below is disabled code (it is never executed): a one-off
# insert of the TITLE / DESCRIPTION / KEYWORDS rows into data_type.
''' nek fix za data_type ce bom se kdaj rabu
cursor.execute("""
    INSERT INTO data_type (code) VALUES
    ('TITLE'),
    ('DESCRIPTION'),
    ('KEYWORDS')
    ON CONFLICT DO NOTHING;
""")
'''

# Test of the data extraction: download one page, upsert it, then extract
# its metadata and links.

url = canonicalize_url("https://slo-tech.com/")  # Canonicalize before use
result = download_page(url)

# Resolve (or create) the site row for the downloaded page
site_id = get_or_create_site_id(result['url'])  # Ensure site is stored in canonical form

# Upsert the page: an existing row with the same url gets its content, status
# and access time overwritten (later, we can change to ON CONFLICT DO NOTHING)
cursor.execute("""
    INSERT INTO page (site_id, page_type_code, url, html_content, http_status_code, accessed_time)
    VALUES (%s, %s, %s, %s, %s, %s)
    ON CONFLICT (url) DO UPDATE SET
        html_content = EXCLUDED.html_content,
        http_status_code = EXCLUDED.http_status_code,
        accessed_time = EXCLUDED.accessed_time
    RETURNING id
""", (
    site_id,
    'HTML',
    result['url'],  # Ensure stored URL is canonical
    result['html_content'],
    result['status_code'],
    result['accessed_time']
))
page_id = cursor.fetchone()[0]  # id of the inserted or updated row

# Extract metadata and links
extract_page_data(page_id, result['html_content'], cursor)
extract_links(page_id, result['url'], result['html_content'], cursor)  # extract_links canonicalizes each link itself



In [None]:
# Preferential crawler part of the project

from sklearn.feature_extraction.text import CountVectorizer
from sklearn.metrics.pairwise import cosine_similarity

# Target description: the text the preferential crawler is looking for
TARGET_DESCRIPTION = "ChatGPT"

# Fit a bag-of-words vectorizer (English stop words removed) on the target
# description once and keep the resulting vector.
# NOTE(review): score_url below does not use vectorizer/target_vector - confirm
# whether a similarity-based score is still planned.
vectorizer = CountVectorizer(stop_words='english')
target_vector = vectorizer.fit_transform([TARGET_DESCRIPTION])

def score_url(url):
    """Compute a crawl priority for ``url``; larger values are crawled first.

    The score is the sum of:
      * 10 if the lower-cased URL contains ``forum``, ``news`` or ``tech``
        anywhere (host, path or query);
      * ``5 - <number of '/' in the path>``, floored at 0, so shallow paths
        score higher.

    Args:
        url: URL string to score.

    Returns:
        The integer priority.
    """
    lowered = url.lower()

    # Keyword bonus is granted once, no matter how many keywords match
    keyword_bonus = 0
    for word in ('forum', 'news', 'tech'):
        if word in lowered:
            keyword_bonus = 10
            break

    # Depth bonus shrinks with every slash in the path and never goes negative
    slash_count = urlparse(url).path.count('/')
    depth_bonus = 5 - slash_count
    if depth_bonus < 0:
        depth_bonus = 0

    return keyword_bonus + depth_bonus


In [2]:
import re
from bs4 import BeautifulSoup
from urllib.parse import urljoin

def extract_links_to_frontier(base_url, html_content, cursor):
    """Discover links on a page and upsert them into ``url_frontier``.

    Links are taken from the ``href`` attribute of ``a``/``div``/``span``/
    ``button`` elements and from ``onclick`` handlers that assign a string to
    ``location.href`` or ``document.location``.

    Args:
        base_url: URL of the page; canonicalized and used to resolve relative links.
        html_content: HTML source to parse.
        cursor: database cursor used for the upserts.

    Each link is resolved, stripped of its fragment, canonicalized and scored
    with ``score_url``. Links whose scheme is not http/https (``mailto:``,
    ``javascript:``, ``tel:`` ...) are skipped because the crawler cannot
    download them, and every distinct URL is written only once per page.
    Insert errors are printed and do not stop the loop.
    """
    base_url = canonicalize_url(base_url)  # Canonicalize base URL
    soup = BeautifulSoup(html_content, 'html.parser')
    links = soup.find_all(['a', 'div', 'span', 'button'], href=True) + soup.find_all(onclick=True)
    new_links_count = 0
    seen_urls = set()  # canonical URLs already written for this page

    for tag in links:
        href = None

        # Standard link via <a href="...">
        if tag.has_attr('href'):
            href = tag['href']

        # JS onclick event extraction
        elif tag.has_attr('onclick'):
            onclick_content = tag['onclick']
            # Regex to extract URL from location.href or document.location
            match = re.search(r"(?:location\.href|document\.location)\s*=\s*[\"']([^\"']+)[\"']", onclick_content)
            if match:
                href = match.group(1)

        if not href:
            continue

        full_url = urljoin(base_url, href).split('#')[0].strip()

        # Only http(s) URLs can be fetched by the crawler
        if urlparse(full_url).scheme.lower() not in ('http', 'https'):
            continue

        full_url = canonicalize_url(full_url)

        # The same target often appears several times on a page (and a tag with
        # both href and onclick is returned by both find_all calls above)
        if full_url in seen_urls:
            continue
        seen_urls.add(full_url)

        priority = score_url(full_url)  # Calculate priority based on scoring logic

        try:
            cursor.execute("""
                    INSERT INTO url_frontier (url, priority)
                    VALUES (%s, %s)
                    ON CONFLICT (url) DO UPDATE SET priority = EXCLUDED.priority
                """, (full_url, priority))  # Insert or update priority
            # rowcount covers inserted and updated rows alike
            new_links_count += cursor.rowcount
        except Exception as e:
            print(f"Error inserting link into frontier: {e}")

    print(f"Discovered {new_links_count} new link(s) added to frontier with priority.")


In [91]:
# Test of the link extraction into the frontier, using the page downloaded above
extract_links_to_frontier(result['url'], result['html_content'], cursor)

Discovered 28 new link(s) added to frontier (including JS links).


In [79]:
# Duplicate detection using content hashing
import hashlib

def compute_content_hash(html_content):
    """Return the SHA-256 hex digest of ``html_content`` encoded as UTF-8.

    Args:
        html_content: page content as a string.

    Returns:
        The 64-character hexadecimal digest.
    """
    digest = hashlib.sha256()
    digest.update(html_content.encode('utf-8'))
    return digest.hexdigest()

def store_page_with_duplicate_detection(result, from_page_id=None):
    """Store a downloaded page, classifying it as BINARY, HTML or DUPLICATE.

    Args:
        result: dict describing the download. Reads ``url``, ``status_code``,
            ``accessed_time``, ``html_content`` and the optional ``is_binary``.
        from_page_id: optional id of the page that linked here. When given, a
            row is added to ``link`` (conflicts are ignored).

    Returns:
        The id of the stored or updated page, or ``None`` when a non-binary
        result has ``html_content`` set to ``None``.

    Behavior:
        * binary results are inserted as ``BINARY`` without content, their file
          name is logged in ``page_data`` and the function returns early
          (no link row is written for them);
        * if the canonical URL already exists, the row is updated in place;
        * otherwise the content hash decides: a page with the same hash makes
          this one a ``DUPLICATE`` (stored without content), else it is stored
          as a new ``HTML`` page.

    Uses the module-level ``cursor``.
    """
    url = canonicalize_url(result['url'])  # Canonicalize before using
    site_id = get_or_create_site_id(url)

    # Handle Binary Files First
    if result.get('is_binary', False):
        # Store as BINARY, no html_content
        cursor.execute("""
            INSERT INTO page (site_id, page_type_code, url, http_status_code, accessed_time)
            VALUES (%s, %s, %s, %s, %s)
            RETURNING id
        """, (
            site_id,
            'BINARY',
            url,  # Store canonicalized URL
            result['status_code'],
            result['accessed_time']
        ))
        page_id = cursor.fetchone()[0]
        print(f"Stored BINARY page_id={page_id} for {url}")

        # Log filename (last URL segment) in page_data
        filename = url.split('/')[-1]
        cursor.execute("""
            INSERT INTO page_data (page_id, data_type_code, data)
            VALUES (%s, %s, %s)
        """, (page_id, 'FILENAME', filename))

        return page_id  # Return early to skip hashing

    # Proceed with HTML page logic
    html_content = result['html_content']
    if html_content is None:
        print(f"Warning: HTML content is None for {url}")
        return None  # Avoid error if somehow HTML is still None

    content_hash = compute_content_hash(html_content)  # Only hash non-binary files

    # Check if URL already exists
    cursor.execute("SELECT id, content_hash FROM page WHERE url = %s", (url,))  # Use canonicalized URL
    existing_page = cursor.fetchone()

    if existing_page:
        page_id = existing_page[0]
        cursor.execute("""
            UPDATE page
            SET html_content = %s,
                http_status_code = %s,
                accessed_time = %s,
                content_hash = %s
            WHERE id = %s
        """, (
            html_content,
            result['status_code'],
            result['accessed_time'],
            content_hash,
            page_id
        ))
        print(f"Updated page_id={page_id} content.")

    else:
        # Check for duplicate content (hash)
        cursor.execute("SELECT id FROM page WHERE content_hash = %s", (content_hash,))
        duplicate_page = cursor.fetchone()

        if duplicate_page:
            cursor.execute("""
                INSERT INTO page (site_id, page_type_code, url, http_status_code, accessed_time, content_hash)
                VALUES (%s, %s, %s, %s, %s, %s)
                RETURNING id
            """, (
                site_id,
                'DUPLICATE',
                url,  # Store canonicalized URL
                result['status_code'],
                result['accessed_time'],
                content_hash
            ))
            page_id = cursor.fetchone()[0]
            print(f"Stored DUPLICATE page_id={page_id} of {duplicate_page[0]}")
        else:
            cursor.execute("""
                INSERT INTO page (site_id, page_type_code, url, html_content, http_status_code, accessed_time, content_hash)
                VALUES (%s, %s, %s, %s, %s, %s, %s)
                RETURNING id
            """, (
                site_id,
                'HTML',
                url,  # Store canonicalized URL
                html_content,
                result['status_code'],
                result['accessed_time'],
                content_hash
            ))
            page_id = cursor.fetchone()[0]
            print(f"Stored page_id={page_id} as new HTML page.")

    # Insert into link table if the referring page is known.
    # from_page_id is a page id, not a URL, so it is written as-is: passing it
    # through canonicalize_url would fail or turn the id into a string.
    if from_page_id is not None:
        cursor.execute("""
            INSERT INTO link (from_page, to_page)
            VALUES (%s, %s)
            ON CONFLICT DO NOTHING;
        """, (from_page_id, page_id))

    return page_id


In [None]:
# Test example: download a page, store it with duplicate detection, then
# extract its metadata and push its links into the frontier.
result = download_page("https://slo-tech.com/")
page_id = store_page_with_duplicate_detection(result)
extract_page_data(page_id, result['html_content'], cursor)
extract_links_to_frontier(result['url'], result['html_content'], cursor)

URL exists. Updated page_id=1 content.
Discovered 0 new link(s) added to frontier.


In [80]:
import requests
from urllib.parse import urlparse
from urllib import robotparser

# robots.txt check

# Download and store robots.txt
def fetch_and_store_robots_txt(domain, cursor):
    """Download ``https://<domain>/robots.txt`` and store it in ``site.robots_content``.

    Args:
        domain: host name of the site; lower-cased and stripped of a leading
            ``www.`` before use.
        cursor: database cursor used to update the ``site`` row.

    Returns:
        The robots.txt text on HTTP 200, otherwise an empty string (non-200
        status or any error, which is printed rather than raised).
    """
    domain = domain.lower()  # Normalize domain
    if domain.startswith("www."):  # Remove "www."
        domain = domain[4:]

    robots_url = f"https://{domain}/robots.txt"
    try:
        # Identify with the same User-Agent as the page and image requests, so
        # the site sees one consistent crawler
        response = requests.get(robots_url, timeout=10, headers={'User-Agent': 'MyCrawler/1.0'})
        if response.status_code == 200:
            robots_content = response.text
            # Store in site.robots_content
            cursor.execute("""
                UPDATE site SET robots_content = %s WHERE domain = %s
            """, (robots_content, domain))
            print(f"robots.txt for {domain} stored.")
            return robots_content
        else:
            print(f"No robots.txt found for {domain}, status: {response.status_code}")
            return ''
    except Exception as e:
        # Best effort: a failed fetch or update is reported and treated as "no robots.txt"
        print(f"Error fetching robots.txt for {domain}: {e}")
        return ''

# Check if crawling is allowed
def is_allowed(url, cursor, user_agent='MyCrawler/1.0'):
    """Check robots.txt for ``url`` and return ``(allowed, delay_seconds)``.

    Args:
        url: URL about to be crawled.
        cursor: database cursor; ``site.robots_content`` is read for the URL's
            domain (lower-cased, leading ``www.`` removed).
        user_agent: agent name the rules are evaluated for. Defaults to the
            User-Agent the crawler sends with its requests, so robots.txt
            groups addressed to this crawler are honoured; groups for ``*``
            still apply when no specific group matches.

    Returns:
        A tuple ``(allowed, delay)`` where ``allowed`` is the robots.txt verdict
        and ``delay`` is the Crawl-delay for the agent, but at least 5 seconds.

    When no robots content is stored for the domain, it is fetched with
    ``fetch_and_store_robots_txt``.
    """
    parsed_url = urlparse(url)
    domain = parsed_url.netloc.lower()  # Normalize domain
    if domain.startswith("www."):  # Remove "www."
        domain = domain[4:]

    # Get robots_content from DB, falling back to a live fetch
    cursor.execute("SELECT robots_content FROM site WHERE domain = %s", (domain,))
    result = cursor.fetchone()
    robots_content = result[0] if result and result[0] else fetch_and_store_robots_txt(domain, cursor)

    rp = robotparser.RobotFileParser()
    rp.set_url(f"https://{domain}/robots.txt")
    rp.parse(robots_content.splitlines())

    # Set crawl delay fallback: 5 sec min
    crawl_delay = rp.crawl_delay(user_agent)
    delay = max(crawl_delay or 0, 5)

    allowed = rp.can_fetch(user_agent, url)
    return allowed, delay

In [81]:
# Test of the robots.txt check: prints the verdict and the crawl delay for one URL
url = "https://slo-tech.com/"
allowed, delay = is_allowed(url, cursor)
print(f"Allowed: {allowed}, Crawl-delay: {delay} seconds")


Allowed: True, Crawl-delay: 5 seconds


In [None]:
# Delay between requests to the same server: at least 5 seconds
import time
from datetime import datetime, timedelta
import socket

def get_ip(domain):
    """Resolve ``domain`` to an IPv4 address string, falling back to the domain.

    Args:
        domain: host name; lower-cased and stripped of a leading ``www.``
            before the lookup.

    Returns:
        The address returned by ``socket.gethostbyname``, or the normalized
        domain itself when the lookup fails.
    """
    domain = domain.lower()  # Normalize domain
    if domain.startswith("www."):  # Remove "www."
        domain = domain[4:]

    try:
        return socket.gethostbyname(domain)
    except (OSError, UnicodeError):
        # OSError covers resolver failures (socket.gaierror); UnicodeError is
        # raised for names that cannot be IDNA-encoded. Anything else, such as
        # KeyboardInterrupt, is no longer swallowed.
        return domain  # Fallback if DNS fails

import threading

last_access_times = {}  # domain_or_ip : datetime of the latest (possibly still pending) request
_last_access_lock = threading.Lock()  # guards last_access_times when several workers crawl at once

def enforce_crawl_delay(url, delay_sec):
    """Block until at least ``delay_sec`` seconds separate requests to the same server.

    Args:
        url: URL about to be requested; its host (lower-cased, leading ``www.``
            removed) is resolved with ``get_ip`` and the result is the key
            under which access times are tracked.
        delay_sec: minimum number of seconds between two requests to that key.

    The next free time slot for the key is reserved while holding a lock and
    the sleep happens after the lock is released. Concurrent workers targeting
    the same server therefore queue up ``delay_sec`` apart instead of all
    passing the check at the same moment, and workers for other servers are
    not held up.
    """
    parsed_url = urlparse(url)
    domain = parsed_url.netloc.lower()  # Normalize domain
    if domain.startswith("www."):  # Remove "www."
        domain = domain[4:]

    key = get_ip(domain)  # DNS lookup is done outside the lock

    with _last_access_lock:
        now = datetime.utcnow()
        scheduled = now
        last_access = last_access_times.get(key)
        if last_access is not None:
            earliest = last_access + timedelta(seconds=delay_sec)
            if earliest > now:
                scheduled = earliest
        # Reserve the slot before sleeping so the next caller waits behind it
        last_access_times[key] = scheduled

    wait_time = (scheduled - now).total_seconds()
    if wait_time > 0:
        print(f"Waiting {wait_time:.2f} seconds before crawling {url}")
        time.sleep(wait_time)

In [None]:
# Disabled manual test: the example is wrapped in a string literal, so none of
# it is executed. It shows the intended flow for one URL: robots.txt check,
# crawl delay, download, store, extraction, then the frontier status update.
'''
# Test example with robots.txt and delay handling
url = "https://slo-tech.com/"

allowed, delay = is_allowed(url, cursor)

if allowed:
    enforce_crawl_delay(url, delay)
    result = download_page(url)
    page_id = store_page_with_duplicate_detection(result)
    extract_page_data(page_id, result['html_content'], cursor)
    extract_links_to_frontier(result['url'], result['html_content'], cursor)
    # image extraction
    # extract_and_store_images(page_id, result['url'], result['html_content'], cursor)

    # Mark as crawled
    cursor.execute("""
        UPDATE url_frontier SET status = %s WHERE url = %s
    """, ('crawled', url))

else:
    print(f"URL disallowed by robots.txt: {url}")
    # Mark as failed
    cursor.execute("""
        UPDATE url_frontier SET status = %s WHERE url = %s
    """, ('failed', url))
    '''


URL exists. Updated page_id=1 content.
Discovered 0 new link(s) added to frontier.


In [82]:
# Multi-worker crawler part of the project

# Crawl function for a single URL
def _set_frontier_status(cursor, status, *urls):
    """Set ``url_frontier.status`` for every row whose url is one of ``urls``."""
    for frontier_url in dict.fromkeys(urls):  # de-duplicate, keep order
        cursor.execute("UPDATE url_frontier SET status = %s WHERE url = %s", (status, frontier_url))


def crawl_one_url(url, cursor):
    """Crawl a single frontier URL and record the outcome in ``url_frontier``.

    Args:
        url: URL as stored in the frontier.
        cursor: database cursor used for the robots check, the extraction
            helpers and the frontier status updates.

    Steps: canonicalize, check robots.txt, honour the crawl delay, download,
    store with duplicate detection, extract metadata and links. The frontier
    row is marked ``crawled`` on success and ``failed`` when the URL is
    disallowed or any step raises; errors are printed, not re-raised.

    The status is written for both the URL as received and its canonical form.
    A frontier row stored under a non-canonical spelling would otherwise never
    leave the ``queued`` state and would be picked up again on every pass.
    """
    frontier_url = url  # exact value the frontier row was selected by
    try:
        url = canonicalize_url(url)  # Canonicalize before use

        allowed, delay = is_allowed(url, cursor)
        if not allowed:
            print(f"[SKIP] Disallowed by robots.txt: {url}")
            _set_frontier_status(cursor, 'failed', frontier_url, url)
            return

        enforce_crawl_delay(url, delay)
        result = download_page(url)  # Use canonicalized URL
        page_id = store_page_with_duplicate_detection(result)
        extract_page_data(page_id, result['html_content'], cursor)
        extract_links_to_frontier(result['url'], result['html_content'], cursor)

        _set_frontier_status(cursor, 'crawled', frontier_url, url)
        print(f"[CRAWLED] {url}")

    except Exception as e:
        print(f"[ERROR] Failed to crawl {url}: {e}")
        _set_frontier_status(cursor, 'failed', frontier_url, url)


In [None]:
from concurrent.futures import ThreadPoolExecutor
import psycopg2

# thread pool setup
def get_next_urls_to_crawl(cursor, limit=10):
    """Return up to ``limit`` queued frontier URLs, best candidates first.

    Args:
        cursor: database cursor used to query ``url_frontier``.
        limit: maximum number of URLs to return.

    Returns:
        A list of URL strings with status ``queued``, ordered by descending
        priority and then by ascending discovery time.
    """
    select_sql = """
        SELECT url FROM url_frontier
        WHERE status = 'queued'
        ORDER BY priority DESC, discovered_time ASC
        LIMIT %s
    """
    cursor.execute(select_sql, (limit,))

    queued_urls = []
    for record in cursor.fetchall():
        queued_urls.append(record[0])  # single selected column: url
    return queued_urls


In [None]:
# main loop with workers

def start_crawler(num_workers):
    """Crawl the frontier with ``num_workers`` threads until no URL is queued.

    Args:
        num_workers: size of the thread pool and of each batch of URLs taken
            from the frontier.

    A dedicated connection is opened for the run. Connection settings are read
    from the ``WIER_DB_*`` environment variables and fall back to the local
    development values. Each crawl job gets its own cursor, because psycopg2
    cursors must not be shared between threads, while the connection itself
    may be. The connection is closed even when a job raises.
    """
    import os

    conn = psycopg2.connect(
        dbname=os.environ.get('WIER_DB_NAME', 'wier'),
        user=os.environ.get('WIER_DB_USER', 'user'),
        password=os.environ.get('WIER_DB_PASSWORD', 'SecretPassword'),
        host=os.environ.get('WIER_DB_HOST', 'localhost'),
        port=os.environ.get('WIER_DB_PORT', '5433')
    )
    conn.autocommit = True
    cursor = conn.cursor()  # used by this (scheduling) thread only

    def _crawl_with_own_cursor(url):
        # One cursor per job: never share a cursor between worker threads
        worker_cursor = conn.cursor()
        try:
            crawl_one_url(url, worker_cursor)
        finally:
            worker_cursor.close()

    try:
        # Same schema selection as the module-level connection; the setting is
        # per session, so it applies to every cursor of this connection
        cursor.execute("SET search_path TO crawldb;")

        with ThreadPoolExecutor(max_workers=num_workers) as executor:
            while True:
                urls = get_next_urls_to_crawl(cursor, limit=num_workers)
                if not urls:
                    print("No URLs left to crawl. Frontier empty.")
                    break

                # Submit crawl jobs
                futures = [executor.submit(_crawl_with_own_cursor, url) for url in urls]

                # Wait for all to complete
                for future in futures:
                    future.result()  # Raises exceptions if any
    finally:
        cursor.close()
        conn.close()

In [None]:
# Image extraction part of the project

def extract_and_store_images(page_id, base_url, html_content, cursor):
    """Download the images referenced by a page and store them in ``image``.

    Args:
        page_id: id of the page the images belong to.
        base_url: URL of the page; canonicalized and used to resolve relative
            ``src`` values.
        html_content: HTML source to parse.
        cursor: database cursor used for the inserts.

    Only ``<img src>`` values that resolve to an http/https URL are requested;
    inline ``data:`` images and other schemes are skipped instead of being
    passed to ``requests``. An image is stored when the response is HTTP 200
    and its Content-Type contains ``image``. The stored file name is the last
    segment of the URL path, without the query string. Errors for a single
    image are printed and do not stop the loop.
    """
    base_url = canonicalize_url(base_url)  # Canonicalize base URL
    soup = BeautifulSoup(html_content, 'html.parser')
    images = soup.find_all('img', src=True)
    count = 0

    for img_tag in images:
        img_src = img_tag['src']
        img_url = urljoin(base_url, img_src)
        img_url = img_url.strip()

        parsed_img = urlparse(img_url)
        if parsed_img.scheme.lower() not in ('http', 'https'):
            continue  # e.g. data: URIs cannot be downloaded

        try:
            response = requests.get(img_url, timeout=10, headers={'User-Agent': 'MyCrawler/1.0'})
            if response.status_code == 200 and 'image' in response.headers.get('Content-Type', ''):
                content_type = response.headers.get('Content-Type')
                # Take the name from the path so "logo.png?v=3" is stored as "logo.png"
                filename = parsed_img.path.split('/')[-1]
                data = response.content
                accessed_time = datetime.utcnow()

                cursor.execute("""
                    INSERT INTO image (page_id, filename, content_type, data, accessed_time)
                    VALUES (%s, %s, %s, %s, %s)
                """, (page_id, filename, content_type, psycopg2.Binary(data), accessed_time))

                count += 1
        except Exception as e:
            print(f"Error downloading image {img_url}: {e}")

    print(f"Extracted and stored {count} image(s) from {base_url}")



In [None]:
# Disabled manual test: the example is wrapped in a string literal, so none of
# it is executed. Same flow as the earlier single-URL example, with the image
# extraction step enabled.
'''
# Test example with robots.txt and delay handling in image extraction
url = "https://slo-tech.com/"

allowed, delay = is_allowed(url, cursor)

if allowed:
    enforce_crawl_delay(url, delay)
    result = download_page(url)
    page_id = store_page_with_duplicate_detection(result)
    extract_page_data(page_id, result['html_content'], cursor)
    extract_links_to_frontier(result['url'], result['html_content'], cursor)
    # image extraction
    extract_and_store_images(page_id, result['url'], result['html_content'], cursor)

    # Mark as crawled
    cursor.execute("""
        UPDATE url_frontier SET status = %s WHERE url = %s
    """, ('crawled', url))

else:
    print(f"URL disallowed by robots.txt: {url}")
    # Mark as failed
    cursor.execute("""
        UPDATE url_frontier SET status = %s WHERE url = %s
    """, ('failed', url))
'''

URL exists. Updated page_id=1 content.
Discovered 0 new link(s) added to frontier.
Extracted and stored 21 image(s) from https://slo-tech.com/


In [83]:
# Binary file detection part of the project

# Download function with binary detection
def download_page_with_binary_detection(url):
    """Fetch ``url`` and flag the response as binary or HTML.

    Args:
        url: URL to download; it is canonicalized before the request is sent.

    Returns:
        A dict with the keys ``url`` (canonical URL), ``status_code``,
        ``accessed_time``, ``is_binary`` and ``html_content``. For binary
        responses ``html_content`` is ``None`` and the body is not downloaded.
        When the request raises ``requests.RequestException`` the error is
        printed and the dict carries ``status_code=None``, ``is_binary=False``
        and an empty ``html_content``.

    A response counts as binary when the URL path ends in a known document
    extension or the Content-Type matches a known document MIME type.
    """
    canonical_url = canonicalize_url(url)  # Canonicalize first

    try:
        # stream=True postpones the body download; the with-block releases the
        # connection afterwards, also when the body of a binary file is never read
        with requests.get(canonical_url, timeout=10, headers={'User-Agent': 'MyCrawler/1.0'}, stream=True) as response:
            accessed_time = datetime.utcnow()
            content_type = response.headers.get('Content-Type', '').lower()
            is_binary = False

            # Extension check on the path only, so "report.pdf?download=1" is recognised
            binary_extensions = ('.pdf', '.doc', '.docx', '.ppt', '.pptx')
            if urlparse(canonical_url).path.lower().endswith(binary_extensions):
                is_binary = True

            # MIME check
            if any(binary_mime in content_type for binary_mime in ['application/pdf', 'application/msword', 'application/vnd']):
                is_binary = True

            # Prepare result
            result = {
                'url': canonical_url,  # Store canonicalized URL in result
                'status_code': response.status_code,
                'accessed_time': accessed_time,
                'is_binary': is_binary,
                'html_content': None if is_binary else response.text
            }

            return result

    except requests.RequestException as e:
        print(f"Error downloading {url}: {e}")
        return {
            'url': canonical_url,  # Ensure consistency even on error
            'status_code': None,
            'accessed_time': datetime.utcnow(),
            'is_binary': False,
            'html_content': ''
        }


In [84]:
# Test example for binary detection: robots.txt check, crawl delay, download,
# store, and (for non-binary pages only) metadata, link and image extraction.
url = canonicalize_url("https://www.princexml.com/samples/invoice/invoicesample.pdf")  # Canonicalize early

allowed, delay = is_allowed(url, cursor)

if allowed:
    enforce_crawl_delay(url, delay)
    result = download_page_with_binary_detection(url)
    page_id = store_page_with_duplicate_detection(result)

    # Binary results have no HTML to parse, so the extraction steps are skipped
    if not result.get('is_binary', False):
        extract_page_data(page_id, result['html_content'], cursor)
        extract_links_to_frontier(result['url'], result['html_content'], cursor)
        extract_and_store_images(page_id, result['url'], result['html_content'], cursor)

    # Mark as crawled
    cursor.execute("""
        UPDATE url_frontier SET status = %s WHERE url = %s
    """, ('crawled', url))  # Use canonicalized URL

else:
    print(f"URL disallowed by robots.txt: {url}")
    # Mark as failed
    cursor.execute("""
        UPDATE url_frontier SET status = %s WHERE url = %s
    """, ('failed', url))  # Use canonicalized URL



robots.txt for princexml.com stored.
Stored BINARY page_id=15 for https://princexml.com/samples/invoice/invoicesample.pdf
