# Imports and Variables

In [0]:
from pyspark.sql import DataFrame
from typing import List
from pyspark.sql.functions import lit
from pyspark.sql.types import StringType

# Functions

In [0]:
def checkAvailableColumns(df: DataFrame, expectedColumnsInput: List[str]) -> DataFrame:
    """Add every expected column that ``df`` lacks, as an all-null STRING column.

    Args:
        df: DataFrame to inspect.
        expectedColumnsInput: Column names the DataFrame is expected to contain.
            Membership is checked with an exact, case-sensitive string comparison
            against ``DataFrame.columns``.

    Returns:
        A new DataFrame with all original columns plus one ``lit(None)`` column cast
        to ``StringType`` for each expected name that was missing. Returns the
        integer ``0`` when nothing was missing; callers compare the result with
        ``0``, so this sentinel is kept for compatibility.
    """
    df_with_new_columns = df
    missing_columns = []

    for column in expectedColumnsInput:
        # Check against the frame built so far, so a name repeated in the input
        # list is only added once.
        if column not in df_with_new_columns.columns:
            # Chain onto the accumulated frame. Starting from `df` on every
            # iteration would keep only the last missing column.
            df_with_new_columns = df_with_new_columns.withColumn(column, lit(None).cast(StringType()))
            missing_columns.append(column)

    if missing_columns:
        return df_with_new_columns

    return 0

## Necessary imports

## Project Variables

In [0]:
# Tables registered in the integration control table, one entry per
# (table, layer) pair. The name repeats because azure_costs_2 is registered
# for both silver and gold. Every list below is matched to this one by position.
table_names = [
    # silver
    'azure_costs_2',
    # gold
    'azure_costs_2',
]

# Azure cost export columns shared by the silver and gold definitions.
COST_COLUMNS = [
    "DepartmentName", "AccountName", "AccountOwnerId", "SubscriptionGuid",
    "SubscriptionName", "ResourceGroup", "ResourceLocation", "AvailabilityZone",
    "UsageDateTime", "ProductName", "MeterCategory", "MeterSubcategory",
    "MeterId", "MeterName", "MeterRegion", "UnitOfMeasure",
    "UsageQuantity", "ResourceRate", "PreTaxCost", "CostCenter",
    "ConsumedService", "ResourceType", "InstanceId", "Tags",
    "OfferId", "AdditionalInfo", "ServiceInfo1", "ServiceInfo2",
    "Currency", "FilePath", "LastExtractionDate", "LastExtractionMonth",
]

# The gold alias list adds a UniqueID column that is not among the extracted columns.
GOLD_COLUMNS = COST_COLUMNS + ["UniqueID"]

# Data type per column name for each layer; any column not listed is STRING.
SILVER_TYPES = {"LastExtractionDate": "TIMESTAMP"}
GOLD_TYPES = {
    "UsageDateTime": "DATE",
    "UsageQuantity": "FLOAT",
    "ResourceRate": "FLOAT",
    "PreTaxCost": "FLOAT",
    "LastExtractionDate": "TIMESTAMP",
}

# columns to extract (comma-separated, one string per table_names entry)
columns_paths = [
    ",".join(COST_COLUMNS),
    ",".join(COST_COLUMNS),
]

# columns alias
columns_names = [
    ",".join(COST_COLUMNS),
    ",".join(GOLD_COLUMNS),
]

# column data types, positionally aligned with columns_names
columns_types = [
    ",".join(SILVER_TYPES.get(column, "STRING") for column in COST_COLUMNS),
    ",".join(GOLD_TYPES.get(column, "STRING") for column in GOLD_COLUMNS),
]

# columns merge source
columns_condition_source = [
    "",
    "",
]

# columns merge target
columns_condition_target = [
    "",
    "",
]

# Active flag written to the control table for each entry
table_active = [
    1,
    1,
]

# [bronze, silver, gold] flags for each entry of table_names
delta_layers = [
    [1, 1, 0],
    [0, 0, 1],
]

# The lists above are combined by index. Without these checks, a missing entry
# or a column/type count mismatch would only surface later, as a wrong control
# row or a broken CREATE TABLE statement.
for _setting_name, _setting_values in {
    "columns_paths": columns_paths,
    "columns_names": columns_names,
    "columns_types": columns_types,
    "columns_condition_source": columns_condition_source,
    "columns_condition_target": columns_condition_target,
    "table_active": table_active,
    "delta_layers": delta_layers,
}.items():
    if len(_setting_values) != len(table_names):
        raise ValueError(
            f"{_setting_name} has {len(_setting_values)} entries; expected {len(table_names)} (one per table_names entry)"
        )

for _position, (_aliases, _types) in enumerate(zip(columns_names, columns_types)):
    if len(_aliases.split(",")) != len(_types.split(",")):
        raise ValueError(
            f"table_names[{_position}] ({table_names[_position]}): "
            f"{len(_aliases.split(','))} column aliases but {len(_types.split(','))} data types"
        )

In [0]:
#edit only if necessary to change table structure
integration_table_structure = ['tablename','columnstoextract','columnsalias','columnsdatatype','columnsmergesource','columnsmergetarget','active','bronze','silver','gold','lastextractiondate','integrationdate']

print(integration_table_structure)

['tablename', 'columnstoextract', 'columnsalias', 'columnsdatatype', 'columnsmergesource', 'columnsmergetarget', 'active', 'bronze', 'silver', 'gold', 'lastextractiondate', 'integrationdate']


# Create Control Layer Database (for Control and Auxiliary Tables)

In [0]:
%sql
-- Database for the control and auxiliary tables, stored at dbfs:/Control.
CREATE DATABASE IF NOT EXISTS control LOCATION 'dbfs:/Control';

## Create Control Tables

### Create Integration Control Table

In [0]:
# Ensure control.integrationcontrolcosts exists with every column listed in
# integration_table_structure:
#   * absent  -> create it with the full schema;
#   * present -> add any missing columns (checkAvailableColumns adds them as
#                null STRING columns) and overwrite the Delta table with the
#                widened schema.
# NOTE(review): `_jsparkSession` is a private handle. Newer PySpark versions
# expose `spark.catalog.tableExists` publicly; confirm the runtime before switching.
control_table_exists = spark._jsparkSession.catalog().tableExists('control', 'integrationcontrolcosts')

if not control_table_exists:
    spark.sql("""CREATE TABLE IF NOT EXISTS control.integrationcontrolcosts  (     tablename          STRING,     columnstoextract   STRING,     columnsalias       STRING,     columnsdatatype    STRING,     columnsmergesource STRING,     columnsmergetarget STRING,     active             BOOLEAN,     bronze             BOOLEAN,     silver             BOOLEAN,     gold               BOOLEAN,     lastextractiondate TIMESTAMP,     integrationdate    TIMESTAMP)""")
else:
    df_integration_control = spark.sql('select * from control.integrationcontrolcosts')
    df_check = checkAvailableColumns(df_integration_control, integration_table_structure)

    # checkAvailableColumns returns the integer 0 when no column is missing and
    # a widened DataFrame otherwise.
    if isinstance(df_check, int):
        print('All columns already exists')
    else:
        # overwriteSchema lets the overwrite replace the table's schema with the widened one.
        (
            df_check.write.format('delta')
            .option("overwriteSchema", "true")
            .mode('overwrite')
            .saveAsTable('control.integrationcontrolcosts')
        )

All columns already exists


In [0]:
# Register every configured table in the Integration Control table. The first
# time a (table name, layer flags) combination is seen, INSERT a row; otherwise
# UPDATE its column configuration. The same table name is registered once per
# layer, so rows are matched on TableName together with the Bronze/Silver/Gold
# flags taken from delta_layers.


def _sql_string_literal(value: str) -> str:
    """Return ``value`` quoted as a Spark SQL string literal.

    Spark SQL uses backslash as the escape character inside '...' literals.
    Backslashes are escaped first, then single quotes, so a value that contains
    a quote (e.g. a merge condition) cannot terminate the literal early.
    """
    escaped = value.replace("\\", "\\\\").replace("'", "\\'")
    return f"'{escaped}'"


for idx, table_name in enumerate(table_names):
    bronze, silver, gold = delta_layers[idx]

    # Exact match with `=`. With LIKE, the `_` characters in names such as
    # azure_costs_2 are single-character wildcards and could match other tables.
    row_filter = (
        f"Bronze = {bronze} AND Silver = {silver} AND Gold = {gold} "
        f"AND TableName = {_sql_string_literal(table_name)}"
    )

    columns_to_extract = _sql_string_literal(columns_paths[idx])
    columns_alias = _sql_string_literal(columns_names[idx])
    columns_data_type = _sql_string_literal(columns_types[idx])
    merge_source = _sql_string_literal(columns_condition_source[idx])
    merge_target = _sql_string_literal(columns_condition_target[idx])

    # Number of control rows already registered for this table/layer combination.
    existing_rows = spark.sql(
        f"SELECT COUNT(TableName) AS Quantity FROM control.integrationcontrolcosts WHERE {row_filter}"
    ).collect()[0]["Quantity"]

    if existing_rows == 0:
        # New row: extraction and integration timestamps start at 2000-01-01 00:00:00.
        spark.sql(
            "INSERT INTO control.integrationcontrolcosts("
            "TableName, ColumnsToExtract, ColumnsAlias, ColumnsDataType, ColumnsMergeSource, "
            "ColumnsMergeTarget, Active, Bronze, Silver, Gold, LastExtractionDate, IntegrationDate) "
            f"VALUES({_sql_string_literal(table_name)}, {columns_to_extract}, {columns_alias}, "
            f"{columns_data_type}, {merge_source}, {merge_target}, {table_active[idx]}, "
            f"{bronze}, {silver}, {gold}, '2000-01-01 00:00:00', '2000-01-01 00:00:00')"
        )
    else:
        # Existing row: refresh the configuration. The layer flags are part of
        # the match key, so they are not re-set, and the timestamps are untouched.
        spark.sql(
            "UPDATE control.integrationcontrolcosts SET "
            f"ColumnsToExtract = {columns_to_extract}, ColumnsAlias = {columns_alias}, "
            f"ColumnsDataType = {columns_data_type}, ColumnsMergeSource = {merge_source}, "
            f"ColumnsMergeTarget = {merge_target}, Active = {table_active[idx]} "
            f"WHERE {row_filter}"
        )

# Create Bronze Layer Database

In [0]:
%sql
-- Database for bronze-layer tables, stored at dbfs:/Bronze.
CREATE DATABASE IF NOT EXISTS bronze LOCATION 'dbfs:/Bronze';

# Create Silver Layer Database

In [0]:
%sql
-- Database for silver-layer tables, stored at dbfs:/Silver.
CREATE DATABASE IF NOT EXISTS silver LOCATION 'dbfs:/Silver';

## Create Silver Table

In [0]:
# Active, silver-flagged control rows with the column aliases and data types
# needed for their DDL. Cached because the next cell collects it again.
silver_tables_query = (
    "SELECT TableName, ColumnsAlias, ColumnsDataType FROM control.integrationcontrolcosts "
    "WHERE Active = 1 AND Silver = 1 ORDER BY TableName ASC"
)
tables_to_extract = spark.sql(silver_tables_query).cache()

display(tables_to_extract)

TableName,ColumnsAlias,ColumnsDataType
azure_costs_2,"DepartmentName,AccountName,AccountOwnerId,SubscriptionGuid,SubscriptionName,ResourceGroup,ResourceLocation,AvailabilityZone,UsageDateTime,ProductName,MeterCategory,MeterSubcategory,MeterId,MeterName,MeterRegion,UnitOfMeasure,UsageQuantity,ResourceRate,PreTaxCost,CostCenter,ConsumedService,ResourceType,InstanceId,Tags,OfferId,AdditionalInfo,ServiceInfo1,ServiceInfo2,Currency,FilePath,LastExtractionDate,LastExtractionMonth","STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING,TIMESTAMP,STRING"


In [0]:
# Create one silver table per active, silver-flagged control row, using the
# column aliases and data types stored in the control table.
#
# The loop deliberately does not reuse the names `columns_names` / `columns_types`.
# Those are the configuration lists from the Project Variables cell, and
# rebinding them here meant that re-running the control-table upsert cell
# afterwards stored single column names instead of the full lists.
for table in tables_to_extract.collect():
    table_name = table['TableName']
    table_columns = table["ColumnsAlias"].split(",")
    table_types = table["ColumnsDataType"].split(",")

    # Fail with a clear message on a misconfigured row, instead of indexing past
    # the end of the type list or silently ignoring extra types.
    if len(table_columns) != len(table_types):
        raise ValueError(
            f"silver.{table_name}: {len(table_columns)} column aliases "
            f"but {len(table_types)} data types in the control table"
        )

    column_definitions = ", ".join(
        f"{column} {data_type}" for column, data_type in zip(table_columns, table_types)
    )

    # Creates table if it does not exist
    spark.sql(f"CREATE TABLE IF NOT EXISTS silver.{table_name}({column_definitions})")

# Create Gold Layer Database

In [0]:
%sql
-- Database for gold-layer tables, stored at dbfs:/Gold.
CREATE DATABASE IF NOT EXISTS gold LOCATION 'dbfs:/Gold';

## Create Gold Table

In [0]:
# Active, gold-flagged control rows with the column aliases and data types
# needed for their DDL. Cached because the next cell collects it again.
gold_tables_query = (
    "SELECT TableName, ColumnsAlias, ColumnsDataType FROM control.integrationcontrolcosts "
    "WHERE Active = 1 AND Gold = 1 ORDER BY TableName ASC"
)
tables_to_extract = spark.sql(gold_tables_query).cache()

display(tables_to_extract)

TableName,ColumnsAlias,ColumnsDataType
azure_costs_2,"DepartmentName,AccountName,AccountOwnerId,SubscriptionGuid,SubscriptionName,ResourceGroup,ResourceLocation,AvailabilityZone,UsageDateTime,ProductName,MeterCategory,MeterSubcategory,MeterId,MeterName,MeterRegion,UnitOfMeasure,UsageQuantity,ResourceRate,PreTaxCost,CostCenter,ConsumedService,ResourceType,InstanceId,Tags,OfferId,AdditionalInfo,ServiceInfo1,ServiceInfo2,Currency,FilePath,LastExtractionDate,LastExtractionMonth,UniqueID","STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING,DATE,STRING,STRING,STRING,STRING,STRING,STRING,STRING,FLOAT,FLOAT,FLOAT,STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING,STRING,TIMESTAMP,STRING,STRING"


In [0]:
# Create one gold table per active, gold-flagged control row, using the
# column aliases and data types stored in the control table.
#
# The loop deliberately does not reuse the names `columns_names` / `columns_types`.
# Those are the configuration lists from the Project Variables cell, and
# rebinding them here meant that re-running the control-table upsert cell
# afterwards stored single column names instead of the full lists.
for table in tables_to_extract.collect():
    table_name = table['TableName']
    table_columns = table["ColumnsAlias"].split(",")
    table_types = table["ColumnsDataType"].split(",")

    # Fail with a clear message on a misconfigured row, instead of indexing past
    # the end of the type list or silently ignoring extra types.
    if len(table_columns) != len(table_types):
        raise ValueError(
            f"gold.{table_name}: {len(table_columns)} column aliases "
            f"but {len(table_types)} data types in the control table"
        )

    column_definitions = ", ".join(
        f"{column} {data_type}" for column, data_type in zip(table_columns, table_types)
    )

    # Creates table if it does not exist
    spark.sql(f"CREATE TABLE IF NOT EXISTS gold.{table_name}({column_definitions})")


# Create Staging Layer Database

In [0]:
%sql
-- Database for staging tables, stored at dbfs:/Staging.
CREATE DATABASE IF NOT EXISTS staging LOCATION 'dbfs:/Staging';