# Set Up MVAD in Synapse

In [1]:
%%configure -f
{
  "name": "synapseml",
  "conf": {
      "spark.jars.packages": "com.microsoft.azure:synapseml_2.12:0.9.5-19-82d6b563-SNAPSHOT",
      "spark.jars.repositories": "https://mmlspark.azureedge.net/maven",
      "spark.jars.excludes": "org.scala-lang:scala-reflect,org.apache.spark:spark-tags_2.12,org.scalactic:scalactic_2.12,org.scalatest:scalatest_2.12,io.netty:netty-tcnative-boringssl-static",
      "spark.yarn.user.classpath.first": "true"
  }
}

StatementMeta(, 59, -1, Finished, Available)

# Import Necessary Python Packages

In [2]:
import synapse.ml
import pyspark
import re
import ast
import pandas as pd
import numpy as np
import json
import http.client, urllib.request, urllib.parse, urllib.error, base64
from synapse.ml.cognitive import *
from pyspark.sql import functions as F
from sklearn.preprocessing import MinMaxScaler
from notebookutils import mssparkutils

StatementMeta(newSpark3, 59, 1, Finished, Available)

# Set Parameter Variables

In [3]:
# Notebook parameters. Every value defaults to None and is presumably overridden
# by the calling pipeline before the remaining cells run.

# Time window: `inference_start` .. `end_time` bounds the ADX query, and the final
# result keeps only rows whose timestamp is after `start_time`.
start_time = None
end_time = None
# Previously read by later cells without ever being declared, which raised a
# NameError on a fresh kernel.
inference_start = None

# Scenario name (also used as the ADX result table name) and the ADLS container
# used for intermediate inference data.
scenario_name = None
adls_container = None

# ADX database and table holding the historized twin data.
kusto_database = None
adx_table = None

# Azure Digital Twins endpoint, the twin query, and the comma-separated list of
# metric keys to keep.
adt_endpoint = None
customer_adt_query = None
relevant_metrics = None

# Key Vault that stores the Anomaly Detector key and the ADLS connection string.
kv_name = None

# Region prefix of the Anomaly Detector endpoint.
mvad_region = None

# Newest param
mvad_model_id = None
resampling_rate = None

# Optional string literal of the training pre-processing configuration. When it is
# None the configuration is looked up in ADX instead. Declared here so the
# `training_args is not None` check cannot fail with a NameError.
training_args = None

StatementMeta(newSpark3, 59, 2, Finished, Available)

In [None]:
# Synapse linked services used by this notebook: one for ADX, one for Key Vault.
kusto_linked_service = "ADT_Data_History"
kv_linked_service = "ADT_AnomalyDetector_KeyVault"

# Names of the Key Vault secrets fetched through `kv_linked_service`:
# the ADLS connection string and the Anomaly Detector subscription key.
adls_kv_connection_string_name = "adls-connection-string"
mvad_kv_secret_name = "ad-poc"

In [6]:
# Turn the comma-separated metric list into `a' or Key == 'b`, which is wrapped in
# `Key == '...'` below. Names are stripped so that "a, b" does not end up filtering
# on the key ' b', and empty entries (e.g. from a trailing comma) are skipped.
metric_names = [name.strip() for name in relevant_metrics.split(",") if name.strip()]
filter_metrics = "' or Key == '".join(metric_names)

# ADT-ADX joint query: resolve the twins through the ADT query plugin, join them to
# the historized table on the twin id, and keep only the requested metric keys.
# NOTE(review): the query is assembled by string concatenation from notebook
# parameters; confirm those parameters only come from trusted pipeline settings.
overall_query = (
    "evaluate azure_digital_twins_query_request('" + adt_endpoint + "', ```" + customer_adt_query + "```)"
    + " | extend Id = tostring(tid) | join kind=inner (" + adx_table + ") on Id"
    + " | where Key == '" + filter_metrics + "'"
)

# Restrict the rows to the inference window.
timerange = "| where SourceTimeStamp between (datetime(" + inference_start + ") .. datetime(" + end_time + "))"
new_query = overall_query + timerange

StatementMeta(newSpark3, 59, 5, Finished, Available)

In [None]:
# `SparkSession` is not explicitly imported in this notebook, so it is reached
# through the already-imported `pyspark` package to avoid a NameError.
# Note that `sc` holds a SparkSession here (not a SparkContext); it is only used
# to reach the JVM gateway.
sc = pyspark.sql.SparkSession.builder.getOrCreate()
token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary

# Read both secrets from the Key Vault `kv_name` through the Key Vault linked service.
anomalyKey = token_library.getSecret(kv_name, mvad_kv_secret_name, kv_linked_service)
connectionString = token_library.getSecret(kv_name, adls_kv_connection_string_name, kv_linked_service)

# Check MVAD Model Status

In [None]:
# Ask the Anomaly Detector service for the model's metadata and stop the notebook
# early when the model is missing, still training, or failed to train.
headers = {
    # Request headers
    'Ocp-Apim-Subscription-Key': anomalyKey,
}
params = urllib.parse.urlencode({})

# Build the request path. The query string is appended only when there are
# parameters; previously the literal text "?%s" was sent because the placeholder
# was never formatted.
model_path = "/anomalydetector/v1.1-preview.1/multivariate/models/" + mvad_model_id
if params:
    model_path += "?" + params

conn = http.client.HTTPSConnection(mvad_region + ".api.cognitive.microsoft.com")
try:
    # A GET needs no request body (the old code sent the placeholder "{body}").
    conn.request("GET", model_path, headers=headers)
    response = conn.getresponse()
    data = response.read()
finally:
    # Always release the connection, even when the request itself raises.
    conn.close()

# Any response that cannot be parsed into modelInfo.status is treated as
# "model does not exist".
try:
    model_info = json.loads(data)['modelInfo']
    model_status = model_info['status']
except Exception:
    mssparkutils.notebook.exit("Failure;Model does not exist [HaltSubsequentInferenceRuns]")

if model_status == "RUNNING":
    mssparkutils.notebook.exit("Success;Model training still in progress")
if model_status == "FAILED":
    # Report the first training error returned by the service.
    first_error = model_info['errors'][0]
    exit_message = "Failure;" + first_error['code'] + ":" + first_error['message'] + " [HaltSubsequentInferenceRuns]"
    mssparkutils.notebook.exit(exit_message)


# Query and Data Preparation for MVAD Inference

In [7]:
def query_adt_data(kusto_linked_service, kusto_database, query):
    """Run a Kusto query through a Synapse linked service and load the result.

    :param kusto_linked_service: name of the ADX (historized data store) linked
        service registered in the Synapse workspace
    :type: string

    :param kusto_database: name of the ADX database to query
    :type: string

    :param query: Kusto query text (here, the ADT-ADX joint query)
    :type: string

    :return: the rows returned by the query
    :type: Spark dataframe
    """
    # Reader options, applied in this order; "LS" selects linked-service auth.
    reader_options = [
        ("spark.synapse.linkedService", kusto_linked_service),
        ("kustoDatabase", kusto_database),
        ("kustoQuery", query),
        ("authType", "LS"),
    ]
    reader = spark.read.format("com.microsoft.kusto.spark.synapse.datasource")
    for option_name, option_value in reader_options:
        reader = reader.option(option_name, option_value)
    return reader.load()

StatementMeta(newSpark3, 59, 6, Finished, Available)

In [None]:
def preprocess(purpose=None, \
               raw_df=None, \
               invalid_values=['None', 'NaN', 'NA', 'nan', '', ' ', -1], \
               resampling_rate='1min', \
               num_agg_fc='mean', \
               cat_agg_fc='mode', \
               missing_tolerance=1.0, \
               limit=None, \
               num_range_dic_train=None, \
               num_uniqueKeys_train=None, \
               cat_uniqueKeys_values_dic_train=None, \
               values_to_fill_for_inference=None) -> tuple or pd.DataFrame:
    """
    Pre-process raw data from the ADT-ADX cross query for MVAD training or inference.

    The raw long-format rows are de-duplicated, filtered for invalid values, binned to
    `resampling_rate`, pivoted to one column per unique key, gap-filled, min-max
    normalized (numerical keys) and one-hot encoded (categorical keys), then converted
    back to a Spark DataFrame.

    Parameters
    ----------
    purpose : purpose of the data pre-processing, one of 'training' or 'inference',
        str
    raw_df : raw training or inference data from ADX, with columns 'Id', 'ModelId', 'Key',
        'SourceTimeStamp' and 'Value'. Any existing 'TimeStamp' column is dropped and
        'SourceTimeStamp' is used as the timestamp,
        Spark DataFrame
    invalid_values : entries considered invalid; rows whose value is in this list are dropped,
        list, default=['None', 'NaN', 'NA', 'nan', '', ' ', -1]
    resampling_rate : resampling rate of timestamp, used both to round the timestamps and as the
        frequency of the completed time range,
        str, default='1min'
    num_agg_fc : aggregation applied to numerical values that share a timestamp bin and unique key;
        'mean' uses the group mean, anything else is passed to groupby `.apply`,
        str or function, default='mean'
    cat_agg_fc : aggregation applied to categorical values that share a timestamp bin and unique key;
        'mode' uses the first mode, anything else is passed to groupby `.apply`,
        str or function, default='mode'
    missing_tolerance : tolerance of missing ratio for each variable; only columns with a missing ratio
        not exceeding this threshold are kept. Applied for purpose='training' only,
        float, default=1.0
    limit : max number of consecutive missing values to fill. If gaps remain after filling,
        the no-null assertion in Step 3.1 fails,
        int
    num_range_dic_train, num_uniqueKeys_train, cat_uniqueKeys_values_dic_train, values_to_fill_for_inference :
        values obtained from a training run that must be passed in for inference,
        for details see Return below.

    Return
    ----------
    None when `raw_df` has no rows.
    For purpose='training' a tuple
    (ret_df, num_range_dic_train, num_uniqueKeys, cat_uniqueKeys_values_dic, values_to_fill_for_inference);
    otherwise only ret_df.
    Note that the `-> tuple or pd.DataFrame` annotation evaluates to plain `tuple`; the
    actual return values are the ones listed here.

    ret_df : pre-processed training or inference data,
        Spark DataFrame
    num_range_dic_train : dict of all numerical features' min and max summarized from the training dataset,
        dict (e.g. {'dtmi:syntheticfactory:feedmachine;1_C_Amps_Ia': {'min': 0.0, 'max': 100},
                    'dtmi:syntheticfactory:feedmachine;1_C_Amps_Ib': {'min': 0.0, 'max': 200},
                    'dtmi:syntheticfactory:feedmachine;1_C_Amps_Ic': {'min': 0.0, 'max': 300}})
    num_uniqueKeys : list of numerical unique keys seen in the training dataset, each unique key is formatted as 'ModelId_Id_Key',
        list of str (e.g. ['dtmi:syntheticfactory:feedmachine;1_C_Amps_Ia',
                           'dtmi:syntheticfactory:feedmachine;1_C_Amps_Ib',
                           'dtmi:syntheticfactory:feedmachine;1_C_Amps_Ic'])
    cat_uniqueKeys_values_dic : dict of all values seen in the training dataset for each categorical unique key,
        dict (e.g. {'dtmi:syntheticfactory:sourcemachine;1_A_PowerLevel': ['High', 'Low', 'Mid'],
                    'dtmi:syntheticfactory:sourcemachine;1_B_PowerLevel': ['High', 'Low', 'Mid']})
    values_to_fill_for_inference : dict of default value to fill in during inference for each unique key
        (column median for numerical keys, first mode for categorical keys),
        dict (e.g. {'dtmi:syntheticfactory:feedmachine;1_C_Amps_Ia': 0.0,
                    'dtmi:syntheticfactory:feedmachine;1_C_Amps_Ib': 0.0,
                    'dtmi:syntheticfactory:feedmachine;1_C_Amps_Ic': 0.0})
    """

    """
    Example input data for data pre-processing:
    index   Id                   ModelId                      Key                  Timestamp                      Value
    0       E     dtmi:syntheticfactory:feedmachine;1       Amps_Ic        2020-12-31 23:59:59.028164              0.0
    1       J     dtmi:syntheticfactory:feedmachine;1       Amps_Ib        2020-12-31 23:59:59.273131     0.0059823441449033355
    2       A     dtmi:syntheticfactory:sourcemachine;1     Amps_Ia        2020-12-31 23:59:59.285524     0.002924511047766594
    3       A     dtmi:syntheticfactory:sourcemachine;1    PowerLevel      2021-01-01 00:00:00.840780              Mid
    4       B     dtmi:syntheticfactory:sourcemachine;1    PowerLevel      2021-01-01 00:00:01.250716              Low
    ...    ...                     ...                        ...                     ...                          ...

    Example output of pre-processed data:
    index       timestamp        dtmi:syntheticfactory:sourcemachine;1_A_Amps_Ia    dtmi:syntheticfactory:sourcemachine;1_A_PowerLevel_High     ...
    0      2021-01-01 00:00:00                          0                                                      0                                ...
    1      2021-01-01 00:10:00                          0                                                      1                                ...
    2      2021-01-01 00:20:00                       0.000114                                                  1                                ...
    3      2021-01-01 00:30:00                       0.000097                                                  0                                ...
    4      2021-01-01 00:40:00                          0                                                      0                                ...
    """
    ## Step 1. Query & Basic Data Quality Checks
    # Step 1.1. Convert Spark to Pandas df, reformat the raw data with columns: 'timestamp', 'UniqueKey', 'value'
    # Note: raw_df.count() is evaluated twice here (once for the log line, once for the check).
    print('Data Shape before pre-processing:', (raw_df.count(), len(raw_df.columns)))
    if (raw_df.count() == 0):
        print('Empty Dataset to pre-process.')
        # Returns None; callers must handle this before using the result.
        return
    
    # Use 'SourceTimeStamp' as the timestamp; any existing 'TimeStamp' column is discarded.
    raw_df = raw_df.drop('TimeStamp')
    raw_df = raw_df.withColumnRenamed('SourceTimeStamp', 'TimeStamp')

    # Format timestamp to MVAD accepted format
    raw_df = raw_df.withColumn(
        'TimeStamp', 
        F.date_format(F.col('TimeStamp'), "yyyy-MM-dd'T'HH:mm:ss'Z'")
        )
    
    # Change non alphanumeric or _ characters to _ for ModelId and Id columns
    raw_df = raw_df.withColumn("ModelId", F.regexp_replace(F.col("ModelId"), "[^a-zA-Z0-9_]", "_"))
    raw_df = raw_df.withColumn("Id", F.regexp_replace(F.col("Id"), "[^a-zA-Z0-9_]", "_"))

    # Create UniqueKey column to identify unique key per ModelId, Id, Key combinations
    raw_df = raw_df.withColumn('UniqueKey', 
                    F.concat(F.col('ModelId'), F.lit('_'), F.col('Id'), F.lit('_'), F.col('Key')))

    # Select needed columns and make column names lowercase
    raw_df = raw_df.select("TimeStamp", "UniqueKey", "Value").withColumnRenamed("TimeStamp", "timestamp").withColumnRenamed("Value", "value")

    # Convert Spark to Pandas df (from here on `raw_df` is a pandas DataFrame)
    raw_df = raw_df.toPandas()
    
    raw_df['timestamp'] = pd.to_datetime(raw_df['timestamp'], infer_datetime_format=True) 
    raw_df = raw_df[['timestamp', 'UniqueKey', 'value']] \
             .sort_values('timestamp').reset_index(drop=True)

    # Step 1.2. Drop duplicate rows in raw historized dataset
    df = raw_df.drop_duplicates()
    # Step 1.3. Data validity checks: drop rows whose value is listed in `invalid_values`
    df = df[~df['value'].isin(invalid_values)].reset_index(drop=True)

    ## Step 2. Resampling (timestamp alignment) & Pivoting
    # Step 2.1. Timestamp binning: each timestamp is rounded to the resampling rate
    df['timestamp'] = df['timestamp'].dt.round(resampling_rate)
    # Step 2.2. Table pivoting
    # Take the first (earliest, since rows are sorted by timestamp) value of every unique key;
    # whether that value parses as a float decides if the key is numerical or categorical.
    tmp_initial_twins = df.groupby('UniqueKey')['value'].apply(lambda x: list(x)[0]).reset_index()
    tmp_initial_twins_dic = dict(zip(tmp_initial_twins['UniqueKey'], tmp_initial_twins['value']))
    if purpose=='training':
        num_uniqueKeys, cat_uniqueKeys = [], []
        for k, v in tmp_initial_twins_dic.items():
            try:
                float(v)
                num_uniqueKeys.append(k)
            # Any failure to convert (bare except) classifies the key as categorical.
            except:
                cat_uniqueKeys.append(k)
        num_uniqueKeys, cat_uniqueKeys = sorted(num_uniqueKeys), \
                                         sorted(cat_uniqueKeys)
    else:
        # Same numerical/categorical classification, applied to the inference data.
        num_uniqueKeys_inf, cat_uniqueKeys_inf = [], []
        for k, v in tmp_initial_twins_dic.items():
            try:
                float(v)
                num_uniqueKeys_inf.append(k)
            except:
                cat_uniqueKeys_inf.append(k)
        num_uniqueKeys_inf, cat_uniqueKeys_inf = sorted(num_uniqueKeys_inf), \
                                                 sorted(cat_uniqueKeys_inf)
        # "unknown" = present in the inference data but not seen in training;
        # "missing" = seen in training but absent from the inference data.
        num_uniqueKeys_unknown, num_uniqueKeys_missing = [col for col in num_uniqueKeys_inf if col not in num_uniqueKeys_train], \
                                                        [col for col in num_uniqueKeys_train if col not in num_uniqueKeys_inf]
        cat_uniqueKeys_train = sorted(list(cat_uniqueKeys_values_dic_train.keys()))
        cat_uniqueKeys_unknown, cat_uniqueKeys_missing = [col for col in cat_uniqueKeys_inf if col not in cat_uniqueKeys_train], \
                                                        [col for col in cat_uniqueKeys_train if col not in cat_uniqueKeys_inf]
        # Only keys known from training are processed further; unknown keys are dropped.
        num_uniqueKeys, cat_uniqueKeys = [col for col in num_uniqueKeys_inf if col in num_uniqueKeys_train], \
                                         [col for col in cat_uniqueKeys_inf if col in cat_uniqueKeys_train]

    num_df, cat_df = df[df['UniqueKey'].isin(num_uniqueKeys)], \
                     df[df['UniqueKey'].isin(cat_uniqueKeys)]
    # NOTE(review): num_df is a filtered selection of df, so this assignment may emit
    # pandas' SettingWithCopyWarning; it also raises if a later value of a "numerical"
    # key is not convertible to float.
    num_df['value'] = num_df['value'].astype(float)

    # Aggregate values that fall into the same (timestamp bin, unique key) pair.
    num_df_after_groupby = num_df.groupby(['timestamp', 'UniqueKey'])['value'].mean().reset_index() if num_agg_fc=='mean' \
                           else num_df.groupby(['timestamp', 'UniqueKey'])['value'].apply(num_agg_fc).reset_index()
    cat_df_after_groupby = cat_df.groupby(['timestamp', 'UniqueKey'])['value'].agg(lambda x: pd.Series.mode(x)[0]).reset_index() if cat_agg_fc=='mode' \
                           else cat_df.groupby(['timestamp', 'UniqueKey'])['value'].apply(cat_agg_fc).reset_index()
    df_after_groupby = pd.concat([num_df_after_groupby, cat_df_after_groupby])
    df_after_groupby = df_after_groupby.sort_values('timestamp').reset_index(drop=True)
    # Long -> wide: one row per timestamp bin, one column per unique key.
    df_pivot = df_after_groupby.pivot(index='timestamp', columns='UniqueKey', values='value')
    df_pivot.columns.name = None
    df_pivot = df_pivot.sort_index().reset_index()

    if purpose=='inference':
        print(f'Inference - Unknown Numerical Keys and Dropped: {num_uniqueKeys_unknown}')
        print(f'Inference - Unknown Categorical Keys and Dropped: {cat_uniqueKeys_unknown}')
        print(f'Inference - Missing Numerical Keys: {num_uniqueKeys_missing}, Filled with {[values_to_fill_for_inference[col] for col in num_uniqueKeys_missing]}')
        print(f'Inference - Missing Categorical Keys: {cat_uniqueKeys_missing}, Filled with {[values_to_fill_for_inference[col] for col in cat_uniqueKeys_missing]}')
        # Keys seen in training but absent now become constant columns holding the
        # default value recorded at training time.
        for col in num_uniqueKeys_missing:
            df_pivot[col] = values_to_fill_for_inference[col]
            df_pivot[col] = df_pivot[col].astype(float)
        for col in cat_uniqueKeys_missing:
            df_pivot[col] = values_to_fill_for_inference[col]
        df_pivot = df_pivot[sorted(df_pivot.columns)]
        
        # Per categorical key, compare the values present now with those seen in training.
        cat_uniqueKeys_values_dic_inf = {}
        cat_uniqueKeys_values_unknown_dic, cat_uniqueKeys_values_missing_dic = {}, {}
        for col in cat_uniqueKeys_train:
            cat_uniqueKeys_values_dic_inf[col] = sorted(df_pivot[col][df_pivot[col].notnull()].unique())
            cat_uniqueKeys_values_unknown_dic[col] = [v for v in cat_uniqueKeys_values_dic_inf[col] if v not in cat_uniqueKeys_values_dic_train[col]]
            cat_uniqueKeys_values_missing_dic[col] = [v for v in cat_uniqueKeys_values_dic_train[col] if v not in cat_uniqueKeys_values_dic_inf[col]]
            if cat_uniqueKeys_values_unknown_dic[col]!=[]:
                print(f"\nInference - Unknown Categorical Values for '{col}' and Ignored: {cat_uniqueKeys_values_unknown_dic[col]}")
                # Values never seen in training are blanked out; Step 3.1 fills them later.
                df_pivot[col] = df_pivot[col].replace(cat_uniqueKeys_values_unknown_dic[col], np.nan)

    # Step 2.3. Timestamp standardization
    # Add an all-NaN row for every expected timestamp between the first and last bin
    # that has no data, so the series is evenly spaced at `resampling_rate`.
    missing_timestamp_df = pd.DataFrame(columns=df_pivot.columns)
    if df_pivot.shape[0]>1:   
        full_time_range = pd.date_range(df_pivot['timestamp'].iloc[0], \
                                        df_pivot['timestamp'].iloc[-1], \
                                        freq=resampling_rate)
        # NOTE(review): the reason for clearing `freq` is not evident from this code.
        full_time_range.freq=None
        missing_timestamp_df['timestamp'] = [timestamp for timestamp in full_time_range \
                                            if timestamp not in df_pivot['timestamp'].values]
        df_pivot = pd.concat([df_pivot, missing_timestamp_df])
    df_pivot = df_pivot.sort_values('timestamp').set_index('timestamp')
    # Step 2.4. Calculate missing ratios for each feature
    if purpose=='training':
        missing_ratios = (df_pivot.isnull().sum()/df_pivot.shape[0]).sort_values(ascending=False)
        # Keep only the columns whose missing ratio does not exceed the tolerance.
        df_pivot = df_pivot[missing_ratios[missing_ratios<=missing_tolerance].index]
        df_pivot = df_pivot[sorted(df_pivot.columns)]

        num_uniqueKeys, cat_uniqueKeys = sorted([col for col in num_uniqueKeys if col in df_pivot.columns]), \
                                         sorted([col for col in cat_uniqueKeys if col in df_pivot.columns])
        # Defaults to use at inference time when a key is absent:
        # median for numerical keys, first mode for categorical keys.
        values_to_fill_for_inference = {}
        for col in num_uniqueKeys:
            values_to_fill_for_inference[col] = df_pivot[col].median()
        for col in cat_uniqueKeys:
            values_to_fill_for_inference[col] = df_pivot[col].mode()[0]
        
        # Record every non-null value observed for each categorical key.
        cat_uniqueKeys_values_dic = {}
        for cat_uniqueKey in cat_uniqueKeys:
            cat_uniqueKeys_values_dic[cat_uniqueKey] = sorted(df_pivot[cat_uniqueKey][df_pivot[cat_uniqueKey].notnull()].unique())

    ## Step 3. Data-preprocessing on pivoted and standardized table
    # Step 3.1. Handling missing NaN values
    # Numerical columns are linearly interpolated first; every column is then
    # forward-filled and back-filled. `limit` caps the number of consecutive fills.
    for col in df_pivot:
        if col in num_uniqueKeys:
            df_pivot[col] = df_pivot[col].astype(float).interpolate(method='linear', limit=limit)
        df_pivot[col] = df_pivot[col].fillna(method='ffill', limit=limit).fillna(method='bfill', limit=limit)
    # Fails if any gap could not be filled (e.g. because of `limit` or an all-NaN column).
    assert df_pivot.isnull().sum().sum()==0
    # Step 3.2. Normalization & Encoding
    # Columns are split by dtype: float columns are treated as numerical, object columns as categorical.
    num_df_pivot, cat_df_pivot = df_pivot.select_dtypes(include='float'), \
                                 df_pivot.select_dtypes(include='object')

    if purpose=='training':
        # Record the per-column min/max of the training data; inference reuses them.
        num_range_dic_train, num_min_dic, num_max_dic = dict(), \
                                                  dict(num_df_pivot.min()), \
                                                  dict(num_df_pivot.max())
        for k in num_min_dic:
            num_range_dic_train[k] = dict()
            num_range_dic_train[k]['min'], num_range_dic_train[k]['max'] = num_min_dic[k], \
                                                               num_max_dic[k]
    # Min-max scaling with the training ranges. When max == min only the min is
    # subtracted, which avoids a division by zero.
    for col in num_df_pivot.columns:
        if num_range_dic_train[col]['max']==num_range_dic_train[col]['min']:
            num_df_pivot[col] = num_df_pivot[col]-num_range_dic_train[col]['min']
        else:
            num_df_pivot[col] = (num_df_pivot[col]-num_range_dic_train[col]['min'])/(num_range_dic_train[col]['max']-num_range_dic_train[col]['min']) 
    num_df_pivot = num_df_pivot.reset_index()
    
    # One-hot encode the categorical columns (or keep an empty frame with the same index).
    if not cat_df_pivot.empty:
        cat_df_pivot = pd.get_dummies(cat_df_pivot)
    else:
        cat_df_pivot = pd.DataFrame(index=cat_df_pivot.index)
    cat_df_pivot = cat_df_pivot.reset_index()
    if purpose=='inference':
        # Add an all-zero dummy column for every category value that was seen in
        # training but does not occur in the inference data.
        for col in cat_uniqueKeys_train:
            if cat_uniqueKeys_values_missing_dic[col]!=[]:
                print(f"Inference - Missing Categorical Values for '{col}', Completed with: {cat_uniqueKeys_values_missing_dic[col]}")
                for missing_value in cat_uniqueKeys_values_missing_dic[col]:
                    new_dummy_col_name = col + '_' + missing_value
                    cat_df_pivot[new_dummy_col_name] = 0
    # Recombine the numerical and categorical parts on the shared timestamp.
    ret_df = num_df_pivot.merge(cat_df_pivot, on='timestamp', how='inner')
    ret_df = ret_df.sort_values('timestamp').reset_index(drop=True)
    ret_df = ret_df[sorted(ret_df.columns)]
    print(f'Data Shape after pre-processing: {ret_df.shape}')
    print(ret_df.head())

    # Render timestamps as ISO-8601 strings; the trailing "Z" is appended on the Spark side below.
    ret_df['timestamp'] = ret_df['timestamp'].apply(lambda x: x.isoformat())

    # Convert dataframe to Spark
    ret_df = spark.createDataFrame(ret_df)
    ret_df = ret_df.withColumn('timestamp', F.concat_ws("",F.col("timestamp"),F.lit("Z")))
    
    if purpose=='training':
        return ret_df, num_range_dic_train, num_uniqueKeys, cat_uniqueKeys_values_dic, values_to_fill_for_inference
    else:
        return ret_df


In [10]:
# Run the ADT-ADX joint query; any failure ends the notebook with a "Failure;" exit value.
try:
    df = query_adt_data(kusto_linked_service, kusto_database, new_query)
except Exception as query_error:
    mssparkutils.notebook.exit(f"Failure;Unable to query data from ADX. Full error log: {query_error}")

# Nothing to score when the query returned no rows.
if df.count() == 0:
    mssparkutils.notebook.exit("Failure;No Data Queried from ADX")

StatementMeta(newSpark3, 59, 9, Finished, Available)

In [None]:
# Load the pre-processing configuration produced by the training run. It comes
# either from the `training_args` pipeline parameter or, failing that, from the
# `additionalNote` column of `metadataTable` in ADX.
try:
    # Check if training data preprocessing configurations are passed in as pipeline parameter
    if training_args is not None:
        train_args = ast.literal_eval(training_args)
    # Check if training data preprocessing configurations in Kusto if it's not passed in as pipeline parameter
    else:
        # Filter by scenario BEFORE `take 1`. The previous query took one arbitrary
        # row first and filtered afterwards, so it usually returned nothing (or the
        # wrong scenario's row when the table holds several scenarios).
        metadata_query = (
            'metadataTable | where scenarioName == "' + scenario_name + '"'
            + ' | take 1 | project additionalNote'
        )
        train_args = query_adt_data(kusto_linked_service, kusto_database, metadata_query)
        train_args = ast.literal_eval(train_args.select('additionalNote').collect()[0][0])
except Exception:
    # Any failure (undefined parameter, unparsable literal, no matching row) is
    # reported as a missing configuration.
    mssparkutils.notebook.exit("Failure;No training data preprocessing configurations")
    

In [None]:
# Pre-process the queried rows with the configuration recorded at training time.
try:
    to_save_df = preprocess(purpose='inference',
                    raw_df=df, \
                    resampling_rate=train_args['resampling_rate'], \
                    num_agg_fc=train_args['num_agg_fc'], \
                    cat_agg_fc=train_args['cat_agg_fc'], \
                    num_range_dic_train=train_args["num_range_dic_train"], \
                    num_uniqueKeys_train=train_args["num_uniqueKeys_train"], \
                    cat_uniqueKeys_values_dic_train=train_args["cat_uniqueKeys_values_dic_train"], \
                    values_to_fill_for_inference=train_args["values_to_fill_for_inference"])
except Exception as e:
    mssparkutils.notebook.exit("Failure;Data processing failure. Full error log: " + str(e))

# `preprocess` returns None when its input has no rows; treat that like an empty
# result instead of failing with an AttributeError on `None.count()`.
if to_save_df is None or to_save_df.count() == 0:
    mssparkutils.notebook.exit("Failure;No Data After Data Processing")

# MVAD Model Inference

In [13]:
#Specify information about your data.
timestampColumn = "timestamp" 
# inputColumns = train_args["num_uniqueKeys_train"] + list(train_args["cat_uniqueKeys_values_dic_train"].keys())
inputColumns = to_save_df.columns
inputColumns.remove("timestamp")

#Specify the container you created in Storage account. 
containerName = adls_container

#Set a folder name in Storage account to store the intermediate data. 
intermediateSaveDir = "mvad_temporary_files"


StatementMeta(newSpark3, 59, 12, Finished, Available)

In [25]:
# Configure the SynapseML detector for the already-trained model `mvad_model_id`.
# The start/end times set here are set again in the inference cell before `transform`.
try:
    retrievedModel = (
        DetectMultivariateAnomaly()
        .setSubscriptionKey(anomalyKey)
        .setLocation(mvad_region)
        .setStartTime(start_time)
        .setEndTime(end_time)
        .setContainerName(containerName)
        .setIntermediateSaveDir(intermediateSaveDir)
        .setInputCols(inputColumns)
        .setConnectionString(connectionString)
        .setModelId(mvad_model_id)
    )
except Exception as setup_error:
    mssparkutils.notebook.exit(
        f"Failure;Unable to create DetectMultivariateAnomaly object. Full error log: {setup_error}"
    )

StatementMeta(newSpark3, 59, 24, Finished, Available)

In [None]:
# Inference window passed to the detector.
startInferenceTime = inference_start
endInferenceTime = end_time

# Score the pre-processed data; detector output is written to the "result" column.
try:
    result = (
        retrievedModel
        .setStartTime(startInferenceTime)
        .setEndTime(endInferenceTime)
        .setOutputCol("result")
        .setTimestampCol(timestampColumn)
        .setInputCols(inputColumns)
        .transform(to_save_df)
    )
    display(result)
except Exception as inference_error:
    mssparkutils.notebook.exit(f"Failure;Inference Failure. Full error log: {inference_error}")



In [27]:
# Ask the detector to clean up the intermediate data it produced for this run
# (presumably the files under `intermediateSaveDir` in the storage container - confirm).
retrievedModel.cleanUpIntermediateData()

StatementMeta(newSpark3, 59, 26, Finished, Available)

In [None]:
# Keep only the rows whose timestamp is after `start_time`.
# NOTE(review): `timestamp` is a string column at this point, so this is presumably a
# string comparison - confirm `start_time` uses the same ISO-8601 layout.
result = result.filter(result.timestamp > start_time)

# Re-render the timestamp column in the "yyyy-MM-dd'T'HH:mm:ss'Z'" layout.
# `F.date_format` yields a string column, so the column stays a string (it is not
# converted to a timestamp datatype).
result = result.withColumn(
        'timestamp', 
        F.date_format(F.col('timestamp'), "yyyy-MM-dd'T'HH:mm:ss'Z'")
        )
display(result)

# Write Result to ADX

In [None]:
# Write to ADX

# Append the inference result to the ADX table named after the scenario; the table
# is created when it does not exist yet.
try:
    sp = sc._jvm.com.microsoft.kusto.spark.datasink.SparkIngestionProperties(
        True, ["dropByTags"], ["ingestByTags"], ["tags"], ["ingestIfNotExistsTags"], None, None, None)
    (
        result.write
        .format("com.microsoft.kusto.spark.synapse.datasource")
        .option("spark.synapse.linkedService", kusto_linked_service)
        .option("kustoDatabase", kusto_database)
        .option("kustoTable", scenario_name)
        .option("sparkIngestionPropertiesJson", sp.toString())
        .option("tableCreateOptions","CreateIfNotExist")
        .mode("Append")
        .save()
    )
except Exception as write_error:
    mssparkutils.notebook.exit(f"Failure;Unable to Write Inference Result to ADX. Full error log: {write_error}")


In [None]:
# Every step completed: end the notebook with the "Success;" exit value, matching the
# "Success;"/"Failure;" prefixes used by the earlier exits.
mssparkutils.notebook.exit("Success;Successful Inference")

StatementMeta(, , , Cancelled, )