# ETL Pipeline: MySQL to MongoDB

This notebook contains code for extracting data from MySQL, transforming it with pandas, and loading it into MongoDB. See the README for setup and details.

In [20]:
import datetime
import os
from urllib.parse import quote

import pandas as pd
from dotenv import load_dotenv
from pymongo import MongoClient, UpdateOne
from sqlalchemy import create_engine

In [21]:
# Extract: read the MySQL connection settings from the environment (.env file),
# build a SQLAlchemy engine, and load the whole `employees` table into `df`.
load_dotenv()

mysql_user = os.getenv('MYSQL_USER')
mysql_password = os.getenv('MYSQL_PASSWORD')
mysql_host = os.getenv('MYSQL_HOST', 'localhost')
mysql_port = int(os.getenv('MYSQL_PORT', 3306))
mysql_db = os.getenv('MYSQL_DB')

# Fail fast with a clear message: an unset variable would otherwise be
# interpolated into the connection URL as the literal text "None".
missing_mysql_vars = [
    name
    for name, value in (
        ('MYSQL_USER', mysql_user),
        ('MYSQL_PASSWORD', mysql_password),
        ('MYSQL_DB', mysql_db),
    )
    if value is None
]
if missing_mysql_vars:
    raise RuntimeError(
        "Missing required environment variable(s): " + ", ".join(missing_mysql_vars)
    )

# Percent-encode the credentials so characters such as '@', ':', '/' or '%'
# in the user name or password cannot be mistaken for URL delimiters.
engine = create_engine(
    f"mysql+pymysql://{quote(mysql_user, safe='')}:{quote(mysql_password, safe='')}"
    f"@{mysql_host}:{mysql_port}/{mysql_db}"
)

query = "SELECT * FROM employees"

df = pd.read_sql(query, engine)

df.head()


Unnamed: 0,emp_id,name,department_id,salary,hire_date,manager_id,bonus
0,1,Ali,1,65884.5,2022-01-10,,0.0
1,2,Sara,2,66000.0,2021-03-15,1.0,0.0
2,3,Ahmed,3,99825.0,2020-06-01,1.0,0.0
3,4,Zainab,3,66550.0,2023-01-12,2.0,0.0
4,6,Kamran,1,55635.8,2021-07-01,1.0,0.0


In [22]:
# Transform: clean `df` so every remaining row can be stored as a MongoDB document.


def _is_plain_date(value):
    """Return True for datetime.date objects that are not datetime.datetime."""
    return isinstance(value, datetime.date) and not isinstance(value, datetime.datetime)


def _as_midnight_datetime(value):
    """Promote a plain date to a datetime at 00:00:00; return anything else unchanged."""
    if _is_plain_date(value):
        return datetime.datetime.combine(value, datetime.time())
    return value


# Remove exact duplicate rows, then every row that has at least one missing value.
# NOTE(review): this also discards employees whose manager_id is NULL - confirm that is intended.
df = df.drop_duplicates().dropna()

# Only columns that actually hold plain date objects are rewritten;
# all other columns are left exactly as they were.
for column_name in df.columns:
    holds_plain_dates = df[column_name].apply(_is_plain_date).any()
    if holds_plain_dates:
        df[column_name] = df[column_name].apply(_as_midnight_datetime)

df.head()


Unnamed: 0,emp_id,name,department_id,salary,hire_date,manager_id,bonus
1,2,Sara,2,66000.0,2021-03-15,1.0,0.0
2,3,Ahmed,3,99825.0,2020-06-01,1.0,0.0
3,4,Zainab,3,66550.0,2023-01-12,2.0,0.0
4,6,Kamran,1,55635.8,2021-07-01,1.0,0.0
5,7,Maryam,1,61492.2,2022-02-12,1.0,0.0


In [23]:
# Load: upsert every row of `df` into MongoDB, keyed on emp_id, so re-running
# the notebook updates existing documents instead of inserting duplicates.

mongo_uri = os.getenv('MONGO_URI')
mongo_db_name = os.getenv('MONGO_DB_NAME')
mongo_collection_name = os.getenv('MONGO_COLLECTION_NAME')

# Fail fast with a clear message instead of an opaque TypeError from
# client[None] / db[None] when a name variable is unset.
missing_mongo_vars = [
    name
    for name, value in (
        ('MONGO_DB_NAME', mongo_db_name),
        ('MONGO_COLLECTION_NAME', mongo_collection_name),
    )
    if value is None
]
if missing_mongo_vars:
    raise RuntimeError(
        "Missing required environment variable(s): " + ", ".join(missing_mongo_vars)
    )

# NOTE(review): when MONGO_URI is unset, MongoClient(None) presumably falls back
# to PyMongo's default host (localhost) - confirm that fallback is acceptable.
client = MongoClient(mongo_uri)
db = client[mongo_db_name]
collection = db[mongo_collection_name]

# Create a unique index on emp_id (a no-op when an identical index already exists)
collection.create_index('emp_id', unique=True)

# Build one upsert per row and submit them together: bulk_write batches the
# operations instead of paying one network round trip per record. The default
# ordered=True keeps the rows applied in DataFrame order.
upsert_operations = [
    UpdateOne({'emp_id': record['emp_id']}, {'$set': record}, upsert=True)
    for record in df.to_dict(orient='records')
]

# bulk_write rejects an empty operation list, so skip the call for an empty frame.
# The count covers documents that were inserted or actually changed; rows that
# matched an identical existing document are not counted.
upserted_count = 0
if upsert_operations:
    result = collection.bulk_write(upsert_operations)
    upserted_count = result.upserted_count + result.modified_count

print(f"Upserted {upserted_count} records into MongoDB collection '{mongo_collection_name}'.")


Upserted 0 records into MongoDB collection 'employees'.
