In [1]:
import sys

# Show the path of the Python interpreter that is executing this kernel.
interpreter_path = sys.executable
print(interpreter_path)


/Users/kosiew/GitHub/datafusion-python/.venv/bin/python3.13


In [4]:
import time
import pyarrow as pa
from datafusion import SessionContext

def create_very_large_dataset(num_batches: int = 100, rows_per_batch: int = 10_000) -> "SessionContext":
    """Create a SessionContext holding three identical, large in-memory tables.

    Builds ``num_batches`` record batches of ``rows_per_batch`` rows each and
    registers the same batch list under the names ``large_table1``,
    ``large_table2`` and ``large_table3``. Each table receives a single
    partition containing every batch.

    Columns of every batch:
        id         -- consecutive integers, continuing across batches from 0
        text_col   -- ``"value_<id>"``
        float_col  -- ``id * 1.5``
        category   -- ``"category_<id % 1000>"``

    Args:
        num_batches: Number of record batches to build. Defaults to 100.
        rows_per_batch: Number of rows in each batch. Defaults to 10,000.

    Returns:
        The SessionContext with the three tables registered.

    Raises:
        ValueError: If ``num_batches`` or ``rows_per_batch`` is smaller than 1.
    """
    # Reject sizes that would yield no batches or zero-row batches before any
    # pyarrow/datafusion work happens; such input leaves the columns without
    # values to take a type from, so fail here with a clear message instead.
    if num_batches < 1 or rows_per_batch < 1:
        raise ValueError(
            "num_batches and rows_per_batch must both be >= 1, "
            f"got num_batches={num_batches!r}, rows_per_batch={rows_per_batch!r}"
        )

    ctx = SessionContext()

    batches = []
    for i in range(num_batches):
        # Id span covered by this batch; computed once and shared by all columns.
        ids = range(i * rows_per_batch, (i + 1) * rows_per_batch)
        batch = pa.RecordBatch.from_arrays(
            [
                pa.array(list(ids)),
                pa.array([f"value_{j}" for j in ids]),
                pa.array([j * 1.5 for j in ids]),  # Float column
                pa.array([f"category_{j % 1000}" for j in ids]),  # 1000 distinct categories
            ],
            names=["id", "text_col", "float_col", "category"],
        )
        batches.append(batch)

    # register_record_batches takes a list of partitions, each of which is a
    # list of batches, so the batch list is wrapped once more: one partition
    # per table.
    for table_name in ("large_table1", "large_table2", "large_table3"):
        ctx.register_record_batches(table_name, [batches])

    # The row figure printed here is the row count of one registered table.
    print(f"Created dataset with {len(batches)} batches, ~{len(batches) * rows_per_batch:,} rows each")
    return ctx

# Set up the test environment: build the session once here; the query cell
# below issues its SQL through `ctx`.
ctx = create_very_large_dataset()

Created dataset with 100 batches, ~1,000,000 rows each


In [5]:
# Deliberately heavy workload so there is time to press the stop button:
# a CROSS JOIN of two id-filtered tables (4001 ids on each side, i.e. about
# 16 million joined rows given the unique ids built above), with string and
# trig expressions per row and a sort on the computed product before LIMIT.
cross_join_sql = """
    SELECT 
        t1.id,
        t2.id as id2,
        t1.float_col * t2.float_col as product,
        CONCAT(t1.text_col, '_', t2.text_col) as combined_text,
        SIN(t1.float_col) + COS(t2.float_col) as trig_calc,
        CASE 
            WHEN t1.id % 2 = 0 THEN 'even'
            ELSE 'odd'
        END as parity
    FROM large_table1 t1
    CROSS JOIN large_table2 t2
    WHERE t1.id BETWEEN 1000 AND 5000
      AND t2.id BETWEEN 1500 AND 5500
    ORDER BY product DESC
    LIMIT 900000
"""
df = ctx.sql(cross_join_sql)

print(
    "Starting cartesian product query...",
    "Click ⬜ stop button to interrupt!",
    sep="\n",
)

try:
    # collect() is the call expected to be interrupted from the notebook UI.
    result = df.collect()
    print(f"Query completed! Got {len(result)} batches")
except KeyboardInterrupt:
    # The kernel interrupt reached Python while the query was running.
    print("✅ Query was successfully interrupted by ⬜ stop button")
except Exception as exc:
    # Any other failure is reported on stdout rather than raised.
    print(f"Error: {exc}")

Starting cartesian product query...
Click ⬜ stop button to interrupt!
✅ Query was successfully interrupted by ⬜ stop button
