Skip to content

Real-Time Conflict Resolution & Operational Transformation (OT) #46

@bchou9

Description

@bchou9

Feature Description

Implement Operational Transformation (OT) or Conflict-free Replicated Data Types (CRDTs) to handle concurrent editing conflicts gracefully and ensure eventual consistency across all connected clients.

Problem Statement

Current concurrency issues:

  • Race conditions when multiple users edit simultaneously
  • Last-write-wins approach can lose data
  • No conflict detection or resolution
  • Undo/redo breaks when others are editing
  • Strokes can appear in wrong order
  • No guarantee of consistency across clients

Operational Transformation Basics

OT ensures that concurrent operations converge to the same state by transforming operations against each other.

Example Conflict:

  • User A: Adds stroke at position 5
  • User B: Deletes stroke at position 3 (simultaneously)

Without OT, User A's stroke might reference a position that no longer exists after User B's deletion.

1. OT Core Implementation

// frontend/src/services/OperationalTransformation.js
export class OperationTransformer {
  /**
   * Transform operation A against operation B
   * Returns transformed version of A that accounts for B's changes
   */
  transform(opA, opB) {
    const type = `${opA.type}_${opB.type}`;
    
    switch (type) {
      case 'insert_insert':
        return this.transformInsertInsert(opA, opB);
      case 'insert_delete':
        return this.transformInsertDelete(opA, opB);
      case 'delete_insert':
        return this.transformDeleteInsert(opA, opB);
      case 'delete_delete':
        return this.transformDeleteDelete(opA, opB);
      default:
        return opA;
    }
  }

  transformInsertInsert(opA, opB) {
    // If B inserts before A's position, shift A's position
    if (opB.position <= opA.position) {
      return { ...opA, position: opA.position + 1 };
    }
    return opA;
  }

  transformInsertDelete(opA, opB) {
    // If B deletes before A's position, shift A's position down
    if (opB.position < opA.position) {
      return { ...opA, position: opA.position - 1 };
    }
    return opA;
  }

  transformDeleteInsert(opA, opB) {
    // If B inserts before A's delete position, shift A up
    if (opB.position <= opA.position) {
      return { ...opA, position: opA.position + 1 };
    }
    return opA;
  }

  transformDeleteDelete(opA, opB) {
    // Both trying to delete same position - B wins
    if (opA.position === opB.position) {
      return null; // Cancel A's operation
    }
    // B deleted before A's position
    if (opB.position < opA.position) {
      return { ...opA, position: opA.position - 1 };
    }
    return opA;
  }
}

2. Vector Clock for Causality

// frontend/src/services/VectorClock.js
export class VectorClock {
  constructor(clientId) {
    this.clientId = clientId;
    this.clock = { [clientId]: 0 };
  }

  increment() {
    this.clock[this.clientId]++;
    return this.getCopy();
  }

  update(otherClock) {
    for (const [clientId, timestamp] of Object.entries(otherClock)) {
      this.clock[clientId] = Math.max(
        this.clock[clientId] || 0,
        timestamp
      );
    }
  }

  getCopy() {
    return { ...this.clock };
  }

  happensBefore(otherClock) {
    let strictlyLess = false;
    
    for (const clientId in this.clock) {
      if (this.clock[clientId] > (otherClock[clientId] || 0)) {
        return false;
      }
      if (this.clock[clientId] < (otherClock[clientId] || 0)) {
        strictlyLess = true;
      }
    }
    
    return strictlyLess;
  }

  concurrent(otherClock) {
  }

  happensAfter(otherClock) {
    return new VectorClock(this.clientId).setClock(otherClock).happensBefore(this.clock);
  }

  setClock(clock) {
    this.clock = { ...clock };
    return this;
  }
}

3. Operation History Buffer

// frontend/src/services/OperationHistory.js
export class OperationHistory {
  constructor(clientId) {
    this.clientId = clientId;
    this.history = [];
    this.vectorClock = new VectorClock(clientId);
    this.transformer = new OperationTransformer();
  }

  addLocalOperation(operation) {
    const op = {
      ...operation,
      clientId: this.clientId,
      vectorClock: this.vectorClock.increment(),
      id: generateOperationId()
    };
    
    this.history.push(op);
    return op;
  }

  addRemoteOperation(operation) {
    // Update our vector clock
    this.vectorClock.update(operation.vectorClock);
    
    // Transform against concurrent local operations
    let transformedOp = operation;
    
    for (const localOp of this.getLocalOperationsSince(operation.vectorClock)) {
      transformedOp = this.transformer.transform(transformedOp, localOp);
    }
    
    this.history.push(transformedOp);
    return transformedOp;
  }

  getLocalOperationsSince(vectorClock) {
    return this.history.filter(op => 
      op.clientId === this.clientId &&
    );
  }

  pruneHistory(beforeTimestamp) {
    this.history = this.history.filter(op => 
      op.timestamp > beforeTimestamp
    );
  }
}

4. Canvas Operation Types

// frontend/src/models/operations.js
export const OperationType = {
  INSERT_STROKE: 'insert_stroke',
  DELETE_STROKE: 'delete_stroke',
  MODIFY_STROKE: 'modify_stroke',
  MOVE_STROKE: 'move_stroke',
  CLEAR_CANVAS: 'clear_canvas'
};

export class Operation {
  static insertStroke(stroke, position) {
    return {
      type: OperationType.INSERT_STROKE,
      stroke: stroke,
      position: position,
      timestamp: Date.now()
    };
  }

  static deleteStroke(strokeId, position) {
    return {
      type: OperationType.DELETE_STROKE,
      strokeId: strokeId,
      position: position,
      timestamp: Date.now()
    };
  }

  static modifyStroke(strokeId, changes) {
    return {
      type: OperationType.MODIFY_STROKE,
      strokeId: strokeId,
      changes: changes,
      timestamp: Date.now()
    };
  }
}

5. Collaborative Canvas Manager

// frontend/src/services/CollaborativeCanvas.js
export class CollaborativeCanvas {
  constructor(roomId, userId, socket) {
    this.roomId = roomId;
    this.userId = userId;
    this.socket = socket;
    this.operationHistory = new OperationHistory(userId);
    this.pendingOperations = [];
  }

  localStrokeAdded(stroke) {
    const operation = Operation.insertStroke(stroke, this.getStrokeCount());
    const op = this.operationHistory.addLocalOperation(operation);
    
    // Send to server
    this.socket.emit('operation', {
      roomId: this.roomId,
      operation: op
    });
    
    // Apply locally
    this.applyOperation(op);
  }

  remoteOperationReceived(operation) {
    const transformedOp = this.operationHistory.addRemoteOperation(operation);
    
    if (transformedOp) {
      this.applyOperation(transformedOp);
    }
  }

  applyOperation(operation) {
    switch (operation.type) {
      case OperationType.INSERT_STROKE:
        this.canvas.insertStrokeAt(operation.stroke, operation.position);
        break;
      case OperationType.DELETE_STROKE:
        this.canvas.deleteStrokeAt(operation.position);
        break;
      case OperationType.MODIFY_STROKE:
        this.canvas.modifyStroke(operation.strokeId, operation.changes);
        break;
    }
  }

  undo() {
    // Get last local operation
    const lastOp = this.operationHistory.getLastLocalOperation();
    
    // Create inverse operation
    const inverseOp = this.createInverseOperation(lastOp);
    const op = this.operationHistory.addLocalOperation(inverseOp);
    
    // Broadcast
    this.socket.emit('operation', {
      roomId: this.roomId,
      operation: op
    });
    
    this.applyOperation(op);
  }

  createInverseOperation(operation) {
    switch (operation.type) {
      case OperationType.INSERT_STROKE:
        return Operation.deleteStroke(operation.stroke.id, operation.position);
      case OperationType.DELETE_STROKE:
        return Operation.insertStroke(operation.stroke, operation.position);
      default:
        return null;
    }
  }
}

6. Backend Operation Server

# backend/services/operation_server.py
class OperationServer:
    def __init__(self):
        self.room_operations = {}  # room_id -> list of operations
        
    def receive_operation(self, room_id, operation):
        if room_id not in self.room_operations:
            self.room_operations[room_id] = []
        
        # Validate operation
        if not self.validate_operation(operation):
            return {'error': 'Invalid operation'}
        
        # Store operation
        self.room_operations[room_id].append(operation)
        
        # Broadcast to other clients
        return {'success': True, 'operation': operation}
    
    def validate_operation(self, operation):
        # Check required fields
        required = ['type', 'clientId', 'vectorClock', 'timestamp']
        return all(field in operation for field in required)
    
    def get_operations_since(self, room_id, vector_clock):
        if room_id not in self.room_operations:
            return []
        
        # Return operations that happened after the given vector clock
        return [
            op for op in self.room_operations[room_id]
            if not self.happened_before(op['vectorClock'], vector_clock)
        ]
    
    def happened_before(self, clock_a, clock_b):
        # Vector clock comparison
        for client_id in clock_a:
            if clock_a[client_id] > clock_b.get(client_id, 0):
                return False
        return True

7. Socket.IO Integration

# backend/routes/socketio_handlers.py
from services.operation_server import OperationServer

operation_server = OperationServer()

@socketio.on('operation')
@require_auth_socketio
def handle_operation(data):
    user = get_current_user()
    room_id = data.get('roomId')
    operation = data.get('operation')
    
    # Process operation
    result = operation_server.receive_operation(room_id, operation)
    
    if 'error' in result:
        emit('operation_error', result)
        return
    
    # Broadcast to room (except sender)
    emit('operation', {
        'operation': operation
    }, room=room_id, skip_sid=request.sid)

Files to Create/Modify

Frontend:

  • frontend/src/services/OperationalTransformation.js ⭐ (NEW)
  • frontend/src/services/VectorClock.js ⭐ (NEW)
  • frontend/src/services/OperationHistory.js ⭐ (NEW)
  • frontend/src/services/CollaborativeCanvas.js ⭐ (NEW)
  • frontend/src/models/operations.js ⭐ (NEW)
  • frontend/src/components/Canvas.js (MODIFY)

Backend:

  • backend/services/operation_server.py ⭐ (NEW)
  • backend/routes/socketio_handlers.py (MODIFY)

Benefits

  • Guaranteed eventual consistency
  • No lost edits due to conflicts
  • Proper concurrent undo/redo
  • Resilient to network delays
  • Scales to many simultaneous users
  • Professional collaborative editing like Google Docs

Testing Requirements

  • Simulate concurrent operations
  • Test network partition scenarios
  • Verify convergence with 3+ clients
  • Stress test with rapid operations
  • Test undo/redo during conflicts

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions