In [None]:
from pyspark.sql.functions import col, explode


# Preview (first 1000 rows) of the scan response already persisted as the
# Lakehouse table test_lh.scan_response.
# Bound to df_scan_table rather than `df`: the next cell binds `df` to the raw
# JSON file, and reusing the name meant re-running this cell would silently
# swap the input of every downstream cell for this LIMIT-ed table.
df_scan_table = spark.sql("SELECT * FROM test_lh.scan_response LIMIT 1000")
display(df_scan_table)

In [None]:
# Load the raw scanner response from the Lakehouse Files area.
# multiLine lets a single JSON document span several lines instead of
# requiring one JSON record per line.
df = spark.read.json("Files/scan_response.json", multiLine=True)
display(df)

In [None]:
# One row per element of the top-level `workspaces` array, held in a single
# struct column named `workspace`.
df_exploded = df.select(explode("workspaces").alias("workspace"))
display(df_exploded)

In [None]:
# Explode the datasets within each workspace: one row per (workspace, dataset)
# pair, with selected workspace fields as top-level columns and the dataset
# kept as a struct column named `dataset`.
# Built from `df` directly instead of overwriting the previous df_exploded:
# the old `df_exploded = df_exploded.select("workspace.id", ...)` failed on a
# second run because `workspace` no longer existed after the first run.
# Note: explode() emits no row for a workspace whose `datasets` is null/empty.
df_exploded = (
    df
    .select(explode("workspaces").alias("workspace"))
    .select(
        "workspace.id",
        "workspace.name",
        "workspace.type",
        "workspace.state",
        "workspace.isOnDedicatedCapacity",
        explode("workspace.datasets").alias("dataset"),
    )
)

display(df_exploded)

In [None]:
# Re-display the workspace/dataset rows; df_exploded is unchanged since the
# previous cell, so this repeats that cell's output.
display(df_exploded)

In [None]:
# Flatten the `dataset` struct into top-level, dataset_-prefixed columns
# (still one row per workspace/dataset pair). The refreshSchedule sub-fields
# are projected out as columns here; nothing is exploded in this cell.
# The raw `dataset` struct is kept as the last column because later cells
# resolve `dataset.*` fields against the frames derived from df_datasets.
df_datasets = df_exploded.select(
    'id',
    'name',
    'type',
    'state',
    'isOnDedicatedCapacity',
    df_exploded['dataset.id'].alias('dataset_id'),
    df_exploded['dataset.name'].alias('dataset_name'),
    df_exploded['dataset.tables'].alias('dataset_tables'),
    # Previously selected twice under the misspelled alias
    # 'dataset_confiured_by', producing duplicate column names. Now matches the
    # dataset_configuredBy / dataset_configuredById names used further down.
    df_exploded['dataset.configuredBy'].alias('dataset_configuredBy'),
    df_exploded['dataset.configuredById'].alias('dataset_configuredById'),
    df_exploded['dataset.isEffectiveIdentityRequired'].alias('dataset_isEffectiveIdentityRequired'),
    df_exploded['dataset.isEffectiveIdentityRolesRequired'].alias('dataset_isEffectiveIdentityRolesRequired'),
    df_exploded['dataset.refreshSchedule'].alias('dataset_refreshSchedule'),
    df_exploded['dataset.refreshSchedule.days'].alias('dataset_refreshScheduleDays'),
    df_exploded['dataset.refreshSchedule.times'].alias('dataset_refreshScheduleTimes'),
    df_exploded['dataset.refreshSchedule.enabled'].alias('dataset_refreshScheduleEnabled'),
    df_exploded['dataset.refreshSchedule.localTimeZoneId'].alias('dataset_refreshScheduleLocalTimeZoneId'),
    df_exploded['dataset.refreshSchedule.notifyOption'].alias('dataset_refreshScheduleNotifyOption'),
    df_exploded['dataset.targetStorageMode'].alias('dataset_targetStorageMode'),
    df_exploded['dataset.createdDate'].alias('dataset_createdDate'),
    df_exploded['dataset.contentProviderType'].alias('dataset_contentProviderType'),
    'dataset',
)
display(df_datasets)

In [None]:
# Print the schema of the flattened dataset frame, to confirm column names and
# the nested types of dataset_tables / dataset_refreshSchedule.
df_datasets.printSchema()

In [None]:
from pyspark.sql.functions import to_json

# Serialise the dataset_refreshSchedule struct into a JSON string column.
# This lets the whole schedule be carried as plain text in the SQL load.
df_datasets_new = df_datasets.withColumn(
    'refreshScheduleString',
    to_json('dataset_refreshSchedule'),
)

display(df_datasets_new)

In [None]:
from pyspark.sql.functions import col, explode_outer

# Build the load-ready frame: workspace columns, selected dataset fields, and
# one row per refresh day. explode_outer keeps datasets whose
# refreshSchedule.days is null/empty, emitting a null day instead of dropping
# the row.
# Every column is resolved against df_datasets_new itself through its retained
# `dataset` struct column. Previously these were df_exploded[...] columns, which
# belong to a different DataFrame and only resolved here through shared lineage.
df_dropped = df_datasets_new.select(
    'id',
    'name',
    'type',
    'state',
    'isOnDedicatedCapacity',
    col('dataset.id').alias('dataset_id'),
    col('dataset.name').alias('dataset_name'),
    col('dataset.tables').alias('dataset_tables'),
    col('dataset.configuredBy').alias('dataset_configuredBy'),
    col('dataset.configuredById').alias('dataset_configuredById'),
    col('dataset.isEffectiveIdentityRequired').alias('dataset_isEffectiveIdentityRequired'),
    col('dataset.isEffectiveIdentityRolesRequired').alias('dataset_isEffectiveIdentityRolesRequired'),
    col('dataset.refreshSchedule').alias('dataset_refreshSchedule'),
    'refreshScheduleString',
    explode_outer('dataset.refreshSchedule.days').alias('dataset_individualRefreshDays'),
    col('dataset.refreshSchedule.enabled').alias('dataset_refreshScheduleEnabled'),
    col('dataset.refreshSchedule.localTimeZoneId').alias('dataset_refreshScheduleLocalTimeZoneId'),
    col('dataset.refreshSchedule.notifyOption').alias('dataset_refreshScheduleNotifyOption'),
    col('dataset.targetStorageMode').alias('dataset_targetStorageMode'),
    col('dataset.createdDate').alias('dataset_createdDate'),
    col('dataset.contentProviderType').alias('dataset_contentProviderType'),
)

display(df_dropped)

In [None]:
# Prepare the data for the Azure SQL load by dropping the nested columns:
# dataset_tables (array) and dataset_refreshSchedule (struct).
# DataFrame.drop ignores names that are not present, so the remaining entries
# (not produced by df_dropped) are harmless no-ops.
columns_to_drop = [
    'dataset_tables',
    'dataset_refreshSchedule',
    'dataset_refreshScheduleDays',
    'dataset_refreshScheduleTimes',
    'dataset',
]
df_write = df_dropped.drop(*columns_to_drop)



In [None]:
# Show the rows that will be written to Azure SQL.
display(df_write)

In [None]:
# Check the final schema before the JDBC write. Any remaining nested types
# would need handling before the load.
df_write.printSchema()

In [None]:
from notebookutils import mssparkutils

# Key Vault location of the SQL password used by the JDBC write.
# Replace with your Key Vault URL and secret name.
key_vault_url = ""
secret_name = ""

# Fail fast with a clear message when the placeholders above are still empty,
# instead of surfacing an opaque Key Vault lookup error.
if not key_vault_url or not secret_name:
    raise ValueError("Set key_vault_url and secret_name before retrieving the secret.")

# Retrieve the secret (never display/print secret_value in the notebook output).
secret_value = mssparkutils.credentials.getSecret(key_vault_url, secret_name)


In [None]:
# Load df_write into Azure SQL Database over JDBC.

# Connection settings: fill in the JDBC URL, target table and SQL login.
# The password is the Key Vault secret retrieved in the previous cell.
url = ""
table = ""
properties = dict(
    user="",
    password=secret_value,
    driver="com.microsoft.sqlserver.jdbc.SQLServerDriver",
)

# NOTE(review): per the Spark JDBC data source docs, "overwrite" drops and
# recreates the target table unless the `truncate` option is set. Confirm that
# losing any indexes or permissions on the table is intended.
df_write.write.mode("overwrite").jdbc(url, table, properties=properties)