# Data Standardization and Merging Pipeline
This notebook standardizes and merges data for the Triton Flow Plans dataset, performing the following key steps:
- **Configuration and Environment Setup**: Initializes and manages settings.
- **Data Validation and Loading**: Reads and verifies JSON schemas and source data.
- **Flattening and Transforming Data**: Flattens nested JSON structures.
- **Data Quality Checks**: Ensures data integrity by detecting duplicates.
- **Temporary View Creation**: Keeps only the most recent version of each record when multiple files are read.
- **Delta Table Management**: Creates the target Delta table if needed and merges new data into it.
- **Feedback Timestamp Generation**: Returns the period (`from_datetime`, `to_datetime`) covered by the ingested data, for auditing.


## Widget List
The notebook uses the following widgets for dynamic configuration:
- **`SourceStorageAccount`**: Source storage account name.
- **`DestinationStorageAccount`**: Destination storage account name.
- **`SourceContainer`**: Container storing source data.
- **`SourceDatasetidentifier`**: Identifier for the dataset being processed.
- **`DepthLevel`**: The depth level for flattening nested structures.


In [ ]:
%pip install git+https://github.com/Open-Dataplatform/utils-databricks.git@v0.6.0

In [ ]:
# PySpark imports
from pyspark.sql import SparkSession
from pyspark.sql.utils import AnalysisException

# Functions from the custom utility package
from custom_utils import dataframe, helper
from custom_utils.dp_storage import (
    reader, writer, initialize_config, quality,
    table_management, merge_management, feedback_management,
)
from custom_utils.validation import PathValidator

## Setup
### Configuration Handling with `initialize_config`
The `initialize_config` function manages configuration dynamically by reading parameter values from Databricks widgets or from Azure Data Factory (ADF) pipeline parameters. This lets the same notebook run in different environments without code changes: paths, environment names, and dataset settings are all derived from the input parameters.

In [ ]:
# Initialize configuration and helper objects
config = initialize_config(dbutils, helper, '<source_environment>', '<destination_environment>', '<source_container>', '<source_datasetidentifier>')
spark = config.spark_session
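# Expose the resolved settings as notebook variables; later cells use names such as
# key_columns, feedback_column, destination_environment and source_datasetidentifier
config.unpack(globals())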
config.print_params()

## Get Parameters
The parameters are retrieved from widgets or ADF by `initialize_config` in the cell above, and `config.print_params()` prints the resolved values so they can be checked before any data is read.
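Because ADF passes Databricks notebook parameters as widget values, a single widget lookup covers both interactive runs and pipeline runs. The sketch below illustrates that fallback pattern in plain Python; `resolve_params`, `fake_get`, and the default values are hypothetical and not part of `custom_utils`. In a notebook the accessor would be `dbutils.widgets.get`.

```python
from typing import Callable, Dict


def resolve_params(get_widget: Callable[[str], str], defaults: Dict[str, str]) -> Dict[str, str]:
    """Resolve each parameter from a widget, falling back to a default (hypothetical helper)."""
    resolved: Dict[str, str] = {}
    for name, default in defaults.items():
        try:
            value = get_widget(name)
        except Exception:  # widget not defined for this run
            value = ""
        # Empty widget values also fall back to the default
        resolved[name] = value if value else default
    return resolved


# Stand-in for dbutils.widgets.get, used only for this illustration
widget_values = {"SourceContainer": "landing", "DepthLevel": ""}


def fake_get(name: str) -> str:
    return widget_values[name]  # KeyError mimics an undefined widget


params = resolve_params(
    fake_get,
    {"SourceContainer": "default-container", "DepthLevel": "1", "SourceDatasetidentifier": "unknown"},
)
print(params)
```

Falling back to a default keeps interactive runs convenient; for parameters that must always be supplied, raising an error is usually safer than silently using a default.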

## Verify Paths and Files
This step checks that the schema and data file paths exist and that all required files are present before any further processing.

In [ ]:
# Validate paths and files
validator = PathValidator(config)
schema_file_path, data_file_path, file_type = validator.verify_paths_and_files()
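A fail-fast check of this kind can be sketched with `pathlib`. This is a hypothetical stand-in for `PathValidator.verify_paths_and_files`, not its actual implementation: it works on local paths (on Databricks, storage paths would typically be listed with `dbutils.fs.ls` instead), and inferring `file_type` from the data files' extension is an assumption.

```python
import tempfile
from pathlib import Path
from typing import Tuple


def verify_paths_and_files(schema_path: str, data_dir: str) -> Tuple[Path, Path, str]:
    """Fail fast if the schema or data files are missing (hypothetical sketch).

    Returns the schema path, the data directory, and a file type inferred
    from the data files' extension.
    """
    schema = Path(schema_path)
    if not schema.is_file():
        raise FileNotFoundError(f"Schema file not found: {schema}")

    data = Path(data_dir)
    files = sorted(p for p in data.iterdir() if p.is_file()) if data.is_dir() else []
    if not files:
        raise FileNotFoundError(f"No data files found in: {data}")

    # Require a single file type so the reader knows how to parse the data
    extensions = {p.suffix.lstrip(".").lower() for p in files}
    if len(extensions) != 1:
        raise ValueError(f"Expected one file type, found: {sorted(extensions)}")
    return schema, data, extensions.pop()


# Example: build a throwaway layout and verify it
with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "schema.json").write_text("{}")
    (root / "data").mkdir()
    (root / "data" / "plans_20240501.json").write_text("[]")
    _, _, file_type = verify_paths_and_files(str(root / "schema.json"), str(root / "data"))

print(file_type)
```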

## Read Data
This section reads the JSON schema and loads the source data. The schema is converted to a PySpark `StructType`, so the nested source data is read against an explicit schema instead of an inferred one, which makes validating and transforming nested structures straightforward.

In [ ]:
# Read and parse the JSON content using schema
schema_json, spark_schema = reader.json_schema_to_spark_struct(schema_file_path)
df_raw = reader.read_json_from_binary(spark, spark_schema, data_file_path)
display(df_raw)
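To show what converting a JSON schema into a Spark type involves, the sketch below maps a JSON Schema document onto a Spark SQL type string. It is a simplified illustration, not the logic of `reader.json_schema_to_spark_struct`: the type mapping (`integer` to `bigint`, `date-time` strings to `timestamp`) is an assumption, the example field names are hypothetical, and names that need backtick quoting are not handled.

```python
from typing import Any, Dict

# Assumed mapping from JSON Schema primitive types to Spark SQL type names
PRIMITIVES = {"string": "string", "integer": "bigint", "number": "double", "boolean": "boolean"}


def to_spark_type(node: Dict[str, Any]) -> str:
    """Translate a JSON Schema node into a Spark SQL type string (illustrative sketch)."""
    json_type = node.get("type", "string")
    if isinstance(json_type, list):  # e.g. ["number", "null"] for a nullable field
        json_type = next((t for t in json_type if t != "null"), "string")
    if json_type == "object":
        fields = ",".join(
            f"{name}:{to_spark_type(child)}"
            for name, child in node.get("properties", {}).items()
        )
        return f"struct<{fields}>"
    if json_type == "array":
        return f"array<{to_spark_type(node.get('items', {}))}>"
    if json_type == "string" and node.get("format") == "date-time":
        return "timestamp"
    return PRIMITIVES.get(json_type, "string")


flow_plan_schema = {
    "type": "object",
    "properties": {
        "PlanId": {"type": "string"},
        "Timestamp": {"type": "string", "format": "date-time"},
        "Flows": {
            "type": "array",
            "items": {
                "type": "object",
                "properties": {"Point": {"type": "string"}, "Value": {"type": ["number", "null"]}},
            },
        },
    },
}
print(to_spark_type(flow_plan_schema))
```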

## Flattening and Processing Nested JSON Data
The `process_and_flatten_json` function orchestrates the flattening. It determines how deeply the schema is nested and then flattens nested structures, arrays, and objects up to the configured depth level.

### Explanation of Key Functions
- **`flatten_df`**: Recursively flattens nested data structures up to a specified depth.
- **`get_json_depth`**: Determines the maximum depth of nested JSON structures, which is key for configuring flattening behavior.
- **`rename_and_cast_columns`**: Handles renaming and casting columns to the appropriate types, avoiding conflicts with reserved SQL keywords.
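The depth measurement and depth-limited flattening described above can be sketched on plain Python dictionaries. `get_json_depth` and `flatten_record` here are hypothetical counterparts of the library functions: nested objects are joined into underscore-separated column names, objects deeper than `max_depth` are left intact, and arrays are kept as values (a Spark implementation might instead explode them into rows).

```python
from typing import Any, Dict


def get_json_depth(value: Any) -> int:
    """Count nested object/array levels; scalars have depth 0."""
    if isinstance(value, dict):
        return 1 + max((get_json_depth(v) for v in value.values()), default=0)
    if isinstance(value, list):
        return 1 + max((get_json_depth(v) for v in value), default=0)
    return 0


def flatten_record(record: Dict[str, Any], max_depth: int, prefix: str = "", sep: str = "_") -> Dict[str, Any]:
    """Recursively flatten nested dicts into prefixed keys, expanding at most max_depth levels."""
    flat: Dict[str, Any] = {}
    for key, value in record.items():
        name = f"{prefix}{sep}{key}" if prefix else key
        if isinstance(value, dict) and max_depth > 0:
            # Expand this object one level and continue with a reduced budget
            flat.update(flatten_record(value, max_depth - 1, name, sep))
        else:
            # Scalars, arrays, and objects beyond the depth limit are kept as-is
            flat[name] = value
    return flat


doc = {"PlanId": "P1", "Meta": {"Source": "A", "Info": {"Version": 2}}, "Flows": [{"Point": "X"}]}
print(get_json_depth(doc))
print(flatten_record(doc, max_depth=1))
print(flatten_record(doc, max_depth=2))
```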

In [ ]:
# Flatten and standardize the DataFrame
df, df_flattened, columns_of_interest, view_name = dataframe.process_and_flatten_json(
    spark=spark,
    config=config,
    schema_file_path=schema_file_path,
    data_file_path=data_file_path,
    helper=helper
)

# Rename 'Timestamp' to 'EventTimestamp' to avoid SQL conflicts and cast it to timestamp
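if 'Timestamp' in df_flattened.columns:
    df_flattened = dataframe.rename_and_cast_columns(
        df_flattened,
        column_mapping={'Timestamp': 'EventTimestamp'},
        cast_type_mapping={'EventTimestamp': 'timestamp'},
    )
    # Re-register the view so the SQL-based steps below see the renamed column
    df_flattened.createOrReplaceTempView(view_name)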
display(df_flattened)
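In PySpark, the rename-then-cast step corresponds to `withColumnRenamed` followed by `withColumn(..., col(...).cast("timestamp"))`. Note that `cast_type_mapping` in the cell above is keyed by the new name, which implies the rename happens before the cast. The record-level sketch below follows that order; `rename_and_cast` and `CASTERS` are hypothetical names, and ISO-8601 timestamp strings are assumed.

```python
from datetime import datetime
from typing import Any, Callable, Dict

# Hypothetical casters keyed by target type name (timestamps assumed ISO-8601 strings)
CASTERS: Dict[str, Callable[[Any], Any]] = {
    "timestamp": lambda v: v if isinstance(v, datetime) else datetime.fromisoformat(v),
    "double": float,
    "string": str,
}


def rename_and_cast(record: Dict[str, Any], column_mapping: Dict[str, str],
                    cast_type_mapping: Dict[str, str]) -> Dict[str, Any]:
    """Rename keys first, then cast using the new names; nulls stay null."""
    renamed = {column_mapping.get(key, key): value for key, value in record.items()}
    for column, type_name in cast_type_mapping.items():
        if renamed.get(column) is not None:
            renamed[column] = CASTERS[type_name](renamed[column])
    return renamed


row = {"PlanId": "P1", "Timestamp": "2024-05-01T06:00:00"}
print(rename_and_cast(row, {"Timestamp": "EventTimestamp"}, {"EventTimestamp": "timestamp"}))
```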

## Quality Check - Abort if Duplicates Exist in New Data
This step ensures that no duplicate records are introduced when ingesting new data. It checks the key columns (`key_columns`) for duplicate values and aborts the run if any are found, so duplicates never reach the target table.

In [ ]:
# Perform the quality check
quality.perform_quality_check(spark=spark, key_columns=key_columns, view_name=view_name, helper=helper)
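The duplicate test amounts to counting rows per key and flagging any key that occurs more than once; in Spark SQL that is a `GROUP BY` over the key columns with `HAVING COUNT(*) > 1`. The plain-Python sketch below shows the same check; `find_duplicate_keys`, `check_no_duplicates`, `DuplicateKeyError`, and the example key columns `PlanId` and `Point` are hypothetical.

```python
from collections import Counter
from typing import Any, Dict, List, Sequence, Tuple


class DuplicateKeyError(Exception):
    """Raised when a key occurs more than once in the new data (hypothetical)."""


def find_duplicate_keys(rows: Sequence[Dict[str, Any]], key_columns: List[str]) -> Dict[Tuple, int]:
    """Return each key tuple that occurs more than once, with its count."""
    counts = Counter(tuple(row[c] for c in key_columns) for row in rows)
    return {key: n for key, n in counts.items() if n > 1}


def check_no_duplicates(rows: Sequence[Dict[str, Any]], key_columns: List[str]) -> None:
    """Abort (raise) if any key is duplicated, mirroring the quality gate above."""
    duplicates = find_duplicate_keys(rows, key_columns)
    if duplicates:
        example = next(iter(duplicates))
        raise DuplicateKeyError(f"{len(duplicates)} duplicated key(s), e.g. {example}")


new_rows = [
    {"PlanId": "P1", "Point": "A", "Value": 1.0},
    {"PlanId": "P1", "Point": "B", "Value": 2.0},
    {"PlanId": "P1", "Point": "A", "Value": 3.0},
]
print(find_duplicate_keys(new_rows, ["PlanId", "Point"]))
```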

## Taking Most Recent Version of Data if Multiple Files Are Read
This step creates a temporary view containing only the most recent version of each record. Rows are grouped by the key columns and ranked by the ordering criteria (`order_by_columns`), and only the top-ranked row for each key is kept for further processing.

In [ ]:
# Order rows newest first when several files are read: by file name (assumes names
# sort chronologically, e.g. timestamped names), then by event time
order_by_columns = ["input_file_name DESC", "EventTimestamp DESC"]

# Create a temporary view with the most recent records
dataframe.create_temp_view_with_most_recent_records(
    spark=spark,
    view_name=view_name,
    key_columns=key_columns,
    columns_of_interest=columns_of_interest,
    order_by_columns=order_by_columns,
    helper=helper
)
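Keeping the latest row per key is commonly done with the `ROW_NUMBER()` window function. The sketch below builds such a query and also simulates the selection in plain Python; it is an assumption about the kind of query a helper like `create_temp_view_with_most_recent_records` could run, not its actual code, and the example view and column names are hypothetical.

```python
from typing import Any, Dict, List, Sequence


def most_recent_records_sql(view_name: str, key_columns: List[str],
                            columns_of_interest: List[str], order_by_columns: List[str]) -> str:
    """Build a query that keeps the first row per key under the given ordering."""
    return (
        f"SELECT {', '.join(columns_of_interest)} FROM ("
        f"SELECT *, ROW_NUMBER() OVER (PARTITION BY {', '.join(key_columns)} "
        f"ORDER BY {', '.join(order_by_columns)}) AS row_num FROM {view_name}"
        ") AS ranked WHERE row_num = 1"
    )


def keep_most_recent(rows: Sequence[Dict[str, Any]], key_columns: List[str],
                     order_keys: List[str]) -> List[Dict[str, Any]]:
    """Plain-Python equivalent for all-descending ordering: keep the max-ranked row per key."""
    best: Dict[tuple, tuple] = {}
    for row in rows:
        key = tuple(row[c] for c in key_columns)
        rank = tuple(row[c] for c in order_keys)
        if key not in best or rank > best[key][0]:
            best[key] = (rank, row)
    return [row for _, row in best.values()]


rows = [
    {"PlanId": "P1", "input_file_name": "plans_20240501.json", "EventTimestamp": "2024-05-01T06:00:00", "Value": 1},
    {"PlanId": "P1", "input_file_name": "plans_20240502.json", "EventTimestamp": "2024-05-02T06:00:00", "Value": 2},
    {"PlanId": "P2", "input_file_name": "plans_20240501.json", "EventTimestamp": "2024-05-01T07:00:00", "Value": 3},
]
print(most_recent_records_sql("flow_plans_view", ["PlanId"], ["PlanId", "EventTimestamp", "Value"],
                              ["input_file_name DESC", "EventTimestamp DESC"]))
print([r["Value"] for r in keep_most_recent(rows, ["PlanId"], ["input_file_name", "EventTimestamp"])])
```

When rows tie on every ordering column, `ROW_NUMBER()` picks one of them arbitrarily, so the ordering columns should identify a unique latest version.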

## Write - Create Target Table If Not Exists
This cell creates the destination Delta table if it does not already exist. It checks whether the table exists and runs the `CREATE` SQL statement only when it does not, so an existing table is left unchanged.

In [ ]:
# Manage table creation if it does not exist
table_management.manage_table_creation(
    spark=spark,
    destination_environment=destination_environment,
    source_datasetidentifier=source_datasetidentifier,
    helper=helper
)
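`CREATE TABLE IF NOT EXISTS` lets the existence check and the creation collapse into a single idempotent statement. The sketch below builds such a Delta DDL statement; the function name, naming the database after `destination_environment` and the table after `source_datasetidentifier`, and the example columns are all hypothetical. In a notebook the resulting string would be passed to `spark.sql(...)`.

```python
from typing import Dict, Optional


def create_table_if_not_exists_sql(database: str, table: str, columns: Dict[str, str],
                                   location: Optional[str] = None) -> str:
    """Build an idempotent Delta DDL statement (hypothetical helper)."""
    column_defs = ", ".join(f"`{name}` {data_type}" for name, data_type in columns.items())
    statement = f"CREATE TABLE IF NOT EXISTS `{database}`.`{table}` ({column_defs}) USING DELTA"
    if location:
        # Optional external location; omit it to create a managed table
        statement += f" LOCATION '{location}'"
    return statement


print(create_table_if_not_exists_sql("dev", "triton_flow_plans",
                                     {"PlanId": "STRING", "EventTimestamp": "TIMESTAMP"}))
```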

## Merging Data - Update if Exists, Insert Otherwise
This block merges the new data into the existing Delta table: records whose key columns match an existing row are updated, and all other records are inserted. The Delta table version is recorded before and after the merge so the changes can be monitored.

In [ ]:
# Manage data merge
merge_management.manage_data_merge(
    spark=spark,
    destination_environment=destination_environment,
    source_datasetidentifier=source_datasetidentifier,
    view_name=view_name,
    key_columns=key_columns,
    helper=helper
)
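The update-or-insert behavior corresponds to Delta Lake's `MERGE INTO` statement. The sketch below builds one from the key columns; the function name and the target and view names are hypothetical. The table version before and after the merge can be read with `DESCRIBE HISTORY <table> LIMIT 1`.

```python
from typing import List


def merge_sql(target_table: str, source_view: str, key_columns: List[str],
              null_safe: bool = False) -> str:
    """Build a Delta MERGE that updates matching keys and inserts the rest."""
    op = "<=>" if null_safe else "="
    condition = " AND ".join(f"target.`{c}` {op} source.`{c}`" for c in key_columns)
    return (
        f"MERGE INTO {target_table} AS target "
        f"USING {source_view} AS source "
        f"ON {condition} "
        "WHEN MATCHED THEN UPDATE SET * "
        "WHEN NOT MATCHED THEN INSERT *"
    )


print(merge_sql("dev.triton_flow_plans", "flow_plans_view", ["PlanId", "Point"]))
```

If key columns may contain nulls, `=` never matches them and such rows are inserted again on every run; the `null_safe` option switches to Spark SQL's null-safe `<=>` operator.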

## Return Period (from_datetime, to_datetime) Covered by Data Read
The final step generates feedback timestamps that record the period (`from_datetime` to `to_datetime`) covered by the ingested data, based on `feedback_column`. Downstream processes can use this period to audit and monitor the pipeline.

In [ ]:
# Generate feedback timestamps
feedback_management.generate_feedback_timestamps(
    spark=spark,
    view_name=view_name,
    feedback_column=feedback_column,
    dbutils=dbutils,
    helper=helper
)
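The returned period is essentially the minimum and maximum of the feedback column, as in `SELECT MIN(col), MAX(col) FROM <view>`. The sketch below computes that pair in plain Python and serializes it to JSON; `feedback_period`, the example rows, using `EventTimestamp` as the feedback column, and the payload shape (`from_datetime`/`to_datetime` keys as ISO strings, taken from the section heading) are assumptions. A string like this could, for example, be passed to `dbutils.notebook.exit`, whose value ADF exposes as the notebook activity's `runOutput`.

```python
import json
from datetime import datetime
from typing import Any, Dict, Sequence


def feedback_period(rows: Sequence[Dict[str, Any]], feedback_column: str) -> Dict[str, str]:
    """Return the earliest and latest non-null values of feedback_column as ISO strings."""
    values = [row[feedback_column] for row in rows if row.get(feedback_column) is not None]
    if not values:
        raise ValueError(f"No non-null values in column {feedback_column!r}")
    return {"from_datetime": min(values).isoformat(), "to_datetime": max(values).isoformat()}


ingested = [
    {"PlanId": "P1", "EventTimestamp": datetime(2024, 5, 2, 6, 0)},
    {"PlanId": "P2", "EventTimestamp": datetime(2024, 5, 1, 6, 0)},
    {"PlanId": "P3", "EventTimestamp": None},
]
period = feedback_period(ingested, "EventTimestamp")
print(json.dumps(period))
```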

## Notebook Completion
The notebook ends by calling `dbutils.notebook.exit`, which returns a completion message to the caller (for example, the ADF pipeline) to signal successful processing.

In [ ]:
# Exit the notebook with success message
dbutils.notebook.exit("Notebook completed successfully.")