In [1]:
# !pip install -U psycopg2-binary
# !pip install -U jsondiff

In [2]:
import psycopg2
import jsondiff
import sqlite3
import json
import os
from datetime import date, datetime, timezone
import csv
import time

# read the JSON settings file; `config['dsn']` is later used to connect to Sierra
with open('./config.json') as f:
    config = json.loads(f.read())

# location of the local sqlite database file
db_file = './app/patron_validation_data.db'

class SQLiteConnection:
    """
    Context manager around a `sqlite3` connection to `db_path`.

    On entry it opens the connection, registers the two-argument SQL function
    `json_diff` (backed by `compute_diff`), applies the connection PRAGMAs and
    returns the `sqlite3.Connection` itself. On exit it runs `PRAGMA optimize`
    and closes the connection, whether or not the `with` body raised.

    Usage:
    with SQLiteConnection('your_database_path.sqlite') as conn:
        # Your database operations here
    """
    def __init__(self, db_path):
        self.db_path = db_path

    @staticmethod
    def compute_diff(new_json, old_json):
        """
        Return the jsondiff of two JSON documents as a string.

        new_json : first argument handed to `jsondiff.diff` (JSON text)
        old_json : second argument handed to `jsondiff.diff` (JSON text)

        Returns the string form of the diff, or '{}' (an empty JSON object)
        when the diff is None or when anything goes wrong.
        """
        try:
            diff_result = jsondiff.diff(
                new_json,
                old_json,
                load=True,
                dump=True,
                marshal=True,
                syntax='explicit'
            )
        except Exception:
            # best effort: this runs as an SQL function, so any failure
            # (e.g. unparsable input) yields an empty object instead of an
            # exception escaping into the statement that called json_diff
            return '{}'

        # return an empty json object if the result of the diff is None,
        # ... or the json result of the diff
        return str(diff_result) if diff_result is not None else '{}'

    def __enter__(self):
        """
        Open the connection, register `json_diff`, apply the PRAGMAs and
        return the `sqlite3.Connection`.
        """
        self.conn = sqlite3.connect(self.db_path)
        try:
            self.conn.create_function('json_diff', 2, self.compute_diff)
            # run this sql script when the connection to the database is opened ...
            sql = """\
            PRAGMA locking_mode = NORMAL;  -- valid values are NORMAL | EXCLUSIVE; see https://www.sqlite.org/lockingv3.html
            PRAGMA cache_size = 12800;     -- results in about 50MB of cache at the default 4KB page size
            -- PRAGMA foreign_keys = ON;   -- TODO consider turning this on, and then running a check ...
            -- run a check ...
            -- PRAGMA foreign_key_check;
            """
            self.conn.executescript(sql)
        except Exception:
            # don't leak the handle when the setup fails part-way through
            self.conn.close()
            raise
        return self.conn

    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        Run `PRAGMA optimize` and close the connection.

        Returns None, so an exception raised inside the `with` body propagates.
        NOTE(review): per the sqlite3 docs `executescript` issues an implicit
        COMMIT when a transaction is pending (legacy transaction control), so
        uncommitted work appears to be committed here -- confirm this is
        intended.
        """
        # https://www.sqlite.org/pragma.html#pragma_analysis_limit
        sql = """\
        PRAGMA analysis_limit=-1;   -- no analysis limit
        PRAGMA optimize;            -- analyze
        PRAGMA analysis_limit=2000; -- set analysis limit back to something more reasonable
        """
        try:
            self.conn.executescript(sql)
        finally:
            # always release the database handle, even if the optimize step failed
            self.conn.close()

In [3]:
# create the tables, indexes, and triggers by running the schema script
with SQLiteConnection(db_file) as con:
    with open('./sql_schema.sql') as schema_file:
        schema_sql = schema_file.read()

    # Connection.executescript() makes a temporary cursor and runs the script on it
    con.executescript(schema_sql)

In [4]:
def insert_patron_data(con, data, columns):
    """
    takes the patron data returned from the Sierra SQL query, and inserts it into the local database

    Rows are upserted into the `patrons` table: a row whose `patron_record_id`
    already exists has all of its other columns overwritten.

    con            : sqlite connection,
    data           : data from the sierra result query (an iterable of rows)
    columns        : the column names, in the same order as the values of each row

    Returns None. Any error (including a column missing from `columns`) is
    printed and swallowed, so a bad batch does not stop the caller.
    """

    sql_patron_insert = """\
    INSERT INTO patrons (
        patron_record_id,
        patron_record_num,
        campus_code,
        barcode1,
        home_library_code,
        ptype_code,
        create_timestamp_utc,
        delete_timestamp_utc,
        update_timestamp_utc,
        expire_timestamp_utc,
        active_timestamp_utc,
        claims_returned_total,
        owed_amt_cents,
        mblock_code,
        highest_level_overdue_num,
        num_revisions
    )
    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    ON CONFLICT(patron_record_id) DO UPDATE SET
        patron_record_num = excluded.patron_record_num,
        campus_code = excluded.campus_code,
        barcode1 = excluded.barcode1,
        home_library_code = excluded.home_library_code,
        ptype_code = excluded.ptype_code,
        create_timestamp_utc = excluded.create_timestamp_utc,
        delete_timestamp_utc = excluded.delete_timestamp_utc,
        update_timestamp_utc = excluded.update_timestamp_utc,
        expire_timestamp_utc = excluded.expire_timestamp_utc,
        active_timestamp_utc = excluded.active_timestamp_utc,
        claims_returned_total = excluded.claims_returned_total,
        owed_amt_cents = excluded.owed_amt_cents,
        mblock_code = excluded.mblock_code,
        highest_level_overdue_num = excluded.highest_level_overdue_num,
        num_revisions = excluded.num_revisions
    ;
    """

    # (source column, converter) pairs, listed in the order of the VALUES placeholders
    fields = (
        ('patron_record_id', int),
        ('patron_record_num', int),
        ('campus_code', str),
        ('barcode1', str),
        ('home_library_code', str),
        ('ptype_code', str),
        ('create_timestamp_utc', int),
        ('delete_timestamp_utc', int),
        ('update_timestamp_utc', int),
        ('expire_timestamp_utc', int),
        ('active_timestamp_utc', int),
        ('claims_returned_total', int),
        ('owed_amt_cents', int),
        ('mblock_code', str),
        ('highest_level_overdue_num', int),
        ('num_revisions', int),
    )

    try:
        # find each column's position once per batch, instead of searching
        # the `columns` list twice per field for every single row
        positions = [(columns.index(name), convert) for name, convert in fields]

        con.executemany(
            sql_patron_insert,
            (
                tuple(
                    # None is passed through untouched so it is stored as NULL
                    convert(row[pos]) if row[pos] is not None else None
                    for pos, convert in positions
                )
                for row in data
            )
        )
    except Exception as e:
        print(e)


# insert json_data function ... 
def _json_text(value):
    """
    return `value` as the text that is handed to SQLite's json_valid()
    """
    # a dict / list (e.g. a json column already decoded by the database
    # driver) must be serialized; str() would give a Python repr, which is
    # not valid JSON and would be replaced by '{}'
    if isinstance(value, (dict, list)):
        try:
            return json.dumps(value)
        except (TypeError, ValueError):
            pass  # not serializable: fall back to the plain string form

    return str(value)


def insert_json_data(con, data, columns, json_data_type):
    """
    upserts one JSON column of the Sierra result into the `patron_json_data` table

    con            : sqlite connection,
    data           : data from the sierra result query
    columns        : the column names
    json_data_type : the type of data
    ---
    json_data_type come from the Sierra SQL results (columns)
        patron_address_json,
        identifiers_json,
        phone_numbers_json,
        emails_json

    A value that is not valid JSON (including NULL) is stored as '{}'.
    Returns None. Any error is printed (prefixed with `json_data_type`) and
    swallowed.
    """

    sql = """\
    INSERT INTO patron_json_data (
        patron_record_id,
        json_data_type,
        json_data
    ) VALUES (
        ?,
        ?,
        CASE WHEN json_valid(?) THEN ? ELSE '{}' END
    )
    ON CONFLICT(patron_record_id, json_data_type) DO UPDATE SET
        json_data = CASE WHEN json_valid(excluded.json_data) THEN excluded.json_data ELSE '{}' END,
        update_timestamp_utc = strftime('%s', 'now')
    ;
    """

    def params(rows, id_pos, json_pos, type_name):
        for row in rows:
            json_text = _json_text(row[json_pos])
            # NOTE, we need to send the json_data 2x since it's referenced 2x in the statement
            yield (int(row[id_pos]), type_name, json_text, json_text)

    try:
        # look the column positions up once per batch rather than for every row
        con.executemany(
            sql,
            params(
                data,
                columns.index('patron_record_id'),
                columns.index(json_data_type),
                str(json_data_type),
            )
        )

    except Exception as e:
        print(f'{json_data_type} Error:', e)


def get_latest_update(cursor):
    """
    extract the latest UNIX timestamp from the sqlite database

    cursor : sqlite cursor used to run the query

    Returns max(patrons.update_timestamp_utc) as a string, or the integer 0
    (the start of the UNIX epoch) when the table holds no timestamp or the
    query fails (the error is printed).
    """

    sql = """\
    SELECT
        -- grab the last recent change we made,
        max(patrons.update_timestamp_utc)

        -- if we want to buffer it by 10 minutes, use this
        -- max(patrons.update_timestamp_utc) - 600
    FROM
        patrons
    LIMIT 1
    """
    try:
        cursor.execute(sql)
        # read from the cursor we were handed (not from a global of the caller)
        result = cursor.fetchone()
    except Exception as e:
        print(e)
        return 0

    # max() over an empty table yields a single NULL
    if result is None or result[0] is None:
        return 0  # 0 is the start time of UNIX EPOCH

    return str(result[0])



In [None]:
# NOTE: this statement is not used by the loop below (insert_patron_data()
# carries its own copy); the name is kept in case other cells refer to it
sql_patron_insert = """\
INSERT INTO patrons (
    patron_record_id,
    patron_record_num,
    campus_code,
    barcode1,
    home_library_code,
    ptype_code,
    create_timestamp_utc,
    delete_timestamp_utc,
    update_timestamp_utc,
    expire_timestamp_utc,
    active_timestamp_utc,
    claims_returned_total,
    owed_amt_cents,
    mblock_code,
    highest_level_overdue_num,
    num_revisions
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(patron_record_id) DO UPDATE SET
    patron_record_num = excluded.patron_record_num,
    campus_code = excluded.campus_code,
    barcode1 = excluded.barcode1,
    home_library_code = excluded.home_library_code,
    ptype_code = excluded.ptype_code,
    create_timestamp_utc = excluded.create_timestamp_utc,
    delete_timestamp_utc = excluded.delete_timestamp_utc,
    update_timestamp_utc = excluded.update_timestamp_utc,
    expire_timestamp_utc = excluded.expire_timestamp_utc,
    active_timestamp_utc = excluded.active_timestamp_utc,
    claims_returned_total = excluded.claims_returned_total,
    owed_amt_cents = excluded.owed_amt_cents,
    mblock_code = excluded.mblock_code,
    highest_level_overdue_num = excluded.highest_level_overdue_num,
    num_revisions = excluded.num_revisions
;
"""

# the Sierra query is the same for every poll, so read the file only once
with open('./sierra_patron_data.sql') as f:
    sierra_patron_sql = f.read()

# connect to both the sierra db, and the local sqlite db
# (psycopg2's own `with` block only ends the transaction, it does not close
# the connection, so the connection is closed explicitly in the `finally`)
con = psycopg2.connect(dsn=config['dsn'])
try:
    with SQLiteConnection(db_file) as con_sqlite:
        start_time = time.time()
        end_time = start_time + 60*60  # 60 minutes * 60 seconds/minute
        # end_time = start_time + 5*60

        while time.time() < end_time:
            cursor = con.cursor(name="named_cursor")
            cursor_sqlite = con_sqlite.cursor()

            try:
                # get the latest UNIX timestamp from the local database
                latest_update = get_latest_update(cursor_sqlite)
                print(latest_update, end=',')

                i = 0
                try:
                    cursor.execute(sierra_patron_sql, (latest_update,))
                except Exception as e:
                    # nothing to fetch when the query itself failed
                    print('Error executing sierra_patron_data.sql', e)
                else:
                    try:
                        columns = None
                        while (data := cursor.fetchmany(1000)):
                            # get the columns from the cursor (the description of
                            # a named cursor is read after the first fetch)
                            if columns is None:
                                columns = [col[0] for col in cursor.description]

                            # insert the appropriate data into each local sqlite table ..
                            # patrons table
                            insert_patron_data(con_sqlite, data, columns)

                            # insert the json data
                            # (NOTE: `json_data_type` is the row's column data from the Sierra
                            # SQL query that contains the JSON data of that type
                            insert_json_data(con_sqlite, data, columns, 'patron_address_json')
                            insert_json_data(con_sqlite, data, columns, 'identifiers_json')
                            insert_json_data(con_sqlite, data, columns, 'phone_numbers_json')
                            insert_json_data(con_sqlite, data, columns, 'emails_json')

                            i += 1
                            print('.', end='')
                    except Exception as e:
                        print('Error:', e)

                # print('done. ', i)
                con_sqlite.commit()
            finally:
                cursor_sqlite.close()
                # close the named cursor before the transaction it lives in is ended
                cursor.close()
                # we only read from Sierra: end the transaction after every poll so
                # that a failed statement cannot leave it aborted for the next poll
                con.rollback()

            del cursor_sqlite
            del cursor
            time.sleep(5)
finally:
    con.close()

1692032601,.1692032745,.1692032751,.1692032761,.1692032769,.1692032777,.1692032785,.1692032793,.1692032801,.1692032809,.1692032816,.1692032826,.1692032834,.1692032842,.1692032849,.1692032857,.1692032864,.1692032873,.1692032881,.1692032888,.1692032896,.1692032904,.1692032912,.1692032920,.1692032928,.1692032935,.1692032943,.1692032951,.1692032959,.1692032967,.1692032975,.1692032982,.1692032990,.1692032998,.1692033006,.1692033014,.1692033022,.1692033029,.1692033037,.1692033045,.1692033054,.1692033062,.1692033070,.1692033077,.1692033086,.1692033094,.1692033101,.1692033109,.1692033117,.1692033125,.1692033133,.1692033140,.1692033148,.1692033156,.1692033163,.1692033172,.1692033179,.1692033187,.1692033195,.1692033203,.1692033211,.1692033218,.1692033226,.1692033234,.1692033242,.1692033250,.1692033259,.1692033265,.1692033273,.1692033282,.1692033290,.1692033298,.1692033306,.1692033314,.1692033322,.1692033330,.1692033338,.1692033345,.1692033351,.1692033362,.1692033370,.1692033377,.1692033385,.1692