In [1]:
import pymongo
import csv
from decimal import Decimal

In [2]:
def convert_to_int(value):
    """Convert string to int, return None if empty or invalid"""
    try:
        return int(float(value)) if value and value.strip() else None
    except ValueError:
        return None

def convert_to_float(value):
    """Convert string to float, return None if empty or invalid"""
    try:
        return float(value) if value and value.strip() else None
    except ValueError:
        return None

def convert_to_bigint(value):
    """Convert string to bigint (int in Python), return None if empty or invalid"""
    try:
        return int(float(value)) if value and value.strip() else None
    except ValueError:
        return None

In [3]:
def import_ifsc_master(collection):
    """Import IFSC_master data from CSV"""
    with open('ifsc_combined.csv', 'r', encoding='utf-8') as file:
        csv_reader = csv.reader(file)
        next(csv_reader)  # Skip header if exists (assuming there's no header based on your example)
        
        documents = []
        for row in csv_reader:
            if len(row) >= 9:  # Ensure row has enough columns
                doc = {
                    'BANK': row[0],
                    'IFSC': row[1],
                    'BRANCH': row[2],
                    'ADDRESS': row[3],
                    'CITY1': row[4],
                    'CITY2': row[5],
                    'STATE': row[6],
                    'STD_CODE': convert_to_int(row[7]),
                    'PHONE': convert_to_bigint(row[8])
                }
                documents.append(doc)
        
        if documents:
            collection.insert_many(documents)
            print(f"Inserted {len(documents)} documents into IFSC_master")

In [4]:
def import_merged_banks(collection):
    """Import Merged_banks data from CSV"""
    with open('merged_banks_ifsc_combined.csv', 'r', encoding='utf-8') as file:
        csv_reader = csv.reader(file)
        next(csv_reader)  # Skip header if exists
        
        documents = []
        for row in csv_reader:
            if len(row) >= 4:  # Ensure row has enough columns
                doc = {
                    'Old_IFSC_Code': row[0],
                    'New_IFSC_Code': row[1],
                    'Old_MICR': convert_to_int(row[2]),
                    'New_MICR': convert_to_int(row[3])
                }
                documents.append(doc)
        
        if documents:
            collection.insert_many(documents)
            print(f"Inserted {len(documents)} documents into Merged_banks")

In [5]:
def import_bank_participants(collection):
    """Import bank_participants data from CSV"""
    with open('bank_participants.csv', 'r', encoding='utf-8') as file:
        csv_reader = csv.reader(file)
        next(csv_reader)  # Skip header if exists
        
        documents = []
        for row in csv_reader:
            if len(row) >= 4:  # Ensure row has enough columns
                doc = {
                    'merged_participant_code': row[0],
                    'merged_participant_name': row[1],
                    'target_participant_code': row[2],
                    'target_participant_name': row[3]
                }
                documents.append(doc)
        
        if documents:
            collection.insert_many(documents)
            print(f"Inserted {len(documents)} documents into bank_participants")

In [6]:
def import_submember_banks(collection):
    """Import submember_banks data from CSV"""
    with open('submember_banks.csv', 'r', encoding='utf-8') as file:
        csv_reader = csv.reader(file)
        next(csv_reader)  # Skip header if exists
        
        documents = []
        for row in csv_reader:
            if len(row) >= 4:  # Ensure row has enough columns
                doc = {
                    'bank_code': row[0],
                    'ifsc': row[1],
                    'micr': row[2],
                    'bank_name': row[3]
                }
                documents.append(doc)
        
        if documents:
            collection.insert_many(documents)
            print(f"Inserted {len(documents)} documents into submember_banks")

In [7]:
def create_indexes(db):
    """Create appropriate indexes based on MySQL key configurations"""
    # IFSC_master indexes
    db.IFSC_master.create_index([('IFSC', pymongo.ASCENDING)], unique=True)  # Primary key
    db.IFSC_master.create_index([('BANK', pymongo.ASCENDING)])  # Multiple key
    db.IFSC_master.create_index([('BRANCH', pymongo.ASCENDING)])  # Multiple key
    db.IFSC_master.create_index([('STATE', pymongo.ASCENDING)])  # Multiple key
    
    # Merged_banks indexes
    db.Merged_banks.create_index([('Old_IFSC_Code', pymongo.ASCENDING)])  # Multiple key
    db.Merged_banks.create_index([('New_IFSC_Code', pymongo.ASCENDING)])  # Multiple key
    
    # bank_participants indexes
    db.bank_participants.create_index([('merged_participant_code', pymongo.ASCENDING)])  # Multiple key
    db.bank_participants.create_index([('target_participant_code', pymongo.ASCENDING)])  # Multiple key
    
    # submember_banks indexes
    db.submember_banks.create_index([('bank_code', pymongo.ASCENDING)])  # Multiple key
    db.submember_banks.create_index([('ifsc', pymongo.ASCENDING)])  # Multiple key
    
    print("Indexes created successfully")

In [16]:
def main():
    # Connect to MongoDB
    client = pymongo.MongoClient('mongodb://localhost:27017/')
    
    # Create/get database
    db = client['BANK']
    
    # Drop existing collections (optional - comment out if you want to append)
    collections = ['IFSC_master', 'Merged_banks', 'bank_participants', 'submember_banks']
    for collection_name in collections:
        db[collection_name].drop()
        print(f"Dropped collection: {collection_name}")
    
    # Import data
    import_ifsc_master(db.IFSC_master)
    import_merged_banks(db.Merged_banks)
    import_bank_participants(db.bank_participants)
    import_submember_banks(db.submember_banks)
    
    # Create indexes
    create_indexes(db)
    
    # Print collection counts
    for collection_name in collections:
        count = db[collection_name].count_documents({})
        print(f"{collection_name} contains {count} documents")
    
    client.close()
    print("Data import completed successfully")

if __name__ == "__main__":
    main()

Dropped collection: IFSC_master
Dropped collection: Merged_banks
Dropped collection: bank_participants
Dropped collection: submember_banks
Inserted 172030 documents into IFSC_master
Inserted 22143 documents into Merged_banks
Inserted 55 documents into bank_participants
Inserted 71444 documents into submember_banks
Indexes created successfully
IFSC_master contains 172030 documents
Merged_banks contains 22143 documents
bank_participants contains 55 documents
submember_banks contains 71444 documents
Data import completed successfully


In [11]:
import pymongo
import time
import unittest

class TestMongoDBImport(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        """Set up MongoDB connection before all tests"""
        cls.client = pymongo.MongoClient('mongodb://localhost:27017/')
        cls.db = cls.client['BANK']
        
    @classmethod
    def tearDownClass(cls):
        """Clean up after all tests"""
        cls.client.close()
    
    def test_ifsc_lookup(self):
        """Test Case for Basic IFSC Code Lookup"""
        collection = self.db.IFSC_master
        
        # Test with a known IFSC code
        test_ifsc = 'ABHY0065101'  # Replace with a valid IFSC from your db
        
        # Check if the index exists
        indexes = collection.list_indexes()
        index_found = False
        for index in indexes:
            if index['key'].get('IFSC'):
                index_found = True
                break
        self.assertTrue(index_found, "IFSC index does not exist on IFSC_master collection")
        
        # Use explain to check if the index is being used
        explain_result = collection.find({'IFSC': test_ifsc}).hint('IFSC_1').explain()
        self.assertIn('inputStage', explain_result['queryPlanner']['winningPlan']['inputStage'])
        
        # Perform the actual query
        result = collection.find_one({'IFSC': test_ifsc})
        
        self.assertIsNotNone(result, "IFSC lookup failed")
        self.assertEqual(result['IFSC'], test_ifsc, "Incorrect IFSC returned")
        
        print("Basic IFSC lookup test passed!")
    
    def test_branch_city_search(self):
        """Test Case for Branch and City Search"""
        collection = self.db.IFSC_master
        
        test_branch = "KOLKATA MAIN"
        test_city = "KOLKATA"
        
        # Check if we're using indexes
        explain_result = collection.find({
            'BRANCH': test_branch,
            'CITY1': test_city
        }).explain()
        
        # Find documents
        results = list(collection.find({
            'BRANCH': test_branch,
            'CITY1': test_city
        }).limit(10))
        
        self.assertGreater(len(results), 0, "No results for branch-city search")
        for result in results:
            self.assertEqual(result['BRANCH'], test_branch, "Incorrect branch returned")
            self.assertEqual(result['CITY1'], test_city, "Incorrect city returned")
        
        print("Branch-city search test passed!")
    
    def test_performance_comparison(self):
        """Test Case for Performance Comparison Test"""
        collection = self.db.IFSC_master
        test_ifsc = 'SBIN0000817'
        
        # Test with index
        start_time = time.time()
        list(collection.find({'IFSC': test_ifsc}).hint('IFSC_1'))
        indexed_time = time.time() - start_time
        
        # Drop index temporarily
        collection.drop_index('IFSC_1')
        
        # Test without index
        start_time = time.time()
        list(collection.find({'IFSC': test_ifsc}))
        non_indexed_time = time.time() - start_time
        
        # Re-create index
        collection.create_index([('IFSC', pymongo.ASCENDING)], unique=True)
        
        print(f"Indexed query time: {indexed_time:.6f} seconds")
        print(f"Non-indexed query time: {non_indexed_time:.6f} seconds")
        
        self.assertLess(indexed_time, non_indexed_time, "Index not improving performance")
        print("Performance comparison test passed!")
    
    def test_merged_bank_lookup(self):
        """Test Case for Merged Bank IFSC Lookup"""
        # NOTE: The provided code doesn't import merged_banks, but I'll write the test for it
        collection = self.db.Merged_banks
        
        test_old_ifsc = "ALLA0210127"
        
        # Check if the index exists for Old_IFSC_Code
        indexes = collection.list_indexes()
        index_found = False
        for index in indexes:
            if index['key'].get('Old_IFSC_Code'):
                index_found = True
                break
        self.assertTrue(index_found, "Old_IFSC_Code index does not exist on Merged_banks collection")
        
        # Perform the lookup
        result = collection.find_one({'Old_IFSC_Code': test_old_ifsc})
        
        self.assertIsNotNone(result, "Merged bank lookup failed")
        self.assertEqual(result['Old_IFSC_Code'], test_old_ifsc, "Incorrect old IFSC returned")
        
        print("Merged bank lookup test passed!")
    
    def test_state_city_search(self):
        """Test Case for State and City Combination Search"""
        collection = self.db.IFSC_master
        
        test_state = "MAHARASHTRA"
        test_city = "PUNE"
        
        # Use explain to check query performance
        explain_result = collection.find({
            'STATE': test_state,
            'CITY1': test_city
        }).explain()
        
        # Perform the search
        results = list(collection.find({
            'STATE': test_state,
            'CITY1': test_city
        }).limit(10))
        
        self.assertGreater(len(results), 0, "No results for state-city search")
        for result in results:
            self.assertEqual(result['STATE'], test_state, "Incorrect state returned")
            self.assertEqual(result['CITY1'], test_city, "Incorrect city returned")
        
        print("State-city search test passed!")

In [15]:
# Function to run tests in Jupyter
def run_tests_in_jupyter():
    """Function to run tests that works in Jupyter notebooks"""
    suite = unittest.TestLoader().loadTestsFromTestCase(TestMongoDBImport)
    runner = unittest.TextTestRunner(verbosity=2)
    runner.run(suite)

# Individual test runner functions
def test_ifsc_lookup(x):
    """Standalone function for basic IFSC lookup test"""
    client = pymongo.MongoClient('mongodb://localhost:27017/')
    db = client['BANK']
    collection = db.IFSC_master
    
    test_ifsc = x
    result = collection.find_one({'IFSC': test_ifsc})
    
    assert result is not None, f"IFSC {test_ifsc} not found"
    assert result['IFSC'] == test_ifsc, "Incorrect IFSC returned"
    
    print(f"IFSC lookup test passed for {test_ifsc}!")
    client.close()

def test_branch_city_search(branch, city):
    """Standalone function for branch-city search test"""
    client = pymongo.MongoClient('mongodb://localhost:27017/')
    db = client['BANK']
    collection = db.IFSC_master
    
    results = list(collection.find({
        'BRANCH': branch,
        'CITY1': city
    }).limit(10))
    
    assert len(results) > 0, f"No results for branch {branch} in city {city}"
    for result in results:
        assert result['BRANCH'] == branch, "Incorrect branch returned"
        assert result['CITY1'] == city, "Incorrect city returned"
    
    print(f"Branch-city search test passed for {branch}, {city}!")
    client.close()

def test_merged_bank_lookup(old_ifsc):
    """Standalone function for merged bank lookup test"""
    client = pymongo.MongoClient('mongodb://localhost:27017/')
    db = client['BANK']
    collection = db.Merged_banks
    
    result = collection.find_one({'Old_IFSC_Code': old_ifsc})
    
    assert result is not None, f"Old IFSC {old_ifsc} not found"
    assert result['Old_IFSC_Code'] == old_ifsc, "Incorrect old IFSC returned"
    
    print(f"Merged bank lookup test passed for {old_ifsc}!")
    client.close()

def test_state_city_search(state, city):
    """Standalone function for state-city search test"""
    client = pymongo.MongoClient('mongodb://localhost:27017/')
    db = client['BANK']
    collection = db.IFSC_master
    
    results = list(collection.find({
        'STATE': state,
        'CITY1': city
    }).limit(10))
    
    if len(results) == 0:
        print(f"No results for state {state} and city {city}, trying alternate search...")
        # Try searching just by state
        results = list(collection.find({'STATE': state}).limit(10))
        if len(results) > 0:
            print(f"Found {len(results)} results for state {state}")
        else:
            print(f"No results found for state {state}")
    else:
        for result in results:
            assert result['STATE'] == state, "Incorrect state returned"
            assert result['CITY1'] == city, "Incorrect city returned"
        print(f"State-city search test passed for {state}, {city}!")
    
    client.close()

def test_performance_comparison():
    """Standalone function for performance comparison test"""
    client = pymongo.MongoClient('mongodb://localhost:27017/')
    db = client['BANK']
    collection = db.IFSC_master
    test_ifsc = 'SBIN0000817'
    
    # Test with index
    start_time = time.time()
    list(collection.find({'IFSC': test_ifsc}).hint('IFSC_1'))
    indexed_time = time.time() - start_time
    
    # Get current indexes
    current_indexes = collection.index_information()
    
    # Drop IFSC index if it exists
    if 'IFSC_1' in current_indexes:
        collection.drop_index('IFSC_1')
    
    # Test without index
    start_time = time.time()
    list(collection.find({'IFSC': test_ifsc}))
    non_indexed_time = time.time() - start_time
    
    # Re-create index
    collection.create_index([('IFSC', pymongo.ASCENDING)], unique=True)
    
    print(f"Indexed query time: {indexed_time:.6f} seconds")
    print(f"Non-indexed query time: {non_indexed_time:.6f} seconds")
    
    assert indexed_time < non_indexed_time, "Index not improving performance"
    print("Performance comparison test passed!")
    
    client.close()

# Function to run all tests in proper sequence
def run_all_tests():
    """Run all tests in sequence, avoiding Jupyter conflicts"""
    try:
        # Run unittest tests
        print("Running unittest test suite...")
        run_tests_in_jupyter()
        
        print("\nRunning individual tests...")
        test_ifsc_lookup('ABHY0065101')
        test_ifsc_lookup('ABHY0065001')
        test_branch_city_search("KOLKATA MAIN", "KOLKATA")
        test_branch_city_search("KOLKATA MAIN", "Bellary")
        test_merged_bank_lookup("ALLA0210127")
        test_merged_bank_lookup("ALBA0211111")
        test_performance_comparison()
        test_state_city_search("MAHARASHTRA", "PUNE")
        test_state_city_search("TAMIL NADU", "CHENNAI")
        test_state_city_search("TAMIL NADU", "PUNE")
    except Exception as e:
        print(f"Error running tests: {str(e)}")

# For running in Jupyter notebook, use:
run_all_tests()

test_branch_city_search (__main__.TestMongoDBImport)
Test Case for Branch and City Search ... ok
test_ifsc_lookup (__main__.TestMongoDBImport)
Test Case for Basic IFSC Code Lookup ... FAIL
test_merged_bank_lookup (__main__.TestMongoDBImport)
Test Case for Merged Bank IFSC Lookup ... FAIL
test_performance_comparison (__main__.TestMongoDBImport)
Test Case for Performance Comparison Test ... ok
test_state_city_search (__main__.TestMongoDBImport)
Test Case for State and City Combination Search ... ok

FAIL: test_ifsc_lookup (__main__.TestMongoDBImport)
Test Case for Basic IFSC Code Lookup
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/tmp/ipykernel_8340/56021491.py", line 35, in test_ifsc_lookup
    self.assertIn('inputStage', explain_result['queryPlanner']['winningPlan']['inputStage'])
AssertionError: 'inputStage' not found in {'stage': 'IXSCAN', 'keyPattern': {'IFSC': 1}, 'indexName': 'IFSC_1', 'isMultiKey': False, 'mult

Running unittest test suite...
Branch-city search test passed!
Indexed query time: 0.000394 seconds
Non-indexed query time: 0.037770 seconds
Performance comparison test passed!
State-city search test passed!

Running individual tests...
Error running tests: IFSC ABHY0065101 not found
