In [None]:
#Code to extract datasets from Common Crawl and ABR

In [None]:
import gzip
import json
import re
from urllib.parse import urlsplit

import requests

# ---------- CONFIG ----------
WET_INDEX_URL = "https://data.commoncrawl.org/crawl-data/CC-MAIN-2025-13/wet.paths.gz"
MAX_COMPANIES = 215000
OUTPUT_FILE = "aus_output.jsonl"
AU_DOMAINS = [".com.au", ".org.au", ".net.au", ".edu.au"]

# ---------- FUNCTION: Get WET File Paths ----------
def get_wet_paths(index_url):
    """Download a gzip-compressed WET path index and return its entries.

    Args:
        index_url: URL of the gzip-compressed listing to fetch; each line of
            the decompressed text is treated as one path.

    Returns:
        A list of the non-empty, whitespace-stripped lines of the index.

    Raises:
        requests.HTTPError: if the server answers with a 4xx/5xx status.
        requests.RequestException: on connection errors or timeouts.
    """
    print("Downloading WET index...")
    # A timeout keeps a stalled connection from hanging the run forever
    # (same value the WET file download uses).
    with requests.get(index_url, stream=True, timeout=60) as r:
        # Fail with a clear HTTP error instead of feeding an error page to gzip.
        r.raise_for_status()
        with gzip.open(r.raw, mode='rt', encoding='utf-8', errors='ignore') as f:
            stripped = (line.strip() for line in f)
            # Blank lines would otherwise turn into bogus download URLs.
            paths = [entry for entry in stripped if entry]
    print(f"Found {len(paths)} WET files.")
    return paths

# ---------- FUNCTION: Parse and Write Record Immediately ----------
def parse_and_write_record(record, url, outfile, seen_urls, total_count):
    """Write one record to ``outfile`` as a JSON line if it is a new AU page.

    A record qualifies when the *hostname* of ``url`` ends with one of
    ``AU_DOMAINS`` and the URL has not been written before. Matching on the
    hostname (rather than anywhere in the URL) keeps out pages such as
    ``https://example.com/files/site.com.au.html``.

    Args:
        record: list of text lines making up one record; everything after
            the first blank line is treated as the page content.
        url: target URL of the record ("" when none was seen).
        outfile: writable text file object; receives one JSON object per line.
        seen_urls: set of URLs already written; updated in place.
        total_count: running number, used only in the progress message.

    Returns:
        1 if a line was written, otherwise 0 (non-AU host, duplicate URL,
        or an error while parsing, which is reported and swallowed).
    """
    try:
        url = url.strip()
        if url in seen_urls:
            return 0

        # Filter first so rejected records cost no title parsing.
        # urlsplit only fills in the hostname when a "//" authority marker
        # is present, so scheme-less URLs get one prepended.
        parts = urlsplit(url if "://" in url else "//" + url)
        host = (parts.hostname or "").rstrip(".")
        if not host.endswith(tuple(AU_DOMAINS)):
            return 0

        # Content is everything after the first blank line of the record.
        content_lines = []
        content_start = False
        for rline in record:
            if content_start:
                content_lines.append(rline.rstrip("\r\n"))
            elif rline.strip() == "":
                content_start = True
        content = "\n".join(content_lines).strip()

        # Extract title: an explicit <title> tag wins, else the first
        # non-empty content line.
        title_match = re.search(r"<title>(.*?)</title>", content, re.IGNORECASE)
        if title_match:
            title = title_match.group(1).strip()
        else:
            title = next(
                (ln.strip() for ln in content.splitlines() if ln.strip()), ""
            )

        # Clean title: keep only the part before a " - " or " | " separator.
        if " - " in title:
            title = title.split(" - ")[0]
        elif " | " in title:
            title = title.split(" | ")[0]
        title = title.title()

        entry = {"url": url, "title": title[:120], "industry": ""}
        json.dump(entry, outfile)
        outfile.write("\n")
        seen_urls.add(url)
        print(f"[{total_count}] Extracted: {entry}")
        return 1  # increment count

    except Exception as e:
        # Best effort: one malformed record must not stop the whole crawl.
        print(f"Error parsing record: {e}")
    return 0

# ---------- FUNCTION: Extract and Write AU Company Records ----------
def extract_and_stream(wet_file_url, outfile, seen_urls, total_count):
    """Stream one gzip-compressed WET file and write its qualifying records.

    The file is split into records at lines starting with ``WARC/1.0``; each
    completed record is handed to ``parse_and_write_record`` together with
    the value of its ``WARC-Target-URI`` header, and that call's return value
    is added to the running count.

    Args:
        wet_file_url: URL of the WET file to download.
        outfile: writable text file object passed through to the record writer.
        seen_urls: set of already-written URLs passed through to the record writer.
        total_count: number of records written so far.

    Returns:
        The updated count. Download or decoding errors are reported and
        swallowed, so the count reached before the failure is returned.
    """
    # Nothing to do if the cap is already met; skip the download entirely.
    if total_count >= MAX_COMPANIES:
        return total_count

    try:
        with requests.get(wet_file_url, stream=True, timeout=60) as r:
            # Surface HTTP errors as such rather than as a gzip decode failure.
            r.raise_for_status()
            with gzip.open(r.raw, mode='rt', encoding='utf-8', errors='ignore') as f:
                record = []
                url = ""

                for line in f:
                    # Once the cap is reached no further record can be
                    # written, so stop downloading the rest of the file.
                    if total_count >= MAX_COMPANIES:
                        break

                    if line.startswith("WARC/1.0"):
                        # A new record begins: flush the one collected so far.
                        if record:
                            total_count += parse_and_write_record(
                                record, url, outfile, seen_urls, total_count + 1
                            )
                        record = [line]
                        url = ""
                    else:
                        if line.startswith("WARC-Target-URI:"):
                            url = line[len("WARC-Target-URI:"):].strip()
                        record.append(line)

                # Final record (no following "WARC/1.0" line triggers its flush)
                if record and total_count < MAX_COMPANIES:
                    total_count += parse_and_write_record(
                        record, url, outfile, seen_urls, total_count + 1
                    )

    except Exception as e:
        # Best effort: a broken WET file must not abort the whole run.
        print(f"Error reading WET file: {e}")
    return total_count

# ---------- MAIN ----------
def _load_seen_urls(path):
    """Return the set of "url" values already stored in the JSONL file at ``path``."""
    seen = set()
    try:
        with open(path, encoding="utf-8") as existing:
            for raw in existing:
                raw = raw.strip()
                if not raw:
                    continue
                try:
                    seen.add(json.loads(raw)["url"])
                except (ValueError, KeyError, TypeError):
                    # Ignore truncated or foreign lines instead of aborting.
                    continue
    except FileNotFoundError:
        pass  # first run: nothing written yet
    return seen


def main():
    """Walk the WET files of the configured crawl and append AU records to OUTPUT_FILE.

    Stops as soon as MAX_COMPANIES records have been written in this run.
    Because the output file is opened in append mode, URLs already present
    from an earlier run are loaded first so they are not written twice.
    """
    paths = get_wet_paths(WET_INDEX_URL)
    seen_urls = _load_seen_urls(OUTPUT_FILE)
    if seen_urls:
        print(f"Found {len(seen_urls)} URLs already in {OUTPUT_FILE}; they will be skipped.")
    total_count = 0

    with open(OUTPUT_FILE, "a", encoding="utf-8") as outfile:
        for idx, path in enumerate(paths):
            if not path:
                # A blank index entry would resolve to the bare host URL.
                continue
            wet_url = f"https://data.commoncrawl.org/{path}"
            print(f"[{idx + 1}/{len(paths)}] Processing: {wet_url}")
            total_count = extract_and_stream(wet_url, outfile, seen_urls, total_count)

            print(f" -> Total records written: {total_count}")
            if total_count >= MAX_COMPANIES:
                print("🎯 Reached target. Stopping.")
                break

    print(f"\n✅ Finished. {total_count} records written to {OUTPUT_FILE}")


if __name__ == "__main__":
    main()


In [None]:
import xml.etree.ElementTree as ET
import csv
import os

# Set your folder path containing the XML files
folder_path = r"C:\Users\Vimalan\Downloads\common\abr_bulk_extract_10"  # Replace with your folder


def _abr_entity_name(entity):
    """Return the display name stored under a MainEntity/LegalEntity element.

    Organisations carry it in NonIndividualName/NonIndividualNameText;
    individuals carry GivenName/FamilyName children under IndividualName
    (per the ABR bulk extract XML layout), which are joined with spaces.
    """
    name = entity.findtext('./NonIndividualName/NonIndividualNameText', default='')
    if name:
        return name
    individual = entity.find('IndividualName')
    if individual is None:
        return ''
    parts = [
        (child.text or '').strip()
        for child in individual
        if child.tag in ('GivenName', 'FamilyName')
    ]
    return ' '.join(part for part in parts if part)


def _abr_row(abr):
    """Flatten one <ABR> element into the eight CSV columns, in header order."""
    abn_elem = abr.find('ABN')
    if abn_elem is not None:
        abn = abn_elem.text or ''
        abn_status = abn_elem.attrib.get('status', '')
        abn_start_date = abn_elem.attrib.get('ABNStatusFromDate', '')
    else:
        abn = abn_status = abn_start_date = ''

    entity_type_text = abr.findtext('./EntityType/EntityTypeText', default='')

    # Name and address live under MainEntity for organisations; fall back to
    # LegalEntity so records for individuals are not written with empty
    # name/state/postcode. (Explicit None test: Element truthiness depends on
    # its child count.)
    entity = abr.find('MainEntity')
    if entity is None:
        entity = abr.find('LegalEntity')

    entity_name = state = postcode = ''
    if entity is not None:
        entity_name = _abr_entity_name(entity)
        state = entity.findtext('./BusinessAddress/AddressDetails/State', default='')
        postcode = entity.findtext('./BusinessAddress/AddressDetails/Postcode', default='')
    # The only address parts extracted here are state and postcode.
    entity_address = f"{state} {postcode}".strip()

    return [
        abn,
        entity_name,
        entity_type_text,
        abn_status,
        entity_address,
        postcode,
        state,
        abn_start_date,
    ]


# Prepare CSV
with open("abr_bulk_extract_10.csv", "w", newline="", encoding="utf-8") as f:
    writer = csv.writer(f)

    # Required CSV Header
    writer.writerow([
        "ABN (Australian Business Number)",
        "Entity Name",
        "Entity Type",
        "Entity Status",
        "Entity Address",
        "Entity Postcode",
        "Entity State",
        "Entity Start Date"
    ])

    # Loop through each XML file in the folder; sorted() makes the row order
    # reproducible, since os.listdir() returns entries in arbitrary order.
    for filename in sorted(os.listdir(folder_path)):
        if not filename.endswith(".xml"):
            continue
        file_path = os.path.join(folder_path, filename)
        try:
            # NOTE(review): ET.parse holds the whole document in memory;
            # consider ET.iterparse if the extract files are very large.
            root = ET.parse(file_path).getroot()
        except ET.ParseError as e:
            print(f"Error parsing {filename}: {e}")
            continue

        writer.writerows(_abr_row(abr) for abr in root.findall('ABR'))

        # Print the filename after processing
        print(f"Processed file: {filename}")

print("CSV created with transformed data from all XML files.")


In [None]:
# Schema design in PostgreSQL (target tables for the load step below)

# CREATE TABLE common_crawl_data (
#     industry VARCHAR(100),
#     title VARCHAR(255),
#     url VARCHAR(300) PRIMARY KEY
# );


# CREATE TABLE abr_data (
#     abn CHAR(11) PRIMARY KEY,
#     entity_name VARCHAR(255),
#     entity_type VARCHAR(100),
#     entity_status CHAR(3),
#     entity_address VARCHAR(255),
#     entity_postcode CHAR(4),
#     entity_state CHAR(3),
#     entity_start_date DATE
# );


In [2]:
# data loading in postgres

from pyspark.sql import SparkSession

# Create (or reuse) the Spark session; the PostgreSQL JDBC jar is registered
# both as a Spark jar and on the driver classpath.
spark = (
    SparkSession.builder
    .appName("Load Data into PostgreSQL")
    .config("spark.jars", "postgresql-42.7.5.jar")
    .config("spark.driver.extraClassPath", "postgresql-42.7.5.jar")
    .getOrCreate()
)

# JDBC target and connection properties
url = "jdbc:postgresql://localhost:5432/test_db"
properties = dict(user="root", password="root", driver="org.postgresql.Driver")

# Source 1: Common Crawl extract, one JSON object per line
common_crawl_data = spark.read.json(r"C:\Users\Vimalan\Downloads\common\aus_output.jsonl")

# Source 2: ABR extract, CSV with a header row
abr_data = spark.read.csv(
    r"C:\Users\Vimalan\Downloads\common\abr_bulk_extract_10.csv",
    header=True,
)

# CSV header -> database column name, in the order the columns are loaded
_abr_column_map = {
    "ABN (Australian Business Number)": "abn",
    "Entity Name": "entity_name",
    "Entity Type": "entity_type",
    "Entity Status": "entity_status",
    "Entity Address": "entity_address",
    "Entity Postcode": "entity_postcode",
    "Entity State": "entity_state",
    "Entity Start Date": "entity_start_date",
}
for _csv_header, _db_column in _abr_column_map.items():
    abr_data = abr_data.withColumnRenamed(_csv_header, _db_column)

# Keep exactly the columns that are loaded, in a fixed order
common_crawl_data = common_crawl_data.select("industry", "title", "url")
abr_data = abr_data.select(*_abr_column_map.values())

# Write both frames to PostgreSQL (overwrite mode), Common Crawl first.
# NOTE(review): overwrite presumably replaces any existing table of the same
# name; confirm whether hand-written DDL (keys, column types) survives it.
for _frame, _table in (
    (common_crawl_data, "common_crawl_data"),
    (abr_data, "abr_data"),
):
    _frame.write.jdbc(url=url, table=_table, mode="overwrite", properties=properties)

print("Data ingestion into PostgreSQL completed successfully.")


Data ingestion into PostgreSQL completed successfully.
