In [1]:
from scripts.postgres_connect import *
# Name of the SSH-tunnel certificate file. The helper functions below
# repeat this literal rather than reading this variable.
path = "tunnel-ssh.cer"

In [2]:
def return_query(script_path: str, encoding: str = "utf-8"):
    """
    Read and return the full text of the SQL file at ``script_path``.

    Parameters
    ----------
    script_path : str
        Path of the ``.sql`` file to read.
    encoding : str
        Text encoding of the file (explicit so results do not depend on the
        platform's locale default).

    Returns
    -------
    str
        The file contents, unmodified.
    """
    # ``with`` guarantees the handle is closed; the previous bare ``open().read()``
    # left closing to the garbage collector.
    with open(script_path, encoding=encoding) as sql_file:
        return sql_file.read()

def execute_query(sql_query, host, database, fetchall=False, key_path='tunnel-ssh.cer'):
    """
    Execute one SQL statement on ``host``/``database`` and commit it.

    Parameters
    ----------
    sql_query : str
        SQL text executed as-is (no parameter binding).
    host, database : str
        Forwarded to ``get_conn2`` to open the connection.
    fetchall : bool
        When True, return ``cursor.fetchall()`` (read after the commit);
        otherwise return None, e.g. for DDL/INSERT/UPDATE.
    key_path : str
        Key/certificate file forwarded to ``get_conn2``; defaults to the
        value that used to be hard-coded here.

    Raises
    ------
    RuntimeError
        If executing, committing or fetching fails. The transaction is rolled
        back first and the original exception is chained as ``__cause__``.
    """
    conn = get_conn2('Yes', key_path, host, database)
    try:
        # Opening the cursor inside the outer try means the connection is
        # closed even if cursor creation itself fails.
        cursor = conn.cursor()
        try:
            cursor.execute(sql_query)
            conn.commit()
            return cursor.fetchall() if fetchall else None
        except Exception as e:
            conn.rollback()
            print(f"Error executing query: {e}")
            raise RuntimeError(e) from e
        finally:
            cursor.close()
    finally:
        conn.close()

def write_df(host, database, key_path, table_name: str, df_op: pd.DataFrame, if_exists: str, schema: str = "public"):
    """
    Bulk-write a pandas DataFrame into ``schema.table_name`` via ``DataFrame.to_sql``.

    Parameters
    ----------
    host, database, key_path : str
        Forwarded to ``get_conn2`` as ``('Yes', key_path, host, database)``.
    table_name : str
        Target table name.
    df_op : pd.DataFrame
        Rows to write; the index is not written (``index=False``).
    if_exists : str
        Passed to ``to_sql`` ('fail', 'replace' or 'append').
    schema : str
        Target schema, default ``"public"``.

    FIXME(review): ``get_conn2`` presumably returns a raw DB-API connection
    (elsewhere in this notebook it is used through ``cursor()``/``commit()``).
    ``DataFrame.to_sql`` only supports SQLAlchemy connectables and
    ``sqlite3.Connection``; any other DB-API connection is handled as if it
    were SQLite. That is consistent with the ``sqlite_master`` error recorded
    at the end of this notebook. Passing a SQLAlchemy engine (e.g.
    ``create_engine("postgresql+psycopg2://", creator=...)``) is the likely
    fix — confirm how ``get_conn2`` builds its connection first.
    """
    conn = get_conn2('Yes', key_path, host, database)
    try:
        df_op.to_sql(
            name=table_name, con=conn, schema=schema, chunksize=10000, if_exists=if_exists, index=False
        )
    finally:
        # Release the connection even when to_sql fails (it was never closed before).
        conn.close()

def insert_on_conflict_data(
    temp_table, dest_table, host, database, conflict_key, update_cond=None, update=True, exclude_cols=None,
    key_path='tunnel-ssh.cer',
):
    """
    Build (and print) an ``INSERT ... ON CONFLICT`` upsert copying the
    distinct rows of ``temp_table`` into ``dest_table``.

    The column list is discovered by reading ``temp_table`` through
    ``get_conn2``; the generated statement is returned, not executed.

    Parameters
    ----------
    temp_table, dest_table : str
        Source and target table names, interpolated verbatim into the SQL
        (trusted input only).
    host, database : str
        Forwarded to ``get_conn2`` for the column lookup.
    conflict_key : str
        Column used in ``ON CONFLICT ("<conflict_key>")``; it is double-quoted.
    update_cond : str, optional
        Extra ``WHERE`` predicate on the ``DO UPDATE`` branch. In it, ``t`` is
        the existing ``dest_table`` row and ``EXCLUDED`` the incoming row, e.g.
        ``EXCLUDED.last_update_time > t.last_update_time``.
    update : bool
        True -> ``DO UPDATE SET col=EXCLUDED.col, ...``; False -> ``DO NOTHING``.
    exclude_cols : iterable of str, optional
        Columns of ``temp_table`` left out of both the insert and the update.
    key_path : str
        Key/certificate file forwarded to ``get_conn2``.

    Returns
    -------
    str
        The generated SQL statement.

    Raises
    ------
    ValueError
        If no columns remain after applying ``exclude_cols``.
    """
    columns_query = f"SELECT * FROM {temp_table} LIMIT 1"
    conn = get_conn2('Yes', key_path, host, database)
    try:
        df_columns = pd.read_sql(columns_query, conn)
    finally:
        conn.close()

    excluded = set(exclude_cols or ())
    columns = [c for c in df_columns.columns if c not in excluded]
    if not columns:
        raise ValueError(
            f"No columns left to insert from {temp_table} after exclude_cols={exclude_cols!r}"
        )
    columns_list = ", ".join(columns)
    print("Columns :", columns_list)

    # Name the target columns explicitly: without a column list Postgres assigns
    # the SELECT output to dest_table's columns by position, which misplaces data
    # as soon as exclude_cols drops a column or the two tables order columns
    # differently. DISTINCT is applied to the projected columns so that rows
    # differing only in excluded columns are not inserted twice.
    sql_query = (
        f"INSERT INTO {dest_table} AS t ({columns_list})\n"
        f"SELECT DISTINCT {columns_list} FROM {temp_table}\n"
        f'ON CONFLICT ("{conflict_key}") {_conflict_action(columns, update, update_cond)}'
    )
    print(sql_query)
    return sql_query


def _conflict_action(columns, update, update_cond):
    """Return the ``DO ...`` clause of the upsert for the requested update mode."""
    if not update:
        return "DO NOTHING"
    set_part = ", ".join(f"{c}=EXCLUDED.{c}" for c in columns)
    action = f"DO UPDATE SET {set_part}"
    if update_cond:
        # Only overwrite the existing row when the caller's predicate holds.
        action += f"\nWHERE ({update_cond})"
    return action

def temp_to_main_table(query, host, database, temp_table, key_path='tunnel-ssh.cer'):
    """
    Run the merge ``query`` (e.g. the upsert built by ``insert_on_conflict_data``)
    and then drop ``temp_table``.

    Each step is committed separately, so if the merge fails the temp table
    is left in place.

    Parameters
    ----------
    query : str
        SQL that merges ``temp_table`` into the main table.
    host, database : str
        Forwarded to ``get_conn2``.
    temp_table : str
        Table dropped (``CASCADE``) after a successful merge.
    key_path : str
        Key/certificate file forwarded to ``get_conn2``.
    """
    # Same argument order as every other get_conn2 call in this notebook; the
    # previous call passed the SQL text and table name in these positions.
    conn = get_conn2('Yes', key_path, host, database)
    try:
        with conn.cursor() as cur:
            cur.execute(query)
        conn.commit()

        with conn.cursor() as cur:
            cur.execute(f"DROP TABLE IF EXISTS {temp_table} CASCADE")
        conn.commit()
    finally:
        conn.close()

def insert_data(temp_table, dest_table, host, database, conflict_key, update_cond=None, update=True, exclude_cols=None):
    """
    Upsert the rows of ``temp_table`` into ``dest_table`` and drop ``temp_table``.

    Builds the statement with ``insert_on_conflict_data`` and runs it with
    ``temp_to_main_table``. See ``insert_on_conflict_data`` for the meaning
    of ``conflict_key``, ``update_cond``, ``update`` and ``exclude_cols``.
    """
    # A fresh list per call replaces the shared mutable default ``[]``.
    sql_query = insert_on_conflict_data(
        temp_table, dest_table, host, database, conflict_key, update_cond, update,
        list(exclude_cols) if exclude_cols else [],
    )
    temp_to_main_table(sql_query, host, database, temp_table)


In [3]:
def load_data_to_temp(sql_stmt, source_host, source_database, dest_host, dest_database, temp_table_name,
                      key_path='tunnel-ssh.cer'):
    """
    Run ``sql_stmt`` on the source database and append the result to
    ``temp_table_name`` on the destination database.

    Parameters
    ----------
    sql_stmt : str
        Query executed on ``source_host``/``source_database``.
    source_host, source_database : str
        Where the query is read from (via ``get_conn2``).
    dest_host, dest_database : str
        Where the result is written (via ``write_df`` with ``"append"``).
    temp_table_name : str
        Destination table; existing rows are kept.
    key_path : str
        Key/certificate file used for both connections.
    """
    # Echo the SQL before running it so it is visible even if the read fails.
    print(sql_stmt)
    source_conn = get_conn2('Yes', key_path, source_host, source_database)
    try:
        df = pd.read_sql(sql_stmt, source_conn)
    finally:
        source_conn.close()
    print(df.shape)

    write_df(dest_host, dest_database, key_path, temp_table_name, df, "append")

# def load_data_to_temp_in_chunks(sql_stmt, source_conn_id, dest_conn_id, temp_table_name):
#     """
#     Helper function which clubs two actions:
#     1. Reading from a source DB (using SQL)
#     2. Writing to a destination in chunks
#     This function will append to existing data
#     """

#     #Source connection
#     pg_hook_source = PostgresHook(postgres_conn_id=source_conn_id)
#     postgres_engine = create_engine(
#         pg_hook_source.get_uri().replace("postgres://", "postgresql://"), echo=False, pool_pre_ping=True,
#         pool_recycle=3000
#     )
#     print(sql_stmt)

#     # Chunk the data
#     conn = postgres_engine.connect().execution_options(
#         stream_results=True)

#     #Destination Connection
#     pg_hook_dest = PostgresHook(postgres_conn_id=dest_conn_id)
#     print(pg_hook_dest.get_uri().replace("postgres://", "postgresql://"))

#     try:
#         for df in pd.read_sql(sql_stmt, conn,chunksize=10000):
#             print(df.shape)
#             write_df(pg_hook_dest, temp_table_name, df,"append")
#     finally:
#         print("loading data to temp table successful")
#         conn.close()
#         postgres_engine.dispose()

In [4]:
def create_tracking_events_temp(host='datawarehouse', database='postgres'):
    """
    Drop and recreate the staging table ``public.tracking_events_temp``.

    Both statements go through ``execute_query`` against ``host``/``database``,
    whose defaults are the values that were previously hard-coded.
    """
    tracking_events_table = """
        CREATE TABLE IF NOT EXISTS public.tracking_events_temp
        (
            shipment_id bigint NOT NULL,
            out_for_pickup_time timestamp without time zone,
            pickup_time timestamp without time zone,
            last_mile_time timestamp without time zone,
            no_attempts double precision,
            first_ofd timestamp without time zone,
            last_ofd timestamp without time zone,
            first_failed_delivered timestamp without time zone,
            last_failed_delivered timestamp without time zone,
            first_failed_delivered_remarks text COLLATE pg_catalog."default",
            last_failed_delivered_remarks text COLLATE pg_catalog."default",
            delivered_time timestamp without time zone,
            rto_initiated_time timestamp without time zone,
            rto_delivered_time timestamp without time zone,
            last_update_time timestamp without time zone,
            last_remark text COLLATE pg_catalog."default",
            closed bigint,
            first_pickup_attempt_time timestamp without time zone,
            last_pickup_attempt_time timestamp without time zone,
            no_pickup_attempts integer,
            last_location character varying COLLATE pg_catalog."default",
            ndr_initiated_time timestamp without time zone,
            last_scan_location character varying COLLATE pg_catalog."default",
            rto_ofd_time timestamp without time zone,
            rto_freeze_time timestamp without time zone,
            sort_to_bin_time timestamp without time zone,
            lost_mark_time timestamp without time zone,
            hub_in_scan_time timestamp without time zone,
            cfc_pickup_time timestamp without time zone,
            addtobag_time timestamp without time zone,
            in_transit_time timestamp without time zone,
            dest_city_time timestamp without time zone,
            intracity_in_transit_time timestamp without time zone,
            intercity_in_transit_time timestamp without time zone
        )
    """
    # Schema-qualify the DROP so it targets the same table the CREATE builds,
    # independent of the session's search_path.
    execute_query('DROP TABLE IF EXISTS public.tracking_events_temp', host, database)
    execute_query(tracking_events_table, host, database)


In [5]:
def load_data_tracking_events(script_path="tracking_events.sql"):
    """
    Stage tracking events in ``tracking_events_temp``.

    Reads the SQL in ``script_path``, recreates ``tracking_events_temp`` and
    appends the query's result (run on ``read_replica``) into it on
    ``datawarehouse``. This does NOT merge into ``tracking_events``; call
    ``merge_tracking_events()`` for that step.
    """
    query = return_query(script_path=script_path)
    print('Tracking Events - Query Loaded from file')
    create_tracking_events_temp()
    print('Tracking Events - Temp Table Created')
    # load_data_to_temp prints the SQL itself, so it is not echoed here as well.
    load_data_to_temp(
        sql_stmt=query,
        source_host='read_replica',
        source_database='postgres',
        dest_host='datawarehouse',
        dest_database='postgres',
        temp_table_name='tracking_events_temp',
    )
    print('Tracking Events - Loaded into Temp Table')

In [6]:
def merge_tracking_events():
    """
    Upsert ``tracking_events_temp`` into ``tracking_events`` on ``shipment_id``
    (overwriting existing rows unconditionally) and drop the temp table.
    """
    # insert_data takes host/database rather than a conn_id; the previous call
    # raised TypeError. These match where load_data_tracking_events writes the
    # temp table.
    insert_data(
        temp_table="tracking_events_temp",
        # NOTE(review): this line carried a "## Revert" marker — confirm that
        # tracking_events is the intended production destination.
        dest_table="tracking_events",
        host="datawarehouse",
        database="postgres",
        conflict_key="shipment_id",
        update_cond=None,
    )


In [7]:
# Run the load step: read tracking_events.sql, recreate tracking_events_temp and
# fill it from read_replica. merge_tracking_events() is not invoked in this cell.
load_data_tracking_events()

WITH rel_shipments AS
	(SELECT
        DISTINCT shipment_id
    FROM
        public.order_tracking
    WHERE
		--  (	
        --     -- ( --to update for a past interval with last four hours included. Comment this post usage and uncomment the regular expression
		-- 		-- (update_time::date >= '2024-01-01'::date OR timestamp::date >='2024-01-01'::date)
		-- 		-- AND 
		-- 		-- (update_time::date<'2024-01-02'::date OR timestamp>='2024-01-02'::date)
		-- 	-- ) 
		-- 	-- OR
		-- 		-- (update_time>=(NOW() - interval '4 hours') OR timestamp>=(NOW() - interval '4 hours'))
		-- )
		(update_time>=(NOW() - interval '4 hours') OR timestamp>=(NOW() - interval '4 hours'))
		-- (date_trunc('day',update_time) >= '2024-06-25')
        AND user_id NOT IN (1, 2, 3, 10, 11, 42)
		),
rel_order_tracking AS 
	(SELECT
	 	*
	 FROM
		(SELECT 
		order_tracking.tracking_id AS tracking_id, 
		order_tracking.shipment_id AS shipment_id, 
		order_tracking.shipper_id AS shipper_id, 
		CASE 
			WHEN order_tracking.shi

DatabaseError: Execution failed on sql '
        SELECT
            name
        FROM
            sqlite_master
        WHERE
            type IN ('table', 'view')
            AND name=?;
        ': relation "sqlite_master" does not exist
LINE 5:             sqlite_master
                    ^
