
### Step 1: Connect to ADLS

Since we are using Azure Credentials Passthrough connection method, we just need to use the custom access token and mount the folder that has all our TestStand files. 


In [0]:
# Declare script variables
debug_mode = True
database_name = "dbo"
source_URI = "REDACTED"
target_URI = source_URI + database_name
mnt_pt = "/mnt/bronze/" + database_name

# Azure Credentials Passthrough configuration.
configs = {
  "fs.azure.account.auth.type": "CustomAccessToken",
  "fs.azure.account.custom.token.provider.class": spark.conf.get("spark.databricks.passthrough.adls.gen2.tokenProviderClassName")
}

# Add mnt_pt to the source URI if not already mounted.
if any(mount.mountPoint == mnt_pt for mount in dbutils.fs.mounts()):
  pass
else:
  dbutils.fs.mount(source = source_URI, mount_point = mnt_pt, extra_configs = configs)

# List files in directory
if debug_mode:
  dbutils.fs.ls(target_URI)


### Step 2: Setup Database and Extract Data to Tables in Databricks

We instantiate a DBFS database and USE it. Then we iterate over the TSDB directory at that level and create a table for each CSV file in that mount point. 


In [0]:
# Create and use DATABASE [drop IF ALREADY EXISTS]
spark.sql(f"drop database IF EXISTS {database_name} cascade ")
spark.sql(f"create database {database_name}")
spark.sql(f"use {database_name}")

# Create all tables into DBFS database
tables_list = dbutils.fs.ls(target_URI)

for table in tables_list:
    table_name = table.name.strip(".parquet")
    spark.sql(f"DROP TABLE IF EXISTS `{table_name}`")
    spark.sql(f"""
              CREATE TABLE `{table_name}` 
              USING parquet 
              OPTIONS(path '{table.path}', header 'true', inferschema 'true')
              """)
    
# Display table
# if debug_mode:
#     df = spark.sql('select * from uut_result')
#     display(df)


### Step 3: Transform and Export Processed Data to ADLS

Extract data from imported database assuming default Access NI schema.
Data is converted to flat denormalized CSV files and exported to ADLS **silver** container.

One CSV file will be created for each Sequence File name that was called -
* Numeric Data - data of steps with Numeric Limit Tests; column names step names and tolerances


In [0]:
"""Query Step_SeqCalls Table"""

sql_string2 = """
SELECT STEP_SEQCALL.STEP_RESULT, STEP_SEQCALL.SEQUENCE_FILE_PATH 
FROM STEP_SEQCALL
ORDER BY STEP_SEQCALL.STEP_RESULT ASC
"""

sequence_calls = spark.sql(sql_string2)

# Transform raw Column of filepaths to clean Column of Sequence_Name's
rdd=sequence_calls.rdd.map(lambda x: 
    (x['STEP_RESULT'],x['SEQUENCE_FILE_PATH'].split('\\')[-1].split('.')[0])
    )  
sequence_names=rdd.toDF(["STEP_RESULT","SEQUENCE_FILE_PATH"])

if debug_mode:
    display(sequence_names)

STEP_RESULT,SEQUENCE_FILE_PATH
1,352021
5,Common
6,Common
61,Common
72,352021
76,Common
77,Common
132,Common
142,Common
147,Common


In [0]:
"""Create a dictionary where each unique value in SEQUENCE_NAME becomes a key"""

tbl_dict = {}

# Extract list of unique Sequence_Name's
seq_list = sequence_names.select("SEQUENCE_FILE_PATH").distinct().rdd.map(lambda x : x[0]).collect()

# Dictionary values are empty and will be populated later
for seq_name in seq_list:
    tbl_dict[f"{seq_name}"] = []

if debug_mode:
    print(tbl_dict)


{'352018': [], 'Common': [], '352021': []}


In [0]:
"""Query UUT_Results Table"""

# Get all desired test runs

sql_string = (
    "SELECT ID, STATION_ID, START_DATE_TIME, EXECUTION_TIME, TEST_SOCKET_INDEX, UUT_SERIAL_NUMBER, UUT_STATUS "
    "FROM UUT_RESULT "
    "WHERE UUT_STATUS <> 'Terminated' and STATION_ID is not NULL"
)
sql_string += " ORDER BY START_DATE_TIME, TEST_SOCKET_INDEX"

uut_runs = spark.sql(sql_string)

if debug_mode:
    display(uut_runs)

ID,STATION_ID,START_DATE_TIME,EXECUTION_TIME,TEST_SOCKET_INDEX,UUT_SERIAL_NUMBER,UUT_STATUS
1,BRM-DQ1VH63,2021-11-10T14:54:05Z,85.4034475,-1,352Z9999,Error
2,BRM-DQ1VH63,2021-11-10T15:00:30Z,545.1689131,-1,352C0000,Passed
3,BRM-DQ1VH63,2021-11-11T03:55:27Z,538.7750436,-1,352C3129,Failed
4,BRM-DQ1VH63,2021-11-11T04:04:53Z,534.2498016,-1,352C3129,Passed
5,BRM-DQ1VH63,2021-11-11T04:16:34Z,551.1659191,-1,352C3134,Passed
6,BRM-DQ1VH63,2021-11-11T04:26:20Z,156.0671306,-1,352C3136,Error
8,BRM-DQ1VH63,2021-11-11T04:41:02Z,525.8419235,-1,352C3132,Passed
9,BRM-DQ1VH63,2021-11-11T05:06:29Z,656.8235001,-1,352C3125,Passed
10,BRM-DQ1VH63,2021-11-11T05:17:39Z,666.4288131,-1,352C3125,Passed
11,BRM-DQ1VH63,2021-11-11T05:35:20Z,524.1531004,-1,352C3135,Passed


In [0]:
"""Construct Test Results Data Table by matching each Step_Result to its parent UUT_Result"""

for run in uut_runs.collect():
    # Identify Metadata for each Run in UUT_Results
    data = {
            "Test Start": run.START_DATE_TIME,
            "Station ID": run.STATION_ID,
            "Serial Number": run.UUT_SERIAL_NUMBER,
            "Test Socket": run.TEST_SOCKET_INDEX,
            "Test Status": run.UUT_STATUS,
            "Test Time (s)": int(round(run.EXECUTION_TIME, None)),
        }

    # Query individual step results inside each Test Run
    sql_string3 = (
        f"""
            SELECT  STEP_RESULT.ID, 
                    STEP_RESULT.STEP_PARENT, 
                    STEP_RESULT.STEP_NAME, 
                    STEP_RESULT.STEP_TYPE, 
                    STEP_RESULT.STATUS, 
                    STEP_RESULT.ORDER_NUMBER,
                    PROP_RESULT.ID AS PROP_ID, 
                    PROP_RESULT.TYPE_NAME, 
                    PROP_RESULT.DATA, 
                    PROP_RESULT.NAME,
                    PROP_NUMERICLIMIT.COMP_OPERATOR AS COP, 
                    PROP_NUMERICLIMIT.HIGH_LIMIT AS HL, 
                    PROP_NUMERICLIMIT.LOW_LIMIT AS LL, 
                    PROP_NUMERICLIMIT.UNITS AS UNITS
            FROM (STEP_RESULT LEFT JOIN PROP_RESULT ON STEP_RESULT.ID = PROP_RESULT.STEP_RESULT)
                    LEFT JOIN PROP_NUMERICLIMIT ON PROP_RESULT.ID = PROP_NUMERICLIMIT.PROP_RESULT
            WHERE STEP_RESULT.UUT_RESULT = {run.ID} 
                    and STEP_RESULT.STEP_TYPE = 'NumericLimitTest' 
                    and PROP_RESULT.TYPE_NAME = 'NumericLimitTest'
            ORDER BY STEP_RESULT.ORDER_NUMBER ASC
            """
    )
    
    run_steps = spark.sql(sql_string3)

    # Process Step for each UUT_Run
    for step in run_steps.collect():

        # Different step runs will have repeated step name.
        repeat_index = 0
        key_name = (
                f"{step.STEP_NAME} ({step.UNITS}) [{repeat_index}]"
        )

        # Append numeric suffix to distinguish.
        while key_name in data:
                repeat_index += 1
                key_name = (
                f"{step.STEP_NAME} ({step.UNITS}) [{repeat_index}]"
                )

        # Add new key-value pair with precision of 3 decimal places
        data[key_name] = round(float(step.DATA), 3)
        
    # Convert STEP_PARENT index to the corresponding sequence file dictionary value
    dict_val = ""
    for x, y in sequence_names.collect():
        if x == step.STEP_PARENT:
            dict_val = y

    # Append data to the appropriate output table according to dictionary value
    tbl_dict[f"{dict_val}"].append(data)

if debug_mode:
    display(data)


In [0]:
# """Export each output table to SILVER ADLS2 container"""

# import pandas as pd

# for seq_name in seq_list:
#     filename = seq_name + ".csv"
#     output_file = output_folder + "\\" + filename
#     filepath = Path(output_file)
#     if filepath.is_file():
#         write_header = False
#     else:
#         write_header = True
#     pd.DataFrame.from_records(tbl_dict[f"{seq_name}"]).to_csv(
#         output_file, mode="a", index=False, header=write_header
#     )


