In [0]:
# Databricks Concurrency Write Conflict Test
# Run this test to prove write conflicts exist

# Test Setup: Create this notebook and run simultaneously from 2 different clusters

import time
import random
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from datetime import datetime
import uuid

# Run configuration
TABLE_NAME = "hive_metastore.default.concurrency_test_table"
# Random 8-hex-character tag generated once per run of this notebook; the
# test functions stamp it on the rows they write so runs can be told apart.
CLUSTER_ID = uuid.uuid4().hex[:8]

# Startup banner: who is running, against which table, and when
print(
    f"Starting test on cluster: {CLUSTER_ID}",
    f"Target table: {TABLE_NAME}",
    f"Timestamp: {datetime.now()}",
    sep="\n",
)

# Step 2: Concurrent write test function
def test_concurrent_writes(num_attempts=10):
    """Insert rows with read-then-increment IDs and tally the outcomes.

    Each attempt reads the table's current ``MAX(id)``, pauses for a short
    random interval, and then inserts a row carrying the next ID. The pause
    widens the gap between the read and the write so that another run of
    this loop has a chance to write in between.

    Args:
        num_attempts: How many read-then-insert attempts to make.

    Returns:
        A ``(conflicts_detected, successful_writes)`` tuple of counts.
        Failures whose message does not mention a concurrency problem are
        printed but counted in neither total.
    """
    n_conflicts = 0
    n_written = 0

    for attempt in range(num_attempts):
        try:
            # Tag the row so it can be traced back to this run and attempt
            operation_id = f"{CLUSTER_ID}_{attempt}_{int(time.time())}"
            print(f"Attempt {attempt + 1}: Starting write operation {operation_id}")

            # The ID about to be written depends on what the table holds now
            max_id_rows = spark.sql(
                f"SELECT COALESCE(MAX(id), 0) as max_id FROM {TABLE_NAME}"
            ).collect()
            next_id = max_id_rows[0]["max_id"] + 1

            # Hold on to the (possibly stale) read for a random moment
            time.sleep(random.uniform(0.1, 0.5))

            insert_sql = f"""
            INSERT INTO {TABLE_NAME} 
            VALUES ({next_id}, '{CLUSTER_ID}', current_timestamp(), '{operation_id}')
            """
            spark.sql(insert_sql)

            n_written += 1
            print(f"✅ Write {attempt + 1} successful: ID {next_id}")

        except Exception as exc:
            error_msg = str(exc)
            # Failures are classified purely by the text of the exception
            looks_concurrent = (
                "ConcurrentAppendException" in error_msg
                or "concurrent" in error_msg.lower()
            )
            if not looks_concurrent:
                print(f"❌ Other error on attempt {attempt + 1}: {error_msg[:100]}...")
            else:
                n_conflicts += 1
                print(f"🚨 CONFLICT DETECTED on attempt {attempt + 1}: {error_msg[:100]}...")

            # Back off briefly before the next attempt
            time.sleep(1)

    return n_conflicts, n_written

# Step 3: Alternative test using MERGE (more likely to cause conflicts)
def test_merge_conflicts(num_attempts=15):
    """Repeatedly MERGE a one-row batch into the table and tally the outcomes.

    Zero-based attempt ``n`` upserts the row with id ``1000 + n``: an
    existing row with that id is updated, otherwise a new row is inserted.
    The id does not depend on ``CLUSTER_ID``, so every run of this function
    targets the same ids.

    Args:
        num_attempts: Number of MERGE statements to issue.

    Returns:
        A ``(conflicts_detected, successful_operations)`` tuple of counts.
        Failures whose message does not mention a concurrency problem are
        printed but counted in neither total.
    """
    n_conflicts = 0
    n_merged = 0

    for attempt in range(num_attempts):
        try:
            operation_id = f"merge_{CLUSTER_ID}_{attempt}_{int(time.time())}"
            print(f"MERGE Attempt {attempt + 1}: {operation_id}")

            # Stage the single source row as a temp view the MERGE can read
            source_columns = ["id", "cluster_id", "timestamp", "operation_id"]
            source_row = (1000 + attempt, CLUSTER_ID, datetime.now(), operation_id)
            source_df = spark.createDataFrame([source_row], source_columns)
            source_df.createOrReplaceTempView("temp_merge_data")

            # Upsert: overwrite the matching id, or insert it when absent
            merge_sql = f"""
            MERGE INTO {TABLE_NAME} AS target
            USING temp_merge_data AS source
            ON target.id = source.id
            WHEN MATCHED THEN
                UPDATE SET 
                    target.cluster_id = source.cluster_id,
                    target.timestamp = source.timestamp,
                    target.operation_id = source.operation_id
            WHEN NOT MATCHED THEN
                INSERT (id, cluster_id, timestamp, operation_id)
                VALUES (source.id, source.cluster_id, source.timestamp, source.operation_id)
            """
            spark.sql(merge_sql)

            n_merged += 1
            print(f"✅ MERGE {attempt + 1} successful")

        except Exception as exc:
            error_msg = str(exc)
            # Failures are classified purely by the text of the exception
            looks_concurrent = (
                "ConcurrentAppendException" in error_msg
                or "concurrent" in error_msg.lower()
            )
            if not looks_concurrent:
                print(f"❌ MERGE error on attempt {attempt + 1}: {error_msg[:100]}...")
            else:
                n_conflicts += 1
                print(f"🚨 MERGE CONFLICT DETECTED on attempt {attempt + 1}: {error_msg[:100]}...")

            # Back off briefly before the next attempt
            time.sleep(1)

    return n_conflicts, n_merged

# Step 4: Results analysis
def analyze_results():
    """Print a summary of the table's final state.

    Reports the total row count, the number of rows per ``cluster_id``,
    and the five most recent rows written under this run's ``CLUSTER_ID``.

    Returns:
        None. All output goes to stdout.
    """

    print("\n" + "="*50)
    print("FINAL RESULTS ANALYSIS")
    print("="*50)

    # Count total rows
    total_rows = spark.sql(f"SELECT COUNT(*) as count FROM {TABLE_NAME}").collect()[0]['count']
    print(f"Total rows in table: {total_rows}")

    # Show distribution by cluster
    cluster_distribution = spark.sql(f"""
    SELECT cluster_id, COUNT(*) as row_count 
    FROM {TABLE_NAME} 
    GROUP BY cluster_id 
    ORDER BY cluster_id
    """).collect()

    print("\nRows by cluster:")
    for row in cluster_distribution:
        print(f"  {row['cluster_id']}: {row['row_count']} rows")

    # Show recent operations. DataFrame.show() prints the rows itself and
    # returns None, so its result is deliberately not bound to a name.
    print(f"\nRecent operations from cluster {CLUSTER_ID}:")
    spark.sql(f"""
    SELECT * FROM {TABLE_NAME} 
    WHERE cluster_id = '{CLUSTER_ID}' 
    ORDER BY timestamp DESC 
    LIMIT 5
    """).show()

# Step 2: Concurrent test (run simultaneously from 2 clusters)
# conflicts, successes = test_concurrent_writes(20)
# print(f"\nTest completed: {conflicts} conflicts detected, {successes} successful writes")

# A fixed sleep does not synchronize anything: every notebook is delayed by
# the same amount, so whatever gap separated the launches is carried straight
# into the test. Waiting for the next shared wall-clock boundary instead makes
# every notebook launched inside the same window begin at the same instant.
# NOTE(review): assumes the clusters' system clocks agree closely - confirm.
# Launch all notebooks within one window; launches that straddle a boundary
# start one full window apart.
START_ALIGNMENT_SECONDS = 30
seconds_until_start = START_ALIGNMENT_SECONDS - (time.time() % START_ALIGNMENT_SECONDS)

print(f"Cluster {CLUSTER_ID} waiting to start at synchronized time...")
time.sleep(seconds_until_start)
print("Starting NOW!")

# Step 3: MERGE test (run simultaneously from 2 clusters)
merge_conflicts, merge_successes = test_merge_conflicts(30)
print(f"\nMERGE test completed: {merge_conflicts} conflicts detected, {merge_successes} successful operations")

# Step 4: Results
#analyze_results()

#print(f"\nCluster {CLUSTER_ID} ready for testing!")

Starting test on cluster: 7c63c292
Target table: hive_metastore.default.concurrency_test_table
Timestamp: 2025-10-03 11:04:55.370916
MERGE Attempt 1: merge_7c63c292_0_1759489495
✅ MERGE 1 successful
MERGE Attempt 2: merge_7c63c292_1_1759489497
✅ MERGE 2 successful
MERGE Attempt 3: merge_7c63c292_2_1759489499
✅ MERGE 3 successful
MERGE Attempt 4: merge_7c63c292_3_1759489501
✅ MERGE 4 successful
MERGE Attempt 5: merge_7c63c292_4_1759489503
✅ MERGE 5 successful
MERGE Attempt 6: merge_7c63c292_5_1759489505
✅ MERGE 6 successful
MERGE Attempt 7: merge_7c63c292_6_1759489508
✅ MERGE 7 successful
MERGE Attempt 8: merge_7c63c292_7_1759489510
✅ MERGE 8 successful
MERGE Attempt 9: merge_7c63c292_8_1759489512
✅ MERGE 9 successful
MERGE Attempt 10: merge_7c63c292_9_1759489514
✅ MERGE 10 successful
MERGE Attempt 11: merge_7c63c292_10_1759489516
✅ MERGE 11 successful
MERGE Attempt 12: merge_7c63c292_11_1759489518
✅ MERGE 12 successful
MERGE Attempt 13: merge_7c63c292_12_1759489521
✅ MERGE 13 successfu