# Standardize data for Parking Sensors

About:

- This notebook ingests the source data needed by the Parking Sensors sample, then performs cleanup and standardization steps. See the [parking sensors page](https://github.com/Azure-Samples/modern-data-warehouse-dataops/tree/main/fabric/fabric_dataops_sample/README.md) for more details about the Parking Sensor sample using Microsoft Fabric.

Assumptions/Pre-requisites:

- Currently there is a known issue with running cross-workspace queries when the workspace name contains special characters. See [schema limitation](https://learn.microsoft.com/en-us/fabric/data-engineering/lakehouse-schemas#public-preview-limitations) for more details. Avoid special characters in workspace names if you plan to query across workspaces with schema support.
- All the assets needed are created by IaC step during migration.
    - Config file needed: Files/sc-adls-main/config/application.cfg (derived using application.cfg.template during ci/cd process). Ensure "standardize" section is updated with the required parameters for this notebook.
- All the required lakehouse schemas and tables are created by the nb-setup.ipynb notebook.
- Environment with common library otel_monitor_invoker.py and its associated python dependencies
- Parking Sensor Lakehouse
- Datasource: ADLS made available as a shortcut in Parking Sensor Lakehouse or Direct access to REST APIs.
- Monitoring sink: AppInsights
- Secrets repo: Key vault to store AppInsights connection information

- All Lakehouses have schema support enabled (in public preview as of November 2024).
- Execution
  - A default lakehouse is associated during runtime where the required files and data are already staged. Multiple ways of invoking:
    - [Api call](https://learn.microsoft.com/fabric/data-engineering/notebook-public-api#run-a-notebook-on-demand)
    - [Part of a data pipeline](https://learn.microsoft.com/fabric/data-engineering/author-execute-notebook#parameterized-session-configuration-from-a-pipeline)
    - [Using `%run` from another notebook](https://learn.microsoft.com/fabric/data-engineering/author-execute-notebook#reference-run-a-notebook)


## Parameters and Library imports

### Reading parameters (external from Fabric pipeline or default values)

In [None]:
import configparser
import logging
import os
from datetime import datetime, timezone
from typing import Any, Callable, Tuple

import ddo_transform_standardize as s
import otel_monitor_invoker as otel  # custom module part of env
from great_expectations.core.batch import RuntimeBatchRequest
from great_expectations.data_context import BaseDataContext
from great_expectations.data_context.types.base import (
    DataContextConfig,
    DatasourceConfig,
    FilesystemStoreBackendDefaults,
)
from opentelemetry.trace import SpanKind
from opentelemetry.trace.status import StatusCode
from pyspark.sql import DataFrame
from pyspark.sql.types import StructType
from ruamel import yaml

In [None]:
# Unless `%%configure` is used to read external parameters - this cell should be the first one after imports

# This cell is tagged as the Parameters cell. The values below are defaults; parameters are
#   usually passed by the user/pipeline at the time of notebook execution.
# Ref: https://learn.microsoft.com/fabric/data-engineering/notebook-public-api#run-a-notebook-on-demand

# Control how to run the notebook:
#   "all"    - execute the entire notebook, including the main function.
#   "module" - skip the main function and only define the functions, so this notebook can be
#              treated like a module (useful for testing, or when functions from this notebook
#              need to be called from another notebook).
execution_mode = "all"
# Optional job/child process instance name; when left empty it is derived later from
#   process_name and a timestamp.
job_exec_instance = ""
# Deployment stage; reported as the `deployment.environment` telemetry attribute and
#   available to derive any other stage-based globals.
env_stage = "dev"
# Common config file path hosted on the attached lakehouse - path relative to Files/
config_file_path = "sc-adls-main/config/application.cfg"

# Parameters from the pipeline
# Sub-folder (under the configured landing_directory) that holds this run's input files.
infilefolder = "2024_12_17_11_46_14"
# Load/run identifier passed to the standardize step and reported as `pipelinerunid` in DQ metrics.
load_id = "e8099e5c-16f6-4104-9009-814b75587d99"
# For local execution, these values are derived from the runtime context;
#   a pipeline can override them by passing parameters during execution.
# `notebookutils` is not imported above; it appears to be provided by the Fabric notebook runtime.
runtime_context = notebookutils.runtime.context
workspace_id = runtime_context["currentWorkspaceId"]
lakehouse_id = runtime_context["defaultLakehouseId"]
workspace_name = runtime_context["currentWorkspaceName"]
lakehouse_name = runtime_context["defaultLakehouseName"]
# Mount point name used to access the lakehouse Files area (e.g. the config file) locally.
local_mount_name = "/local_data"

In [None]:
# # only need to run when developing the notebook to format the code
# import jupyter_black

# jupyter_black.load()

In [None]:
# Validate input parameters. Every check runs before failing, so all problems are reported together.
in_errors = [
    message
    for failed, message in (
        (
            execution_mode not in ["all", "module"],
            f"Invalid value: {execution_mode = }. It must be either 'all' or 'module'.",
        ),
        (
            not notebookutils.fs.exists(f"Files/{config_file_path}"),
            f"Specified config - `Files/{config_file_path}` doesn't exist.",
        ),
    )
    if failed
]

if in_errors:
    raise ValueError(f"Input parameter validation failed. Errors are:\n{in_errors}")
print("Input parameter verification completed successfully.")

### File mounts

- The mount scope is set to the job (session), so these cells need to run only once per session.


In [None]:
# Mount the OneLake "Files" area of the lakehouse identified by lakehouse_id (the default
#   lakehouse unless overridden) so that files such as the config can be read with regular
#   Python file APIs. The mount is job-scoped; optionally this can be done using %%configure.
local_mount = f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Files"
notebookutils.fs.mount(
    mountPoint=local_mount_name,
    source=local_mount,
    extraConfigs={"Scope": "job"},
)
# Local filesystem path that corresponds to the mount point.
local_data_mount_path = notebookutils.fs.getMountPath(local_mount_name)

In [None]:
## Temporary workaround. Remove when issue with OneLake authentication in Notebook is resolved.
# NOTE(review): the os.listdir results are discarded. Presumably listing each directory level
#   through the mount primes OneLake authentication before the config file is opened - confirm.
#   The listed directories are hard-coded to the default `config_file_path` layout
#   ("sc-adls-main/config"); keep them in sync if that parameter changes.
file_path = f"{local_data_mount_path}/{config_file_path}"
os.listdir(f"{local_data_mount_path}/")
os.listdir(f"{local_data_mount_path}/sc-adls-main")
os.listdir(f"{local_data_mount_path}/sc-adls-main/config")
# Opening the config through the mount raises here if it is not accessible; the content itself
#   is only used by the commented-out debug print below.
with open(file_path, "r") as file:
    file_content = file.read()

# print(file_content)

### Read user provided config values


In [None]:
# Parse the user-provided config through the local mount. ExtendedInterpolation enables
#   ${section:option} references between values.
config = configparser.ConfigParser(interpolation=configparser.ExtendedInterpolation())
config_full_path = f"{local_data_mount_path}/{config_file_path}"
# ConfigParser.read silently skips files it cannot open and returns the list of files it parsed.
#   Fail fast here instead of with a confusing NoSectionError on the first config.get call.
if not config.read(config_full_path, encoding="utf-8"):
    raise FileNotFoundError(f"Config file `{config_full_path}` could not be read.")

In [None]:
# Read the options of the "standardize" section. With ConfigParser.get, an option that is not
#   present in the requested section is looked up in the "DEFAULT" section instead.
config_section_name = "standardize"
(
    process_name,
    parking_ws,
    parking_ws_id,
    parking_lakehouse,
    # Add any other parameters that need to be read (keep this tuple and the names below aligned)
    landing_directory,
) = (
    config.get(config_section_name, option_name)
    for option_name in (
        "process_name",
        "workspace_name",
        "workspace_id",
        "lakehouse_name",
        "landing_directory",
    )
)

### Internal (derived) parameters

In [None]:
# Run timestamp in UTC, used to make job and span names unique. strftime's %f yields
#   microseconds, so the last 3 digits are dropped to get millisecond precision.
# datetime.now(timezone.utc) replaces the deprecated datetime.utcnow(); the formatted value is identical.
current_ts = datetime.now(timezone.utc).strftime("%Y%m%d%H%M%S%f")[:-3]
# Keep a caller-provided instance name; otherwise derive one from the process name and timestamp.
job_exec_instance = job_exec_instance or f"{process_name}#{current_ts}"
execution_user_name = runtime_context["userName"]

# Add any other parameters needed by the process
# NOTE(review): loaded_on is a naive local timestamp while current_ts is UTC; it is kept as-is
#   because it is passed to the standardize functions - confirm the intended timezone.
loaded_on = datetime.now()

## Monitoring and observability

### AppInsights connection

In [None]:
# Fetch the AppInsights connection string from Key Vault and build the OpenTelemetry exporter with it.
keyvault_uri = config.get("keyvault", "uri")
appinsights_secret_name = config.get("otel", "appinsights_connection_name")
connection_string = notebookutils.credentials.getSecret(keyvault_uri, appinsights_secret_name)
otlp_exporter = otel.OpenTelemetryAppInsightsExporter(conn_string=connection_string)

### Populate resource information

In [None]:
# Resource attributes passed to the tracer below.
# - Naming conventions: https://opentelemetry.io/docs/specs/semconv/general/attribute-naming/
# - For a complete list of reserved ones: https://opentelemetry.io/docs/concepts/semantic-conventions/
# NOTE: the service.namespace / service.name / service.instance.id triplet MUST be globally unique.
#   The instance id distinguishes instances of the same service that exist at the same time
#   (e.g. instances of a horizontally scaled service).
spark_conf = spark.sparkContext.getConf()
resource_attributes = {
    # ---------- Reserved attribute names
    "service.name": config.get(config_section_name, "service_name"),
    "service.version": config.get(config_section_name, "service_version"),
    "service.namespace": "parking-sensor",
    "service.instance.id": notebookutils.runtime.context["activityId"],
    "process.executable.name": process_name,
    "deployment.environment": env_stage,
    # ---------- Custom attributes. Common attributes such as app id or domain id could also be
    #   added here, or looked up from process reference data using the process name as the key.
    # The runtime context has a lot of useful info, so it is added as a whole. It must be
    #   converted to a string, otherwise it will fail.
    "jobexec.context": str(notebookutils.runtime.context),
    "jobexec.cluster.region": spark_conf.get("spark.cluster.region"),
    "jobexec.app.name": spark_conf.get("spark.app.name"),
    "jobexec.instance.name": job_exec_instance,
}

# Logging typically happens inside a span, so trace and span ids already associate log records
#   with the resource information above; only the job instance name is attached to logs directly.
# Trace and span ids are null for records logged outside of a span context.
log_attributes = {"jobexec.instance.name": job_exec_instance}
trace_attributes = resource_attributes

tracer = otlp_exporter.get_otel_tracer(
    trace_resource_attributes=trace_attributes,
    tracer_name=f"tracer-{process_name}",
)
logger = otlp_exporter.get_otel_logger(
    log_resource_attributes=log_attributes,
    logger_name=f"logger-{process_name}",
    add_console_handler=False,
)
logger.setLevel(logging.INFO)  # default is WARN

## Code

### Code functions

- When this notebook is invoked with `%run`, these functions are exposed to the calling notebook.

In [None]:
def get_lakehouse_details(lakehouse_name: str) -> dict:
    """Return the details of a lakehouse in the current workspace.

    Args:
        lakehouse_name: Display name of the lakehouse to look up.

    Returns:
        The lakehouse details returned by ``notebookutils.lakehouse.get``.

    Raises:
        Exception: Whatever ``notebookutils.lakehouse.get`` raises; it is logged and re-raised.
    """
    logger.info("Performing lakehouse existence check.")
    try:
        # Only the CURRENT workspace is searched.
        return notebookutils.lakehouse.get(name=lakehouse_name)
    except Exception:
        logger.exception(f"Specified lakehouse - {lakehouse_name} doesn't exist. Aborting..")
        raise

In [None]:
def read_dataframe(schema: StructType, file_path: str, bad_records_file_path: str) -> DataFrame:
    """Read a (multi-line) JSON file into a Spark DataFrame using an explicit schema.

    Args:
        schema: Schema applied to the JSON input instead of inferring one.
        file_path: Absolute path of the JSON file to read.
        bad_records_file_path: Value for the reader's ``badRecordsPath`` option.

    Returns:
        The DataFrame produced by ``spark.read...json(file_path)``.
    """
    return (
        spark.read.schema(schema)
        .option("badRecordsPath", bad_records_file_path)
        .option("multiLine", True)
        .json(file_path)
    )


def _standardize_and_save(
    schema_name: str,
    standardize_fn: Callable[[DataFrame, str, datetime], Tuple[DataFrame, DataFrame]],
    table_name: str,
    file_path: str,
    bad_records_file_path: str,
    load_id: str,
    loaded_on: datetime,
) -> Tuple[DataFrame, DataFrame]:
    """Read and standardize one dataset, then append it to its interim and malformed Delta tables.

    Shared implementation of the ``standardize_*`` entry points below. Tables are written to
    ``{lh_table_path}/interim/{table_name}`` and ``{lh_table_path}/malformed/{table_name}``,
    where ``lh_table_path`` is the module-level global set by ``main``.

    Returns:
        The ``(sdf, malformed_sdf)`` pair returned by ``standardize_fn``.
    """
    schema = s.get_schema(schema_name)
    df = read_dataframe(schema, file_path, bad_records_file_path)

    # Standardize: yields the standardized rows and the malformed rows.
    sdf, malformed_sdf = standardize_fn(df, load_id, loaded_on)

    # Insert new rows
    sdf.write.format("delta").mode("append").save(f"{lh_table_path}/interim/{table_name}")

    # Insert bad rows
    malformed_sdf.write.format("delta").mode("append").save(f"{lh_table_path}/malformed/{table_name}")

    return sdf, malformed_sdf


def standardize_parking_bay(
    file_path: str, bad_records_file_path: str, load_id: str, loaded_on: datetime
) -> Tuple[DataFrame, DataFrame]:
    """Standardize parking bay JSON and append it to the ``parking_bay`` interim/malformed tables.

    Returns:
        ``(sdf, malformed_sdf)`` as produced by ``s.standardize_parking_bay``.
    """
    return _standardize_and_save(
        schema_name="in_parkingbay_schema",
        standardize_fn=s.standardize_parking_bay,
        table_name="parking_bay",
        file_path=file_path,
        bad_records_file_path=bad_records_file_path,
        load_id=load_id,
        loaded_on=loaded_on,
    )


def standardize_sensordata(
    file_path: str, bad_records_file_path: str, load_id: str, loaded_on: datetime
) -> Tuple[DataFrame, DataFrame]:
    """Standardize sensor JSON and append it to the ``sensor`` interim/malformed tables.

    Returns:
        ``(sdf, malformed_sdf)`` as produced by ``s.standardize_sensordata``.
    """
    return _standardize_and_save(
        schema_name="in_sensordata_schema",
        standardize_fn=s.standardize_sensordata,
        table_name="sensor",
        file_path=file_path,
        bad_records_file_path=bad_records_file_path,
        load_id=load_id,
        loaded_on=loaded_on,
    )

In [None]:
def validate_parking_bay(
    parkingbay_sdf: DataFrame,
    *,
    environment: str = "stage",
    pipeline_run_id: str = "pipeline_run_id",
) -> Any:
    """Validate standardized parking bay data with a Great Expectations checkpoint.

    Builds a code-configured Great Expectations data context whose stores live under
    ``standardize_validation`` in the mounted lakehouse Files area. It then (re)creates the
    ``parkingbay_data_exception_suite_basic`` expectation suite and runs it against
    ``parkingbay_sdf`` through a ``SimpleCheckpoint``.

    Args:
        parkingbay_sdf: Standardized parking bay Spark DataFrame to validate.
        environment: Value recorded for the ``environment`` batch identifier. Defaults to
            ``"stage"``, the previously hard-coded value.
        pipeline_run_id: Value recorded for the ``pipeline_run_id`` batch identifier. Defaults to
            the literal ``"pipeline_run_id"``, the previously hard-coded value. Pass the real
            run/load id for traceability.

    Returns:
        The checkpoint result returned by ``context.run_checkpoint``.
    """
    root_directory = f"{local_data_mount_path}/standardize_validation"

    # 1. Configure DataContext
    # https://docs.greatexpectations.io/docs/terms/data_context
    data_context_config = DataContextConfig(
        datasources={
            "parkingbay_data_source": DatasourceConfig(
                class_name="Datasource",
                execution_engine={"class_name": "SparkDFExecutionEngine", "force_reuse_spark_context": True},
                data_connectors={
                    "parkingbay_data_connector": {
                        "module_name": "great_expectations.datasource.data_connector",
                        "class_name": "RuntimeDataConnector",
                        "batch_identifiers": [
                            "environment",
                            "pipeline_run_id",
                        ],
                    }
                },
            )
        },
        store_backend_defaults=FilesystemStoreBackendDefaults(root_directory=root_directory),
    )
    context = BaseDataContext(project_config=data_context_config)

    # 2. Create a BatchRequest based on the parkingbay_sdf dataframe.
    # https://docs.greatexpectations.io/docs/terms/batch
    batch_request = RuntimeBatchRequest(
        datasource_name="parkingbay_data_source",
        data_connector_name="parkingbay_data_connector",
        # Identifies this data asset. NOTE(review): the name is misspelled but is kept as-is,
        #   presumably because stored validation results may already reference it.
        data_asset_name="paringbaydataaset",
        batch_identifiers={
            "environment": environment,
            "pipeline_run_id": pipeline_run_id,
        },
        runtime_parameters={"batch_data": parkingbay_sdf},  # the dataframe to validate
    )

    # 3. Define the Expectation Suite and the corresponding data expectations
    # https://docs.greatexpectations.io/docs/terms/expectation_suite
    expectation_suite_name = "parkingbay_data_exception_suite_basic"
    context.create_expectation_suite(expectation_suite_name=expectation_suite_name, overwrite_existing=True)
    validator = context.get_validator(
        batch_request=batch_request,
        expectation_suite_name=expectation_suite_name,
    )
    # Add validations to the suite
    # Check available expectations: validator.list_available_expectation_types()
    # https://legacy.docs.greatexpectations.io/en/latest/autoapi/great_expectations/expectations/index.html
    # https://legacy.docs.greatexpectations.io/en/latest/reference/core_concepts/expectations/standard_arguments.html#meta
    validator.expect_column_values_to_not_be_null(column="meter_id")
    validator.expect_column_values_to_not_be_null(column="marker_id")
    validator.expect_column_values_to_be_of_type(column="rd_seg_dsc", type_="StringType")
    validator.expect_column_values_to_be_of_type(column="rd_seg_id", type_="IntegerType")
    # validator.validate() # To run validations without a checkpoint
    # Keep failed expectations in the saved suite; the checkpoint below evaluates them.
    validator.save_expectation_suite(discard_failed_expectations=False)

    # 4. Configure a checkpoint and run the Expectation Suite using that checkpoint
    # https://docs.greatexpectations.io/docs/terms/checkpoint
    my_checkpoint_name = "Parkingbay Data DQ"
    checkpoint_config = {
        "name": my_checkpoint_name,
        "config_version": 1.0,
        "class_name": "SimpleCheckpoint",
        "run_name_template": "%Y%m%d-%H%M%S-my-run-name-template",
    }
    context.test_yaml_config(yaml.dump(checkpoint_config))
    context.add_checkpoint(**checkpoint_config)
    # Run the checkpoint, passing in the batch request and expectation suite.
    checkpoint_result = context.run_checkpoint(
        checkpoint_name=my_checkpoint_name,
        validations=[
            {
                "batch_request": batch_request,
                "expectation_suite_name": expectation_suite_name,
            }
        ],
    )

    return checkpoint_result

### Data Quality Metric Reporting
This parses the checkpoint results and sends them to AppInsights / Azure Monitor for reporting.

In [None]:
def data_quality_metric_reporting(checkpoint_result: Any, load_id: str) -> None:
    """Log a summary of a Great Expectations checkpoint run for AppInsights reporting.

    The summary holds the checkpoint name and ``load_id`` (as ``pipelinerunid``). It also holds
    one entry per expectation of the first run result, mapping
    ``<expectation_type>_on_<column>`` (or just ``<expectation_type>`` for expectations without
    a column) to its success flag. It is logged as "verifychecks" with the summary in
    ``custom_dimensions``: at INFO level when the checkpoint succeeded, at ERROR level otherwise.

    Args:
        checkpoint_result: Result returned by ``context.run_checkpoint``.
        load_id: Load/run identifier reported as ``pipelinerunid``.

    Raises:
        ValueError: If the checkpoint result contains no run results.
    """
    run_results = checkpoint_result.to_json_dict()["run_results"]
    if not run_results:
        raise ValueError("Checkpoint result contains no run results to report.")
    # Only the first run result is reported (validate_parking_bay runs a single validation).
    first_run_result = next(iter(run_results.values()))
    results = first_run_result["validation_result"]["results"]

    checks = {"check_name": checkpoint_result["checkpoint_config"]["name"], "pipelinerunid": load_id}
    for result in results:
        expectation_config = result["expectation_config"]
        expectation_type = expectation_config["expectation_type"]
        column = expectation_config["kwargs"].get("column")
        # Table-level expectations have no column; name them by expectation type instead of failing.
        validation_name = expectation_type if column is None else f"{expectation_type}_on_{column}"
        checks[validation_name] = result["success"]

    properties = {"custom_dimensions": str(checks)}

    # Do not change the shared logger's level here. Setting it to ERROR would silently drop every
    #   later INFO record (e.g. main's completion messages). The logger is configured at INFO
    #   during setup, so both records below are emitted.
    if checkpoint_result.success is True:
        logger.info("verifychecks", extra=properties)
    else:
        logger.error("verifychecks", extra=properties)

In [None]:
def main() -> None:
    """Run the standardize step end to end inside a single root OpenTelemetry span.

    Steps (the first three are recorded as events on the root span):
      010 - resolve the parking-sensor lakehouse in the current workspace and derive its
            absolute OneLake Tables/Files paths,
      020 - standardize parking bay JSON into the interim/malformed tables,
      030 - standardize sensor JSON into the interim/malformed tables,
      then validate the standardized parking bay data and report the data quality results.

    Any exception is logged, recorded on the root span with ERROR status, and re-raised.
    """
    # The standardize_* helpers read lh_table_path as a module-level global, so the paths
    #   resolved below must be published at module scope.
    global lh_table_path, lh_file_path
    # Unique per run: process name plus the UTC run timestamp (millisecond precision).
    root_span_name = f"root#{process_name}#{current_ts}"

    with tracer.start_as_current_span(root_span_name, kind=SpanKind.INTERNAL) as root_span:
        try:
            root_span.add_event(
                name="010-verify-lakehouse",
                attributes={"lakehouse_name": parking_lakehouse},
            )
            # Note: The following command will still be good as this
            #  only retrieves the lakehouse in the CURRENT workspace
            #  regardless of the default lakehouse (with the same name from
            #  a different workspace) attached to the notebook.
            lh_details = get_lakehouse_details(parking_lakehouse)
            # Always use absolute paths when referring to Onelake locations
            lh_table_path = f'{lh_details["properties"]["abfsPath"]}/Tables'
            lh_file_path = f'{lh_details["properties"]["abfsPath"]}/Files'

            # print(lh_table_path, lh_file_path)
            # This run's input files live under <Files>/<landing_directory>/<infilefolder>.
            landing_directory_path = f"{lh_file_path}/{landing_directory}/{infilefolder}"

            # Standardize Parking Bay Data
            parking_bay_file_path = f"{landing_directory_path}/parking_bay_data.json"
            parking_bay_bad_records_file_path = f"{landing_directory_path}/__corrupt/parking_bay_data"
            root_span.add_event(
                name="020-standardize-parking-bay",
                attributes={
                    "parking_bay_file_path": parking_bay_file_path,
                    "parking_bay_bad_records_file_path": parking_bay_bad_records_file_path,
                },
            )
            # The standardized DataFrame is kept for the data quality validation below;
            #   the malformed rows are only persisted by the helper.
            parking_bay_df, _ = standardize_parking_bay(
                parking_bay_file_path, parking_bay_bad_records_file_path, load_id, loaded_on
            )

            # Standardize Sensor Data
            sensor_file_path = f"{landing_directory_path}/parking_sensor_data.json"
            sensor_bad_records_file_path = f"{landing_directory_path}/__corrupt/parking_sensor_data"
            root_span.add_event(
                name="030-standardize-sensor",
                attributes={
                    "sensor_file_path": sensor_file_path,
                    "sensor_bad_records_file_path": sensor_bad_records_file_path,
                },
            )
            standardize_sensordata(sensor_file_path, sensor_bad_records_file_path, load_id, loaded_on)

            # Validate parking Bay Data
            # NOTE(review): validation runs after the data has already been appended to the
            #   interim tables, and the DQ report only logs failures, so failed expectations
            #   do not block the load.
            checkpoint_result = validate_parking_bay(parking_bay_df)

            # Data Quality Report
            data_quality_metric_reporting(checkpoint_result, load_id)

        except Exception as e:
            # Record the failure in the logs and on the root span, then propagate it to the caller.
            error_message = f"{process_name} process failed with error {e}"
            logger.exception(error_message)
            root_span.set_status(StatusCode.ERROR, error_message)
            root_span.record_exception(e)
            raise
        else:
            root_span.set_status(StatusCode.OK)
            logger.info(f"{process_name} process is successful.")
        finally:
            logger.info(f"\n** {process_name} process is complete. Check the logs for execution status. **\n\n")

    return None

### Code execution

In [None]:
# "module" mode only defines the functions above (e.g. for %run or testing); "all" also runs main().
if execution_mode != "all":
    print(f"Skipping the main function execution as {execution_mode = } and running it like a code module.")
else:
    print(f"{execution_mode = }. Proceeding with the code execution.")
    main()

# Tables in AppInsights to view trace/log events
# dependencies
# | where name hasprefix "root#nb-020"
# //
# exceptions
# //
# traces