In [1]:
''' 
Cleaning Customer Names from CSV Data.

Scenario: You've received dataset from a vendor.
It contains inconsistent formatting(extra spaces, wrong casing). 
you need to standardize it before loading into Azure SQL Database.

'''

raw_customer = [" aLICE "," BoB ", "CHARLIE ", " david "]

# clean & format: strip spaces + capitalize
cleaned_customers = list(map(lambda x: x.strip().capitalize(), raw_customer))

print("Cleaned Customers:",cleaned_customers)

Cleaned Customers: ['Alice', 'Bob', 'Charlie', 'David']


In [None]:
### Every Data Engineer follow this before load into the data ware house

In [3]:
''' 
Use Case 2: Filtering Failed API Logs

Scenario: you collect daily logs from APIs and must extract only failed ones for debugging.
'''
api_logs = [
    {"endpoint":"/login","status":200},
    {"endpoint":"/fetch_data","status":500},
    {"endpoint":"/update_user","status":200},
    {"endpoint":"/logout","status":500}
]

# Filter failed logs (status code >= 400).

failed_logs = list(filter(lambda log: log['status'] >= 400, api_logs))
print("Failed API logic:", failed_logs)

Failed API logic: [{'endpoint': '/fetch_data', 'status': 500}, {'endpoint': '/logout', 'status': 500}]


In [None]:
## Real world concepts:
## Monitoring & filtering error events from system logs before alerting engineers(common in productions)


In [9]:
'''
usecase 3: Salary Conversion in ETL.
Scenarios:
HR Provides salary data in USD; your job is to convert it to INR before saving into Azure Blob Storage.

'''

employee_data = [
    {"name":"Alice","salary_usd":50000},
    {"name":"Bob","salary_usd":60000},
    {"name":"Surya","salary_usd":80000}
]

# Convert to INR using map
usd_to_INR = 83
converted = list(map(lambda e: {**e,"salary_inr": e["salary_usd"] * usd_to_INR}, employee_data))
print("Converted Salary Data:", converted)

Converted Salary Data: [{'name': 'Alice', 'salary_usd': 50000, 'salary_inr': 4150000}, {'name': 'Bob', 'salary_usd': 60000, 'salary_inr': 4980000}, {'name': 'Surya', 'salary_usd': 80000, 'salary_inr': 6640000}]


In [10]:
## Real-world Concept:
# Transformation step during ETL (Extract -> Transform -> Load)


In [11]:
'''
Usecase 4: Identify Active Users 
Scenario: You're cleaning user engagement data from a website. 
you must filter users who have more then 5 active sessions.
'''

user_sessions = [
    {"users": "Alice","sessions":3},
    {"users": "Bob","sessions":4},
    {"users": "Ram","sessions":6},
    {"users": "Raj","sessions":7}
]

# Filter active users
active_users = list(filter(lambda x: x['sessions'] > 5, user_sessions))
print("Active Users:", active_users)
    

Active Users: [{'users': 'Ram', 'sessions': 6}, {'users': 'Raj', 'sessions': 7}]


In [13]:
## Use Case 5:
## CLean IoT Sensor Data
'''
Scenario:
IoT sensors stream temperature readings, 
but some are invalid( None or Negatiive). 
you must filter valid readings and convert to Fahrenheit.
'''

sensor_data = [38.5,None, -5, 28.5, 32.0, None]

# Filter valid readings:
valid_readings = list(filter(lambda t: t is not None and t >= 0,sensor_data))

# Convert C to F.
fahrenheit = list(map(lambda t: round(( t * 9/5) + 32,2), valid_readings))

print(" Valid Readings ( Fahrenheit):",fahrenheit)

 Valid Readings ( Fahrenheit): [101.3, 83.3, 89.6]


## ETL Pipeline - Sales Data Cleaner
### Real-World Scenario
    

In [16]:
import csv

def extract_data(filepath):
    """ Extract: Read CSV into a list of dicts """
    with open(filepath, 'r') as file:
        reader = csv.DictReader(file)
        return list(reader)
        
def transform_data(records):
    """ Transform: Clean and process the data """
    # step 1 - Normalize customer names
    records = list(map(lambda r:{**r,
    "customer_name": r["customer_name"].strip().title(),
    "product":r["product"].strip().title(),
    "amount":r["amount"].strip()},records))

# step 2 - Filter invalid rows (missing /zero amount)
    records = list(filter(lambda r: r["amount"] and float(r["amount"]) > 0, records))

# step 3 - Add 10% commission column  
    for r in records:
        amount = float(r["amount"])
        r["commission"] = round(amount * 0.10, 2)

        return records

def load_data(records, output_path):
    """ Load: write cleaned data to a new CSV """
    with open(output_path, 'w', newline='') as file:
        fieldnames = ["customer_name","product","amount","commission"]
        writer = csv.DictWriter(file,fieldnames=filednames)
        writer.writeheader()
        writer.writerows(records)

def etl_pipeline(input_file, output_file):
    """ Run the full ETL pipelines """
    raw_data = extract_data(input_file)
    cleaned_data = transform_data(raw_data)
    load_data(cleaned_data, output_file)
    print(f" ETL complete:{len(cleaned_data)} valid rows written to {output_file}")
    
    etl_pipeline("book1.csv","cleaned_sales.csv")


In [17]:
## Simple ETL Example - Sales Data Cleaning:

sales_data = [
    {"name": "alice ","product":" laptop ","amount":"55000"},
    {"name": "Bob ","product":" electronic ","amount":"65000"},
    {"name": "alICE ","product":" Mobile ","amount":"55000"}
]

# step 1 - clean the data using map()
cleaned_data = list(map(lambda x: {
    "name":x["name"].strip().title(),
    "product":x["product"].strip().title(),
    "amount":x["amount"].strip().title()
}, sales_data))

# Step 2 - Remove invalid rows using filter()

valid_sales = list(filter(lambda x:x["amount"] and float(x["amount"]) > 0, cleaned_data))

# Step 3 - Add commission column (10%)
for sale in valid_sales:
    sale["commission"] = round(float(sale["amount"]) * 0.10 ,2)

# Step 4 - print the result
print(valid_sales)


[{'name': 'Alice', 'product': 'Laptop', 'amount': '55000', 'commission': 5500.0}, {'name': 'Bob', 'product': 'Electronic', 'amount': '65000', 'commission': 6500.0}, {'name': 'Alice', 'product': 'Mobile', 'amount': '55000', 'commission': 5500.0}]


### Microsoft style-Data Engineer Tasks

In [13]:
# Scenario:
## you're given a list of employee details from multiple systems.

# Given tasks 
# Clean the data - remove spaces and fix case (e.g,"Jay","Data").
# Filter out employees with missing salary.
# Add a new column: "annual_bonus" = 0.1 * salary

employees = [
    {"name":"jAy","dept":"Data","salary":"8000"},
    {"name":"RAVI","dept":"Data","salary":" "},
    {"name":"meena","dept":"Analytics","salary":"95000"}
]

# Clean the data using map():

cleaned_data = list(map(lambda x : {
            "name":x["name"].strip().capitalize(),
            "dept":x["dept"].strip().capitalize(),
            "salary":x["salary"].strip().capitalize()
},employees))
print("Cleaned Data set:",cleaned_data)

# step 2: Filter out employees with missing salary

print("Filter out employees with missing salary")
missing_salary = list(filter(lambda x : x["salary"] and float(x["salary"]) > 1, cleaned_data))
print(missing_salary)

# step 3: Add new column: " annual_bonus" = 0.1 * salary

print("Added New Column with Annual Bonus:")
for sale in missing_salary:
    sale["annual_bonus"] = round(float(sale["salary"]) * 0.1 )
print(missing_salary)
    

Cleaned Data set: [{'name': 'Jay', 'dept': 'Data', 'salary': '8000'}, {'name': 'Ravi', 'dept': 'Data', 'salary': ''}, {'name': 'Meena', 'dept': 'Analytics', 'salary': '95000'}]
Filter out employees with missing salary
[{'name': 'Jay', 'dept': 'Data', 'salary': '8000'}, {'name': 'Meena', 'dept': 'Analytics', 'salary': '95000'}]
Added New Column with Annual Bonus:
[{'name': 'Jay', 'dept': 'Data', 'salary': '8000', 'annual_bonus': 800}, {'name': 'Meena', 'dept': 'Analytics', 'salary': '95000', 'annual_bonus': 9500}]


In [24]:
# Task 2: Weather Data Transformation
'''
Scenario: A weather API gives temperatures in Celsius.
you need to convert them to fehrenheit and remove invalid records.

Tasks to complete:

-> Convert temp_c -> temp_f = (temp_c * 9/5) + 32.
-> Remove any records with missing temperature.

'''

weather = [
    {"city":"Seattle","temp_c":12},
    {"city":"Mumbai","temp_c":33},
    {"city":"Paris","temp_c":None}
]

# Remove the records with missing temperature.
valid_weather = list(filter(lambda x: x["temp_c"] is not None, weather))

# Convert Celsius -> Fahrenheit
cleaned_data = list(map(lambda x:
{
    "city": x["city"].title(),
    "temp_f":round((float(x["temp_c"]) * 9/5) + 32,1)}, valid_weather))
print("Cleaned_data:", cleaned_data)


Cleaned_data: [{'city': 'Seattle', 'temp_f': 53.6}, {'city': 'Mumbai', 'temp_f': 91.4}]


In [25]:
## Task 3: Data Transformation from Logs



In [30]:
## Task 4: Revenue Summary
'''
Scenario:
You get monthly sales from 3 regions

north = [1200, 1300, 1250]
south = [1100, 1050, 1150]
east = [900, 950, 1000]

 Your task:
1. Increase all numbers by 5% (price growth)
2. Calculate total revenue for each region.

'''

north = [1200, 1300, 1250]
south = [1100, 1050, 1150]
east = [900, 950, 1000]

# step 1: Increse by 5%
north_updated = list(map(lambda x: round(x * 1.05, 2), north))
south_updated = list(map(lambda x: round(x * 1.05, 2),south))
east_updated = list(map(lambda x: round(x * 1.05, 2),east))

# step 2: Calculate total per region
totals = {
    "north_total" : sum(north_updated),
    "south_updated" :sum(south_updated),
    "east_updated" :sum(east_updated)
}
print(totals)

{'north_total': 3937.5, 'south_updated': 3465.0, 'east_updated': 2992.5}


In [27]:
## Task 5: Identify High - value Customers.

'''
Scenario:
Your Company tracks customers and their purchases.
'''

customers = [
    {"name":"Alice","purchases":[200,300,400]},
    {"name":"Bob","purchases":[100,200,300]},
    {"name":"Charlie","purchases":[500,600,700]}
]

''' 
Your Task:
1. Calculate total purchase per customer.
2. Filter only customers who spent more then 800
'''

# step 1: Calculate total per customer
totals = list(map(lambda c: {"name":c["name"],"total":sum(c["purchases"])},customers))

# step 2: Filter high-value customers:
high_value = list(filter(lambda c: c["total"] > 800, totals))

print(high_value)


[{'name': 'Alice', 'total': 900}, {'name': 'Charlie', 'total': 1800}]


In [6]:
# Date 25/10/2025.
''''
Scenario 1:

Problems: 
You receive daily files from different source. Some columns need transformation,
and some rows need filtering.

Task 1:
-> Convert all email addresses to lowercase.
-> Keep only rows where age > 18.
-> Extract domains from email addresses.

'''

Data = [
    {"name":"Alice","email":"ALICE@gmail.com","age":56},
    {"name":"Bob","email":"BOB@gmail.com","age":17},
    {"name":"Charlie","email":"charlie@example.com","age":34}
]

# 1. Filter adults only 
adults = list(filter(lambda x : x['age'] > 18,Data))

# 2. Normalize all the emails addresses to lowercase.
email_addresses = list(map(lambda x : {**x, "email": x["email"].lower()},adults))

print(email_addresses)


[{'name': 'Alice', 'email': 'alice@gmail.com', 'age': 56}, {'name': 'Charlie', 'email': 'charlie@example.com', 'age': 34}]


#### normalized = list(map(lambda x: {**x, "email": x['email'].lower()}, adults)). what is **x means here.

Ah! This is an important Python concept often used in data engineering and automation. Let’s break it down carefully.

The line you mentioned:

```python
normalized = list(map(lambda x: {**x, "email": x['email'].lower()}, adults))
```

Here’s what each part means:

---

### **1. `x`**

* `x` represents **each element in the `adults` list**.
* In our example, `adults` is a list of dictionaries:

```python
adults = [
    {"name": "Alice", "email": "ALICE@Example.com", "age": 25},
    {"name": "Charlie", "email": "charlie@example.com", "age": 30}
]
```

* So, `x` will be:

```python
{"name": "Alice", "email": "ALICE@Example.com", "age": 25}  # first iteration
```

---

### **2. `{**x, "email": x['email'].lower()}`

This is **dictionary unpacking**.

* `**x` takes all key-value pairs from dictionary `x` and **copies them into a new dictionary**.
* Then, `"email": x['email'].lower()` **overwrites the `email` key** with the lowercase version.

So for Alice:

```python
x = {"name": "Alice", "email": "ALICE@Example.com", "age": 25}

# {**x, "email": x['email'].lower()} 
# becomes
{"name": "Alice", "email": "alice@example.com", "age": 25}
```

✅ Essentially, it **creates a new dictionary based on the old one but updates only the email field**.

---

### **3. Why we use `**x` instead of modifying `x` directly**

* `x` comes from the `adults` list. Modifying it directly could **change the original data** (mutability issues).
* Using `{**x, ...}` ensures **a new dictionary is created**, leaving the original list unchanged.
* This is a functional programming style, common in MAANG interviews.

---

If you want, I can draw a **small visual flow diagram** showing how `map()` + `lambda` + `**x` transforms the dictionary step by step—it’s super helpful for interviews.

Do you want me to do that?


In [8]:
# Extract email address
domains = list(map(lambda x : x['email'].split('@')[1], email_addresses))
print(domains)

['gmail.com', 'example.com']


In [9]:
''' 
Scenario 2: Log File Automation:
Problem:
you have server logs in a list of strings. you want to"
--> Keep only error logs.
--> Extract the error codes automatically.

'''

log = [
    "INFO: Server started",
    "ERROR: Code 500 - Internal Server Error",
    "WARNING: Low memory",
    "ERROR: Code 404 - Not Found"
]

#  Filter error logs
error_logs = list(filter(lambda x : "ERROR" in x, log))

# Extract error code
error_codes = list(map(lambda x: x.split()[4], error_logs))

print(error_logs)
print(error_codes)

['ERROR: Code 500 - Internal Server Error', 'ERROR: Code 404 - Not Found']
['Internal', 'Not']


In [10]:
'''
Scenario 3: Automated Notification for High-value Transactions

Problem: 
you have a list of transactions. 
you want to automatically identify high-value transactions(>$10,000) to trigger alerts.

'''

transactions = [
    {"id":1,"amount":5000},
    {"id":2,"amount":15000},
    {"id":3,"amount":25000}
]

# Filter high-value transactions.
high_value = list(filter(lambda x : x['amount'] > 10000,transactions))

# Extract transactions IDs for notifications
notify_ids = list(map(lambda x: x['id'], high_value))

print(high_value)
print(notify_ids)



[{'id': 2, 'amount': 15000}, {'id': 3, 'amount': 25000}]
[2, 3]


In [11]:
'''
Scenario 4: Batch Data Transformation

Problem:
You need to normalize numerical data for machine learning or analytics.

'''
raw_numbers = [10,20,30,40,50]

# Scale numbers between 0 and 1
min_val,max_val = min(raw_numbers),max(raw_numbers)
scaled = list(map(lambda x : (x - min_val) / (max_val - min_val), raw_numbers))

print(scaled)

[0.0, 0.25, 0.5, 0.75, 1.0]


Formula for Min-Max Scaling

scaled value = 𝑥 − min / max − min.

This formula converts all numbers into a range between 0 and 1.

In [12]:
'''
Automated Financial Report Cleanup:
Use Case: Clean up large transaction logs before storing in a database.

'''
transactions = [ 
    {"amount":5000,"currency":"usd"},
    {"amount":0,"currency":"usd"},
    {"amount":10000,"currency":"usd"}
]

valid = list(filter(lambda x : x["amount"] > 1, transactions))
totals = list(map(lambda x : x["amount"], transactions))
print(totals)

[5000, 0, 10000]


In [1]:
'''
Scenario 1: Data Cleaning Automation

Use Case : A Data Engineer is processing customer data where some phone numbers or emails are invalid. 
you need to automatically clean and standardize them.

'''

# input data:
Customers = [
    {"name":"Alice","email":"alice123@gmail.com"},
    {"name":"Surya","email":"Surya123@gmail.com"},
    {"name":"Ram","email":"ram@gmail.com"},
    {"name":"Rajan","email":"rajan@outlook.com"}
]

# clean valid emails only using filter and lambda
valid_customers = list(filter(lambda x: '@' in x['email'] and '.' in x['email'],Customers))

# Extract clean names and emails using map
cleaned_data = list(map(lambda x: (x['name'].title(), x['email'].lower()),valid_customers))

print(cleaned_data)

[('Alice', 'alice123@gmail.com'), ('Surya', 'surya123@gmail.com'), ('Ram', 'ram@gmail.com'), ('Rajan', 'rajan@outlook.com')]


In [3]:
''' 
Scenario 3: Log File Processing

Use Case: you are automating the processing of server logs to filter out only the error messages.

'''

logs = [
    "INFO: Service started",
    "WARNING: Low disk space",
    "ERROR: Database connection failed",
    "INFO: Request processed",
    "ERROR: API timeout"
]

# Filter only errors
error_logs = list(filter(lambda log:"ERROR" in log, logs))

# Extract only the error message part using map
error_message = list(map(lambda log: log.split(":")[1].strip(), error_logs))

print(error_message)

['Database connection failed', 'API timeout']


In [6]:
'''
Scenario 4: IoT or Sensor Data Filtering
Use Case:
A pipeline receives temperature data from IoT sensors. 
you must filter faulty reading(e.g., < 0 or >100 C).

'''
sensor_readings = [25,30,-5,105,45,60]

valid_readings = list(filter(lambda x : 0 <= x >= 100, sensor_readings))

# Convert to Fahrenheit
to_fahrenheit = list(map(lambda x : (x * 9/5 ) + 32, valid_readings))

print(to_fahrenheit)

[221.0]


In [7]:
'''
Scenario 5: File Automation( AI Data Preprocessing)
Before sending text data to an AI Model, 
automate preprocessing like trimming whitespaces and converting to lowercase.

'''

sentences = [" Hello World ", " DATA Engineer ", " Microsoft AI "]

# Clean sentences
cleaned = list(map(lambda s : s.strip().lower(), sentences))

# Filter out empty strings
filtered = list(filter(lambda s : len(s) > 0, cleaned))

print(filtered)
    


['hello world', 'data engineer', 'microsoft ai']


In [12]:
'''
Text Preprocessing API (AI/ML Use Case)
Scenario:
Before feeding text data into an ML model, you must clean it via an API.

Goal:
-> Removes extra spaces
-> Converts to lowercase
-> Filters empty strings

'''

from fastapi import FastAPI
@app.post("/clean_text")
def clean_text(data:List[str]):
    cleaned = list(map(lambda s: s.strip().lower(), data))
    filtered = list(filter(lambda s: len(s) > 0, cleaned))
    return {"cleaned_texts:" filtered}
    

SyntaxError: invalid syntax. Perhaps you forgot a comma? (3232569716.py, line 17)