In [2]:
import sqlite3
import xml.etree.ElementTree as ET
import requests
import zipfile
import os

def download_and_unzip(url, save_path, extract_to):
    # Ensure the directory for saving the ZIP file exists
    save_path = os.path.abspath(save_path)
    extract_to = os.path.abspath(extract_to)

    os.makedirs(os.path.dirname(save_path), exist_ok=True)
    os.makedirs(extract_to, exist_ok=True)

    # Step 1: Check if the file already exists
    if not os.path.exists(save_path):
        # Download the file
        print(f"Downloading file from {url}...")
        response = requests.get(url, stream=True)
        if response.status_code == 200:
            with open(save_path, 'wb') as file:
                for chunk in response.iter_content(chunk_size=8192):
                    file.write(chunk)
            print(f"File downloaded and saved as {save_path}")
        else:
            print(f"Failed to download file. Status code: {response.status_code}")
            return
    else:
        print(f"File already exists at {save_path}, skipping download.")

    # Step 2: Unzip the file
    if zipfile.is_zipfile(save_path):
        print(f"Unzipping file to {extract_to}...")
        with zipfile.ZipFile(save_path, 'r') as zip_ref:
            zip_ref.extractall(extract_to)
        print(f"File successfully unzipped to {extract_to}")

        # Rename the extracted file
        extracted_files = os.listdir(extract_to)
        if extracted_files:
            old_file_path = os.path.join(extract_to, extracted_files[0])  # Assuming single file in ZIP
            new_file_path = os.path.join(extract_to, "lei.xml")
            os.rename(old_file_path, new_file_path)
            print(f"File renamed from {extracted_files[0]} to 'lei.xml'")

        # Step 3: Delete the ZIP file
        os.remove(save_path)
        print(f"Temporary ZIP file removed: {save_path}")
    else:
        print(f"The downloaded file is not a valid ZIP file.")
        # Optionally, remove invalid file
        if os.path.exists(save_path):
            os.remove(save_path)
            print(f"Invalid ZIP file removed: {save_path}")

In [3]:
lei_url = "https://leidata.gleif.org/api/v1/concatenated-files/lei2/get/36200/zip"
lei_save_path = "files/temp/lei.zip"
lei_extract_to = "files/lei_files"

download_and_unzip(lei_url, lei_save_path, lei_extract_to)

Downloading file from https://leidata.gleif.org/api/v1/concatenated-files/lei2/get/36200/zip...
File downloaded and saved as /Users/danish/PycharmProjects/ForwardAnalytics/files/temp/lei.zip
Unzipping file to /Users/danish/PycharmProjects/ForwardAnalytics/files/lei_files...
File successfully unzipped to /Users/danish/PycharmProjects/ForwardAnalytics/files/lei_files
File renamed from 20241116-gleif-concatenated-file-lei2.xml to 'lei.xml'
Temporary ZIP file removed: /Users/danish/PycharmProjects/ForwardAnalytics/files/temp/lei.zip


In [4]:
# Connect to SQLite database (or create one if it doesn't exist)
db_path = "db/lei_records.sqlite3"
conn = sqlite3.connect(db_path)
cursor = conn.cursor()

In [5]:
cursor.execute("""DROP INDEX IF EXISTS lei_records""")
cursor.execute("""DROP INDEX IF EXISTS legal_name""")

<sqlite3.Cursor at 0x12001c340>

In [6]:
# Create table to store LEI records
cursor.execute("""
    CREATE TABLE IF NOT EXISTS lei_records (
        lei TEXT TEXT PRIMARY KEY,
        legal_name TEXT
    )
""")
cursor.execute("""
    CREATE INDEX IF NOT EXISTS legal_name ON lei_records(legal_name)
""")
conn.commit()


In [7]:
# Function to process and commit LEI records in batches
def process_lei_records(xml_file, batch_size=10000):
    name_space = {"lei": "http://www.gleif.org/data/schema/leidata/2016"}
    records = []
    for event, elem in ET.iterparse(xml_file, events=("end",)):
        if elem.tag.endswith("LEIRecord"):
            lei_tag = elem.find("lei:LEI", name_space).text
            legal_name_tag = elem.find("lei:Entity/lei:LegalName", name_space).text

            # Add to batch
            records.append((lei_tag, legal_name_tag, ))
            # Commit batch to database
            if len(records) >= batch_size:
                cursor.executemany("""
                    INSERT OR IGNORE INTO lei_records (lei, legal_name)
                    VALUES (?, ?)
                """, records)
                conn.commit()
                records = []  # Clear the batch

            elem.clear()  # Clear memory for processed element

In [8]:
# Path to the large XML file
large_xml_file = "files/lei_files/lei.xml"
# Process the XML file and commit records in batches of 10000
process_lei_records(large_xml_file, batch_size=10000)

# Close the database connection
conn.close()

print(f"Data committed to database {db_path}.")

Data committed to database db/lei_records.sqlite3.
