<h1>Cockroach Developer Success - Cockroach Bank</h1>

<p>In this Notebook we will turn your CC Free Tier cluster into a bank!!</p>
<p>Firstly you will need to log in using an admin user to create the required database and user:</p><br>
<code>cockroach sql --url "postgresql://USER:PASSWORD@ADDRESS:26257/defaultdb?sslmode=require&options=--cluster=CLUSTER_NAME"</code>
<p>
<code>CREATE DATABASE bank;
USE bank;
CREATE TABLE accounts (
      id INT8 NOT NULL,
      balance INT8 NULL,
      CONSTRAINT "primary" PRIMARY KEY (id ASC),
      FAMILY "primary" (id, balance)
  );
INSERT INTO accounts VALUES (1,400.00), (2,450.00), (3,500.00), (4,550.00), (5,600.00); 
CREATE TABLE public.payments (
      payment_id UUID NOT NULL DEFAULT gen_random_uuid(),
      from_account INT8 NULL,
      to_account INT8 NULL,
      amount DECIMAL(10,2) NULL,
      settlement_date DATE NULL,
      created_at TIMESTAMP NULL DEFAULT now():::TIMESTAMP,
      CONSTRAINT "primary" PRIMARY KEY (payment_id ASC),
      CONSTRAINT fk_from_account_ref_accounts FOREIGN KEY (from_account) REFERENCES public.accounts(id),
      CONSTRAINT fk_to_account_ref_accounts FOREIGN KEY (to_account) REFERENCES public.accounts(id),
      FAMILY "primary" (payment_id, from_account, to_account, amount, settlement_date, created_at)
  );
CREATE USER bank_user WITH PASSWORD "L0ads0fc4sh";
GRANT ALL ON DATABASE bank TO bank_user;
GRANT ALL ON bank.public.* TO bank_user;
</code>
<p>You will need to make sure that you have installed all the libraries below using <code>pip install</code></p>

In [None]:
import time
import random
import logging
from datetime import datetime, date, timedelta
from argparse import ArgumentParser, RawTextHelpFormatter

import psycopg2
from psycopg2.errors import SerializationFailure

<p>Set the following to your CC Free Tier cluser address and name ...</p>

In [None]:
cc_address = 'xxxxxxxxxxxx.cockroachlabs.cloud'
cc_cluster_name = 'xxxxxxxx'

<p>Execute the following cell to set up the core banking functions ...</p>

In [None]:
def connect_to_CRDB():
    conn_str = "host='{}' port='26257' dbname='{}.bank'".format(cc_address, cc_cluster_name)
    conn_str += " user='bank_user' password='L0ads0fc4sh' sslmode='require'"
    conn = psycopg2.connect(conn_str)
    conn.autocommit = False
    with conn.cursor() as cur:
        cur.execute("SET application_name = 'Python-Bank-App'")
    return conn
    
        
def print_balances(conn):
    with conn.cursor() as cur:
        cur.execute("SELECT id, balance FROM accounts")
        logging.debug("print_balances(): status message: %s", cur.statusmessage)
        rows = cur.fetchall()
        conn.commit()
        print(f"Balances at {time.asctime()}:")
        for row in rows:
            print(row)

def transfer_funds(conn, frm, to, amount):
    with conn.cursor() as cur:

        # Check the current balance.
        cur.execute("SELECT balance FROM accounts WHERE id = %s", (frm,))
        from_balance = cur.fetchone()[0]
        if from_balance < amount:
            raise RuntimeError(
                f"Insufficient funds in {frm}: have {from_balance}, need {amount}"
            )

        # Perform the transfer.
        cur.execute("UPDATE accounts SET balance = balance - %s WHERE id = %s", (amount, frm))
        cur.execute("UPDATE accounts SET balance = balance + %s WHERE id = %s", (amount, to))
        cur.execute("""
        INSERT INTO payments (from_account, to_account, amount, settlement_date)
                        VALUES (%s, %s, %s, (now()+'2 days')::DATE)
        """, (frm, to, amount))

    conn.commit()
    logging.debug("transfer_funds(): status message: %s", cur.statusmessage)
    
def run_transaction(conn, op, max_retries=3):
    """
    Execute the operation *op(conn)* retrying serialization failure.

    If the database returns an error asking to retry the transaction, retry it
    *max_retries* times before giving up (and propagate it).
    """
    # leaving this block the transaction will commit or rollback
    # (if leaving with an exception)
    with conn:
        for retry in range(1, max_retries + 1):
            try:
                op(conn)

                # If we reach this point, we were able to commit, so we break
                # from the retry loop.
                return

            except SerializationFailure as e:
                # This is a retry error, so we roll back the current
                # transaction and sleep for a bit before retrying. The
                # sleep time increases for each failed transaction.
                logging.debug("got error: %s", e)
                conn.rollback()
                logging.debug("EXECUTE SERIALIZATION_FAILURE BRANCH")
                sleep_ms = (2 ** retry) * 0.1 * (random.random() + 0.5)
                logging.debug("Sleeping %s seconds", sleep_ms)
                time.sleep(sleep_ms)

            except psycopg2.Error as e:
                logging.debug("got error: %s", e)
                logging.debug("EXECUTE NON-SERIALIZATION_FAILURE BRANCH")
                raise e

        raise ValueError(f"Transaction did not succeed after {max_retries} retries")
        
def test_retry_loop(conn):
    """
    Cause a seralization error in the connection.

    This function can be used to test retry logic.
    """
    with conn.cursor() as cur:
        # The first statement in a transaction can be retried transparently on
        # the server, so we need to add a dummy statement so that our
        # force_retry() statement isn't the first one.
        cur.execute("SELECT now()")
        cur.execute("SELECT crdb_internal.force_retry('1s'::INTERVAL)")
        logging.debug("test_retry_loop(): status message: %s", cur.statusmessage)
    


<p>Now let's connect to CockroachDB and print out all accounts with their current balances ...</p> 

In [None]:
cc=connect_to_CRDB()
print_balances(cc)

<p>The main function we will use is transfer_funds() which transfers money from one account to another ...</p>

In [None]:
transfer_funds(cc, 1, 2, 10.00)
print_balances(cc)

<p>Let's now simulate lots of balance transfers in an infinite loop ...</p>

In [None]:
from_account = 0
while True: 
    to_account = random.randrange(1,5)
    from_account = random.randrange(1,5)
    while (from_account == to_account):
        from_account = random.randrange(1,5)
    amount = random.randrange(1,10)
    transfer_funds(cc, from_account, to_account, amount)    
    

<p>This will now run silently forever - or until it encounters an error.</p>
<p>We can check that it is working correctly by executing the following statement in the <code>cockroach sql</code> CLI. If you run it twice a few seconds apart you should see the number of payments increasing :
<br><code>select count(*) from payments;</code>
<br><br>
<p>We can force an error to happen by executing a high priority transaction in the CLI using the following commands:</p>
<code>BEGIN PRIORITY HIGH;
update accounts set balance=balance+1 where id=1;</code>
<p>Then wait about 10 seconds before committing the transaction with:</p>
<code>COMMIT;</code>
<p>If the above cell does not error, then try again with a longer gap between the BEGIN/UPDATE and the COMMIT<p>
<p>The reason this happens is that Cockroach runs with the highest possible level of isolation - Serializable - which means that we must ensure that different transactions execute as if they had the cluster to themselves. This in turn means that two transactions that have read/written the same data in the same timeframe cannot both be allowed to succeed. One of them will fail - if both transactions are at the same priority then either could fail. In this case we elevated the priority of the CLI transaction to make sure that this succeeded and the notebook cell experienced the failure.</p>
<p>When this happens we need to roll back the failed transaction before we can do any more database work in the same session ...</p>

In [None]:
cc.rollback()

<p>Let's check that this was all working as expected and we have generated rows in the payments table ...</p>

In [None]:
with cc.cursor() as cur:
    cur.execute("SELECT from_account, to_account, amount, settlement_date FROM payments")
    payments = cur.fetchall()
    cc.commit()
    for payment in payments:
        print(payment)

<p>We can force an error to test our database code using the Cockroach crdb_internal.force_retry() function which is demonstrated in test_retry_loop()</p>
<p>We will need to run a rollback again afterwards ...</p>

In [None]:
test_retry_loop(cc)

In [None]:
cc.rollback()

<p>Let's try that infinite loop again, but this time wrap some retry logic around the balance transfer function using run_transaction() - it's really cool that in Python you can pass one function to another as an argument!!</p>
<p>This loop will be much more resilient run_transaction() will retry each balance transfer up to 3 times but this is completely configurable. Try to break it like before with a high priority transaction from the CLI or by cloning this notebook and running 2 database sessions simultaneously.</p>

In [None]:
from_account = 0
while True: 
    to_account = random.randrange(1,5)
    from_account = random.randrange(1,5)
    while (from_account == to_account):
        from_account = random.randrange(1,5)
    amount = random.randrange(1,10)
    run_transaction(cc, lambda conn: transfer_funds(conn,  from_account, to_account, amount))    
    

<p>You can stop the infinite loop manually using the square stop button on the toolbar above.</p>
<p>Close the connection to disconnect from the CC cluster ...</p>

In [None]:
cc.close()
cc