In [3]:
import pandas as pd
import time 
from copy import copy

class Operation:
    """Read or Write"""
    READ = "read"
    WRITE = "write"

    def __init__(self, _id: str, target: pd.DataFrame, operation_type: str, function = None):
        self.id = _id
        #Operation.target est un pointeur vers la vraie target de l'Opération
        #Operation.local_target est un pointeur vers une des local target de la Transaction
        self.target = target 
        self.target_local = None 
        self.function = function
        self.operation_type = operation_type
        if self.operation_type == Operation.WRITE and self.function == None:
            raise Exception("A WRITE Operation was not provided a function !")
        self.has_applied = False

    def apply(self) -> bool :
        # Defin a local copy of the target object that we will modify
        if self.operation_type == Operation.READ:
            # Do nothing (simulate a read)
            return True
        elif self.operation_type == Operation.WRITE: 
            # Dataframe specific function : .apply(function)
            # We modify what's pointed by target local
            self.target_local.update(self.target_local.apply(self.function))
            # We assume that applying this function always works, hence why success=True
            # We we might put the application of the function inside a try/except clause and return False if there is an error.
            success = True
            self.has_applied = success
            return success
        else:
            return False



In [10]:
def list_intersect(l1, l2):
    intersect = []
    for item in l1:
        if item in l2:
            intersect.append(l2)
    return intersect

class Transaction:
    phase_READ = 'read'
    phase_VALIDATE = "validate"
    phase_WRITE = "write"
    phase_FINISHED = "finished"

    def __init__(self, _id: str, operation_list):
        self.id = _id
        self.operation_list = operation_list
        self.operation_iter = 0
        # Current phase. None means that we didn't start. 
        self.phase = None
        # Timestamps of different phases
        self.ts_start_read = None
        self.ts_start_validate = None
        self.ts_start_write = None
        self.ts_finished = None
        # Init all_targets

        # Define read and write sets
        self.init_all_targets()
        self.init_read_set()
        self.init_write_set()


    def init_read_set(self):
        self.read_set = []
        for operation in self.operation_list:
            if operation.operation_type == Operation.READ and operation.target not in self.read_set:
                self.read_set.append(operation.target)

    def init_write_set(self):
        self.write_set = []
        for operation in self.operation_list:
            if operation.operation_type == Operation.WRITE and operation.target not in self.write_set:
                self.write_set.append(operation.target)

    def is_finished(self):
        return self.ts_finished

    def init_all_targets(self):
        self.all_targets = []
        # Récolter les targets
        for operation in self.operation_list:
            if operation.target not in self.all_targets:
                self.all_targets.append(operation.target)

    def init_target_local(self):
        """We want to make the local targets of Operations dependent on the Transaction.
        So we'll create a set of shared local targets in the Transaction."""
         # Create copy of all_targets
        self.all_targets_copy = [copy(target) for target in self.all_targets]
        # Faire des pointeurs vers ces copies locales dans chaque operation
        for operation in self.operation_list:
            target_id = self.all_targets.index(operation.target)
            operation.target_local = self.all_targets_copy[target_id]

    def phase_read(self):
        """Apply the current operation.
        Each operation changes its local target."""
        # Create fresh local copies of target objects
        self.init_target_local()
        
        success = self.operation_list[self.operation_iter].apply()
        if success:
            self.operation_iter += 1 
        # If we read all operations :
        if self.operation_iter >= len(self.operation_list):
            return True
        else:
            return False

    def phase_validate(self, other_transactions):
        earlier_transactions = []
        for transaction in other_transactions:
            if transaction.ts_start_read:
                if transaction.ts_start_read < self.ts_start_read:
                    earlier_transactions.append(transaction)
        for transaction in earlier_transactions:
            # Test 1
            if transaction.ts_finished == None:
                test_1 = False
            else:
                test_1 = transaction.ts_finished < self.ts_start_read
            # Test 3
            if transaction.ts_start_validate == None:
                test_3 = False
            else:
                test_3 = (
                    transaction.ts_start_validate < self.ts_start_validate 
                    and list_intersect(transaction.write_set, self.read_set) == [] 
                    and list_intersect(transaction.write_set, self.write_set) == []
                )
            # Test 2
            if transaction.ts_finished == None:
                test_2 = False
            else:
                # In the slides, it said to test "transaction.ts_finished < self.ts_start_write"
                # We never have started writing when we are in the validate phase.
                # So we replace "ts writing time" by the current timestamp 
                # That's also why we put test 2 last
                test_2 = transaction.ts_finished < time.time() and list_intersect(transaction.write_set, self.read_set) == []
            test = test_1 or test_2 or test_3
            if test is False:
                return False
        # We succeeded in every test. Return true. 
        return True
    

    def phase_write(self):
        """Commit changes of the current transaction.
        Write the local target to the global target."""
        for i in range(len(self.all_targets)):
            # On veut remplacer le contenu dans la variable pointée par all_targets[i]
            # par le contenu dans all_targets_copy[i]
            self.all_targets[i].update(self.all_targets_copy[i])
        print(self.operation_list[0].target)
        return True
        

    def apply_next(self, other_transactions) -> bool:
        if self.ts_finished:
            return True # Transaction has finished, skip
        # Otherwise, run transaction:
        # READ
        if self.phase == Transaction.phase_READ:
            print(f"Reading {self.id}")
            success = self.phase_read()
            if success:
                self.phase = Transaction.phase_VALIDATE
                self.ts_start_validate = time.time() # Record time when validation begins 
        # VALIDATE
        elif self.phase == Transaction.phase_VALIDATE:
            print(f"Validate {self.id}")
            success = self.phase_validate(other_transactions)
            if success:
                self.phase = Transaction.phase_WRITE
                self.ts_start_write = time.time()
            else:
                # Restart transaction
                print(f"Validating {self.id} failed. Restarting.")
                self.restart()
        # WRITE
        elif self.phase == Transaction.phase_WRITE:
            print(f"Writing {self.id}")
            success = self.phase_write()
            if success:
                self.phase = Transaction.phase_FINISHED
                self.ts_finished = time.time()
        else:
            self.phase = Transaction.phase_READ
            self.ts_start_read = time.time()
            return True
        return success

    def restart(self):
        """Reset transaction"""
        # Current phase. None means that we didn't start. 
        self.phase = None
        self.operation_iter = 0
        # Timestamps of different phases
        self.ts_start_read = None
        self.ts_start_validate = None
        self.ts_start_write = None
        self.ts_finished = None

In [11]:
from random import randint
from copy import copy
from typing import Callable, Iterator, Union, Optional, List

def run_transactions(transactions: List[Transaction], max_iterations=10):
    non_finished_transa = transactions #copy(transactions)
    curr_iteration = 0
    while len(non_finished_transa) > 0 and curr_iteration < max_iterations:
        curr_iteration += 1
        n_transa = len(non_finished_transa)
        # Simulate random access to memory : we try to run the next step of a random transaction
        i = randint(0, n_transa-1) 
        curr_transa = non_finished_transa[i]
        curr_transa.apply_next(non_finished_transa)
        # Remove transactions from list if finished
        if curr_transa.is_finished():
            print(f'Execution of {curr_transa.id} finished.')
            non_finished_transa.pop(i)

In [12]:
df = pd.DataFrame({"test": [1,2,3,55]})
test_target = df

def append_value(df: pd.DataFrame, value) -> pd.DataFrame:
    max_index = max(df.index) 
    df.loc[max_index + 1] = value 
    return df 

test_function = lambda x: x-1

test_transa_1 = Transaction("transa 1", [
    Operation("read values", df, Operation.READ),
    Operation("change stock", df, Operation.WRITE, test_function),
])

test_transa_2 = Transaction("transa 2", [
    Operation("read values", df, Operation.READ ),
    Operation("change stock", df, Operation.WRITE, test_function),
])

run_transactions([test_transa_1, test_transa_2], 100)
df

Reading transa 2
init
Reading transa 1
init
Reading transa 2
Reading transa 1
Validate transa 2
Writing transa 2
   test
0     0
1     1
2     2
3    54
Execution of transa 2 finished.
Validate transa 1
Writing transa 1
   test
0     0
1     1
2     2
3    54
Execution of transa 1 finished.


Unnamed: 0,test
0,0
1,1
2,2
3,54
