In [None]:
import datetime
import os
import threading
import time

import psycopg2

In [None]:
# libpq connection string used by every cell that opens a database connection.
# Set the UCU_DDB_DSN environment variable to supply real credentials without
# storing them in the notebook; the literal below is only the fallback for the
# local demo database.
db_connection = os.environ.get(
    "UCU_DDB_DSN",
    "dbname=ucu_ddb_t2 user=postgres password=***",
)

In [None]:
# (Re)create the demo schemas and tables and seed one account.
# WARNING: the script drops fly.booking, hotel.booking and account.balance,
# so re-running this cell resets all demo data.
dataCreatioinQuery = """

CREATE SCHEMA IF NOT EXISTS fly AUTHORIZATION postgres;
CREATE SCHEMA IF NOT EXISTS hotel AUTHORIZATION postgres;
CREATE SCHEMA IF NOT EXISTS account AUTHORIZATION postgres;

DROP TABLE IF EXISTS fly.booking;
DROP TABLE IF EXISTS hotel.booking;
DROP TABLE IF EXISTS account.balance;

CREATE TABLE fly.Booking (
	BookingID  serial primary key,
	OrderID char(12) unique,
	ClientName varchar (255) not null,
	FlyNumber varchar(10) not null,
	FromIATA char(3) not null,
	ToIATA char(3) not null,
	Date date not null
);

CREATE TABLE hotel.Booking (
	BookingID  serial primary key,
	OrderID char(12) unique,
	ClientName varchar (255) not null,
	HotelName varchar(255) not null,
	Arrival date not null,
	Departure date not null
);

CREATE TABLE account.balance (
	AccountID  serial primary key,
	ClientName varchar (255) not null,
	Amount decimal(15,2) CHECK (Amount >= 0)
) ;

INSERT INTO account.balance (ClientName, Amount)  VALUES ('Nick', 200);

"""

conn = psycopg2.connect(db_connection)
try:
    # `with conn` commits when the block succeeds and rolls back when it
    # raises; it does not close the connection, hence the outer finally.
    with conn:
        with conn.cursor() as cur:
            cur.execute(dataCreatioinQuery)
finally:
    # Always release the connection, even when the DDL script fails.
    conn.close()

In [None]:
# Participants of the distributed transaction. Each entry carries a
# human-readable process name and the parameterised SQL statement that the
# participant runs during the prepare phase. The %(name)s placeholders are
# filled from the order dictionary.
fly_booking_sql = (
    " INSERT INTO fly.booking (OrderID, ClientName, FlyNumber, FromIATA, ToIATA, Date) \n"
    "                     VALUES (%(OrderID)s, %(ClientName)s, %(FlyNumber)s, %(From)s, %(To)s, %(FlightDate)s); "
)

hotel_booking_sql = (
    " INSERT INTO hotel.booking (OrderID, ClientName, HotelName, Arrival, Departure) \n"
    "                     VALUES (%(OrderID)s, %(ClientName)s, %(Hotel)s, %(ArrivalDate)s, %(DepartureDate)s); "
)

# The "AccoundID" placeholder spelling must match the key used in the order.
account_payment_sql = (
    " UPDATE account.balance \n"
    "                     SET amount = amount - %(Amount)s where AccountID = %(AccoundID)s;\n"
    "                "
)

parts = {
    participant: {"process": process_name, "query": statement}
    for participant, process_name, statement in (
        ("fly", "Booking tickets", fly_booking_sql),
        ("hotel", "Booking hotel", hotel_booking_sql),
        ("account", "Payment processing", account_payment_sql),
    )
}

In [None]:
def Prepare2PC(order, part, results):
    """Run phase one (prepare) of the two-phase commit for one participant.

    Opens a dedicated connection, starts a TPC transaction, executes the
    participant's query with the order as parameters and prepares the
    transaction. The outcome is always published in ``results[part]`` so the
    coordinator never waits for a vote that cannot arrive.

    Parameters:
        order:   dict with the query parameters; ``order['OrderID']`` is also
                 used as part of the transaction id.
        part:    key of the participant in the module-level ``parts`` dict.
        results: shared dict; ``results[part]`` receives a dict with the keys
                 ``process``, ``status`` ("WAIT" when prepared, "ABORT"
                 otherwise), ``details`` and ``transaction_id``
                 (``None`` if the connection could not be established).

    Raises:
        KeyError: if ``part`` is not a key of ``parts``.
    """
    process = parts[part]['process']

    conn = None
    transaction_id = None
    # Pessimistic defaults: anything short of a successful prepare is an ABORT.
    status, details = "ABORT", "preparation did not complete"

    try:  # transaction preparation
        # Connecting inside the try turns an unreachable database into an
        # ABORT vote instead of a silently dead worker thread.
        conn = psycopg2.connect(db_connection)
        cur = conn.cursor()

        transaction_id = conn.xid(1, process, order['OrderID'])
        conn.tpc_begin(transaction_id)
        cur.execute(parts[part]["query"], order)
        conn.tpc_prepare()

        status, details = "WAIT", "OK"
        print(process + ' - WAIT for commit or rollback')

    except Exception as ex:  # exception handling with passing ABORT status
        status, details = "ABORT", str(ex)
        print(process + ' - ABORT - ' + str(ex))

    finally:
        # Publish the vote first so it is recorded even if closing fails.
        results[part] = {"process": process, "status": status, "details": details, "transaction_id": transaction_id}
        if conn is not None:
            conn.close()


In [None]:
def Finish2PC(transaction, part,  decision, testLock):
    """Run phase two (commit or rollback) of the 2PC for one participant.

    Parameters:
        transaction: dict of prepare results keyed by participant; the entry
                     ``transaction[part]['transaction_id']`` identifies the
                     prepared transaction and ``['status']`` is updated to
                     the decision once it has been applied.
        part:        key of the participant in the module-level ``parts`` dict.
        decision:    'COMMIT' or 'ABORT'; any other value is reported and
                     ignored.
        testLock:    collection of participants for which the decision is
                     deliberately not delivered, leaving their transaction
                     prepared (simulates a lost connection).

    Exceptions raised by the commit/rollback call propagate to the caller;
    the connection is closed in every case.
    """
    # condition to simulate connection lock
    if part in testLock:
        return

    # Validate before connecting so a bad decision never opens a connection.
    if decision not in ('COMMIT', 'ABORT'):
        print ("Decision could accept values COMMIT or ABORT only")
        return

    transaction_id = transaction[part]['transaction_id']

    conn = psycopg2.connect(db_connection)
    try:
        if decision == 'COMMIT':
            conn.tpc_commit(transaction_id)
        else:
            conn.tpc_rollback(transaction_id)
    finally:
        # Release the connection even when the commit/rollback fails.
        conn.close()

    # Only record the new status once the database has accepted the decision.
    transaction[part]['status'] = decision
    print(parts[part]['process'] +' - ' + decision + 'ed')

In [None]:
# Sample order: one dictionary carries the parameters for all three
# participants (flight booking, hotel booking and payment). The keys must
# match the %(name)s placeholders of the participant queries.
order = dict(
    OrderID="20211217-002",
    ClientName="Nik",
    AccoundID=1,
    From="KBP",
    To="AMS",
    FlyNumber="PS 1442",
    FlightDate=datetime.datetime(2021, 12, 31),
    Hotel="Super puper Luxury",
    ArrivalDate=datetime.datetime(2021, 12, 31),
    DepartureDate=datetime.datetime(2022, 1, 4),
    Amount=100.00,
)

In [None]:
# Phase one of the 2PC: every participant prepares its transaction in its own
# thread and publishes its vote in the shared `results` dict.
results = {}

print('Preparing 2PC transaction for Order #'+order['OrderID'])

# Keep the thread handles so completion is detected by joining the workers
# rather than by polling `results`; a worker that dies without reporting can
# then no longer block this cell forever.
prepare_threads = {}
for p in parts:
    prepare_threads[p] = threading.Thread(target=Prepare2PC, args=(order, p, results, ))
    prepare_threads[p].start()

# Waiting for responses from all participants
pending = list(prepare_threads.values())
while pending:
    # Wait at most one second, then report progress if anything is still running.
    pending[0].join(timeout=1)
    pending = [worker for worker in pending if worker.is_alive()]
    if pending:
        statuses = ''
        for p in parts:
            if p not in results:
                status = ' - NOT RESPONDED YET; '
            elif results[p]['status'] == 'WAIT':
                status = ' - PREPARED; '
            else:
                # A participant that answered with ABORT is not prepared.
                status = ' - ' + results[p]['status'] + '; '
            statuses = statuses + parts[p]['process'] + status
        print ('Waiting for all responces ... ' + statuses)

# A worker that ended without publishing a vote counts as an ABORT vote, so
# the following cells always find an entry for every participant.
for p in parts:
    if p not in results:
        results[p] = {
            "process": parts[p]['process'],
            "status": "ABORT",
            "details": "participant finished without reporting a result",
            "transaction_id": None,
        }
        print(parts[p]['process'] + ' - ABORT - ' + results[p]['details'])
        

In [None]:
# Decision to COMMIT or ABORT.
# COMMIT requires a WAIT (prepared) vote from every configured participant.
# A participant with no entry in `results` (for example because the waiting
# cell was interrupted) counts as a "no" vote, as does any recorded entry
# whose status is not WAIT.
voters = set(parts) | set(results)
all_prepared = all(results.get(p, {}).get('status') == 'WAIT' for p in voters)
decision = 'COMMIT' if all_prepared else 'ABORT'
print ('Transaction decision - ' + decision)

In [None]:
# Phase two of the 2PC: deliver the decision to every participant that
# prepared successfully, each in its own thread.
print('Finish 2PC Trnasaction #'+order['OrderID'] + ' with decision to '+ decision)

# Participants listed here deliberately never receive the decision, which
# leaves their transaction prepared for the recovery cells below.
test_lock = ['account']

finish_threads = []
for p in parts:
    # .get() tolerates a participant that never produced a result.
    if results.get(p, {}).get('status') == 'WAIT':
        rt = threading.Thread(target=Finish2PC, args=(results, p, decision, test_lock, ))
        rt.start()
        finish_threads.append(rt)

# Wait for all participants so the recovery cells below do not race with
# commits/rollbacks that are still in flight.
for rt in finish_threads:
    rt.join()

In [None]:
# List the transactions that were prepared but never committed or rolled back.
# The connection is intentionally left open: the next cell uses it to resolve
# those transactions and closes it afterwards.
conn = psycopg2.connect(dsn=db_connection)
pending_transactions = conn.tpc_recover()
pending_transactions


In [None]:
# Resolve every prepared transaction that is still pending, using the
# connection opened by the previous cell. The same decision is applied to all
# of them.
decision = 'COMMIT'
try:
    # Pick the action once instead of re-checking the decision per transaction.
    if decision == 'COMMIT':
        resolve = conn.tpc_commit
    elif decision == 'ABORT':
        resolve = conn.tpc_rollback
    else:
        resolve = None
        print ("Decision could accept values COMMIT or ABORT only")

    if resolve is not None:
        for t in conn.tpc_recover():
            resolve(t)
finally:
    # Close the connection even when a commit/rollback fails part-way.
    conn.close()