# Objective 
Run the benchmark for different thread counts and different database sizes (number of years of data).
The run assumes that the 16-year data set has already been downloaded and converted to Parquet files.

In [3]:
# import required services
import datetime
import json
import platform
import time
import warnings
from pathlib import Path
import os
import glob
import shutil

import click
import duckdb
import ibis
import psutil
from jinja2 import Template
from memory_profiler import memory_usage

In [4]:
def setThreadsAndYears(threads, years):
    """Reset the global ``env`` dict and record the settings for one run.

    Besides the thread count, year count and execution mode ("sql"), this
    stores the quoted glob patterns used to select the Performance and
    Acquisition parquet files: ``perf_regex``/``acq_regex`` cover years
    200x, and ``perf_regex_2``/``acq_regex_2`` cover years 201x (``None``
    when ``years`` is 10 or less).

    The patterns keep their surrounding single quotes because they are
    stored as quoted string literals for the code that consumes them.

    Args:
        threads: number of threads to use for this run.
        years: number of years of data to use for this run.
    """
    global env

    def _quoted_pattern(prefix, decade, digit_count):
        # Spell the character class out as plain digits ("[012]").
        # Interpolating a list would yield "[0, 1, 2]", which only works by
        # accident and additionally matches ',' and ' '.
        digits = "".join(str(digit) for digit in range(digit_count))
        return "'{0}_{1}[{2}]*.parquet'".format(prefix, decade, digits)

    env = {
        # number of processors to use for this run
        "thread_count": threads,
        # number of years to use for this run
        "year_count": years,
        # execution mode
        "mode": "sql",
    }

    if years > 10:
        # first 10 years come from 200x, everything else from 201x
        first_decade_years, second_decade_years = 10, years - 10
    else:
        first_decade_years, second_decade_years = years, None

    env["perf_regex"] = _quoted_pattern("Performance", "200", first_decade_years)
    env["acq_regex"] = _quoted_pattern("Acquisition", "200", first_decade_years)

    if second_decade_years is None:
        env["perf_regex_2"] = None
        env["acq_regex_2"] = None
    else:
        env["perf_regex_2"] = _quoted_pattern(
            "Performance", "201", second_decade_years
        )
        env["acq_regex_2"] = _quoted_pattern(
            "Acquisition", "201", second_decade_years
        )
    


In [5]:
def setGlobals():
    """Record the data-folder layout in ``env`` and prepare the temp folders.

    Stores the base, data, performance and acquisition paths (all derived
    from the current working directory) in the global ``env`` dict, makes
    sure the two temporary folders (``perf1`` and ``acq1`` under the data
    folder) exist, and removes leftover files matching ``*.*`` from them.

    This is done with the standard library instead of shell commands, so it
    is valid Python and behaves the same on Linux, OSX and Windows. The
    working directory is left unchanged.

    Raises:
        FileNotFoundError: if the data folder itself does not exist.
    """
    global env
    # specify the name of the 'data' sub-folder
    env["data_folder"] = "data"
    # enable this to run on Linux, OSX, Windows
    env["path_separator"] = os.path.sep
    performance_folder = "perf"
    acquisition_folder = "acq"
    # define path information for source data sets.
    env["base_path"] = os.getcwd()
    env["data_path"] = env["base_path"] + env["path_separator"] + env["data_folder"]
    env["perf_path"] = env["data_path"] + env["path_separator"] + performance_folder
    env["acq_path"] = env["data_path"] + env["path_separator"] + acquisition_folder

    env["perf_temp"] = "perf1"
    env["acq_temp"] = "acq1"
    env["perf_temp_path"] = env["data_path"] + env["path_separator"] + env["perf_temp"]
    env["acq_temp_path"] = env["data_path"] + env["path_separator"] + env["acq_temp"]

    for temp_path in (env["perf_temp_path"], env["acq_temp_path"]):
        # create the temp folder if needed for this run; the parent data
        # folder is deliberately not created, so a wrong working directory
        # fails loudly here
        if not os.path.isdir(temp_path):
            os.mkdir(temp_path)
        # if the temp folder has content, empty it (same selection as a
        # shell "*.*": names containing a dot, excluding hidden files)
        for leftover in glob.glob(os.path.join(glob.escape(temp_path), "*.*")):
            if os.path.isdir(leftover) and not os.path.islink(leftover):
                continue  # only files are removed, never sub-folders
            os.remove(leftover)

In [6]:
def copyMortgageFiles():
    """Copy the parquet files selected for this run into the temp folders.

    Reads the quoted glob patterns stored in the global ``env`` dict
    (``perf_regex``/``perf_regex_2`` and ``acq_regex``/``acq_regex_2``; the
    ``_2`` entries may be ``None``), finds the matching files in the
    performance and acquisition folders and copies them to the ``perf_temp``
    and ``acq_temp`` folders under ``data_path``.

    On return the working directory is ``env["base_path"]``.

    Raises:
        FileNotFoundError: if a source folder does not exist.
    """

    def _matching_names(folder, *quoted_patterns):
        # Return the bare file names in ``folder`` matching any pattern.
        if not os.path.isdir(folder):
            raise FileNotFoundError(folder)
        names = []
        for quoted in quoted_patterns:
            if quoted is None:
                continue
            # Patterns are stored wrapped in single quotes; strip them
            # instead of evaluating a constructed "glob.glob(...)" string.
            pattern = quoted.strip("'")
            # Escape the folder so only the pattern part is treated as a glob.
            hits = glob.glob(os.path.join(glob.escape(folder), pattern))
            names.extend(os.path.basename(hit) for hit in hits)
        return names

    # get a list of the files to be copied
    perf_list = _matching_names(
        env["perf_path"], env["perf_regex"], env["perf_regex_2"]
    )
    acq_list = _matching_names(
        env["acq_path"], env["acq_regex"], env["acq_regex_2"]
    )

    # reset back to the root folder for this repo; later steps open
    # files by relative name and rely on this
    os.chdir(env["base_path"])

    # copy over just those parquet files to be used for this run
    copy_jobs = (
        (env["perf_path"], os.path.join(env["data_path"], env["perf_temp"]), perf_list),
        (env["acq_path"], os.path.join(env["data_path"], env["acq_temp"]), acq_list),
    )
    for src_dir, target_dir, names in copy_jobs:
        for name in names:
            shutil.copyfile(
                os.path.join(src_dir, name), os.path.join(target_dir, name)
            )

In [7]:
def create_db(datadir):
    """Create (or replace) the ``perf`` and ``acq`` views in ``mortgage.db``.

    Each view selects everything from the ``*.parquet`` files in the
    corresponding temp folder recorded in the global ``env`` dict.

    Args:
        datadir: accepted for interface compatibility; not used here.
    """

    def _sql_quote(path):
        # Double embedded single quotes so the path stays one SQL literal.
        return path.replace("'", "''")

    perf_path = env["perf_temp_path"] + env["path_separator"] + "*.parquet"
    acq_path = env["acq_temp_path"] + env["path_separator"] + "*.parquet"

    conn = duckdb.connect("mortgage.db")
    try:
        conn.execute(
            f"CREATE OR REPLACE VIEW perf AS SELECT * FROM '{_sql_quote(perf_path)}'"
        )
        conn.execute(
            f"CREATE OR REPLACE VIEW acq AS SELECT * FROM '{_sql_quote(acq_path)}'"
        )
    finally:
        # release the database file even when a statement fails
        conn.close()

In [8]:
def summary_expr(datadir):
    """Build the ibis expression for the loan summary query.

    Connects to ``mortgage.db`` through ibis's DuckDB backend, combines the
    ``acq`` and ``perf`` tables and returns the resulting summary expression.
    The time spent building each step is printed as t1..t7.

    Args:
        datadir: not used by this function.

    Returns:
        The ibis expression for the final summary projection.
    """
    db = ibis.duckdb.connect("mortgage.db")
    perf = db.table("perf")
    acq = db.table("acq")
    t0 = time.time()
    # Keep only the acquisition columns needed below; "year" is the second
    # "/"-separated piece of orig_date (presumably MM/YYYY -- TODO confirm).
    acq = acq[
        acq.loan_id, acq.orig_date.split("/")[1].name("year"), acq.borrower_credit_score
    ]
    joined = acq.inner_join(perf, acq.loan_id == perf.loan_id)
    t1 = time.time()
    # 1 when the zero-balance code is one of the listed codes and a
    # disposition date is present, otherwise 0.
    # NOTE: the column is named "charegoffs" (sic); the aggregate further
    # down refers to it by that same spelling.
    chargeoffs = (
        ibis.case()
        .when(
            (perf.zero_balance_code.isin(["02", "03", "09", "15"]))
            & (perf.disposition_date.notnull()),
            1,
        )
        .else_(0)
        .end()
        .name("charegoffs")
    )
    t2 = time.time()
    
    # Same condition as above, yielding current_actual_upb instead of 1.
    dollar_co = (
        perf.zero_balance_code.isin(["02", "03", "09", "15"])
        & (perf.disposition_date.notnull())
    ).ifelse(perf.current_actual_upb, 0)
    t3 = time.time()
    
    # Per-loan rows carrying the derived columns plus the fields aggregated below.
    loans = joined.mutate(chargeoffs=chargeoffs, dollar_co=dollar_co).projection(
        [
            perf.loan_id,
            chargeoffs,
            dollar_co.name("dollar_co"),
            perf.loan_age,
            perf.current_actual_upb,
            acq.year,
            acq.borrower_credit_score,
        ]
    )
    t4 = time.time()
    
    # Aggregate rows with loan_age > 0 by (year, loan_age).
    summary = (
        loans[loans.loan_age > 0]
        .groupby([loans.year, loans.loan_age])
        .aggregate(
            co_count=lambda x: x.charegoffs.cast("int64").sum(),
            dollar_co=lambda x: x.dollar_co.sum(),
            avg_credit_score=lambda x: x.borrower_credit_score.mean(),
            upb_sum=lambda x: x.current_actual_upb.sum(),
        )
    )
    t5 = time.time()
    
    # Number of loan_id values per year in the acquisition data.
    acq_agg = acq.groupby([acq.year]).loan_id.count()
    t6 = time.time()
    
    summary = summary.inner_join(acq_agg, acq_agg.year == summary.year)
    t7 = time.time()
    
    # Final column selection; co_count is computed above but not selected here.
    # NOTE(review): "year_x" and "count(loan_id)" look like names generated by
    # ibis for the join / unnamed count -- confirm against the ibis version in use.
    summary = summary.projection(
        [
            summary.year_x,
            summary.loan_age,
            summary["count(loan_id)"],
            summary.avg_credit_score,
            summary.upb_sum,
            summary.dollar_co,
        ]
    )
    t8 = time.time()
    # t8 is captured but the final step (t8-t7) is not part of the printed line.
    print(f"t1: {t1-t0}\t t2: {t2-t1}\t t3: {t3-t2}\t t4: {t4-t3}\t t5: {t5-t4}\t t6: {t6-t5}\t t7: {t7-t6}")
    # Drop the local reference to the ibis connection before returning.
    del db
    return summary


In [9]:
def summary_sql(datadir):
    """Render the ``summary.sql`` Jinja template into a SQL string.

    The template is read from the current working directory and rendered
    with ``perf`` and ``acq`` set to the ``*.parquet`` globs of the temp
    folders recorded in the global ``env`` dict.

    Args:
        datadir: not used by this function.

    Returns:
        The rendered SQL text.
    """
    global env
    # glob paths for the two temp folders
    perf_glob = env["path_separator"].join((env["perf_temp_path"], "*.parquet"))
    acq_glob = env["path_separator"].join((env["acq_temp_path"], "*.parquet"))

    with open("summary.sql") as sql_file:
        template_source = sql_file.read()

    rendered = Template(template_source).render(
        perf=str(perf_glob),
        acq=str(acq_glob),
    )
    return rendered




In [10]:
def window_sql(datadir):
    """Return the window-function benchmark query as a SQL string.

    The query counts the rows produced by ranking the performance parquet
    files by ``monthly_reporting_period`` within each ``loan_id``. Only the
    performance temp folder recorded in the global ``env`` dict is used.

    Args:
        datadir: not used by this function.

    Returns:
        The SQL text.
    """
    perf_path = env["perf_temp_path"] + env["path_separator"] + "*.parquet"
    return (
        "select count(*) from (select RANK() OVER (PARTITION BY loan_id "
        "ORDER BY monthly_reporting_period) as number "
        f"from '{perf_path}')"
    )





In [11]:
def execute(expr, threads=8):
    """Run a query against ``mortgage.db`` and return all result rows.

    Args:
        expr: the query; it is converted with ``str()`` before execution.
        threads: value for DuckDB's ``threads`` PRAGMA for this connection.

    Returns:
        The rows returned by ``fetchall()``.
    """
    conn = duckdb.connect("mortgage.db")
    try:
        conn.execute(f"PRAGMA threads={threads}")
        return conn.execute(str(expr)).fetchall()
    finally:
        # close the connection even when the PRAGMA or the query fails
        conn.close()




In [12]:
def platform_info():
    """Collect a description of the host machine.

    Returns:
        A dict with the keys ``machine``, ``version``, ``platform``,
        ``system``, ``cpu_count``, ``memory`` (total virtual memory as
        reported by psutil) and ``processor``.
    """
    info = {}
    info["machine"] = platform.machine()
    info["version"] = platform.version()
    info["platform"] = platform.platform()
    info["system"] = platform.system()
    info["cpu_count"] = psutil.cpu_count()
    info["memory"] = psutil.virtual_memory().total
    info["processor"] = platform.processor()
    return info


In [13]:
def main(mode, datadir, threads):
    """Run the summary query once and return its measurements.

    Recreates the database views, builds the query (through ibis when
    ``mode`` is "ibis", otherwise from the SQL template), counts the rows
    of both views and then executes the query under ``memory_usage``.

    Args:
        mode: "ibis" selects the ibis-built query; any other value selects
            the SQL template.
        datadir: data folder; converted to ``Path`` and passed to the helpers.
        threads: accepted but not read here; the thread count used for the
            run is ``env["thread_count"]``.

    Returns:
        A list with one dict per run holding platform details, thread count,
        run date, elapsed time, row counts, memory figures and the SQL text.
    """
    datadir = Path(datadir)
    create_db(datadir)

    if mode != "ibis":
        sql = summary_sql(datadir)
    else:
        expr = summary_expr(datadir)
        sql = expr.compile().compile(compile_kwargs={"literal_binds": True})

    row_count_perf = execute("select count(*) from perf")[0][0]
    row_count_acq = execute("select count(*) from acq")[0][0]

    # a single run, using the thread count configured for this benchmark
    result = []
    for thread in [env["thread_count"]]:
        started = time.time()
        mem = memory_usage((execute, (sql, thread)))
        total_time = time.time() - started

        record = platform_info()
        record.update(
            threads=thread,
            run_date=datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S"),
            total_time=total_time,
            row_count_perf=row_count_perf,
            row_count_acq=row_count_acq,
            max_memory_usage=max(mem),
            incremental_memory_usage=mem[-1] - mem[0],
            sql=" ".join(str(sql).split()),
        )
        result.append(record)

    return result


In [None]:
# Sweep over data-set sizes (2, 4, ..., 16 years) and, for each size, over
# every thread count from 1 up to the number of CPUs; collect one result
# record per combination in t1.
t1 = []
_cpus = platform_info()["cpu_count"]
for y in range(2, 17, 2):
    for t in range(1, _cpus + 1):
        # configure this run and stage its parquet files
        setThreadsAndYears(t, y)
        setGlobals()
        copyMortgageFiles()

        res = main(env["mode"], env["data_folder"], env["thread_count"])
        run = res[0]

        print(f"CPUs: {run['cpu_count']} Threads: {run['threads']} \t Years: {env['year_count']} \t Elapsed Time: {run['total_time']} ")
        # tag the record with the data-set size before storing it
        run["year_count"] = env["year_count"]
        t1.append(run)

In [13]:
# Print the result records accumulated in t1.
# NOTE(review): the captured output below shows [threads, years, seconds]
# triples rather than the dict records appended by the loop -- it looks like
# output from an earlier version of the notebook; confirm before relying on it.
print(t1)

[[1, 2, 34.13179802894592], [2, 2, 27.07899498939514], [3, 2, 20.35041618347168], [4, 2, 15.83047890663147], [5, 2, 12.359760999679565], [6, 2, 11.785526037216187], [7, 2, 10.902962923049927], [8, 2, 10.591654062271118], [9, 2, 8.909969091415405], [10, 2, 9.083163976669312], [1, 4, 302.4434003829956], [2, 4, 193.0558729171753], [3, 4, 150.8484320640564], [4, 4, 98.68069505691528], [5, 4, 85.20340299606323], [6, 4, 67.68311214447021], [7, 4, 62.85779595375061], [8, 4, 57.582435846328735], [9, 4, 52.809869050979614], [10, 4, 49.02885985374451], [1, 6, 483.0493550300598], [2, 6, 364.9708499908447], [3, 6, 223.07752895355225], [4, 6, 179.41523385047913], [5, 6, 125.97319102287292], [6, 6, 114.27647399902344], [7, 6, 86.06626009941101], [8, 6, 77.14006400108337], [9, 6, 68.0812497138977], [10, 6, 69.91337895393372], [1, 8, 710.6453788280487], [2, 8, 468.09270095825195], [3, 8, 280.84032011032104], [4, 8, 246.90446186065674], [5, 8, 179.73874592781067], [6, 8, 150.85428714752197], [7, 8, 106