## Load context and EventsPersistenceManager

In [11]:
from custom_events_lib.context import Context
from custom_events_lib.events_persistance_manager import EventsPersistenceManager
from custom_events_lib.utils import get_package_version


# Delta table that every example cell below writes its events to.
custom_events_table = "custom_events"

# Look up the notebook runtime metadata once; it feeds both the activity name
# and the environment details attached to the context.
runtime_context = mssparkutils.runtime.context

context = Context(
    # Activity name is "<notebook name>_<activity id>".
    activity_name=f"{runtime_context['currentNotebookName']}_{runtime_context['activityId']}",
    user_name=mssparkutils.env.getUserName(),
    # Any other useful environment information could be added to this dict.
    environment={
        "notebook_name": runtime_context['currentNotebookName'],
        "is_for_pipeline": str(runtime_context['isForPipeline']),
        "custom_python_libs_version": get_package_version("custom_python_libs"),
        "department": "ABCD",
    },
)

# Manager instance used by the following cells to persist events.
events_persistance_manager = EventsPersistenceManager()

StatementMeta(, 7713a8a6-16ab-4eee-ac45-cbca41700ce8, 15, Finished, Available)

## Find and save MissingData events with identifiers

In [12]:
from custom_events_lib.utils import find_missing_values
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, StringType, IntegerType


# Build a small demo DataFrame in which some rows have a null in "column_with_nulls".
# Each literal matches the type declared for its column in the schema below:
# "id_2" is declared as StringType, so its values are written as strings instead of
# relying on Spark to coerce Python ints into strings.
data = [
    Row(None, "abc", "123", "aa"),
    Row(222, "def", "456", "aa"),
    Row(111, "ghi", "789", "aa"),
    Row(None, "jkl", "101", "aa"),
]

my_data_schema = StructType([
    StructField("column_with_nulls", IntegerType(), True),
    StructField("id_1", StringType(), True),
    StructField("id_2", StringType(), True),
    StructField("ignored_column", StringType(), True),
])
df_with_nulls = spark.createDataFrame(data, my_data_schema)
display(df_with_nulls)


# Specify the column in which nulls should be found, followed by the identifier
# columns that should be saved alongside each missing value.
missing_values_df = find_missing_values(df_with_nulls, "column_with_nulls", "id_1", "id_2")
display(missing_values_df)

# Persist the rows found above as missing-data events in the custom events table.
events_persistance_manager.save_missing_data_events(spark, missing_values_df, context, custom_events_table)


StatementMeta(, 7713a8a6-16ab-4eee-ac45-cbca41700ce8, 16, Finished, Available)

SynapseWidget(Synapse.DataFrame, 8a67c56f-ae13-4bcf-a743-1d2e891eb525)

SynapseWidget(Synapse.DataFrame, 11156752-2413-443d-92ef-d98ab15fc3c6)

## Save custom events


In [13]:
from datetime import datetime
import pytz

# Rows for a data frame that follows the manager's event_schema.
# Each row is (first value, dict payload, current UTC timestamp); the timestamp is
# taken separately for every row, in listing order.
data = [
    Row(event_label, event_payload, datetime.now(pytz.utc))
    for event_label, event_payload in (
        ("MyEvent1", {"foo": "bar"}),
        ("InvalidData", {"my_identifier_1": 1234}),
    )
]

df = spark.createDataFrame(data, events_persistance_manager.event_schema)
display(df)

# Persist the custom events into the custom events table.
events_persistance_manager.save_events(df, context, custom_events_table)

StatementMeta(, 7713a8a6-16ab-4eee-ac45-cbca41700ce8, 17, Finished, Available)

SynapseWidget(Synapse.DataFrame, 2d92ccf0-0b87-40e1-9fc9-d8d02ffb3925)

## Save exception event

In [14]:
from custom_exceptions_lib.exceptions import DeltaTableWriteException
import logging


# Demonstrates persisting an exception as an event: the exception is raised on
# purpose, saved through the events manager, and then propagated.
try:
    logging.warning("About to raise an exception")
    raise DeltaTableWriteException("Testing exception")
except DeltaTableWriteException as ex:
    logging.error(f"Got exception: {ex}")
    events_persistance_manager.save_exception_event(spark, ex, context, custom_events_table)
    # After saving, re-raise so the failure stays visible in the notebook output.
    # A bare `raise` re-raises the exception currently being handled and keeps its
    # original traceback, instead of raising the bound name again.
    raise



StatementMeta(, 7713a8a6-16ab-4eee-ac45-cbca41700ce8, 18, Finished, Available)

About to raise an exception
Got exception: Testing exception


DeltaTableWriteException: Testing exception

## Display custom_events table content

In [15]:
# Read the events table back so the rows written by the cells above can be inspected.
df_table = spark.read.table(custom_events_table)
display(df_table)


StatementMeta(, 7713a8a6-16ab-4eee-ac45-cbca41700ce8, 19, Finished, Available)

SynapseWidget(Synapse.DataFrame, 92528a77-a359-49f1-8c10-745dae3dc964)