# Rule Based recommendation Method - Run Time Based Weighted Count
  * A rule based method for partition recommendation
  * Multiplies each operation by a weight (based on the run time of the physical plan the operation belongs to) and sums the results for each column
  * The **run time-weight** is based on how many seconds the physical plan needed to execute

In [0]:
import pyspark.sql.functions as F
import json
from datetime import timedelta, datetime
from pyspark.sql.types import (
    StructType,
    StructField,
    IntegerType,
    StringType,
    TimestampType,
    DoubleType,
    FloatType,
)

### 1. Inputs
* **toTime**: (yyyy-mm-dd HH:MM:SS) end of the relevant interval (the latest date to include)
* **interval**: (int/float) how many weeks of data to use, counted backwards from toTime
* **min_weight**: (float) starting weight that will be used for physical plans executed in 0 seconds
* **max_weight**: (float) maximum weight that will be used for physical plans executed in >= **max_time** seconds
* **max_time**: (int) how many seconds are needed to achieve the maximum weight

In [0]:
%run "./Validators"

In [0]:
# Format a user-supplied to_time has to follow
datetime_format = "%Y-%m-%d %H:%M:%S"

# --- Widgets -----------------------------------------------------------------
# Every widget value is read back as a string and validated/converted below.
dbutils.widgets.text(
    "to_time",
    "None",
    'toTime, End of interval, use "None" for time.now(), Format: yyyy-mm-dd HH:MM:SS',
)

dbutils.widgets.text(
    "interval", "4", "Size of interval (weeks) for how much data the methods will use"
)

# The weight labels describe the run-time weighting this notebook applies:
# max_weight is reached at max_time seconds, min_weight applies at 0 seconds.
dbutils.widgets.text(
    "max_weight",
    "2",
    "max_weight, Weight used for physical plans that ran for max_time seconds or longer",
)

dbutils.widgets.text(
    "min_weight",
    "0",
    "min_weight, Weight used for physical plans that ran in 0 seconds",
)

dbutils.widgets.text(
    "max_time", "10", "max_time in seconds. How many seconds needed to reach max_weight"
)

to_time = dbutils.widgets.get("to_time")
interval = dbutils.widgets.get("interval")
max_weight = dbutils.widgets.get("max_weight")
min_weight = dbutils.widgets.get("min_weight")
max_time = dbutils.widgets.get("max_time")

# --- to_time -----------------------------------------------------------------
# The string "None" (the widget default) means "use the current time";
# the original note says this is needed because of work flows.
if to_time == "None":
    to_time = datetime.now()
elif validate_time_input(to_time, datetime_format):  # from the ./Validators notebook
    to_time = datetime.strptime(to_time, datetime_format)
else:
    raise ValueError("inputted to_time is not on the right format")

# --- Numeric inputs ----------------------------------------------------------
# validate_positive_number is presumably also provided by ./Validators (TODO confirm).
# NOTE(review): nothing here checks that min_weight <= max_weight.
assert validate_positive_number(
    interval, gt_zero=True
), "interval is not a valid number > 0"
assert validate_positive_number(
    max_weight, gt_zero=True
), "max_weight is not a valid number > 0"
assert validate_positive_number(
    min_weight, gt_zero=False
), "min_weight input is not a a valid number >= 0"
assert validate_positive_number(
    max_time, gt_zero=True
), "max_time input is not a a valid number > 0"

# The widget values are strings; convert them once they have been validated
interval = float(interval)
max_weight = float(max_weight)
min_weight = float(min_weight)
max_time = float(max_time)

# Start of the interval: `interval` weeks before to_time
from_time = to_time - timedelta(weeks=interval)

#### Tables:
* method_runs
* method_results
* method_recommendations

Are already created in the **SetupTables** notebook

In [0]:
# Column layout of one row inserted into the method_runs table.
# Every column is declared nullable.
_method_runs_columns = [
    ("runId", IntegerType()),
    ("methodName", StringType()),
    ("params", StringType()),
    ("fromTime", TimestampType()),
    ("toTime", TimestampType()),
    ("whenRun", TimestampType()),
]

method_runs_schema = StructType(
    [
        StructField(column_name, column_type, nullable=True)
        for column_name, column_type in _method_runs_columns
    ]
)

### 2. Preprocessing
##### Fetch the data from the operations table and perform the following steps:
* Only fetch operations from within the **from_time** and **to_time** input parameters
* Ignore the "Filter" rows, as the columns in them seem to already be stored under the "PushedFilters" rows
* Remove rows with NA values and empty strings (may have happened when extracting the table and database name)

In [0]:
# Interval bounds as epoch milliseconds, so they can be compared with timeGenerated
from_time_timestamp = int(from_time.timestamp() * 1000)
to_time_timestamp = int(to_time.timestamp() * 1000)

# Keep only operations generated inside [from_time, to_time] (both ends inclusive)
# and drop the rows whose operation name is "Filter"
# (what remains should be the PartitionFilters and PushedFilters rows).
_time_generated = F.col("timeGenerated")
_inside_window = (_time_generated >= from_time_timestamp) & (
    _time_generated <= to_time_timestamp
)
_is_not_plain_filter = F.col("operationName") != "Filter"

operations = (
    spark.sql("SELECT * FROM operations")
    .filter(_inside_window)
    .filter(_is_not_plain_filter)
)

# Turn empty strings into nulls, then drop every row that has a null in any column
_blank_as_null = [
    F.when(operations[name] == "", None).otherwise(operations[name]).alias(name)
    for name in operations.columns
]
operations = operations.select(*_blank_as_null).na.drop(how="any")

In [0]:
# Build a lookup that tells, per (databaseName, tableName, columnName), whether the
# table is partitioned on that column.
# Used as a lookup before writing results to tables after having run some of the methods.

# Latest timeGenerated for every database/table/column combination
max_time_df = operations.groupBy("databaseName", "tableName", "columnName").agg(
    F.max("timeGenerated").alias("maxTimeGenerated")
)

# Keep only the operations logged at that latest timestamp and flag the column as
# partitioned when any of them is a "PartitionFilters" operation.
# A max over the boolean is used instead of F.first("operationName"): first() depends
# on row order, which is not guaranteed after a shuffle, so the flag could differ
# between runs whenever several operations share the latest timestamp.
is_partitioned = (
    operations.join(max_time_df, ["databaseName", "tableName", "columnName"])
    .filter(F.col("timeGenerated") == F.col("maxTimeGenerated"))
    .groupBy("databaseName", "tableName", "columnName")
    .agg(
        F.max(F.col("operationName") == "PartitionFilters").alias("isPartitioned"),
    )
)


display(is_partitioned)

### 3. Method - Run Time Weighted Counts
For each database, table and column: check how often the column is used for filtering (PartitionFilters, PushedFilters) and sum the run-time weights of the occurrences
##### parameters:
* **max_weight**: (float) the weight that will be used for physical plans run >= **max_time** seconds
* **min_weight**: (float) the weight that will be used for physical plans run in 0 seconds
* **max_time**: (float) number of seconds needed for the physical plan to reach **max_weight**
* **windowStart**: (int - unix_ms) window start of which data to be used
* **windowEnd**: (int - unix_ms) window end of which data to be used
* **windowSize**: (float) number of weeks of which the interval spans


##### metadata of method:
* **whenRun**: (timestamp) when the method was run

In [0]:
# Next run id: one more than the largest runId already stored in method_runs,
# or 1 when the table has no runs yet (MAX returns NULL in that case).
max_id = spark.sql("SELECT MAX(runId) AS max_id FROM method_runs").collect()[0][
    "max_id"
]
runId = 1 if max_id is None else max_id + 1
print(f"runId: {runId}")

# Parameters this run was executed with (weights, max_time and the data window)
params = dict(
    max_weight=max_weight,
    min_weight=min_weight,
    max_time=max_time,
    windowStart=from_time_timestamp,
    windowEnd=to_time_timestamp,
    windowSize=interval,
)

# Metadata about the run itself
metadata = dict(whenRun=datetime.now())

runId: 4


In [0]:
# One row describing this run, shaped after method_runs_schema.
# params is stored as a JSON string (empty string when there are no params).
method_run_info = dict(
    runId=runId,
    methodName="runTimeWeightedCount",
    params=json.dumps(params) if params else "",
    fromTime=from_time,
    toTime=to_time,
    whenRun=metadata["whenRun"],
)

method_run = spark.createDataFrame(
    data=[method_run_info],
    schema=method_runs_schema,
)
display(method_run)

runId,methodName,params,fromTime,toTime,whenRun
4,runTimeWeightedCount,"{""max_weight"": 2.0, ""min_weight"": 0.0, ""max_time"": 10.0, ""windowStart"": 1680723700217, ""windowEnd"": 1683142900217, ""windowSize"": 4.0}",2023-04-05T19:41:40.217+0000,2023-05-03T19:41:40.217+0000,2023-05-03T19:41:41.310+0000


### 3.1 Joining operations table with queries table

This join is needed to map the run time (duration) of each physical plan onto the operations that belong to it

In [0]:
# Run time (duration_ms) of every physical plan, read from the queries table
run_times = spark.sql("select physicalPlanKey, duration_ms from queries")

# Attach the duration to each operation via its physical plan key
# (inner join, which is also the default join type)
operations = operations.join(run_times, "physicalPlanKey", "inner")

### 3.2 Creating Weight Column Based on Physical Plan Execution Time

In [0]:
# max_time is given in seconds, duration_ms in milliseconds
max_time_ms = max_time * 1000

_duration = F.col("duration_ms")

# Linear interpolation between min_weight (0 ms) and max_weight (max_time_ms)
_interpolated_weight = (_duration / max_time_ms) * (max_weight - min_weight) + min_weight

# Weight per operation:
#   duration >= max_time_ms -> max_weight
#   duration <= 0           -> min_weight
#   otherwise               -> linear interpolation between the two
run_time_weight = (
    F.when(_duration >= max_time_ms, max_weight)
    .when(_duration <= 0, min_weight)
    .otherwise(_interpolated_weight)
)

In [0]:
# Add the run-time weight (stored as a float) to every operation row
_weight_as_float = run_time_weight.cast(FloatType())
weighted_operations = operations.withColumn("weight", _weight_as_float)
display(weighted_operations)

physicalPlanKey,columnName,databaseName,executionId,operationName,eventlogKey,tableName,timeGenerated,operationId,duration_ms,weight
-2094219702,countryName,default,62,PushedFilters,603474611,monthly_global_oil_demand_forecast_countries_v0r0,1680775208774,34359738379,923.0,0.1846
-2094219702,countryName,default,62,PushedFilters,603474611,monthly_global_oil_demand_forecast_countries_v0r42,1680775208774,34359738378,923.0,0.1846
-1970774078,clusterInstanceID,default,363,PushedFilters,2058828336,eventlog_raw,1681298375870,34359738438,6829.0,1.3658
-1970774078,lastModified,default,363,PushedFilters,2058828336,eventlog_raw,1681298375870,34359738437,6829.0,1.3658
-1970774078,sparkContextID,default,363,PushedFilters,2058828336,eventlog_raw,1681298375870,34359738436,6829.0,1.3658
-1970774078,clusterInstanceID,default,363,PushedFilters,2058828336,eventlog_raw,1681298375870,34359738435,6829.0,1.3658
-1970774078,lastModified,default,363,PushedFilters,2058828336,eventlog_raw,1681298375870,34359738434,6829.0,1.3658
-1970774078,sparkContextID,default,363,PushedFilters,2058828336,eventlog_raw,1681298375870,34359738433,6829.0,1.3658
-1799015525,lastModified,default,231,PushedFilters,1620025786,eventlog_raw,1681208470750,34359738466,15190.0,2.0
-1799015525,lastModified,default,231,PushedFilters,1620025786,eventlog_raw,1681208470750,34359738465,15190.0,2.0


### 3.3 Aggregate Weighted Operations

In [0]:
# Sum the weights per database/table/column and sort by the same keys
_group_keys = ["databaseName", "tableName", "columnName"]
_weighted_sum = F.sum("weight").alias("weightedSum")

method_output = (
    weighted_operations.groupBy(*_group_keys)
    .agg(_weighted_sum)
    .orderBy(*_group_keys)
)

method_output.show()

+------------+--------------------+-----------------+----------------+-------------------+
|databaseName|           tableName|       columnName|   operationName|        weightedSum|
+------------+--------------------+-----------------+----------------+-------------------+
|     default|        eventlog_raw|clusterInstanceID|   PushedFilters|  921.7923990190029|
|     default|        eventlog_raw|      eventlogKey|   PushedFilters|  50.59439994394779|
|     default|        eventlog_raw|     lastModified|   PushedFilters| 1350.7943984735757|
|     default|        eventlog_raw|   sparkContextID|   PushedFilters|  871.9183990061283|
|     default|method_reccomenda...|        tableName|   PushedFilters|0.10500000091269612|
|     default|      method_results|        tableName|   PushedFilters| 0.1356000006198883|
|     default|monthly_global_oi...|      countryName|   PushedFilters|  2.074199967086315|
|     default|monthly_global_oi...|      countryName|   PushedFilters|  2.074199967086315|

### 3.4 Create Columns Needed for the method_results Table

In [0]:
# methodValue is the weighted sum, stored as a float.
# (The former .drop("occurrences") was removed: method_output has no such column,
# so the call never had any effect.)
method_results = method_output.withColumn(
    "methodValue", F.col("weightedSum").cast("float")
)

# Join on the is_partitioned dataframe to check whether each column is partitioned.
# Note: this is based on the last occurrence of the filter in the operations table.
method_results = method_results.join(
    is_partitioned, on=["databaseName", "tableName", "columnName"]
)

# Tag every row with the current run and keep only the columns that are written
# to the method_results table
method_results = method_results.withColumn("runId", F.lit(runId)).select(
    "databaseName",
    "tableName",
    "columnName",
    "methodValue",
    "isPartitioned",
    "runId",
)

method_results.show()

+------------+--------------------+-----------------+-----------+-------------+-----+
|databaseName|           tableName|       columnName|methodValue|isPartitioned|runId|
+------------+--------------------+-----------------+-----------+-------------+-----+
|     default|        eventlog_raw|clusterInstanceID|   921.7924|        false|    4|
|     default|        eventlog_raw|      eventlogKey|    50.5944|        false|    4|
|     default|        eventlog_raw|     lastModified|  1350.7944|        false|    4|
|     default|        eventlog_raw|   sparkContextID|   871.9184|        false|    4|
|     default|method_reccomenda...|        tableName|0.105000004|        false|    4|
|     default|      method_results|        tableName|     0.1356|        false|    4|
|     default|monthly_global_oi...|      countryName|     2.0742|        false|    4|
|     default|monthly_global_oi...|      countryName|     2.0742|        false|    4|
|     default|monthly_global_oi...|         etl_year| 

### 3.5 Find recommendations Based on the Column with the Highest methodValue per db-table

In [0]:
# For every database/table pick the row with the highest methodValue.
# Taking the max of a struct compares its fields in order, so methodValue decides
# first; columnName and isPartitioned are carried along with the winning row.
_best_column = F.max(F.struct("methodValue", "columnName", "isPartitioned")).alias(
    "max_methodValue_colName_isPartitioned"
)

method_recommendations = (
    method_results.groupBy("databaseName", "tableName")
    .agg(_best_column)
    # Selecting a struct field names the resulting column after the field itself
    # (columnName, methodValue, isPartitioned), so no renaming is needed afterwards.
    .select(
        "databaseName",
        "tableName",
        "max_methodValue_colName_isPartitioned.columnName",
        "max_methodValue_colName_isPartitioned.methodValue",
        "max_methodValue_colName_isPartitioned.isPartitioned",
    )
    .withColumn("runId", F.lit(runId))
)

method_recommendations.show(truncate=False)

+------------+--------------------------------------------------+---------------+-----------+-------------+-----+
|databaseName|tableName                                         |columnName     |methodValue|isPartitioned|runId|
+------------+--------------------------------------------------+---------------+-----------+-------------+-----+
|default     |eventlog_raw                                      |lastModified   |1350.7944  |false        |4    |
|default     |method_reccomendations                            |tableName      |0.105000004|false        |4    |
|default     |method_results                                    |tableName      |0.1356     |false        |4    |
|default     |monthly_global_oil_demand_forecast_countries_v0r0 |countryName    |2.0742     |false        |4    |
|default     |monthly_global_oil_demand_forecast_countries_v0r42|countryName    |2.0742     |false        |4    |
|default     |operations                                        |timeGenerated  |42.0568

### 4. Save Data to Tables

In [0]:
# Append this run's output to the Delta tables, in this order:
# run information, per-column results, per-table recommendations.
_outputs = (
    (method_run, "method_runs"),
    (method_results, "method_results"),
    (method_recommendations, "method_recommendations"),
)

for _frame, _target_table in _outputs:
    _frame.write.format("delta").mode("append").saveAsTable(_target_table)

In [0]:
%sql
-- Show every run registered in the method_runs table
SELECT * FROM method_runs

runId,methodName,params,fromTime,toTime,whenRun
4,runTimeWeightedCount,"{""max_weight"": 2.0, ""min_weight"": 0.0, ""max_time"": 10.0, ""windowStart"": 1680723700217, ""windowEnd"": 1683142900217, ""windowSize"": 4.0}",2023-04-05T19:41:40.217+0000,2023-05-03T19:41:40.217+0000,2023-05-03T19:41:41.310+0000
3,stepwiseWeightedCount,"{""num_steps"": 4, ""weights"": [0.25, 0.5, 0.75, 1.0], ""windowStart"": 1680704156691, ""windowEnd"": 1683123356691, ""windowSize"": 4.0}",2023-04-05T14:15:56.691+0000,2023-05-03T14:15:56.691+0000,2023-05-03T14:15:57.747+0000
2,weightedCount,"{""max_weight"": 2.0, ""min_weight"": 0.0, ""windowStart"": 1680704130970, ""windowEnd"": 1683123330970, ""windowSize"": 4.0}",2023-04-05T14:15:30.970+0000,2023-05-03T14:15:30.970+0000,2023-05-03T14:15:32.079+0000
1,simpleCount,"{""windowStart"": 1680704113431, ""windowEnd"": 1683123313431, ""windowSize"": 4.0}",2023-04-05T14:15:13.431+0000,2023-05-03T14:15:13.431+0000,2023-05-03T14:15:14.516+0000


In [0]:
%sql
-- Show every row stored in the method_results table
SELECT * FROM method_results

runId,databaseName,tableName,columnName,methodValue,isPartitioned
1,default,eventlog_raw,clusterInstanceID,589.0,False
1,default,eventlog_raw,eventlogKey,44.0,False
1,default,eventlog_raw,lastModified,893.0,False
1,default,eventlog_raw,sparkContextID,560.0,False
1,default,method_reccomendations,tableName,3.0,False
1,default,method_results,tableName,3.0,False
1,default,monthly_global_oil_demand_forecast_countries_v0r0,countryName,4.0,False
1,default,monthly_global_oil_demand_forecast_countries_v0r42,countryName,4.0,False
1,default,monthly_global_oil_demand_forecast_countries_v0r42,etl_year,2.0,True
1,default,operations,columnName,3.0,False


In [0]:
%sql
-- Show every row stored in the method_recommendations table
SELECT * FROM method_recommendations

runId,databaseName,tableName,columnName,methodValue,isPartitioned
4,default,eventlog_raw,lastModified,1350.7944,False
4,default,method_reccomendations,tableName,0.105000004,False
4,default,method_results,tableName,0.1356,False
4,default,monthly_global_oil_demand_forecast_countries_v0r0,countryName,2.0742,False
4,default,monthly_global_oil_demand_forecast_countries_v0r42,countryName,2.0742,False
4,default,operations,timeGenerated,42.0568,False
4,default,physical_plan_keys,physicalPlanKey,131.0918,False
4,default,plant_v0r44,UP_DATE,1.9258001,True
4,default,queries,physicalPlanKey,96.6222,False
2,default,eventlog_raw,lastModified,704.8151,False
