In [None]:
import JupyterFormatter
JupyterFormatter.enable_autoformat();

In [None]:
# Connection string read as text from a file at a relative path; it is passed to
# `LibPQ.Connection` when the shared `CONNECTION` is created.
const DB_CONNECTION_STRING = read("../../../environment/database/primary.txt", String);

In [None]:
import Dates
import LibPQ
import SHA
import JSON3

# Lock held by `logerror` while it prints, so one error report is written as a unit.
const STDOUT_LOCK = ReentrantLock()

"""
    logerror(x::String)

Print a timestamped `[ERROR]` line containing `x` to standard output, followed by
every exception currently being handled together with its stack trace.

Meant to be called from inside a `catch` block; outside of one only the header line
is printed. The whole report is written while holding `STDOUT_LOCK`.
"""
function logerror(x::String)
    Threads.lock(STDOUT_LOCK) do
        println("$(Dates.now()) [ERROR] $x")
        # Printing `catch_backtrace()` directly only dumps raw instruction pointers
        # and never shows the exception itself; render both in readable form.
        for (exc, bt) in current_exceptions()
            showerror(stdout, exc, bt)
            println()
        end
    end
end

# Single LibPQ connection opened from `DB_CONNECTION_STRING`; the wrapper methods of
# `db_update_junction_table` / `db_update_primary_table` below run against it.
const CONNECTION = LibPQ.Connection(DB_CONNECTION_STRING)
# Lock those wrappers hold around every use of `CONNECTION`.
const CONNECTION_LOCK = ReentrantLock()

"""
    db_update_junction_table(primary_table, junction_table, idcols, idvals,
                             primary_data, junction_data, success)

Run the connection-taking method of `db_update_junction_table` against the shared
`CONNECTION` while holding `CONNECTION_LOCK`.

If that call throws, the error is reported through `logerror`, the connection is
reset with `LibPQ.reset!`, and the exception is not propagated to the caller.
"""
function db_update_junction_table(
    primary_table::String,
    junction_table::String,
    idcols::Vector{String},
    idvals::Vector,
    primary_data::Union{Dict,Nothing},
    junction_data::Union{Vector,Nothing},
    success::Bool,
)
    lock(CONNECTION_LOCK)
    try
        return try
            db_update_junction_table(
                CONNECTION,
                primary_table,
                junction_table,
                idcols,
                idvals,
                primary_data,
                junction_data,
                success,
            )
        catch
            # Best effort: report the failure and try to bring the connection back.
            logerror("db_update_junction_table connection error $idcols $idvals")
            LibPQ.reset!(CONNECTION; throw_error = false)
        end
    finally
        unlock(CONNECTION_LOCK)
    end
end

"""
    db_update_primary_table(table, idcol, idval, data, success)

Run the connection-taking method of `db_update_primary_table` against the shared
`CONNECTION` while holding `CONNECTION_LOCK`.

If that call throws, the error is reported through `logerror`, the connection is
reset with `LibPQ.reset!`, and the exception is not propagated to the caller.
"""
function db_update_primary_table(
    table::String,
    idcol::String,
    idval::Union{String,Int},
    data::Dict,
    success::Bool,
)
    lock(CONNECTION_LOCK)
    try
        return try
            db_update_primary_table(CONNECTION, table, idcol, idval, data, success)
        catch
            # Best effort: report the failure and try to bring the connection back.
            logerror("db_update_primary_table connection error $idcol $idval")
            LibPQ.reset!(CONNECTION; throw_error = false)
        end
    finally
        unlock(CONNECTION_LOCK)
    end
end

# Return the hex-encoded SHA-256 digest of the JSON serialization of `d` after
# `canon` has rebuilt every nested dictionary with its keys inserted in sorted
# order. Vectors are processed element-wise (their order is kept); every other
# value is passed through unchanged.
# NOTE(review): `canon` rebuilds into a plain `Dict`, which does not promise any
# iteration order, so the serialized key order presumably relies on `Dict`
# internals behaving the same for identical insertion sequences — confirm before
# comparing hashes produced by different Julia versions.
function canonical_hash(d::AbstractDict)
    # Keys are sorted before insertion so equal contents are always built the same way.
    canon(x::AbstractDict) = Dict(k => canon(x[k]) for k in sort(collect(keys(x))))
    canon(x::AbstractVector) = [canon(y) for y in x]
    canon(x) = x
    bytes2hex(SHA.sha256(JSON3.write(canon(d))))
end
"""
    canonical_hash(d::AbstractVector)

Return a hex-encoded SHA-256 digest of `d` that does not depend on element order.

Each element is hashed with `canonical_hash`, the element hashes are sorted, and the
JSON encoding of that sorted list is hashed.
"""
function canonical_hash(d::AbstractVector)
    element_hashes = sort(map(canonical_hash, d))
    digest = SHA.sha256(JSON3.write(element_hashes))
    return bytes2hex(digest)
end
"""
    normalize(d::AbstractDict)
    normalize(d::AbstractVector)

Prepare values for use as query parameters.

For a dictionary, return a new `Dict` with the same keys in which nested
dictionaries and vectors are replaced by their JSON text, `nothing` is replaced by
`missing`, and every other value is kept as is. For a vector, apply `normalize` to
each element.
"""
function normalize(d::AbstractDict)
    function encode(value)
        if value isa AbstractDict || value isa AbstractVector
            return JSON3.write(value)
        elseif value === nothing
            return missing
        else
            return value
        end
    end
    return Dict(key => encode(value) for (key, value) in d)
end
function normalize(d::AbstractVector)
    return [normalize(row) for row in d]
end

"""
    db_upsert(conn, table, idcols, data, conflict_rules = Dict())

Insert one row built from `data` into `table`; if a row with the same `idcols`
already exists, update it instead.

# Arguments
- `conn`: connection the statement is executed on.
- `table`: target table name (interpolated into the SQL text as is).
- `idcols`: columns forming the conflict target.
- `data`: column name => value; values are passed through `normalize` and sent as
  query parameters.
- `conflict_rules`: column name => complete SQL assignment, overriding the default
  `col = EXCLUDED.col` assignment generated for every non-id column.

When there is nothing to assign on conflict (all columns are id columns and no rule
is supplied) the statement uses `DO NOTHING`, because an empty `DO UPDATE SET` list
is not valid SQL.

Returns the result of `LibPQ.execute`.
"""
function db_upsert(
    conn::LibPQ.Connection,
    table::String,
    idcols::Vector{String},
    data::Dict,
    conflict_rules::Dict = Dict(),
)
    data = normalize(data)
    # Fix the column order once so names, placeholders and values line up.
    cols = collect(keys(data))
    vals = Tuple(data[k] for k in cols)
    col_str = join(cols, ", ")
    idcol_str = join(idcols, ", ")
    placeholders = join(("\$" * string(i) for i in eachindex(cols)), ", ")
    # Caller-supplied rules win over the default "overwrite with the new value".
    conflict_rules =
        merge(Dict(c => c * " = EXCLUDED." * c for c in cols if c ∉ idcols), conflict_rules)
    conflict_action = if isempty(conflict_rules)
        "DO NOTHING"
    else
        "DO UPDATE SET " * join(values(conflict_rules), ", ")
    end
    query = """
    INSERT INTO $table ($col_str) VALUES ($placeholders)
    ON CONFLICT ($idcol_str) $conflict_action;
    """
    return LibPQ.execute(conn, query, vals)
end;

"""
    db_update_primary_table(conn, table, idcol, idval, data, success)

Record the outcome of refreshing the row identified by `idcol = idval` in `table`.

- `success == false`: upsert a row holding only the id, `db_refreshed_at` and
  `db_consecutive_failures`; if the row already exists its failure counter is
  incremented. Returns `nothing`.
- `success == true` and a row with this id and the same `db_entry_hash` exists: only
  the refresh/success timestamps are updated and the failure counter is set to 0.
  Returns `nothing`.
- Otherwise `data` merged with the bookkeeping columns is upserted and the result of
  `db_upsert` is returned.

All timestamps written are the value of `time()` taken at the start of the call.
"""
function db_update_primary_table(
    conn::LibPQ.Connection,
    table::String,
    idcol::String,
    idval::Union{String,Int},
    data::Dict,
    success::Bool,
)
    now_ts = time()
    if success
        entry_hash = canonical_hash(data)
        match_cols = join([idcol, "db_entry_hash"], ", ")
        touch_sql = """
        UPDATE $table
        SET db_refreshed_at = \$1,
        db_last_success_at = \$1,
        db_consecutive_failures = 0
        WHERE ($match_cols) = (\$2, \$3);
        """
        touched = LibPQ.execute(conn, touch_sql, (now_ts, idval, entry_hash))
        # A matching row means the content is unchanged; the timestamp bump suffices.
        if LibPQ.num_affected_rows(touched) > 0
            return
        end
        bookkeeping = Dict(
            "db_refreshed_at" => now_ts,
            "db_last_changed_at" => now_ts,
            "db_entry_hash" => entry_hash,
            "db_last_success_at" => now_ts,
            "db_consecutive_failures" => 0,
        )
        return db_upsert(conn, table, [idcol], merge(data, bookkeeping))
    end
    # Failure: start the counter at 1 for a new row, otherwise increment it.
    failure_row = Dict(
        idcol => idval,
        "db_refreshed_at" => now_ts,
        "db_consecutive_failures" => 1,
    )
    increment_rule = Dict(
        "db_consecutive_failures" => "db_consecutive_failures = $table.db_consecutive_failures + 1",
    )
    db_upsert(conn, table, [idcol], failure_row, increment_rule)
    return
end

# Bump the refresh/success timestamps and zero the failure counter of the row in
# `table` whose `match_cols` equal `match_vals`. Returns `true` when a row matched.
function _db_refresh_if_unchanged(
    conn::LibPQ.Connection,
    table::AbstractString,
    match_cols::AbstractVector{<:AbstractString},
    match_vals::Tuple,
    curtime::Real,
)
    col_str = join(match_cols, ", ")
    # $1 is the timestamp, so the values being matched start at $2.
    placeholders = join(("\$" * string(i + 1) for i in eachindex(match_cols)), ", ")
    refresh = LibPQ.execute(
        conn,
        """
        UPDATE $table
        SET db_refreshed_at = \$1,
        db_last_success_at = \$1,
        db_consecutive_failures = 0
        WHERE ($col_str) = ($placeholders);
        """,
        (curtime, match_vals...),
    )
    return LibPQ.num_affected_rows(refresh) > 0
end

"""
    db_update_junction_table(conn, primary_table, junction_table, idcols, idvals,
                             primary_data, junction_data, success)

Record the outcome of refreshing the entity identified by `idcols = idvals`, whose
main row lives in `primary_table` and whose child rows live in `junction_table`.

- `success == false`: upsert a primary row holding only the ids, `db_refreshed_at`
  and `db_consecutive_failures`; an existing row gets its failure counter
  incremented.
- `junction_data === nothing`: only the primary row is considered. If a row with the
  same ids and `db_primary_hash` exists, just its timestamps are refreshed;
  otherwise `primary_data` plus bookkeeping columns is upserted.
- Otherwise both hashes are compared. If both match, the timestamps are refreshed.
  If not, one transaction upserts the primary row, deletes all junction rows for
  the ids and inserts the rows of `junction_data` (nothing is inserted when it is
  empty). The transaction is committed on success and rolled back when any step
  throws, after which the exception is rethrown.

All timestamps written are the value of `time()` taken at the start of the call.
Returns `nothing`.
"""
function db_update_junction_table(
    conn::LibPQ.Connection,
    primary_table::String,
    junction_table::String,
    idcols::Vector{String},
    idvals::Vector,
    primary_data::Union{Dict,Nothing},
    junction_data::Union{Vector,Nothing},
    success::Bool,
)
    curtime = time()
    if !success
        failure_data = Dict(
            (idcols .=> idvals)...,
            "db_refreshed_at" => curtime,
            "db_consecutive_failures" => 1,
        )
        db_upsert(
            conn,
            primary_table,
            idcols,
            failure_data,
            Dict(
                "db_consecutive_failures" => "db_consecutive_failures = $primary_table.db_consecutive_failures + 1",
            ),
        )
        return
    end
    primary_hash = canonical_hash(primary_data)
    if isnothing(junction_data)
        unchanged = _db_refresh_if_unchanged(
            conn,
            primary_table,
            [idcols..., "db_primary_hash"],
            (idvals..., primary_hash),
            curtime,
        )
        if unchanged
            return
        end
        db_metadata = Dict(
            "db_refreshed_at" => curtime,
            "db_primary_last_changed_at" => curtime,
            "db_primary_hash" => primary_hash,
            "db_last_success_at" => curtime,
            "db_consecutive_failures" => 0,
        )
        db_upsert(conn, primary_table, idcols, merge(primary_data, db_metadata))
        return
    end
    # Hash the raw rows first; `normalize` changes their representation.
    junction_hash = canonical_hash(junction_data)
    junction_data = normalize(junction_data)
    unchanged = _db_refresh_if_unchanged(
        conn,
        primary_table,
        [idcols..., "db_primary_hash", "db_junction_hash"],
        (idvals..., primary_hash, junction_hash),
        curtime,
    )
    if unchanged
        return
    end
    LibPQ.execute(conn, "BEGIN;")
    try
        db_metadata = Dict(
            "db_refreshed_at" => curtime,
            "db_primary_last_changed_at" => curtime,
            "db_primary_hash" => primary_hash,
            "db_junction_last_changed_at" => curtime,
            "db_junction_hash" => junction_hash,
            "db_last_success_at" => curtime,
            "db_consecutive_failures" => 0,
        )
        db_upsert(conn, primary_table, idcols, merge(primary_data, db_metadata))
        col_str = join(idcols, ", ")
        placeholders = join(("\$" * string(i) for i in eachindex(idcols)), ", ")
        LibPQ.execute(
            conn,
            "DELETE FROM $junction_table WHERE ($col_str) = ($placeholders);",
            idvals,
        )
        # An empty list simply means "no junction rows": the DELETE above is all
        # that is needed, and `first` would throw on an empty collection.
        if !isempty(junction_data)
            # Column names are taken from the first row and looked up in every row.
            col_list = collect(keys(first(junction_data)))
            col_str = join(col_list, ", ")
            placeholders = join(("\$" * string(i) for i in eachindex(col_list)), ", ")
            LibPQ.load!(
                (; (Symbol(k) => [x[k] for x in junction_data] for k in col_list)...),
                conn,
                "INSERT INTO $junction_table ($col_str) VALUES ($placeholders);",
            )
        end
        LibPQ.execute(conn, "COMMIT;")
    catch
        # Never commit a half-applied update. The rollback itself is best effort so
        # that a broken connection does not hide the original error.
        try
            LibPQ.execute(conn, "ROLLBACK;"; throw_error = false)
        catch
        end
        rethrow()
    end
    return
end;