# Events silver

Checks to be performed:
- Primary key is unique
- Foreign key refers to existing team
- Time information can be correctly converted to Timestamp type
- Logic checks (start date is not after end date, coordinates are not half-inserted)
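
As an illustration of the logic checks in the list above, here is a minimal plain-Python sketch of the per-row rule. It is not the code of `check_data_quality_events_table`, which works on Spark DataFrames; the function name `event_passes_logic_checks` and its parameters are hypothetical.

```python
from datetime import datetime
from typing import Optional


def event_passes_logic_checks(start: datetime, end: datetime,
                              latitude: Optional[float],
                              longitude: Optional[float]) -> bool:
    """Return True when a single event row passes both logic checks."""
    # The start must not be after the end.
    dates_ok = start <= end
    # Coordinates are "half-inserted" when exactly one of the two is missing.
    coordinates_ok = (latitude is None) == (longitude is None)
    return dates_ok and coordinates_ok


ok = event_passes_logic_checks(
    datetime(2024, 5, 1, 10), datetime(2024, 5, 1, 12), 45.07, 7.69)
reversed_dates = event_passes_logic_checks(
    datetime(2024, 5, 2), datetime(2024, 5, 1), None, None)
half_coordinates = event_passes_logic_checks(
    datetime(2024, 5, 1), datetime(2024, 5, 2), 45.07, None)
```

A row with both coordinates missing is treated as valid in this sketch; whether the real check allows that is an assumption.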

Enrichment: 
- Date information extracted from start/end timestamp.
- Place information derived from latitude and longitude.
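
The date enrichment can be pictured with a small plain-Python sketch. It assumes the integer date key has the form `YYYYMMDD`; the actual output format of `create_integer_datekeys` is not shown in this notebook, and `to_integer_datekey` is a hypothetical helper.

```python
from datetime import datetime


def to_integer_datekey(ts: datetime) -> int:
    """Encode the calendar date of a timestamp as the integer YYYYMMDD (assumed format)."""
    return ts.year * 10000 + ts.month * 100 + ts.day


event_start = datetime(2024, 3, 9, 18, 30)
start_datekey = to_integer_datekey(event_start)  # 20240309
```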

Data is stored in an SCD2 table. The assumption here (simplistic, but enough to give a sample of data writing) is that when a row is updated in the source database, a new copy of it is created, so it is possible to keep full track of how, and how many times, an event has been updated.
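
The SCD2 idea can be sketched in plain Python on lists of dictionaries. This is only an illustration, not the logic of the `modules.write` functions: the column names `is_active`, `valid_from` and `valid_to` and the function `apply_scd2` are hypothetical, and `incoming` is assumed to contain only new or changed rows.

```python
from datetime import datetime
from typing import Dict, List


def apply_scd2(target: List[Dict], incoming: List[Dict],
               key: str, loaded_at: datetime) -> List[Dict]:
    """Close the active version of each incoming key and append the new version."""
    incoming_keys = {row[key] for row in incoming}
    result = []
    for row in target:
        if row["is_active"] and row[key] in incoming_keys:
            # Keep the old version for history, but mark it as closed.
            row = {**row, "is_active": False, "valid_to": loaded_at}
        result.append(row)
    for row in incoming:
        # Every incoming row becomes the new active version for its key.
        result.append({**row, "is_active": True,
                       "valid_from": loaded_at, "valid_to": None})
    return result


target = [{"event_id": 1, "name": "Kick-off", "is_active": True,
           "valid_from": datetime(2024, 1, 1), "valid_to": None}]
incoming = [{"event_id": 1, "name": "Kick-off (rescheduled)"},
            {"event_id": 2, "name": "Final"}]
history = apply_scd2(target, incoming, "event_id", datetime(2024, 2, 1))
```

After the call, `history` holds three rows: the closed original version of event 1, its new active version, and the first version of event 2.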

N.B. In UC, constraints can also be declared directly on the tables (primary and foreign key constraints are informational only, while `NOT NULL` and `CHECK` constraints are enforced), but with an enforced constraint the write operation fails or succeeds as a whole. With the approach used here it is possible to identify the single rows that don't satisfy the conditions; the natural evolution of this could be a DLT implementation. As mentioned in the data quality module, the next improvement would be using DBX programmatically.
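
To show what identifying single rows means in practice, here is a hedged plain-Python sketch that splits rows into accepted and rejected instead of failing the whole write. The function `split_by_key_checks` is hypothetical, and flagging every copy of a duplicated key as bad is an assumption about the desired behaviour.

```python
from collections import Counter
from typing import Dict, List, Set, Tuple


def split_by_key_checks(rows: List[Dict], primary_key: str, foreign_key: str,
                        known_foreign_keys: Set) -> Tuple[List[Dict], List[Dict]]:
    """Split rows into (good, bad) using uniqueness and referential checks."""
    key_counts = Counter(row[primary_key] for row in rows)
    good, bad = [], []
    for row in rows:
        duplicated = key_counts[row[primary_key]] > 1
        orphaned = row[foreign_key] not in known_foreign_keys
        (bad if duplicated or orphaned else good).append(row)
    return good, bad


events = [
    {"event_id": 1, "team_id": 10},
    {"event_id": 2, "team_id": 99},  # team 99 does not exist
    {"event_id": 3, "team_id": 10},  # event_id 3 appears twice
    {"event_id": 3, "team_id": 11},
]
good_rows, bad_rows = split_by_key_checks(events, "event_id", "team_id", {10, 11})
```

Only event 1 ends up in `good_rows`; the other three rows are kept in `bad_rows` for inspection rather than aborting the load.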

In [0]:
%pip install geopy

In [0]:
from modules.geo_info import enrich_geography_by_coordinates
from modules.data_quality import (check_data_quality_id,
                                  check_data_quality_foreign_keys,
                                  check_data_quality_timestamps,
                                  check_data_quality_events_table)
from modules.enrichment import create_integer_datekeys
from modules.write import (add_scd2_columns,
                           identify_lines_SCD2,
                           deactivate_rows_SCD2,
                           append_df_to_table)


In [0]:
dbutils.widgets.text("env", "dev")

In [0]:
environment = dbutils.widgets.get('env')
catalog = "use_case_" + environment
source_schema = "bronze_layer"
target_schema = "silver_layer"

source_table = "events"
target_table = "events_refined"
teams_cross_ref_table = "teams_refined"

# Fully qualified three-level names: catalog.schema.table
source_table_reference = f"{catalog}.{source_schema}.{source_table}"
target_table_reference = f"{catalog}.{target_schema}.{target_table}"
teams_cross_ref_table_reference = f"{catalog}.{target_schema}.{teams_cross_ref_table}"

In [0]:
# source_catalog = "hive_metastore"
# source_schema = "default"
# source_table_name = "events"

# target_catalog = "hive_metastore"
# target_schema = "default"
# target_table_name = "events_silver"
# cross_check_table_name = "teams_silver"

# source_table_reference = source_catalog + "." + source_schema + "." +  source_table_name
# target_table_reference = target_catalog + "." + target_schema + "." + target_table_name
# teams_cross_ref_table_reference = target_catalog + "." + target_schema + "." + cross_check_table_name

In [0]:
events_df = spark.table(source_table_reference)
teams_df = spark.table(teams_cross_ref_table_reference)
target_table_df = spark.table(target_table_reference)

In [0]:
events_df, bad_formed_df = check_data_quality_id(events_df,"event_id")

In [0]:
check_list = [{"foreign_key_column":"team_id",
              "cross_check_table": teams_df,
              "cross_check_primary_key_column": "team_id"}]
events_df, bad_formed_df = check_data_quality_foreign_keys(events_df, check_list)

In [0]:
events_df, bad_formed_df = check_data_quality_timestamps(events_df,["event_start","event_end","created_at"],"event_id")

In [0]:
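# bad_formed_df is reassigned by every check above, so after this call it
# holds only the rows rejected by the events-table logic checks.
events_df, bad_formed_df = check_data_quality_events_table(events_df)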

One row in this table has wrong start/end dates. For now the poor-quality rows are only identified; several solutions could be implemented to manage them. One possibility is to collect all bad-quality rows from every table in one refresh cycle and save them in a table for further analysis.
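
The idea of collecting rejected rows across checks could look like the following plain-Python sketch. It is not part of the `modules.data_quality` API: `run_checks`, the `failed_check` column and the two example checks are hypothetical, and integers stand in for timestamps to keep the example short.

```python
from typing import Callable, Dict, List, Tuple

Row = Dict[str, object]
Check = Callable[[List[Row]], Tuple[List[Row], List[Row]]]


def run_checks(rows: List[Row], checks: Dict[str, Check]) -> Tuple[List[Row], List[Row]]:
    """Run checks in order, keeping every rejected row tagged with the check it failed."""
    rejected: List[Row] = []
    for name, check in checks.items():
        rows, bad = check(rows)
        rejected.extend({**row, "failed_check": name} for row in bad)
    return rows, rejected


def has_id(rows: List[Row]) -> Tuple[List[Row], List[Row]]:
    good = [r for r in rows if r["event_id"] is not None]
    bad = [r for r in rows if r["event_id"] is None]
    return good, bad


def dates_ordered(rows: List[Row]) -> Tuple[List[Row], List[Row]]:
    good = [r for r in rows if r["event_start"] <= r["event_end"]]
    bad = [r for r in rows if r["event_start"] > r["event_end"]]
    return good, bad


events = [
    {"event_id": 1, "event_start": 1, "event_end": 2},
    {"event_id": None, "event_start": 1, "event_end": 2},
    {"event_id": 3, "event_start": 5, "event_end": 4},
]
clean, rejected = run_checks(events, {"has_id": has_id, "dates_ordered": dates_ordered})
```

Here `rejected` keeps one row per failed check, so it could be appended to a shared table of bad rows at the end of each refresh cycle.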

In [0]:
bad_formed_df.display()

In [0]:
events_df = enrich_geography_by_coordinates(events_df, use_api=False)

In [0]:
events_df = create_integer_datekeys(events_df,["event_start","event_end","created_at"])

In [0]:
events_df = add_scd2_columns(events_df, "created_at")

In [0]:
new_and_updated_df, to_be_deactivated_df = identify_lines_SCD2(events_df,target_table_df,["event_id"])

In [0]:
deactivate_rows_SCD2(spark,new_and_updated_df, to_be_deactivated_df, target_table_reference, ["event_id"])

In [0]:
append_df_to_table(new_and_updated_df, target_table_reference)