<!---
  Licensed to the Apache Software Foundation (ASF) under one
  or more contributor license agreements.  See the NOTICE file
  distributed with this work for additional information
  regarding copyright ownership.  The ASF licenses this file
  to you under the Apache License, Version 2.0 (the
  "License"); you may not use this file except in compliance
  with the License.  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing,
  software distributed under the License is distributed on an
  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  KIND, either express or implied.  See the License for the
  specific language governing permissions and limitations
  under the License.
-->

# Distributed Queries with Ballista

This notebook demonstrates distributed query execution features in Ballista.

## Overview

Ballista is a distributed query engine that can execute queries across multiple
nodes. When you submit a query, Ballista:

1. Parses and optimizes the query
2. Creates a distributed execution plan
3. Distributes work across executors
4. Collects and returns results

This enables processing of datasets much larger than a single machine's memory.
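
The four steps above can be illustrated without a cluster. The following is a minimal pure-Python sketch, not Ballista's implementation: it assumes a grouped aggregation is split into a partial aggregation per partition followed by a final merge. The function names (`split_into_partitions`, `partial_aggregate`, `final_aggregate`) and the sample rows are made up for illustration.

```python
from collections import defaultdict


def split_into_partitions(rows, num_partitions):
    """Round-robin rows into partitions, standing in for files or row groups."""
    partitions = [[] for _ in range(num_partitions)]
    for i, row in enumerate(rows):
        partitions[i % num_partitions].append(row)
    return partitions


def partial_aggregate(partition):
    """Stage 1, run once per partition: partial COUNT and SUM for each key."""
    acc = defaultdict(lambda: [0, 0])
    for key, value in partition:
        acc[key][0] += 1
        acc[key][1] += value
    return dict(acc)


def final_aggregate(partials):
    """Stage 2: merge the partial states and derive the final values."""
    merged = defaultdict(lambda: [0, 0])
    for partial in partials:
        for key, (count, total) in partial.items():
            merged[key][0] += count
            merged[key][1] += total
    return {
        key: {"cnt": count, "sum": total, "avg": total / count}
        for key, (count, total) in merged.items()
    }


# Hypothetical (bool_col, id) pairs; not the contents of test.parquet.
rows = [(i % 2 == 0, i) for i in range(8)]
partitions = split_into_partitions(rows, 3)
result = final_aggregate(partial_aggregate(p) for p in partitions)
print(result)
```

Only the small per-key partial states cross the stage boundary, not the raw rows, which is why this pattern scales to inputs that do not fit on one machine.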

In [None]:
from ballista import BallistaSessionContext, setup_test_cluster
from datafusion import col, lit
from datafusion import functions as f

# Set up test cluster and connect
host, port = setup_test_cluster()
ctx = BallistaSessionContext(f"df://{host}:{port}")

# Register sample data
ctx.register_parquet("test_data", "../testdata/test.parquet")

print(f"Connected! Session ID: {ctx.session_id}")

## Execution Plans

Understanding execution plans is key to optimizing distributed queries.

In [None]:
# Create a query with multiple stages
df = ctx.sql("""
    SELECT 
        bool_col,
        COUNT(*) as cnt,
        SUM(id) as sum_id,
        AVG(tinyint_col) as avg_tiny
    FROM test_data
    WHERE id > 2
    GROUP BY bool_col
    ORDER BY cnt DESC
""")

df

In [None]:
# View the query plan (explain() reports both the logical and physical plans)
print("Query Plan:")
print(df.explain())

In [None]:
# Visualize the execution plan
# This shows the query plan as a graph (requires graphviz for full SVG)
df.explain_visual()

In [None]:
# View the plan with runtime statistics (analyze=True runs the query)
print("Analyzed Plan (with statistics):")
print(df.explain(analyze=True))

## Multi-Stage Queries

Complex queries may involve multiple stages of distributed execution, with data
exchanged (shuffled) between executors at each stage boundary.

In [None]:
# CTE example: compute per-group counts, then join them back to the rows
result = ctx.sql("""
    WITH stats AS (
        SELECT 
            bool_col,
            COUNT(*) as cnt
        FROM test_data
        GROUP BY bool_col
    )
    SELECT 
        t.id,
        t.bool_col,
        s.cnt as group_count
    FROM test_data t
    JOIN stats s ON t.bool_col = s.bool_col
    WHERE t.id <= 5
    ORDER BY t.id
""")

result

In [None]:
# View the execution plan - notice the join and exchange stages
result.explain_visual()

## DataFrame API for Complex Transformations

In [None]:
# Build a complex transformation using the DataFrame API
df1 = ctx.table("test_data")

# Aggregate to get group statistics
group_stats = df1.aggregate(
    [col("bool_col")],
    [
        f.count_star().alias("group_count"),
        f.avg(col("id")).alias("avg_id"),
    ]
)

group_stats

In [None]:
# Join original data with statistics
joined = df1.join(
    group_stats,
    on="bool_col",
    how="inner"
)

joined.select("id", "bool_col", "group_count", "avg_id").limit(10)

## Window Functions

Window functions compute a value for each row from a set of related rows,
without collapsing those rows into one row per group.

In [None]:
# Window function example
window_result = ctx.sql("""
    SELECT 
        id,
        bool_col,
        tinyint_col,
        SUM(tinyint_col) OVER (
            PARTITION BY bool_col 
            ORDER BY id
        ) as running_sum,
        ROW_NUMBER() OVER (
            PARTITION BY bool_col 
            ORDER BY id
        ) as row_num
    FROM test_data
    ORDER BY bool_col, id
""")

window_result
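
To make the semantics of the query above concrete, here is a minimal pure-Python sketch of the same two window expressions. It is an illustration only, not how Ballista evaluates windows; it assumes `id` is unique within each partition, and the function name and sample rows are hypothetical.

```python
from itertools import groupby
from operator import itemgetter


def running_sum_and_row_number(rows):
    """Emulate SUM(tinyint_col) OVER (...) and ROW_NUMBER() OVER (...),
    both with PARTITION BY bool_col ORDER BY id."""
    # Sorting by (bool_col, id) groups each partition and orders it by id.
    ordered = sorted(rows, key=itemgetter("bool_col", "id"))
    out = []
    for _, group in groupby(ordered, key=itemgetter("bool_col")):
        total = 0
        for row_num, row in enumerate(group, start=1):
            total += row["tinyint_col"]
            out.append({**row, "running_sum": total, "row_num": row_num})
    return out


# Made-up rows; not the contents of test.parquet.
sample = [
    {"id": 0, "bool_col": True, "tinyint_col": 2},
    {"id": 1, "bool_col": False, "tinyint_col": 1},
    {"id": 2, "bool_col": True, "tinyint_col": 3},
    {"id": 3, "bool_col": False, "tinyint_col": 4},
]
windowed = running_sum_and_row_number(sample)
for row in windowed:
    print(row)
```

Every input row appears in the output, and both counters restart whenever `bool_col` changes.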

## Progress Tracking for Long Queries

For long-running queries, `collect_with_progress()` reports progress while results are collected.

In [None]:
# Execute a query with progress tracking
df = ctx.sql("SELECT * FROM test_data")

# collect_with_progress shows elapsed time in Jupyter
batches = df.collect_with_progress()
print(f"Collected {len(batches)} batch(es)")

In [None]:
# You can also provide a custom callback
def my_progress_callback(status, progress):
    if progress < 0:
        print(f"Status: {status} (in progress...)")
    else:
        print(f"Status: {status} ({progress:.0%} complete)")

df = ctx.sql("SELECT * FROM test_data")
batches = df.collect_with_progress(callback=my_progress_callback)
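
A callback does not have to print. As a sketch, the hypothetical `ProgressRecorder` class below stores each update so it can be inspected after the query finishes. It assumes the same `(status, progress)` arguments as the callback above, with a negative `progress` meaning the completed fraction is unknown. The status strings in the simulated calls are invented for illustration.

```python
import time


class ProgressRecorder:
    """Callable that records (elapsed_seconds, status, fraction) tuples."""

    def __init__(self):
        self.started = time.monotonic()
        self.events = []

    def __call__(self, status, progress):
        elapsed = time.monotonic() - self.started
        # A negative progress value means the fraction is not known.
        fraction = None if progress < 0 else progress
        self.events.append((elapsed, status, fraction))

    def last_known_fraction(self):
        """Return the most recent known fraction, or None if there is none."""
        for _, _, fraction in reversed(self.events):
            if fraction is not None:
                return fraction
        return None


recorder = ProgressRecorder()
# Simulated updates standing in for calls made during a real query.
recorder("queued", -1.0)
recorder("running", 0.5)
recorder("running", -1.0)
print(len(recorder.events), recorder.last_known_fraction())
```

If `collect_with_progress` accepts any callable, as the cell above suggests, the recorder could be passed as `callback=recorder`.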

## Writing Results

Ballista can write large result sets using distributed write operations.

In [None]:
# Prepare a query result
df = ctx.sql("""
    SELECT 
        id,
        bool_col,
        tinyint_col * 2 as doubled
    FROM test_data
    WHERE id > 3
""")

df

In [None]:
# Write to Parquet (distributed write)
# df.write_parquet("../target/output.parquet")

# Write to CSV
# df.write_csv("../target/output.csv")

# Write to JSON
# df.write_json("../target/output.json")

print("Write operations are commented out - uncomment to test")

## Best Practices for Distributed Queries

1. **Filter early**: Push filters as close to the data source as possible
2. **Project early**: Select only needed columns to reduce data movement
3. **Partition wisely**: Partition data on the join keys so that joins need less data movement
4. **Check plans**: Use `explain()` and `explain_visual()` to understand execution
5. **Monitor progress**: Use `collect_with_progress()` for long queries

In [None]:
# Example: Optimized query pattern
optimized = (
    ctx.table("test_data")
    # 1. Filter early
    .filter(col("id") > lit(2))
    # 2. Project only needed columns
    .select("id", "bool_col", "tinyint_col")
    # 3. Aggregate
    .aggregate(
        [col("bool_col")],
        [f.count_star().alias("cnt")]
    )
)

# 4. Check the plan
print("Optimized plan:")
print(optimized.explain())
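
The first two practices can be quantified with a toy model. This pure-Python sketch assumes that the cost of a shuffle is proportional to the number of field values sent, which simplifies what a real engine measures. The helper `values_moved` and the sample rows are hypothetical.

```python
def values_moved(rows, keep_columns=None, predicate=None):
    """Count the field values that would be sent across a shuffle."""
    if predicate is not None:
        rows = [r for r in rows if predicate(r)]
    if keep_columns is not None:
        rows = [{c: r[c] for c in keep_columns} for r in rows]
    return sum(len(r) for r in rows)


# Made-up rows with three extra columns that the query never reads.
rows = [
    {"id": i, "bool_col": i % 2 == 0, "tinyint_col": i % 2, "a": 0, "b": 0, "c": 0}
    for i in range(8)
]

# Shuffle everything, then filter and project afterwards.
late = values_moved(rows)

# Filter and project before the shuffle.
early = values_moved(
    rows,
    keep_columns=["id", "bool_col", "tinyint_col"],
    predicate=lambda r: r["id"] > 2,
)

print(f"late: {late} values, early: {early} values")
```

A query optimizer may apply these rewrites automatically, so checking the plan remains the reliable way to see what actually runs.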

## Next Steps

- Review the [Ballista Architecture docs](https://datafusion.apache.org/ballista/)
- Learn about cluster deployment and configuration
- Explore advanced features like custom functions and plugins