In [0]:
import requests
import json
import pandas as pd
from urllib.parse import quote_plus
from warcio.archiveiterator import ArchiveIterator
import re
import concurrent.futures
import time
from bs4 import BeautifulSoup
import tqdm


# Base URL of the Common Crawl index server and the crawl snapshot to query.
SERVER = 'http://index.commoncrawl.org/'
INDEX_NAME = 'CC-MAIN-2025-08'

# Sent as the user-agent header on every HTTP request made by this script.
USER_AGENT = 'australian-website-extractor/1.0 (Data research project)'

# Domain suffixes searched in the index; one index query is issued per suffix.
AU_DOMAINS = ['.com.au', '.net.au', '.gov.au', '.org.au', '.edu.au']

def search_cc_index(domain_suffix, limit=1000):
    """
    Search the Common Crawl index for URLs with the specified domain suffix.

    Args:
        domain_suffix: Suffix such as '.com.au'; it is queried as the
            wildcard pattern '*<suffix>'.
        limit: Maximum number of index records requested from the server.

    Returns:
        A list of dicts, one per JSON line of the index response. An empty
        list is returned when the request fails or the server answers with
        a status other than 200.
    """
    encoded_url = quote_plus(f'*{domain_suffix}')
    index_url = f'{SERVER}{INDEX_NAME}-index?url={encoded_url}&output=json&limit={limit}'

    try:
        # Without a timeout a stalled index server would block the run forever.
        response = requests.get(
            index_url,
            headers={'user-agent': USER_AGENT},
            timeout=30,
        )
    except requests.RequestException as e:
        print(f"Error searching index for {domain_suffix}: {e}")
        return []

    if response.status_code != 200:
        print(f"Failed to search index for {domain_suffix}: HTTP {response.status_code}")
        return []

    records = []
    for line in response.text.splitlines():
        line = line.strip()
        if not line:
            continue
        try:
            records.append(json.loads(line))
        except ValueError as e:
            # One malformed line should not discard every other record.
            print(f"Error searching index for {domain_suffix}: {e}")
    return records

def fetch_warc_record(record):
    """
    Fetch a specific WARC record using the information from an index record.

    Args:
        record: Index record dict providing 'offset', 'length' and 'filename'.

    Returns:
        The payload of the first 'response' record in the fetched byte range,
        decoded to text, or None when the request fails, the server does not
        answer with 206 Partial Content, or no 'response' record is present.
    """
    try:
        offset, length = int(record['offset']), int(record['length'])
        s3_url = f'https://data.commoncrawl.org/{record["filename"]}'
        byte_range = f'bytes={offset}-{offset+length-1}'

        # The context manager releases the streamed connection on every exit
        # path, including the early returns below.
        with requests.get(
            s3_url,
            headers={'user-agent': USER_AGENT, 'Range': byte_range},
            stream=True,
            timeout=10,
        ) as response:
            if response.status_code != 206:  # expected: Partial Content
                print(f"Failed to fetch WARC record: HTTP {response.status_code}")
                return None

            for warc_record in ArchiveIterator(response.raw):
                if warc_record.rec_type == 'response':
                    content = warc_record.content_stream().read()
                    try:
                        # Strict decode so that non-UTF-8 pages actually reach
                        # the fallback instead of silently losing bytes.
                        return content.decode('utf-8')
                    except UnicodeDecodeError:
                        # latin-1 maps every byte, so this cannot fail.
                        return content.decode('latin-1')
            return None
    except Exception as e:
        print(f"Error fetching WARC record: {e}")
        return None

def extract_company_information(html, url, timestamp=''):
    """
    Extract company information from HTML content.

    Args:
        html: The HTML content of the page
        url: The URL of the page
        timestamp: The timestamp of when the page was crawled

    Returns:
        A dict with the keys 'url', 'company_name', 'title', 'description',
        'industry', 'email_id', 'contact_no', 'social_links' and
        'snapshot_date'. Multi-valued fields are de-duplicated, sorted and
        joined with ';'. 'industry' is the first category (in declaration
        order) with a keyword found in the page text, or None. If extraction
        raises, every field except 'url' and 'snapshot_date' is None.
    """
    try:
        soup = BeautifulSoup(html, 'html.parser')

        title = soup.title.text.strip() if soup.title else ""
        company_name = title

        # Drop a trailing marketing suffix; only the first one found is cut.
        for suffix in [' - Home', ' | Home', ' - Official Site', ' - Australia']:
            if suffix in company_name:
                company_name = company_name.split(suffix)[0].strip()
                break

        description = ""
        meta_desc = soup.find('meta', attrs={'name': 'description'})
        if meta_desc and meta_desc.get('content'):
            description = meta_desc['content'].strip()

        industry_keywords = {
            'finance': ['bank', 'finance', 'investment', 'insurance', 'wealth', 'mortgage','loans', 'credit'],
            'retail': ['shop', 'store', 'retail', 'ecommerce', 'products', 'buy','warehouse', 'fashion'],
            'technology': ['technology', 'software', 'IT', 'computing', 'digital', 'tech','robotics'],
            'healthcare': ['health', 'medical', 'hospital', 'clinic', 'care', 'patient','pharmacy', 'pharmaceutical'],
            'education': ['education', 'university', 'school', 'college', 'learn', 'training'],
            'government': ['government', 'council', 'department', 'agency', 'public','bureaucracy', 'administration'],
            'media': ['media', 'news', 'magazine','advertising', 'broadcast', 'publishing'],
            'tourism': [ 'tourism', 'accommodation', 'vacation','travel', 'hotel', 'airlines']
        }

        page_text = soup.get_text()
        lowered_text = page_text.lower()

        def keyword_present(keyword):
            # Upper-case acronyms such as 'IT' are matched as whole,
            # case-sensitive words; lowercasing them would match the
            # substring 'it' in nearly any text.
            if keyword.isupper():
                return re.search(rf'\b{re.escape(keyword)}\b', page_text) is not None
            return keyword in lowered_text

        industry_detected = None
        for ind, keywords in industry_keywords.items():
            if any(keyword_present(keyword) for keyword in keywords):
                industry_detected = ind
                break

        # '[A-Za-z]' rather than '[A-Z|a-z]': the latter also accepts a
        # literal '|' in the top-level domain.
        email_id_pattern = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'
        contact_no_pattern = r'\b(?:\+?61|0)[2-478](?:[ -]?[0-9]){8}\b'

        emails = re.findall(email_id_pattern, html)
        phones = re.findall(contact_no_pattern, html)

        social_link_patterns = ['facebook.com', 'twitter.com', 'linkedin.com', 'instagram.com', 'youtube.com']
        social_links = [
            link['href']
            for link in soup.find_all('a', href=True)
            if any(pattern in link['href'] for pattern in social_link_patterns)
        ]

        # Sorting makes the joined strings deterministic across runs; plain
        # set iteration order for strings is not.
        return {
            'url': url,
            'company_name': company_name,
            'title': title,
            'description': description,
            'industry': industry_detected,
            'email_id': ';'.join(sorted(set(emails))),
            'contact_no': ';'.join(sorted(set(phones))),
            'social_links': ';'.join(sorted(set(social_links))),
            'snapshot_date': timestamp
        }
    except Exception as e:
        # Best effort: one unparsable page must not abort the whole crawl.
        print(f"Error extracting company info from {url}: {e}")
        return {
            'url': url,
            'company_name': None,
            'title': None,
            'description': None,
            'industry': None,
            'email_id': None,
            'contact_no': None,
            'social_links': None,
            'snapshot_date': timestamp
        }

def process_record(record):
    """
    Process a single record from the Common Crawl index.

    Fetches the page behind the index record and extracts company details
    from it.

    Args:
        record: Index record dict; 'url' and 'timestamp' are read here and
            default to '' when absent.

    Returns:
        The dict produced by extract_company_information, or None when no
        HTML could be fetched for the record.
    """
    url = record.get('url', '')
    timestamp = record.get('timestamp', '')
    html = fetch_warc_record(record)

    if not html:
        return None
    # The original call misspelled the function name, which raised NameError.
    return extract_company_information(html, url, timestamp)

def collect_australian_websites(total_limit=20000, per_domain_limit=5000):
    """
    Gather company details for Australian websites found in Common Crawl.

    Each suffix in AU_DOMAINS is searched in turn and its records are
    processed by a pool of 10 worker threads. Searching stops once at least
    total_limit usable results have been accumulated.

    Args:
        total_limit: Maximum number of rows in the returned DataFrame.
        per_domain_limit: Maximum number of index records requested per suffix.

    Returns:
        A pandas DataFrame de-duplicated on 'url' and truncated to
        total_limit rows, or an empty DataFrame when nothing was collected.
    """
    collected = []

    for domain_suffix in AU_DOMAINS:
        print(f"Searching for websites with domain suffix: {domain_suffix}")
        records = search_cc_index(domain_suffix, limit=per_domain_limit)

        if not records:
            print(f"No records found for {domain_suffix}")
            continue

        print(f"Found {len(records)} records for {domain_suffix}")

        with concurrent.futures.ThreadPoolExecutor(max_workers=10) as pool:
            progress = tqdm.tqdm(
                pool.map(process_record, records),
                total=len(records),
                desc=f"Processing {domain_suffix}"
            )
            results = list(progress)

        # Keep only results that exist and carry a non-empty company name.
        kept = []
        for item in results:
            if item and item.get('company_name'):
                kept.append(item)
        collected.extend(kept)

        print(f"Extracted information from {len(kept)} websites with {domain_suffix}")

        if len(collected) >= total_limit:
            break

    if not collected:
        return pd.DataFrame()

    frame = pd.DataFrame(collected)
    frame = frame.drop_duplicates(subset=['url'])
    return frame.head(total_limit)

def main():
    """
    Run the Australian website collection and write the result to CSV.

    Collects up to 1000 websites (200 index records per suffix), saves them
    to a CSV file and prints the number of sites per detected industry.
    """
    print(f"Starting Australian website data collection using Common Crawl index: {INDEX_NAME}")

    df = collect_australian_websites(total_limit=1000, per_domain_limit=200)

    if df.empty:
        print("Failed to collect any data for Australian websites")
        return

    output_file = 'dbfs_file_Path/australian_websites.csv'
    df.to_csv(output_file, index=False)
    print(f"Successfully collected data for {len(df)} Australian websites")
    print(f"Data saved to {output_file}")

    if 'industry' not in df.columns:
        return

    industry_counts = df['industry'].value_counts()
    print("\nIndustry distribution:")
    for industry, count in industry_counts.items():
        if not industry:
            continue
        print(f"  {industry}: {count}")


if __name__ == "__main__":
    main()


In [0]:
%python
%pip install lxml

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

# Create the Spark session, or reuse one that already exists
spark = (
    SparkSession.builder
    .appName("XML Parsing")
    .config("spark.sql.files.maxPartitionBytes", "128MB")
    .getOrCreate()
)

# Path of the ABR XML input file
abr_file_path = "dbfs_file_path/abr_data_filename.xml"

# Output schema: four nullable string columns, in the order parse_abr emits them
schema = StructType([
    StructField(field_name, StringType(), True)
    for field_name in ("ABN", "NonIndividualNameText", "State", "Postcode")
])

# Function to parse XML and extract fields
def parse_abr(record):
    """
    Parse one XML fragment and pull out the ABN, entity name and address.

    Args:
        record: A string holding one XML element; this function is mapped
            over the lines of the input file.

    Returns:
        A tuple (abn, non_individual_name, state, postcode). Fields missing
        from the fragment are None. Input that cannot be parsed yields
        (None, None, None, None).
    """
    # Imported here so the module is resolved on the Spark executors.
    from lxml import etree

    try:
        root = etree.fromstring(record)
    except (etree.XMLSyntaxError, ValueError):
        # XMLSyntaxError: blank lines or lines that are not a complete element.
        # ValueError: lxml refuses str input carrying an encoding declaration
        # (the '<?xml ... encoding=...?>' header line).
        return (None, None, None, None)

    abn = root.findtext(".//ABN")
    state = root.findtext(".//AddressDetails/State")
    postcode = root.findtext(".//AddressDetails/Postcode")
    non_individual_name = root.findtext(".//MainEntity/NonIndividualName/NonIndividualNameText")
    return (abn, non_individual_name, state, postcode)

# Load XML file as RDD (textFile yields one element per line of the file)
abr_raw_file = spark.sparkContext.textFile(abr_file_path)

# Parse XML using map; each line becomes an (ABN, name, state, postcode) tuple
abr_raw_data = abr_raw_file.map(parse_abr)

# Filter out records with None ABN.
# The original filtered an undefined name (parsed_rdd); the parsed RDD is abr_raw_data.
filtered_abr_data = abr_raw_data.filter(lambda x: x[0] is not None)

# Convert to DataFrame
abr_data = spark.createDataFrame(filtered_abr_data, schema=schema)


In [0]:
from pyspark.sql.functions import trim, lower
from pyspark.sql.functions import col
from pyspark.sql.functions import broadcast

# Clean and normalize website data.
# multiLine / escape: the CSV is written by pandas, which keeps embedded
# newlines inside quoted fields and escapes quotes by doubling them.
auswebsites_raw = spark.read.csv(
    'dbfs_filePath/australian_websites.csv',
    header=True,
    inferSchema=True,
    multiLine=True,
    escape='"'
)

# Alias the normalized columns back to their original names; without an alias
# Spark names them after the expression and later references to
# "company_name" fail. The contact columns use the names written to the CSV.
auswebsites_cleaned = auswebsites_raw.select(
    trim(lower(col("company_name"))).alias("company_name"),
    trim(lower(col("industry"))).alias("industry"),
    col("url"),
    col("email_id"),
    col("contact_no"),
    col("social_links")
)

# Normalize the ABR entity name the same way as the website name so that the
# two can be compared with plain equality when joining.
cleaned_abrdata_df = abr_data.select(
    col("abn"),
    trim(lower(col("NonIndividualNameText"))).alias("company_name"),
    col("state"),
    col("postcode")
)

# Deduplicate cleaned website data based on URL and company_name
auswebsites_deduplicated = auswebsites_cleaned.dropDuplicates(["url", "company_name"])

# Deduplicate ABR data based on ABN
abr_data_deduplicated = cleaned_abrdata_df.dropDuplicates(["abn"])

# Repartition website data to balance load
auswebsites_partitioned = auswebsites_deduplicated.repartition(10, "company_name")

# Cache datasets for faster access
abr_data_deduplicated.cache()
auswebsites_partitioned.cache()

# Left join keeps every website row and attaches ABR details where the
# company names are equal. The join now reads the repartitioned, cached
# website frame; previously that frame was built after the join and never used.
# NOTE(review): broadcast() ships the whole ABR frame to every executor;
# confirm it is small enough for that, otherwise drop the hint.
auswebsitedata = auswebsites_partitioned.join(
    broadcast(abr_data_deduplicated),
    auswebsites_partitioned.company_name == abr_data_deduplicated.company_name,
    "left"
)

