# Initialize the connection to the Postgres server

In [None]:
import os

import psycopg2

# Connection settings default to the docker-compose demo values, but each one
# can be overridden through the standard libpq environment variables
# (PGHOST, PGDATABASE, PGUSER, PGPASSWORD, PGPORT) so credentials do not have
# to be edited in the notebook.
conn = psycopg2.connect(
    # "db" is the service name defined in the docker-compose YAML file; it is
    # used here as the host name.
    host=os.environ.get("PGHOST", "db"),
    database=os.environ.get("PGDATABASE", "postgres"),
    user=os.environ.get("PGUSER", "postgres"),
    password=os.environ.get("PGPASSWORD", "postgres"),
    port=int(os.environ.get("PGPORT", "5432")),
)
# Autocommit: every statement executed below is committed as soon as it runs.
conn.set_session(autocommit=True)

## Define helper functions

In [None]:
def execute_sql(conn, sql, params=None):
    """Execute ``sql`` inside a ``with conn`` block and return all result rows.

    Args:
        conn: open psycopg2 connection.
        sql: SQL text; may contain ``%s`` / ``%(name)s`` placeholders when
            ``params`` is given.
        params: optional sequence or mapping of values that the driver binds
            to the placeholders (they are never string-formatted into ``sql``).
            When None, ``sql`` is sent as-is.

    Returns:
        List of result rows, or an empty list when the statement produces no
        result set (e.g. an INSERT without RETURNING).
    """
    # `with conn` commits on success and rolls back if an exception escapes.
    with conn:
        with conn.cursor() as cur:
            cur.execute(sql, params)
            # fetchall() raises ProgrammingError when there is no result set.
            if cur.description is None:
                return []
            return cur.fetchall()

        
def check_dst_table(conn):
    """Print every row currently stored in ``dest_person``, one per line.

    Prints nothing when the table is empty. Returns None.
    """
    # The context manager closes the cursor once the rows have been read.
    with conn.cursor() as cur:
        cur.execute('SELECT * FROM dest_person')
        rows = cur.fetchall()
    for row in rows:
        print(row)
        
def get_current_watermark(conn):
    """Return the newest ``last_modified_time`` present in ``src_person``.

    This is the upper bound for the next incremental copy. It is None when
    ``src_person`` has no rows, because SQL ``max()`` over no rows is NULL.
    """
    rows = execute_sql(conn, 'SELECT max(last_modified_time) FROM src_person')
    latest = rows[0][0]
    return latest


def get_old_watermark(conn):
    """Return the largest watermark recorded in ``watermarktable``.

    This is the (exclusive) lower bound for the next incremental copy. It is
    None when ``watermarktable`` has no rows.
    """
    rows = execute_sql(conn, 'SELECT max(watermark_value) FROM watermarktable')
    stored = rows[0][0]
    return stored


def update_watermark(conn, new_watermark):
    """Record ``new_watermark`` by calling the ``update_watermark_table`` procedure.

    Args:
        conn: open psycopg2 connection.
        new_watermark: the watermark value to store, typically the result of
            ``get_current_watermark``.
    """
    # Pass the value as a bound parameter instead of splicing it into the SQL
    # text, so quotes in it can neither break nor inject into the statement.
    # It is converted with str() first so the server still receives an
    # untyped string literal, as with the previous f-string version. That
    # lets PostgreSQL resolve it against whatever parameter type the
    # procedure declares.
    with conn.cursor() as cur:
        cur.execute("call update_watermark_table(%s);", (str(new_watermark),))
    
    
def copy_data(conn, old_watermark, current_watermark):
    """Copy ``src_person`` rows into ``dest_person`` for one watermark window.

    A row is copied when ``old_watermark < last_modified_time <=
    current_watermark``. ``insert ... select *`` relies on both tables having
    the same column order.

    Args:
        conn: open psycopg2 connection.
        old_watermark: exclusive lower bound (previously stored watermark).
        current_watermark: inclusive upper bound (newest source timestamp).

    Returns:
        The number of rows inserted (also printed).

    Raises:
        ValueError: if either watermark is None. A bound None becomes SQL NULL,
            which would make the filter match nothing and silently copy zero rows.
    """
    if old_watermark is None or current_watermark is None:
        raise ValueError(
            f"copy_data needs both watermarks, got old={old_watermark!r}, "
            f"current={current_watermark!r}"
        )

    # Watermarks are bound by the driver rather than formatted into the SQL text.
    sql = """
    insert into dest_person
    select * from src_person
    where
        last_modified_time > %s
        and last_modified_time <= %s
    """
    with conn.cursor() as cur:
        cur.execute(sql, (old_watermark, current_watermark))
        num_of_rows_affected = cur.rowcount
    print(f"* Rows affected: {num_of_rows_affected}")
    return num_of_rows_affected
    

def copy_pipeline(conn):
    """Run one incremental-copy pass from ``src_person`` into ``dest_person``.

    Steps:
      1. read the previously stored watermark from ``watermarktable``;
      2. read the newest ``last_modified_time`` from ``src_person``;
      3. copy the rows in (old, new] and store the new watermark.

    If ``src_person`` is empty there is nothing to copy and no valid new
    watermark, so the pass is skipped rather than sending ``None`` to the
    database.

    Raises:
        ValueError: if ``watermarktable`` holds no watermark yet.
    """
    # Lower bound: watermark saved by the previous run.
    old_watermark = get_old_watermark(conn)

    # Upper bound: newest modification time currently in the source table.
    current_watermark = get_current_watermark(conn)

    if old_watermark is None:
        raise ValueError("watermarktable has no watermark; seed it before copying")
    if current_watermark is None:
        print("* Source table is empty, nothing to copy")
        return

    copy_data(conn, old_watermark, current_watermark)

    # NOTE(review): the copy and the watermark update run as separate
    # statements. On an autocommit connection, a failure between them leaves
    # the old watermark in place, and the next run re-copies this window.
    update_watermark(conn, new_watermark=current_watermark)

# Run demo

In [None]:
# Check that the destination table is empty: no rows should be printed.
# (check_dst_table prints the rows itself and returns None, so its result is
# not printed.)
check_dst_table(conn)

In [None]:
# Show the stored watermark. It should be 1/1/2010 12:00:00 AM, the value
# seeded during the Docker initialization.
watermark = get_old_watermark(conn=conn)
print(watermark)

In [None]:
# First run: every source row is newer than the initial watermark, so the
# whole source table is copied to the destination table.
copy_pipeline(conn=conn)

In [None]:
# All rows should now have been transferred from the source table to the
# destination table.
check_dst_table(conn=conn)

### Insert one new record into the source table

In [None]:
# Add one row whose timestamp is newer than everything copied so far.
# NOTE: '9/10/2017 2:23:00 AM' is parsed according to the server's DateStyle
# setting (month/day under an MDY ordering); verify if the server is not MDY.
sql = """
    INSERT INTO src_person
    VALUES (10, 'newdata1','9/10/2017 2:23:00 AM')
"""
# The context manager closes the cursor after the insert.
with conn.cursor() as cur:
    cur.execute(sql)

### Run the copy pipeline again; it should copy only the one new record

In [None]:
# Incremental run: only rows newer than the stored watermark are copied.
copy_pipeline(conn=conn)

### Check that the new record was added

In [None]:
check_dst_table(conn=conn)

### Insert two more records into the source table

In [None]:
# Add two more rows, both newer than the current watermark.
# NOTE(review): both rows use the name 'newdata2'; 'newdata3' may have been
# intended for id 12 - confirm.
# NOTE: the dates are parsed according to the server's DateStyle setting.
sql = """
    INSERT INTO src_person
    VALUES 
        (11, 'newdata2','9/11/2017 9:01:00 AM'), 
        (12, 'newdata2','9/12/2017 9:01:00 AM') 
"""
# The context manager closes the cursor after the insert.
with conn.cursor() as cur:
    cur.execute(sql)

### After running `copy_pipeline`, the two new records should be transferred

In [None]:
# Incremental run: picks up the two rows inserted above.
copy_pipeline(conn=conn)

In [None]:
check_dst_table(conn=conn)

## Restart demo

### To repeat the demo, run the following cell; it resets all tables to their initial state

In [None]:
def restart_demo(connection=None):
    """Reset the demo tables to their initial state.

    Truncates ``src_person``, ``dest_person`` and ``watermarktable``, seeds the
    watermark with '1/1/2010 12:00:00 AM', and reloads the nine initial
    ``src_person`` rows.

    Args:
        connection: psycopg2 connection to use. Defaults to the notebook's
            global ``conn``, so existing ``restart_demo()`` calls keep working.
    """
    if connection is None:
        connection = conn
    sql = """
    truncate table src_person;
    truncate table dest_person;
    truncate table watermarktable;
 
    INSERT INTO watermarktable
    VALUES ('1/1/2010 12:00:00 AM');
    
    insert into src_person (person_id, name, last_modified_time) 
    values
        (1, 'aaaa', '2017-09-01 00:56:00.000'),
        (2, 'bbbb', '2017-09-02 05:23:00.000'),
        (3, 'cccc', '2017-09-03 02:36:00.000'),
        (4, 'dddd', '2017-09-04 03:21:00.000'),
        (5, 'eeee', '2017-09-05 08:06:00.000'),
        (6, 'fffffff', '2017-09-06 02:23:00.000'),
        (7, 'gggg', '2017-09-07 09:01:00.000'),
        (8, 'hhhh', '2017-09-08 09:01:00.000'),
        (9, 'iiiiiiiii', '2017-09-09 09:01:00.000');
    
    """
    # The context manager closes the cursor once the script has run.
    with connection.cursor() as cur:
        cur.execute(sql)

restart_demo()