1️⃣ What is an ETL Pipeline?
An ETL pipeline is a structured process to:

Extract → Pull data from one or more sources (files, APIs, databases, streams, etc.).

Transform → Clean, validate, reformat, enrich, or aggregate the data into the required form.

Load → Push the processed data into a target system (e.g., SQL database, data warehouse, analytics tool).

📌 In Data Engineering, ETL is used to move data from raw sources → analytics-ready formats.
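A minimal sketch of the three stages as plain functions. It uses only the standard library (csv + sqlite3), so it runs without a database server. The sample rows, the `staff` table and the function names are made up for illustration. A real pipeline would read from files, APIs or databases instead of an inline string.

```python
import csv
import io
import sqlite3

# Hypothetical raw input: note the duplicate row and the blank salary.
RAW_CSV = """name,city,salary
Alice,Boston,72000
Bob,Denver,
Alice,Boston,72000
Cara,Austin,65000
"""

def extract(text):
    """Extract: parse CSV text into a list of dicts."""
    return list(csv.DictReader(io.StringIO(text)))

def transform(rows):
    """Transform: drop rows without a salary, de-duplicate, cast types."""
    seen = set()
    clean = []
    for row in rows:
        if not row["salary"]:
            continue  # missing value -> skip the row
        key = (row["name"], row["city"], row["salary"])
        if key in seen:
            continue  # exact duplicate -> skip
        seen.add(key)
        clean.append((row["name"], row["city"], int(row["salary"])))
    return clean

def load(rows, conn):
    """Load: write the cleaned rows into a target table."""
    conn.execute("CREATE TABLE IF NOT EXISTS staff (name TEXT, city TEXT, salary INTEGER)")
    conn.executemany("INSERT INTO staff VALUES (?, ?, ?)", rows)
    conn.commit()

conn = sqlite3.connect(":memory:")  # in-memory DB so the sketch leaves no files behind
load(transform(extract(RAW_CSV)), conn)
print(conn.execute("SELECT * FROM staff ORDER BY name").fetchall())
# [('Alice', 'Boston', 72000), ('Cara', 'Austin', 65000)]
```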

2️⃣ Core Components
A. Extract
Sources can be:

Files → CSV, Excel, JSON, Parquet

Databases → MySQL, PostgreSQL, SQLite

APIs → REST, GraphQL

Streaming → Kafka, RabbitMQ

Cloud Storage → AWS S3, Google Cloud Storage

Python tools for extraction:

pandas.read_csv() / read_excel() / read_json()

requests or httpx for API calls

sqlite3 / SQLAlchemy / psycopg2 for DB queries

boto3 for AWS S3
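Extracting from a database usually means a parameterized query rather than string formatting, so the driver escapes the values. The sketch below uses sqlite3 with an in-memory `orders` table invented for the demo. psycopg2 and mysql-connector follow the same DB-API pattern but use `%s` placeholders instead of `?`.

```python
import sqlite3

def extract_orders(conn, since):
    """Return orders created on or after `since` as a list of dicts.

    The `?` placeholder lets the driver escape the value (sqlite3 style;
    psycopg2 and mysql-connector use `%s`).
    """
    conn.row_factory = sqlite3.Row  # rows become addressable by column name
    cur = conn.execute(
        "SELECT id, amount, created FROM orders WHERE created >= ? ORDER BY id",
        (since,),
    )
    return [dict(row) for row in cur.fetchall()]

# Hypothetical source table, built in memory for the demo.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER, amount REAL, created TEXT)")
conn.executemany(
    "INSERT INTO orders VALUES (?, ?, ?)",
    [(1, 9.5, "2024-01-01"), (2, 20.0, "2024-02-15"), (3, 7.25, "2024-03-02")],
)

recent = extract_orders(conn, "2024-02-01")  # ISO dates compare correctly as text
print(recent)
```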

B. Transform
This is often the heaviest part of ETL.
Common transformations:

Data Cleaning → Handle missing values, remove duplicates, fix data types.

Data Enrichment → Add calculated fields, merge with reference data.

Data Aggregation → Summaries, grouping, rolling averages.

Data Standardization → Consistent formats for dates, currencies, casing.

Python tools for transformation:

pandas → For tabular data wrangling.

numpy → For numeric operations.

pyarrow / fastparquet → For efficient file formats.

Plain Python functions → Applying business rules from domain knowledge.
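A stdlib-only sketch of cleaning, standardization and aggregation on a handful of hypothetical records. The two accepted date formats (ISO `YYYY-MM-DD` and day-first `DD/MM/YYYY`) are an assumption for the example. Once data is tabular, pandas would normally handle the same steps.

```python
from collections import defaultdict
from datetime import datetime

# Hypothetical raw records: inconsistent casing, mixed date formats, one duplicate.
raw = [
    {"city": "boston ", "date": "2024-01-05", "sales": "120"},
    {"city": "Boston", "date": "05/01/2024", "sales": "80"},
    {"city": "DENVER", "date": "2024-01-06", "sales": "200"},
    {"city": "DENVER", "date": "2024-01-06", "sales": "200"},
]

def parse_date(value):
    """Standardize: accept ISO (YYYY-MM-DD) or day-first (DD/MM/YYYY) dates."""
    for fmt in ("%Y-%m-%d", "%d/%m/%Y"):
        try:
            return datetime.strptime(value, fmt).date()
        except ValueError:
            pass
    raise ValueError(f"unrecognised date: {value!r}")

def clean(rows):
    """Clean + standardize: trim/title-case cities, parse dates, cast sales, drop duplicates."""
    out, seen = [], set()
    for r in rows:
        rec = (r["city"].strip().title(), parse_date(r["date"]), int(r["sales"]))
        if rec not in seen:
            seen.add(rec)
            out.append(rec)
    return out

def total_by_city(rows):
    """Aggregate: sum sales per city."""
    totals = defaultdict(int)
    for city, _, sales in rows:
        totals[city] += sales
    return dict(totals)

print(total_by_city(clean(raw)))  # {'Boston': 200, 'Denver': 200}
```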

C. Load
Targets can be:

Databases → MySQL, PostgreSQL, SQLite

Data Warehouses → Snowflake, BigQuery, Redshift

Data Lakes → S3, HDFS

Files → CSV, Parquet, JSON for reporting

Python tools for loading:

DataFrame.to_sql() (with a SQLAlchemy engine, or a plain sqlite3 connection for SQLite)

Database connectors (mysql-connector-python, psycopg2)

boto3 for S3 uploads

to_csv() / to_parquet() for local files
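Whatever the target, loading usually comes down to a batched insert inside a transaction. Below is a sketch with sqlite3 and hypothetical rows. `executemany()` is part of the Python DB-API, so mysql-connector and psycopg2 cursors offer it too, with `%s` placeholders.

```python
import sqlite3

rows = [("USD", 1.0), ("EUR", 0.92), ("JPY", 151.3)]  # hypothetical cleaned rows

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE rates (currency TEXT PRIMARY KEY, rate REAL)")

# `with conn:` wraps the block in a transaction: it commits if the block succeeds
# and rolls back if it raises (it does NOT close the connection).
with conn:
    conn.executemany("INSERT INTO rates (currency, rate) VALUES (?, ?)", rows)

count = conn.execute("SELECT COUNT(*) FROM rates").fetchone()[0]
print(count)  # 3
conn.close()
```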

3️⃣ Simple Example ETL in Python

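import pandas as pd
import sqlite3
import requests

# === 1. EXTRACT ===
url = "https://api.exchangerate-api.com/v4/latest/USD"
data = requests.get(url, timeout=10).json()
df = pd.DataFrame(data["rates"].items(), columns=["currency", "rate"])

# === 2. TRANSFORM ===
df["rate"] = df["rate"].round(2)
df = df.sort_values(by="rate", ascending=False)

# === 3. LOAD ===
conn = sqlite3.connect("exchange_rates.db")
df.to_sql("rates", conn, if_exists="replace", index=False)
conn.close()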

4️⃣ Best Practices for ETL Pipelines
✅ Make it repeatable — wrap in functions or classes.
✅ Parameterize — avoid hardcoding file paths or credentials.
✅ Log everything — so you can debug issues.
✅ Validate data — ensure correct schema & no corruption.
✅ Use incremental loads — only process new or changed data when possible.
✅ Consider orchestration tools — Airflow, Prefect, Kedro for production.
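Incremental loads are often implemented with a high-water mark: remember the newest timestamp already loaded and pull only rows newer than that. The sketch below uses two in-memory SQLite databases. The `events` table and its ISO-8601 `updated_at` column are assumptions for the example. `INSERT OR REPLACE` is SQLite syntax; MySQL has `INSERT ... ON DUPLICATE KEY UPDATE`.

```python
import sqlite3

def incremental_load(src, dst):
    """Copy only source rows newer than the target's high-water mark.

    Assumes a monotonically increasing `updated_at` column (ISO-8601 text here).
    """
    dst.execute("CREATE TABLE IF NOT EXISTS events (id INTEGER PRIMARY KEY, updated_at TEXT)")
    # The high-water mark is the newest timestamp already loaded ('' if the table is empty).
    (watermark,) = dst.execute("SELECT COALESCE(MAX(updated_at), '') FROM events").fetchone()
    new_rows = src.execute(
        "SELECT id, updated_at FROM events WHERE updated_at > ? ORDER BY updated_at",
        (watermark,),
    ).fetchall()
    with dst:  # commit the batch atomically
        dst.executemany("INSERT OR REPLACE INTO events VALUES (?, ?)", new_rows)
    return len(new_rows)

src = sqlite3.connect(":memory:")
dst = sqlite3.connect(":memory:")
src.execute("CREATE TABLE events (id INTEGER PRIMARY KEY, updated_at TEXT)")
src.executemany(
    "INSERT INTO events VALUES (?, ?)",
    [(1, "2024-01-01T10:00"), (2, "2024-01-01T11:00")],
)

print(incremental_load(src, dst))  # 2  (first run loads everything)
src.execute("INSERT INTO events VALUES (3, '2024-01-01T12:00')")
print(incremental_load(src, dst))  # 1  (second run loads only the new row)
```

One caveat of the strict `>` comparison: a row that arrives later with a timestamp equal to the watermark is skipped, so production pipelines often re-read a small overlap window and rely on the upsert to absorb repeats.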

5️⃣ Scaling Up
When your ETL gets bigger:

Batch vs. Streaming ETL

Batch → Runs at fixed intervals (daily/hourly).

Streaming → Processes data in near real-time (Kafka, Spark Streaming).

Distributed Processing → Use Spark or Dask for large datasets.

Data Quality Checks → Great Expectations, dbt tests.
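Before reaching for Great Expectations or dbt tests, the idea can be sketched by hand: run a few checks on each batch and refuse to load it if any fail. The column names and the salary range below are hypothetical thresholds, not rules taken from those tools.

```python
def check_quality(rows, required=("name", "salary"), salary_range=(0, 1_000_000)):
    """Return a list of human-readable problems; an empty list means the batch passed."""
    problems = []
    lo, hi = salary_range
    for i, row in enumerate(rows):
        # Completeness: required fields must be present and non-empty.
        for col in required:
            if row.get(col) in (None, ""):
                problems.append(f"row {i}: missing {col}")
        # Validity: numeric salaries must fall inside the allowed range.
        salary = row.get("salary")
        if isinstance(salary, (int, float)) and not lo <= salary <= hi:
            problems.append(f"row {i}: salary {salary} outside [{lo}, {hi}]")
    return problems

batch = [
    {"name": "Ann", "salary": 50_000},
    {"name": "", "salary": 61_000},
    {"name": "Raj", "salary": -5},
]
print(check_quality(batch))
# ['row 1: missing name', 'row 2: salary -5 outside [0, 1000000]']
```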



# Working with a SQLite database to practice transforming and loading data

In [28]:
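import pandas as pd
import requests
import sqlite3


# === 1. EXTRACT ===

url = "https://api.exchangerate-api.com/v4/latest/USD"
response = requests.get(url, timeout=10)   # don't hang forever on a slow API
response.raise_for_status()                # stop on HTTP errors instead of parsing an error page
data = response.json()
print(type(data))   # check whether the payload is a dict or a list
# print(data)       # uncomment to inspect the whole structure

if isinstance(data, dict):
    print(data.keys())

# data['rates'] maps currency code -> rate; turn it into two columns
df = pd.DataFrame(data['rates'].items(), columns=['currency', 'rate'])
df.head()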



<class 'dict'>


   currency    rate
0       USD    1.00
1       AED    3.67
2       AFN   68.39
3       ALL   83.19
4       AMD  383.84
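APIs change, so it can help to check the payload shape before building the DataFrame. This hedged sketch assumes the structure used above: a dict whose `rates` key maps currency codes to numbers. The sample payload copies the first rows printed above, so the example runs offline. The returned list can go straight into `pd.DataFrame(rows, columns=['currency', 'rate'])`.

```python
def rates_to_rows(payload):
    """Turn an exchange-rate payload into (currency, rate) rows.

    Assumes a dict with a "rates" mapping of currency code -> number and
    raises a clear error if that assumption breaks.
    """
    if not isinstance(payload, dict) or "rates" not in payload:
        raise ValueError("unexpected payload: no 'rates' mapping")
    rows = []
    for currency, rate in payload["rates"].items():
        if not isinstance(rate, (int, float)):
            raise ValueError(f"non-numeric rate for {currency}: {rate!r}")
        rows.append((currency, float(rate)))
    return rows

# Offline sample mirroring the first rows printed above (no network call needed).
sample = {"rates": {"USD": 1, "AED": 3.67, "AFN": 68.39}}
print(rates_to_rows(sample))  # [('USD', 1.0), ('AED', 3.67), ('AFN', 68.39)]
```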


In [29]:
# === 2. TRANSFORM ===
df['rate'] = df['rate'].round(2)
df = df.sort_values(by='rate', ascending=False)
df.head()





     currency      rate
82        LBP  89500.00
66        IRR  42264.58
150       VND  26184.31
128       SLL  23234.29
81        LAK  21708.91


In [31]:
# === 3. LOAD ===
conn = sqlite3.connect('exchange_rates.db')
df.to_sql('rates', conn, if_exists='replace', index=False)
conn.close()

print("ETL Pipeline completed successfully!")

ETL Pipeline completed successfully!


In [2]:
import sqlite3
import pandas as pd

# Connect to the database
conn = sqlite3.connect('exchange_rates.db')

# Read data into a DataFrame
df_read = pd.read_sql_query("SELECT * FROM rates", conn)

conn.close()

df_read.head()  # Show first 5 rows


   currency      rate
0       LBP  89500.00
1       IRR  42264.58
2       VND  26184.31
3       SLL  23234.29
4       LAK  21708.91


Step 1 – Extract: Load the CSV into Pandas

In [3]:
df = pd.read_csv("data.csv")
df.head()

      Name  Age       City  Salary
0    David   34  San Diego   79076
1  Charlie   35    Phoenix   76699
2    Quinn   38   New York   93478
3      Eve   32    Phoenix   92233
4     Paul   27    Houston   96396


Step 2 – Transform: Clean and Analyze in Pandas

In [4]:
df.drop_duplicates(inplace=True)
# Suppose we want to see the high earners (salary above 80,000)

high_earners = df[df['Salary'] > 80000]
high_earners.head()

    Name  Age       City  Salary
2  Quinn   38   New York   93478
3    Eve   32    Phoenix   92233
4   Paul   27    Houston   96396
7  Grace   47  San Diego   83846
8   Nate   59    Houston   84534


In [5]:
import os
import mysql.connector
import pandas as pd

conn = mysql.connector.connect(
    host='127.0.0.1',
    port=3306,
    user='root',
    password=os.environ['MYSQL_PASSWORD'],  # hypothetical env var; keeps the secret out of the notebook
    database='employee'
)

cursor = conn.cursor()

In [6]:
cursor.execute("SHOW DATABASES")
for db in cursor:
    print(db[0])


30day_sql_query
amazon_present_absent
basic_sql
case_study
company_db
employee
football_matach
information_schema
invoicing
mysql
performance_schema
sql_hr
sql_inventory
sql_invoicing
sql_store
store
sys


Now the database work starts: create a table and load the DataFrame into it.

In [7]:
create_table_query= '''CREATE TABLE IF NOT EXISTS emp(
        Name VARCHAR(100), Age INT, City VARCHAR(100), Salary DECIMAL(10,2)
        ) '''
cursor.execute(create_table_query)
conn.commit()

In [8]:
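insert_query = '''INSERT INTO emp(Name, Age, City, Salary) VALUES(%s, %s, %s, %s)'''

# Insert the DataFrame row by row. This appends: re-running the cell inserts
# every row again, which would explain the duplicated names in the results further down.
for _, row in df.iterrows():
    cursor.execute(insert_query, tuple(row))

conn.commit()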

In [9]:
# Top 5 highest salaries
cursor.execute('SELECT Name, Salary FROM emp ORDER BY Salary DESC LIMIT 5')
cursor.fetchall()

[('Liam', Decimal('118931.00')),
 ('Liam', Decimal('118931.00')),
 ('Paul', Decimal('96396.00')),
 ('Paul', Decimal('96396.00')),
 ('Steve', Decimal('94769.00'))]

In [13]:
cursor.execute("SELECT DATABASE();")
cursor.fetchone()


('employee',)

In [14]:
cursor.execute("SELECT COUNT(*) FROM emp;")
cursor.fetchone()

(40,)
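`COUNT(*)` returns 40, and the top-5 query shows Liam and Paul twice. That suggests the insert cell ran more than once, because a plain `INSERT` loop appends every time. One way to make the load idempotent is a full refresh: delete and re-insert inside a single transaction. The sketch uses sqlite3 so it runs anywhere, with the first two records of data.csv as sample rows. In MySQL the same idea is `DELETE FROM emp` followed by the inserts (note that `TRUNCATE TABLE emp` commits implicitly), or a unique key plus `INSERT ... ON DUPLICATE KEY UPDATE`.

```python
import sqlite3

def full_refresh(conn, rows):
    """Idempotent load: wipe the table and re-insert in ONE transaction.

    Running this any number of times leaves exactly len(rows) rows.
    """
    with conn:  # commit on success, roll back on error
        conn.execute("DELETE FROM emp")
        conn.executemany("INSERT INTO emp (Name, Age, City, Salary) VALUES (?, ?, ?, ?)", rows)

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE emp (Name TEXT, Age INTEGER, City TEXT, Salary REAL)")
rows = [("David", 34, "San Diego", 79076), ("Charlie", 35, "Phoenix", 76699)]

full_refresh(conn, rows)
full_refresh(conn, rows)  # simulate re-running the cell
print(conn.execute("SELECT COUNT(*) FROM emp").fetchone()[0])  # 2, not 4
```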

In [15]:
cursor.execute("SELECT * FROM emp;")

for row in cursor.fetchall():
    print(row)




('David', 34, 'San Diego', Decimal('79076.00'))
('Charlie', 35, 'Phoenix', Decimal('76699.00'))
('Quinn', 38, 'New York', Decimal('93478.00'))
('Eve', 32, 'Phoenix', Decimal('92233.00'))
('Paul', 27, 'Houston', Decimal('96396.00'))
('Hannah', 58, 'New York', Decimal('59351.00'))
('Nate', 29, 'San Antonio', Decimal('60027.00'))
('Grace', 47, 'San Diego', Decimal('83846.00'))
('Nate', 59, 'Houston', Decimal('84534.00'))
('Quinn', 59, 'San Antonio', Decimal('83279.00'))
('Grace', 39, 'San Antonio', Decimal('79001.00'))
('Ian', 43, 'San Antonio', Decimal('65220.00'))
('Bob', 44, 'San Diego', Decimal('48379.00'))
('Hannah', 56, 'New York', Decimal('84126.00'))
('Paul', 35, 'Houston', Decimal('91438.00'))
('Liam', 32, 'Chicago', Decimal('118931.00'))
('Hannah', 36, 'Los Angeles', Decimal('80513.00'))
('Nate', 29, 'San Antonio', Decimal('89020.00'))
('Olivia', 49, 'San Antonio', Decimal('87224.00'))
('Steve', 38, 'New York', Decimal('94769.00'))
('David', 34, 'San Diego', Decimal('79076.00'))

# Working with the MySQL database through SQLAlchemy

In [20]:
import os
import pandas as pd
from sqlalchemy import create_engine
from urllib.parse import quote_plus

# ---- 1. Connection Details ----
host = "127.0.0.1"
port = 3306
user = "root"
password = quote_plus(os.environ["MYSQL_PASSWORD"])  # hypothetical env var; URL-encode special chars such as '@'
database = "employee"

# SQLAlchemy connection string for MySQL (needs the PyMySQL driver: pip install pymysql)
engine = create_engine(f"mysql+pymysql://{user}:{password}@{host}:{port}/{database}")
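Why `quote_plus` matters: characters such as `@`, `:` and `/` separate the parts of a URL, so a password containing them corrupts the connection string. A sketch using only `urllib.parse` and a hypothetical password (not the real one):

```python
from urllib.parse import quote_plus, unquote_plus, urlsplit

# Hypothetical credentials: '@', ':' and '/' all have meaning inside a URL.
user, raw_password = "root", "p@ss:word/1"
host, port, database = "127.0.0.1", 3306, "employee"

naive = f"mysql+pymysql://{user}:{raw_password}@{host}:{port}/{database}"
safe = f"mysql+pymysql://{user}:{quote_plus(raw_password)}@{host}:{port}/{database}"

print(urlsplit(naive).hostname)  # ss  (the '@' and '/' in the password broke the parse)
print(urlsplit(safe).hostname)   # 127.0.0.1
print(unquote_plus(urlsplit(safe).password) == raw_password)  # True
```

SQLAlchemy 1.4+ also provides `sqlalchemy.engine.URL.create(...)`, which takes the password as a separate argument and avoids manual escaping.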





In [234]:
query = "SELECT city, AVG(salary) AS avg_salary FROM emp GROUP BY city"
df_avg = pd.read_sql(query, engine)
print(df_avg)

Empty DataFrame
Columns: [city, avg_salary]
Index: []


In [235]:
from sqlalchemy import text

with engine.connect() as conn:
    result = conn.execute(text("SELECT * FROM emp"))
    rows = result.fetchall()

print(rows)  # see all rows


[]


Step 1 – Extract: Get the dataset (CSV or from an API).

In [22]:
import pandas as pd 

df = pd.read_csv("data.csv")
df.head()

      Name  Age       City  Salary
0    David   34  San Diego   79076
1  Charlie   35    Phoenix   76699
2    Quinn   38   New York   93478
3      Eve   32    Phoenix   92233
4     Paul   27    Houston   96396


Step 2 – Transform

In [23]:
# Remove duplicates
df = df.drop_duplicates()

# Remove rows with missing values in important columns
df = df.dropna(subset=['Name', 'Salary'])

# Convert salary to integer if not already
df['Salary'] = df['Salary'].astype(int)

# Add a column for yearly bonus (10% of salary)
df['yearly_bonus'] = df['Salary'] * 0.10

df.head()

      Name  Age       City  Salary  yearly_bonus
0    David   34  San Diego   79076        7907.6
1  Charlie   35    Phoenix   76699        7669.9
2    Quinn   38   New York   93478        9347.8
3      Eve   32    Phoenix   92233        9223.3
4     Paul   27    Houston   96396        9639.6
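`yearly_bonus` is computed here with binary floats, while a later table definition stores money as `DECIMAL(10,2)`. If exact cents matter, Python's `decimal` module can do the arithmetic. This sketch assumes half-up rounding to two places, which is an assumption; your payroll rule may differ.

```python
from decimal import Decimal, ROUND_HALF_UP

BONUS_RATE = Decimal("0.10")  # the 10% rate used above, as an exact decimal
CENTS = Decimal("0.01")

def yearly_bonus(salary):
    """10% bonus rounded to cents (half-up), matching a DECIMAL(10,2) column.

    str() first, so numpy scalars and floats convert without binary-float artifacts.
    """
    return (Decimal(str(salary)) * BONUS_RATE).quantize(CENTS, rounding=ROUND_HALF_UP)

print(yearly_bonus(79076))  # 7907.60
print(yearly_bonus(76699))  # 7669.90
```

With pandas this would be `df['yearly_bonus'] = df['Salary'].apply(yearly_bonus)`, giving an object column of `Decimal` values.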


Step 3 – Load into MySQL

In [26]:
import os
import mysql.connector


conn = mysql.connector.connect(
    host='127.0.0.1',
    port=3306,
    user='root',
    password=os.environ['MYSQL_PASSWORD'],  # hypothetical env var; avoid hardcoding credentials
    database='employee'
)

cursor = conn.cursor()


# Create the target table

create_table_query = '''
    CREATE TABLE IF NOT EXISTS employees(
        name VARCHAR(255),
        age INT,
        city VARCHAR(255),
        salary INT,
        yearly_bonus FLOAT
    )'''

cursor.execute(create_table_query)


# Insert the rows; the column order must match df: Name, Age, City, Salary, yearly_bonus

insert_data_q = '''
    INSERT INTO employees(name, age, city, salary, yearly_bonus)
    VALUES(%s, %s, %s, %s, %s)
'''

for _, row in df.iterrows():
    cursor.execute(insert_data_q, tuple(row))

conn.commit()
conn.close()
print("Data successfully loaded into MySQL!")


Data successfully loaded into MySQL!
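The loop above sends one `INSERT` per row and commits only at the end. If one row raises an error halfway, the cell stops with the earlier rows inserted but uncommitted and the connection still open. Below is a sketch of an all-or-nothing load using `executemany()` with an explicit rollback, demonstrated with sqlite3. The `placeholder` parameter is a hypothetical convenience, because mysql-connector uses `%s` where sqlite3 uses `?`.

```python
import sqlite3

def load_all_or_nothing(conn, rows, placeholder="?"):
    """Insert every row or none: roll back the whole batch if any row fails."""
    marks = ", ".join([placeholder] * 5)
    sql = f"INSERT INTO employees (name, age, city, salary, yearly_bonus) VALUES ({marks})"
    cur = conn.cursor()
    try:
        cur.executemany(sql, rows)  # one call instead of a Python-level loop
        conn.commit()
    except Exception:
        conn.rollback()  # leave the table exactly as it was before this batch
        raise
    finally:
        cur.close()

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE employees (name TEXT NOT NULL, age INTEGER, city TEXT, "
    "salary INTEGER, yearly_bonus REAL)"
)
good = [("David", 34, "San Diego", 79076, 7907.6), ("Eve", 32, "Phoenix", 92233, 9223.3)]
bad = good + [(None, 40, "Houston", 50000, 5000.0)]  # violates NOT NULL on name

load_all_or_nothing(conn, good)
try:
    load_all_or_nothing(conn, bad)
except sqlite3.IntegrityError as exc:
    print("load failed, rolled back:", exc)

print(conn.execute("SELECT COUNT(*) FROM employees").fetchone()[0])  # 2
```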


The same pipeline as reusable functions, i.e. what an etl_pipeline_notebook.py script would contain, run here inside a single Jupyter cell.

In [9]:
import os
import pandas as pd
import mysql.connector


# 1️⃣ EXTRACT: Load CSV into Pandas DataFrame

def extract_data(file_path):
    df = pd.read_csv(file_path)
    return df


# 2️⃣ TRANSFORM: Example transformations

def transform_data(df):
    # Remove duplicate rows (copy so the column assignments below don't warn)
    df = df.drop_duplicates().copy()

    # Ensure salary is numeric; values that can't be parsed become NaN
    df['Salary'] = pd.to_numeric(df['Salary'], errors='coerce')

    # Remove rows with a missing name or salary (including the NaNs from above)
    df = df.dropna(subset=['Name', 'Salary']).copy()

    # Add a column for yearly bonus (10% of salary)
    df['yearly_bonus'] = df['Salary'] * 0.10

    return df

# 3️⃣ LOAD: Insert into MySQL

def load_data(df):
    conn = mysql.connector.connect(
        host='127.0.0.1',
        port=3306,
        user='root',
        password=os.environ['MYSQL_PASSWORD'],  # hypothetical env var; avoid hardcoding credentials
        database='employee'
    )

    cursor = conn.cursor()
    # Create table if not exists
    cursor.execute("""CREATE TABLE IF NOT EXISTS employees(
                name VARCHAR(100),
                age INT,
                city VARCHAR(100),
                salary DECIMAL(10,2),
                yearly_bonus DECIMAL(10,2)
    )""")

    # Insert data row-by-row; select the columns explicitly so their order matches the INSERT
    query = """INSERT INTO employees(name, age, city, salary, yearly_bonus) VALUES(%s, %s, %s, %s, %s)"""
    cols = ['Name', 'Age', 'City', 'Salary', 'yearly_bonus']

    for _, row in df[cols].iterrows():
        cursor.execute(query, tuple(row))

    conn.commit()
    conn.close()

# 4️⃣ RUN the ETL

def run_etl(file_path):
    print("📥 Extracting data...")
    df = extract_data(file_path)
    print(df.head())

    print("🔄 Transforming data...")
    df = transform_data(df)

    print("💾 Loading data into MySQL...")
    load_data(df)

    print("✅ ETL Pipeline complete.")

# Example usage in Jupyter
run_etl("data.csv")

📥 Extracting data...
      Name  Age       City  Salary
0    David   34  San Diego   79076
1  Charlie   35    Phoenix   76699
2    Quinn   38   New York   93478
3      Eve   32    Phoenix   92233
4     Paul   27    Houston   96396
🔄 Transforming data...
💾 Loading data into MySQL...
✅ ETL Pipeline complete.
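The connection settings are repeated in every function, and the best practices above recommend parameterizing them. Below is a sketch that builds the settings from environment variables. The variable names (`MYSQL_HOST`, `MYSQL_PASSWORD`, ...) are hypothetical, and accepting a mapping as an argument makes the function easy to test.

```python
import os

def get_db_config(env=None):
    """Build mysql.connector.connect() keyword arguments from environment variables.

    Only MYSQL_PASSWORD is required; everything else falls back to the
    values used in this notebook.
    """
    env = os.environ if env is None else env
    if "MYSQL_PASSWORD" not in env:
        raise RuntimeError("set MYSQL_PASSWORD before running the pipeline")
    return {
        "host": env.get("MYSQL_HOST", "127.0.0.1"),
        "port": int(env.get("MYSQL_PORT", "3306")),
        "user": env.get("MYSQL_USER", "root"),
        "password": env["MYSQL_PASSWORD"],
        "database": env.get("MYSQL_DATABASE", "employee"),
    }

# A plain dict stands in for os.environ so the example runs anywhere.
cfg = get_db_config({"MYSQL_PASSWORD": "example-only", "MYSQL_PORT": "3307"})
print(cfg["host"], cfg["port"])  # 127.0.0.1 3307

# In load_data() / export_reports():  conn = mysql.connector.connect(**get_db_config())
```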


In [70]:
df.columns.tolist()


['Name', 'Age', 'City', 'Salary', 'yearly_bonus']


# 4️⃣ REPORT: Run analytics and export

In [7]:
import os
import warnings

import mysql.connector
import pandas as pd

# pandas.read_sql() warns when it gets a DBAPI connection other than sqlite3
# (such as mysql-connector's); the query still runs, so the warning is silenced here.
warnings.filterwarnings('ignore', message='pandas only supports SQLAlchemy')


# 1️⃣ EXTRACT: Load CSV into Pandas DataFrame
def extract_data(file_path):
    df = pd.read_csv(file_path)
    return df

# 2️⃣ TRANSFORM: Example transformations
def transform_data(df):
    # Remove duplicate rows
    df = df.drop_duplicates()

    # Remove rows with a missing name or salary (copy so the assignment below doesn't warn)
    df = df.dropna(subset=['Name', 'Salary']).copy()

    # Example: Ensure salary is numeric
    df['Salary'] = pd.to_numeric(df['Salary'], errors='coerce')

    return df

# 3️⃣ LOAD: Insert into MySQL

def load_data(df):
    conn = mysql.connector.connect(
        host='127.0.0.1',
        port=3306,
        user='root',
        password=os.environ['MYSQL_PASSWORD'],  # hypothetical env var; avoid hardcoding credentials
        database='employee'
    )

    cursor = conn.cursor()
    # yearly_bonus is not computed in this version, so that column is left NULL
    insert_query = """
    INSERT INTO employees (Name, Age, City, Salary)
    VALUES (%s, %s, %s, %s)
    """

    for _, row in df.iterrows():
        cursor.execute(insert_query, (
            row['Name'],
            row['Age'],
            row['City'],
            row['Salary']
        ))

    conn.commit()
    conn.close()
    print("✅ Data loaded into MySQL.")


def export_reports():
    conn = mysql.connector.connect(
        host='127.0.0.1',
        port=3306,
        user='root',
        password=os.environ['MYSQL_PASSWORD'],  # hypothetical env var; avoid hardcoding credentials
        database='employee'
    )

    # Example: Average salary per city
    query = """SELECT city, AVG(Salary) AS average_salary FROM employees
               GROUP BY city
               ORDER BY average_salary DESC"""
    report_df = pd.read_sql(query, conn)  # read_sql needs the open connection as its second argument

    # Export to CSV
    report_df.to_csv("Avg_Salary_report.csv", index=False)
    # Export to Excel (needs the openpyxl package)
    report_df.to_excel("Avg_Salary_report.xlsx", index=False)

    conn.close()
    print("📊 Reports exported: Avg_Salary_report.csv & Avg_Salary_report.xlsx")

# 5️⃣ RUN EVERYTHING


def run_etl_with_reports(file_path):
    print("📥 Extracting data...")
    df = extract_data(file_path)
    print(df.head())

    print("🔄 Transforming data...")
    df = transform_data(df)

    print("💾 Loading data into MySQL...")
    load_data(df)

    print("📊 Generating reports...")
    export_reports()

    print("✅ ETL Pipeline complete.")

run_etl_with_reports("data.csv")


📥 Extracting data...
      Name  Age       City  Salary
0    David   34  San Diego   79076
1  Charlie   35    Phoenix   76699
2    Quinn   38   New York   93478
3      Eve   32    Phoenix   92233
4     Paul   27    Houston   96396
🔄 Transforming data...
💾 Loading data into MySQL...
✅ Data loaded into MySQL.
📊 Generating reports...
📊 Reports exported: Avg_Salary_report.csv & Avg_Salary_report.xlsx
✅ ETL Pipeline complete.


In [8]:
pd.read_excel("Avg_Salary_report.xlsx")


          city  average_salary
0      Chicago     118931.0000
1      Houston      90789.3333
2      Phoenix      84466.0000
3     New York      82931.0000
4  Los Angeles      80513.0000
5  San Antonio      77295.1667
6    San Diego      70433.6667
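The same report can be sketched with the standard library alone: aggregate in SQL, then write the rows with `csv`. Adding `COUNT(*)` shows how many rows sit behind each average, which makes repeated loads of the same people easy to spot. `ROUND()` keeps the export to cents. The rows are a few records from data.csv, and an in-memory buffer stands in for the real file.

```python
import csv
import io
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE employees (name TEXT, city TEXT, salary INTEGER)")
conn.executemany(
    "INSERT INTO employees VALUES (?, ?, ?)",
    [("Liam", "Chicago", 118931), ("Paul", "Houston", 96396),
     ("Paul", "Houston", 91438), ("Nate", "Houston", 84534)],
)

# Same aggregation as the report query, plus a row count and rounding to cents.
report = conn.execute(
    """SELECT city, ROUND(AVG(salary), 2) AS average_salary, COUNT(*) AS n
       FROM employees GROUP BY city ORDER BY average_salary DESC"""
).fetchall()

buf = io.StringIO()  # stands in for open("Avg_Salary_report.csv", "w", newline="")
writer = csv.writer(buf)
writer.writerow(["city", "average_salary", "n"])
writer.writerows(report)
print(buf.getvalue())
```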
