# Polars: The Ultimate All-in-One Workbook

**A complete, beginner-friendly guide to Polars for data analysis.**

This notebook demonstrates the most important features of Polars, using real datasets from the project. It is written for both technical and non-technical users. All steps are explained in plain English, with clear code and outputs.

---

## 1. Introduction to Polars and Loading Data

Polars is a fast, modern DataFrame library for data analysis in Python. Let's start by importing Polars and loading some datasets.


In [None]:
import polars as pl

# Load the project datasets (paths are relative to the working directory)
clients = pl.read_csv('../datasets/clients.csv')
transactions = pl.read_csv('../datasets/transactions.csv')
assets = pl.read_parquet('../datasets/assets.parquet')
portfolio = pl.read_csv('../datasets/portfolio_performance.csv')
print('Clients:', clients.shape)
print('Transactions:', transactions.shape)
print('Assets:', assets.shape)
print('Portfolio:', portfolio.shape)

## 2. Exploring DataFrames: The Basics

Let's look at the structure of our datasets.


In [None]:
# Show the first few rows of each DataFrame
print('Clients sample:')
print(clients.head())
print('Transactions sample:')
print(transactions.head())
print('Assets sample:')
print(assets.head())
print('Portfolio sample:')
print(portfolio.head())

### DataFrame Info and Summary

Polars makes it easy to see the columns, data types, and summary statistics.


In [None]:
print('Clients columns:', clients.columns)
print('Transactions dtypes:', transactions.dtypes)
print('Assets summary:')
print(assets.describe())

## 3. Data Selection and Filtering

Let's select specific columns and filter rows.


In [None]:
# Select columns
print(clients.select(['client_id', 'name']))

# Filter: Show transactions above a threshold
large_tx = transactions.filter(pl.col('amount') > 10000)
print('Large transactions:')
print(large_tx.head())

## 4. Grouping and Aggregation

Group data to get insights, such as the total transaction amount per client.


In [None]:
# Total transaction amount per client
tx_per_client = transactions.group_by('client_id').agg(pl.col('amount').sum().alias('total_amount'))
print(tx_per_client.head())

## 5. Joining DataFrames

Combine information from different tables.


In [None]:
# Join clients and transactions
joined = clients.join(tx_per_client, on='client_id', how='left')
print(joined.head())

## 6. Advanced Analysis: Window Functions

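Window expressions (`.over(...)`) compute values within each group while keeping every row, for example a running total of each client's transactions in date order.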


In [None]:
# Running total of transactions per client
transactions_sorted = transactions.sort(['client_id', 'date'])
transactions_with_cumsum = transactions_sorted.with_columns(
    pl.col('amount').cum_sum().over('client_id').alias('running_total')
)
print(transactions_with_cumsum.head(10))

## 7. Pivoting and Reshaping Data

Pivoting turns long data (one row per record) into wide data (one column per category); unpivoting goes the other way.


In [None]:
# Pivot: total amount per client per asset (one column per asset_id)
# `on=` is the Polars 1.x name for this argument (older releases used `columns=`)
pivot = transactions.pivot(
    on='asset_id',
    index='client_id',
    values='amount',
    aggregate_function='sum'
)
print(pivot.head())

## 8. SQL Queries in Polars

If you already know SQL, you can query registered DataFrames with `pl.SQLContext`.


In [None]:
ctx = pl.SQLContext()
ctx.register('transactions', transactions)
# execute() returns a LazyFrame by default, so .collect() runs the query
sql_result = ctx.execute(
    "SELECT client_id, SUM(amount) AS total "
    "FROM transactions GROUP BY client_id ORDER BY total DESC"
).collect()
print(sql_result.head())

## 9. Handling Missing Data

Polars makes it easy to find and fill missing values.


In [None]:
# Check for missing values
print(transactions.null_count())
# Replace nulls with 0; a numeric fill value only touches numeric columns,
# so string and date columns keep their nulls
filled = transactions.fill_null(0)
print(filled.head())

## 10. Exporting Data

Save your results for use elsewhere.


In [None]:
# Save to CSV
tx_per_client.write_csv('tx_per_client.csv')
print('Exported tx_per_client.csv')

## Real-Life Wealth Management Scenario: Delivering Strategic Insights

**Scenario:**  
Your employer, a wealth management firm, asks you to deliver a comprehensive analysis of client portfolio performance, risk exposure, and investment trends. The goal is to identify high-value clients, assess risk-adjusted returns, and uncover actionable insights for strategic decision-making.

You have access to the following datasets:
- `clients.csv`: Client demographic and onboarding data
- `transactions.csv`: All buy/sell trades per client
- `assets.parquet`: Asset metadata (type, region, risk)
- `portfolio_performance.csv`: Daily portfolio values and returns

**Your task:**  
1. Integrate all datasets to create a unified view of each client's investment journey.
2. Clean and enrich the data (handle missing values, standardize types, join relevant info).
3. Analyze portfolio performance, risk, and client behavior.
4. Deliver actionable insights for the business.

Let's see how Polars enables you to go above and beyond in this scenario!

In [None]:
# --- Wealth Management Data Analysis: Robust, Transparent, and Maintainable Polars Workflow ---

import polars as pl
import os

# --- 1. Data Loading with Diagnostics and Error Handling ---

def safe_read_csv(path, **kwargs):
    """Safely read a CSV file and provide diagnostics if missing."""
    if not os.path.exists(path):
        print(f"Error: File not found: {path}")
        return None
    df = pl.read_csv(path, **kwargs)
    print(f"Loaded {path} with shape {df.shape}")
    return df

def safe_read_parquet(path, **kwargs):
    """Safely read a Parquet file and provide diagnostics if missing."""
    if not os.path.exists(path):
        print(f"Error: File not found: {path}")
        return None
    df = pl.read_parquet(path, **kwargs)
    print(f"Loaded {path} with shape {df.shape}")
    return df

# Load datasets with clear paths (update paths as needed)
clients = safe_read_csv('../datasets/clients.csv')
transactions = safe_read_csv('../datasets/transactions.csv')
assets = safe_read_parquet('../datasets/assets.parquet')
portfolio = safe_read_csv('../datasets/portfolio_performance.csv')

# --- 2. Data Cleaning & Enrichment: Defensive Datetime Parsing ---

def parse_datetime_column(df, col, fmt, strict=False):
    """Parse a column as datetime if it exists, with diagnostics."""
    if df is not None and col in df.columns:
        print(f"Parsing '{col}' in DataFrame with format '{fmt}' (strict={strict})")
        return df.with_columns(
            pl.col(col).str.strptime(pl.Datetime, fmt, strict=strict)
        )
    elif df is not None:
        print(f"Warning: '{col}' column not found in DataFrame. Skipping datetime parsing.")
    return df

# Parse date columns with robust handling of possible formats
clients = parse_datetime_column(clients, "join_date", "%Y-%m-%dT%H:%M:%S%.f", strict=False)
transactions = parse_datetime_column(transactions, "date", "%Y-%m-%dT%H:%M:%S%.f", strict=False)

# --- Robust Portfolio Date Parsing ---
if portfolio is not None:
    print("Portfolio columns before date parsing:", portfolio.columns)
    # Try to parse 'date' column if present
    if "date" in portfolio.columns:
        portfolio = parse_datetime_column(portfolio, "date", "%Y-%m-%d")
        print("Parsed 'date' column in portfolio as datetime.")
    # Otherwise, try to parse 'trade_date' (common in financial datasets)
    elif "trade_date" in portfolio.columns:
        portfolio = parse_datetime_column(portfolio, "trade_date", "%Y-%m-%d")
        print("Parsed 'trade_date' column in portfolio as datetime.")
    else:
        print("Warning: No suitable date column ('date' or 'trade_date') found in portfolio. Available columns:", portfolio.columns)
    # Show a preview of the parsed portfolio
    print("Sample of portfolio DataFrame after date parsing:")
    print(portfolio.head())

# --- 3. Data Integration: Robust Joins with Diagnostics ---

# Join transactions with asset metadata
if transactions is not None and assets is not None:
    tx_assets = transactions.join(assets, on="asset_id", how="left")
    print("Joined transactions with assets. Result shape:", tx_assets.shape)
else:
    tx_assets = None
    print("Warning: Could not join transactions with assets due to missing data.")

# Join with client info
if tx_assets is not None and clients is not None:
    tx_full = tx_assets.join(clients, on="client_id", how="left")
    print("Joined tx_assets with clients. Result shape:", tx_full.shape)
else:
    tx_full = None
    print("Warning: Could not join tx_assets with clients due to missing data.")

# --- 4. Analysis 1: Identify High-Value Clients ---

if tx_full is not None:
    print("tx_full columns:", tx_full.columns)
    # Defensive aggregation: prefer risk_score, fall back to risk, else aggregate amount/num_trades only
    risk_col = "risk_score" if "risk_score" in tx_full.columns else ("risk" if "risk" in tx_full.columns else None)
    aggs = [
        pl.col("amount").sum().alias("total_invested"),
        pl.col("amount").count().alias("num_trades"),
    ]
    if risk_col is not None:
        aggs.append(pl.col(risk_col).mean().alias("avg_risk_score"))
    total_invested = (
        tx_full
        .group_by("client_id")
        .agg(aggs)
        .sort("total_invested", descending=True)
    )
    if risk_col == "risk":
        print("Warning: Used 'risk' column instead of 'risk_score'.")
    elif risk_col is None:
        print("Warning: No risk_score or risk column found. Only aggregating invested amount and trade count.")

    print("\nTop 5 High-Value Clients:")
    print(total_invested.head(5))
else:
    print("Analysis 1 skipped: tx_full is None.")

# --- 5. Analysis 2: Portfolio Risk-Adjusted Returns ---

if portfolio is not None and clients is not None and "client_id" in portfolio.columns:
    # Join portfolio with clients for demographic analysis
    portfolio_clients = portfolio.join(clients, on="client_id", how="left")
    # Calculate risk-adjusted return (Sharpe ratio proxy: mean return / stddev, with no risk-free rate or annualization)
    # Defensive: check for daily_return or daily_return_pct
    if "daily_return" in portfolio_clients.columns:
        mean_col = "daily_return"
    elif "daily_return_pct" in portfolio_clients.columns:
        mean_col = "daily_return_pct"
    else:
        mean_col = None

    if mean_col:
        risk_adj = (
            portfolio_clients
            .group_by("client_id")
            .agg([
                pl.col(mean_col).mean().alias("mean_return"),
                pl.col(mean_col).std().alias("std_return"),
                (pl.col(mean_col).mean() / pl.col(mean_col).std()).alias("risk_adjusted_return")
            ])
            # Drop clients whose ratio is undefined: zero volatility, or a null std from a single observation
            .filter(pl.col("std_return") > 0)
            .sort("risk_adjusted_return", descending=True)
        )
        print("\nTop 5 Clients by Risk-Adjusted Return:")
        print(risk_adj.head(5))
    else:
        print("Analysis 2 skipped: No daily return column found in portfolio.")
else:
    print("Analysis 2 skipped: Required columns missing in portfolio or clients.")

# --- 6. Analysis 3: Investment Trends by Asset Type & Region ---

if tx_full is not None and "asset_type" in tx_full.columns and "region" in tx_full.columns:
    trend = (
        tx_full
        .group_by(["asset_type", "region"])
        .agg([
            pl.col("amount").sum().alias("total_invested"),
            pl.col("client_id").n_unique().alias("unique_clients")
        ])
        .sort("total_invested", descending=True)
    )
    print("\nTop Investment Trends (by Asset Type & Region):")
    print(trend.head(10))
else:
    print("Analysis 3 skipped: 'asset_type' or 'region' column missing in tx_full.")

# --- 7. Actionable Insights: Flag Clients with High Risk and Low Diversification ---

if tx_full is not None:
    # Use risk_score or risk if available
    risk_col = "risk_score" if "risk_score" in tx_full.columns else ("risk" if "risk" in tx_full.columns else None)
    if risk_col:
        diversification = (
            tx_full
            .group_by("client_id")
            .agg([
                pl.col("asset_id").n_unique().alias("num_assets"),
                pl.col(risk_col).mean().alias("avg_risk_score")
            ])
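            # Thresholds (fewer than 3 distinct assets, average risk above 7) are business assumptions; adjust them to your risk scale
            .filter((pl.col("num_assets") < 3) & (pl.col("avg_risk_score") > 7))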
        )
        print("\nClients with High Risk and Low Diversification:")
        print(diversification)
    else:
        print("Actionable insights skipped: No risk_score or risk column found in tx_full.")
else:
    print("Actionable insights skipped: tx_full is None.")

# --- 8. Best Practices & Next Steps ---
# - Always review printed diagnostics for missing data or schema mismatches.
# - If you add new datasets or columns, update the parsing and join logic accordingly.
# - For large datasets, consider lazy evaluation (pl.scan_csv, pl.scan_parquet) for performance.
# - Document any business logic assumptions at the top of your notebook for future users.

---

## Conclusion

This workbook has shown how to use Polars for a wide range of data analysis tasks, from loading and exploring data to advanced analytics and exporting results. All examples use real datasets and are explained in simple terms.

For more, see the other notebooks in this project!
