# Ingest dummy data

In this notebook, we show how to create test data in the Bronze layer of the data lake and how to update it with new rows.

In [None]:
from delta import DeltaTable
table_name = "Notes"

## Creation of the initial table

First, create a table and insert 10 rows into it. 

In this example, the table has 4 columns: NoteID, NoteText, UserID, and AppointmentDate.
- NoteID is the primary key of the table and the column used to detect changes in the table.
- NoteText stores patient notes (synthetic in this example). Notes can be large. In the pipeline, the text of each note is pseudonymised and additional features of interest are extracted.
- UserID identifies a patient and needs to be pseudonymised in the pipeline.
- AppointmentDate holds the date of the patient's appointment and also needs to be pseudonymised, so that the data cannot be linked back to the patient.
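
The notebook does not say how the pipeline pseudonymises UserID and AppointmentDate. Purely as an illustration, the sketch below shows one common approach in plain Python: a keyed hash (HMAC-SHA256) for the identifier and a per-patient date shift for the appointment date. The function names, the `secret` key, and the 30-day window are all assumptions made for this example and may differ from what the pipeline actually does.

```python
import hashlib
import hmac
from datetime import date, timedelta


def pseudonymise_id(user_id: int, secret: bytes) -> str:
    """Map a UserID to a stable pseudonym using a keyed hash (hypothetical helper)."""
    digest = hmac.new(secret, str(user_id).encode("utf-8"), hashlib.sha256)
    return digest.hexdigest()


def shift_date(iso_date: str, user_id: int, secret: bytes, max_days: int = 30) -> str:
    """Shift a date by a per-patient offset in [-max_days, max_days] (hypothetical helper)."""
    digest = hmac.new(secret, f"date:{user_id}".encode("utf-8"), hashlib.sha256).digest()
    # Derive a deterministic offset from the keyed hash so that all dates
    # belonging to one patient move by the same number of days.
    offset = int.from_bytes(digest[:4], "big") % (2 * max_days + 1) - max_days
    return (date.fromisoformat(iso_date) + timedelta(days=offset)).isoformat()


secret = b"example-secret"  # assumption: a real key would come from a secret store
print(pseudonymise_id(483215, secret))
print(shift_date("2023-11-05", 483215, secret))
```

Because both helpers are deterministic for a given key, the same patient always maps to the same pseudonym and the same date offset, which keeps rows joinable after pseudonymisation.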

In [None]:
df = spark.createDataFrame(
    [
        (
            1,
            (
                "Jonathan appeared agitated during today's session, reporting heightened"
                " irritability and difficulty focusing at work. He shared concerns about"
                " persistent insomnia and a sense of impending doom. Jonathan is currently"
                " prescribed lorazepam (1mg as needed) for anxiety management, and we"
                " discussed incorporating relaxation techniques into his daily routine. A"
                " follow-up session is scheduled for November 19, 2023."
            ),
            483215,
            "2023-11-05",
        ),
        (
            2,
            (
                "Olivia conveyed a persistent low mood and feelings of guilt related to a"
                " recent personal loss. She described disruptions in her sleep pattern and"
                " appetite. Olivia is not currently taking any medications. We explored grief"
                " coping strategies and established a plan for ongoing support. Next session:"
                " November 22, 2023."
            ),
            176824,
            "2023-11-08",
        ),
        (
            3,
            (
                "Michael shared concerns about intrusive thoughts and compulsive behaviors"
                " indicative of obsessive-compulsive disorder. He is currently prescribed"
                " fluvoxamine (100mg daily). We discussed cognitive-behavioral strategies to"
                " manage obsessive thoughts. A follow-up is scheduled for November 24, 2023."
            ),
            742309,
            "2023-11-10",
        ),
        (
            4,
            (
                "Jasmine expressed feelings of overwhelming sadness and loss of interest in"
                " activities she once enjoyed. She is prescribed escitalopram (10mg daily) for"
                " depression. We discussed the importance of self-care and scheduled a"
                " follow-up for November 29, 2023."
            ),
            589124,
            "2023-11-15",
        ),
        (
            5,
            (
                "Lucas described acute anxiety related to social situations, impacting his"
                " daily life. He is currently taking sertraline (50mg daily). We explored"
                " exposure therapy techniques and set goals for gradual desensitization. The"
                " next session is scheduled for December 2, 2023."
            ),
            317468,
            "2023-11-18",
        ),
        (
            6,
            (
                "Zoe reported heightened stress levels due to academic pressures and"
                " challenges with time management. She is not currently on medication. We"
                " discussed stress reduction techniques and established strategies for"
                " improved work-life balance. Follow-up session: December 6, 2023."
            ),
            864502,
            "2023-11-22",
        ),
        (
            7,
            (
                "Ryan expressed symptoms of attention deficit hyperactivity disorder (ADHD),"
                " including difficulty sustaining attention and impulsivity. He is prescribed"
                " methylphenidate (20mg daily). We discussed behavioral strategies to manage"
                " ADHD symptoms. Next session: December 9, 2023."
            ),
            125739,
            "2023-11-25",
        ),
        (
            8,
            (
                "Ava shared concerns about recurrent panic attacks, particularly in crowded"
                " spaces. She is prescribed clonazepam (0.5mg as needed). We discussed"
                " breathing exercises and exposure therapy. Follow-up scheduled for December"
                " 13, 2023."
            ),
            650821,
            "2023-11-29",
        ),
        (
            9,
            (
                "Elijah reported persistent feelings of emptiness and identity disturbance. He"
                " is prescribed aripiprazole (5mg daily). We discussed the importance of mood"
                " tracking and established goals for emotional regulation. Next session:"
                " December 16, 2023."
            ),
            294617,
            "2023-12-02",
        ),
        (
            10,
            (
                "Sophia discussed challenges with impulse control and emotional dysregulation."
                " She is currently prescribed lamotrigine (50mg daily). We explored"
                " dialectical behavior therapy (DBT) skills to enhance emotion regulation."
                " Follow-up scheduled for December 19, 2023."
            ),
            817403,
            "2023-12-05",
        ),
    ],
    ["NoteID", "NoteText", "UserID", "AppointmentDate"],
)

In [None]:
df.write.format("delta").mode("overwrite").save(f"abfss://bronze@{spark.conf.get('spark.secret.datalake-uri')}/{table_name}")

Read back the table that has just been created and check that it looks as expected.

In [None]:
def read_table(table_name: str, layer: str):
    datalake_uri = spark.conf.get('spark.secret.datalake-uri')
    path = f"abfss://{layer}@{datalake_uri}/{table_name}"
    return spark.read.format("delta").load(path)

In [None]:
df = read_table(table_name, "bronze")
display(df)
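
The `read_table` helper above builds the `abfss://` path inline. As a minimal sketch, path construction can be pulled into a pure function that also rejects unknown layer names and can be checked without a Spark session. The name `build_table_path`, the `VALID_LAYERS` tuple, and the example storage URI are hypothetical; only the path pattern and the three layer names come from this notebook.

```python
VALID_LAYERS = ("bronze", "silver", "gold")  # layer names used in this notebook


def build_table_path(layer: str, datalake_uri: str, table_name: str) -> str:
    """Return the abfss:// path of a Delta table in the given data lake layer."""
    if layer not in VALID_LAYERS:
        raise ValueError(f"Unknown layer {layer!r}; expected one of {VALID_LAYERS}")
    return f"abfss://{layer}@{datalake_uri}/{table_name}"


# Assumption: a made-up storage account URI, used only for illustration.
example_uri = "example.dfs.core.windows.net"
print(build_table_path("bronze", example_uri, "Notes"))
```

In the notebook, the result could be passed straight to `spark.read.format("delta").load(...)`, with `datalake_uri` taken from `spark.conf.get('spark.secret.datalake-uri')` as in the cells above.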

## Checking the result of the pipeline

After running the previous cells, go to the Azure Data Factory (ADF) instance in your resource group (it will have a name like `adf-${flowehr_id}-dev`) and trigger the PatientsPipeline (click `Add Trigger`, then `Trigger Now`).

After it runs successfully, run the cells below to check the results in the Silver and Gold layers of the data lake.

In [None]:
df = read_table(table_name, "silver")
display(df)

In [None]:
df = read_table(table_name, "gold")
display(df)

You can also check the table history, as shown below.

In [None]:
layer = "silver"  # Replace with "bronze" or "gold" to check other layers

path = f"abfss://{layer}@{spark.conf.get('spark.secret.datalake-uri')}/{table_name}"
display(DeltaTable.forPath(spark, path).history())

## Inserting an update into the table

Now let's insert a new row into the table. (Deleting an existing row is the other kind of change the pipeline expects; the cells below only demonstrate the insert.)

Note that this pipeline does not process updates to existing rows: if you update a row, the pipeline will fail.

This is because, for large volumes of text, it is expensive to determine which rows have changed when the updated rows keep the same primary keys as rows already in the table.

We therefore assume that each change to the table either inserts a new primary key or removes an existing one.
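
Under this assumption, change detection reduces to comparing sets of primary keys and never requires comparing note text. The plain-Python sketch below illustrates the idea. The function name `diff_keys` and the example snapshots are hypothetical, and the pipeline itself may implement this differently (for example, with Spark joins).

```python
from typing import Iterable, Set, Tuple


def diff_keys(previous: Iterable[int], current: Iterable[int]) -> Tuple[Set[int], Set[int]]:
    """Return (inserted, deleted) primary keys between two snapshots of a table."""
    prev_set, curr_set = set(previous), set(current)
    inserted = curr_set - prev_set  # keys present now but not before
    deleted = prev_set - curr_set   # keys present before but not now
    return inserted, deleted


# Illustrative snapshots: NoteID 1..10 initially, then NoteID 1 removed and 11 added.
previous_ids = range(1, 11)
current_ids = range(2, 12)
inserted, deleted = diff_keys(previous_ids, current_ids)
print(f"inserted={inserted}, deleted={deleted}")
```

A row whose text changed but whose NoteID stayed the same appears in neither set, which is why in-place updates fall outside this scheme.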

In [None]:
update_df = spark.createDataFrame(
    [
        (
            11,
            (
                "Mia described symptoms of insomnia and racing thoughts, suggesting"
                " generalized anxiety. She is currently not taking any medication. We"
                " explored sleep hygiene practices and relaxation techniques. A follow-up"
                " appointment is scheduled for December 29, 2023."
            ),
            548290,
            "2023-12-15",
        ),
    ],
    ["NoteID", "NoteText", "UserID", "AppointmentDate"],
)

display(update_df)

In [None]:
path = f"abfss://bronze@{spark.conf.get('spark.secret.datalake-uri')}/{table_name}"
delta_table = DeltaTable.forPath(spark, path)
delta_table.alias("target").merge(
    source=update_df.alias("source"),
    condition="source.NoteID = target.NoteID"
).whenNotMatchedInsertAll().execute()
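
The merge above specifies only `whenNotMatchedInsertAll()`, so rows whose NoteID already exists in the target are left untouched and only rows with new keys are inserted. The sketch below mimics that behaviour on a plain Python dictionary to make the semantics concrete. It is an illustration rather than Delta Lake code: `insert_only_merge` is a hypothetical helper, and it assumes NoteID values are unique within the source.

```python
from typing import Dict, Iterable, Tuple

Row = Tuple[int, str, int, str]  # (NoteID, NoteText, UserID, AppointmentDate)


def insert_only_merge(target: Dict[int, Row], source: Iterable[Row]) -> int:
    """Insert source rows whose NoteID is absent from target; return the number inserted."""
    inserted = 0
    for row in source:
        note_id = row[0]
        if note_id not in target:  # "not matched" on NoteID: insert the whole row
            target[note_id] = row
            inserted += 1
        # Matched rows are ignored, so existing notes are never overwritten.
    return inserted


# Illustrative data with shortened note text.
target = {1: (1, "original note", 483215, "2023-11-05")}
source = [
    (1, "edited note", 483215, "2023-11-05"),  # existing key: ignored
    (11, "new note", 548290, "2023-12-15"),    # new key: inserted
]
print(insert_only_merge(target, source))  # first run
print(insert_only_merge(target, source))  # second run with the same source
```

Running the same merge twice inserts nothing the second time, so re-executing the cell above should not duplicate NoteID 11.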

Now you can trigger the pipeline again.
After that, you can check the logs to see how many rows were processed in the Silver and Gold stages of the pipeline.

## Querying data in the Gold store

The final result of the pipeline can be accessed through a managed table in Unity Catalog.

You can query the data with a SQL statement in a notebook, like so:

In [None]:
%sql
SELECT * FROM catalog.schema.Notes ORDER BY NoteID ASC;