In [None]:
import csv
from pymongo import MongoClient
import time
import os


# --- MongoDB target ---------------------------------------------------------
DATABASE_NAME = "umls"       # database that receives the import
COLLECTION_NAME = "mrconso"  # collection that receives the import

# --- Source file ------------------------------------------------------------
# NOTE: point FILE_PATH at your own copy of MRCONSO.RRF before running.
FILE_PATH = "/pipeline_datalake/umls-2025AA-metathesaurus-full/2025AA/META/MRCONSO.RRF"
FILE_ENCODING = "utf-8"      # encoding used to read the file; change if yours differs

# --- Credentials ------------------------------------------------------------
# Read from the environment; each value is None when its variable is unset.
mongouser = os.environ.get("MONGO_INITDB_ROOT_USERNAME")
mongopass = os.environ.get("MONGO_INITDB_ROOT_PASSWORD")

# --- Import behaviour -------------------------------------------------------
BATCH_SIZE = 10_000                   # documents sent per insert_many call
DROP_COLLECTION_BEFORE_IMPORT = True  # False appends to the existing collection


### Loading MRCONSO into MongoDB

Code adapted from a previous personal project.



In [None]:

# Connect to the MongoDB instance reachable at host "mongodb", port 27017.
# Credentials are passed as keyword arguments instead of being interpolated
# into the URI: inside a URI the username and password must be percent-escaped,
# so a password containing characters such as '@', ':' or '/' would break
# parsing. When the environment variables are unset both values are None and
# no credentials are sent, rather than authenticating as the literal "None".
client = MongoClient(
    host="mongodb",
    port=27017,
    username=mongouser,
    password=mongopass,
)


In [None]:

# Test the connection
try:
    # 'ping' is a cheap round-trip to the server; it replaces the deprecated
    # 'ismaster' command and matches the check used by the import function.
    client.admin.command('ping')
    print("Successfully connected to MongoDB!")

    # List all databases. Unlike the ping above, this call needs valid
    # credentials when the server enforces authentication, so a failure here
    # usually points at the username/password rather than at connectivity.
    print("\nAvailable databases:")
    for db_name in client.list_database_names():
        print(f"- {db_name}")

except Exception as e:
    print(f"Failed to connect to MongoDB: {e}")
 

In [None]:



# Column names for one MRCONSO.RRF row, in file order. They are zipped with
# each parsed row to form the keys of the document stored in MongoDB.
FIELD_NAMES = (
    "CUI LAT TS LUI STT SUI ISPREF AUI "
    "SAUI SCUI SDUI SAB TTY CODE STR "
    "SRL SUPPRESS CVF"
).split()


def _insert_batch(collection, batch, label="batch"):
    """
    Insert `batch` with an unordered insert_many and return how many documents
    were written.

    Failures are reported with print() and never raised, so the caller can keep
    going (best-effort import). On a bulk write error the count reported by the
    server ('nInserted') is returned, because an unordered insert may have
    written part of the batch before failing.
    """
    # Local import: keeps this block self-contained without touching the
    # file's top-level imports (pymongo is already a dependency of the file).
    from pymongo.errors import BulkWriteError

    try:
        collection.insert_many(batch, ordered=False)  # unordered: server may continue past individual failures
        return len(batch)
    except BulkWriteError as e:
        written = e.details.get("nInserted", 0)
        failed = len(e.details.get("writeErrors", []))
        print(f"Error inserting {label}: {written} of {len(batch)} documents written, {failed} write errors.")
        return written
    except Exception as e:
        print(f"Error inserting {label}: {e}")
        return 0


def import_mrconso_to_mongodb():
    """
    Reads the MRCONSO.RRF file and imports its content into MongoDB in batches.

    Uses the module-level `client`, `FILE_PATH`, `FILE_ENCODING`,
    `DATABASE_NAME`, `COLLECTION_NAME`, `FIELD_NAMES`, `BATCH_SIZE` and
    `DROP_COLLECTION_BEFORE_IMPORT`. Each pipe-delimited row becomes one
    document whose keys are FIELD_NAMES; rows with fewer fields than
    FIELD_NAMES are skipped with a warning, extra trailing fields are ignored.

    Progress, errors and a final summary are printed; nothing is returned and
    no exception is propagated. The shared `client` is left open: it is
    created outside this function, so closing it is up to whoever created it.
    """
    print(f"Starting import of '{FILE_PATH}' into MongoDB '{DATABASE_NAME}.{COLLECTION_NAME}'")

    try:
        db = client[DATABASE_NAME]
        collection = db[COLLECTION_NAME]

        # Ping the server to confirm the connection before touching any data.
        client.admin.command('ping')
        print("Successfully pinged MongoDB server.")

    except Exception as e:
        print(f"Error connecting or pinging MongoDB: {e}")
        return

    if DROP_COLLECTION_BEFORE_IMPORT:
        try:
            print(f"Dropping existing collection '{COLLECTION_NAME}'...")
            collection.drop()
            print(f"Collection '{COLLECTION_NAME}' dropped.")
        except Exception as e:
            # Best-effort: report and carry on; later operations may fail too.
            print(f"Error dropping collection: {e}")

    documents_batch = []
    total_rows_processed = 0
    total_rows_inserted = 0
    batch_count = 0
    field_count = len(FIELD_NAMES)
    start_time = time.time()

    try:
        # newline='' is what the csv module expects from its input file.
        with open(FILE_PATH, 'r', encoding=FILE_ENCODING, newline='') as file:
            # QUOTE_NONE: the file is plain pipe-delimited text, so a literal
            # '"' inside a field must be kept as data. With the default
            # quoting the reader would treat it as the start of a quoted field
            # and could merge several fields or lines into one.
            reader = csv.reader(file, delimiter='|', quoting=csv.QUOTE_NONE)
            print(f"Processing file. Batch size: {BATCH_SIZE} documents.")

            for row in reader:
                total_rows_processed += 1

                if len(row) < field_count:
                    print(f"Warning: Row {total_rows_processed} has only {len(row)} fields, expected {field_count}. Skipping: {row}")
                    continue

                # Slice before zipping so any extra trailing field is dropped.
                documents_batch.append(dict(zip(FIELD_NAMES, row[:field_count])))

                if len(documents_batch) >= BATCH_SIZE:
                    batch_count += 1
                    total_rows_inserted += _insert_batch(collection, documents_batch)
                    # Always start a fresh batch, even after a failure:
                    # keeping the failed documents would re-send the same
                    # ever-growing batch on every subsequent row.
                    documents_batch = []
                    print(f"Batch {batch_count} processed. Total rows inserted: {total_rows_inserted}. Time: {time.time() - start_time:.2f}s")

            if documents_batch:
                batch_count += 1
                total_rows_inserted += _insert_batch(collection, documents_batch, label="final batch")
                print(f"Final batch ({batch_count}) processed. Total rows inserted: {total_rows_inserted}.")

    except FileNotFoundError:
        print(f"ERROR: File not found at '{FILE_PATH}'. Please check the path.")
        return
    except Exception as e:
        print(f"An unexpected error occurred during file processing or insertion: {e}")

    end_time = time.time()
    print("\n--- Import Summary ---")
    print(f"Total rows processed from file: {total_rows_processed}")
    print(f"Total rows successfully inserted into MongoDB: {total_rows_inserted}")
    print(f"Total time taken: {end_time - start_time:.2f} seconds")
    print(f"Import finished for '{DATABASE_NAME}.{COLLECTION_NAME}'.")

    


In [None]:
# Run the import. When DROP_COLLECTION_BEFORE_IMPORT is True the target
# collection is dropped first, so re-running this cell replaces its contents.
import_mrconso_to_mongodb()

In [None]:

# Handle to the imported collection. import_mrconso_to_mongodb() only binds
# `collection` as a local variable, so it has to be obtained here before the
# post-processing steps below can use it.
collection = client[DATABASE_NAME][COLLECTION_NAME]

# Add a 'STR_LOWER' field to every document, holding the lowercase form of
# 'STR'. The update is given as an aggregation pipeline (a list) so that the
# new value can be computed from an existing field of the same document.
result = collection.update_many(
    {},
    [
        {
            "$set": {
                "STR_LOWER": {"$toLower": "$STR"}
            }
        }
    ]
)

print(f"Modified {result.modified_count} documents.")

# Index both 'STR_LOWER' and 'STR' for faster lookups, reporting each index
# as it is created.
for indexed_field in ("STR_LOWER", "STR"):
    index_name = collection.create_index(indexed_field)
    print(f"Created index: {index_name}")
