### Example Exploratory Notebook

Use this notebook to explore the data generated by the pipeline in your preferred programming language.

**Note**: This notebook is not executed as part of the pipeline.

In [0]:
# Import the column helpers explicitly so this cell also runs on a fresh kernel.
from pyspark.sql.functions import col, unix_millis

# Batch read (spark.table, not readStream) of the 1000 most recent rows of
# profiles_bronze, with the ingest latency of each row added as a column.
df = (
    spark.table("profiles_bronze")
    .select("topic", "partition", "offset", "value_str", "timestampType", "timestamp", "ingestTime")
    # latency_ms = ingestTime - timestamp, both expressed as epoch milliseconds
    .withColumn(
        "latency_ms",
        unix_millis(col("ingestTime")) - unix_millis(col("timestamp")),
    )
    # newest rows first, then keep only the first 1000
    .orderBy(col("timestamp").desc())
    .limit(1000)
)

# Render the DataFrame in the notebook
display(df)

In [0]:
# Global average of latency_ms over the rows currently held in df
# (no grouping keys, so the result is a single row).
average_latency_ms_df = df.agg({"latency_ms": "avg"})

# Render the one-row result
display(average_latency_ms_df)

In [0]:
# Table version from which the change data feed is read.
CDF_STARTING_VERSION = 11

# Read the change data feed of profiles_bronze and keep only the delete records.
# A dedicated name is used so the `df` built earlier (which carries latency_ms)
# is not overwritten and the cells that read it stay re-runnable.
profiles_bronze_deletes_df = (
    spark.read.option("readChangeFeed", "true")
    .option("startingVersion", CDF_STARTING_VERSION)
    .table("profiles_bronze")
    .filter("_change_type = 'delete'")
)
display(profiles_bronze_deletes_df)

In [0]:
%sql
-- Extended metadata of the profiles table, returned as a JSON document
DESCRIBE TABLE EXTENDED profiles AS JSON;

In [0]:
%sql
-- Row count per __START_AT._commit_version in profiles_scd2 (SQL pipe syntax)
FROM profiles_scd2
|> AGGREGATE COUNT(*) AS rcrd_cnt
   GROUP BY __START_AT._commit_version

In [0]:
# List the files under the patient-1_Measurement prefix
display(dbutils.fs.ls("s3://adc-lv-dev-devops.libreview-data-migration/patient-1_Measurement/patient-1_Measurement/"))

In [0]:
# dbutils.fs.head is bounded by bytes (maxBytes), not by a line count, so a
# shell-style `-100` argument does not select 100 lines. Read the leading bytes
# of the file (default byte cap) and print at most the first 100 lines of them.
head_text = dbutils.fs.head("s3://adc-lv-dev-devops.libreview-data-migration/patient-1_Measurement/patient-1_Measurement/patient-1_Measurement_1.tsv")
print("\n".join(head_text.splitlines()[:100]))

In [0]:
# List the top level of the data-migration bucket
display(dbutils.fs.ls("s3://adc-lv-dev-devops.libreview-data-migration/"))

In [0]:
# bronze_patientpractice
# Sample JSON payloads from the `D` column:
# {"practiceId":"b10e40f5-5521-42c1-9b6f-4f18c74cd940","shard":2007,"patientId":"070470ae-be33-4ed8-a42a-01b4b89051ff","referrerId":"9621d57e-e27d-e711-818b-06f1e198757d","name":"3k Patients","address1":"Alafaya","address2":"","city":"Orlando","state":"FL","zipCode":"32826","country":"US","phoneNumber":"4075555555","created":1502379813,"createdBy":"migrate-lv","updated":1571712004,"updatedBy":"migrate-lv"}

# {"address1":"852 Wilderman Spring","address2":"2226 Emard Fields","anonymous":false,"businessId":"idr8xjgufr","city":"West Mozellemouth","country":"US","created":1586286353,"createdBy":"cda040f5-7902-11ea-858e-0242ac110002","name":"practice0w66gx5w","patientId":"cda040f5-7902-11ea-858e-0242ac110002","phoneNumber":"01651369554","practiceId":"cccd9346-7902-11ea-858e-0242ac110002","referrerId":"c9a6ff49-7902-11ea-b9ce-0242ac11000a","shard":325,"state":"South Carolina","updated":1586286353,"updatedBy":"cda040f5-7902-11ea-858e-0242ac110002","zipCode":"50224"}


# Sample JSON payload from the `P` column:
# {"b10e40f5-5521-42c1-9b6f-4f18c74cd940":"w","070470ae-be33-4ed8-a42a-01b4b89051ff":"w"}

In [0]:
# bronze_measurement
#{"d1":22,"d8":0,"d31":70,"d32":250,"d33":true,"d34":true,"d35":true}

# {"shard":1465,"id":"7230f654-de04-11ed-8f95-0242ac110006","p":"71514a73-de04-11ed-892b-0242ac110002","did":"722ede0a-de04-11ed-8f95-0242ac110006","uid":"7223f647-de04-11ed-a5fc-0242ac110006","ts":"2023-04-18T11:16:59.321-05:00","t":6,"fts":"2023-04-18T16:16:59.321Z","rn":775,"created":1681834619,"d1":22,"d2":"","d8":0,"d31":70,"d32":250,"f":{},"l":{},"h":{},"s":{}}

In [0]:
# pipelines.streamingFlowReadOptionsEnabled
# spark.databricks.streaming.realTimeMode.enabled

In [0]:
# Query [id = 32a5f0cc-af4a-479b-9c11-4dde4f41fa3a, runId = 2fa934e8-e59b-4692-a2e5-240b0932a63d] terminated with exception: [MALFORMED_RECORD_IN_PARSING.WITHOUT_SUGGESTION] Malformed records are detected in record parsing: {.
# Parse Mode: FAILFAST. To process malformed records as null result, try setting the option 'mode' as 'PERMISSIVE'.  SQLSTATE: 22023

In [0]:
# Cannot resolve "semi_structured_extract_json_multi(D_variant, $.connectionId)" due to data type mismatch: The first parameter requires the "STRING" type, however "D_variant" has the type "STRUCT<connectionId: STRING, patientId: STRING, status: STRING, source: STRING, created: BIGINT, createdBy: STRING, updated: BIGINT, updatedBy: STRING>".

In [0]:
# Query [id = aa1c1fce-caab-4375-8678-6431da06a347, runId = 1a35e0b4-cb8e-4f74-b418-59405416c49a] terminated with exception: [MALFORMED_RECORD_IN_PARSING.WITHOUT_SUGGESTION] Malformed records are detected in record parsing: .
# Parse Mode: FAILFAST. To process malformed records as null result, try setting the option 'mode' as 'PERMISSIVE'.  SQLSTATE: 22023


# {"address1":"010 Dicki Union","address2":"22025 Marlin Light","anonymous":false,"businessId":"idxdlfxc71","city":"North Nicolas","country":"US","created":1587214191,"createdBy":"17fdc071-8173-11ea-97b7-0242ac110008","name":"practicevjalt6k7","patientId":"17fdc071-8173-11ea-97b7-0242ac110008","phoneNumber":"01653453370","practiceId":"171ecdf0-8173-11ea-97b7-0242ac110008","referrerId":"13f70c75-8173-11ea-844f-0242ac11000b","shard":1836,"state":"Iowa","updated":1587214191,"updatedBy":"17fdc071-8173-11ea-97b7-0242ac110008","zipCode":"02665"}

In [0]:
%sql
-- Switch the current schema to temp01
USE SCHEMA temp01;

In [0]:
%sql
-- Read the D column from the silver SCD2 table.
-- The catalog name contains hyphens, so it must be a backtick-delimited
-- identifier; unquoted it is rejected by the parser.
SELECT D
FROM `dev-adc-superior-lv-tmp-us-east-2`.temp01.silver_events_patient_data_scd2

In [0]:
# List the files under the PatientPractice prefix
display(dbutils.fs.ls("s3://adc-lv-dev-devops.libreview-data-migration/PatientPractice"))

In [0]:
# List the files under the PatientPractice prefix (same listing as the previous cell)
display(dbutils.fs.ls("s3://adc-lv-dev-devops.libreview-data-migration/PatientPractice"))

In [0]:
# Auto Loader (cloudFiles) streaming read of the headerless, tab-separated
# files under the PatientPractice prefix.
df = (
    spark.readStream.format("cloudFiles")
    .options(
        **{
            "cloudFiles.format": "csv",
            "sep": "\t",
            "header": "false",
            # NOTE(review): "/Vtmp/a" may be a typo — confirm the intended schema location
            "cloudFiles.schemaLocation": "/Vtmp/a",
        }
    )
    .load("s3://adc-lv-dev-devops.libreview-data-migration/PatientPractice/")
)

In [0]:
# Render whatever DataFrame is currently bound to `df` in the notebook output
display(df)

In [0]:
%sql
-- Parse the D column of patient_cdf as JSON after cleaning it:
--   1. substring(D, 2, length(D)-2) drops the first and last character of D
--   2. REGEXP_REPLACE removes every character matched by the bracketed class
--      (presumably ASCII control characters 0x00-0x1F and 0x7F — confirm escaping)
--   3. REPLACE removes the second pattern (presumably literal "\u0000" text — confirm)
-- NOTE(review): from_json is called with a NULL schema and a schemaLocationKey
-- option; this looks like it relies on schema inference — confirm where it is supported.
SELECT
  from_json(
    REPLACE(
      REGEXP_REPLACE(substring(D, 2, length(D)-2), '[\\x00-\\x1F\\x7F]', ''),
      '\\u0000', ''
    ),
    NULL,
    map("schemaLocationKey", "silver_patient_schema123")
  ) AS parsed_struct
FROM STREAM(patient_cdf)
LIMIT 5;

In [0]:
%sql 

-- Worked example: flatten a JSON array of {name, v1, v2} objects into one row
-- per (name, value field) with a dotted key built from the JSON key names.
-- cte: parse the literal JSON document into a typed struct column.
with cte as (
select from_json('{
    "attributes": [
        {
            "name": "a",
            "v1": 1,
            "v2": 10
        },
        {
            "name": "b",
            "v1": 2,
            "v2": 20
        }
    ]
}','struct<attributes ARRAY<STRUCT<name: STRING, v1: BIGINT, v2: BIGINT>>>') as attributes_array
)
-- exploded_data: inline() expands the array of structs into name/v1/v2 columns,
-- then posexplode() over the remaining value columns yields (pos, col) per value.
, exploded_data as (

select * 
  , posexplode(array(* except (attributes_array, name)))
  from (
    select inline(attributes_array.attributes)
      , *
    from cte
  )
)
-- get_keys: recover the key names from the JSON text so the value at position
-- pos can be matched back to the field name it came from.
, get_keys as (
select attributes_array
  , json_object_keys(to_json(attributes_array)) as outer_key
  , json_object_keys(to_json(attributes_array.attributes[0])) as inner_keys
  , inner_keys[pos+1] as v1_or_v2 -- this could probably be cleaner, but just adding 1 because "name" is the first value
  , concat(outer_key[0], '.', array_join(inner_keys, '.')) as full_path
  , *
from exploded_data
)

-- Final projection: one dotted key (outer key . name . value field) per value,
-- plus a single-entry map from that key to the value.
select 
  name
  , pos
  , v1_or_v2
  , concat(outer_key[0], '.', name, '.', v1_or_v2) as new_key
  , col as new_value
  , map(new_key, new_value) as new_json
from get_keys

In [0]:
%sql
-- (Re)create the Delta table patient_cdf; every column is declared as STRING.
CREATE OR REPLACE TABLE ankurnayyar_cat1.default.patient_cdf (
  ID          STRING,
  Shard       STRING,
  Private     STRING,
  Name        STRING,
  Address1    STRING,
  Address2    STRING,
  City        STRING,
  State       STRING,
  ZipCode     STRING,
  Country     STRING,
  PhoneNumber STRING,
  BusinessID  STRING,
  Created     STRING,
  CreatedBy   STRING,
  Updated     STRING,
  UpdatedBy   STRING,
  D           STRING
)
USING DELTA;


In [0]:
%sql
-- Column names and types of the bronze patient events table
DESCRIBE TABLE ankurnayyar_cat1.default.bronze_events_patient_data;

In [0]:
%sql
-- Insert one hand-written sample row into the bronze patient events table.
-- Values are positional; the trailing comment on each line names the column
-- the value is intended for.
INSERT INTO ankurnayyar_cat1.default.bronze_events_patient_data
VALUES (
  '123',  -- ID
  '2115',  -- Shard
  '0',  -- Private
  '9T5FA1',  -- Name
  'Main Ave 123',  -- Address1
  '',  -- Address2
  'Phoenix',  -- City
  'Arizona',  -- State
  '50995',  -- ZipCode
  'US',  -- Country
  '5551232255',  -- PhoneNumber
  '7af92634b046e8bb',  -- BusinessID
  '2023-09-11 13:20:08',  -- Created
  'c4af8d09-50a5-11ee-8a8f-26f091458896',  -- CreatedBy
  '2023-09-11 13:20:08',  -- Updated
  'c4af8d09-50a5-11ee-8a8f-26f091458896',  -- UpdatedBy
  '000000000000B897',  -- V
  '{"address1":"Main Ave 123","address2":"","businessId":"7af92634b046e8bb","city":"Phoenix","country":"US","created":1694438408,"createdBy":"c4af8d09-50a5-11ee-8a8f-26f091458896","id":"ed9921e2-50a5-11ee-8205-02223a2aacf2","name":"9T5FA1","phoneNumber":"5551232255","private":false,"programs":null,"records":null,"shard":2115,"state":"Arizona","updated":1694438408,"updatedBy":"c4af8d09-50a5-11ee-8a8f-26f091458896","zipCode":"50995", "zipCode123":"50995"}',  -- D
  '{"c4af8d09-50a5-11ee-8a8f-26f091458896":"w","ed9921e2-50a5-11ee-8205-02223a2aacf2":"r","ed9921e2-50a5-11ee-8205-02223a2aacf2-admin":"w"}',  -- P
  current_timestamp()  -- ingestTime
);


In [0]:
%sql
-- Look up the sample row in the bronze table.
-- ID values are strings (the sample row is inserted with ID '123'), so compare
-- against a string literal; an integer literal would force a cast of the ID
-- column, which can fail or yield NULL for non-numeric IDs.
SELECT *
FROM ankurnayyar_cat1.default.bronze_events_patient_data
WHERE ID IN ('123');



In [0]:
%sql
-- Remove the sample row from the bronze table.
-- ID values are strings (the sample row is inserted with ID '123'), so match
-- on a string literal rather than forcing a cast of the ID column.
DELETE
FROM ankurnayyar_cat1.default.bronze_events_patient_data
WHERE ID IN ('123');

In [0]:
%sql
-- Look up the sample ID in the silver SCD2 table.
-- A string literal is used so the comparison does not force a cast of the ID
-- column (presumably a string here as well — confirm against the table schema).
SELECT *
FROM ankurnayyar_cat1.default.silver_events_patient_data_scd2
WHERE ID IN ('123');

In [0]:
%sql
-- Look up the sample ID in the data-quality keys table.
-- A string literal is used so the comparison does not force a cast of the ID
-- column (presumably a string here as well — confirm against the table schema).
SELECT *
FROM ankurnayyar_cat1.default.dq_events_patient_data_keys
WHERE ID IN ('123');



In [0]:
%sql
-- Drop the flattened silver table if it is present (no error when it is absent)
DROP TABLE IF EXISTS ankurnayyar_cat1.default.silver_events_patient_data_extra_flat;
