### Import Library

In [None]:
# load libraries
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from pymongo import MongoClient
import time
import os
import zipfile
import xmltodict
import json
import xml.etree.ElementTree as ET
import tempfile
import pymongo
from datetime import datetime


### Scraping data laporan keuangan dari IDX
Langkah yang dilakukan :
1. Automasi web menggunakan selenium untuk mendownload file instance.zip di folder downloads (akan membuat folder baru jika belum ada)
2. extract zip file lalu parsing file taxonomy dengan xml
3. setelah berhasil di parsing, output akan disimpan di financial_reports.json

In [None]:
# === Path configuration ===
# Downloads land in ./downloads (created on demand); the aggregated output
# lives next to it as financial_reports.json.
CURRENT_DIR = os.getcwd()
BASE_DIR = os.path.join(CURRENT_DIR, "downloads")
os.makedirs(BASE_DIR, exist_ok=True)
JSON_OUTPUT_FILE = os.path.join(CURRENT_DIR, "financial_reports.json")

# Resume support: reload reports saved by a previous run and remember which
# companies they cover so those can be skipped below.
existing_reports = []
already_scraped_companies = set()
if os.path.exists(JSON_OUTPUT_FILE):
    try:
        with open(JSON_OUTPUT_FILE, 'r') as f:
            existing_reports = json.load(f)
    except json.JSONDecodeError:
        # An unreadable file is treated as "nothing scraped yet".
        print(f"Error loading {JSON_OUTPUT_FILE}, will create a new one")
        existing_reports = []
    else:
        print(f"Loaded {len(existing_reports)} existing reports from {JSON_OUTPUT_FILE}")
        already_scraped_companies = set(saved['company'] for saved in existing_reports)
        print(f"Already scraped {len(already_scraped_companies)} companies")

# === Selenium WebDriver configuration ===
# Chrome is told to save downloads straight into BASE_DIR without prompting.
chrome_prefs = {
    "download.default_directory": BASE_DIR,
    "download.prompt_for_download": False,
    "download.directory_upgrade": True,
    "safebrowsing.enabled": True
}
options = webdriver.ChromeOptions()
options.add_experimental_option("prefs", chrome_prefs)
options.add_experimental_option("detach", True)  # Keep browser open
chrome_service = Service(ChromeDriverManager().install())
driver = webdriver.Chrome(service=chrome_service, options=options)

# === IDX financial-report page ===
url = "https://www.idx.co.id/id/perusahaan-tercatat/laporan-keuangan-dan-tahunan"

# Company codes come from emiten_list.json; anything after the first "." in
# an entry is dropped.
with open('emiten_list.json') as f:
    data = json.load(f)
    company_codes = [entry.partition(".")[0] for entry in data]

# Only companies without a saved report still need scraping.
companies_to_scrape = [
    code for code in company_codes
    if code not in already_scraped_companies
]
print(f"Will scrape {len(companies_to_scrape)} remaining companies out of {len(company_codes)} total")

# === Fungsi Konversi XML ke Dictionary ===
def xml_to_dict(element):
    """Recursively convert the children of an XML element into a dictionary.

    Keys are the child tag names with any ``{namespace}`` prefix removed.
    A child that has children of its own becomes a nested dictionary; a leaf
    child becomes its text (``None`` when it has no text).

    When several siblings share the same tag, their values are collected into
    a list in document order instead of each one overwriting the previous
    value. A tag that occurs once keeps its plain (non-list) value.

    Attributes, and the text of ``element`` itself, are not included.

    Args:
        element: An ``xml.etree.ElementTree.Element`` (or any iterable of
            element-like children exposing ``tag`` and ``text``).

    Returns:
        dict: Mapping of tag name to text, nested dict, or list of those.
    """
    data = {}
    for child in element:
        tag = child.tag.split("}")[-1]  # Strip the namespace prefix
        value = xml_to_dict(child) if len(child) > 0 else child.text

        if tag not in data:
            data[tag] = value
            continue

        # Converted values are only ever str, None or dict, so an existing
        # list can only be the accumulator for a repeated tag.
        existing = data[tag]
        if isinstance(existing, list):
            existing.append(value)
        else:
            data[tag] = [existing, value]
    return data

# === Fungsi Parsing Taxonomy (.xsd) ===
def parse_taxonomy(xsd_path):
    """Parse a taxonomy (.xsd) file into a name -> definition mapping.

    Every element in the document that carries a ``name`` attribute produces
    one entry. If the same name occurs more than once, the last occurrence
    wins.

    Each entry holds:
        * ``type`` - the ``type`` attribute, or ``"unknown"`` when absent.
        * ``documentation`` - the ``documentation`` attribute when present;
          otherwise the text of a nested ``xs:annotation/xs:documentation``
          element (where XML Schema places descriptions); otherwise
          ``"No description"``.

    Args:
        xsd_path: Path to the .xsd file.

    Returns:
        dict: The mapping described above, or an empty dict when the file is
        missing or cannot be parsed (a message is printed in both cases).
    """
    if not os.path.exists(xsd_path):
        print(f"❌ Taxonomy file {xsd_path} not found!")
        return {}

    xs_ns = "{http://www.w3.org/2001/XMLSchema}"
    documentation_path = xs_ns + "annotation/" + xs_ns + "documentation"

    try:
        root = ET.parse(xsd_path).getroot()
        taxonomy_dict = {}

        for elem in root.iter():
            if "name" not in elem.attrib:
                continue

            documentation = elem.attrib.get("documentation")
            if documentation is None:
                # Fall back to the standard XML Schema annotation child.
                annotated = (elem.findtext(documentation_path) or "").strip()
                documentation = annotated or "No description"

            taxonomy_dict[elem.attrib["name"]] = {
                "type": elem.attrib.get("type", "unknown"),
                "documentation": documentation
            }

        print(f"✅ Parsed taxonomy {xsd_path}")
        return taxonomy_dict

    except Exception as e:
        # Best effort: a broken taxonomy must not abort the whole scrape.
        print(f"❌ Error parsing taxonomy: {e}")
        return {}

# All report documents collected so far. This is the same list object that
# was loaded from disk, so new reports are appended after the saved ones.
all_financial_reports = existing_reports

# Upper bound, in seconds, to wait for Chrome to finish writing instance.zip.
DOWNLOAD_TIMEOUT_SECONDS = 60


def _save_reports():
    """Write every collected report to JSON_OUTPUT_FILE."""
    with open(JSON_OUTPUT_FILE, 'w') as f:
        json.dump(all_financial_reports, f, indent=2)


def _download_finished(zip_path):
    """Return True once zip_path exists as a non-empty file.

    NOTE(review): this relies on Chrome writing to a temporary file and only
    creating the final name when the download completes - confirm for the
    Chrome version in use.
    """
    return os.path.isfile(zip_path) and os.path.getsize(zip_path) > 0


# === Scrape, download, extract, parse, and collect into JSON ===
try:
    for company in companies_to_scrape:
        try:
            zip_path = os.path.join(BASE_DIR, "instance.zip")
            extract_dir = os.path.join(BASE_DIR, company)

            # A zip left behind by an earlier, slow download would otherwise
            # be extracted into this company's folder.
            if os.path.exists(zip_path):
                os.remove(zip_path)

            driver.get(url)
            wait = WebDriverWait(driver, 10)

            # Type the company code into the search box
            search_box = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, "input.vs__search")))
            search_box.clear()
            search_box.send_keys(company)
            time.sleep(1)

            # Pick the first suggestion
            first_suggestion = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, ".vs__dropdown-option")))
            first_suggestion.click()

            # Apply the report filters (financial statement, stock, year,
            # period) and submit
            driver.find_element(By.ID, "FinancialStatement").click()
            driver.find_element(By.ID, "TypeSaham").click()
            driver.find_element(By.ID, "year1").click()
            driver.find_element(By.ID, "period3").click()
            driver.find_element(By.XPATH, "//button[contains(text(), 'Terapkan')]").click()

            # Wait for the report table to appear
            wait.until(EC.presence_of_element_located((By.TAG_NAME, "table")))

            # Find the row mentioning "instance.zip" and click its link
            download_started = False
            for row in driver.find_elements(By.TAG_NAME, "tr"):
                if "instance.zip" in row.text:
                    link_element = row.find_element(By.TAG_NAME, "a")
                    driver.execute_script("arguments[0].scrollIntoView();", link_element)
                    driver.execute_script("arguments[0].click();", link_element)
                    print(f"✅ Download started for {company}")
                    download_started = True
                    break

            if download_started:
                # Poll until the file is actually there instead of assuming a
                # fixed delay is enough. A timeout raises and is reported by
                # the handler below.
                WebDriverWait(driver, DOWNLOAD_TIMEOUT_SECONDS, poll_frequency=0.5).until(
                    lambda _driver: _download_finished(zip_path)
                )
            else:
                print(f"❌ instance.zip link not found for {company}")

            # === Extract the ZIP ===
            os.makedirs(extract_dir, exist_ok=True)

            if os.path.exists(zip_path):
                try:
                    with zipfile.ZipFile(zip_path, "r") as zip_ref:
                        zip_ref.extractall(extract_dir)
                    print(f"✅ Extracted {company} instance.zip")
                except zipfile.BadZipFile:
                    print(f"❌ Error: {company} instance.zip is not a valid ZIP file")
                    continue  # Skip this company when the ZIP is corrupt
                finally:
                    os.remove(zip_path)  # Always delete the ZIP afterwards

            # === Parse the taxonomy ===
            xsd_path = os.path.join(extract_dir, "taxonomy.xsd")
            taxonomy_dict = parse_taxonomy(xsd_path)

            # === Convert the XBRL instance and add it to the collection ===
            xbrl_path = os.path.join(extract_dir, "instance.xbrl")
            report_added = False

            if os.path.exists(xbrl_path):
                try:
                    tree = ET.parse(xbrl_path)
                    root = tree.getroot()
                    xbrl_dict = xml_to_dict(root)

                    # Combine the XBRL data with the taxonomy
                    enriched_data = {
                        "company": company,
                        "timestamp": datetime.now().isoformat(),
                        "taxonomy": taxonomy_dict,
                        "xbrl_data": xbrl_dict
                    }

                    all_financial_reports.append(enriched_data)
                    report_added = True
                    print(f"✅ {company} data added to collection!")

                except Exception as e:
                    print(f"❌ Error processing {company}.xbrl: {e}")
            else:
                print(f"❌ instance.xbrl not found for {company}")

            # Interim save every fifth report; skipped when nothing was added
            # so the file is not rewritten needlessly.
            if report_added and len(all_financial_reports) % 5 == 0:
                _save_reports()
                print(f"Interim save: {len(all_financial_reports)} reports saved to {JSON_OUTPUT_FILE}")

        except Exception as e:
            print(f"❌ Error processing {company}: {e}")
            # Save progress after any error to ensure we don't lose data
            _save_reports()
finally:
    # === Done (or interrupted): save to JSON and close the browser ===
    try:
        _save_reports()
        print(f"📂 Saved {len(all_financial_reports)} financial reports to {JSON_OUTPUT_FILE}")
    finally:
        driver.quit()
        print("Browser closed.")

### Ingestion ke MongoDB

In [None]:
# Start timing
start_time = time.time()

# Load the scraped reports from the JSON file; a missing or invalid file is
# reported and treated as "nothing to ingest".
print("Loading data from JSON file...")
try:
    with open("financial_reports.json", "r") as f:
        all_financial_reports = json.load(f)
    print(f"Loaded {len(all_financial_reports)} financial reports from JSON file")
except FileNotFoundError:
    print("Error: financial_reports.json not found!")
    all_financial_reports = []
except json.JSONDecodeError:
    print("Error: Invalid JSON format in financial_reports.json!")
    all_financial_reports = []

if all_financial_reports:
    # Connect to MongoDB Atlas. The connection string is taken from the
    # MONGODB_URI environment variable when it is set.
    # NOTE(review): the fallback below embeds credentials in source control;
    # it is kept only so existing setups keep working. Rotate the password
    # and remove the literal once MONGODB_URI is configured everywhere.
    connection_string = os.environ.get(
        "MONGODB_URI",
        "mongodb+srv://kelompok-5:FwJP0h7Bo6cTpEol@big-data.do3of.mongodb.net/?retryWrites=true&w=majority&ssl=true",
    )
    client = pymongo.MongoClient(connection_string,
                               maxPoolSize=100,  # Increase connection pool
                               retryWrites=True)

    # Select database and collection
    db = client["Big_Data_kel_5"]  # Database name
    collection = db["Data_LaporanKeuangan"]     # Collection name

    # One document per company: unique index on "company"
    collection.create_index([("company", 1)], unique=True, background=True)

    # Fetch every stored company code in a single query
    print("Fetching existing records...")
    existing_companies = {
        doc["company"] for doc in collection.find({}, {"company": 1, "_id": 0})
    }

    print(f"Found existing records for {len(existing_companies)} companies")

    # Prepare bulk operations
    bulk_ops = []
    new_record_count = 0
    update_count = 0
    batch_size = 20  # Smaller batch size for large documents

    print("Preparing bulk operations...")
    for record in all_financial_reports:
        company = record["company"]

        if company in existing_companies:
            # Already stored: overwrite the fields of the existing document
            bulk_ops.append(
                pymongo.UpdateOne(
                    {"company": company},
                    {"$set": record}
                )
            )
            update_count += 1
        else:
            # Not stored yet: insert, and remember it so a later record for
            # the same company in this run becomes an update
            bulk_ops.append(pymongo.InsertOne(record))
            new_record_count += 1
            existing_companies.add(company)

        # Flush a full batch
        if len(bulk_ops) >= batch_size:
            result = collection.bulk_write(bulk_ops, ordered=False)
            print(f"Processed batch: {result.inserted_count} inserted, {result.modified_count} modified")
            bulk_ops = []

    # Flush whatever is left over
    if bulk_ops:
        result = collection.bulk_write(bulk_ops, ordered=False)
        print(f"Processed final batch: {result.inserted_count} inserted, {result.modified_count} modified")

    elapsed_time = time.time() - start_time
    print(f"Completed MongoDB ingestion process. Inserted {new_record_count} new records and updated {update_count} records in {elapsed_time:.2f} seconds")
else:
    print("No data to ingest to MongoDB.")