using Microsoft.Azure.Cosmos;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Linq;
using Microsoft.Azure.Cosmos.Linq;

public class CosmosTransactionManager
{
    private readonly Container _container;
    private readonly CosmosClient _client;
    private readonly string _databaseId;

    public CosmosTransactionManager(string connectionString, string databaseId)
    {
        _client = new CosmosClient(connectionString);
        _databaseId = databaseId;
        _container = _client.GetDatabase(databaseId).GetContainer("orders");
    }

    // Single-partition transaction example
    public async Task<bool> ProcessOrderTransaction(string orderId, string customerId)
    {
        // Define a batch of operations
        TransactionalBatch batch = _container.CreateTransactionalBatch(
            new PartitionKey(customerId));

        try
        {
            // Read the order
            var order = await _container.ReadItemAsync<Order>(
                orderId, 
                new PartitionKey(customerId)
            );

            // Create payment record
            var payment = new Payment
            {
                Id = Guid.NewGuid().ToString(),
                OrderId = orderId,
                CustomerId = customerId,
                Amount = order.Resource.TotalAmount,
                Status = "Processing",
                Timestamp = DateTime.UtcNow
            };

            // Update order status
            order.Resource.Status = "Processing";
            order.Resource.LastUpdated = DateTime.UtcNow;

            // Add operations to batch
            batch.ReplaceItem(orderId, order.Resource);
            batch.CreateItem(payment);

            // Execute the batch
            using TransactionalBatchResponse response = await batch.ExecuteAsync();
            
            if (response.IsSuccessStatusCode)
            {
                return true;
            }

            // Handle specific error cases
            if (response.StatusCode == System.Net.HttpStatusCode.PreconditionFailed)
            {
                throw new Exception("Concurrent modification detected");
            }

            return false;
        }
        catch (CosmosException ex) when (ex.StatusCode == System.Net.HttpStatusCode.NotFound)
        {
            throw new Exception("Order not found");
        }
        catch (Exception ex)
        {
            // Log the error
            Console.WriteLine($"Transaction failed: {ex.Message}");
            throw;
        }
    }

    // Cross-partition transaction example
    public async Task<bool> TransferFunds(
        string sourceAccountId, 
        string targetAccountId, 
        decimal amount)
    {
        var accountContainer = _client.GetDatabase(_databaseId)
                                    .GetContainer("accounts");

        // Start transaction batch operations
        List<Task<TransactionalBatchResponse>> operations = 
            new List<Task<TransactionalBatchResponse>>();

        try
        {
            // Read source account
            var sourceAccount = await accountContainer.ReadItemAsync<Account>(
                sourceAccountId, 
                new PartitionKey(sourceAccountId)
            );

            // Read target account
            var targetAccount = await accountContainer.ReadItemAsync<Account>(
                targetAccountId, 
                new PartitionKey(targetAccountId)
            );

            // Validate balances
            if (sourceAccount.Resource.Balance < amount)
            {
                throw new Exception("Insufficient funds");
            }

            // Update source account
            var sourceBatch = accountContainer.CreateTransactionalBatch(
                new PartitionKey(sourceAccountId));
            sourceAccount.Resource.Balance -= amount;
            sourceBatch.ReplaceItem(sourceAccountId, sourceAccount.Resource);

            // Update target account
            var targetBatch = accountContainer.CreateTransactionalBatch(
                new PartitionKey(targetAccountId));
            targetAccount.Resource.Balance += amount;
            targetBatch.ReplaceItem(targetAccountId, targetAccount.Resource);

            // Execute batches
            operations.Add(sourceBatch.ExecuteAsync());
            operations.Add(targetBatch.ExecuteAsync());

            // Wait for all operations to complete
            await Task.WhenAll(operations);

            // Verify all operations succeeded
            bool success = operations.All(op => op.Result.IsSuccessStatusCode);
            
            if (!success)
            {
                // Handle rollback if needed
                await RollbackTransfer(sourceAccountId, targetAccountId, amount);
                return false;
            }

            return true;
        }
        catch (Exception ex)
        {
            // Log error and attempt rollback
            Console.WriteLine($"Transfer failed: {ex.Message}");
            await RollbackTransfer(sourceAccountId, targetAccountId, amount);
            throw;
        }
    }

    // Bulk transaction example
    public async Task<BulkOperationResponse> ProcessBulkTransactions(
        List<Order> orders)
    {
        var response = new BulkOperationResponse
        {
            SuccessCount = 0,
            FailedItems = new List<FailedOperation>()
        };

        List<Task> operations = new List<Task>();
        
        foreach (var orderGroup in orders.GroupBy(o => o.CustomerId))
        {
            var batch = _container.CreateTransactionalBatch(
                new PartitionKey(orderGroup.Key));

            foreach (var order in orderGroup)
            {
                batch.CreateItem(order);
            }

            operations.Add(ProcessBatchWithRetry(batch, orderGroup.ToList(), response));
        }

        await Task.WhenAll(operations);
        return response;
    }

    private async Task ProcessBatchWithRetry(
        TransactionalBatch batch, 
        List<Order> orders,
        BulkOperationResponse response,
        int maxRetries = 3)
    {
        int attempts = 0;
        bool success = false;

        while (!success && attempts < maxRetries)
        {
            try
            {
                using var batchResponse = await batch.ExecuteAsync();
                
                if (batchResponse.IsSuccessStatusCode)
                {
                    response.SuccessCount += orders.Count;
                    success = true;
                }
                else
                {
                    attempts++;
                    await Task.Delay(attempts * 1000); // Exponential backoff
                }
            }
            catch (Exception ex)
            {
                attempts++;
                if (attempts >= maxRetries)
                {
                    orders.ForEach(o => response.FailedItems.Add(
                        new FailedOperation 
                        { 
                            ItemId = o.Id,
                            Error = ex.Message 
                        }));
                }
                await Task.Delay(attempts * 1000);
            }
        }
    }

    private async Task RollbackTransfer(
        string sourceAccountId, 
        string targetAccountId, 
        decimal amount)
    {
        try
        {
            var accountContainer = _client.GetDatabase(_databaseId)
                                        .GetContainer("accounts");

            // Attempt to restore original balances
            var sourceAccount = await accountContainer.ReadItemAsync<Account>(
                sourceAccountId, 
                new PartitionKey(sourceAccountId)
            );
            var targetAccount = await accountContainer.ReadItemAsync<Account>(
                targetAccountId, 
                new PartitionKey(targetAccountId)
            );

            var sourceBatch = accountContainer.CreateTransactionalBatch(
                new PartitionKey(sourceAccountId));
            sourceAccount.Resource.Balance += amount;
            sourceBatch.ReplaceItem(sourceAccountId, sourceAccount.Resource);

            var targetBatch = accountContainer.CreateTransactionalBatch(
                new PartitionKey(targetAccountId));
            targetAccount.Resource.Balance -= amount;
            targetBatch.ReplaceItem(targetAccountId, targetAccount.Resource);

            await Task.WhenAll(
                sourceBatch.ExecuteAsync(),
                targetBatch.ExecuteAsync()
            );
        }
        catch (Exception ex)
        {
            // Log rollback failure
            Console.WriteLine($"Rollback failed: {ex.Message}");
            throw;
        }
    }
}

// Supporting classes
public class Order
{
    public string Id { get; set; }
    public string CustomerId { get; set; }
    public decimal TotalAmount { get; set; }
    public string Status { get; set; }
    public DateTime LastUpdated { get; set; }
}

public class Payment
{
    public string Id { get; set; }
    public string OrderId { get; set; }
    public string CustomerId { get; set; }
    public decimal Amount { get; set; }
    public string Status { get; set; }
    public DateTime Timestamp { get; set; }
}

public class Account
{
    public string Id { get; set; }
    public decimal Balance { get; set; }
}

public class BulkOperationResponse
{
    public int SuccessCount { get; set; }
    public List<FailedOperation> FailedItems { get; set; }
}

public class FailedOperation
{
    public string ItemId { get; set; }
    public string Error { get; set; }
}

using Microsoft.Azure.Cosmos;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Linq;

public class AriesTransactionManager
{
    private readonly Container _container;
    private readonly CosmosClient _client;
    private readonly string _databaseId;
    private readonly string _containerId;

    public AriesTransactionManager(CosmosClient client, string databaseId, string containerId)
    {
        _client = client;
        _databaseId = databaseId;
        _containerId = containerId;
        _container = client.GetDatabase(databaseId).GetContainer(containerId);
    }

    public async Task<TransactionResult> ExecuteTransactionAsync(List<TransactionOperation> operations)
    {
        // Group operations by partition key
        var operationsByPartition = operations.GroupBy(o => o.Document.PartitionKey.ToString());
        
        var transactionLog = new TransactionLog
        {
            Id = Guid.NewGuid().ToString(),
            Status = TransactionStatus.Pending,
            Operations = operations,
            Timestamp = DateTime.UtcNow,
            PartitionGroups = operationsByPartition.Select(g => g.Key).ToList()
        };

        try
        {
            // Phase 1: Write-Ahead Logging (in its own partition)
            await _container.CreateItemAsync(
                transactionLog, 
                new PartitionKey($"txlog_{transactionLog.Id}")
            );

            // Phase 2: Execute Operations by Partition
            foreach (var partitionGroup in operationsByPartition)
            {
                var partitionKey = partitionGroup.Key;
                var partitionOps = partitionGroup.ToList();

                try
                {
                    // Update checkpoint before processing partition
                    transactionLog.CurrentPartition = partitionKey;
                    await UpdateTransactionLog(transactionLog);

                    // Process operations in this partition as a batch
                    await ExecutePartitionBatch(partitionKey, partitionOps, transactionLog);
                }
                catch (Exception ex)
                {
                    await RollbackTransaction(transactionLog);
                    return new TransactionResult
                    {
                        Success = false,
                        TransactionId = transactionLog.Id,
                        Error = $"Failed in partition {partitionKey}: {ex.Message}"
                    };
                }
            }

            // Phase 3: Mark as Committed
            transactionLog.Status = TransactionStatus.Committed;
            transactionLog.CompletedAt = DateTime.UtcNow;
            await UpdateTransactionLog(transactionLog);

            return new TransactionResult
            {
                Success = true,
                TransactionId = transactionLog.Id
            };
        }
        catch (Exception ex)
        {
            await RollbackTransaction(transactionLog);
            return new TransactionResult
            {
                Success = false,
                TransactionId = transactionLog.Id,
                Error = ex.Message
            };
        }
    }

    private async Task ExecutePartitionBatch(
        string partitionKey, 
        List<TransactionOperation> operations,
        TransactionLog log)
    {
        var batch = _container.CreateTransactionalBatch(new PartitionKey(partitionKey));
        var compensatingOperations = new List<TransactionOperation>();

        // Add operations to batch
        foreach (var operation in operations)
        {
            var compensatingOp = CreateCompensatingOperation(operation);
            compensatingOperations.Insert(0, compensatingOp);

            switch (operation.Type)
            {
                case OperationType.Create:
                    batch.CreateItem(operation.Document);
                    break;

                case OperationType.Update:
                    batch.ReplaceItem(
                        operation.Document.Id.ToString(),
                        operation.Document
                    );
                    break;

                case OperationType.Delete:
                    batch.DeleteItem(operation.Document.Id.ToString());
                    break;
            }
        }

        // Execute batch
        using TransactionalBatchResponse batchResponse = await batch.ExecuteAsync();
        
        if (!batchResponse.IsSuccessStatusCode)
        {
            throw new Exception($"Batch failed with status {batchResponse.StatusCode}");
        }

        // Update transaction log with compensating operations
        log.CompensatingOperations.AddRange(compensatingOperations);
        await UpdateTransactionLog(log);
    }

    private async Task RollbackTransaction(TransactionLog log)
    {
        log.Status = TransactionStatus.RollingBack;
        await UpdateTransactionLog(log);

        // Group compensating operations by partition
        var compensatingByPartition = log.CompensatingOperations
            .GroupBy(o => o.Document.PartitionKey.ToString());

        foreach (var partitionGroup in compensatingByPartition)
        {
            var partitionKey = partitionGroup.Key;
            var compensatingOps = partitionGroup.ToList();

            try
            {
                var batch = _container.CreateTransactionalBatch(new PartitionKey(partitionKey));

                // Add compensating operations to batch
                foreach (var operation in compensatingOps)
                {
                    switch (operation.Type)
                    {
                        case OperationType.Create:
                            batch.CreateItem(operation.Document);
                            break;

                        case OperationType.Update:
                            batch.ReplaceItem(
                                operation.Document.Id.ToString(),
                                operation.Document
                            );
                            break;

                        case OperationType.Delete:
                            batch.DeleteItem(operation.Document.Id.ToString());
                            break;
                    }
                }

                // Execute rollback batch
                using var response = await batch.ExecuteAsync();
                
                if (!response.IsSuccessStatusCode)
                {
                    log.RollbackErrors = log.RollbackErrors ?? new List<RollbackError>();
                    log.RollbackErrors.Add(new RollbackError
                    {
                        PartitionKey = partitionKey,
                        Error = $"Rollback batch failed with status {response.StatusCode}"
                    });
                }
            }
            catch (Exception ex)
            {
                log.RollbackErrors = log.RollbackErrors ?? new List<RollbackError>();
                log.RollbackErrors.Add(new RollbackError
                {
                    PartitionKey = partitionKey,
                    Error = ex.Message
                });
            }
        }

        log.Status = TransactionStatus.RolledBack;
        await UpdateTransactionLog(log);
    }

    private async Task UpdateTransactionLog(TransactionLog log)
    {
        await _container.ReplaceItemAsync(
            log,
            log.Id,
            new PartitionKey($"txlog_{log.Id}")
        );
    }

    private TransactionOperation CreateCompensatingOperation(TransactionOperation operation)
    {
        switch (operation.Type)
        {
            case OperationType.Create:
                return new TransactionOperation
                {
                    Type = OperationType.Delete,
                    Document = operation.Document
                };

            case OperationType.Update:
                return new TransactionOperation
                {
                    Type = OperationType.Update,
                    Document = operation.OriginalDocument
                };

            case OperationType.Delete:
                return new TransactionOperation
                {
                    Type = OperationType.Create,
                    Document = operation.Document
                };

            default:
                throw new ArgumentException($"Unknown operation type: {operation.Type}");
        }
    }

    public async Task<List<TransactionLog>> RecoverIncompleteTransactions()
    {
        var incompleteTransactions = new List<TransactionLog>();

        var query = new QueryDefinition(
            "SELECT * FROM c WHERE c.type = 'transaction_log' AND c.status IN ('PENDING', 'ROLLING_BACK')"
        );

        using var iterator = _container.GetItemQueryIterator<TransactionLog>(query);
        while (iterator.HasMoreResults)
        {
            var response = await iterator.ReadNextAsync();
            foreach (var log in response)
            {
                await RecoverTransaction(log);
                incompleteTransactions.Add(log);
            }
        }

        return incompleteTransactions;
    }

    private async Task RecoverTransaction(TransactionLog log)
    {
        if (log.Status == TransactionStatus.Pending)
        {
            // Find partitions that haven't been processed yet
            var remainingPartitions = log.PartitionGroups
                .Skip(log.PartitionGroups.IndexOf(log.CurrentPartition) + 1);

            try
            {
                foreach (var partition in remainingPartitions)
                {
                    var partitionOps = log.Operations
                        .Where(o => o.Document.PartitionKey.ToString() == partition)
                        .ToList();

                    await ExecutePartitionBatch(partition, partitionOps, log);
                }

                log.Status = TransactionStatus.Committed;
                log.CompletedAt = DateTime.UtcNow;
            }
            catch
            {
                await RollbackTransaction(log);
            }
        }
        else if (log.Status == TransactionStatus.RollingBack)
        {
            await RollbackTransaction(log);
        }

        await UpdateTransactionLog(log);
    }
}

public class TransactionLog
{
    public string Id { get; set; }
    public TransactionStatus Status { get; set; }
    public List<TransactionOperation> Operations { get; set; } = new List<TransactionOperation>();
    public List<TransactionOperation> CompensatingOperations { get; set; } = new List<TransactionOperation>();
    public List<string> PartitionGroups { get; set; } = new List<string>();
    public string CurrentPartition { get; set; }
    public DateTime Timestamp { get; set; }
    public DateTime? CompletedAt { get; set; }
    public List<RollbackError> RollbackErrors { get; set; }
}

public class RollbackError
{
    public string PartitionKey { get; set; }
    public string Error { get; set; }
}

// Other supporting classes remain the same