Metadata (measurement files) for the datastreams is stored in Cosmos DB. A set of REST APIs simplifies access to and discovery of the metadata. Azure Key Vault is used to store secrets and keys.

The following code demonstrates how to make REST calls to query the measurement files.

References -
1. Secret management - https://learn.microsoft.com/en-us/azure/databricks/security/secrets/
2. Databricks secrets using Azure Key Vault - https://hevodata.com/learn/databricks-secret/#51

Reference to install the SSL certificates:
https://learn.microsoft.com/en-us/azure/databricks/kb/python/import-custom-ca-cert

In [0]:
# Constants
API_ENDPOINT = "https://web-app-linuxrqpzw.azurewebsites.net"

In [0]:
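import requests
# Example - https://web-app-linuxrqpzw.azurewebsites.net/v1/datastreams?measurementId=7e5dd17c-d299-41b9-8bb0-cac28b4c2b72&type=EXTRACTED&status=CREATED&tags=tag1%2Ctag2
# Returns the data URI of the first matching datastream, or None if nothing matches.
def GetDatastreamsDataUriByMeasurement(measurementId, searchQuery):
    api_url = f"{API_ENDPOINT}/v1/datastreams?measurementId={measurementId}&{searchQuery}"
    # verify=False skips TLS certificate checks; installing the CA certificate (see reference above) removes the need for it.
    response = requests.get(api_url, verify = False)
    response.raise_for_status()
    result = response.json()
    if result['size'] < 1: return None
    item = result['items'][0]
    # Format the data URL as abfss://<container>@<account>.dfs.core.windows.net/<path>
    container, path = item['relativeUriPath'].split("/", 1)
    account_host = item['baseUriPath'].replace("https://", "").replace("blob", "dfs")
    return f"abfss://{container}@{account_host}/{path}"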
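
The `searchQuery` argument is a URL-encoded query string, such as `type=EXTRACTED&status=CREATED&tags=tag1%2Ctag2` in the example URL. A minimal sketch of building it with the standard library follows. `build_search_query` is a hypothetical helper, and sending several tags as one comma-separated value is inferred from that example URL rather than from an API specification.

```python
from urllib.parse import urlencode

def build_search_query(tags=None, **filters):
    """Build a URL-encoded query string for the datastreams endpoint.

    Hypothetical helper: the filter names (type, status, tags) are taken
    from the example URL, not from an API specification.
    """
    # Drop filters that were not provided.
    params = {key: value for key, value in filters.items() if value is not None}
    if tags:
        # Assumption: multiple tags are sent as one comma-separated value.
        params["tags"] = ",".join(tags)
    return urlencode(params)

search_query = build_search_query(type="EXTRACTED", status="CREATED", tags=["tag1", "tag2"])
print(search_query)
```

The result can be passed as `searchQuery` to the functions in this notebook.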

In [0]:
import requests
# verify=False below skips TLS certificate checks; silence the resulting urllib3 warnings.
requests.packages.urllib3.disable_warnings()
# Example - https://web-app-linuxrqpzw.azurewebsites.net/v1/datastreams?type=EXTRACTED&status=CREATED&tags=tag1
# Returns (datastream id, measurement id, datastream data URI) for the first match, or None if nothing matches.
def GetDatastreamsDataUri(searchQuery):
    api_url = f"{API_ENDPOINT}/v1/datastreams?{searchQuery}"
    response = requests.get(api_url, verify = False)
    response.raise_for_status()
    result = response.json()
    if result['size'] < 1: return None
    item = result['items'][0]
    # relativeUriPath is "<container>/<path>"; baseUriPath is the https blob endpoint of the storage account.
    container, path = item['relativeUriPath'].split("/", 1)
    account_host = item['baseUriPath'].replace("https://", "").replace("blob", "dfs")
    # Format the data URL as abfss://<container>@<account>.dfs.core.windows.net/<path>
    return item['id'], item['measurementId'], f"abfss://{container}@{account_host}/{path}"
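
The conversion from the API's `baseUriPath` and `relativeUriPath` to an `abfss://` URI can be isolated in a small pure function, which makes it easy to check without calling the API. This is a sketch under assumptions: `to_abfss_uri` is a hypothetical name, `baseUriPath` is assumed to be the `https://<account>.blob.core.windows.net` endpoint, and the first segment of `relativeUriPath` is assumed to be the container, as the code above implies.

```python
from urllib.parse import urlparse

def to_abfss_uri(base_uri_path, relative_uri_path):
    """Convert blob-style URI parts to abfss://<container>@<host>/<path>."""
    # Swap only the ".blob." endpoint label, so an account name that
    # happens to contain "blob" is left untouched.
    host = urlparse(base_uri_path).netloc.replace(".blob.", ".dfs.", 1)
    # The first path segment is treated as the container (assumption).
    container, _, path = relative_uri_path.strip("/").partition("/")
    if not host or not container:
        raise ValueError("base_uri_path and relative_uri_path must not be empty")
    return f"abfss://{container}@{host}/{path}"

example_uri = to_abfss_uri("https://myaccount.blob.core.windows.net", "raw/2023/run1")
print(example_uri)
```

The account and path values in the usage line are made up for illustration.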

In [0]:
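import requests
requests.packages.urllib3.disable_warnings()

# Creates a datastream for the measurement and returns its abfss data URI.
# data should be a JSON string (for example from json.dumps), since it is sent as-is with a JSON content type.
def CreateDatastream(measurementId, data):
    api_endpoint = f"{API_ENDPOINT}/v1/measurements/{measurementId}/datastreams"
    headers = {"Content-Type": "application/json"}
    response = requests.post(url = api_endpoint, headers = headers, data = data, verify = False)
    response.raise_for_status()
    created = response.json()
    # Format the data URL as abfss://<container>@<account>.dfs.core.windows.net/<path>
    container, path = created['relativeUriPath'].split("/", 1)
    account_host = created['baseUriPath'].replace("https://", "").replace("blob", "dfs")
    curatedDatastreamDataUri = f"abfss://{container}@{account_host}/{path}"
    return curatedDatastreamDataUri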

The datastreams are stored in a data lake (ADLS). Once the datastream URL has been retrieved from the measurement storage (Cosmos DB), it can be used to access the datastream. There are multiple ways to authenticate to ADLS from Databricks. A recommended way is OAuth with an Azure service principal.

The following code demonstrates how to authenticate and read a datastream from the data lake.

References -
1. Access Azure Data Lake Storage Gen2 or Blob Storage using OAuth 2.0 with an Azure service principal - https://learn.microsoft.com/en-us/azure/databricks/external-data/azure-storage#--access-azure-data-lake-storage-gen2-or-blob-storage-using-oauth-20-with-an-azure-service-principal
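
The service principal credentials would normally be read from a Key Vault-backed secret scope rather than written into the notebook. The following is a minimal sketch, not this project's actual setup: `load_service_principal`, the scope name, and the key names are all hypothetical. In a Databricks notebook the `get_secret` argument would be `dbutils.secrets.get`; it is passed in as a parameter so the helper can also run outside Databricks with a stand-in.

```python
def load_service_principal(get_secret, scope):
    """Fetch service principal settings from a secret scope.

    get_secret is any callable accepting scope= and key= keyword
    arguments, e.g. dbutils.secrets.get in a Databricks notebook.
    The key names below are placeholders, not names from this project.
    """
    return {
        "service_principal_id": get_secret(scope=scope, key="sp-client-id"),
        "service_principal_secret": get_secret(scope=scope, key="sp-client-secret"),
        "tenant_id": get_secret(scope=scope, key="tenant-id"),
    }

# Stand-in for dbutils.secrets.get so the sketch runs anywhere.
_fake_store = {
    ("my-scope", "sp-client-id"): "client-id-value",
    ("my-scope", "sp-client-secret"): "client-secret-value",
    ("my-scope", "tenant-id"): "tenant-id-value",
}

def fake_get_secret(scope, key):
    return _fake_store[(scope, key)]

creds = load_service_principal(fake_get_secret, "my-scope")
print(sorted(creds))
```

In a notebook, the returned dictionary could be unpacked into the configuration function defined in the next cell, for example `set_spark_conf(storage_account_name, **load_service_principal(dbutils.secrets.get, "my-scope"))`.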

In [0]:
def set_spark_conf(storage_account_name, service_principal_id, service_principal_secret, tenant_id):
    # Configure OAuth 2.0 client-credentials access to the storage account for the current Spark session.
    account = f"{storage_account_name}.dfs.core.windows.net"
    spark.conf.set(f"fs.azure.account.auth.type.{account}", "OAuth")
    spark.conf.set(f"fs.azure.account.oauth.provider.type.{account}", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
    spark.conf.set(f"fs.azure.account.oauth2.client.id.{account}", service_principal_id)
    spark.conf.set(f"fs.azure.account.oauth2.client.secret.{account}", service_principal_secret)
    spark.conf.set(f"fs.azure.account.oauth2.client.endpoint.{account}", f"https://login.microsoftonline.com/{tenant_id}/oauth2/token")
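
`set_spark_conf` needs the storage account name, which is already embedded in the `abfss://` URI returned by the REST helpers. A small sketch of extracting it with the standard library follows; `storage_account_from_abfss` is a hypothetical helper, and it assumes the public-cloud host suffix `.dfs.core.windows.net` used in the configuration keys above.

```python
from urllib.parse import urlparse

DFS_SUFFIX = ".dfs.core.windows.net"

def storage_account_from_abfss(uri):
    """Return the storage account name from an abfss:// URI."""
    # For abfss://<container>@<account>.dfs.core.windows.net/<path>,
    # urlparse exposes the part after "@" as hostname.
    host = urlparse(uri).hostname or ""
    if not host.endswith(DFS_SUFFIX):
        raise ValueError(f"not an ADLS Gen2 dfs endpoint: {uri!r}")
    return host[: -len(DFS_SUFFIX)]

account_name = storage_account_from_abfss(
    "abfss://raw@myaccount.dfs.core.windows.net/2023/run1"
)
print(account_name)
```

The URI in the usage line is a made-up example.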

In [0]:
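def ReadDatastream(datastreamDataUri, separator, file_format):
    # file_format is only the file extension used in the glob (e.g. "csv");
    # the files are always parsed as delimited text with the given separator.
    df = spark.read.option("sep", separator).csv(f"{datastreamDataUri}/*.{file_format}")
    return df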

The curated data can be presented as a table in Databricks. These are external tables, so the actual data still sits in the ADLS data lake, but they provide a SQL-friendly semantic layer for working with it.

In [0]:
def write_table(dataframe, table_name, mode, location):
    # Setting "path" makes this an external table: the data files stay at location.
    (dataframe.write
        .mode(mode)
        .format("parquet")
        .option("path", location)
        .saveAsTable(table_name))
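
When the curated Parquet files already exist in the data lake, an external table can also be registered over them without rewriting any data. The sketch below only builds the Spark SQL statement so it can be inspected first; `external_table_ddl` is a hypothetical helper, and it assumes the table name and location are trusted values, since no escaping is applied.

```python
def external_table_ddl(table_name, location, file_format="PARQUET"):
    """Build a Spark SQL statement registering an external table.

    Assumes table_name and location are trusted inputs; nothing is escaped.
    """
    return (
        f"CREATE TABLE IF NOT EXISTS {table_name} "
        f"USING {file_format} "
        f"LOCATION '{location}'"
    )

ddl = external_table_ddl(
    "curated_measurements",
    "abfss://curated@myaccount.dfs.core.windows.net/measurements",
)
print(ddl)
```

In a notebook the statement would be run with `spark.sql(ddl)`. The table name and location shown are made-up examples.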