In [1]:
# Sample dataset
covid_data = [
    {"country": "USA", "cases": 250000, "deaths": 5000},
    {"country": "India", "cases": 180000, "deaths": 3500},
    {"country": "Brazil", "cases": None, "deaths": 2800}
]

# 1. Clean missing data
def clean_missing(data, default=0):
    return[
        {k: v if v is not None else default
         for k, v in entry.items()}
        for entry in data
    ]

# 2. Calculate mortality rate
def mortality_rate(entry):
    return (entry["deaths"] / entry["cases"]) * 100 if entry["cases"] > 0 else 0

# 3. Apply analysis
cleaned = clean_missing(covid_data)
rates = list(map(mortality_rate, cleaned))

print(cleaned)
print(rates)

[{'country': 'USA', 'cases': 250000, 'deaths': 5000}, {'country': 'India', 'cases': 180000, 'deaths': 3500}, {'country': 'Brazil', 'cases': 0, 'deaths': 2800}]
[2.0, 1.9444444444444444, 0]


In [2]:
# PRACTICE

In [None]:
# WEATHER DATA
weather_data = [
    {"city": "New York", "date": "2023-05-01", "temp_f": 68, "condition": "Rain"},
    {"city": "New York", "date": "2023-05-02", "temp_f": 72, "condition": "Cloudy"},
    {"city": "New York", "date": "2023-05-03", "temp_f": 85, "condition": "Sunny"},
    {"city": "Chicago", "date": "2023-05-01", "temp_f": 62, "condition": "Cloudy"},
    {"city": "Chicago", "date": "2023-05-02", "temp_f": None, "condition": "Rain"},  # Missing data
    {"city": "Chicago", "date": "2023-05-03", "temp_f": 78, "condition": "Sunny"}
]

def f_to_c(temp_f):
    """
    Convert Fahrenheit to Celsius.
    Args:
        temp_f (float/int): Temperature in Farenheit
    Returns:
        float: Temperature in Celcius, or None if input is None
    """
    return (temp_f - 32) * 5/9 if temp_f is not None else None

# Test cases
print(f_to_c(68))  # Output: 20.0
print(f_to_c(None)) # Output: None

def clean_missing_temps(data, default=72): # Clean missing data
    """
    Replace missing temperature values with a default.
    Args:
        data (list): List of weather day dictionaries
        default (int/float): Fallback temperature value
    Return:
        list: Cleaned data with no None temps
    """
    return [
        {**day, "temp_f": day["temp_f"] if day["temp_f"] is not None else default}
        for day in data
    ]
# Test
cleaned_data = clean_missing_temps(weather_data)
print(cleaned_data[4]["temp_f"]) # Output: 72 (replaced None)
#print(cleaned_data)

def get_extremes(data, key, condition=lambda x: True):
    """
    Find min/max values in data matching a condition.
    Args:
        data (list): List of dictionaries
        key (str): Dictionary key to analyze
        condition (function): Filter function
    Return:
        tuple: (main_value, max_value)
    """
    values = [day[key] for day in data if condition(day) and day[key] is not None]
    return (min(values), max(values)) if values else (None, None)
# Test - Find hottest/coldest sunny days
sunny_extremes = get_extremes(weather_data, "temp_f", condition=lambda day: day["condition"] == "Sunny")
print(sunny_extremes) #Output: (78, 85)

def safe_analyze(func, data, fallback=(None, None)):
    """
    Safely run analysis with error handling.
    Args:
        func (function): Analysis function
        data (list): Input data
        fallback: Value to return on failure
    Returns:
        func result or fallback
    """
    try:
        return func(data)
    except (ValueError, TypeError, KeyError) as e:
        print(f"‚ö†Ô∏è Analysis failed: {type(e).__name__} - {str(e)}")
        return fallback

# Test with bad data
bad_data = [{"temp_f": "invalid_string"}]
result = safe_analyze(
    lambda d: get_extremes(d, "temp_f"),
    bad_data
)
print(result) # Output: ‚ö†Ô∏è Analysis failed: TypeError....-> (None, None)

def weather_alert(data, city, temp_threshold=80, condition="Rain"):
    """
    Generate alerts for extreme weather conditions.
    Args:
        data (list): Weather data
        city (str): Target city
        temp_threshold (int): Heat alert threshold
        condition (str): Dangerous weather condition
    Returns:
        str: Alert message
    """
    alerts = [
        f"{day['date']} ({day['condition']}, {day['temp_f']}¬∞F)"
        for day in data
        if day["city"] == city
        and (
            (day["temp_f"] is not None and day["temp_f"] > temp_threshold)
            or day["condition"] == condition
        )
    ]
    return (
        f"üö® {city} alerts: " + " | ".join(alerts) 
        if alerts 
        else f"‚úÖ No alerts for {city}"
    )

# Test
print(weather_alert(weather_data, "New York"))
# Output: "üö® New York alerts: 2023-05-03 (Sunny, 85¬∞F) | 2023-05-01 (Rain, 68¬∞F)"

20.0
None
72
[85, 78]
None
['invalid_string']
None
üö® New York alerts: 2023-05-01 (Rain, 68¬∞F) | 2023-05-03 (Sunny, 85¬∞F)


In [21]:
# Challenge 1: Movie Ratings

def summarize_movies(titles, ratings=(0,), favorite=None):
    return {
        title: f"{i}. {title.title()} rated {rating}" + (" ‚≠êÔ∏è" if favorite and title.lower() == favorite.lower() else "")
        for i, (title, rating) in enumerate(zip(titles, ratings), start=1)
    }

# Challenge 2: Animal Ages
def describe_pets(names, ages=(0,), favorite=None):
    return {
        name: f"{i}. {name.title()} is {age} years old" +
            (" ‚ù§Ô∏è" if favorite and name.lower() == favorite.lower() else "")
        for i, (name, age) in enumerate(zip(names, ages), start=1)
    }
    
# Challenge 3: Book Ratings
def book_reviews(books, ratings=(0,), highlight=4.0):
    return {
        book: f"{i}. {book.title()} rated {rating}" +
        (" ‚≠êÔ∏è" if rating >= highlight else "")
        for i, (book, rating) in enumerate(zip(books, ratings), start=1)
    }

In [22]:
def summarize_scores(names, scores=(0,), highlight=90):
    top = max(scores)
    return {
        name: f"{i}. {name.title()} scored {score}" +
        (" ‚≠êÔ∏è" if score >= highlight else "") +
        (" üèÜ" if score == top else "")
        for i, (name, score) in enumerate(zip(names, scores),  start=1)
    }
    
students = ["alice", "bob", "carol"]
scores = [85, 92, 90]

result = summarize_scores(students, scores)
print(result)


{'alice': '1. Alice scored 85', 'bob': '2. Bob scored 92 ‚≠êÔ∏è üèÜ', 'carol': '3. Carol scored 90 ‚≠êÔ∏è'}


In [49]:
def summarize_multi_scores(names, subjects, scores_matrix, highlight=90):
    averages = [sum(scores)/ len(scores) for scores in scores_matrix]
    top_avg = max(averages)
    
    result = {}
    for i, (name, scores, avg) in enumerate(zip(names, scores_matrix, averages), start=1):
        subject_scores = [
            f"{subject.title()}: {score}{" ‚≠êÔ∏è" if score >= highlight else ""}"
            for subject, score in zip(subjects,scores)
        ]
        description = f"{i}. {name.title()} - " + ", ".join(subject_scores)
        (" üèÜ" if avg == top_avg else "")
        result[name] = description
    return result

students = ["alice", "bob", "carol"]
subjects = ["math", "science", "history"]
scores = [
  [95, 87, 90],   # alice
  [88, 92, 85],   # bob
  [100, 98, 99]   # carol
]

result = summarize_multi_scores(students, subjects, scores)
print(result)

{'alice': '1. Alice - Math: 95 ‚≠êÔ∏è, Science: 87, History: 90 ‚≠êÔ∏è', 'bob': '2. Bob - Math: 88, Science: 92 ‚≠êÔ∏è, History: 85', 'carol': '3. Carol - Math: 100 ‚≠êÔ∏è, Science: 98 ‚≠êÔ∏è, History: 99 ‚≠êÔ∏è'}


In [None]:
def summarize_multi_scores(names, subjects, scores_matrix, highlight=90):
    # First, calculate all student averages
    averages = [sum(scores) / len(scores) for scores in scores_matrix]
    top_avg = max(averages) 
    # Now build the dictionary
    result = {
        name: f"{i}. {name.title()} - " + ", ".join([
            f"{subject.title()}: {score}{'‚≠êÔ∏è' if score >= highlight else ''}"
            for subject, score in zip(subjects, scores)
        ])
        
        for i, (name, scores, avg) in enumerate(zip(names, scores_matrix, averages), start=1)
        
    }
    return result

students = ["alice", "bob", "carol"]
subjects = ["math", "science", "history"]
scores = [
  [95, 87, 90],   # alice
  [88, 92, 85],   # bob
  [100, 98, 99]   # carol
]

result = summarize_multi_scores(students, subjects, scores)
print(result)


{'alice': '1. Alice - Math: 95‚≠êÔ∏è, Science: 87, History: 90‚≠êÔ∏è', 'bob': '2. Bob - Math: 88, Science: 92‚≠êÔ∏è, History: 85', 'carol': '3. Carol - Math: 100‚≠êÔ∏è, Science: 98‚≠êÔ∏è, History: 99‚≠êÔ∏è üèÜ'}


In [77]:
# Student Progress Report
def report_card(names, subjects, scores_matrix, highlight=90, topper=None):
     return {
         name: f"{i}. {name.title()} -" + ", ".join([
             f"{subject.title()}: {score}{" ‚≠êÔ∏è" if  score >= highlight else ""}"
             for subject, score in zip(subjects, scores)
             ]) + f" (avg: {round(sum(scores)/len(scores), 2)})" +
         (" üèÜ" if topper and name.lower() == topper.lower() else "")
         for i, (name, scores) in enumerate(zip(names, scores_matrix), start=1)
     }

names = ["alice", "bob", "carol"]
subjects = ["math", "science", "history"]
scores = [
    [95, 87, 90],
    [88, 92, 85],
    [100, 98, 99]
]

output = report_card(names, subjects, scores, topper="carol")
print(output)

{'alice': '1. Alice -Math: 95 ‚≠êÔ∏è, Science: 87, History: 90 ‚≠êÔ∏è (avg: 90.67)', 'bob': '2. Bob -Math: 88, Science: 92 ‚≠êÔ∏è, History: 85 (avg: 88.33)', 'carol': '3. Carol -Math: 100 ‚≠êÔ∏è, Science: 98 ‚≠êÔ∏è, History: 99 ‚≠êÔ∏è (avg: 99.0) üèÜ'}


In [101]:
# Language Learning Tracker
def vocab_leaderboard(learners, languages, word_counts, goal=1000):
    # sort and zip data by word_counts descending
    sorted_data = sorted(zip(learners, languages, word_counts), key=lambda x: x[2], reverse=True)
    
    # get top scorer
    top_score = sorted_data[0][2]
    
    # create ranked leaderboard
    return {
        learner: f"{rank}. {learner.title()} is learning {language.title()} - {word_count} words learned ({round(word_count/goal * 100, 2)}%)" +
        (" üéâ" if word_count >= goal else " üå±") +
        (" üèÜ" if word_count == top_score else "")
        for rank, (learner, language, word_count) in enumerate(sorted_data, start=1)
    }
    
# data
learners = ["alice", "bob", "carol"]
languages = ["spanish", "japanese", "french"]
word_counts = [850, 920, 1005]

#result
result = vocab_leaderboard(learners, languages, word_counts)
print(result)

{'carol': '1. Carol is learning French - 1005 words learned (100.5%) üéâ üèÜ', 'bob': '2. Bob is learning Japanese - 920 words learned (92.0%) üå±', 'alice': '3. Alice is learning Spanish - 850 words learned (85.0%) üå±'}


In [109]:
def sales_summary(sellers, regions, monthly_sales, target=13000):
    # sorted data by monthly_sales descending
    sorted_data = sorted(zip(sellers, regions, monthly_sales), key=lambda x: x[2], reverse=True)
    
    # top seller
    top_seller = sorted_data[0][2]
    
    return {
        seller: f"{i}. {seller.title()} from {region} - Total: ${sales} ({round(sales/ target * 100, 2)}%)" +
        (" ‚úÖ" if sales >= target else " ‚ö†Ô∏è") +
        (" üèÜ" if sales == top_seller else "")
        for i, (seller, region, sales) in enumerate(sorted_data, start=1)
    }

sellers = ["alice", "bob", "carol"]
regions = ["north", "south", "west"]
monthly_sales = [9800, 12000, 15000]

output = sales_summary(sellers, regions, monthly_sales)
print(output)


{'carol': '1. Carol from west - Total: $15000 (115.38%) ‚úÖ üèÜ', 'bob': '2. Bob from south - Total: $12000 (92.31%) ‚ö†Ô∏è', 'alice': '3. Alice from north - Total: $9800 (75.38%) ‚ö†Ô∏è'}


In [None]:
# SALES REPORT
def advanced_sales_report(sellers, regions, monthly_sales, target=13000):
    # sorted data by monthly_sales descending
    sorted_data = sorted(zip(sellers, regions, monthly_sales), key=lambda x: x[2], reverse=True)
    
    # top seller
    top_seller = sorted_data[0][2]
    
    # averages sales
    averages = round(sum(monthly_sales)/ len(monthly_sales), 2)
    
    # Sales report
    return {
        seller: f" {rank}. {seller.title()} from {region.title()} - ${sales} ({round(sales/target * 100, 2)}%
        )" +
        (" ‚úÖ" if sales >= target else "") +
        (" üí°" if sales > averages else "") +
        (" üèÜ" if sales == top_seller else "")
        for rank, (seller, region, sales) in enumerate(sorted_data, start=1)
    }

# data
sellers = ["alice", "bob", "carol"]
regions = ["north", "south", "west"]
monthly_sales = [9800, 12000, 15000]

# output
output = advanced_sales_report(sellers, regions, monthly_sales)
print(output)

{'carol': ' 1. Carol from West - $15000 (115.38%) ‚úÖ üí° üèÜ', 'bob': ' 2. Bob from South - $12000 (92.31%)', 'alice': ' 3. Alice from North - $9800 (75.38%)'}


In [180]:
covid_data = [
    {"country": "USA", "cases": 250000, "deaths": 5000, "recovered": None},
    {"country": "India", "cases": 180000, "deaths": 3500, "recovered": 120000},
    {"country": "Brazil", "cases": None, "deaths": 2800, "recovered": 80000}
]

# Cleaning Values
def clean_missing(data, default=0):
    cleaned = []
    for country in covid_data:
        new_country = {}
        for key, value in country.items():
            new_country[key] = value if value is not None else default
        cleaned.append(new_country)
    return cleaned
        
cleaned_data = clean_missing(covid_data)
print(cleaned_data)

[{'country': 'USA', 'cases': 250000, 'deaths': 5000, 'recovered': 0}, {'country': 'India', 'cases': 180000, 'deaths': 3500, 'recovered': 120000}, {'country': 'Brazil', 'cases': 0, 'deaths': 2800, 'recovered': 80000}]


In [None]:
# COVID19 DATA
covid_data = [
    {"country": "USA", "cases": 250000, "deaths": 5000, "recovered": None},
    {"country": "India", "cases": 180000, "deaths": 3500, "recovered": 120000},
    {"country": "Brazil", "cases": None, "deaths": 2800, "recovered": 80000}
]
# clean data
def clean_missing(data, default=0, allow_negative=False):
    """
    cleaning None value to 0.
    Args:
        data (list): List contain a dictionaries of covid_data
        default: Value to use for missing data
    Return:
        list: Cleaned data
    """
    return [
        {
            key: default if (
                value is None or 
                (not allow_negative and isinstance(value, (int, float))) and value < 0
            )
            else value for key, value in country.items()
        }
        for country in data
    ]
# Test with negative data
test_data = [
    {"country": "Test", "cases": -100, "deaths": 50, "recovered": None}
]
print(clean_missing(test_data))
# Output: [{'country': 'Test', 'cases': 0, 'deaths': 50, 'recovered': 0}]
    
# Mortality rate
def calculate_mortality(country_data):
    try:
        rate = (country_data["deaths"] / country_data["cases"] * 100)
        return round(rate, 2) if rate >= 0 else None
    except (TypeError, ZeroDivisionError):
        return None

# Filtering by mortality rate
def filter_mortality(data, threshold=1.5):
    filtered = []
    for country in data:
        if calculate_mortality(country) is not None
        and calculate_mortality(country) > threshold:
            filtered.append(country)
    return filtered

# Pipeline
def analyze_covid(data):
    cleaned = clean_missing(data)
    result = []
    for country in cleaned:
        result.append(
            {"country": country["country"], "mortality": calculate_mortality(country)}
        )
    return result

# Testing cleaned data
cleaned_data = clean_missing(covid_data)
print(cleaned_data)

# Testing mortality
print(calculate_mortality(cleaned_data[0]))  # 2.0 (5000/250000*100)
print(calculate_mortality({"cases": 0, "deaths": 100}))  # None

# Testing analyze_covid
print(analyze_covid(covid_data))

[{'country': 'Test', 'cases': 0, 'deaths': 50, 'recovered': 0}]
[{'country': 'USA', 'cases': 250000, 'deaths': 5000, 'recovered': 0}, {'country': 'India', 'cases': 180000, 'deaths': 3500, 'recovered': 120000}, {'country': 'Brazil', 'cases': 0, 'deaths': 2800, 'recovered': 80000}]
2.0
None
[{'country': 'USA', 'mortality': 2.0}, {'country': 'India', 'mortality': 1.94}, {'country': 'Brazil', 'mortality': None}]


In [226]:
# RETAIL SALES
sales_data = [
    {"product": "Laptop", "price": 999.99, "units_sold": 120, "return_rate": 0.02},
    {"product": "Mouse", "price": 24.99, "units_sold": 350, "return_rate": None},
    {"product": "Keyboard", "price": 49.99, "units_sold": 200, "return_rate": 0.05},
    {"product": "Monitor", "price": 199.99, "units_sold": None, "return_rate": 0.01}
]

# Cleaning data
def clean_sales_data(data, default_units=0, default_return=0):
    """
    Replace None values in sales data with defaults.
    Args:
        data: List of sales dictionaries
        default_units: Value for missing units_sold
        default_return: value for missing return_rate
    Return:
        list: Cleaned data
    """
    cleaned = []
    for new_data in data:
        sales = {}
        for key, value in new_data.items():
            if key == "units_sold" and value is None:
                sales[key] = default_units
            elif key == "return_rate" and value is None:
                sales[key] = default_return
            else:
                sales[key] = value
        cleaned.append(sales)
    return cleaned

# Calculate Revenue
def calculate_revenue(product):
    """
    Calculate net revenue (adjusted for returns).
    Formula: (Price * units_sold) * (1 - return_rate)
    Args:
        product: Single product dictionary 
    Return:
        float: Net revenue or None if invalid data
    """
    try:
        if product["units_sold"] < 0 or product["return_rate"] < 0:
            raise ValueError("Negative values are not allowed")
        revenue = (product["price"] * product["units_sold"]) * (1 - product["return_rate"])
        return round(revenue, 2)
    except (TypeError):
        return None

# Find Top performer
def get_top_products(data, metric="revenue", n=3):
    """
    Get top N products by specified metric.
    Args:
        data: List of cleaned product dictionaries
        metric: "revenue" or "units_sold"
        n: Number of top products to return
    Return:
        list: Sorted product names
    """
    valid_data = [p for p in data if p.get(metric) is not None]
    if not valid_data:
        print(f"Warning: No valid data for metric '{metric}'")
        return []
    return [p["product"] for p in sorted(valid_data, key=lambda x: x[metric], reverse=True) [:n]]
    
cleaned_data = clean_sales_data(sales_data)
revenues = [calculate_revenue(cleaned_data)]
top_sellers = get_top_products(cleaned_data, "units_sold", 2)

print("Cleaned Data:", cleaned_data)
print("Revenues:", revenues)
print("Top Sellers:", top_sellers)

Cleaned Data: [{'product': 'Laptop', 'price': 999.99, 'units_sold': 120, 'return_rate': 0.02}, {'product': 'Mouse', 'price': 24.99, 'units_sold': 350, 'return_rate': 0}, {'product': 'Keyboard', 'price': 49.99, 'units_sold': 200, 'return_rate': 0.05}, {'product': 'Monitor', 'price': 199.99, 'units_sold': 0, 'return_rate': 0.01}]
Revenues: [None]
Top Sellers: ['Mouse', 'Keyboard']


In [None]:
# INVENTORY ANALYSIS
inventory = [
    {"id": 101, "name": "Wireless Earbuds", "category": "Electronics", "stock": 45, "cost": 25.99, "price": 59.99},
    {"id": 102, "name": "Yoga Mat", "category": "Fitness", "stock": 120, "cost": 12.50, "price": 29.99},
    {"id": 103, "name": "Blender", "category": "Home", "stock": 0, "cost": 40.00, "price": 89.99},
    {"id": 104, "name": "Notebook", "category": "Office", "stock": 75, "cost": None, "price": 4.99},
    {"id": 105, "name": "Resistance Bands", "category": "Fitness", "stock": 60, "cost": 8.20, "price": 24.99}
]

# Cleaning data
def clean_inventory(data, default_cost=0.0):
    """
    Replace None cost with default, add margins, and flag out-of-stock items.
    Args:
        data:  List of inventory dictionaries
        default_cost (float): default parameter 0.0
    Return:
    """
    cleaned = []
    for new_inventory in data:
        new_data = {}
        for key, value in new_inventory.items():
            if key == "cost" and value is None:
                new_data[key] = default_cost
            else:
                new_data[key] = value
        new_data["margin"] = (round(new_inventory["price"] - new_inventory["cost"], 1) if new_inventory["cost"] is not None else None)
        new_data["need_restock"] = True if key == "cost" and value is None else False
        cleaned.append(new_data)
    return cleaned

# Financial Analysis


# Smart Restocker
def generate_restock_list(data, threshold=10):
    """
    Identify items needing restock (stock <= threshold).
    AND with margin > 20%.
    Returns:
        list: Sorted by most needed (lowest stock first)
    """
    return sorted(
        [item for item in data if item["stock"] <= threshold and (item.get("margin") or 0) > 20], 
        key=lambda x: x["stock"]
    )

def sell_product(inventory, product_id, quantity):
    updated_inventory = []
    sale_success = False
    
    for product in inventory:
        # make a copy to avoid modifying original data
        product_copy = product.copy()
        
        #find matching product
        if product["id"] == product_id:
            # validate quantity
            if quantity <= 0:
                print(f"üõë invalid quantity ({quantity} for  {product['name']})")
                updated_inventory.append(product_copy)
                return, False, inventory # Early return on invalid input
            
            # checking stock and apply discounts
            if product["stock"] >= quantity and quantity > 0:
                # Bulk discount
                if quantity > 50:
                    product_copy["price"] *= 0.9 # 10% discount
                    print(f"üéâ Applied 10% bulk discount to {product['name']}")
                    
                # process transaction
                product_copy["stock"] -= quantity
                sale_success = True
                print(f"‚úÖ Sold {quantity} units of {product['name']}")
                product_copy["last_sale"] = datetime.now().isoformat() # add timestamp
                
                # restock alert
                if product_copy["stock"] < 10:
                    product_copy["need_restock"] = True
                    print(f"‚ö†Ô∏è Low stock alert for {product['name']}")
                else:
                    print(f"üõë Not enough stock for {quantity} units of {product["name"]}")
        updated_inventory.append(product_copy)
    return sale_success, updated_inventory

cleaned_data = clean_inventory(inventory)
#metrics = analyze_profitability(cleaned_data)
restock_list = generate_restock_list(cleaned_data)

print("Cleaned Data:", cleaned_data)  # First item sample
#print("Metrics:", metrics)
print("Restock List:", [item["name"] for item in restock_list])

Cleaned Data: [{'id': 101, 'name': 'Wireless Earbuds', 'category': 'Electronics', 'stock': 45, 'cost': 25.99, 'price': 59.99, 'margin': 34.0, 'neeed_restock': False}, {'id': 102, 'name': 'Yoga Mat', 'category': 'Fitness', 'stock': 120, 'cost': 12.5, 'price': 29.99, 'margin': 17.5, 'neeed_restock': False}, {'id': 103, 'name': 'Blender', 'category': 'Home', 'stock': 0, 'cost': 40.0, 'price': 89.99, 'margin': 50.0, 'neeed_restock': False}, {'id': 104, 'name': 'Notebook', 'category': 'Office', 'stock': 75, 'cost': 0.0, 'price': 4.99, 'margin': None, 'neeed_restock': False}, {'id': 105, 'name': 'Resistance Bands', 'category': 'Fitness', 'stock': 60, 'cost': 8.2, 'price': 24.99, 'margin': 16.8, 'neeed_restock': False}]
Restock List: ['Blender']


In [266]:
# SALES DATA
sales_data = [
    {
        "month": "Jan-2023",
        "revenue": 150000,
        "expenses": 95000,
        "campaigns": ["New Year Sale", "Winter Clearance"],
        "top_products": [
            {"product": "Winter Jacket", "units": 420, "return_rate": 0.08},
            {"product": "Heating Blanket", "units": 380, "return_rate": 0.12}
        ]
    },
    {
        "month": "Feb-2023",
        "revenue": 185000,
        "expenses": 110000,
        "campaigns": ["Valentine's Special"],
        "top_products": [
            {"product": "Chocolate Box", "units": 510, "return_rate": 0.03},
            {"product": "Teddy Bear", "units": 290, "return_rate": 0.05}
        ]
    },
    {
        "month": "Mar-2023",
        "revenue": None,
        "expenses": 105000,
        "campaigns": [],
        "top_products": [
            {"product": "Yoga Mat", "units": 320, "return_rate": 0.04},
            {"product": "Dumbbells", "units": 180, "return_rate": 0.15}
        ]
    }
]

for campaigns in sales_data:
    for top_product in campaigns["top_products"]:
        print(top_product)

{'product': 'Winter Jacket', 'units': 420, 'return_rate': 0.08}
{'product': 'Heating Blanket', 'units': 380, 'return_rate': 0.12}
{'product': 'Chocolate Box', 'units': 510, 'return_rate': 0.03}
{'product': 'Teddy Bear', 'units': 290, 'return_rate': 0.05}
{'product': 'Yoga Mat', 'units': 320, 'return_rate': 0.04}
{'product': 'Dumbbells', 'units': 180, 'return_rate': 0.15}


In [331]:
# SALES DATA
sales_data = [
    {
        "month": "Jan-2023",
        "revenue": 150000,
        "expenses": 95000,
        "campaigns": ["New Year Sale", "Winter Clearance"],
        "top_products": [
            {"product": "Winter Jacket", "units": 420, "return_rate": 0.08},
            {"product": "Heating Blanket", "units": 380, "return_rate": 0.12}
        ]
    },
    {
        "month": "Feb-2023",
        "revenue": 185000,
        "expenses": 110000,
        "campaigns": ["Valentine's Special"],
        "top_products": [
            {"product": "Chocolate Box", "units": 510, "return_rate": 0.03},
            {"product": "Teddy Bear", "units": 290, "return_rate": 0.05}
        ]
    },
    {
        "month": "Mar-2023",
        "revenue": None,
        "expenses": 105000,
        "campaigns": [],
        "top_products": [
            {"product": "Yoga Mat", "units": 320, "return_rate": 0.04},
            {"product": "Dumbbells", "units": 180, "return_rate": 0.15}
        ]
    }
]

# Clean data
def clean_sales_data(data, default_revenue=0):
    """
    Clean missing data and calculate drived metrics:
    - Add "profit" (revenue - expenses)
    - Flag months with no campaigns
    - Replace missing revenue with default
    Returns cleaned list with new keys
    """
    cleaned = []
    for month in data:
        cleaned_month = month.copy()
        cleaned_month["revenue"] = month["revenue"] if month["revenue"] is not None else default_revenue
        cleaned_month["profit"] = cleaned_month["revenue"] - month["expenses"]
        cleaned_month["has_campaigns"] = len(month["campaigns"]) > 0
        cleaned.append(cleaned_month)
    return cleaned

# Campaign performance analysis
def analyze_campaigns(data):
    # get months with campaigns
    campaigns_months = [m for m in data if m["has_campaigns"]]
    
    if not campaigns_months:
        return {
            "avg_revenue_per_campaigns": 0,
            "best_month": None,
            "campaigns_coverage": 0
        }
    # calculate metrics
    avg_revenue =  sum(m["revenue"] for m in campaigns_months) / len(campaigns_months)
    best_month = max(campaigns_months, key=lambda x: x["revenue"])["month"]
    
    return {
        "avg_revenue_per_campaign": avg_revenue,
        "best_month": best_month,
        "campaign_coverage": round(len(campaigns_months)/ len(data),2)
    }

# Product return rate analysis
def identify_high_return_products(data, threshold=0.07):
    """
    Find products with high return rate.
    Args:
        data: List of sales data dictionaries
        threshold: Minimum return rate
    Returns:
        sorted ascending by return rate
        list: Products, Month, Return rate
    """
    high_return = []
    for month in data:
        for product in month["top_products"]:
            if product["return_rate"] > threshold:
                high_return.append({
                    "product_name": product["product"],
                    "month": month["month"],
                    "return_rate": product["return_rate"],
                    "units": product["units"]
                })
    return sorted(high_return, key=lambda x: x["return_rate"], reverse=True)

# Monthly performance report
def generate_performance_report(data):
    cleaned_data = clean_sales_data(data)
    campaign_stats = analyze_campaigns(cleaned_data)
    high_return = identify_high_return_products(cleaned_data)
    
    # calculate report  metrics
    total_profit = sum(m["profit"] for m in cleaned_data)
    best_month = max(cleaned_data, key=lambda x: x["profit"])
    worst_return = high_return[0] if high_return else None
    
    report = f"""
    # Sales Performance Report
    ## Financial Summary
    - Total Profit: ${total_profit:,.2f}
    - Most Profitable Month: {best_month['month']} (${best_month["profit"]:,.2f})
    - Average Revenue per Campaign: ${campaign_stats['avg_revenue_per_campaign']:,.2f}
    
    ## Product Analysis
    - Highest Return Product: {worst_return['product_name'] if worst_return else 'N/A'}
    ({worst_return['return_rate']*100:.1f}% in {worst_return['month'] if worst_return else ''})
    - Campaign Coverage: {campaign_stats.get('campaign_coverage')*100:.1f}% of months
    """
    return report

# Predict next month
def predict_next_month(data):
    # Get last 3 months with revenue data
    valid_months = [m for m in data if m["revenue"] is not None][-3:]
    
    if not valid_months:
        return {"predicted_revenue": None, "warning": "Insufficient data"}
    
    # Simple moving average forecast
    avg_revenue = sum(m["revenue"] for m in valid_months) / len(valid_months)
    avg_expenses = sum(m["expenses"] for m in valid_months) / len(valid_months)
    
    return {
        "predicted_revenue": round(avg_revenue),
        "predicted_profit": round(avg_revenue - avg_expenses),
        "warning": "Potential loss" if (avg_revenue - avg_expenses) < 0 else None
    }
    
cleaned_data = clean_sales_data(sales_data)
print("Cleaned Data Sample:", cleaned_data[0])
print("\nCampaign Analysis:", analyze_campaigns(cleaned_data))
print("\nHigh Return Products:", identify_high_return_products(cleaned_data))
print("\nPerformance Report:")
print(generate_performance_report(sales_data))
print("\nNext Month Prediction:", predict_next_month(cleaned_data))

Cleaned Data Sample: {'month': 'Jan-2023', 'revenue': 150000, 'expenses': 95000, 'campaigns': ['New Year Sale', 'Winter Clearance'], 'top_products': [{'product': 'Winter Jacket', 'units': 420, 'return_rate': 0.08}, {'product': 'Heating Blanket', 'units': 380, 'return_rate': 0.12}], 'profit': 55000, 'has_campaigns': True}

Campaign Analysis: {'avg_revenue_per_campaign': 167500.0, 'best_month': 'Feb-2023', 'campaign_coverage': 0.67}

High Return Products: [{'product_name': 'Dumbbells', 'month': 'Mar-2023', 'return_rate': 0.15, 'units': 180}, {'product_name': 'Heating Blanket', 'month': 'Jan-2023', 'return_rate': 0.12, 'units': 380}, {'product_name': 'Winter Jacket', 'month': 'Jan-2023', 'return_rate': 0.08, 'units': 420}]

Performance Report:

    # Sales Performance Report
    ## Financial Summary
    - Total Profit: $25,000.00
    - Most Profitable Month: Feb-2023 ($75,000.00)
    - Average Revenue per Campaign: $167,500.00

    ## Product Analysis
    - Highest Return Product: Dumbbel

In [364]:
# CHALLENGE: Quarterly Sales Performance

def quarterly_report(teams, q1, q2, q3, q4, growth_threshold=0.1):
    sorted_data = sorted(zip(teams, q1, q2, q3, q4, q1 + q2 + q3 + q4), key=lambda x: x[0], reverse=True)
    top_rank = sorted_data[0][5]
    result = []
    for rank, (team, q1, q2, q3, q4, total) in enumerate(sorted_data, start=1):
        result.append({
            team.title(): f"{rank}. {team.title()} - Total: {q1+q2+q3+q4}, Growth: {(q4-q1)/q1}" +
            (" üìà" if ((q4-q1)/q1) > growth_threshold else "") +
            (" üèÜ" if total == top_rank else "")
        })
    return result
    
teams = ["alpha", "beta", "gamma"]
q1_sales = [3000, 5000, 4000]
q2_sales = [3500, 5300, 4200]
q3_sales = [4000, 5600, 4600]
q4_sales = [4800, 6000, 5500]

result = quarterly_report(teams, q1_sales, q2_sales, q3_sales, q4_sales)
print(result)

[{'Gamma': '1. Gamma - Total: 18300, Growth: 0.375 üìà üèÜ'}, {'Beta': '2. Beta - Total: 21900, Growth: 0.2 üìà'}, {'Alpha': '3. Alpha - Total: 15300, Growth: 0.6 üìà'}]


In [376]:
def departments_performance_report(departments, regions, q1, q2, q3, q4, growth_threshold=20,revenue_goal=50000):
    combined = [(department, region, q1s, q2s, q3s, q4s, q1s+q2s+q3s+q4s) 
                for department, region, q1s, q2s, q3s, q4s in 
                zip(departments, regions, q1, q2, q3, q4)]
    
    sorted_data = sorted(combined, key=lambda x: x[0], reverse=True)
    top_department = sorted_data [0][6]
    result = {}
    for rank, (department, region, q1, q2, q3, q4, total_revenue) in enumerate(sorted_data, start=1):
        growth = round((q4-q1)/q1 * 100, 2)
        avg = round(total_revenue/4, 2)
        result[department.title()] = (
            f"{rank}. {department.title()} from {region.title()} - Total Annual Revenue: ${total_revenue:,}, Quarterly Average: ${avg:,}, Growth: {growth}%" +
            (" ‚úÖ" if total_revenue > revenue_goal else "") +
            (" üìà" if growth >= growth_threshold else "") +
            (" üèÜ" if total_revenue == top_department else "")
        )
    return result

departments = ["sales", "marketing", "support", "development"]
regions = ["north", "east", "south", "west"]

q1_revenue = [12000, 9500, 7800, 15000]
q2_revenue = [13000, 9800, 8200, 15800]
q3_revenue = [14000, 10000, 8600, 16200]
q4_revenue = [15500, 11000, 9100, 17000]

result = departments_performance_report(departments, regions, q1_revenue, q2_revenue, q3_revenue, q4_revenue)
print(result)

{'Support': '1. Support from South - Total Annual Revenue: $33,700, Quarterly Average: $8,425.0, Growth: 16.67% üèÜ', 'Sales': '2. Sales from North - Total Annual Revenue: $54,500, Quarterly Average: $13,625.0, Growth: 29.17% ‚úÖ üìà', 'Marketing': '3. Marketing from East - Total Annual Revenue: $40,300, Quarterly Average: $10,075.0, Growth: 15.79%', 'Development': '4. Development from West - Total Annual Revenue: $64,000, Quarterly Average: $16,000.0, Growth: 13.33% ‚úÖ'}


In [401]:
# E-Commerce customer transactions
customers = [
    {
        "customer_id": "C1001",
        "age": 28,
        "loyalty_tier": "Gold",
        "purchases": [
            {"order_id": "ORD101", "date": "2023-01-15", "amount": 150.99, "category": "Electronics"},
            {"order_id": "ORD132", "date": "2023-03-22", "amount": 89.99, "category": "Home"},
            {"order_id": "ORD156", "date": "2023-03-29", "amount": 245.50, "category": "Electronics"}
        ]
    },
    {
        "customer_id": "C1002",
        "age": 42,
        "loyalty_tier": "Silver",
        "purchases": [
            {"order_id": "ORD102", "date": "2023-01-20", "amount": 75.50, "category": "Clothing"},
            {"order_id": "ORD177", "date": "2023-04-05", "amount": 120.00, "category": None}
        ]
    },
    {
        "customer_id": "C1003",
        "age": None,
        "loyalty_tier": "Bronze",
        "purchases": []  # No purchases
    }
]

# Data Quality Check
def validate_customer_data(customers):
    """
    Identifies data quality issues in customer records.
    Returns counts of:
    - Customers with no purchases
    - Missing age values
    - Purchases with null categories
    """
    issues =  {
        "customer_with_no_purchases": 0,
        "missing_age_values": 0,
        "purchases_with_null_category": 0
    }
    
    for customer in customers:
        if not customer["purchases"]:
            issues["customer_with_no_purchases"] += 1
        if customer["age"] is None:
            issues["missing_age_values"] += 1
        for purchase in customer["purchases"]:
            if purchase["category"] is None:
                issues["purchases_with_null_category"] += 1
    return issues
# Customer Segmentation
from collections import defaultdict

def segment_customers(customers):
    """
    Analyzes customers by loyalty tier.
    Returns metrics:
    - Average purchase amount
    - Most popular category
    - Purchase frequency (purchases/month)
    """
    segments = defaultdict(lambda: {
        "total_amount": 0,
        "purchase_count": 0,
        "category_counts": defaultdict(int),
        "customer_count": 0
    })
    
    for customer in customers:
        tier = customer["loyalty_tier"]
        segments[tier]["customer_count"] += 1
        
        for purchase in customer["purchases"]:
            segments[tier]["total_amount"] += purchase["amount"]
            segments[tier]["purchase_count"] += 1
            segments[tier]["category_counts"][purchase["category"]] += 1
        
        # calculate final metrics
        result = {}
        for tier, data in segments.items():
            avg_amount = data["total_amount"] / data["purchase_count"] if data["purchase_count"] else 0
            popular_category = max(data["category_counts"].items(), key=lambda x: x[1])[0] if data["category_counts"] else "N/A"
            
            result[tier] = {
                "avg_purchase_amount": round(avg_amount, 2),
                "most_popular_category": popular_category,
                "purchases_per_customer": round(data["purchase_count"]/ data["customer_count"], 2)
            }
    return result
# Cohort analysis
def analyze_cohorts(customers, age_brackets=[20, 30, 40, 50]):
    """
    Groups customers by age brackets and calculates:
    - Average total spend
    - Most popular category
    - Repeat purchase rate
    """
    cohorts = defaultdict(lambda: {
        "customers": [],
        "total_spend": 0,
        "purchase_count": 0,
        "category_counts": defaultdict(int),
        "repeat_customers": 0
    })
    
    for customer in customers:
        # Determine age bracket
        if customer["age"] is None:
            bracket = "Unknown"
        else:
            for i in range(len(age_brackets)-1):
                if age_brackets[i] <= customer["age"] < age_brackets[i+1]:
                    bracket = f"{age_brackets[i]}-{age_brackets[i+1]-1}"
                    break
            else:
                bracket = f"{age_brackets[-1]}+"
        
        # Update cohort metrics
        cohorts[bracket]["customers"].append(customer["customer_id"])
        purchase_count = len(customer["purchases"])
        
        if purchase_count > 1:
            cohorts[bracket]["repeat_customers"] += 1
        
        for purchase in customer["purchases"]:
            cohorts[bracket]["total_spend"] += purchase["amount"]
            cohorts[bracket]["purchase_count"] += 1
            if purchase["category"]:
                cohorts[bracket]["category_counts"][purchase["category"]] += 1
    
    # Calculate final metrics
    result = {}
    for bracket, data in cohorts.items():
        avg_spend = data["total_spend"] / len(data["customers"]) if data["customers"] else 0
        popular_category = max(data["category_counts"].items(), key=lambda x: x[1])[0] if data["category_counts"] else "N/A"
        repeat_rate = data["repeat_customers"] / len(data["customers"]) if data["customers"] else 0
        
        result[bracket] = {
            "avg_total_spend": round(avg_spend, 2),
            "popular_category": popular_category,
            "repeat_purchase_rate": round(repeat_rate, 2)
        }
    
    return result

# High Value Customer Identification
def identify_high_value_customers(customers, n=3):
    """
    Identifies top customers by:
    1. Total spend
    2. Purchase frequency
    3. Electronics spend (if applicable)
    """
    ranked_customers = []
    for customer in customers:
        metrics = {
            "customer_id": customer["customer_id"],
            "total_spend": sum(p["amount"] for p in customer["purchases"]),
            "purchase_count": len(customer["purchases"]),
            "electronics_spend": sum(
                p["amount"] for p in customer["purchases"]
                if p["category"] == "Electronics"
            )
        }
        ranked_customers.append(metrics)
        
    ranked_customers.sort(key=lambda x: (-x["total_spend"], -x["purchase_count"], -x["electronics_spend"]))
    return ranked_customers[:n]

# Advanced: Churn Risk Prediction
from datetime import datetime, timedelta

def predict_churn_risk(customers, inactive_days=90):
    """
    Flags customers at risk of churn based on:
    - No recent purchases (last x days)
    - Declining purchase frequency
    """
    today = datetime.now().date()
    cutoff_date = today - timedelta(days=inactive_days)
    at_risk = []
    
    for customer in customers:
        if not customer["purchases"]:
            at_risk.append({
                "customer_id": customer["customer_id"],
                "risk_score": 1.0,
                "reasons": "No purchase history"
            })
            continue
        
        # Convert string dates to date objects
        try:
            purchase_dates = [
                datetime.strptime(p["date"], "%Y-%m-%d").date()
                for p in customer["purchases"]
            ]
        except KeyError:
            continue
        
        last_purchase = max(purchase_dates)
        
        # Calculate purchase frequency change
        purchase_months = sorted({d.replace(day=1) for d in purchase_dates})
        freq_change = 0
        if len(purchase_months) > 1:
            freq_change = (len(purchase_dates) / len(purchase_months)) - 1
            
        # Risk calculation
        risk_score = 0
        reasons = []
        
        if last_purchase < cutoff_date:
            inactive_weeks = (today - last_purchase).days // 7
            risk_score += min(0.5 + inactive_weeks * 0.1, 1.0)
            reasons.append(f"Inactive for {(today - last_purchase).days} days")
        
        if freq_change < -0.3: # Frequency dropped > 30%
            risk_score += 0.3
            reasons.append(f"Purchase frequency dropping")
        
        if risk_score >= 0.5:
            at_risk.append({
                "customer_id": customer["customer_id"],
                "risk_score": min(risk_score, 1.0),
                "reason": reasons
            })
            
    return sorted(at_risk, key=lambda x: -x["risk_score"])

# Run all analyses
data_issues = validate_customer_data(customers)
segmentation = segment_customers(customers)
cohorts = analyze_cohorts(customers)
high_value = identify_high_value_customers(customers)
churn_risk = predict_churn_risk(customers)

# Generate comprehensive report
import json

report = {
    "data_quality_issues": data_issues,
    "loyalty_tier_analysis": segmentation,
    "age_cohort_analysis": cohorts,
    "top_customers": high_value,
    "churn_risk_customers": churn_risk
}

print(json.dumps(report, indent=4))

{
    "data_quality_issues": {
        "customer_with_no_purchases": 1,
        "missing_age_values": 1,
        "purchases_with_null_category": 1
    },
    "loyalty_tier_analysis": {
        "Gold": {
            "avg_purchase_amount": 162.16,
            "most_popular_category": "Electronics",
            "purchases_per_customer": 3.0
        },
        "Silver": {
            "avg_purchase_amount": 97.75,
            "most_popular_category": "Clothing",
            "purchases_per_customer": 2.0
        },
        "Bronze": {
            "avg_purchase_amount": 0,
            "most_popular_category": "N/A",
            "purchases_per_customer": 0.0
        }
    },
    "age_cohort_analysis": {
        "20-29": {
            "avg_total_spend": 486.48,
            "popular_category": "Electronics",
            "repeat_purchase_rate": 1.0
        },
        "40-49": {
            "avg_total_spend": 195.5,
            "popular_category": "Clothing",
            "repeat_purchase_rate": 1.

In [407]:
prices = [50, 120, 80, 200]

# Output example:
# ['50', '120 üíµ', '80', '200 üíµ']

# Try it here:
tagged_prices = list(map(lambda price: f"{price} üíµ" if price > 80 else f"{price}" , prices))

print(tagged_prices)

['50', '120 üíµ', '80', '200 üíµ']


In [412]:
from collections import defaultdict

# Sample transactions: (department, amount)
transactions = [
    ("Sales", 2000),
    ("Marketing", 1500),
    ("Sales", 1800),
    ("Support", 1200),
    ("Marketing", 1700),
]

# Group by department and sum the totals using defaultdict + lambda
summary = defaultdict(lambda: 0)

for dept, amount in transactions:
    summary[dept] += amount

print(dict(summary))

{'Sales': 3800, 'Marketing': 3200, 'Support': 1200}


In [419]:
# Data: (name, revenue, growth%)
data = [
    ("Alpha", 15000, 0.05),
    ("Beta", 18000, 0.03),
    ("Gamma", 16000, 0.08),
]

# Sort by revenue descending, then growth ascending
sorted_data = sorted(data, key=lambda x: (-x[1], x[2]))

print(sorted_data)

[('Beta', 18000, 0.03), ('Gamma', 16000, 0.08), ('Alpha', 15000, 0.05)]


In [429]:
columns = ["name", "age", "income"]
types = ["str", "int", "float"]

# Create a dict of processors using lambda
processors = {
    "str": lambda x: x.strip().title(),
    "int": lambda x: int(x),
    "float": lambda x: float(x)
}

# Sample raw row from CSV
raw_row = [" alice ", "30", "50000.0"]

# Apply processors based on types
cleaned = [processors[t](v) for v, t in zip(raw_row, types)]

print(cleaned)

['Alice', 30, 50000.0]


In [432]:
# Sample data: List of tuples (country, cases)
covid_data = [
    ("USA", 35000000),
    ("India", 32000000),
    ("Brazil", 21000000),
    ("France", 7000000),
    ("Germany", 6500000)
]

# 1. Filter countries with > 10M cases
high_cases = list(filter(lambda item: item[1] > 10_000_000, covid_data))
print(high_cases)  # Output: [('USA', 35000000), ('India', 32000000), ('Brazil', 21000000)]

# 2. Convert cases to "per million" (assuming population data exists)
per_million = list(map(lambda item: (item[0], item[1] / 100), covid_data))  # Simplified
print(per_million)

[('USA', 35000000), ('India', 32000000), ('Brazil', 21000000)]
[('USA', 350000.0), ('India', 320000.0), ('Brazil', 210000.0), ('France', 70000.0), ('Germany', 65000.0)]


In [433]:
orders = [
    {"id": 101, "items": [{"name": "Laptop", "price": 999}, {"name": "Mouse", "price": 20}], "status": "shipped"},
    {"id": 102, "items": [{"name": "Keyboard", "price": 50}], "status": "pending"},
    {"id": 103, "items": [{"name": "Monitor", "price": 200}, {"name": "Cable", "price": 10}], "status": "shipped"}
]
# Step 1: Filter shipped orders
shipped_orders = filter(lambda order: order["status"] == "shipped", orders)

# Step 2: Calculate totals and apply discount
def process_order(order):
    total = sum(item["price"] for item in order["items"])
    if total > 500:
        total *= 0.9 # 10% discount
    return {"id": order["id"], "total": round(total, 2)}

# Step 3: Apply to all shipped orders
report = map(process_order, shipped_orders)
print(list(report))





[{'id': 101, 'total': 917.1}, {'id': 103, 'total': 210}]


In [None]:
api_response = [
    {
        "user": "Alice",
        "devices": [
            {"type": "phone", "model": "iPhone 12"},
            {"type": "laptop", "model": "MacBook Pro"}
        ]
    },
    {
        "user": "Bob",
        "devices": [
            {"type": "tablet", "model": "iPad Air"}
        ]
    }
]

# Using list comprehension (cleaner for nested loops)
flattened = [
    {"user": user["user"], "model": device["model"]}
    for user in api_response
    for device in user["devices"]
]

# using map + lambda (less readable here)
flattened = list(map(
    lambda user: list(map(lambda device: {"user": user["user"], "model": device["model"]}, user["devices"])), api_response
))

print(flattened)

[[{'user': 'Alice', 'model': 'iPhone 12'}, {'user': 'Alice', 'model': 'MacBook Pro'}], [{'user': 'Bob', 'model': 'iPad Air'}]]


In [438]:
users = [
    {"id": 1, "name": "Alice", "tier": "gold", "purchases": [100, 200, 300]},
    {"id": 2, "name": "Bob", "tier": "silver", "purchases": [50, 150]},
    {"id": 3, "name": "Charlie", "tier": "bronze", "purchases": [75]}
]

def apply_discount(user):
    discount_rates = {"gold": 0.2, "silver": 0.1, "bronze": 0.05}
    total = sum(user["purchases"])
    discount = total * discount_rates[user["tier"]]
    return {"id": user["id"], "discounted_total": round(total - discount, 2)}

discounted_users = map(apply_discount, users)
print(list(discounted_users))

[{'id': 1, 'discounted_total': 480.0}, {'id': 2, 'discounted_total': 180.0}, {'id': 3, 'discounted_total': 71.25}]


In [440]:
users = [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}]
orders = [{"user_id": 1, "product": "Laptop"}, {"user_id": 2, "product": "Mouse"}]

# Using list comprehension
merged = [
    {"user": user["name"], "product": next(order["product"] for order in orders if order["user_id"] == user["id"])}
    for user in users
]

# Alternative using map + lambda
user_dict = {user["id"]: user["name"] for user in users}
merged = map(
    lambda order: {"user": user_dict[order["user_id"]], "product": order["product"]},
    orders
)

print(list(merged))

[{'user': 'Alice', 'product': 'Laptop'}, {'user': 'Bob', 'product': 'Mouse'}]


In [441]:
data = [{"value": "100"}, {"value": "200"}, {"value": "N/A"}]

# Goal: convert "value" to integers, defaulting to 0 for invalid entries.

# Using a lambda with try-except
parsed = map(
    lambda d: {"value": int(d["value"])} if d["value"].isdigit() else {"value": 0},
    data
)

# Using a helper function (cleaner)
def safe_parse(item):
    try:
        return {"value": int(item["value"])}
    except:
        return {"value": 0}
parsed = map(safe_parse, data)

In [447]:
# Employees and Project data

employees = [
    {"id": 1, "name": "Alice", "skills": ["Python", "SQL"], "years": 5},
    {"id": 2, "name": "Bob", "skills": ["Java"], "years": 3},
    {"id": 3, "name": "Charlie", "skills": ["Python", "JavaScript"], "years": 1}
]
projects = [
    {"id": 1, "name": "Web App", "required_skills": ["Python", "JavaScript"]},
    {"id": 2, "name": "Data Pipeline", "required_skills": ["Python", "SQL"]}
]

# Goal: Match employees to project they're qualified for (all required skills must match).

# Step 1: Filter experienced employees.
experienced = filter(lambda e: e["years"] > 2, employees)

# Step 2: Match projects
def match_projects(employee):
    eligible_projects = [
        project["name"] for project in projects
        if all(skill in employee["skills"] for skill in project["required_skills"])
    ]
    return {"name": employee["name"], "projects": eligible_projects}

matches = map(match_projects, experienced)
print(list(matches))
    


# Include only employees with >2 years of experience.

[{'name': 'Alice', 'projects': ['Data Pipeline']}, {'name': 'Bob', 'projects': []}]


In [None]:
# Filtering countries with >10M cases and recovery rate >80%
covid_data = [
    {"country": "USA", "cases": 35_000_000, "deaths": 600_000, "recovered": 28_000_000},
    {"country": "India", "cases": 32_000_000, "deaths": 400_000, "recovered": 30_000_000},
    {"country": "Brazil", "cases": 21_000_000, "deaths": 500_000, "recovered": 19_000_000},
    {"country": "France", "cases": 7_000_000, "deaths": 100_000, "recovered": 6_500_000},
    {"country": "Germany", "cases": 6_500_000, "deaths": 90_000, "recovered": 6_000_000}
]

# List comprehension
high_risk = [country for country in covid_data if country["cases"] > 10000000 and (country["recovered"]/ country["cases"]) > 0.8]

# Filter + lambda
high_risk = filter(lambda country: country["cases"] > 10000000 and (country["recovered"]/ country["cases"]) > 0.8, covid_data)

In [458]:
# Calcualte mortality rate (deaths per 100K cases)
covid_data = [
    {"country": "USA", "cases": 35_000_000, "deaths": 600_000, "recovered": 28_000_000},
    {"country": "India", "cases": 32_000_000, "deaths": 400_000, "recovered": 30_000_000},
    {"country": "Brazil", "cases": 21_000_000, "deaths": 500_000, "recovered": 19_000_000},
    {"country": "France", "cases": 7_000_000, "deaths": 100_000, "recovered": 6_500_000},
    {"country": "Germany", "cases": 6_500_000, "deaths": 90_000, "recovered": 6_000_000}
]

# Using comprehension
mortality_rates_comp = [{
    "country": country["country"],
    "mortality_rate": round((country["deaths"] / country["cases"]) * 100_000, 2)
} for country in covid_data
]
print(mortality_rates_comp)

# Using map + lambda
mortality_rates_map = map(lambda mortality: {
    "country": mortality["country"],
    "mortality_rate": round((mortality["deaths"] / mortality["cases"]) * 100_000, 2)}, covid_data
    )
print(list(mortality_rates_map))

[{'country': 'USA', 'mortality_rate': 1714.29}, {'country': 'India', 'mortality_rate': 1250.0}, {'country': 'Brazil', 'mortality_rate': 2380.95}, {'country': 'France', 'mortality_rate': 1428.57}, {'country': 'Germany', 'mortality_rate': 1384.62}]
[{'country': 'USA', 'mortality_rate': 1714.29}, {'country': 'India', 'mortality_rate': 1250.0}, {'country': 'Brazil', 'mortality_rate': 2380.95}, {'country': 'France', 'mortality_rate': 1428.57}, {'country': 'Germany', 'mortality_rate': 1384.62}]


In [466]:
# Filter country with mortality rate > 1,500
covid_data = [
    {"country": "USA", "cases": 35_000_000, "deaths": 600_000, "recovered": 28_000_000},
    {"country": "India", "cases": 32_000_000, "deaths": 400_000, "recovered": 30_000_000},
    {"country": "Brazil", "cases": 21_000_000, "deaths": 500_000, "recovered": 19_000_000},
    {"country": "France", "cases": 7_000_000, "deaths": 100_000, "recovered": 6_500_000},
    {"country": "Germany", "cases": 6_500_000, "deaths": 90_000, "recovered": 6_000_000}
]
# Using comprehension
low_risk_comp = [
    country["country"] for country in covid_data if ((country["deaths"]/country["cases"]) * 100_000) > 1500
]
print(low_risk_comp)

# Using map + filter + lambda
low_risk_filter = map(lambda country: country["country"], filter(lambda mortality: (mortality["deaths"]/ mortality["cases"] * 100_000) > 1500, covid_data))
print(list(low_risk_filter))

['USA', 'Brazil']
['USA', 'Brazil']


In [470]:
# Calculate the global average recovery rate.
covid_data = [
    {"country": "USA", "cases": 35_000_000, "deaths": 600_000, "recovered": 28_000_000},
    {"country": "India", "cases": 32_000_000, "deaths": 400_000, "recovered": 30_000_000},
    {"country": "Brazil", "cases": 21_000_000, "deaths": 500_000, "recovered": 19_000_000},
    {"country": "France", "cases": 7_000_000, "deaths": 100_000, "recovered": 6_500_000},
    {"country": "Germany", "cases": 6_500_000, "deaths": 90_000, "recovered": 6_000_000}
]

# Using comprehension
total_cases = sum(cases["cases"] for cases in covid_data)
total_recovered = sum(recover["recovered"] for recover in covid_data)
global_recover_comp = round((total_recovered / total_cases) * 100, 2)

result = f"Global recovery rate: {global_recover_comp}%"
print(result)

Global recovery rate: 88.18%


In [479]:
covid_data_missing = [
    {"country": "USA", "cases": 35_000_000, "deaths": 600_000},
    {"country": "India", "cases": 32_000_000, "deaths": 400_000, "recovered": 30_000_000}
]

def safe_recovery_rate(country):
    if "recovered" in country:
        return (country["recovered"] / country["cases"] * 100)

rates = map(safe_recovery_rate, covid_data_missing)
print(list(rates))

[None, 93.75]
