# Data Querying for Data Scientists

### A Comprehensive Guide to Using Pandas, SQL, PySpark, and Polars for Data Manipulation, with Practical Examples and Visualisations

Working as a Data Scientist or Data Engineer often involves querying data from various sources. Many tools and libraries are available for this, each with its own strengths and weaknesses, and the same result can usually be achieved in several different ways depending on the tool. Being familiar with these approaches helps you choose the best one for your specific use case.

This article provides a comprehensive guide on how to query data using different tools and libraries, including Pandas, SQL, PySpark, and Polars. Each section covers the setup, data creation, and various querying techniques such as filtering, grouping, joining, window functions, ranking, and sorting. The output is intended to be identical across all tools, while the transformations are implemented using the specific syntax and features of each library. This lets you compare the different approaches and understand the nuances of each method.

## Setup

Before we start querying data, we need to set up our environment. This includes importing the necessary libraries, creating sample data, and defining constants that will be used throughout the article. The following sections will guide you through this setup process. The code for this article is also available on GitHub: [querying-data](...).

### Imports

In [None]:
# Python StdLib Imports
import sqlite3

# Python Third Party Imports
import numpy as np
import pandas as pd
import polars as pl
from plotly import express as px, graph_objects as go
from plotly.subplots import make_subplots
from pyspark.sql import (
    DataFrame as psDataFrame,
    SparkSession,
    Window,
    functions as F,
    types as T,
)

### Constants

In [None]:
# Set seed for reproducibility
np.random.seed(42)
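
As a minimal sketch of what the seed does (the variable names `first_draw` and `second_draw` are illustrative and not part of the article's code), re-seeding NumPy's global generator replays the same sequence of random draws. This is why the sample data below should come out the same on every run, provided the cells are executed in order:

```python
import numpy as np

# Seed the global generator, then draw five integers.
np.random.seed(42)
first_draw = np.random.randint(1, 100, 5)

# Re-seeding with the same value restarts the sequence from the beginning.
np.random.seed(42)
second_draw = np.random.randint(1, 100, 5)

print(np.array_equal(first_draw, second_draw))
```

Because the generator is global, running extra random calls between the seed and the data cells would shift every subsequent draw.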

### Data

In [None]:
# Generate sample data
n_records = 1000

In [None]:
# Create sales fact table
sales_data: dict[str, object] = {
    "date": pd.date_range(start="2023-01-01", periods=n_records, freq="D"),
    "customer_id": np.random.randint(1, 100, n_records),
    "product_id": np.random.randint(1, 50, n_records),
    "category": np.random.choice(["Electronics", "Clothing", "Food", "Books", "Home"], n_records),
    "sales_amount": np.random.uniform(10, 1000, n_records).round(2),
    "quantity": np.random.randint(1, 10, n_records),
}

In [None]:
# Create product dimension table
product_data: dict[str, object] = {
    "product_id": np.arange(1, 51),
    "product_name": [f"Product {i}" for i in range(1, 51)],
    "price": np.random.uniform(10, 500, 50).round(2),
    "category": np.random.choice(["Electronics", "Clothing", "Food", "Books", "Home"], 50),
    "supplier_id": np.random.randint(1, 10, 50),
}

In [None]:
# Create customer dimension table
customer_data: dict[str, object] = {
    "customer_id": np.arange(1, 101),
    "customer_name": [f"Customer {i}" for i in range(1, 101)],
    "city": np.random.choice(["New York", "Los Angeles", "Chicago", "Houston", "Phoenix"], 100),
    "state": np.random.choice(["NY", "CA", "IL", "TX", "AZ"], 100),
    "segment": np.random.choice(["Consumer", "Corporate", "Home Office"], 100),
}
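
One detail of the generated data is worth checking before querying it: `np.random.randint(low, high, size)` excludes `high`, so the fact table can only contain customer IDs 1 to 99 and product IDs 1 to 49, while the dimension tables run to 100 and 50. The sketch below illustrates this with hypothetical names (`fact_ids`, `dimension_ids`, `missing_ids`) and a larger sample than the article uses:

```python
import numpy as np

# Same bounds as the customer_id column above; 10,000 draws for illustration.
fact_ids = np.random.randint(1, 100, 10_000)

# The upper bound is exclusive, so no draw can ever equal 100.
print(fact_ids.min(), fact_ids.max())

# IDs that exist in the dimension table but never appear in the fact table.
dimension_ids = np.arange(1, 101)
missing_ids = np.setdiff1d(dimension_ids, fact_ids)
print(missing_ids)
```

This explains why the per-customer and per-product summaries later in the article contain 99 and 49 rows rather than 100 and 50.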

## Pandas

### Create

In [None]:
df_sales_pd = pd.DataFrame(sales_data)
df_product_pd = pd.DataFrame(product_data)
df_customer_pd = pd.DataFrame(customer_data)

In [None]:
print(f"Sales DataFrame: {len(df_sales_pd)}")
display(df_sales_pd.head(10))

Sales DataFrame: 1000


Unnamed: 0,date,customer_id,product_id,category,sales_amount,quantity
0,2023-01-01,52,34,Books,319.23,2
1,2023-01-02,93,45,Food,646.25,6
2,2023-01-03,15,6,Food,203.88,2
3,2023-01-04,72,37,Books,897.99,3
4,2023-01-05,61,33,Books,524.99,4
5,2023-01-06,21,22,Clothing,453.13,5
6,2023-01-07,83,21,Clothing,562.24,9
7,2023-01-08,87,6,Food,174.37,4
8,2023-01-09,75,6,Electronics,617.34,9
9,2023-01-10,75,48,Books,369.47,3


In [None]:
print(f"Product DataFrame: {len(df_product_pd)}")
display(df_product_pd.head(10))

Product DataFrame: 50


Unnamed: 0,product_id,product_name,price,category,supplier_id
0,1,Product 1,293.13,Food,6
1,2,Product 2,434.39,Food,3
2,3,Product 3,151.83,Electronics,7
3,4,Product 4,239.16,Clothing,5
4,5,Product 5,313.5,Clothing,4
5,6,Product 6,211.48,Electronics,5
6,7,Product 7,219.47,Clothing,8
7,8,Product 8,171.84,Books,3
8,9,Product 9,286.47,Electronics,7
9,10,Product 10,426.78,Books,6


In [None]:
print(f"Customer DataFrame: {len(df_customer_pd)}")
display(df_customer_pd.head(10))

Customer DataFrame: 100


Unnamed: 0,customer_id,customer_name,city,state,segment
0,1,Customer 1,Chicago,AZ,Consumer
1,2,Customer 2,New York,IL,Home Office
2,3,Customer 3,Los Angeles,NY,Corporate
3,4,Customer 4,Phoenix,TX,Consumer
4,5,Customer 5,New York,IL,Home Office
5,6,Customer 6,New York,TX,Corporate
6,7,Customer 7,Los Angeles,NY,Home Office
7,8,Customer 8,Phoenix,IL,Consumer
8,9,Customer 9,Houston,TX,Corporate
9,10,Customer 10,Phoenix,AZ,Corporate


### 1. Filtering and Selecting

In [None]:
# Filter sales data for specific category
electronics_sales: pd.DataFrame = df_sales_pd[df_sales_pd["category"] == "Electronics"]
print(f"Number of Electronics Sales: {len(electronics_sales)}")
display(electronics_sales.head())

Number of Electronics Sales: 188


Unnamed: 0,date,customer_id,product_id,category,sales_amount,quantity
8,2023-01-09,75,6,Electronics,617.34,9
16,2023-01-17,88,9,Electronics,735.31,8
31,2023-02-01,42,6,Electronics,979.98,7
40,2023-02-10,51,45,Electronics,973.72,7
42,2023-02-12,64,5,Electronics,303.52,8


In [None]:
# Filter for high value transactions (over $500)
high_value_sales: pd.DataFrame = df_sales_pd[df_sales_pd["sales_amount"] > 500]
print(f"Number of high-value Sales: {len(high_value_sales)}")
display(high_value_sales.head())

Number of high-value Sales: 525


Unnamed: 0,date,customer_id,product_id,category,sales_amount,quantity
1,2023-01-02,93,45,Food,646.25,6
3,2023-01-04,72,37,Books,897.99,3
4,2023-01-05,61,33,Books,524.99,4
6,2023-01-07,83,21,Clothing,562.24,9
8,2023-01-09,75,6,Electronics,617.34,9


In [None]:
# Select specific columns
sales_summary: pd.DataFrame = df_sales_pd[["date", "category", "sales_amount"]]
print(f"Sales Summary DataFrame: {len(sales_summary)}")
display(sales_summary.head())

Sales Summary DataFrame: 1000


Unnamed: 0,date,category,sales_amount
0,2023-01-01,Books,319.23
1,2023-01-02,Food,646.25
2,2023-01-03,Food,203.88
3,2023-01-04,Books,897.99
4,2023-01-05,Books,524.99
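
The filters above each use a single condition. As a hedged sketch on a small made-up frame (the `sales` data here is illustrative, not the article's dataset), conditions can be combined with `&` or `|`, and row filtering and column selection can be done in one step with `.loc`. `DataFrame.query` offers an equivalent string-based form:

```python
import pandas as pd

sales = pd.DataFrame(
    {
        "category": ["Electronics", "Food", "Electronics"],
        "sales_amount": [617.34, 646.25, 303.52],
    }
)

# Combine conditions with & (and) or | (or); each condition needs parentheses.
mask = (sales["category"] == "Electronics") & (sales["sales_amount"] > 500)

# .loc filters rows and selects columns in a single step.
filtered = sales.loc[mask, ["category", "sales_amount"]]

# The same filter expressed as a query string.
filtered_query = sales.query("category == 'Electronics' and sales_amount > 500")

print(filtered)
```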


### 2. Grouping and Aggregation

In [None]:
# Basic aggregation
sales_stats: pd.DataFrame = df_sales_pd.agg(
    {
        "sales_amount": ["sum", "mean", "min", "max", "count"],
        "quantity": ["sum", "mean", "min", "max"],
    }
)
print(f"Sales Statistics: {len(sales_stats)}")
display(sales_stats)

Sales Statistics: 5


Unnamed: 0,sales_amount,quantity
sum,512657.67,5035.0
mean,512.65767,5.035
min,10.19,1.0
max,999.72,9.0
count,1000.0,


In [None]:
# Group by category and aggregate
category_sales: pd.DataFrame = df_sales_pd.groupby("category").agg(
    {
        "sales_amount": ["sum", "mean", "count"],
        "quantity": "sum",
    }
)
print(f"Category Sales Summary: {len(category_sales)}")
display(category_sales)

Category Sales Summary: 5


Unnamed: 0_level_0,sales_amount,sales_amount,sales_amount,quantity
Unnamed: 0_level_1,sum,mean,count,sum
category,Unnamed: 1_level_2,Unnamed: 2_level_2,Unnamed: 3_level_2,Unnamed: 4_level_2
Books,112424.59,504.146143,223,1157
Clothing,112682.26,536.58219,210,992
Electronics,90232.28,479.958936,188,946
Food,85217.49,504.245503,169,847
Home,112101.05,533.814524,210,1093
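
The dictionary form of `agg` produces two-level column headers, which then need flattening. One alternative, sketched here on a small made-up frame (the data is illustrative), is pandas named aggregation, where each keyword argument names an output column and maps it to an `(input_column, aggregation)` pair:

```python
import pandas as pd

sales = pd.DataFrame(
    {
        "category": ["Books", "Books", "Food"],
        "sales_amount": [10.0, 30.0, 5.0],
        "quantity": [1, 2, 3],
    }
)

# Named aggregation: output_name=(input_column, aggregation)
summary = sales.groupby("category").agg(
    total_sales=("sales_amount", "sum"),
    average_sales=("sales_amount", "mean"),
    transaction_count=("sales_amount", "count"),
    total_quantity=("quantity", "sum"),
)
print(summary)
```

The result has flat, descriptive column names straight away, so no separate renaming step is needed.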


In [None]:
# Rename columns for clarity
category_sales.columns = [
    "total_sales",
    "average_sales",
    "transaction_count",
    "total_quantity",
]
print(f"Renamed Category Sales Summary: {len(category_sales)}")
display(category_sales)

Renamed Category Sales Summary: 5


category,total_sales,average_sales,transaction_count,total_quantity
Books,112424.59,504.146143,223,1157
Clothing,112682.26,536.58219,210,992
Electronics,90232.28,479.958936,188,946
Food,85217.49,504.245503,169,847
Home,112101.05,533.814524,210,1093


In [None]:
# Plot the results
fig: go.Figure = px.bar(
    category_sales.reset_index(),
    x="category",
    y="total_sales",
    title="Total Sales by Category",
    text="transaction_count",
    labels={"total_sales": "Total Sales ($)", "category": "Product Category"},
)
fig.show()

### 3. Joining

In [None]:
# Join sales with product data
sales_with_product: pd.DataFrame = pd.merge(
    df_sales_pd,
    df_product_pd[["product_id", "product_name", "price"]],
    on="product_id",
    how="left",
)
print(f"Sales with Product Information: {len(sales_with_product)}")
display(sales_with_product.head())

Sales with Product Information: 1000


Unnamed: 0,date,customer_id,product_id,category,sales_amount,quantity,product_name,price
0,2023-01-01,52,34,Books,319.23,2,Product 34,301.18
1,2023-01-02,93,45,Food,646.25,6,Product 45,376.07
2,2023-01-03,15,6,Food,203.88,2,Product 6,211.48
3,2023-01-04,72,37,Books,897.99,3,Product 37,291.93
4,2023-01-05,61,33,Books,524.99,4,Product 33,341.48


In [None]:
# Join with customer information to get a complete view
complete_sales: pd.DataFrame = pd.merge(
    sales_with_product,
    df_customer_pd[["customer_id", "customer_name", "city", "state"]],
    on="customer_id",
    how="left",
)
print(f"Complete Sales Data with Customer Information: {len(complete_sales)}")
display(complete_sales.head())

Complete Sales Data with Customer Information: 1000


Unnamed: 0,date,customer_id,product_id,category,sales_amount,quantity,product_name,price,customer_name,city,state
0,2023-01-01,52,34,Books,319.23,2,Product 34,301.18,Customer 52,Houston,CA
1,2023-01-02,93,45,Food,646.25,6,Product 45,376.07,Customer 93,New York,CA
2,2023-01-03,15,6,Food,203.88,2,Product 6,211.48,Customer 15,Chicago,IL
3,2023-01-04,72,37,Books,897.99,3,Product 37,291.93,Customer 72,New York,CA
4,2023-01-05,61,33,Books,524.99,4,Product 33,341.48,Customer 61,Houston,AZ
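
A left join silently keeps unmatched rows and can multiply rows if the lookup table has duplicate keys. As a sketch on made-up data (the `sales` and `products` frames here are illustrative), `pd.merge` can guard against both: `validate` checks the key relationship, and `indicator` records where each row came from:

```python
import pandas as pd

sales = pd.DataFrame({"product_id": [1, 2, 2, 4], "quantity": [3, 1, 5, 2]})
products = pd.DataFrame({"product_id": [1, 2, 3], "price": [9.5, 20.0, 4.0]})

merged = pd.merge(
    sales,
    products,
    on="product_id",
    how="left",
    validate="many_to_one",  # raises MergeError if product_id repeats in products
    indicator=True,  # adds a _merge column: both / left_only / right_only
)

# Rows from sales that found no matching product.
unmatched = merged[merged["_merge"] == "left_only"]
print(len(merged), len(unmatched))
```

Checking that the row count is unchanged after a left join, as the printed lengths in this section do, is a quick sanity test for the same problem.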


In [None]:
# Calculate revenue (price * quantity) and compare with sales amount
complete_sales["calculated_revenue"] = complete_sales["price"] * complete_sales["quantity"]
complete_sales["price_difference"] = complete_sales["sales_amount"] - complete_sales["calculated_revenue"]
print(f"Complete Sales Data with Calculated Revenue and Price Difference: {len(complete_sales)}")
display(complete_sales[["sales_amount", "price", "quantity", "calculated_revenue", "price_difference"]].head())

Complete Sales Data with Calculated Revenue and Price Difference: 1000


Unnamed: 0,sales_amount,price,quantity,calculated_revenue,price_difference
0,319.23,301.18,2,602.36,-283.13
1,646.25,376.07,6,2256.42,-1610.17
2,203.88,211.48,2,422.96,-219.08
3,897.99,291.93,3,875.79,22.2
4,524.99,341.48,4,1365.92,-840.93


### 4. Window Functions

In [None]:
# Time-based window function
df_sales_pd["date"] = pd.to_datetime(df_sales_pd["date"])  # Ensure date type
daily_sales: pd.DataFrame = (
    df_sales_pd.groupby(df_sales_pd["date"].dt.date)["sales_amount"].sum().reset_index().sort_values("date")
)
print(f"Daily Sales Summary: {len(daily_sales)}")
display(daily_sales.head())

Daily Sales Summary: 1000


Unnamed: 0,date,sales_amount
0,2023-01-01,319.23
1,2023-01-02,646.25
2,2023-01-03,203.88
3,2023-01-04,897.99
4,2023-01-05,524.99


In [None]:
# Calculate rolling averages (7-day moving average)
daily_sales["7d_moving_avg"] = daily_sales["sales_amount"].rolling(window=7, min_periods=1).mean()
print(f"Daily Sales with 7-Day Moving Average: {len(daily_sales)}")
display(daily_sales.head())

Daily Sales with 7-Day Moving Average: 1000


Unnamed: 0,date,sales_amount,7d_moving_avg
0,2023-01-01,319.23,319.23
1,2023-01-02,646.25,482.74
2,2023-01-03,203.88,389.786667
3,2023-01-04,897.99,516.8375
4,2023-01-05,524.99,518.468
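
Note that `rolling(window=7)` counts rows, not calendar days. That is equivalent here because the sample data has exactly one row per day, but it differs when dates are missing. The sketch below, on a small made-up series with a gap, contrasts a row-based window with a time-based one (a 2-step window is used only to keep the example short):

```python
import pandas as pd

daily = pd.DataFrame(
    {
        "date": pd.to_datetime(["2023-01-01", "2023-01-02", "2023-01-05"]),
        "sales_amount": [100.0, 200.0, 600.0],
    }
).set_index("date")

# Row-based: averages the last 2 rows, whatever dates they carry.
daily["avg_2_rows"] = daily["sales_amount"].rolling(window=2, min_periods=1).mean()

# Time-based: averages rows whose date falls in the trailing 2-day window.
# This form requires a sorted DatetimeIndex.
daily["avg_2_days"] = daily["sales_amount"].rolling("2D").mean()

print(daily)
```

On 2023-01-05 the row-based average still includes 2023-01-02, while the time-based average only sees the single row inside the trailing two days.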


In [None]:
# Calculate lag and lead
daily_sales["previous_day_sales"] = daily_sales["sales_amount"].shift(1)
daily_sales["next_day_sales"] = daily_sales["sales_amount"].shift(-1)
print(f"Daily Sales with Lag and Lead: {len(daily_sales)}")
display(daily_sales.head())

Daily Sales with Lag and Lead: 1000


Unnamed: 0,date,sales_amount,7d_moving_avg,previous_day_sales,next_day_sales
0,2023-01-01,319.23,319.23,,646.25
1,2023-01-02,646.25,482.74,319.23,203.88
2,2023-01-03,203.88,389.786667,646.25,897.99
3,2023-01-04,897.99,516.8375,203.88,524.99
4,2023-01-05,524.99,518.468,897.99,453.13
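
Lag columns are usually a stepping stone to change calculations. As a minimal sketch using the first three daily totals shown above (the variable names are illustrative), the absolute change is the current value minus the lagged value, which is what `diff()` computes, while `pct_change()` gives the relative change:

```python
import pandas as pd

sales = pd.Series([319.23, 646.25, 203.88], name="sales_amount")

previous = sales.shift(1)  # lag: value from the prior row
change_manual = sales - previous  # absolute change versus the prior row
change_diff = sales.diff()  # the same calculation in one call
pct = sales.pct_change() * 100  # relative change, in percent

print(change_manual.round(2).tolist())
print(pct.round(2).tolist())
```

The first row has no predecessor, so all three results start with a missing value.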


In [None]:
# Calculate day-over-day change
daily_sales["day_over_day_change"] = daily_sales["sales_amount"] - daily_sales["previous_day_sales"]
daily_sales["pct_change"] = daily_sales["sales_amount"].pct_change() * 100
print(f"Daily Sales with Day-over-Day Change: {len(daily_sales)}")
display(daily_sales.head())

Daily Sales with Day-over-Day Change: 1000


Unnamed: 0,date,sales_amount,7d_moving_avg,previous_day_sales,next_day_sales,day_over_day_change,pct_change
0,2023-01-01,319.23,319.23,,646.25,,
1,2023-01-02,646.25,482.74,319.23,203.88,327.02,102.440247
2,2023-01-03,203.88,389.786667,646.25,897.99,-442.37,-68.451838
3,2023-01-04,897.99,516.8375,203.88,524.99,694.11,340.450265
4,2023-01-05,524.99,518.468,897.99,453.13,-373.0,-41.537211


In [None]:
# Plot time series with rolling average
fig = (
    go.Figure()
    .add_trace(
        go.Scatter(
            x=daily_sales["date"],
            y=daily_sales["sales_amount"],
            mode="lines",
            name="Daily Sales",
        )
    )
    .add_trace(
        go.Scatter(
            x=daily_sales["date"],
            y=daily_sales["7d_moving_avg"],
            mode="lines",
            name="7-Day Moving Average",
            line=dict(width=3),
        ),
    )
    .update_layout(
        title="Daily Sales with 7-Day Moving Average",
        xaxis_title="Date",
        yaxis_title="Sales Amount ($)",
    )
)
fig.show()

### 5. Ranking and Partitioning

In [None]:
# Rank customers by total spending
customer_spending: pd.DataFrame = df_sales_pd.groupby("customer_id")["sales_amount"].sum().reset_index()
customer_spending["rank"] = customer_spending["sales_amount"].rank(method="dense", ascending=False)
customer_spending = customer_spending.sort_values("rank")
print(f"Customer Spending Summary: {len(customer_spending)}")
display(customer_spending.head(10))

Customer Spending Summary: 99


Unnamed: 0,customer_id,sales_amount,rank
89,90,12814.41,1.0
91,92,10499.18,2.0
57,58,9985.15,3.0
32,33,9135.3,4.0
92,93,8387.14,5.0
62,63,8304.06,6.0
38,39,8121.66,7.0
98,99,8026.09,8.0
61,62,7813.42,9.0
16,17,7778.62,10.0


In [None]:
# Add customer details
top_customers: pd.DataFrame = pd.merge(
    customer_spending,
    df_customer_pd[["customer_id", "customer_name", "segment", "city"]],
    on="customer_id",
    how="left",
)
print(f"Top Customers Summary: {len(top_customers)}")
display(top_customers.head(10))

Top Customers Summary: 99


Unnamed: 0,customer_id,sales_amount,rank,customer_name,segment,city
0,90,12814.41,1.0,Customer 90,Consumer,Los Angeles
1,92,10499.18,2.0,Customer 92,Home Office,Houston
2,58,9985.15,3.0,Customer 58,Corporate,Los Angeles
3,33,9135.3,4.0,Customer 33,Consumer,New York
4,93,8387.14,5.0,Customer 93,Home Office,New York
5,63,8304.06,6.0,Customer 63,Home Office,Los Angeles
6,39,8121.66,7.0,Customer 39,Home Office,Chicago
7,99,8026.09,8.0,Customer 99,Consumer,Los Angeles
8,62,7813.42,9.0,Customer 62,Home Office,New York
9,17,7778.62,10.0,Customer 17,Corporate,Chicago


In [None]:
# Rank products by quantity sold
product_popularity: pd.DataFrame = df_sales_pd.groupby("product_id")["quantity"].sum().reset_index()
product_popularity["rank"] = product_popularity["quantity"].rank(method="dense", ascending=False)
product_popularity = product_popularity.sort_values("rank")
print(f"Product Popularity Summary: {len(product_popularity)}")
display(product_popularity.head(10))

Product Popularity Summary: 49


Unnamed: 0,product_id,quantity,rank
39,40,165,1.0
28,29,154,2.0
16,17,143,3.0
32,33,136,4.0
43,44,128,5.0
24,25,124,6.0
29,30,122,7.0
10,11,121,8.0
36,37,120,9.0
25,26,120,9.0
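
The tie at rank 9 above shows why the `method` argument matters. The sketch below compares three of the methods pandas supports on a small made-up series (the labels and the value 118 are illustrative):

```python
import pandas as pd

quantity = pd.Series([165, 120, 120, 118], index=["A", "B", "C", "D"])

ranks = pd.DataFrame(
    {
        "quantity": quantity,
        # dense: ties share a rank, and the next rank follows without a gap
        "dense": quantity.rank(method="dense", ascending=False),
        # min: ties share the lowest rank, and the next rank skips ahead
        "min": quantity.rank(method="min", ascending=False),
        # first: ties are broken by order of appearance
        "first": quantity.rank(method="first", ascending=False),
    }
)
print(ranks)
```

In SQL terms, `dense` behaves like `DENSE_RANK()`, `min` like `RANK()`, and `first` like `ROW_NUMBER()` with a stable ordering.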


In [None]:
# Add product details
top_products: pd.DataFrame = pd.merge(
    product_popularity,
    df_product_pd[["product_id", "product_name", "category"]],
    on="product_id",
    how="left",
)
print(f"Top Products Summary: {len(top_products)}")
display(top_products.head(10))

Top Products Summary: 49


Unnamed: 0,product_id,quantity,rank,product_name,category
0,40,165,1.0,Product 40,Books
1,29,154,2.0,Product 29,Home
2,17,143,3.0,Product 17,Books
3,33,136,4.0,Product 33,Home
4,44,128,5.0,Product 44,Books
5,25,124,6.0,Product 25,Electronics
6,30,122,7.0,Product 30,Books
7,11,121,8.0,Product 11,Clothing
8,37,120,9.0,Product 37,Clothing
9,26,120,9.0,Product 26,Clothing


## SQL

### Create

In [None]:
# Creates SQLite database and tables
conn: sqlite3.Connection = sqlite3.connect(":memory:")
df_sales_pd.to_sql("sales", conn, index=False, if_exists="replace")
df_product_pd.to_sql("product", conn, index=False, if_exists="replace")
df_customer_pd.to_sql("customer", conn, index=False, if_exists="replace")

100

In [None]:
print("Sales Table:")
display(pd.read_sql("SELECT * FROM sales LIMIT 5", conn))

Sales Table:


Unnamed: 0,date,customer_id,product_id,category,sales_amount,quantity
0,2023-01-01 00:00:00,52,34,Books,319.23,2
1,2023-01-02 00:00:00,93,45,Food,646.25,6
2,2023-01-03 00:00:00,15,6,Food,203.88,2
3,2023-01-04 00:00:00,72,37,Books,897.99,3
4,2023-01-05 00:00:00,61,33,Books,524.99,4


In [None]:
print("Product Table:")
display(pd.read_sql("SELECT * FROM product LIMIT 5", conn))

Product Table:


Unnamed: 0,product_id,product_name,price,category,supplier_id
0,1,Product 1,293.13,Food,6
1,2,Product 2,434.39,Food,3
2,3,Product 3,151.83,Electronics,7
3,4,Product 4,239.16,Clothing,5
4,5,Product 5,313.5,Clothing,4


In [None]:
print("Customer Table:")
display(pd.read_sql("SELECT * FROM customer LIMIT 5", conn))

Customer Table:


Unnamed: 0,customer_id,customer_name,city,state,segment
0,1,Customer 1,Chicago,AZ,Consumer
1,2,Customer 2,New York,IL,Home Office
2,3,Customer 3,Los Angeles,NY,Corporate
3,4,Customer 4,Phoenix,TX,Consumer
4,5,Customer 5,New York,IL,Home Office


### 1. Filtering and Selecting

In [None]:
# Filter sales for a specific category
electronics_sales_sql = """
    SELECT *
    FROM sales
    WHERE category = 'Electronics'
"""
electronics_sales: pd.DataFrame = pd.read_sql(electronics_sales_sql, conn)
print(f"Number of Electronics Sales: {len(electronics_sales)}")
display(pd.read_sql(electronics_sales_sql + "LIMIT 5", conn))

Number of Electronics Sales: 188


Unnamed: 0,date,customer_id,product_id,category,sales_amount,quantity
0,2023-01-09 00:00:00,75,6,Electronics,617.34,9
1,2023-01-17 00:00:00,88,9,Electronics,735.31,8
2,2023-02-01 00:00:00,42,6,Electronics,979.98,7
3,2023-02-10 00:00:00,51,45,Electronics,973.72,7
4,2023-02-12 00:00:00,64,5,Electronics,303.52,8


In [None]:
# Filter for high value transactions (over $500)
high_value_sales_sql = """
    SELECT *
    FROM sales
    WHERE sales_amount > 500
"""
high_value_sales: pd.DataFrame = pd.read_sql(high_value_sales_sql, conn)
print(f"Number of high-value Sales: {len(high_value_sales)}")
display(pd.read_sql(high_value_sales_sql + "LIMIT 5", conn))

Number of high-value Sales: 525


Unnamed: 0,date,customer_id,product_id,category,sales_amount,quantity
0,2023-01-02 00:00:00,93,45,Food,646.25,6
1,2023-01-04 00:00:00,72,37,Books,897.99,3
2,2023-01-05 00:00:00,61,33,Books,524.99,4
3,2023-01-07 00:00:00,83,21,Clothing,562.24,9
4,2023-01-09 00:00:00,75,6,Electronics,617.34,9
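
The queries above hard-code their filter values. When the values come from variables or user input, a safer pattern is to pass them as parameters rather than formatting them into the SQL string. A minimal sketch, using a small made-up `sales` table and SQLite's `?` placeholders:

```python
import sqlite3

import pandas as pd

conn = sqlite3.connect(":memory:")
pd.DataFrame(
    {
        "category": ["Food", "Books", "Food"],
        "sales_amount": [646.25, 319.23, 203.88],
    }
).to_sql("sales", conn, index=False)

# Placeholders are filled in by the database driver, not by string formatting.
query = "SELECT * FROM sales WHERE category = ? AND sales_amount > ?"
result = pd.read_sql(query, conn, params=("Food", 500))

print(result)
conn.close()
```

The placeholder style depends on the database driver; `?` is the style used by `sqlite3`.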


In [None]:
# Select specific columns
sales_summary_sql = """
    SELECT date, category, sales_amount
    FROM sales
"""
sales_summary: pd.DataFrame = pd.read_sql(sales_summary_sql, conn)
print(f"Selected columns in Sales: {len(sales_summary)}")
display(pd.read_sql(sales_summary_sql + "LIMIT 5", conn))

Selected columns in Sales: 1000


Unnamed: 0,date,category,sales_amount
0,2023-01-01 00:00:00,Books,319.23
1,2023-01-02 00:00:00,Food,646.25
2,2023-01-03 00:00:00,Food,203.88
3,2023-01-04 00:00:00,Books,897.99
4,2023-01-05 00:00:00,Books,524.99


### 2. Grouping and Aggregation

In [None]:
# Basic aggregation
sales_stats_sql = """
    SELECT
        SUM(sales_amount) AS sales_sum,
        AVG(sales_amount) AS sales_mean,
        MIN(sales_amount) AS sales_min,
        MAX(sales_amount) AS sales_max,
        COUNT(*) AS sales_count,
        SUM(quantity) AS quantity_sum,
        AVG(quantity) AS quantity_mean,
        MIN(quantity) AS quantity_min,
        MAX(quantity) AS quantity_max
    FROM sales
"""
print(f"Sales Statistics: {len(pd.read_sql(sales_stats_sql, conn))}")
display(pd.read_sql(sales_stats_sql, conn))

Sales Statistics: 1


Unnamed: 0,sales_sum,sales_mean,sales_min,sales_max,sales_count,quantity_sum,quantity_mean,quantity_min,quantity_max
0,512657.67,512.65767,10.19,999.72,1000,5035,5.035,1,9


In [None]:
# Group by category and aggregate
category_sales_sql = """
    SELECT
        category,
        SUM(sales_amount) AS total_sales,
        AVG(sales_amount) AS average_sales,
        COUNT(*) AS transaction_count,
        SUM(quantity) AS total_quantity
    FROM sales
    GROUP BY category
"""
print(f"Category Sales Summary: {len(pd.read_sql(category_sales_sql, conn))}")
display(pd.read_sql(category_sales_sql + "LIMIT 5", conn))

Category Sales Summary: 5


Unnamed: 0,category,total_sales,average_sales,transaction_count,total_quantity
0,Books,112424.59,504.146143,223,1157
1,Clothing,112682.26,536.58219,210,992
2,Electronics,90232.28,479.958936,188,946
3,Food,85217.49,504.245503,169,847
4,Home,112101.05,533.814524,210,1093


In [None]:
# Plot the results
fig: go.Figure = px.bar(
    pd.read_sql(category_sales_sql, conn),
    x="category",
    y="total_sales",
    title="Total Sales by Category",
    text="transaction_count",
    labels={"total_sales": "Total Sales ($)", "category": "Product Category"},
)
fig.show()

### 3. Joining

In [None]:
# Join sales with product data
sales_with_product_sql = """
    SELECT s.*, p.product_name, p.price
    FROM sales s
    LEFT JOIN product p ON s.product_id = p.product_id
"""
print(f"Sales with Product Data: {len(pd.read_sql(sales_with_product_sql, conn))}")
display(pd.read_sql(sales_with_product_sql + "LIMIT 5", conn))

Sales with Product Data: 1000


Unnamed: 0,date,customer_id,product_id,category,sales_amount,quantity,product_name,price
0,2023-01-01 00:00:00,52,34,Books,319.23,2,Product 34,301.18
1,2023-01-02 00:00:00,93,45,Food,646.25,6,Product 45,376.07
2,2023-01-03 00:00:00,15,6,Food,203.88,2,Product 6,211.48
3,2023-01-04 00:00:00,72,37,Books,897.99,3,Product 37,291.93
4,2023-01-05 00:00:00,61,33,Books,524.99,4,Product 33,341.48


In [None]:
# Join with customer information to get a complete view
complete_sales_sql = """
    SELECT
        s.*,
        p.product_name,
        p.price,
        c.customer_name,
        c.city,
        c.state
    FROM sales s
    LEFT JOIN product p ON s.product_id = p.product_id
    LEFT JOIN customer c ON s.customer_id = c.customer_id
"""
print(f"Complete Sales Data: {len(pd.read_sql(complete_sales_sql, conn))}")
display(pd.read_sql(complete_sales_sql + "LIMIT 5", conn))

Complete Sales Data: 1000


Unnamed: 0,date,customer_id,product_id,category,sales_amount,quantity,product_name,price,customer_name,city,state
0,2023-01-01 00:00:00,52,34,Books,319.23,2,Product 34,301.18,Customer 52,Houston,CA
1,2023-01-02 00:00:00,93,45,Food,646.25,6,Product 45,376.07,Customer 93,New York,CA
2,2023-01-03 00:00:00,15,6,Food,203.88,2,Product 6,211.48,Customer 15,Chicago,IL
3,2023-01-04 00:00:00,72,37,Books,897.99,3,Product 37,291.93,Customer 72,New York,CA
4,2023-01-05 00:00:00,61,33,Books,524.99,4,Product 33,341.48,Customer 61,Houston,AZ


In [None]:
# Calculate revenue and price difference
revenue_comparison_sql = """
    SELECT
        s.sales_amount,
        p.price,
        s.quantity,
        (p.price * s.quantity) AS calculated_revenue,
        (s.sales_amount - (p.price * s.quantity)) AS price_difference
    FROM sales s
    LEFT JOIN product p ON s.product_id = p.product_id
"""
print(f"Revenue Comparison: {len(pd.read_sql(revenue_comparison_sql, conn))}")
display(pd.read_sql(revenue_comparison_sql + "LIMIT 5", conn))

Revenue Comparison: 1000


Unnamed: 0,sales_amount,price,quantity,calculated_revenue,price_difference
0,319.23,301.18,2,602.36,-283.13
1,646.25,376.07,6,2256.42,-1610.17
2,203.88,211.48,2,422.96,-219.08
3,897.99,291.93,3,875.79,22.2
4,524.99,341.48,4,1365.92,-840.93


### 4. Window Functions

In [None]:
# Time-based window function
daily_sales_sql = """
    SELECT
        date,
        SUM(sales_amount) AS total_sales
    FROM sales
    GROUP BY date
    ORDER BY date
"""
print(f"Daily Sales Data: {len(pd.read_sql(daily_sales_sql, conn))}")
display(pd.read_sql(daily_sales_sql + "LIMIT 5", conn))

Daily Sales Data: 1000


Unnamed: 0,date,total_sales
0,2023-01-01 00:00:00,319.23
1,2023-01-02 00:00:00,646.25
2,2023-01-03 00:00:00,203.88
3,2023-01-04 00:00:00,897.99
4,2023-01-05 00:00:00,524.99


In [None]:
# Window functions for rolling average, lag, lead, and day-over-day change
window_sql = """
    SELECT
        date AS sale_date,
        SUM(sales_amount) AS sales_amount,
        AVG(SUM(sales_amount)) OVER (ORDER BY date ROWS BETWEEN 6 PRECEDING AND CURRENT ROW) AS rolling_7d_avg,
        LAG(SUM(sales_amount)) OVER (ORDER BY date) AS previous_day_sales,
        LEAD(SUM(sales_amount)) OVER (ORDER BY date) AS next_day_sales,
        SUM(sales_amount) - LAG(SUM(sales_amount)) OVER (ORDER BY date) AS day_over_day_change
    FROM sales
    GROUP BY date
    ORDER BY date
"""
window_df: pd.DataFrame = pd.read_sql(window_sql, conn)
print(f"Window Functions: {len(window_df)}")
display(pd.read_sql(window_sql + "LIMIT 5", conn))

Window Functions: 1000


Unnamed: 0,sale_date,sales_amount,rolling_7d_avg,previous_day_sales,next_day_sales,day_over_day_change
0,2023-01-01 00:00:00,319.23,319.23,,646.25,
1,2023-01-02 00:00:00,646.25,482.74,319.23,203.88,327.02
2,2023-01-03 00:00:00,203.88,389.786667,646.25,897.99,-442.37
3,2023-01-04 00:00:00,897.99,516.8375,203.88,524.99,694.11
4,2023-01-05 00:00:00,524.99,518.468,897.99,453.13,-373.0
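
When a window function is combined with `GROUP BY`, it operates on the grouped rows, so a moving average of daily totals is written as `AVG(SUM(...)) OVER (...)`. The sketch below uses a small made-up table in which one date has two sales, and a two-row window to keep it short. It assumes the bundled SQLite is version 3.25 or later, which is when window functions were added:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sales (date TEXT, sales_amount REAL)")
conn.executemany(
    "INSERT INTO sales VALUES (?, ?)",
    [("2023-01-01", 100.0), ("2023-01-01", 50.0), ("2023-01-02", 30.0)],
)

rows = conn.execute(
    """
    SELECT
        date,
        SUM(sales_amount) AS daily_total,
        AVG(SUM(sales_amount)) OVER (
            ORDER BY date ROWS BETWEEN 1 PRECEDING AND CURRENT ROW
        ) AS rolling_2d_avg
    FROM sales
    GROUP BY date
    ORDER BY date
    """
).fetchall()

print(rows)
conn.close()
```

The inner `SUM` collapses each date to one total, and the outer `AVG` then averages those totals across the window.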


In [None]:
# Plot time series with rolling average
fig = (
    go.Figure()
    .add_trace(
        go.Scatter(
            x=window_df["sale_date"],
            y=window_df["sales_amount"],
            mode="lines",
            name="Daily Sales",
        )
    )
    .add_trace(
        go.Scatter(
            x=window_df["sale_date"],
            y=window_df["rolling_7d_avg"],
            mode="lines",
            name="7-Day Moving Average",
            line=dict(width=3),
        )
    )
    .update_layout(
        title="Daily Sales with 7-Day Moving Average",
        xaxis_title="Date",
        yaxis_title="Sales Amount ($)",
    )
)
fig.show()

### 5. Ranking and Partitioning

In [None]:
# Rank customers by total spending
customer_spending_sql = """
    SELECT
        c.customer_id,
        c.customer_name,
        c.segment,
        c.city,
        SUM(s.sales_amount) AS total_spending,
        DENSE_RANK() OVER (ORDER BY SUM(s.sales_amount) DESC) AS rank
    FROM sales s
    JOIN customer c ON s.customer_id = c.customer_id
    GROUP BY c.customer_id, c.customer_name, c.segment, c.city
    ORDER BY rank
"""
print(f"Customer Spending: {len(pd.read_sql(customer_spending_sql, conn))}")
display(pd.read_sql(customer_spending_sql + "LIMIT 10", conn))

Customer Spending: 99


Unnamed: 0,customer_id,customer_name,segment,city,total_spending,rank
0,90,Customer 90,Consumer,Los Angeles,12814.41,1
1,92,Customer 92,Home Office,Houston,10499.18,2
2,58,Customer 58,Corporate,Los Angeles,9985.15,3
3,33,Customer 33,Consumer,New York,9135.3,4
4,93,Customer 93,Home Office,New York,8387.14,5
5,63,Customer 63,Home Office,Los Angeles,8304.06,6
6,39,Customer 39,Home Office,Chicago,8121.66,7
7,99,Customer 99,Consumer,Los Angeles,8026.09,8
8,62,Customer 62,Home Office,New York,7813.42,9
9,17,Customer 17,Corporate,Chicago,7778.62,10


In [None]:
# Rank products by quantity sold
product_popularity_sql = """
    SELECT
        p.product_id,
        p.product_name,
        p.category,
        SUM(s.quantity) AS total_quantity,
        DENSE_RANK() OVER (ORDER BY SUM(s.quantity) DESC) AS rank
    FROM sales s
    JOIN product p ON s.product_id = p.product_id
    GROUP BY p.product_id, p.product_name, p.category
    ORDER BY rank
"""
print(f"Product Popularity: {len(pd.read_sql(product_popularity_sql, conn))}")
display(pd.read_sql(product_popularity_sql + "LIMIT 10", conn))

Product Popularity: 49


Unnamed: 0,product_id,product_name,category,total_quantity,rank
0,40,Product 40,Books,165,1
1,29,Product 29,Home,154,2
2,17,Product 17,Books,143,3
3,33,Product 33,Home,136,4
4,44,Product 44,Books,128,5
5,25,Product 25,Electronics,124,6
6,30,Product 30,Books,122,7
7,11,Product 11,Clothing,121,8
8,26,Product 26,Clothing,120,9
9,37,Product 37,Clothing,120,9
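
SQL offers several ranking functions that differ only in how they treat ties, and the difference becomes visible on the row after a tie. The sketch below compares three of them on a small made-up `totals` table. The first three quantities mirror the output above, while product 12 and its value are hypothetical:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE totals (product_id INTEGER, total_quantity INTEGER)")
conn.executemany(
    "INSERT INTO totals VALUES (?, ?)",
    [(40, 165), (26, 120), (37, 120), (12, 118)],
)

rows = conn.execute(
    """
    SELECT
        product_id,
        RANK() OVER (ORDER BY total_quantity DESC) AS rank_gaps,
        DENSE_RANK() OVER (ORDER BY total_quantity DESC) AS rank_dense,
        ROW_NUMBER() OVER (ORDER BY total_quantity DESC, product_id) AS row_num
    FROM totals
    ORDER BY total_quantity DESC, product_id
    """
).fetchall()

for row in rows:
    print(row)
conn.close()
```

`RANK()` leaves a gap after the tie, `DENSE_RANK()` does not, and `ROW_NUMBER()` always assigns distinct values. `DENSE_RANK()` is the one that corresponds to `rank(method="dense")` in Pandas.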


## PySpark

### Create

In [None]:
spark: SparkSession = SparkSession.builder.appName("SalesAnalysis").getOrCreate()


In [None]:
df_sales_ps: psDataFrame = spark.createDataFrame(df_sales_pd)
df_product_ps: psDataFrame = spark.createDataFrame(df_product_pd)
df_customer_ps: psDataFrame = spark.createDataFrame(df_customer_pd)

In [None]:
print(f"Sales DataFrame: {df_sales_ps.count()}")
df_sales_ps.show(10)

Sales DataFrame: 1000
+----------+-----------+----------+-----------+------------+--------+
|      date|customer_id|product_id|   category|sales_amount|quantity|
+----------+-----------+----------+-----------+------------+--------+
|2023-01-01|         52|        34|      Books|      319.23|       2|
|2023-01-02|         93|        45|       Food|      646.25|       6|
|2023-01-03|         15|         6|       Food|      203.88|       2|
|2023-01-04|         72|        37|      Books|      897.99|       3|
|2023-01-05|         61|        33|      Books|      524.99|       4|
|2023-01-06|         21|        22|   Clothing|      453.13|       5|
|2023-01-07|         83|        21|   Clothing|      562.24|       9|
|2023-01-08|         87|         6|       Food|      174.37|       4|
|2023-01-09|         75|         6|Electronics|      617.34|       9|
|2023-01-10|         75|        48|      Books|      369.47|       3|
+----------+-----------+----------+-----------+------------+--------+
only showing top 10 rows

In [None]:
print(f"Product DataFrame: {df_product_ps.count()}")
df_product_ps.show(10)

Product DataFrame: 50
+----------+------------+------+-----------+-----------+
|product_id|product_name| price|   category|supplier_id|
+----------+------------+------+-----------+-----------+
|         1|   Product 1|293.13|       Food|          6|
|         2|   Product 2|434.39|       Food|          3|
|         3|   Product 3|151.83|Electronics|          7|
|         4|   Product 4|239.16|   Clothing|          5|
|         5|   Product 5| 313.5|   Clothing|          4|
|         6|   Product 6|211.48|Electronics|          5|
|         7|   Product 7|219.47|   Clothing|          8|
|         8|   Product 8|171.84|      Books|          3|
|         9|   Product 9|286.47|Electronics|          7|
|        10|  Product 10|426.78|      Books|          6|
+----------+------------+------+-----------+-----------+
only showing top 10 rows



In [None]:
print(f"Customer DataFrame: {df_customer_ps.count()}")
df_customer_ps.show(10)

Customer DataFrame: 100
+-----------+-------------+-----------+-----+-----------+
|customer_id|customer_name|       city|state|    segment|
+-----------+-------------+-----------+-----+-----------+
|          1|   Customer 1|    Chicago|   AZ|   Consumer|
|          2|   Customer 2|   New York|   IL|Home Office|
|          3|   Customer 3|Los Angeles|   NY|  Corporate|
|          4|   Customer 4|    Phoenix|   TX|   Consumer|
|          5|   Customer 5|   New York|   IL|Home Office|
|          6|   Customer 6|   New York|   TX|  Corporate|
|          7|   Customer 7|Los Angeles|   NY|Home Office|
|          8|   Customer 8|    Phoenix|   IL|   Consumer|
|          9|   Customer 9|    Houston|   TX|  Corporate|
|         10|  Customer 10|    Phoenix|   AZ|  Corporate|
+-----------+-------------+-----------+-----+-----------+
only showing top 10 rows



### 1. Filtering and Selecting

In [None]:
# Filter sales data for specific category
electronics_sales: psDataFrame = df_sales_ps.filter(df_sales_ps["category"] == "Electronics")
print(f"Number of Electronics Sales: {electronics_sales.count()}")
electronics_sales.show(10)

Number of Electronics Sales: 188
+-------------------+-----------+----------+-----------+------------+--------+
|               date|customer_id|product_id|   category|sales_amount|quantity|
+-------------------+-----------+----------+-----------+------------+--------+
|2023-01-09 00:00:00|         75|         6|Electronics|      617.34|       9|
|2023-01-17 00:00:00|         88|         9|Electronics|      735.31|       8|
|2023-02-01 00:00:00|         42|         6|Electronics|      979.98|       7|
|2023-02-10 00:00:00|         51|        45|Electronics|      973.72|       7|
|2023-02-12 00:00:00|         64|         5|Electronics|      303.52|       8|
|2023-02-24 00:00:00|          9|        26|Electronics|       251.9|       2|
|2023-02-26 00:00:00|         53|        38|Electronics|       96.31|       5|
|2023-03-17 00:00:00|         54|         1|Electronics|      175.46|       3|
|2023-03-27 00:00:00|         95|        30|Electronics|      765.52|       5|
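|2023-03-30 00:00:00|         72|         5|Electronics|      904.57|       3|
+-------------------+-----------+----------+-----------+------------+--------+
only showing top 10 rows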

In [None]:
# Filter for high value transactions (over $500)
high_value_sales: psDataFrame = df_sales_ps.filter("sales_amount > 500")
print(f"Number of high-value Sales: {high_value_sales.count()}")
high_value_sales.show(10)

Number of high-value Sales: 525
+-------------------+-----------+----------+-----------+------------+--------+
|               date|customer_id|product_id|   category|sales_amount|quantity|
+-------------------+-----------+----------+-----------+------------+--------+
|2023-01-02 00:00:00|         93|        45|       Food|      646.25|       6|
|2023-01-04 00:00:00|         72|        37|      Books|      897.99|       3|
|2023-01-05 00:00:00|         61|        33|      Books|      524.99|       4|
|2023-01-07 00:00:00|         83|        21|   Clothing|      562.24|       9|
|2023-01-09 00:00:00|         75|         6|Electronics|      617.34|       9|
|2023-01-11 00:00:00|         88|         4|   Clothing|      898.21|       8|
|2023-01-13 00:00:00|          3|        11|   Clothing|      511.98|       9|
|2023-01-14 00:00:00|         22|        30|      Books|      602.13|       9|
|2023-01-17 00:00:00|         88|         9|Electronics|      735.31|       8|
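|2023-01-18 00:00:00|         30|         3|       Home|      822.17|       7|
+-------------------+-----------+----------+-----------+------------+--------+
only showing top 10 rows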

In [None]:
# Select specific columns
sales_summary: psDataFrame = df_sales_ps.select("date", "category", "sales_amount")
print(f"Sales Summary DataFrame: {sales_summary.count()}")
sales_summary.show(10)

Sales Summary DataFrame: 1000
+-------------------+-----------+------------+
|               date|   category|sales_amount|
+-------------------+-----------+------------+
|2023-01-01 00:00:00|      Books|      319.23|
|2023-01-02 00:00:00|       Food|      646.25|
|2023-01-03 00:00:00|       Food|      203.88|
|2023-01-04 00:00:00|      Books|      897.99|
|2023-01-05 00:00:00|      Books|      524.99|
|2023-01-06 00:00:00|   Clothing|      453.13|
|2023-01-07 00:00:00|   Clothing|      562.24|
|2023-01-08 00:00:00|       Food|      174.37|
|2023-01-09 00:00:00|Electronics|      617.34|
|2023-01-10 00:00:00|      Books|      369.47|
+-------------------+-----------+------------+
only showing top 10 rows



### 2. Grouping and Aggregation

In [None]:
# Basic aggregation (mixes the functions API and SQL expressions to show both styles)
sales_stats: psDataFrame = df_sales_ps.agg(
    F.sum("sales_amount").alias("sales_sum"),
    F.avg("sales_amount").alias("sales_mean"),
    F.expr("MIN(sales_amount) AS sales_min"),
    F.expr("MAX(sales_amount) AS sales_max"),
    F.count("*").alias("sales_count"),
    F.expr("SUM(quantity) AS quantity_sum"),
    F.expr("AVG(quantity) AS quantity_mean"),
    F.min("quantity").alias("quantity_min"),
    F.max("quantity").alias("quantity_max"),
)
print(f"Sales Statistics: {sales_stats.count()}")
sales_stats.show()

Sales Statistics: 1
+-----------------+-----------------+---------+---------+-----------+------------+-------------+------------+------------+
|        sales_sum|       sales_mean|sales_min|sales_max|sales_count|quantity_sum|quantity_mean|quantity_min|quantity_max|
+-----------------+-----------------+---------+---------+-----------+------------+-------------+------------+------------+
|512657.6699999999|512.6576699999999|    10.19|   999.72|       1000|        5035|        5.035|           1|           9|
+-----------------+-----------------+---------+---------+-----------+------------+-------------+------------+------------+



In [None]:
# Group by category and aggregate
category_sales: psDataFrame = df_sales_ps.groupBy("category").agg(
    F.sum("sales_amount").alias("total_sales"),
    F.avg("sales_amount").alias("average_sales"),
    F.count("*").alias("transaction_count"),
    F.sum("quantity").alias("total_quantity"),
)
print(f"Category Sales Summary: {category_sales.count()}")
category_sales.show()

Category Sales Summary: 5
+-----------+------------------+------------------+-----------------+--------------+
|   category|       total_sales|     average_sales|transaction_count|total_quantity|
+-----------+------------------+------------------+-----------------+--------------+
|       Home|         112101.05| 533.8145238095238|              210|          1093|
|       Food| 85217.48999999999|504.24550295857983|              169|           847|
|Electronics|          90232.28| 479.9589361702128|              188|           946|
|   Clothing|112682.25999999998| 536.5821904761904|              210|           992|
|      Books|112424.59000000001| 504.1461434977579|              223|          1157|
+-----------+------------------+------------------+-----------------+--------------+



In [None]:
# Rename columns for clarity
category_sales = category_sales.withColumnsRenamed(
    {
        "total_sales": "Total Sales",
        "average_sales": "Average Sales",
        "transaction_count": "Transaction Count",
        "total_quantity": "Total Quantity",
    }
)
print(f"Renamed Category Sales Summary: {category_sales.count()}")
category_sales.show()

Renamed Category Sales Summary: 5
+-----------+------------------+------------------+-----------------+--------------+
|   category|       Total Sales|     Average Sales|Transaction Count|Total Quantity|
+-----------+------------------+------------------+-----------------+--------------+
|       Home|         112101.05| 533.8145238095238|              210|          1093|
|       Food| 85217.48999999999|504.24550295857983|              169|           847|
|Electronics|          90232.28| 479.9589361702128|              188|           946|
|   Clothing|112682.25999999998| 536.5821904761904|              210|           992|
|      Books|112424.59000000001| 504.1461434977579|              223|          1157|
+-----------+------------------+------------------+-----------------+--------------+



In [None]:
# Convert to pandas for plotting with plotly
category_sales_pd: pd.DataFrame = category_sales.toPandas()
fig: go.Figure = px.bar(
    category_sales_pd,
    x="category",
    y="Total Sales",
    title="Total Sales by Category",
    text="Transaction Count",
    labels={"Total Sales": "Total Sales ($)", "category": "Product Category"},
)
fig.show()

### 3. Joining

In [None]:
# Join sales with product data
sales_with_product: psDataFrame = df_sales_ps.join(
    other=df_product_ps.select("product_id", "product_name", "price"),
    on="product_id",
    how="left",
)
print(f"Sales with Product Information: {sales_with_product.count()}")
sales_with_product.show(10)

Sales with Product Information: 1000
+----------+-------------------+-----------+-----------+------------+--------+------------+------+
|product_id|               date|customer_id|   category|sales_amount|quantity|product_name| price|
+----------+-------------------+-----------+-----------+------------+--------+------------+------+
|        22|2023-01-06 00:00:00|         21|   Clothing|      453.13|       5|  Product 22|154.09|
|        34|2023-01-01 00:00:00|         52|      Books|      319.23|       2|  Product 34|301.18|
|         6|2023-01-03 00:00:00|         15|       Food|      203.88|       2|   Product 6|211.48|
|         6|2023-01-08 00:00:00|         87|       Food|      174.37|       4|   Product 6|211.48|
|         6|2023-01-09 00:00:00|         75|Electronics|      617.34|       9|   Product 6|211.48|
|        33|2023-01-05 00:00:00|         61|      Books|      524.99|       4|  Product 33|341.48|
|        48|2023-01-10 00:00:00|         75|      Books|      369.47|       3|  Product 48| 74.18|

In [None]:
# Join with customer information to get a complete view
complete_sales: psDataFrame = sales_with_product.alias("s").join(
    other=df_customer_ps.select("customer_id", "customer_name", "city", "state").alias("c"),
    on="customer_id",
    how="left",
)
print(f"Complete Sales Data with Customer Information: {complete_sales.count()}")
complete_sales.show(10)

Complete Sales Data with Customer Information: 1000
+-----------+----------+-------------------+-----------+------------+--------+------------+------+-------------+--------+-----+
|customer_id|product_id|               date|   category|sales_amount|quantity|product_name| price|customer_name|    city|state|
+-----------+----------+-------------------+-----------+------------+--------+------------+------+-------------+--------+-----+
|         21|        22|2023-01-06 00:00:00|   Clothing|      453.13|       5|  Product 22|154.09|  Customer 21| Phoenix|   NY|
|         52|        34|2023-01-01 00:00:00|      Books|      319.23|       2|  Product 34|301.18|  Customer 52| Houston|   CA|
|         15|         6|2023-01-03 00:00:00|       Food|      203.88|       2|   Product 6|211.48|  Customer 15| Chicago|   IL|
|         87|         6|2023-01-08 00:00:00|       Food|      174.37|       4|   Product 6|211.48|  Customer 87| Houston|   AZ|
|         75|         6|2023-01-09 00:00:00|Electron

In [None]:
# Calculate revenue (price * quantity) and compare with sales amount
complete_sales = complete_sales.withColumns(
    {
        "calculated_revenue": complete_sales["price"] * complete_sales["quantity"],
        "price_difference": F.expr("sales_amount - (price * quantity)"),
    },
)
print(f"Complete Sales Data with Calculated Revenue and Price Difference: {complete_sales.count()}")
complete_sales.select("sales_amount", "price", "quantity", "calculated_revenue", "price_difference").show(10)

                                                                                

Complete Sales Data with Calculated Revenue and Price Difference: 1000
+------------+------+--------+------------------+-------------------+
|sales_amount| price|quantity|calculated_revenue|   price_difference|
+------------+------+--------+------------------+-------------------+
|      453.13|154.09|       5|            770.45|-317.32000000000005|
|      319.23|301.18|       2|            602.36|            -283.13|
|      203.88|211.48|       2|            422.96|-219.07999999999998|
|      174.37|211.48|       4|            845.92|            -671.55|
|      617.34|211.48|       9|           1903.32|           -1285.98|
|      524.99|341.48|       4|           1365.92| -840.9300000000001|
|      369.47| 74.18|       3|222.54000000000002|             146.93|
|      897.99|291.93|       3|            875.79| 22.200000000000045|
|      898.21|239.16|       8|           1913.28|-1015.0699999999999|
|      562.24|496.06|       9|           4464.54|            -3902.3|
+------------+------+--------+------------------+-------------------+
only showing top 10 rows
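
The long tails in `price_difference` (for example `-317.32000000000005`) are binary floating-point artefacts, not data errors. The following is a minimal standard-library sketch of two common ways to handle them, using the first row of the output above. The variable names are illustrative and not part of the original notebook.

```python
from decimal import Decimal

# Values taken from the first row of the output above
sales_amount = 453.13
price = 154.09
quantity = 5

# Plain float arithmetic can leave a tiny representation error in the last digits
raw_difference = sales_amount - price * quantity

# Option 1: round the float for display or comparison
rounded_difference = round(raw_difference, 2)

# Option 2: use exact decimal arithmetic when cents must add up exactly
exact_difference = Decimal("453.13") - Decimal("154.09") * quantity

print(raw_difference)
print(rounded_difference)
print(exact_difference)
```

In PySpark, wrapping the expression in `F.round(..., 2)` would likely give the same effect as Option 1. Whether that is worth doing depends on whether the column is only displayed or is compared downstream.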

### 4. Window Functions

In [None]:
# Convert date column to date type if not already
df_sales_ps = df_sales_ps.withColumn("date", F.to_date(df_sales_ps["date"]))

In [None]:
# Time-based window function
daily_sales: psDataFrame = (
    df_sales_ps.groupBy("date")
    .agg(
        F.sum("sales_amount").alias("total_sales"),
    )
    .orderBy("date")
)
print(f"Daily Sales Summary: {daily_sales.count()}")
daily_sales.show(10)

Daily Sales Summary: 1000
+----------+-----------+
|      date|total_sales|
+----------+-----------+
|2023-01-01|     319.23|
|2023-01-02|     646.25|
|2023-01-03|     203.88|
|2023-01-04|     897.99|
|2023-01-05|     524.99|
|2023-01-06|     453.13|
|2023-01-07|     562.24|
|2023-01-08|     174.37|
|2023-01-09|     617.34|
|2023-01-10|     369.47|
+----------+-----------+
only showing top 10 rows



In [None]:
# Define a window specification for lead/lag functions
window_spec = Window.orderBy("date")

# Calculate lead and lag
daily_sales = daily_sales.withColumns(
    {
        "previous_day_sales": F.lag("total_sales").over(window_spec),
        "next_day_sales": F.expr("LEAD(total_sales) OVER (ORDER BY date)"),
    },
)
print(f"Daily Sales with Lead and Lag: {daily_sales.count()}")
daily_sales.show(10)

Daily Sales with Lead and Lag: 1000


25/06/09 08:32:57 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.

+----------+-----------+------------------+--------------+
|      date|total_sales|previous_day_sales|next_day_sales|
+----------+-----------+------------------+--------------+
|2023-01-01|     319.23|              NULL|        646.25|
|2023-01-02|     646.25|            319.23|        203.88|
|2023-01-03|     203.88|            646.25|        897.99|
|2023-01-04|     897.99|            203.88|        524.99|
|2023-01-05|     524.99|            897.99|        453.13|
|2023-01-06|     453.13|            524.99|        562.24|
|2023-01-07|     562.24|            453.13|        174.37|
|2023-01-08|     174.37|            562.24|        617.34|
|2023-01-09|     617.34|            174.37|        369.47|
|2023-01-10|     369.47|            617.34|        898.21|
+----------+-----------+------------------+--------------+
only showing top 10 rows
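
The `NULL` in the first row of `previous_day_sales` comes from how offset functions behave at the edges of a window: when there is no row at the requested offset, the result is empty. The plain-Python sketch below illustrates that behaviour on the first three daily totals. The helper names `lag` and `lead` are hypothetical stand-ins written for this illustration, not the PySpark functions themselves.

```python
from typing import Optional


def lag(values: list[float], offset: int = 1) -> list[Optional[float]]:
    """Value `offset` rows before each row; None where no such row exists."""
    return [values[i - offset] if i - offset >= 0 else None for i in range(len(values))]


def lead(values: list[float], offset: int = 1) -> list[Optional[float]]:
    """Value `offset` rows after each row; None where no such row exists."""
    return [values[i + offset] if i + offset < len(values) else None for i in range(len(values))]


# First three daily totals from the output above, already ordered by date
total_sales = [319.23, 646.25, 203.88]

previous_day_sales = lag(total_sales)
next_day_sales = lead(total_sales)

print(previous_day_sales)  # [None, 319.23, 646.25]
print(next_day_sales)      # [646.25, 203.88, None]
```

The last `None` in `next_day_sales` appears only because this sample stops after three rows; in the full dataset it would sit on the final date.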



In [None]:
# Calculate day-over-day change
daily_sales = daily_sales.withColumns(
    {
        "day_over_day_change": F.expr("total_sales - previous_day_sales"),
        "pct_change": F.expr("total_sales / previous_day_sales - 1") * 100,
    }
)
print(f"Daily Sales with Day-over-Day Change: {daily_sales.count()}")
daily_sales.show(10)

Daily Sales with Day-over-Day Change: 1000


25/06/09 08:33:04 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.

+----------+-----------+------------------+--------------+-------------------+-------------------+
|      date|total_sales|previous_day_sales|next_day_sales|day_over_day_change|         pct_change|
+----------+-----------+------------------+--------------+-------------------+-------------------+
|2023-01-01|     319.23|              NULL|        646.25|               NULL|               NULL|
|2023-01-02|     646.25|            319.23|        203.88|             327.02| 102.44024684396828|
|2023-01-03|     203.88|            646.25|        897.99|            -442.37| -68.45183752417795|
|2023-01-04|     897.99|            203.88|        524.99|             694.11|  340.4502648616834|
|2023-01-05|     524.99|            897.99|        453.13|             -373.0| -41.53721088208109|
|2023-01-06|     453.13|            524.99|        562.24| -71.86000000000001|-13.687879769138467|
|2023-01-07|     562.24|            453.13|        174.37| 109.11000000000001| 24.079182574537118|
|2023-01-0

In [None]:
# Calculate 7-day moving average (same frame expressed via the Window API and via a SQL expression)
daily_sales = daily_sales.withColumns(
    {
        "7d_moving_avg": F.avg("total_sales").over(Window.orderBy("date").rowsBetween(-6, 0)),
        "7d_rolling_avg": F.expr("AVG(total_sales) OVER (ORDER BY date ROWS BETWEEN 6 PRECEDING AND CURRENT ROW)"),
    }
)
print(f"Daily Sales with 7-Day Moving Average: {daily_sales.count()}")
daily_sales.show(10)

Daily Sales with 7-Day Moving Average: 1000


25/06/09 08:38:08 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+----------+-----------+------------------+--------------+-------------------+-------------------+------------------+------------------+
|      date|total_sales|previous_day_sales|next_day_sales|day_over_day_change|         pct_change|     7d_moving_avg|    7d_rolling_avg|
+----------+-----------+------------------+--------------+-------------------+-------------------+------------------+------------------+
|2023-01-01|     319.23|              NULL|        646.25|               NULL|               NULL|            319.23|            319.23|
|2023-01-02|     646.25|            319.23|        203.88|             327.02| 102.44024684396828|            482.74|            482.74|
|2023-01-03|     203.88|            646.25|        897.99|            -442.37| -68.45183752417795| 389.7866666666667| 389.7866666666667|
|2023-01-04|     897.99|            203.88|        524.99|             694.11|  340.4502648616834| 516.8375000000001| 516.8375000000001|
|2023-01-05|     524.99|            897.9
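
The first rows of `7d_moving_avg` are averages over fewer than seven values, because the frame `rowsBetween(-6, 0)` is clipped at the start of the data. A minimal plain-Python sketch of that frame logic, assuming the values are already sorted by date, follows. `trailing_average` is a hypothetical helper written for this illustration.

```python
def trailing_average(values: list[float], preceding: int = 6) -> list[float]:
    """Average over the current row and up to `preceding` rows before it."""
    averages = []
    for i in range(len(values)):
        start = max(0, i - preceding)  # the frame is clipped at the first row
        frame = values[start : i + 1]
        averages.append(sum(frame) / len(frame))
    return averages


# First four daily totals from the output above
total_sales = [319.23, 646.25, 203.88, 897.99]
moving_avg = trailing_average(total_sales)

print([round(value, 4) for value in moving_avg])
```

Up to floating-point noise, these values match the first four rows of the `7d_moving_avg` column. The frame counts rows, not calendar days, so it is a true 7-day average here only because `daily_sales` holds exactly one row per date.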



In [None]:
# Convert to pandas once, then plot the time series with the rolling average
daily_sales_pd: pd.DataFrame = daily_sales.toPandas()
fig = (
    go.Figure()
    .add_trace(
        go.Scatter(
            x=daily_sales_pd["date"],
            y=daily_sales_pd["total_sales"],
            mode="lines",
            name="Daily Sales",
        )
    )
    .add_trace(
        go.Scatter(
            x=daily_sales_pd["date"],
            y=daily_sales_pd["7d_moving_avg"],
            mode="lines",
            name="7-Day Moving Average",
            line=dict(width=3),
        ),
    )
    .update_layout(
        title="Daily Sales with 7-Day Moving Average",
        xaxis_title="Date",
        yaxis_title="Sales Amount ($)",
    )
)
fig.show()

### 5. Ranking and Partitioning

In [None]:
# Rank customers by total spending
customer_spending: psDataFrame = (
    df_sales_ps.groupBy("customer_id")
    .agg(F.sum("sales_amount").alias("total_spending"))
    .withColumn("rank", F.dense_rank().over(Window.orderBy(F.desc("total_spending"))))
    .orderBy("rank")
)
print(f"Customer Spending Summary: {customer_spending.count()}")
customer_spending.show(10)

Customer Spending Summary: 99


25/06/09 08:41:05 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.

+-----------+------------------+----+
|customer_id|    total_spending|rank|
+-----------+------------------+----+
|         90|12814.410000000002|   1|
|         92|          10499.18|   2|
|         58|           9985.15|   3|
|         33| 9135.300000000001|   4|
|         93|           8387.14|   5|
|         63| 8304.060000000001|   6|
|         39|           8121.66|   7|
|         99|           8026.09|   8|
|         62| 7813.420000000001|   9|
|         17|           7778.62|  10|
+-----------+------------------+----+
only showing top 10 rows



In [None]:
# Rank products by quantity sold
product_popularity: psDataFrame = (
    df_sales_ps.groupBy("product_id")
    .agg(F.sum("quantity").alias("total_quantity"))
    .withColumn("rank", F.expr("DENSE_RANK() OVER (ORDER BY total_quantity DESC)"))
    .orderBy("rank")
)
print(f"Product Popularity Summary: {product_popularity.count()}")
product_popularity.show(10)

Product Popularity Summary: 49


25/06/09 08:42:19 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.

+----------+--------------+----+
|product_id|total_quantity|rank|
+----------+--------------+----+
|        40|           165|   1|
|        29|           154|   2|
|        17|           143|   3|
|        33|           136|   4|
|        44|           128|   5|
|        25|           124|   6|
|        30|           122|   7|
|        11|           121|   8|
|        26|           120|   9|
|        37|           120|   9|
+----------+--------------+----+
only showing top 10 rows
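
In the output above, products 26 and 37 share rank 9 because `DENSE_RANK` gives tied values the same rank and does not leave a gap afterwards. The plain-Python sketch below contrasts that with a standard rank, which does skip ranks after a tie. `rank_descending` and the sample quantities are hypothetical and only meant to illustrate the difference.

```python
def rank_descending(values: list[int], dense: bool) -> list[int]:
    """Rank values from largest to smallest; tied values share a rank.

    dense=True  -> the next distinct value gets the next integer (no gaps)
    dense=False -> the next distinct value skips as many ranks as there were ties
    """
    if dense:
        distinct = sorted(set(values), reverse=True)
        position = {value: index + 1 for index, value in enumerate(distinct)}
        return [position[value] for value in values]
    # Standard rank: 1 + the number of strictly larger values
    return [1 + sum(other > value for other in values) for value in values]


# Hypothetical quantities with one tie, not taken from the dataset
total_quantity = [165, 120, 120, 118]

print(rank_descending(total_quantity, dense=True))   # [1, 2, 2, 3]
print(rank_descending(total_quantity, dense=False))  # [1, 2, 2, 4]
```

PySpark also provides `F.rank()` and `F.row_number()`. Which one fits depends on whether gaps after ties are acceptable, or whether every row needs a unique position.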



## Polars

### Create

In [None]:
df_sales_pl: pl.DataFrame = pl.DataFrame(sales_data)
df_product_pl: pl.DataFrame = pl.DataFrame(product_data)
df_customer_pl: pl.DataFrame = pl.DataFrame(customer_data)

In [None]:
print(f"Sales DataFrame: {df_sales_pl.shape[0]}")
display(df_sales_pl.head(10))

Sales DataFrame: 1000


date,customer_id,product_id,category,sales_amount,quantity
datetime[ns],i64,i64,str,f64,i64
2023-01-01 00:00:00,52,34,"""Books""",319.23,2
2023-01-02 00:00:00,93,45,"""Food""",646.25,6
2023-01-03 00:00:00,15,6,"""Food""",203.88,2
2023-01-04 00:00:00,72,37,"""Books""",897.99,3
2023-01-05 00:00:00,61,33,"""Books""",524.99,4
2023-01-06 00:00:00,21,22,"""Clothing""",453.13,5
2023-01-07 00:00:00,83,21,"""Clothing""",562.24,9
2023-01-08 00:00:00,87,6,"""Food""",174.37,4
2023-01-09 00:00:00,75,6,"""Electronics""",617.34,9
2023-01-10 00:00:00,75,48,"""Books""",369.47,3


In [None]:
print(f"Product DataFrame: {df_product_pl.shape[0]}")
display(df_product_pl.head(10))

Product DataFrame: 50


product_id,product_name,price,category,supplier_id
i64,str,f64,str,i64
1,"""Product 1""",293.13,"""Food""",6
2,"""Product 2""",434.39,"""Food""",3
3,"""Product 3""",151.83,"""Electronics""",7
4,"""Product 4""",239.16,"""Clothing""",5
5,"""Product 5""",313.5,"""Clothing""",4
6,"""Product 6""",211.48,"""Electronics""",5
7,"""Product 7""",219.47,"""Clothing""",8
8,"""Product 8""",171.84,"""Books""",3
9,"""Product 9""",286.47,"""Electronics""",7
10,"""Product 10""",426.78,"""Books""",6


In [None]:
print(f"Customer DataFrame: {df_customer_pl.shape[0]}")
display(df_customer_pl.head(10))

Customer DataFrame: 100


customer_id,customer_name,city,state,segment
i64,str,str,str,str
1,"""Customer 1""","""Chicago""","""AZ""","""Consumer"""
2,"""Customer 2""","""New York""","""IL""","""Home Office"""
3,"""Customer 3""","""Los Angeles""","""NY""","""Corporate"""
4,"""Customer 4""","""Phoenix""","""TX""","""Consumer"""
5,"""Customer 5""","""New York""","""IL""","""Home Office"""
6,"""Customer 6""","""New York""","""TX""","""Corporate"""
7,"""Customer 7""","""Los Angeles""","""NY""","""Home Office"""
8,"""Customer 8""","""Phoenix""","""IL""","""Consumer"""
9,"""Customer 9""","""Houston""","""TX""","""Corporate"""
10,"""Customer 10""","""Phoenix""","""AZ""","""Corporate"""


### 1. Filtering and Selecting

In [None]:
# Filter sales data for specific category
electronics_sales = df_sales_pl.filter(df_sales_pl["category"] == "Electronics")
print(f"Number of Electronics Sales: {len(electronics_sales)}")
display(electronics_sales.head(10))

Number of Electronics Sales: 188


date,customer_id,product_id,category,sales_amount,quantity
datetime[ns],i64,i64,str,f64,i64
2023-01-09 00:00:00,75,6,"""Electronics""",617.34,9
2023-01-17 00:00:00,88,9,"""Electronics""",735.31,8
2023-02-01 00:00:00,42,6,"""Electronics""",979.98,7
2023-02-10 00:00:00,51,45,"""Electronics""",973.72,7
2023-02-12 00:00:00,64,5,"""Electronics""",303.52,8
2023-02-24 00:00:00,9,26,"""Electronics""",251.9,2
2023-02-26 00:00:00,53,38,"""Electronics""",96.31,5
2023-03-17 00:00:00,54,1,"""Electronics""",175.46,3
2023-03-27 00:00:00,95,30,"""Electronics""",765.52,5
2023-03-30 00:00:00,72,5,"""Electronics""",904.57,3


In [None]:
# Filter for high value transactions (over $500)
high_value_sales = df_sales_pl.filter(df_sales_pl["sales_amount"] > 500)
print(f"Number of high-value Sales: {len(high_value_sales)}")
display(high_value_sales.head(10))

Number of high-value Sales: 525


date,customer_id,product_id,category,sales_amount,quantity
datetime[ns],i64,i64,str,f64,i64
2023-01-02 00:00:00,93,45,"""Food""",646.25,6
2023-01-04 00:00:00,72,37,"""Books""",897.99,3
2023-01-05 00:00:00,61,33,"""Books""",524.99,4
2023-01-07 00:00:00,83,21,"""Clothing""",562.24,9
2023-01-09 00:00:00,75,6,"""Electronics""",617.34,9
2023-01-11 00:00:00,88,4,"""Clothing""",898.21,8
2023-01-13 00:00:00,3,11,"""Clothing""",511.98,9
2023-01-14 00:00:00,22,30,"""Books""",602.13,9
2023-01-17 00:00:00,88,9,"""Electronics""",735.31,8
2023-01-18 00:00:00,30,3,"""Home""",822.17,7


In [None]:
# Select specific columns
sales_summary = df_sales_pl.select(["date", "category", "sales_amount"])
print(f"Sales Summary DataFrame: {len(sales_summary)}")
display(sales_summary.head(10))

Sales Summary DataFrame: 1000


date,category,sales_amount
datetime[ns],str,f64
2023-01-01 00:00:00,"""Books""",319.23
2023-01-02 00:00:00,"""Food""",646.25
2023-01-03 00:00:00,"""Food""",203.88
2023-01-04 00:00:00,"""Books""",897.99
2023-01-05 00:00:00,"""Books""",524.99
2023-01-06 00:00:00,"""Clothing""",453.13
2023-01-07 00:00:00,"""Clothing""",562.24
2023-01-08 00:00:00,"""Food""",174.37
2023-01-09 00:00:00,"""Electronics""",617.34
2023-01-10 00:00:00,"""Books""",369.47


### 2. Grouping and Aggregation

In [None]:
# Basic aggregation
sales_stats = df_sales_pl.select(
    pl.col("sales_amount").sum().alias("sales_sum"),
    pl.col("sales_amount").mean().alias("sales_mean"),
    pl.col("sales_amount").min().alias("sales_min"),
    pl.col("sales_amount").max().alias("sales_max"),
    pl.col("quantity").sum().alias("quantity_sum"),
    pl.col("quantity").mean().alias("quantity_mean"),
    pl.col("quantity").min().alias("quantity_min"),
    pl.col("quantity").max().alias("quantity_max"),
)
print(f"Sales Statistics: {len(sales_stats)}")
display(sales_stats)

Sales Statistics: 1


sales_sum,sales_mean,sales_min,sales_max,quantity_sum,quantity_mean,quantity_min,quantity_max
f64,f64,f64,f64,i64,f64,i64,i64
512657.67,512.65767,10.19,999.72,5035,5.035,1,9


In [None]:
# Group by category and aggregate
category_sales = df_sales_pl.group_by("category").agg(
    pl.col("sales_amount").sum().alias("total_sales"),
    pl.col("sales_amount").mean().alias("average_sales"),
    pl.col("sales_amount").count().alias("transaction_count"),
    pl.col("quantity").sum().alias("total_quantity"),
)
print(f"Category Sales Summary: {len(category_sales)}")
display(category_sales.head(10))

Category Sales Summary: 5


category,total_sales,average_sales,transaction_count,total_quantity
str,f64,f64,u32,i64
"""Clothing""",112682.26,536.58219,210,992
"""Home""",112101.05,533.814524,210,1093
"""Books""",112424.59,504.146143,223,1157
"""Food""",85217.49,504.245503,169,847
"""Electronics""",90232.28,479.958936,188,946


In [None]:
# Rename columns for clarity
category_sales = category_sales.rename(
    {
        "total_sales": "Total Sales",
        "average_sales": "Average Sales",
        "transaction_count": "Transaction Count",
        "total_quantity": "Total Quantity",
    }
)
print(f"Renamed Category Sales Summary: {len(category_sales)}")
display(category_sales.head(10))

Renamed Category Sales Summary: 5


category,Total Sales,Average Sales,Transaction Count,Total Quantity
str,f64,f64,u32,i64
"""Clothing""",112682.26,536.58219,210,992
"""Home""",112101.05,533.814524,210,1093
"""Books""",112424.59,504.146143,223,1157
"""Food""",85217.49,504.245503,169,847
"""Electronics""",90232.28,479.958936,188,946


In [None]:
# Plot the results
fig: go.Figure = px.bar(
    category_sales,
    x="category",
    y="Total Sales",
    title="Total Sales by Category",
    text="Transaction Count",
    labels={"Total Sales": "Total Sales ($)", "category": "Product Category"},
)
fig.show()

### 3. Joining

In [None]:
# Join sales with product data
sales_with_product = df_sales_pl.join(
    df_product_pl.select(["product_id", "product_name", "price"]),
    on="product_id",
    how="left",
)
print(f"Sales with Product Information: {len(sales_with_product)}")
display(sales_with_product.head(10))

Sales with Product Information: 1000


date,customer_id,product_id,category,sales_amount,quantity,product_name,price
datetime[ns],i64,i64,str,f64,i64,str,f64
2023-01-01 00:00:00,52,34,"""Books""",319.23,2,"""Product 34""",301.18
2023-01-02 00:00:00,93,45,"""Food""",646.25,6,"""Product 45""",376.07
2023-01-03 00:00:00,15,6,"""Food""",203.88,2,"""Product 6""",211.48
2023-01-04 00:00:00,72,37,"""Books""",897.99,3,"""Product 37""",291.93
2023-01-05 00:00:00,61,33,"""Books""",524.99,4,"""Product 33""",341.48
2023-01-06 00:00:00,21,22,"""Clothing""",453.13,5,"""Product 22""",154.09
2023-01-07 00:00:00,83,21,"""Clothing""",562.24,9,"""Product 21""",496.06
2023-01-08 00:00:00,87,6,"""Food""",174.37,4,"""Product 6""",211.48
2023-01-09 00:00:00,75,6,"""Electronics""",617.34,9,"""Product 6""",211.48
2023-01-10 00:00:00,75,48,"""Books""",369.47,3,"""Product 48""",74.18


In [None]:
# Join with customer information to get a complete view
complete_sales = sales_with_product.join(
    df_customer_pl.select(["customer_id", "customer_name", "city", "state"]),
    on="customer_id",
    how="left",
)
print(f"Complete Sales Data with Customer Information: {len(complete_sales)}")
display(complete_sales.head(10))

Complete Sales Data with Customer Information: 1000


date,customer_id,product_id,category,sales_amount,quantity,product_name,price,customer_name,city,state
datetime[ns],i64,i64,str,f64,i64,str,f64,str,str,str
2023-01-01 00:00:00,52,34,"""Books""",319.23,2,"""Product 34""",301.18,"""Customer 52""","""Houston""","""CA"""
2023-01-02 00:00:00,93,45,"""Food""",646.25,6,"""Product 45""",376.07,"""Customer 93""","""New York""","""CA"""
2023-01-03 00:00:00,15,6,"""Food""",203.88,2,"""Product 6""",211.48,"""Customer 15""","""Chicago""","""IL"""
2023-01-04 00:00:00,72,37,"""Books""",897.99,3,"""Product 37""",291.93,"""Customer 72""","""New York""","""CA"""
2023-01-05 00:00:00,61,33,"""Books""",524.99,4,"""Product 33""",341.48,"""Customer 61""","""Houston""","""AZ"""
2023-01-06 00:00:00,21,22,"""Clothing""",453.13,5,"""Product 22""",154.09,"""Customer 21""","""Phoenix""","""NY"""
2023-01-07 00:00:00,83,21,"""Clothing""",562.24,9,"""Product 21""",496.06,"""Customer 83""","""New York""","""CA"""
2023-01-08 00:00:00,87,6,"""Food""",174.37,4,"""Product 6""",211.48,"""Customer 87""","""Houston""","""AZ"""
2023-01-09 00:00:00,75,6,"""Electronics""",617.34,9,"""Product 6""",211.48,"""Customer 75""","""Houston""","""TX"""
2023-01-10 00:00:00,75,48,"""Books""",369.47,3,"""Product 48""",74.18,"""Customer 75""","""Houston""","""TX"""


In [None]:
# Calculate revenue (price * quantity) and compare with sales amount
complete_sales = complete_sales.with_columns(
    (pl.col("price") * pl.col("quantity")).alias("calculated_revenue"),
    (pl.col("sales_amount") - (pl.col("price") * pl.col("quantity"))).alias("price_difference"),
)
print(f"Complete Sales Data with Calculated Revenue and Price Difference: {len(complete_sales)}")
display(complete_sales.select(["sales_amount", "price", "quantity", "calculated_revenue", "price_difference"]).head(10))

Complete Sales Data with Calculated Revenue and Price Difference: 1000


sales_amount,price,quantity,calculated_revenue,price_difference
f64,f64,i64,f64,f64
319.23,301.18,2,602.36,-283.13
646.25,376.07,6,2256.42,-1610.17
203.88,211.48,2,422.96,-219.08
897.99,291.93,3,875.79,22.2
524.99,341.48,4,1365.92,-840.93
453.13,154.09,5,770.45,-317.32
562.24,496.06,9,4464.54,-3902.3
174.37,211.48,4,845.92,-671.55
617.34,211.48,9,1903.32,-1285.98
369.47,74.18,3,222.54,146.93


### 4. Window Functions

In [None]:
# Convert date column to date type if not already
df_sales_pl = df_sales_pl.with_columns(pl.col("date").cast(pl.Date))

In [None]:
# Aggregate total sales per day
daily_sales = (
    df_sales_pl.group_by("date")
    .agg(
        pl.col("sales_amount").sum().alias("total_sales"),
    )
    .sort("date")
)
print(f"Daily Sales Summary: {len(daily_sales)}")
display(daily_sales.head(10))

Daily Sales Summary: 1000


date,total_sales
date,f64
2023-01-01,319.23
2023-01-02,646.25
2023-01-03,203.88
2023-01-04,897.99
2023-01-05,524.99
2023-01-06,453.13
2023-01-07,562.24
2023-01-08,174.37
2023-01-09,617.34
2023-01-10,369.47


In [None]:
# Calculate lead and lag
daily_sales = daily_sales.with_columns(
    pl.col("total_sales").shift(1).alias("previous_day_sales"),
    pl.col("total_sales").shift(-1).alias("next_day_sales"),
)
print(f"Daily Sales with Lead and Lag: {len(daily_sales)}")
display(daily_sales.head(10))

Daily Sales with Lead and Lag: 1000


date,total_sales,previous_day_sales,next_day_sales
date,f64,f64,f64
2023-01-01,319.23,,646.25
2023-01-02,646.25,319.23,203.88
2023-01-03,203.88,646.25,897.99
2023-01-04,897.99,203.88,524.99
2023-01-05,524.99,897.99,453.13
2023-01-06,453.13,524.99,562.24
2023-01-07,562.24,453.13,174.37
2023-01-08,174.37,562.24,617.34
2023-01-09,617.34,174.37,369.47
2023-01-10,369.47,617.34,898.21


In [None]:
# Calculate day-over-day change
daily_sales = daily_sales.with_columns(
    (pl.col("total_sales") - pl.col("previous_day_sales")).alias("day_over_day_change"),
    ((pl.col("total_sales") / pl.col("previous_day_sales") - 1) * 100).alias("pct_change"),
)
print(f"Daily Sales with Day-over-Day Change: {len(daily_sales)}")
display(daily_sales.head(10))

Daily Sales with Day-over-Day Change: 1000


date,total_sales,previous_day_sales,next_day_sales,day_over_day_change,pct_change
date,f64,f64,f64,f64,f64
2023-01-01,319.23,,646.25,,
2023-01-02,646.25,319.23,203.88,327.02,102.440247
2023-01-03,203.88,646.25,897.99,-442.37,-68.451838
2023-01-04,897.99,203.88,524.99,694.11,340.450265
2023-01-05,524.99,897.99,453.13,-373.0,-41.537211
2023-01-06,453.13,524.99,562.24,-71.86,-13.68788
2023-01-07,562.24,453.13,174.37,109.11,24.079183
2023-01-08,174.37,562.24,617.34,-387.87,-68.986554
2023-01-09,617.34,174.37,369.47,442.97,254.040259
2023-01-10,369.47,617.34,898.21,-247.87,-40.151294


In [None]:
# Calculate 7-day moving average
# Note: `min_samples` replaces `min_periods`, which was deprecated in Polars 1.21.0
daily_sales = daily_sales.with_columns(
    pl.col("total_sales").rolling_mean(window_size=7, min_samples=1).alias("7d_moving_avg"),
)
print(f"Daily Sales with 7-Day Moving Average: {len(daily_sales)}")
display(daily_sales.head(10))

Daily Sales with 7-Day Moving Average: 1000


date,total_sales,previous_day_sales,next_day_sales,day_over_day_change,pct_change,7d_moving_avg
date,f64,f64,f64,f64,f64,f64
2023-01-01,319.23,,646.25,,,319.23
2023-01-02,646.25,319.23,203.88,327.02,102.440247,482.74
2023-01-03,203.88,646.25,897.99,-442.37,-68.451838,389.786667
2023-01-04,897.99,203.88,524.99,694.11,340.450265,516.8375
2023-01-05,524.99,897.99,453.13,-373.0,-41.537211,518.468
2023-01-06,453.13,524.99,562.24,-71.86,-13.68788,507.578333
2023-01-07,562.24,453.13,174.37,109.11,24.079183,515.387143
2023-01-08,174.37,562.24,617.34,-387.87,-68.986554,494.692857
2023-01-09,617.34,174.37,369.47,442.97,254.040259,490.562857
2023-01-10,369.47,617.34,898.21,-247.87,-40.151294,514.218571


In [None]:
# Plot time series with rolling average
fig = (
    go.Figure()
    .add_trace(
        go.Scatter(
            x=daily_sales["date"].to_list(),
            y=daily_sales["total_sales"].to_list(),
            mode="lines",
            name="Daily Sales",
        )
    )
    .add_trace(
        go.Scatter(
            x=daily_sales["date"].to_list(),
            y=daily_sales["7d_moving_avg"].to_list(),
            mode="lines",
            name="7-Day Moving Average",
            line=dict(width=3),
        )
    )
    .update_layout(
        title="Daily Sales with 7-Day Moving Average",
        xaxis_title="Date",
        yaxis_title="Sales Amount ($)",
    )
)
fig.show()

### 5. Ranking and Partitioning

In [None]:
# Rank customers by total spending
customer_spending = (
    df_sales_pl.group_by("customer_id")
    .agg(pl.col("sales_amount").sum().alias("total_spending"))
    .with_columns(pl.col("total_spending").rank(method="dense", descending=True).alias("rank"))
    .sort("rank")
)
print(f"Customer Spending Summary: {len(customer_spending)}")
display(customer_spending.head(10))

Customer Spending Summary: 99


customer_id,total_spending,rank
i64,f64,u32
90,12814.41,1
92,10499.18,2
58,9985.15,3
33,9135.3,4
93,8387.14,5
63,8304.06,6
39,8121.66,7
99,8026.09,8
62,7813.42,9
17,7778.62,10


In [None]:
# Rank products by quantity sold
product_popularity = (
    df_sales_pl.group_by("product_id")
    .agg(pl.col("quantity").sum().alias("total_quantity"))
    .with_columns(pl.col("total_quantity").rank(method="dense", descending=True).alias("rank"))
    .sort("rank")
)
print(f"Product Popularity Summary: {len(product_popularity)}")
display(product_popularity.head(10))

Product Popularity Summary: 49


product_id,total_quantity,rank
i64,i64,u32
40,165,1
29,154,2
17,143,3
33,136,4
44,128,5
25,124,6
30,122,7
11,121,8
37,120,9
26,120,9


## Conclusion