The export done by the *Synapse Link for Dataverse* is stored in the *dataverse-ENVIRONMENT-GUID* container. This notebook copies this CSV export to the **BRONZE** layer container.

1. Read the model json file from the *dataverse-ENVIRONMENT-GUID* => *Microsoft.Athena.TrickleFeedService* directory and extract the column names. 
2. Read the csv export data under the table name directory, add the column names to the dataframe.
3. Save the dataframe to the Bronze layer. Use date as partition folder name.
4. Manually upload the external resident data to the residents_external directory in the raw container.

In [None]:
# Connection settings: replace each bracketed placeholder before running.
# NOTE(review): if real values are filled in here, the client secret ends up
# stored in the notebook; consider reading it from a secret store instead.
storage_account_name = "[storage_account_name]"
tenant_id = "[tenant_id]"
client_id = "[client_id]"
client_secret = "[client_secret]"

# Every OAuth option is keyed by "<option>.<account>.dfs.core.windows.net",
# so build that suffix once and apply the options in a fixed order.
account_host = f"{storage_account_name}.dfs.core.windows.net"
oauth_settings = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": client_id,
    "fs.azure.account.oauth2.client.secret": client_secret,
    "fs.azure.account.oauth2.client.endpoint": f"https://login.microsoftonline.com/{tenant_id}/oauth2/token",
}
for setting_prefix, setting_value in oauth_settings.items():
    spark.conf.set(f"{setting_prefix}.{account_host}", setting_value)

In [None]:
# Build a dataframe from the Dataverse table export. The CSV files are read
# without a header row, so the column names are taken from the table's model
# JSON and applied as an all-string schema.

import json
from pyspark.sql.types import StringType, StructField, StructType

dataverse_link_container_name = "[dataverse_link_container_name]"
dataverse_customer_table_name = '[dataverse_customer_table_name]'

# Root of the Dataverse link container; both the model file and the CSV
# export directory live underneath it.
dataverse_link_root = f"abfss://{dataverse_link_container_name}@{storage_account_name}.dfs.core.windows.net"

# wholetext=True loads the file as a single row. Without it spark.read.text
# yields one row per line, and first() would hand json.loads only the first
# line of a multi-line (pretty-printed) model file.
dataverse_customer_model_df = spark.read.text(
    f"{dataverse_link_root}/Microsoft.Athena.TrickleFeedService/{dataverse_customer_table_name}-model.json",
    wholetext=True,
)
dataverse_customer_model_json = dataverse_customer_model_df.first()[0]
dataverse_customer_model = json.loads(dataverse_customer_model_json)

# Column names come from the first entity listed in the model, in the order
# the attributes are declared.
# NOTE(review): assumes the per-table model file describes this table as its
# first entity - confirm.
attributes = dataverse_customer_model['entities'][0]['attributes']
dataverse_customer_table_header = [attribute['name'] for attribute in attributes]

# Every column is read as a nullable string; no type conversion happens here.
schema = StructType(
    [StructField(column_name, StringType(), True) for column_name in dataverse_customer_table_header]
)

# multiLine lets quoted field values span several physical lines.
# NOTE(review): Spark's default CSV escape character is a backslash; if the
# export escapes embedded quotes by doubling them, an explicit "escape" option
# may be needed - confirm against real data.
dataverse_customer_table_df = (
    spark.read
    .option("header", "false")
    .option("multiLine", "true")
    .schema(schema)
    .csv(f"{dataverse_link_root}/{dataverse_customer_table_name}/")
)


In [None]:
# Write the table dataframe to the bronze container as CSV with a header row,
# under a folder named after the run date.

from datetime import datetime

bronze_container_name = 'bronze'

# Folder name for this run: today's date (driver's local time) as YYYY-MM-DD.
partition_date = datetime.now().strftime("%Y-%m-%d")

bronze_target_path = (
    f"abfss://{bronze_container_name}@{storage_account_name}.dfs.core.windows.net"
    f"/{dataverse_customer_table_name}/{partition_date}"
)

# "overwrite" replaces whatever already exists at the target folder, so a
# second run on the same date replaces that date's output.
bronze_writer = (
    dataverse_customer_table_df.write
    .format("csv")
    .option("header", True)
    .mode("overwrite")
)
bronze_writer.save(bronze_target_path)