```markdown
To run this Jupyter notebook, use Python 3.13 or later. The recommended environment is a virtual environment or a conda environment with the necessary packages installed, as specified in the requirements.txt file. Ensure that you have Jupyter Notebook installed to execute the cells interactively.
```

### Install Required Packages

In [None]:
# Execute the companion notebook installrequirements.ipynb inside this kernel.
# NOTE(review): presumably installs the packages listed in requirements.txt — confirm.
%run installrequirements.ipynb

### Import Reusable Code

In [None]:
# Execute the companion notebook reuseGenerateData.ipynb inside this kernel.
# NOTE(review): presumably this is where the HealthData class used below is
# defined — confirm against that notebook.
%run reuseGenerateData.ipynb

In [None]:
import ipywidgets as widgets
from IPython.display import display

# Dropdown that selects the target environment; its value is read by the
# data-generation cell below and embedded in the generated file names.
env_dropdown = widgets.Dropdown(
    options=["dev", "uat", "prod"],
    value="dev",
    description="Environment:",
)

# Text box holding the output directory (relative to the notebook). It is
# labelled so the user can tell what the field is for; the default value is
# unchanged.
outputdirectory = widgets.Text(value="testdata", description="Output dir:")

# Show both controls side by side.
display(widgets.HBox([env_dropdown, outputdirectory]))

In [None]:
# Snapshot the current widget selections; later cells reuse both names to
# build file paths.
selected_env, outputdir = env_dropdown.value, outputdirectory.value

# HealthData is not defined in this notebook.
# NOTE(review): presumably provided by the %run of reuseGenerateData.ipynb — confirm.
healthData = HealthData(selected_env, outputdir)  # type: ignore
healthData.generate_data()

In [None]:
import pandas as pd

# Load the Parquet file for the selected environment into a pandas DataFrame.
# NOTE(review): presumably this file is produced by generate_data() above — confirm.
healthdatadf = pd.read_parquet(
    f"./{outputdir}/health_data_{selected_env}.parquet"
)

In [None]:
import pyspark
from pyspark.sql import SparkSession

# Import the delta package's public names explicitly instead of
# `from delta import *`, so it is clear where each name comes from.
# DeltaTable is kept importable for any cell that relied on the wildcard.
from delta import DeltaTable, configure_spark_with_delta_pip  # noqa: F401

# Session builder with the Delta Lake SQL extension and catalog registered.
builder = (
    SparkSession.builder.appName("DeltaTableCreation_new")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
)

# configure_spark_with_delta_pip wires the Delta Lake jars that match the
# pip-installed delta package into the builder, so no manual
# `--packages io.delta:...` coordinates are needed.
spark = configure_spark_with_delta_pip(builder).getOrCreate()

# Use Arrow for pandas <-> Spark conversions (createDataFrame / toPandas).
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
# Raise the field limit Spark applies when rendering plans/schemas as strings.
spark.conf.set("spark.sql.debug.maxToStringFields", 1000)

In [None]:
# pandas: keep only the rows whose BloodPressure exceeds 120.
high_bp_rows = healthdatadf["BloodPressure"] > 120
filtered_healthdatadf = healthdatadf.loc[high_bp_rows]
print(filtered_healthdatadf)

# Spark SQL: register the same data as a temp view, apply the same filter
# in SQL, and bring the result back as a pandas DataFrame.
spark_health_view = spark.createDataFrame(healthdatadf)
spark_health_view.createOrReplaceTempView("sparkhealthdatadf")
high_bp_query = "SELECT * FROM sparkhealthdatadf WHERE BloodPressure > 120"
filtered_healthdatadf_sql = spark.sql(high_bp_query).toPandas()
print(filtered_healthdatadf_sql)

In [None]:
# Convert the pandas DataFrame into a Spark DataFrame ...
sparkdf = spark.createDataFrame(healthdatadf)

# ... and persist it in Delta format, replacing whatever the target
# directory held from a previous run.
sparkdf.write.save(
    f"./{outputdir}/health_data_{selected_env}",
    format="delta",
    mode="overwrite",
)

In [None]:
# Load the Delta table written in the previous cell back into Spark.
delta_table = spark.read.load(
    f"./{outputdir}/health_data_{selected_env}",
    format="delta",
)

# DataFrame-API filter: rows with BloodPressure above 120.
filtered_data = delta_table.where(delta_table["BloodPressure"] > 120)
filtered_data.show()

In [None]:
# Expose the Delta-backed DataFrame to Spark SQL under the name `health_data`.
# (A plain string literal: the original f-string had no placeholders.)
delta_table.createOrReplaceTempView("health_data")

# Same BloodPressure > 120 filter as the DataFrame-API cell, expressed in SQL.
filtered_data_sql = spark.sql("SELECT * FROM health_data WHERE BloodPressure > 120")
filtered_data_sql.show()

In [None]:
# Query the Delta table directly by path, without registering a view.
# The path follows the widget selections (outputdir / selected_env) instead
# of a hard-coded ./testdata/health_data_dev, and the `delta` source is used
# because the directory was written in Delta format: reading it through the
# plain `parquet` source bypasses the Delta transaction log and can pick up
# data files that an overwrite has logically removed.
result = spark.sql(
    f"SELECT * FROM `delta`.`./{outputdir}/health_data_{selected_env}` "
    "WHERE BloodPressure > 120"
)
result.show()

In [None]:
import os

# Every CSV/Parquet file found anywhere below the output directory, as a
# path that still includes the output directory. Sorted so the dropdown
# order (and therefore its default selection) is the same on every run.
file_list = sorted(
    os.path.join(root, file)
    for root, _dirs, files in os.walk(outputdir)
    for file in files
    if file.endswith((".csv", ".parquet"))
)

# Dropdown options are the paths relative to the output directory, because
# later cells rebuild the full path as f"./{outputdir}/{file_dropdown.value}".
# relpath strips whatever directory the user entered (the original only
# stripped a literal "testdata/"); separators are normalised to "/" because
# later cells split the value on "/".
file_dropdown = widgets.Dropdown(
    options=[
        os.path.relpath(path, outputdir).replace(os.sep, "/") for path in file_list
    ],
    description="Files:",
)

# Display the widget
display(file_dropdown)

In [None]:
# The text after the last "." of the selected file doubles as the Spark
# data-source name (e.g. csv or parquet); a later cell reuses it.
file_extension = file_dropdown.value.rsplit(".", 1)[-1]

# Preview the selected file by querying it in place via `format`.`path`.
preview_query = (
    f"SELECT * FROM `{file_extension}`.`./{outputdir}/{file_dropdown.value}`"
)
spark.sql(preview_query).show()

In [None]:
# List columns and data types
# Prints the schema of the Spark DataFrame held in `sparkdf` as a tree.
sparkdf.printSchema()

In [None]:
# Describe the data: summary statistics for the columns of sparkdf.
# show() prints the table itself and returns None, so it must not be wrapped
# in display() — doing so renders a stray "None" under the table.
sparkdf.describe().show()

In [None]:
# Derive a database name and a table name from the selected relative path.
path_parts = file_dropdown.value.split("/")

# Database: the first directory component with hyphens removed. Falls back
# to "TestDB" when the file sits at the top level (no "/") or the first
# component is empty (path starts with "/").
if len(path_parts) > 1 and path_parts[0]:
    database_name = path_parts[0].replace("-", "")
else:
    database_name = "TestDB"

# Table: the file name (always the LAST component, so paths nested more than
# one level deep no longer yield an intermediate directory name), cut at the
# first "." and with underscores removed.
table_name = path_parts[-1].split(".")[0].replace("_", "")

In [None]:
# Translation tables for building Delta-safe names: a space becomes "_",
# and the other characters Delta rejects in column names (, ; { } ( ) newline
# tab =) are dropped. The original chain of replace() calls missed , ; { }.
# Table names additionally lose "-", as before.
_column_name_fix = str.maketrans(" ", "_", ",;{}()\n\t=")
_table_name_fix = str.maketrans(" ", "_", ",;{}()\n\t=-")

# Load the selected file with the data source matching its extension.
# The "header" option is passed for every format, as before.
sparkdf = (
    spark.read.format(file_extension)
    .option("header", "true")
    .load(f"./{outputdir}/{file_dropdown.value}")
)

# Rename all columns in one positional pass instead of chaining one
# withColumnRenamed() call per column.
sparkdf = sparkdf.toDF(*(name.translate(_column_name_fix) for name in sparkdf.columns))

# Create a database
spark.sql(f"CREATE DATABASE IF NOT EXISTS {database_name}")

# Use the created database
spark.sql(f"USE {database_name}")

# Sanitise the table name derived in the previous cell.
table_name = table_name.translate(_table_name_fix)

# Write the DataFrame as a Delta table in the created database
sparkdf.write.format("delta").mode("overwrite").saveAsTable(table_name)

# Verify that the table has been created and data has been loaded

In [None]:
# List the tables visible in the current database, to check that the table
# saved by the previous cell is registered.
spark.sql("SHOW TABLES").show()

In [None]:
import matplotlib.pyplot as plt

# Get the count of rows in the table. The query runs once and the collected
# value is reused for both the printed summary and the plot (the original
# executed the same COUNT(*) twice).
count = spark.sql(f"SELECT COUNT(*) as count FROM {table_name}").collect()[0]["count"]
print(f"Rows in {table_name}: {count}")

# Plot the count as a single bar, using the explicit figure/axes interface.
fig, ax = plt.subplots(figsize=(6, 4))
ax.bar([table_name], [count], color="blue")
ax.set_xlabel("Table Name")
ax.set_ylabel("Row Count")
ax.set_title("Row Count in Table")
plt.show()