In [6]:
import os
from pyspark.sql import SparkSession
import time
from scipy.stats import gmean

# Notes on how queries work

### At the end of each Phase
- Execute batch validation query
- Write into DImessages a phase completion record

### Time to time
- Execution of Data Visibility Queries in historical and incremental

### Automated Audit Phase
(to be executed just after 2nd Incremental Update)

1. Load audit data into the audit table (note that first row of each audit file is the header)
2. Execute Data Visibility 1 query (appendix C)
3. Execute the audit query (appendix A)

Then, re-think the incremental insert.

# TO-DO
1. DONE: Load audit data 1 into the audit table
2. DONE: Load audit data 2 into the audit table
3. DONE: Load audit data 3 into the audit table
4. DONE: Execute Data Visibility 1 query
5. DONE: Execute audit query
6. DONE: Batch validation in Historical and Incremental
7. DONE: Data visibility queries time to time
8. DONE: Metrics
9. DONE: Re-think the incremental insert
10. DImessages

## Data Visibility queries

These queries are the trickiest part to schedule.

From the start of Incremental 1 up to the end of Incremental 2, we need to run Visibility 1 **once** and then Visibility 2 **n times**. No more than 5 minutes may elapse between consecutive executions. The approach is:

1. Run Visibility 1 at the start of Incremental Phase 1 (so that surely less than 5 minutes elapse between the start of Incremental 1 and Visibility 1)
2. Run Visibility 2 right after Visibility 1 (so that we're sure Visibility 2 is executed at least once)
3. Start measuring time
4. After each load, run the "visibility_2_fun" function. It works as follows:
    - Checks whether more than 5 minutes have elapsed (if so, report an error)
    - Checks whether at least 2 minutes have elapsed (we don't want to run the visibility queries too often, as they may slow down the benchmark)
    - If between 2 and 5 minutes have elapsed, proceed:
        - Run Visibility 2
        - Reset the timer

In [4]:
def visibility_2_fun():
    """Run the Data Visibility 2 query when 2-5 minutes have passed since the last run.

    Intended to be called after each load step between the start of Incremental
    Phase 1 and the end of Incremental Phase 2. Reads the module-level
    ``start_time`` (set right after the initial visibility queries) and ``spark``,
    and rebinds ``start_time`` after every Visibility 2 execution.

    Behavior by elapsed time since ``start_time``:
      - <= 2 minutes: do nothing (avoid slowing the benchmark with frequent queries).
      - 2-5 minutes: run Visibility 2 and reset the timer.
      - > 5 minutes: print an error (the window was missed), then still run
        Visibility 2 and reset the timer, so later intervals are measured from this
        run instead of reporting the same miss on every call forever.
    """
    global start_time
    min_interval = 120  # 2 minutes
    max_interval = 300  # 5 minutes

    elapsed = time.time() - start_time

    if elapsed <= min_interval:
        return

    if elapsed > max_interval:
        print("ERROR: more than 5 minutes elapsed between visibility queries's executions")

    # spark.sql() expects SQL text, not a file path: read the file and run each
    # ';'-separated statement in turn.
    # NOTE(review): naive split assumes no ';' inside string literals or comments.
    with open(os.path.join(os.getcwd(), "data", "output", "tpcdi_visibility_2.sql")) as sql_file:
        for statement in filter(str.strip, sql_file.read().split(";")):
            spark.sql(statement)

    start_time = time.time()

In [None]:
# Run Visibility 1 at the start of Incremental Phase 1, then Visibility 2 right
# after it, so both have run at least once before the 5-minute clock starts.
# spark.sql() expects SQL text, not a file path, so each .sql file is read and its
# ';'-separated statements are executed one at a time.
# NOTE(review): naive split assumes no ';' inside string literals or comments.
for sql_name in ("tpcdi_visibility_1.sql", "tpcdi_visibility_2.sql"):
    with open(os.path.join(os.getcwd(), "data", "output", sql_name)) as sql_file:
        for statement in filter(str.strip, sql_file.read().split(";")):
            spark.sql(statement)

# Register time: visibility_2_fun() measures elapsed time from here
start_time = time.time()

# Block of code...

# After each block of code:
visibility_2_fun()

# Another block of code...

## Before Historical Load

In [None]:
# If not done: create the audit table in tpcdi.sql

In [None]:
# This function loads audit data into the audit table. It can be used for all the three batches.

def audit_upload(batch_n):
    """Append every ``*_audit.csv`` file of one batch directory to the ``Audit`` table.

    Can be used for all three batches. The first row of each audit file is a
    header, so files are read with ``header=True``; column types are inferred
    per file and rows are appended with ``insertInto`` (which matches columns by
    position, so the CSV column order must match the table's).

    Args:
        batch_n: name of the batch directory under ``data/output/`` (e.g. "Batch1").
    """
    batch_dir = os.getcwd() + f"/data/output/{batch_n}/"
    is_audit_file = lambda name: name.endswith("_audit.csv")

    for audit_name in filter(is_audit_file, os.listdir(batch_dir)):
        spark.read.csv(
            os.path.join(batch_dir, audit_name), header=True, inferSchema=True
        ).write.insertInto("Audit", overwrite=False)

In [None]:
# Just before starting the Historical Load: execute the batch validation query.
# spark.sql() expects SQL text, not a file path, so the file is read and each
# ';'-separated statement is run in turn.
# NOTE(review): naive split assumes no ';' inside string literals or comments;
# presumably the query is an INSERT into DImessages (executed eagerly) — confirm.
with open(os.path.join(os.getcwd(), "data", "output", "tpcdi_validation.sql")) as sql_file:
    for statement in filter(str.strip, sql_file.read().split(";")):
        spark.sql(statement)

## Historical Load

In [None]:
# Execute the following line during the Historical Phase
audit_upload("Batch1")

# ...

# At the end of the Historical Load: execute the batch validation query.
# spark.sql() expects SQL text, not a file path, so the file is read and each
# ';'-separated statement is run in turn (naive split: no ';' in literals/comments).
with open(os.path.join(os.getcwd(), "data", "output", "tpcdi_validation.sql")) as sql_file:
    for statement in filter(str.strip, sql_file.read().split(";")):
        spark.sql(statement)

## Incremental Phase 1

In [None]:
# Execute the following line during the Incremental Phase 1
audit_upload("Batch2")

# ...

# At the end of the Incremental Phase 1: execute the batch validation query.
# spark.sql() expects SQL text, not a file path, so the file is read and each
# ';'-separated statement is run in turn (naive split: no ';' in literals/comments).
with open(os.path.join(os.getcwd(), "data", "output", "tpcdi_validation.sql")) as sql_file:
    for statement in filter(str.strip, sql_file.read().split(";")):
        spark.sql(statement)

## Incremental Phase 2

In [None]:
# Execute the following line during the Incremental Phase 2
audit_upload("Batch3")

# ...

# At the end of the Incremental Phase 2: execute the batch validation query.
# spark.sql() expects SQL text, not a file path, so the file is read and each
# ';'-separated statement is run in turn (naive split: no ';' in literals/comments).
with open(os.path.join(os.getcwd(), "data", "output", "tpcdi_validation.sql")) as sql_file:
    for statement in filter(str.strip, sql_file.read().split(";")):
        spark.sql(statement)

## Automated Audit Phase

In [None]:
# Execute Data Visibility query 1 once, at the start of the Automated Audit Phase.
# spark.sql() expects SQL text, not a file path, so each file is read and its
# ';'-separated statements are run in turn.
# NOTE(review): naive split assumes no ';' inside string literals or comments.
with open(os.path.join(os.getcwd(), "data", "output", "tpcdi_visibility_1.sql")) as sql_file:
    for statement in filter(str.strip, sql_file.read().split(";")):
        spark.sql(statement)

# Then run the audit query once. A SELECT returned by spark.sql() is lazy
# (presumably the audit query is a SELECT — confirm), so an action is needed to
# actually execute it and see its results.
audit_result = None
with open(os.path.join(os.getcwd(), "data", "output", "tpcdi_audit.sql")) as sql_file:
    for statement in filter(str.strip, sql_file.read().split(";")):
        audit_result = spark.sql(statement)

if audit_result is not None:
    audit_result.show(n=1000, truncate=False)  # raise n if the audit returns more rows

# After Automated Audit (metric computation)

In [None]:
# In TPC-DI the phase times are not measured here: each phase writes a Phase
# Completion Record ('PCR') into DImessages, so the timestamps are read back from there.
# NOTE(review): assumes one PCR row exists for each BatchID 0-3 and that
# MessageDateAndTime is a TIMESTAMP (returned to Python as datetime) — confirm
# against tpcdi.sql.

# Row counts per batch, copied from "digen_report.txt"
ROWS_HISTORICAL = 7804509   # RH: Batch1
ROWS_INCREMENTAL_1 = 33380  # RI1: Batch2
ROWS_INCREMENTAL_2 = 33455  # RI2: Batch3

# Incremental phases are credited with at least this many seconds (30 minutes)
MIN_INCREMENTAL_SECONDS = 1800


def _completion_timestamp(batch_id):
    """Return the latest PCR MessageDateAndTime for ``batch_id``; raise ValueError if absent."""
    # max() always yields exactly one row (NULL when no PCR exists), so first() is safe
    row = spark.sql(
        "select max(MessageDateAndTime) as CT from DImessages "
        f"where BatchID = {int(batch_id)} and MessageType = 'PCR'"
    ).first()
    if row is None or row["CT"] is None:
        raise ValueError(f"No phase completion record (PCR) in DImessages for BatchID {batch_id}")
    return row["CT"]


# CT = Completion Timestamp (Python values, not DataFrames, so they can be subtracted)
CT0, CT1, CT2, CT3 = (_completion_timestamp(batch_id) for batch_id in range(4))

# Elapsed seconds per phase: timestamp differences are timedeltas, which cannot be
# compared with or divided into plain numbers directly
EH = (CT1 - CT0).total_seconds()
EI1 = (CT2 - CT1).total_seconds()
EI2 = (CT3 - CT2).total_seconds()

# TH = Throughput of Historical Load = RH / EH
TH = ROWS_HISTORICAL / EH

# For incremental phases, same as above, except that the denominator is the
# maximum between the elapsed time and 1800 seconds
TI1 = ROWS_INCREMENTAL_1 / max(EI1, MIN_INCREMENTAL_SECONDS)
TI2 = ROWS_INCREMENTAL_2 / max(EI2, MIN_INCREMENTAL_SECONDS)

# Metric: geometric mean of historical throughput and the slower incremental throughput
# NOTE(review): check whether the spec truncates (int) or rounds the final value
TPC_DI_RPS = round(gmean([TH, min(TI1, TI2)]))

