1. Preparing Source Data

In [None]:
%pip install pyodbc
%pip install elasticsearch

Once you have the environment ready, use the following Python code to create a SQL Server database (PersonSearchDB), setting up a Persons table, and populating it with 1 million realistic random records (first names, last names, cities, states, and email addresses using randommail.com for privacy). After testing, a cleanup script can safely remove the database and table.

In [None]:
import pyodbc
import random
import datetime
from elasticsearch import Elasticsearch, helpers

# Define server name as a global variable so it can be set once
server = 'localhost'
db_name = 'PersonSearchDB'

# Connection to 'master' for creating the database
master_conn_str = f'DRIVER={{ODBC Driver 17 for SQL Server}};SERVER={server};DATABASE=master;Trusted_Connection=yes;'
master_conn = pyodbc.connect(master_conn_str)

# Set autocommit to True to disable transactions
master_conn.autocommit = True

master_cursor = master_conn.cursor()

# Check if the database exists, if not, create it
create_db_query = f"IF DB_ID('{db_name}') IS NULL CREATE DATABASE {db_name};"
master_cursor.execute(create_db_query)

# Close master connection
master_cursor.close()
master_conn.close()

# Function to return SQL Server connection
def get_sql_connection(db_name):
    """
    Returns a SQL Server connection object for the given database name.
    If the database does not exist, the function connects to 'master' first to create it.
    
    Parameters:
    db_name (str): The name of the database to connect to.

    Returns:
    pyodbc.Connection: A connection object to the specified database.
    """    

    # Connection to the newly created or existing database
    conn_str = f'DRIVER={{ODBC Driver 17 for SQL Server}};SERVER={server};DATABASE={db_name};Trusted_Connection=yes;'
    return pyodbc.connect(conn_str)

# Function to execute a SQL query
def execute_SQL_Query(db_name, query, params=None):
    """
    Executes a SQL query on the given database. Uses the connection obtained from get_sql_connection.
    
    Parameters:
    db_name (str): The name of the database.
    query (str): The SQL query to execute.
    params (tuple): Parameters to pass to the query (optional).
    
    Returns:
    list: The result of the query if it's a SELECT query, otherwise None.
    """
    conn = get_sql_connection(db_name)
    cursor = conn.cursor()
    if params:
        cursor.execute(query, params)
    else:
        cursor.execute(query)
    
    if query.strip().upper().startswith("SELECT"):
        result = cursor.fetchall()  # Fetch all results if it's a SELECT query
    else:
        conn.commit()
        result = None

    cursor.close()
    conn.close()
    
    return result


# Create database
execute_SQL_Query('master', f"IF DB_ID('{db_name}') IS NULL CREATE DATABASE {db_name};")

# List of Indian first names, last names, cities, and states
first_names = ['Rahul', 'Anjali', 'Amit', 'Pooja', 'Rajesh', 'Sneha', 'Vikram', 'Neha', 'Suresh', 'Sunita']
last_names = ['Sharma', 'Patel', 'Gupta', 'Mehta', 'Jain', 'Agarwal', 'Reddy', 'Singh', 'Kumar', 'Verma']
cities = ['Mumbai', 'Delhi', 'Bangalore', 'Chennai', 'Hyderabad', 'Ahmedabad', 'Kolkata', 'Pune', 'Jaipur', 'Lucknow']
states = ['MH', 'DL', 'KA', 'TN', 'TS', 'GJ', 'WB', 'MH', 'RJ', 'UP']

def random_name():
    return random.choice(first_names), random.choice(last_names)

def random_email(first_name, last_name):
    return f"{first_name.lower()}.{last_name.lower()}@randommail.com"

def random_DOB():
    start_date = datetime.date(1950, 1, 1)
    end_date = datetime.date(2005, 12, 31)
    time_between_dates = end_date - start_date
    days_between_dates = time_between_dates.days
    random_number_of_days = random.randrange(days_between_dates)
    random_date = start_date + datetime.timedelta(days=random_number_of_days)
    return random_date

def random_zipcode():
    return str(random.randint(100000, 999999))  # Indian zip codes are 6 digits

# Function to create the table and insert records in bulk
def setup_database_and_bulk_insert_data(db_name, record_count=1000, batch_size=100):
    # Create Persons table using execute_SQL_Query
    create_table_query = '''
    IF OBJECT_ID('Persons', 'U') IS NOT NULL DROP TABLE Persons;
    CREATE TABLE Persons (
        FirstName NVARCHAR(50),
        LastName NVARCHAR(50),
        PreferredName NVARCHAR(50),
        City NVARCHAR(50),
        State NVARCHAR(50),
        ZipCode NVARCHAR(10),
        DOB DATE,
        Email NVARCHAR(100)
    );
    '''
    execute_SQL_Query(db_name, create_table_query)

    # Insert records in batches
    for batch_start in range(0, record_count, batch_size):
        values = []
        for _ in range(batch_size):
            first_name, last_name = random_name()
            preferred_name = first_name  # Assume preferred name is the first name
            city = random.choice(cities)
            state = random.choice(states)
            zipcode = random_zipcode()
            dob = random_DOB()
            dob_str = dob.strftime('%Y-%m-%d')
            email = random_email(first_name, last_name)
            values.append(f"SELECT '{first_name}', '{last_name}', '{preferred_name}', '{city}', '{state}', '{zipcode}', '{dob_str}', '{email}'")

        # Create bulk insert query using INSERT INTO ... SELECT
        insert_query = '''
        INSERT INTO Persons (FirstName, LastName, PreferredName, City, State, ZipCode, DOB, Email)
        ''' + " UNION ALL ".join(values)

        execute_SQL_Query(db_name, insert_query)
        print(f"Inserted batch starting at record {batch_start}")

# Example usage: setting up the database and bulk inserting data
setup_database_and_bulk_insert_data(db_name, record_count=1000, batch_size=100)


2. Setting Up Elasticsearch

    • Download and Install: Obtain Elasticsearch from the official website and follow the installation guide.
    • Configuration: Adjust settings as needed, such as cluster name and network configurations.


3. Creating an Elasticsearch Index
    • Define Mappings: Specify how each field should be indexed and analyzed, particularly those requiring fuzzy search capabilities.

    • Index Creation: Use Python scripts or tools like Kibana to create the index with the defined mappings.

In [None]:

from elasticsearch import Elasticsearch, helpers
import warnings
import urllib3

# Suppress warnings about insecure connections (optional)
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

   
# Replace with your actual password
elastic_password = "IMoWOv8DHTMNnQod37NS"

# Initialize the Elasticsearch client with SSL and authentication
# make sure elastic search is running on port 9200, 
# Use the following command to start elastic search
# .\elasticsearch-8.11.1\bin\elasticsearch.bat


es = Elasticsearch(
    ["https://localhost:9200"],
    ca_certs=False,          # Disable SSL certificate verification
    verify_certs=False,      # Disable SSL cert verification (use with caution)
    basic_auth=("elastic", elastic_password),
)

def index_data_to_elasticsearch(db_name, index_name, batch_size=10000):
    
    # Delete the existing index if it exists
    if es.indices.exists(index=index_name):
        es.indices.delete(index=index_name)
        print(f"Deleted existing index: {index_name}")

    # Create a new index with mappings
    index_mappings = {
        "mappings": {
            "properties": {
                "FirstName": {"type": "text"},
                "LastName": {"type": "text"},
                "PreferredName": {"type": "text"},
                "City": {"type": "text"},
                "State": {"type": "keyword"},
                "ZipCode": {"type": "keyword"},
                "DOB": {"type": "date"},
                "Email": {"type": "keyword"}
            }
        }
    }
    es.indices.create(index=index_name, body=index_mappings)
    print(f"Created new index: {index_name}")

    # Get total number of records
    total_records_query = "SELECT COUNT(*) FROM Persons"
    total_records_result = execute_SQL_Query(db_name, total_records_query)
    total_records = total_records_result[0][0]

    offset = 0
    while offset < total_records:
        query = f'''
        SELECT FirstName, LastName, PreferredName, City, State, ZipCode, DOB, Email
        FROM Persons
        ORDER BY FirstName
        OFFSET {offset} ROWS FETCH NEXT {batch_size} ROWS ONLY
        '''
        rows = execute_SQL_Query(db_name, query)
        actions = []
        for row in rows:
            print(f"\n\nrow: {row}")
            doc = {
                "_index": index_name,
                "_source": {
                    "FirstName": row[0],
                    "LastName": row[1],
                    "PreferredName": row[2],
                    "City": row[3],
                    "State": row[4],
                    "ZipCode": row[5],
                    "DOB": row[6].strftime('%Y-%m-%d') if row[6] else None,
                    "Email": row[7]
                }
            }
            actions.append(doc)
            
        try:
            helpers.bulk(es, actions)
        except helpers.BulkIndexError as e:
            print(f"Bulk indexing error: {e}")
            for error in e.errors:
                print(error)
        
        offset += batch_size
        print(f"Indexed {offset}/{total_records} records")

# Example usage: setting up the database and bulk inserting data
# setup_database_and_bulk_insert_data(db_name, record_count=1000, batch_size=100)
index_data_to_elasticsearch('PersonSearchDB', 'person_index', batch_size=1000)