In [1]:
import threading
import time

In [3]:
class RawCASCounter:
    """Teaching model of a counter updated through compare-and-swap (CAS).

    The compare-then-write step in ``raw_cas`` is made indivisible with a
    ``threading.Lock``; ``increment`` builds an optimistic retry loop on top
    of it. Every CAS attempt is logged with ``print`` so the interleaving of
    threads can be followed in the cell output.
    """

    def __init__(self):
        self.value = 0
        self._lock = threading.Lock()

    def raw_cas(self, address, expected, new_value):
        """Store ``new_value`` in ``self.value`` only if it currently equals ``expected``.

        Args:
            address: Ignored by this implementation; kept so the signature
                keeps its (address, expected, new) shape.
            expected: Value the caller believes the counter currently holds.
            new_value: Value to store when the expectation holds.

        Returns:
            True if the swap happened, False if the counter held another value.
        """
        with self._lock:
            current = self.value
            print(f"CAS attempt: Expected {expected}, Current {current}")
            if current != expected:
                print("  -> NO SWAP (mismatch). Retry needed.")
                return False
            self.value = new_value  # Swap!
            print(f"  -> SWAP SUCCESS! Now {self.value}")
            return True

    def increment(self):
        """Add one to the counter, retrying the CAS until it succeeds."""
        expected = self.value
        print(f"Thread {threading.current_thread().name} starting increment. {expected} -> {expected + 1}")
        # id(self.value) is only a placeholder argument: raw_cas ignores it.
        while not self.raw_cas(id(self.value), expected, expected + 1):
            time.sleep(0.01)  # Back off briefly instead of busy-waiting
            # Re-read *after* the back-off so the next attempt uses the
            # freshest value rather than one that went stale while sleeping.
            expected = self.value





In [4]:
def _run_increment(counter, label, delay=0.0):
    """Optionally wait ``delay`` seconds, then do one logged ``counter.increment()``."""
    if delay > 0:
        time.sleep(delay)
    print(f"{label} starting increment...")
    counter.increment()
    print(f"{label} finished.")


def thread1(counter):
    """Worker body for the first thread: increment ``counter`` once, immediately.

    Args:
        counter: Object exposing an ``increment()`` method.
    """
    _run_increment(counter, "Thread 1")


def thread2(counter, delay=0.05):
    """Worker body for the second thread: wait ``delay`` seconds, then increment once.

    Args:
        counter: Object exposing an ``increment()`` method.
        delay: Seconds to sleep before incrementing (default 0.05).
    """
    _run_increment(counter, "Thread 2", delay)

In [7]:
if __name__ == "__main__":
    # Demo driver: two named threads each increment one shared counter once.
    counter = RawCASCounter()
    print(f"Initial counter value: {counter.value}")

    t1 = threading.Thread(name="T1", target=thread1, args=(counter,))
    t2 = threading.Thread(name="T2", target=thread2, args=(counter,))

    # Start both workers first, then wait for both, so they can overlap.
    for worker in (t1, t2):
        worker.start()
    for worker in (t1, t2):
        worker.join()

    print(f"Final counter value: {counter.value}")


Initial counter value: 0
Thread 1 starting increment...
Thread T1 starting increment. $0 -> $1
CAS attempt: Expected 0, Current 0
  -> SWAP SUCCESS! Now 1
Thread 1 finished.
Thread 2 starting increment...
Thread T2 starting increment. $1 -> $2
CAS attempt: Expected 1, Current 1
  -> SWAP SUCCESS! Now 2
Thread 2 finished.
Final counter value: 2


Without a lock (race condition: lost updates)

In [15]:
import threading
import time

# Module-level counter that the worker threads below read and write without any lock.
counter = 0  # Shared, no protection

def bad_increment(iterations=1000):
    """Increment the global ``counter`` ``iterations`` times with no synchronization.

    The read and the write are deliberately split by a tiny sleep so another
    thread can run in between; its update is then overwritten by this
    thread's stale value (a lost update).

    Args:
        iterations: Number of read-sleep-write rounds to perform (default 1000).
    """
    global counter
    for _ in range(iterations):
        snapshot = counter        # Read the shared value
        time.sleep(0.000001)      # Sleep releases the GIL: another thread can jump in
        counter = snapshot + 1    # Write back: may overwrite another thread's change

# Run two unsynchronized workers to completion, then show the resulting count.
threads = [threading.Thread(target=bad_increment) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(f"Without lock: {counter}")

Without lock: 1001


In [12]:
import threading

# Reset the shared counter for the locked demo; `lock` is held around every update to it.
counter = 0
lock = threading.Lock()

def good_increment(iterations=1000):
    """Increment the global ``counter`` ``iterations`` times while holding ``lock``.

    Each read-modify-write happens inside ``with lock``, so only one thread
    at a time can update the counter and no increment is lost.

    Args:
        iterations: Number of increments to perform (default 1000).
    """
    global counter
    for _ in range(iterations):
        with lock:  # Critical section: one thread at a time
            counter += 1
# Run two lock-protected workers to completion, then show the resulting count.
threads = [threading.Thread(target=good_increment) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(f"With lock: {counter}")  # Always 2000

With lock: 2000
