In [0]:
# Declare every notebook parameter as a text widget with an empty default, so the
# values can be passed in by the calling pipeline or typed in when run by hand.
for widget_name, widget_label in (
  ("entityName", "Entity Name"),
  ("dataSourceName", "Data Source Name"),
  ("version", "Version"),
  ("inputPath", "Input path"),
  ("inputContainer", "Input container"),
  ("outputPath", "Output path"),
  ("outputContainer", "Output container"),
):
  dbutils.widgets.text(widget_name, "", widget_label)

#### Get input parameters passed in by Data Factory

In [0]:
# Read the parameters declared as widgets in the previous cell.
entity_name = dbutils.widgets.get("entityName")
datasource_name = dbutils.widgets.get("dataSourceName")
version = dbutils.widgets.get("version")
input_path = dbutils.widgets.get("inputPath")
input_container = dbutils.widgets.get("inputContainer")
output_path = dbutils.widgets.get("outputPath")
output_container = dbutils.widgets.get("outputContainer")

# The widgets default to an empty string, so a parameter the caller forgot to pass
# would otherwise flow silently into storage paths -- including the location this
# notebook overwrites at the end. Fail fast instead.
# `version` is not checked here. TODO confirm whether an empty version is valid.
_missing_parameters = [
  widget_name
  for widget_name, widget_value in (
    ("entityName", entity_name),
    ("dataSourceName", datasource_name),
    ("inputPath", input_path),
    ("inputContainer", input_container),
    ("outputPath", output_path),
    ("outputContainer", output_container),
  )
  if not widget_value.strip()
]
if _missing_parameters:
  raise ValueError(f"Missing required notebook parameter(s): {', '.join(_missing_parameters)}")

In [0]:
import os
import urllib3
# NOTE(review): this silences urllib3's InsecureRequestWarning for the whole process,
# presumably because an HTTPS client used here skips certificate verification --
# confirm that is intended. It also hides the warning for any other library that
# makes unverified requests.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# NOTE: importing `sum`, `min` and `max` this way shadows the Python builtins for the
# rest of the notebook; the column-statistics cell uses these Spark versions.
from pyspark.sql.functions import col, sum, count, min, max
from ygdra.service import YService
from ygdra.storage import YStorage

#### Get the service principal client secret from Key Vault

In [0]:
# Name of the secret scope that holds this notebook's secrets, taken from the
# environment. os.getenv returns None when the variable is unset; passing None on
# to dbutils.secrets.get would fail with an unhelpful error, so check it here.
keyvault_scope = os.getenv("KEYVAULT_NAME")
if not keyvault_scope:
  raise ValueError("Environment variable KEYVAULT_NAME is not set; cannot resolve the Key Vault secret scope.")

# Service principal client secret, later handed to YService to authenticate.
client_secret = dbutils.secrets.get(keyvault_scope, "clientsecret")

#### Configure Spark to access the storage account

In [0]:
# Authenticate with the ygdra API and fetch the engine description; it names the
# storage account this notebook reads from and writes to.
ygdra = YService(client_secret)
ygdra.init_access_token()
engine = ygdra.get_engine()

# The account key is stored as the secret "dsLake-<storage account name>".
storage_account_name = engine["storageName"]
storage_account_key = "dsLake-{}".format(storage_account_name)
storage_account_key_value = dbutils.secrets.get(keyvault_scope, storage_account_key)

# Register the key with Spark so paths on <account>.dfs.core.windows.net use it.
spark_account_key_setting = "fs.azure.account.key.{}.dfs.core.windows.net".format(storage_account_name)
spark.conf.set(spark_account_key_setting, storage_account_key_value)

#### Read the source data from storage (bronze)

In [0]:
# Resolve the bronze input location and load it as Parquet.
storage = YStorage(storage_account_name)
source = storage.get_path(input_container, input_path)
data = spark.read.format("parquet").load(source)

# Preview the first rows of the input.
# NOTE(review): notebook output is persisted with the run; if the input can hold
# sensitive data, consider removing this preview.
data.show()

#### Compute metrics

In [0]:
# Dataset-level metric: the number of input rows, tagged with this run's dimensions.
row_count = data.count()
metric = {
  "name": entity_name,
  "namespace": "{}.{}".format(datasource_name, entity_name),
  "count": str(row_count),
  "dimensions": [
    {"name": "Engine Name", "value": engine['engineName']},
    {"name": "Entity Name", "value": entity_name},
    {"name": "Data Source Name", "value": datasource_name},
    {"name": "Version", "value": version},
  ],
}

ygdra.add_metric(metric, entity_name, datasource_name, version)

In [0]:
# DataFrame.dtypes reports DataType.simpleString() names: LongType is "bigint",
# ShortType "smallint", ByteType "tinyint" and DecimalType "decimal(p,s)".
# A column is numeric when its base type name (text before any "(") is listed here.
numeric_types = ['tinyint', 'smallint', 'int', 'bigint', 'decimal', 'float', 'double']


def _quote_column(name):
  """Return `name` as a backtick-quoted Spark column reference.

  Quoting stops Spark from parsing dots in a column name as struct-field access;
  embedded backticks are escaped by doubling them.
  """
  return "`" + name.replace("`", "``") + "`"


# Pick the numeric columns straight from the schema rather than running a
# select() per column just to read its type.
numeric_columns = [
  column_name
  for column_name, dtype in data.dtypes
  if dtype.split('(', 1)[0] in numeric_types
]

if numeric_columns:
  # Compute sum/count/min/max for every numeric column in one Spark job instead of
  # one job per column. Aliases embed the column's position so they stay unique
  # whatever the column names are. `sum`/`count`/`min`/`max` are the Spark functions.
  aggregations = []
  for index, column_name in enumerate(numeric_columns):
    column = col(_quote_column(column_name))
    aggregations.extend([
      sum(column).cast('double').alias(f'__sum_{index}__'),
      # count() already returns a long; narrowing it to int would overflow past 2**31 - 1.
      count(column).alias(f'__count_{index}__'),
      min(column).cast('double').alias(f'__min_{index}__'),
      max(column).cast('double').alias(f'__max_{index}__'),
    ])

  stats = data.select(*aggregations).collect()[0]

  # Publish one metric per numeric column, tagged with this run's dimensions.
  # Null-only columns yield None for sum/min/max, which str() renders as "None".
  for index, attribute in enumerate(numeric_columns):
    metric = ({"name" : attribute, 
     "sum" : str(stats[f'__sum_{index}__']), 
     "count" : str(stats[f'__count_{index}__']), 
     "min" : str(stats[f'__min_{index}__']), 
     "max" : str(stats[f'__max_{index}__']), 
     "namespace" :  f"{datasource_name}.{entity_name}", 
     "dimensions" : [
       { "name" : "Engine Name", "value" : engine['engineName'] } ,
       { "name" : "Entity Name", "value" : entity_name } ,
       { "name" : "Data Source Name", "value" : datasource_name } ,
       { "name" : "Version", "value" : version } ,
       { "name" : "Column Name", "value" : attribute } ,
     ] 
    })

    ygdra.add_metric(metric, entity_name, datasource_name, version)

#### Write the output data to storage (silver)

In [0]:
# Write the data to the silver location in Delta format, in overwrite mode.
output = storage.get_path(output_container, output_path)
(
  data.write
  .mode("overwrite")
  .format("delta")
  .save(output)
)