%md
Query-history columns used to flag bad queries (`statement_id` and `executed_by` are identifiers, not metrics):
1. execution_duration_ms
2. total_duration_ms
3. compilation_duration_ms
<!-- 4. read_partitions -->
<!-- 5. pruned_files -->
<!-- 6. read_files -->
7. read_rows
8. produced_rows
9. read_bytes
10. statement_id
11. executed_by

In [0]:
# %sql
# CREATE TABLE IF NOT EXISTS ds_training_1.ds_control.metric_weights_bad_queries (
#   metric_name VARCHAR(50),
#   weight DECIMAL(5, 4)
# );

In [0]:
# %sql
# INSERT INTO ds_training_1.ds_control.metric_weights_bad_queries (metric_name, weight) VALUES
# ("Total Duration", 0.2),
# ("Execution Duration", 0.2),
# ("Compilation Duration", 0.1),
# ("Read Rows", 0.15),
# ("Produced Rows", 0.15),
# ("Read Bytes", 0.2)

In [0]:
from pyspark.sql import functions as F
from pyspark.sql import Window

In [0]:
%sql
-- Show every row of the metric-weights control table.
SELECT * FROM ds_training_1.ds_control.metric_weights_bad_queries;

In [0]:
%python
# Distinct (catalog_name, schema_name) pairs registered in the table_config control table.
catalog_schema_df = spark.sql(
    "SELECT DISTINCT catalog_name, schema_name FROM ds_training_1.ds_control.table_config"
)
catalog_schema_df.show()

# Bring the pairs to the driver as plain (catalog, schema) tuples.
list_of_schemas = []
for schema_row in catalog_schema_df.collect():
    list_of_schemas.append((schema_row.catalog_name, schema_row.schema_name))
print(list_of_schemas)

In [0]:
# Read the per-metric weights from the control table and index them by metric name.
weights_df = spark.sql(
    "SELECT metric_name, weight FROM ds_training_1.ds_control.metric_weights_bad_queries"
)
weights = dict(
    (weight_row.metric_name, weight_row.weight) for weight_row in weights_df.collect()
)

print(weights)

In [0]:
# Notebook parameter: look-back window in days, exposed as a text widget (default "7").
dbutils.widgets.text("num_days", "7", "Number of Days")
# The widget value is read back and converted to an integer.
num_days_raw = dbutils.widgets.get("num_days")
num_days = int(num_days_raw)

In [0]:
def get_reqd_metrics_df(num_days):
    """Query recent history rows for statements that mention a configured table.

    A LIKE pattern ``%catalog.schema.table%`` is built for every row of
    ``ds_training_1.ds_control.table_config`` and inner-joined against
    ``system.query.history`` on ``statement_text``.

    Args:
        num_days: Look-back window in days. It is coerced with ``int()`` before
            being interpolated into the SQL text, so a value that is not a
            number raises ``ValueError``/``TypeError`` here instead of being
            spliced into the query.

    Returns:
        Spark DataFrame with ``statement_id``, ``executed_by``,
        ``statement_text``, ``execution_status``, ``error_message``, the six
        raw metric columns and ``end_time``. Because the join is an inner join
        on a LIKE condition, a statement matching several configured tables
        produces one row per matching pattern.
    """
    # Only an integer may reach the SQL string below.
    lookback_days = int(num_days)

    # NOTE(review): the upper bound is `current_date`; rows whose end_time falls
    # later today look excluded by this BETWEEN — confirm that is intended.
    reqd_metrics = f"""
        SELECT
            statement_id,
            executed_by,
            statement_text,
            execution_status,
            error_message,
            total_duration_ms,
            execution_duration_ms,
            compilation_duration_ms,
            read_rows,
            produced_rows,
            read_bytes,
            end_time
        FROM
            system.query.history h
        JOIN
            (SELECT
                 '%' || CONCAT(catalog_name, '.', schema_name, '.', table_name) || '%' AS pattern
            FROM
                ds_training_1.ds_control.table_config) tc
        ON
            h.statement_text LIKE tc.pattern
        WHERE
            end_time BETWEEN date_sub(current_date, {lookback_days}) AND current_date
    """

    # The f-string is already fully interpolated. Running str.format() over it
    # again adds nothing and would fail if the SQL ever contained a literal brace.
    return spark.sql(reqd_metrics)

# Requires `num_days`, which is set from the "num_days" widget earlier in the notebook.
# Builds the raw metrics DataFrame for that look-back window.
reqd_metrics_df = get_reqd_metrics_df(num_days)

In [0]:
def normalize_metrics(reqd_metrics_df):
    """Add min-max scaled copies of the raw metrics and align per-statement metadata.

    For each of the six metric columns a ``normalized_<metric>`` column is
    added, computed as ``(value - min) / (max - min)`` over the whole
    DataFrame. Afterwards ``execution_status``, ``error_message`` and
    ``executed_by`` are overwritten on every row with the value taken from the
    first row of its ``statement_text`` partition ordered by ``end_time``
    descending (i.e. the most recent execution of that statement).

    Args:
        reqd_metrics_df: Spark DataFrame containing the six metric columns plus
            ``statement_text``, ``end_time``, ``execution_status``,
            ``error_message`` and ``executed_by``.

    Returns:
        The DataFrame with six ``normalized_*`` columns added. When a metric
        has no min/max (empty input or all NULL) or its min equals its max, the
        normalised column is NULL instead of the result of a division by zero.
    """
    metrics = [
        "total_duration_ms",
        "execution_duration_ms",
        "compilation_duration_ms",
        "read_rows",
        "produced_rows",
        "read_bytes",
    ]

    # Fetch every min and max in a single aggregation rather than running two
    # separate Spark jobs per metric.
    bound_exprs = []
    for metric in metrics:
        bound_exprs.append(F.min(metric).alias(f"min_{metric}"))
        bound_exprs.append(F.max(metric).alias(f"max_{metric}"))
    bounds = reqd_metrics_df.agg(*bound_exprs).first()

    for metric in metrics:
        min_value = bounds[f"min_{metric}"]
        max_value = bounds[f"max_{metric}"]

        if min_value is None or max_value is None or max_value == min_value:
            # No usable range: avoid `None - None` on the driver and a
            # division by zero in Spark; leave the value NULL.
            normalized = F.lit(None).cast("double")
        else:
            normalized = (F.col(metric) - min_value) / (max_value - min_value)

        reqd_metrics_df = reqd_metrics_df.withColumn(f"normalized_{metric}", normalized)

    # Define a window specification to order by end_time to find the most recent execution of the query
    window_spec = Window.partitionBy("statement_text").orderBy(F.desc("end_time"))

    for metadata_col in ("execution_status", "error_message", "executed_by"):
        reqd_metrics_df = reqd_metrics_df.withColumn(
            metadata_col, F.first(metadata_col).over(window_spec)
        )

    return reqd_metrics_df

# Requires `num_days` from the widget cell.
# Query the history and normalise it in one step, then show the result.
reqd_metrics_df = normalize_metrics(get_reqd_metrics_df(num_days))
display(reqd_metrics_df)

In [0]:
def calculate_composite_score(reqd_metrics_df, weights):
    """Score and rank each distinct statement from its normalised metrics.

    Rows are grouped by ``statement_text``. Each ``normalized_*`` column is
    averaged per statement (NULL durations count as 1, NULL row/byte metrics
    count as 0), the averages are combined into ``composite_score`` as a
    weighted sum, that score is min-max scaled into
    ``normalized_composite_score`` and the statements are ranked by it in
    descending order.

    Args:
        reqd_metrics_df: Spark DataFrame carrying ``statement_text``, the six
            ``normalized_*`` columns, ``execution_status``, ``error_message``
            and ``executed_by``.
        weights: Mapping with the keys ``'Total Duration'``,
            ``'Execution Duration'``, ``'Compilation Duration'``,
            ``'Read Rows'``, ``'Produced Rows'`` and ``'Read Bytes'``.

    Returns:
        One row per statement with the averaged metrics, the three metadata
        columns, ``composite_score``, ``normalized_composite_score`` and
        ``rank`` (1 = highest score).

    Raises:
        KeyError: If ``weights`` lacks one of the required keys.
    """
    # (weights key, normalised column, value substituted for NULL)
    metric_specs = [
        ("Total Duration", "normalized_total_duration_ms", 1),
        ("Execution Duration", "normalized_execution_duration_ms", 1),
        ("Compilation Duration", "normalized_compilation_duration_ms", 1),
        ("Read Rows", "normalized_read_rows", 0),
        ("Produced Rows", "normalized_produced_rows", 0),
        ("Read Bytes", "normalized_read_bytes", 0),
    ]

    missing_keys = [key for key, _, _ in metric_specs if key not in weights]
    if missing_keys:
        raise KeyError(f"weights is missing entries for: {missing_keys}")

    avg_exprs = [
        F.avg(F.coalesce(F.col(column), F.lit(fill))).alias(column)
        for _, column, fill in metric_specs
    ]
    metadata_exprs = [
        F.first(column).alias(column)
        for column in ("execution_status", "error_message", "executed_by")
    ]
    per_statement_df = reqd_metrics_df.groupBy("statement_text").agg(*avg_exprs, *metadata_exprs)

    # Weighted sum of the per-statement averages.
    composite_expr = None
    for key, column, _ in metric_specs:
        term = weights[key] * F.col(column)
        composite_expr = term if composite_expr is None else composite_expr + term
    composite_score_df = per_statement_df.withColumn("composite_score", composite_expr)

    # Calculate min and max composite scores
    score_bounds = composite_score_df.agg(
        F.min("composite_score").alias("min_score"),
        F.max("composite_score").alias("max_score"),
    ).first()
    min_score = score_bounds["min_score"]
    max_score = score_bounds["max_score"]

    if min_score is None or max_score is None or max_score == min_score:
        # No spread (empty input, a single statement, or identical scores):
        # dividing by the range is impossible, so every non-NULL score maps to 0.0.
        normalized_expr = F.when(F.col("composite_score").isNotNull(), F.lit(0.0))
    else:
        normalized_expr = (F.col("composite_score") - min_score) / (max_score - min_score)

    normalized_score_df = composite_score_df.withColumn(
        "normalized_composite_score", normalized_expr
    )

    # statement_text is unique per row after the groupBy, so using it as a
    # tie-breaker makes the rank reproducible when scores are equal.
    rank_window = Window.orderBy(F.desc("normalized_composite_score"), F.asc("statement_text"))
    return normalized_score_df.withColumn("rank", F.row_number().over(rank_window))

# Requires `num_days` (widget cell) and `weights` (weights cell).
# Full pipeline: history query -> normalisation -> composite score and rank.
reqd_metrics_df = normalize_metrics(get_reqd_metrics_df(num_days))
ranked_normalized_df = calculate_composite_score(reqd_metrics_df, weights)

summary_columns = [
    "statement_text",
    "executed_by",
    "execution_status",
    "error_message",
    "normalized_composite_score",
    "rank",
]
display(ranked_normalized_df.select(*summary_columns))

In [0]:
# from pyspark.sql import functions as F
# from pyspark.sql import Window

# # variable to convert ms to s and bytes to megabytes
# ms_to_s = 1000
# b_to_mb = 1e6

# reqd_metrics = spark.sql("""
#         SELECT 
#             statement_id,
#             executed_by,
#             statement_text,
#             execution_status,
#             error_message,
#             total_duration_ms,
#             execution_duration_ms,
#             compilation_duration_ms,
#             read_rows,
#             produced_rows,
#             read_bytes,
#             end_time 
#         FROM 
#             system.query.history h
#         JOIN 
#             (SELECT 
#                  '%' || CONCAT(catalog_name, '.', schema_name, '.', table_name) || '%' AS pattern
#             FROM 
#                 ds_training_1.ds_control.table_config) tc
#         ON 
#             h.statement_text LIKE tc.pattern
# """)

# # Normalize the specified metrics
# for metric in ["total_duration_ms", "execution_duration_ms", "compilation_duration_ms", "read_rows", "produced_rows", "read_bytes"]:
#     max_value = reqd_metrics.agg(F.max(metric).alias("max")).collect()[0]["max"]
#     min_value = reqd_metrics.agg(F.min(metric).alias("min")).collect()[0]["min"]
#     range_value = max_value - min_value
#     reqd_metrics = reqd_metrics.withColumn(f"normalized_{metric}", (F.col(metric) - min_value) / range_value)

# # Define a window specification to order by end_time to find the most recent execution of the query
# window_spec = Window.partitionBy("statement_text").orderBy(F.desc("end_time"))

# reqd_metrics_df = (
#     reqd_metrics.withColumn("execution_status", F.first("execution_status").over(window_spec))
#               .withColumn("error_message", F.first("error_message").over(window_spec))
#               .withColumn("executed_by", F.first("executed_by").over(window_spec))
# )
# display(reqd_metrics_df)

# # Calculate composite score using normalized values
# composite_score_df = reqd_metrics_df.groupBy("statement_text").agg(
#     F.avg(F.coalesce(F.col("normalized_total_duration_ms"), F.lit(1))).alias("normalized_total_duration_ms"),
#     F.avg(F.coalesce(F.col("normalized_execution_duration_ms"), F.lit(1))).alias("normalized_execution_duration_ms"),
#     F.avg(F.coalesce(F.col("normalized_compilation_duration_ms"), F.lit(1))).alias("normalized_compilation_duration_ms"),
#     F.avg(F.coalesce(F.col("normalized_read_rows"), F.lit(0))).alias("normalized_read_rows"),
#     F.avg(F.coalesce(F.col("normalized_produced_rows"), F.lit(0))).alias("normalized_produced_rows"),
#     F.avg(F.coalesce(F.col("normalized_read_bytes"), F.lit(0))).alias("normalized_read_bytes"),
#     F.first("execution_status").alias("execution_status"),
#     F.first("error_message").alias("error_message"),
#     F.first("executed_by").alias("executed_by")
# ).withColumn(
#     "composite_score",
#     (weights['Total Duration'] * F.col("normalized_total_duration_ms")) +
#     (weights['Execution Duration'] * F.col("normalized_execution_duration_ms")) +
#     (weights['Compilation Duration'] * F.col("normalized_compilation_duration_ms")) +
#     (weights['Read Rows'] * F.col("normalized_read_rows")) +
#     (weights['Produced Rows'] * F.col("normalized_produced_rows")) +
#     (weights['Read Bytes'] * F.col("normalized_read_bytes"))
# )
# display(composite_score_df)

# # Rank the queries based on composite score
# ranked_df = composite_score_df.withColumn("rank", F.row_number().over(Window.orderBy(F.desc("composite_score"))))
# display(ranked_df)

# # Calculate min and max composite scores
# min_max_scores = composite_score_df.agg(
#     F.min("composite_score").alias("min_score"),
#     F.max("composite_score").alias("max_score")
# ).collect()[0]

# min_score = min_max_scores['min_score']
# max_score = min_max_scores['max_score']

# # Add a new column for normalized composite score
# normalized_score_df = composite_score_df.withColumn(
#     "normalized_composite_score",
#     (F.col("composite_score") - min_score) / (max_score - min_score)
# )

# # Rank the queries based on normalized composite score
# ranked_normalized_df = normalized_score_df.withColumn("rank", F.row_number().over(Window.orderBy(F.desc("normalized_composite_score"))))

# display(ranked_normalized_df.select("statement_text", "executed_by",  "execution_status", "error_message","normalized_composite_score", "rank"))

In [0]:
# from pyspark.sql import functions as F
# from pyspark.sql import Window

# # variable to convert ms to s and bytes to megabytes
# ms_to_s = 1000
# b_to_mb = 1e6

# reqd_metrics = spark.sql("""
#         SELECT 
#             statement_id,
#             executed_by,
#             statement_text,
#             execution_status,
#             error_message,
#             total_duration_ms,
#             execution_duration_ms,
#             compilation_duration_ms,
#             read_rows,
#             produced_rows,
#             read_bytes,
#             end_time 
#         FROM 
#             system.query.history h
#         JOIN 
#             (SELECT 
#                  '%' || CONCAT(catalog_name, '.', schema_name, '.', table_name) || '%' AS pattern
#             FROM 
#                 ds_training_1.ds_control.table_config) tc
#         ON 
#             h.statement_text LIKE tc.pattern
# """)
# display(reqd_metrics)

# # Define a window specification to order by end_time to find the most recent execution of the query
# window_spec = Window.partitionBy("statement_text").orderBy(F.desc("end_time"))

# reqd_metrics_df = (
#     reqd_metrics.withColumn("execution_status", F.first("execution_status").over(window_spec))
#               .withColumn("error_message", F.first("error_message").over(window_spec))
#               .withColumn("executed_by", F.first("executed_by").over(window_spec))
# )
# display(reqd_metrics_df)


# # Calculate composite score
# composite_score_df = reqd_metrics_df.groupBy("statement_text").agg(
# F.avg(F.coalesce(F.col("total_duration_ms"), F.lit(1000))).alias("total_duration_ms"),
# F.avg(F.coalesce(F.col("execution_duration_ms"), F.lit(1000))).alias("execution_duration_ms"),
# F.avg(F.coalesce(F.col("compilation_duration_ms"), F.lit(1000))).alias("compilation_duration_ms"),
# F.avg(F.coalesce(F.col("read_rows"), F.lit(0))).alias("read_rows"),
# F.avg(F.coalesce(F.col("produced_rows"), F.lit(0))).alias("produced_rows"),
# F.avg(F.coalesce(F.col("read_bytes"), F.lit(0))).alias("read_bytes"),
# F.first("execution_status").alias("execution_status"),
# F.first("error_message").alias("error_message"),
# F.first("executed_by").alias("executed_by")
# ).withColumn(
#     "composite_score",
#     (weights['Total Duration'] * (F.col("total_duration_ms") / ms_to_s)) +
#     (weights['Execution Duration'] * (F.col("execution_duration_ms") / ms_to_s)) +
#     (weights['Compilation Duration'] * (F.col("compilation_duration_ms") / ms_to_s)) +
#     (weights['Read Rows'] * F.col("read_rows")) +
#     (weights['Produced Rows'] * F.col("produced_rows")) +
#     (weights['Read Bytes'] * (F.col("read_bytes") / b_to_mb))
# )
# display(composite_score_df)

# # Rank the queries based on composite score
# ranked_df = composite_score_df.withColumn("rank", F.row_number().over(Window.orderBy(F.desc("composite_score"))))
# display(ranked_df.select("statement_text", "execution_status", "error_message", "composite_score", "rank"))


# # Calculate min and max composite scores
# min_max_scores = composite_score_df.agg(
#     F.min("composite_score").alias("min_score"),
#     F.max("composite_score").alias("max_score")
# ).collect()[0]

# min_score = min_max_scores['min_score']
# max_score = min_max_scores['max_score']

# # Add a new column for normalized composite score
# normalized_score_df = composite_score_df.withColumn(
#     "normalized_composite_score",
#     (F.col("composite_score") - min_score) / (max_score - min_score)
# )

# # Rank the queries based on normalized composite score
# ranked_normalized_df = normalized_score_df.withColumn("rank", F.row_number().over(Window.orderBy(F.desc("normalized_composite_score"))))

# # Show the ranked DataFrame with normalized scores
# display(ranked_normalized_df.select("statement_text", "executed_by",  "execution_status", "error_message","normalized_composite_score", "rank"))

# # Important queries which have failed to get executed
# failed_imp_queries = ranked_normalized_df.filter(F.col("execution_status") == "FAILED")
# display(failed_imp_queries.select("statement_text",  "executed_by", "execution_status", "error_message", "normalized_composite_score", "rank"))


In [0]:
# NOTE: plain assignment, so `copied_df` is a second name for the same DataFrame
# object as `ranked_normalized_df`, not an independent copy.
copied_df = ranked_normalized_df
display(copied_df)

In [0]:
from pyspark.sql import functions as F
import numpy as np

def calculate_threshold_mean_sd(df):
    """Return mean + 2 population standard deviations of the normalised scores.

    The statistics are computed by Spark over ``normalized_composite_score``
    instead of collecting every row to the driver. ``stddev_pop`` is used
    because the previous NumPy implementation (``np.std`` with its default
    ``ddof=0``) was a population standard deviation. Spark aggregates skip NULL
    scores.

    Args:
        df: Spark DataFrame with a ``normalized_composite_score`` column.

    Returns:
        The threshold as a float, or ``nan`` when there is no non-NULL score to
        aggregate.
    """
    stats = df.agg(
        F.avg("normalized_composite_score").alias("mean_score"),
        F.stddev_pop("normalized_composite_score").alias("std_dev_score"),
    ).first()

    mean_score = stats["mean_score"]
    std_dev_score = stats["std_dev_score"]
    if mean_score is None or std_dev_score is None:
        # Nothing to aggregate; mirror the NaN that a mean over no values yields.
        return float("nan")

    # Set threshold as 2 standard deviations above the mean
    return mean_score + 2 * std_dev_score
    

In [0]:
# Threshold from the mean + 2 standard deviations rule.
print(calculate_threshold_mean_sd(copied_df))

In [0]:
%pip install kneed
from kneed import KneeLocator
import matplotlib.pyplot as plt

def calculate_threshold_elbow_point(thresh_df):
    """Return the score at the knee of the descending score-vs-rank curve.

    Non-NULL ``normalized_composite_score`` values are collected in descending
    order, plotted against their 1-based position, and passed to
    ``KneeLocator`` (``curve='convex'``, ``direction='decreasing'``).

    Args:
        thresh_df: Spark DataFrame with a ``normalized_composite_score`` column.

    Returns:
        The score located at the detected knee position.

    Raises:
        ValueError: If there are no non-NULL scores, or if ``KneeLocator`` does
            not report a knee (its ``knee`` attribute is ``None``).
    """
    score_col = "normalized_composite_score"

    # Only the score column is needed on the driver; NULL scores cannot be
    # plotted or fed to the knee detection.
    sorted_rows = (
        thresh_df.select(score_col)
        .filter(F.col(score_col).isNotNull())
        .orderBy(F.desc(score_col))
        .collect()
    )
    composite_scores = [row[score_col] for row in sorted_rows]
    if not composite_scores:
        raise ValueError("No non-null normalized_composite_score values to find an elbow in.")

    ranks = list(range(1, len(composite_scores) + 1))

    # Plot the rank vs composite score for visualization
    plt.plot(ranks, composite_scores)
    plt.xlabel('Rank')
    plt.ylabel('Composite Score')
    plt.title('Elbow Method for Threshold Selection')
    plt.show()

    # Use KneeLocator to find the elbow point
    knee_locator = KneeLocator(ranks, composite_scores, curve='convex', direction='decreasing')
    if knee_locator.knee is None:
        # Without this guard the lookup below fails with an opaque `None - 1` TypeError.
        raise ValueError("KneeLocator found no elbow point in the score curve.")

    # knee is a rank (1-based); convert to a 0-based list index.
    return composite_scores[int(knee_locator.knee) - 1]

In [0]:
# Threshold from the elbow (knee) of the sorted score curve; also draws the rank-vs-score plot.
print(calculate_threshold_elbow_point(copied_df))

In [0]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.cluster import KMeans

def calculate_threshold_k_means(thresh_df):
    """Return the larger of two K-means centroids fitted on the normalised scores.

    The non-NULL ``normalized_composite_score`` values are clustered into two
    groups (``random_state=0``); the clusters and centroids are plotted and the
    higher centroid is returned.

    Args:
        thresh_df: Spark DataFrame with a ``normalized_composite_score`` column.

    Returns:
        The higher centroid as a plain Python float.

    Raises:
        ValueError: If fewer than two non-NULL scores are available, since two
            clusters cannot be fitted.
    """
    # Convert the Spark DataFrame to a Pandas DataFrame and extract the numpy array.
    # NULL scores are dropped first: they would reach KMeans as NaN.
    composite_scores = (
        thresh_df.select("normalized_composite_score")
        .dropna()
        .toPandas()
        .to_numpy(dtype=float)
        .reshape(-1, 1)
    )
    if composite_scores.shape[0] < 2:
        raise ValueError("K-means with 2 clusters needs at least two non-null scores.")

    # Perform K-means clustering with 2 clusters
    kmeans = KMeans(n_clusters=2, random_state=0).fit(composite_scores)

    # Identify the cluster centroids (shape: n_clusters x 1)
    centroids = kmeans.cluster_centers_

    # Choose the higher centroid as the "bad query" cluster threshold.
    # NOTE(review): this is the centre of the upper cluster, not the boundary
    # between the clusters — confirm that is the intended cut-off.
    threshold = float(centroids.max())

    # Visualize the clusters
    plt.scatter(composite_scores, np.zeros_like(composite_scores), c=kmeans.labels_, cmap='rainbow')
    plt.scatter(centroids[:, 0], np.zeros(len(centroids)), color='black', marker='x', s=100, label='Centroids')
    plt.title('K-means Clustering of Composite Scores')
    plt.xlabel('Normalized Composite Score')
    plt.legend()
    plt.show()

    return threshold

In [0]:
# Threshold from 2-cluster K-means (higher centroid); also draws the cluster plot.
print(calculate_threshold_k_means(copied_df))

In [0]:
def calulate_threshold_percentile(thresh_df):
    """Return the 0.9 quantile of ``normalized_composite_score``.

    ``approxQuantile`` is invoked for the single probability 0.9 with its third
    argument set to 0.0, and the first element of the returned list is handed
    back to the caller.

    Args:
        thresh_df: Object exposing ``approxQuantile`` (a Spark DataFrame in this
            notebook) with a ``normalized_composite_score`` column.

    Returns:
        The first quantile value reported by ``approxQuantile``.
    """
    quantiles = thresh_df.approxQuantile("normalized_composite_score", [0.9], 0.0)
    return quantiles[0]

In [0]:
# Threshold from the 0.9 quantile of the normalised scores.
print(calulate_threshold_percentile(copied_df))

**Classifying good and bad queries**

In [0]:
# Define a method to dynamically select the threshold calculation method
def calculate_dynamic_threshold(method, df, default_threshold=0.4):
    """Compute the bad-query threshold with the strategy named by ``method``.

    Args:
        method: One of ``"mean_sd"``, ``"elbow_point"``, ``"k_means"`` or
            ``"percentile"``. Any other value (including the empty string)
            selects the fixed fallback.
        df: DataFrame passed through unchanged to the chosen strategy function.
            It is not touched when the fallback is used.
        default_threshold: Value returned when ``method`` names no strategy.
            Defaults to 0.4, the constant that used to be hard-coded here.

    Returns:
        The threshold produced by the selected strategy, or
        ``default_threshold``.
    """
    if method == "mean_sd":
        return calculate_threshold_mean_sd(df)
    if method == "elbow_point":
        return calculate_threshold_elbow_point(df)
    if method == "k_means":
        return calculate_threshold_k_means(df)
    if method == "percentile":
        return calulate_threshold_percentile(df)
    # Unrecognised or empty method name: use the fixed fallback.
    return default_threshold

# Threshold strategy; an empty string falls through to the fixed fallback threshold.
selected_method = ""  # Options: "mean_sd", "elbow_point", "k_means", "percentile"

# Resolve the threshold for the selected strategy and show it.
comp_score_thresh = calculate_dynamic_threshold(selected_method, copied_df)
display(comp_score_thresh)

# Label each statement:
#   FINISHED and score above the threshold -> "Bad Query"
#   FINISHED otherwise                     -> "Good Query"
#   any other execution_status             -> "Not Applicable"
is_finished = F.col("execution_status") == "FINISHED"
above_threshold = F.col("normalized_composite_score") > comp_score_thresh
query_type_expr = (
    F.when(above_threshold & is_finished, "Bad Query")
    .when(is_finished, "Good Query")
    .otherwise("Not Applicable")
)
copied_df = copied_df.withColumn("type_of_query", query_type_expr)

# Display the updated DataFrame
classified_columns = [
    "statement_text",
    "executed_by",
    "execution_status",
    "error_message",
    "normalized_composite_score",
    "rank",
    "type_of_query",
]
display(copied_df.select(*classified_columns))

In [0]:
# Define the threshold
# comp_score_thresh = 0.40

# # Add the new column 'type_of_query' with conditions applied only to "FINISHED" queries, default to "Good Query"
# ranked_normalized_df = ranked_normalized_df.withColumn(
#     "type_of_query",
#     F.when(
#         (F.col("normalized_composite_score") > comp_score_thresh) & 
#         (F.col("execution_status") == "FINISHED"), 
#         "Bad Query"
#     ).when(
#         F.col("execution_status") == "FINISHED",
#         "Good Query"
#     ).otherwise("Not Applicable")
# )

# # Display the updated DataFrame
# display(ranked_normalized_df.select("statement_text", "executed_by", "execution_status", "error_message", "normalized_composite_score", "rank", "type_of_query"))

In [0]:
# Keep only the rows labelled "Bad Query", then take the ten with the lowest rank number.
bad_only_df = copied_df.where(F.col("type_of_query") == "Bad Query")
top_10_bad_queries = bad_only_df.orderBy(F.asc("rank")).limit(10)
display(top_10_bad_queries)

In [0]:
# Collect the statement texts of the selected bad queries as a plain Python list.
# The original line read from `top_4_bad_queries`, which no visible cell defines
# (NameError on a fresh run); the preceding cell produces `top_10_bad_queries`.
bad_queries = [row['statement_text'] for row in top_10_bad_queries.select("statement_text").collect()]
print(bad_queries)

In [0]:
import json
# Payload with the bad-query statement texts; the next cell publishes it as a job task value.
params = dict(bad_queries=bad_queries)

print(params)

In [0]:
# Publish the payload under the task-value key "params".
# NOTE(review): `params` holds full statement texts, which can be long — confirm the
# payload stays within whatever size limit task values impose.
dbutils.jobs.taskValues.set("params", params)

In [0]:
# %pip install kneed
# from kneed import KneeLocator
# import matplotlib.pyplot as plt

# # Assuming composite_scores are sorted in descending order and collected from the DataFrame
# composite_scores = [row['normalized_composite_score'] for row in thresh_df.orderBy(F.desc("normalized_composite_score")).collect()]

# ranks = list(range(1, len(composite_scores) + 1))

# # Plot the rank vs composite score for visualization
# plt.plot(ranks, composite_scores)
# plt.xlabel('Rank')
# plt.ylabel('Composite Score')
# plt.title('Elbow Method for Threshold Selection')
# plt.show()

# # Use KneeLocator to find the elbow point
# knee_locator = KneeLocator(ranks, composite_scores, curve='convex', direction='decreasing')

# # Print the elbow point
# print(f"The elbow point is at rank: {knee_locator.knee}, with a composite score of: {composite_scores[knee_locator.knee - 1]}")

In [0]:
# import matplotlib.pyplot as plt

# # Convert the Spark DataFrame to a Pandas DataFrame and extract the numpy array
# composite_scores = np.array(thresh_df.select("normalized_composite_score").toPandas()).reshape(-1, 1)

# # Perform K-means clustering with 2 clusters
# from sklearn.cluster import KMeans
# kmeans = KMeans(n_clusters=2, random_state=0).fit(composite_scores)

# # Identify the cluster centroids
# centroids = kmeans.cluster_centers_

# # Choose the higher centroid as the "bad query" cluster threshold
# threshold = max(centroids)[0]
# display(threshold)

# # Visualize the clusters
# plt.scatter(composite_scores, np.zeros_like(composite_scores), c=kmeans.labels_, cmap='rainbow')
# plt.scatter(centroids[:, 0], [0, 0], color='black', marker='x', s=100, label='Centroids')
# plt.title('K-means Clustering of Composite Scores')
# plt.xlabel('Normalized Composite Score')
# plt.legend()
# plt.show()

# # Filter thresh_df to include only bad queries based on the threshold
# bad_queries_df = thresh_df.filter(thresh_df.normalized_composite_score >= threshold)

# display(bad_queries_df)

In [0]:
# from pyspark.sql.functions import lit, when

# # Calculate the 90th percentile threshold
# threshold = thresh_df.approxQuantile("normalized_composite_score", [0.9], 0.0)[0]
# display(threshold)

# # Label as bad queries if above threshold
# bad_queries_df = thresh_df.withColumn("is_bad_query", when(thresh_df.normalized_composite_score >= threshold, lit(True)).otherwise(lit(False)))

# display(bad_queries_df)

In [0]:
# from pyspark.sql import functions as F
# from pyspark.sql import Window 

# for i in range(len(list_of_schemas)):

#     print(list_of_schemas[i][0], list_of_schemas[i][1])

#     reqd_metrics = spark.sql(f"""
#     SELECT 
#         statement_id,
#         executed_by,
#         statement_text,
#         total_duration_ms,
#         execution_duration_ms,
#         compilation_duration_ms,
#         read_rows,
#         produced_rows,
#         read_bytes 
#     FROM system.query.history 
#     WHERE statement_text LIKE '%{list_of_schemas[i][0]}.{list_of_schemas[i][1]}%';
#     """)

#     # Define weights
#     w1 = 0.2  # total_duration_ms
#     w2 = 0.2  # execution_duration_ms
#     w3 = 0.1  # compilation_duration_ms
#     w4 = 0.15  # read_rows
#     w5 = 0.15  # produced_rows
#     w6 = 0.2  # read_bytes


# # Calculate composite score
#     composite_score_df = reqd_metrics.groupBy("statement_text").agg(
#         F.avg(F.coalesce(F.col("total_duration_ms"), F.lit(1000))).alias("total_duration_ms"),
#         F.avg(F.coalesce(F.col("execution_duration_ms"), F.lit(1000))).alias("execution_duration_ms"),
#         F.avg(F.coalesce(F.col("compilation_duration_ms"), F.lit(1000))).alias("compilation_duration_ms"),
#         F.avg(F.coalesce(F.col("read_rows"), F.lit(0))).alias("read_rows"),
#         F.avg(F.coalesce(F.col("produced_rows"), F.lit(0))).alias("produced_rows"),
#         F.avg(F.coalesce(F.col("read_bytes"), F.lit(0))).alias("read_bytes")
#     ).withColumn(
#         "composite_score",
#         (w1 * (F.col("total_duration_ms") / 1000)) +
#         (w2 * (F.col("execution_duration_ms") / 1000)) +
#         (w3 * (F.col("compilation_duration_ms") / 1000)) +
#         (w4 * F.col("read_rows")) +
#         (w5 * F.col("produced_rows")) +
#         (w6 * (F.col("read_bytes") / 1e6))
#     )

#     display(composite_score_df)

#     # Filter to unique statement_text entries before ranking
#     unique_statements_df = composite_score_df.select("statement_text", "composite_score").distinct()

#      # Rank the queries based on composite score
#     ranked_df = unique_statements_df.withColumn("rank", F.row_number().over(Window.orderBy(F.desc("composite_score"))))

#     # Show the ranked DataFrame
#     display(ranked_df.select("statement_text", "composite_score", "rank"))

In [0]:
# # Save the DataFrame as a Delta table
# reqd_metrics.write.format("delta").mode("overwrite").saveAsTable("ds_training_1.ds_gold.bad_qd")


In [0]:
# %sql
# -- Use SQL to insert data from the temporary Delta table into another table
# INSERT INTO ds_training_1.ds_gold.bad_queries_data
# SELECT statement_id, executed_by, statement_text, total_duration_ms, execution_duration_ms, compilation_duration_ms, read_rows, produced_rows, read_bytes FROM ds_training_1.ds_gold.bad_qd;


In [0]:
from pyspark.sql import functions as F
from pyspark.sql import Window

# variable to convert ms to s and bytes to megabytes
# (not referenced in this cell; retained in case other cells read them)
ms_to_s = 1000
b_to_mb = 1e6

# Run the shared pipeline helpers with a fixed 7-day look-back window instead of
# repeating their bodies inline, so this cell cannot drift from
# get_reqd_metrics_df / normalize_metrics / calculate_composite_score.
reqd_metrics_df = normalize_metrics(get_reqd_metrics_df(7))
display(reqd_metrics_df)

# Per-statement composite score, min-max scaled score and rank.
ranked_normalized_df = calculate_composite_score(reqd_metrics_df, weights)

# Per-statement averages with the raw (unscaled) composite score.
composite_score_df = ranked_normalized_df.drop("normalized_composite_score", "rank")
display(composite_score_df)

# Rank the queries based on composite score
ranked_df = composite_score_df.withColumn("rank", F.row_number().over(Window.orderBy(F.desc("composite_score"))))
display(ranked_df)

display(ranked_normalized_df.select("statement_text", "executed_by",  "execution_status", "error_message","normalized_composite_score", "rank"))

In [0]:
'''
df = copied_df.select("statement_text", "executed_by", "execution_status", "error_message", "normalized_composite_score", "rank", "type_of_query")
df.write.saveAsTable("ds_training_1.ds_gold.bad_query_final")
'''

In [0]:
from pyspark.sql import functions as F
from pyspark.sql import Window

# variable to convert ms to s and bytes to megabytes
# (not referenced in this cell; retained in case other cells read them)
ms_to_s = 1000
b_to_mb = 1e6

# Run the shared pipeline helpers for the `num_days` widget value instead of
# repeating their bodies inline (the inline copy also re-applied str.format()
# to an f-string that was already interpolated).
reqd_metrics_df = normalize_metrics(get_reqd_metrics_df(num_days))
display(reqd_metrics_df)

# Per-statement composite score, min-max scaled score and rank.
ranked_normalized_df = calculate_composite_score(reqd_metrics_df, weights)

# Per-statement averages with the raw (unscaled) composite score.
composite_score_df = ranked_normalized_df.drop("normalized_composite_score", "rank")
display(composite_score_df)

# Rank the queries based on composite score
ranked_df = composite_score_df.withColumn("rank", F.row_number().over(Window.orderBy(F.desc("composite_score"))))
display(ranked_df)

display(ranked_normalized_df.select("statement_text", "executed_by",  "execution_status", "error_message","normalized_composite_score", "rank"))