In [0]:
# Source system and feedback-XML version used to filter TBL_DATA_RAW.
source = 'OSCE'
xml_version = '2.0'
# Backward-compatible alias for the original misspelled name (still read by later cells).
xml_veraion = xml_version

# Target project / database and the file_date window ('YYYY-MM-DD').
PROJECT_NAME = 'BIFC_OSCE'
DB = 'BIF_CONNECT'
START_DATE = '2022-01-01'
END_DATE = '2022-02-01'

In [0]:
import json
import os
import pyspark.sql.functions as F
from pyspark.sql.types import StructField, StructType, StringType, ArrayType, LongType, TimestampType, BinaryType, IntegerType, DateType
from datetime import datetime, timedelta
import re
from pyspark.sql import Window

In [0]:
# Load the raw feedback export (JSON) and persist it as the TBL_DATA_RAW Delta table.
with open("/dbfs/mnt/trendmicrobif/TBL_DATA_RAW.json", "r") as f:
    raw_json = json.load(f)
# The export's own 'schema' entry is not used; an explicit StructType is declared below.
raw_array = raw_json[0]['data']


def string_to_date(x):
    """Parse a 'YYYY-MM-DDTHH:MM:SS[.fraction]' string into a date.

    Everything after the first '.' is discarded. Returns None for None input so a
    null file_date stays null instead of failing the UDF.
    """
    if x is None:
        return None
    return datetime.strptime(x.split('.')[0], "%Y-%m-%dT%H:%M:%S").date()


# `udf` is not imported in this notebook; use the pyspark.sql.functions alias explicitly.
string_to_date_udf = F.udf(string_to_date, DateType())

# Every raw column is loaded as a string; file_date is converted afterwards.
raw_schema = StructType([
    StructField(column_name, StringType(), True)
    for column_name in ('file_id', 'source', 'xml_version', 'file_name',
                        'file_content', 'file_date', 'ip')
])
rdd = spark.sparkContext.parallelize(raw_array)

# Build the DataFrame first, then write it: DataFrameWriter.save() returns None, so
# assigning the whole write chain to df_raw left nothing to display.
df_raw = (spark.createDataFrame(rdd, raw_schema)
          .withColumn('file_date', string_to_date_udf(F.col('file_date'))))
(df_raw.write.format('delta')
       .mode('overwrite')
       .save('/mnt/trendmicrobif/TBL_DATA_RAW'))
display(df_raw)

In [0]:
# Re-read the persisted raw table, keeping only rows for the configured source and XML version.
raw_filter = (F.col('source') == source) & (F.col('xml_version') == xml_veraion)
df_raw = spark.read.format('delta').load('/mnt/trendmicrobif/TBL_DATA_RAW').where(raw_filter)
df_raw.createOrReplaceTempView('TBL_DATA_RAW')
display(df_raw)

file_id,source,xml_version,file_name,file_content,file_date,ip
1086684193,OSCE,2.0,2b99524634_202201200630591561_2.0_14.98.44.14,12.02022-01-20 06:30:561ff50061-507c-4037-b03f-e3341c41ebb614.0.96750512210.50.4000.0;SP2;Express Edition (64-bit)000000000en116.3.960016393967501001001101http11100000000000000000null 450110111111-111,2022-01-20,14.98.44.14
1086685978,OSCE,2.0,ffc87264ea_202201200609579365_2.0_103.54.206.1,12.02022-01-20 06:09:1464f1a88e-caac-4d9e-900b-0609aa11a01314.0.96750512113.0.5026.0;SP2;Express Edition (64-bit)000020000ja116.3.96001041967500001001101http11101001000000000000null 56030111111-111,2022-01-20,103.54.206.1
1086684090,OSCE,2.0,20b375a174_202201200701378254_2.0_194.199.34.178,12.02022-01-20 07:01:005ab499f6-2068-4eee-854c-ed0f1be1086b14.0.91670412113.0.5026.0;SP2;Express Edition (64-bit)000000000fr116.2.9200103691670100100101http11100000000000000860100110,2022-01-20,194.199.34.178
1086656686,OSCE,2.0,9e04306918_202201200300060709_2.0_148.244.251.27,12.02022-01-20 03:00:02d319c178-d8bd-4f04-842f-6d3e6159304c14.0.8417012215.0.2080.9;RTM;Standard Edition (64-bit)0000011600en1010.0.17763205884170100100131http11111000000000000950083.0.0.3034en-US000010111,2022-01-20,148.244.251.27
1086685949,OSCE,2.0,fc65c25a8a_202201200700228414_2.0_218.227.223.238,12.02022-01-20 07:00:19d7874301-4c29-4a36-9c67-8f8993ce7c1414.0.92040412213.0.5026.0;SP2;Express Edition (64-bit)000000000ja1110.0.14393104192040000100101http1000000000000000015010110,2022-01-20,218.227.223.238
1086685964,OSCE,2.0,fd837d849a_202201200700201129_2.0_200.57.3.10,"12.02022-01-20 07:00:15f864dc56-85be-4ff4-9adb-51c506b269c714.0.100480612112.0.4100.1;SP1;Standard Edition (64-bit)000001100en106.3.9600103310048010110011506048003601440000701http111010000000001000000{""CustomPolicyTrackingInfo"":[{""Policy"":""{\""AddAllPolicy\"":{\""Policies\"":[],\""SectionID\"":\""BM-Customized-Policy\""}}"",""PolicyID"":""02a059f8-69e4-4273-a252-af28d6447444"",""PolicyName"":""Pol�tica General"",""UsedCount"":""1380""},{""Policy"":""{\""AddAllPolicy\"":{\""Policies\"":[],\""SectionID\"":\""BM-Customized-Policy\""}}"",""PolicyID"":""87b6a9c9-78e0-4c5e-a26a-111bfc8f58a6"",""PolicyName"":""General AV-VP"",""UsedCount"":""1380""}]} 11-1-1-1-10-1001290230101proxy.sat.gob.mx31280SACIC2AM10008000800000http://0!CRYPTNG!6E16F52C7BD2ABAEEA6F3394E8CD814D1601380181043.0.0.3063en-US000010110111-1https://api-us1.xbc.trendmicro.com/api/public/download_xbc_agent/v1.0/c7037330-f1bd-44c9-afbf-4b992c01550dhttps://api-us1.xbc.trendmicro.com/api/public/download_xbc_agent/v1.0/8a4ba2ff-45e7-4cb6-ad32-65913294590a1-11",2022-01-20,200.57.3.10
1086684356,OSCE,2.0,3d79b52af1_202201200701237061_2.0_93.122.90.232,12.02022-01-20 07:01:221d0e2bab-2b58-447d-ade3-d9b925e8214814.0.96450512113.0.5103.6;SP2;Express Edition (64-bit)000000000de1010.0.177631031964501101001111https11100000000000000000null 25490420110111-111,2022-01-20,93.122.90.232
1086685969,OSCE,2.0,fe0b83947d_202201200700018704_2.0_78.119.230.161,12.02022-01-20 07:00:210f307429-d1fe-43f5-b1ca-595121b12cf014.0.96750512215.0.2000.5;RTM;Express Edition (64-bit)000000000fr1010.0.143931036967501001001101http10000000000000000000null 114060111111-111,2022-01-20,78.119.230.161
1086657144,OSCE,2.0,c126a44af6_202201200300484176_2.0_223.29.124.222,12.02022-01-20 03:00:42afe7df7a-a616-4713-9929-d2c76a09785014.0.96010512113.0.5102.14;SP2;Express Edition (64-bit)000000000ja1010.0.177631041960101001001101http10000000000000000000755040110111-111,2022-01-20,223.29.124.222
1086639784,OSCE,2.0,f9acaeceaf_202201200000144106_2.0_61.86.202.170,12.02022-01-20 00:00:387a59188e-f25e-4430-93c2-393b5f7b5af514.0.96750512212.0.5000.0;SP2;Express Edition000000000ja1110.0.143931041967500001001101http10000000000000000000null 33070110111-111,2022-01-20,61.86.202.170


In [0]:
display(dbutils.fs.ls('/mnt/trendmicrobif/'))

path,name,size,modificationTime
dbfs:/mnt/trendmicrobif/TBL_DATA_RAW/,TBL_DATA_RAW/,0,0
dbfs:/mnt/trendmicrobif/TBL_DATA_RAW.json,TBL_DATA_RAW.json,140907,1647486111000
dbfs:/mnt/trendmicrobif/TBL_PROJECT_GROUP.csv,TBL_PROJECT_GROUP.csv,30978,1649753353000
dbfs:/mnt/trendmicrobif/TBL_RAW_XPATH.csv,TBL_RAW_XPATH.csv,56099,1647594371000
dbfs:/mnt/trendmicrobif/TBL_XML_PARSER_CONFIG.csv,TBL_XML_PARSER_CONFIG.csv,166988,1649755855000
dbfs:/mnt/trendmicrobif/TBL_XML_RAW_CONFIG.csv,TBL_XML_RAW_CONFIG.csv,930230,1647573227000
dbfs:/mnt/trendmicrobif/xmlsource/,xmlsource/,0,0


In [0]:
def _read_header_csv(path):
    """Read a CSV with a header row from the given path; all columns come back as strings."""
    return spark.read.format('csv').option('header', True).load(path)


# Project groups (one row per output table), restricted to the configured project.
df_group = (_read_header_csv('/mnt/trendmicrobif/TBL_PROJECT_GROUP.csv')
            .filter(F.col('Project_Name') == PROJECT_NAME))

# Column-level parser config; `sort` is cast to int so ordering is numeric.
# NOTE(review): there is no Project_Name filter here (it was commented out); later cells
# narrow rows by project_group_id instead.
df_config = (_read_header_csv('/mnt/trendmicrobif/TBL_XML_PARSER_CONFIG.csv')
             .withColumn("sort", F.col("sort").cast(IntegerType())))

In [0]:
# ResultType 1: Name/Value Pair tables (groups that define Name_Value_Pair_Table_Name).

df1 = (
    df_group
    .where(F.col('Name_Value_Pair_Table_Name').isNotNull())
    .withColumn('ResultType', F.lit('1'))
    .withColumn('Table_Name', F.col('Name_Value_Pair_Table_Name'))
    # A missing flag defaults to the string '0' (CSV columns are strings).
    .withColumn('With_File_Date', F.coalesce(F.col('With_File_Date'), F.lit('0')))
    .select('Group_Name', 'Table_Schema', 'Table_Name', 'ResultType',
            'Name_Value_Pair_Table_Name', 'Data_Source_View', 'With_File_Date', 'project_group_id')
)

display(df1.limit(5))

Group_Name,Table_Schema,Table_Name,ResultType,Name_Value_Pair_Table_Name,Data_Source_View,With_File_Date,project_group_id
OSCE_2.0_Endpoint_Setting,OSCE,OSCE_Endpoint_Setting,1,OSCE_Endpoint_Setting,,True,159
OSCE_2.0_Endpoint_BM,OSCE,OSCE_Endpoint_BM,1,OSCE_Endpoint_BM,,True,158
OSCE_2.0_Server_Integrated_Server,OSCE,OSCE_Server_Integrated_Server,1,OSCE_Server_Integrated_Server,,True,164
OSCE_2.0_WR_External,OSCE,OSCE_WR_External,1,OSCE_WR_External,,True,160
OSCE_2.0_Server_Agent_Global_Setting,OSCE,OSCE_Server_Agent_Global_Setting,1,OSCE_Server_Agent_Global_Setting,,True,163


In [0]:
# select @category = coalesce(p.category, '')
# , @data_type = IIF(@data_type = 'nvarchar', @data_type, IIF(charindex('int', p.data_type, 1) = 0, 'nvarchar', 'bigint'))
# , @data_length = IIF(IIF(ISNUMERIC(p.data_length)=1, cast(p.data_length as numeric), 10) > @data_length
# 					, IIF(ISNUMERIC(p.data_length)=1, cast(p.data_length as numeric), 10), @data_length)
# from BIFC_ETL.TBL_XML_PARSER_CONFIG p
# join BIFC_ETL.TBL_PROJECT_GROUP g 
# 		on p.project_group_id = g.Project_Group_id 
# 		and g.group_name = @groupName
# 		and g.Project_Name = @Project_Name


# if (@data_type = 'nvarchar' and @data_length < 10)
# 	set @data_length = 10

# if (len(@category) = 0)
# begin

# select @sql = @sql + ', 
# [group] [varchar](100) NOT NULL,
# [row_id] [uniqueidentifier] NOT NULL,
# [column_name] [varchar](1000) NULL,
# [column_value] [nvarchar](max) NULL
# '
# end
# else
# begin

# select @sql = @sql + ', 
# [Category] [varchar](300) NOT NULL,
# [Name] [varchar](1000) NULL,
# ' + IIF(charindex('int', @data_type, 1) > 0, '[count] [bigint] NULL'
# ,'[Value] [nvarchar](' + cast(@data_length as varchar(20)) + ') NULL') + '
# '

# end

In [0]:
# ResultType 1: Name/Value Pair -- CREATE TABLE column list for one group.
# Port of the commented T-SQL in the cell above; still hard-wired to a single group.
# TODO: loop over df1 instead of hard-coding the group.

groupname = 'OSCE_2.0_Endpoint_Setting'
tablename = 'OSCE_Endpoint_Setting'
schema = 'OSCE'
project_group_id = 159
sql = '[File_ID] [bigint]'
fullTableName = f'{DB}.{tablename}'
print(fullTableName)

df_config_table = (df_config.alias('p')
                   .filter(f'project_group_id=={project_group_id}')
                   )

config_rows = df_config_table.select('category', 'data_type', 'data_length').collect()

# Fold all rows like the T-SQL variable-assignment SELECT (the original read only row [1]):
#  - category:    a non-empty category if any row has one, else ''
#  - data_type:   'nvarchar' once any row is not an int type, otherwise 'bigint'
#  - data_length: largest numeric data_length; empty / non-numeric counts as 10
# NOTE(review): the initial values of @data_type / @data_length are not shown in the
# T-SQL excerpt; '' and 0 are assumed here.
categories = [r.category for r in config_rows if r.category]
category = categories[-1] if categories else ''
data_type = ''
data_length = 0
for r in config_rows:
    if data_type != 'nvarchar':
        data_type = 'bigint' if 'int' in (r.data_type or '') else 'nvarchar'
    raw_length = (r.data_length or '').strip()
    data_length = max(data_length, int(raw_length) if raw_length.isdigit() else 10)

if data_type == 'nvarchar' and data_length < 10:
    data_length = 10

# T-SQL: `if (len(@category) = 0)` -> generic unpivoted layout, else Category/Name/value.
# (The original Python had this condition inverted.)
if not category:
    sql = sql + ', [group] [varchar](100) NOT NULL,[row_id] [uniqueidentifier] NOT NULL, [column_name] [varchar](1000) NULL, [column_value] [nvarchar](max) NULL'
else:
    col = '[count] [bigint] NULL' if 'int' in data_type else f'[Value] [nvarchar]({data_length}) NULL'
    sql = sql + ', [Category] [varchar](300) NOT NULL,[Name] [varchar](1000) NULL,' + col
print(sql)

display(df_config_table)

project_group_id,xpath_id,column_name,data_type,data_length,sort,boolean_string,category
159,260,64_Bit,int,,6,,CPU_Type
159,131,Convention,int,,3,,Client_Scan_Mode
159,115,Enabled,int,,2,,Firewall
159,259,32_Bit,int,,5,,CPU_Type
159,114,Disabled,int,,1,,Firewall
159,109,Enabled,int,,8,,Cisco_NAC
159,108,Disabled,int,,7,,Cisco_NAC
159,132,SmartScan,int,,4,,Client_Scan_Mode


In [0]:
# ResultType 2: Flat tables (groups that define Flat_Table_Name).

df2 = (
    df_group
    .where(F.col('Flat_Table_Name').isNotNull())
    .withColumn('ResultType', F.lit('2'))
    .withColumn('Table_Name', F.col('Flat_Table_Name'))
    # A missing flag defaults to the string '0' (CSV columns are strings).
    .withColumn('With_File_Date', F.coalesce(F.col('With_File_Date'), F.lit('0')))
    .select('Group_Name', 'Table_Schema', 'Table_Name', 'ResultType',
            'Name_Value_Pair_Table_Name', 'Data_Source_View', 'With_File_Date', 'project_group_id')
)
display(df2.limit(5))

Group_Name,Table_Schema,Table_Name,ResultType,Name_Value_Pair_Table_Name,Data_Source_View,With_File_Date,project_group_id
OSCE_2.0_Server_Plugin,OSCE,OSCE_Server_Plugin,2,,,True,140
OSCE_2.0_Endpoint_Plugin,OSCE,OSCE_Endpoint_Plugin,2,,,True,137
OSCE_2.0_AC,OSCE,OSCE_AC,2,,,True,133
OSCE_2.0_Endpoint_Config,OSCE,OSCE_Endpoint_Config,2,,[BIFC_ETL].[vw_OSCE_Endpoint_Config],True,136
OSCE_2.0_Feedback_Main,OSCE,OSCE_Feedback_Main,2,,,True,138


In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

# One row per config column_name: keep the highest data_type (ROW_NUMBER ... ORDER BY data_type DESC).
windowSpec = Window.partitionBy("p.column_name").orderBy(F.col("data_type").desc())

# Smallest `sort` per (project_group_id, column_name); loop-invariant, so built once.
df_config_sort = df_config.groupBy('project_group_id', 'column_name').agg(F.min('sort').alias('sort'))


def _is_true_flag(value):
    """Interpret a CSV flag such as 'True' / '1' / '0' / None as a bool.

    The column holds strings, so a plain truthiness test would treat '0' as True.
    """
    return value is not None and str(value).strip().lower() in ('1', 'true')


def _quote_name(name):
    """Bracket-quote an identifier like T-SQL QUOTENAME (a ']' inside is doubled)."""
    return '[' + str(name).replace(']', ']]') + ']'


df2_rdd = df2.collect()

# ResultType 2: Flat table -- create table schema, one CREATE statement per group.
create_list = []

for row in df2_rdd:
    Group_Name = row.Group_Name
    Data_Source_View = row.Data_Source_View
    project_group_id = row.project_group_id
    fullTableName = f'{DB}.{row.Table_Name}'
    create = ''
    sql = '[File_ID] [bigint]'
    if _is_true_flag(row.With_File_Date):
        sql = sql + ',[File_Date] [date]'

    print(fullTableName)

    if Data_Source_View is None:
        df_config_table = (df_config.alias('p')
                           .filter(f'project_group_id=={project_group_id}')
                           .join(df_config_sort.alias('s'),
                                 [F.col('p.project_group_id') == F.col('s.project_group_id'),
                                  F.col('p.column_name') == F.col('s.column_name')],
                                 'inner')
                           .select('p.column_name', 'data_type', 'data_length', 's.sort')
                           .withColumn("data_length", F.col("data_length").cast(IntegerType()))
                           .withColumn("id", F.row_number().over(windowSpec))
                           .filter('id == 1')
                           .orderBy(F.col("sort").asc())
                           )

        for t in df_config_table.collect():
            length = f'({t.data_length})' if t.data_length is not None else ''
            sql = sql + f',{_quote_name(t.column_name)} {_quote_name(t.data_type)}{length}'
        create = f'create table {fullTableName}({sql})WITH(DISTRIBUTION = ROUND_ROBIN,CLUSTERED COLUMNSTORE INDEX)'
    # TODO: groups with a Data_Source_View should take their columns from the view's
    # INFORMATION_SCHEMA.COLUMNS in Synapse (excluding file_id / file_date); until that is
    # ported, `create` stays '' for them.

    create_list.append({
        'fullTableName': fullTableName,
        'project_group_id': project_group_id,
        'group_name': Group_Name,
        'result_type': 2,
        'create': create,
        'data_source_view': Data_Source_View,
    })
    

In [0]:
# Collect the generated CREATE statements into a DataFrame.
# The explicit schema fixes column types. It also avoids schema-inference failure when a
# column (e.g. data_source_view) is None in every row.
create_schema = StructType([
    StructField('create', StringType(), True),
    StructField('data_source_view', StringType(), True),
    StructField('fullTableName', StringType(), True),
    StructField('group_name', StringType(), True),
    StructField('project_group_id', StringType(), True),
    StructField('result_type', LongType(), True),
])
create_df = spark.createDataFrame(create_list, schema=create_schema)
display(create_df)

create,data_source_view,fullTableName,project_group_id,result_type
"create table BIF_CONNECT.OSCE_Server_Plugin([File_ID] [bigint],[File_Date] [date],[Plugin_Name] [varchar](39),[Plugin_Status] [int],[Plugin_Version] [varchar](23))WITH(DISTRIBUTION = ROUND_ROBIN,CLUSTERED COLUMNSTORE INDEX)",,BIF_CONNECT.OSCE_Server_Plugin,140,2
"create table BIF_CONNECT.OSCE_Endpoint_Plugin([File_ID] [bigint],[File_Date] [date],[Plugin_Count] [int],[Plugin_Name] [varchar](21),[Plugin_Status] [int])WITH(DISTRIBUTION = ROUND_ROBIN,CLUSTERED COLUMNSTORE INDEX)",,BIF_CONNECT.OSCE_Endpoint_Plugin,137,2
"create table BIF_CONNECT.OSCE_AC([File_ID] [bigint],[File_Date] [date],[AC_AP_Name] [varchar](10),[AC_Expired_Days] [int],[AC] [varchar](47),[AC_Language] [varchar](10),[AC_OS] [varchar](10),[AC_Product_Code] [varchar](10),[AC_Service] [varchar](17))WITH(DISTRIBUTION = ROUND_ROBIN,CLUSTERED COLUMNSTORE INDEX)",,BIF_CONNECT.OSCE_AC,133,2
,[BIFC_ETL].[vw_OSCE_Endpoint_Config],BIF_CONNECT.OSCE_Endpoint_Config,136,2
"create table BIF_CONNECT.OSCE_Feedback_Main([File_ID] [bigint],[File_Date] [date],[Last_Updated_Time] [datetime],[Feedback_Version] [varchar](15),[Seat] [int],[S_Agent_Grouping] [varchar](1),[TMCM_Enabled] [varchar](1),[Product_Database_Mode] [int],[GUID] [varchar](54),[Product_HTTP_Server_Type] [int],[Product_OS_Language] [int],[Product_OS_Type] [int],[Product_OS_Code] [varchar](15),[Product_PccNT_Build] [varchar](15),[Product_Language] [varchar](10),[Product_Version_Build] [varchar](14),[Product_Upgrade] [varchar](1),[Protocol_Version] [int],[Product_SQL_Auth_Mode] [int],[E_Meerkat_Log_Only_Disabled_Count] [int],[E_Meerkat_Log_Only_Enabled_Count] [int],[S_BT_SO_Use_CM] [varchar](1),[S_Role_Service] [int],[DAC_Policy_Count] [int],[DLP_Policy_Count] [int],[S_Agent_OUS_Count] [int],[S_Allow_Agent_Update_From_OUS] [int],[S_Custom_Scan_Server_List_Count] [int],[S_Endpoint_Location] [int],[S_Hybrid_Mode] [int],[S_Internal_Proxy_Type] [int],[S_Use_Custom_Scan_Server] [int],[iAC_Agent_Enabled_Count] [int],[iAC_Base_Action] [int],[iAC_Policy] [int],[iAC_Product_Language] [varchar](10),[iAC_Product_Version] [varchar](20),[iAC_Assign_Rule_Enabled] [varchar](1),[iAC_Assign_Rule_With_AD_Enabled] [varchar](1),[iAC_Config_Rule_Enabled] [varchar](1),[iAC_OS_Arch] [int],[iAC_OS_Branch] [varchar](20),[iAC_OS_Build] [int],[iAC_OS_Platform] [varchar](13),[iAC_OS_Service_Pack] [varchar](10),[iAC_OS_Version] [varchar](58),[iAC_OS_Version_Number] [varchar](20),[iATAS_Agent_Enabled_Count] [int],[iES_Is_MDR_Enabled] [varchar](1),[iES_Maximum_Metadata_Storage] [int],[iES_Product_Build] [int],[iES_Product_Version] [varchar](20),[iVP_Agent_Enabled_Count] [int],[TMCM_Proxy_Sock] [int],[S_NAT_Enabled] [varchar](1),[Product_Patch] [varchar](10),[Product_Service_Pack_Patch] [varchar](10),[Product_Service_Pack] [int],[S_Sample_Submission_Count_In_BIF_Period] [int],[Product_SQL_Server_Verion] [varchar](116),[S_Update_Source_Type] [int],[S_Use_CM_Proxy] [varchar](1),[iAC_Config_Policy_Enabled] 
[varchar](1),[iVP_Endpoint_IPS_Enabled_Client_Count] [int],[iVP_Manual_Enable_IPS_Rule] [varchar](1),[iES_Maximum_Redis_Storage] [int],[iES_SaaS_Storage_License] [int],[XDR_Onboard_Accept_XBC] [varchar](1),[XDR_Onboard_Get_Onboard_URL] [varchar](1),[XDR_Onboard_Is_Onboarded] [varchar](1),[XDR_Onboard_Prepare_XBC_Patch] [varchar](1),[BIF_Status] [int],[CLP_ID] [varchar](63),[BIF_Feedback_Days] [int],[XDR_Onboard_Enable_Onboard_Promotion] [varchar](1),[XDR_Onboard_Enable_Pre_Onboard] [varchar](1),[XDR_Onboard_Enable_Scheduled_Preonboard] [varchar](1),[XDR_Onboard_XBC_Download_Mac_Universal] [varchar](500),[XDR_Onboard_XBC_Download_URL_Win32] [varchar](500),[XDR_Onboard_XBC_Uninstall] [int])WITH(DISTRIBUTION = ROUND_ROBIN,CLUSTERED COLUMNSTORE INDEX)",,BIF_CONNECT.OSCE_Feedback_Main,138,2
"create table BIF_CONNECT.OSCE_Server_Hotfix([File_ID] [bigint],[File_Date] [date],[Hotfix_Build] [varchar](15),[Hotfix_Installed_Date] [varchar](10),[Hotfix_Released_Date] [varchar](10),[Hotfix_Version] [varchar](15))WITH(DISTRIBUTION = ROUND_ROBIN,CLUSTERED COLUMNSTORE INDEX)",,BIF_CONNECT.OSCE_Server_Hotfix,139,2
"create table BIF_CONNECT.OSCE_Endpoint_Version([File_ID] [bigint],[File_Date] [date],[Client_Version_Count] [int],[Client_Version_Build] [varchar](17))WITH(DISTRIBUTION = ROUND_ROBIN,CLUSTERED COLUMNSTORE INDEX)",,BIF_CONNECT.OSCE_Endpoint_Version,135,2
"create table BIF_CONNECT.OSCE_Endpoint_OS([File_ID] [bigint],[File_Date] [date],[Client_OS_Count] [int],[Client_OS_Embedded_Platform] [int],[Client_OS_Type] [int],[Client_OS_Code] [varchar](15))WITH(DISTRIBUTION = ROUND_ROBIN,CLUSTERED COLUMNSTORE INDEX)",,BIF_CONNECT.OSCE_Endpoint_OS,134,2
"create table BIF_CONNECT.OSCE_AC([File_ID] [bigint],[File_Date] [date],[AC] [varchar](2000))WITH(DISTRIBUTION = ROUND_ROBIN,CLUSTERED COLUMNSTORE INDEX)",,BIF_CONNECT.OSCE_AC,238,2
"create table BIF_CONNECT.OSCE_Client_OS_10([File_ID] [bigint],[File_Date] [date],[Client_OS_1.0] [varchar](305))WITH(DISTRIBUTION = ROUND_ROBIN,CLUSTERED COLUMNSTORE INDEX)",,BIF_CONNECT.OSCE_Client_OS_10,239,2


In [0]:
# Check which target tables already exist in Synapse (OBJECT_ID yields NULL when absent).
# NOTE: reads `_url`, which is set in the Synapse connection cell further down -- run
# that cell first on a fresh kernel.
# TODO: the CREATE statements are not executed yet; `table_exists` records what is missing.

table_exists = {}

for row in create_df.collect():
    fullTableName = row.fullTableName
    if fullTableName in table_exists:
        continue  # several groups can target the same table (e.g. OSCE_AC)

    # Double single quotes so the name cannot break out of the N'...' literal.
    escaped_name = fullTableName.replace("'", "''")
    query = f"""select OBJECT_ID(N'{escaped_name}') as ObjectId """

    df_table_exist = (spark.read
                      .format('com.databricks.spark.sqldw')
                      .option('url', _url)
                      .option('tempDir', 'abfss://bifconnect@gdwpocdls.dfs.core.windows.net/tempDirs')
                      .option('useAzureMSI', 'true')
                      .option('query', query)
                      .load()
                      )

    is_table_exist = df_table_exist.collect()[0][0] is not None
    table_exists[fullTableName] = is_table_exist
    print(f'{fullTableName} {is_table_exist}')
    

In [0]:
# Insert data: for each target table, build the SQL that selects rows not loaded yet.
# NOTE: the statements are only built and printed here; executing them is still TODO.

# END_DATE is inclusive, matching the T-SQL `m.[file_date] < DateAdd(day, 1, @EndDate)`.
end_exclusive = (datetime.strptime(END_DATE, '%Y-%m-%d') + timedelta(days=1)).strftime('%Y-%m-%d')

# With_File_Date per group, turned from CSV strings ('True' / '0') into the 0/1 the procedure takes.
with_file_date_by_group = {
    g.Group_Name: 1 if str(g.With_File_Date).strip().lower() in ('1', 'true') else 0
    for g in df_group.select('Group_Name', 'With_File_Date').collect()
}

insert_sql_list = []

for row in create_df.collect():
    fullTableName = row.fullTableName
    data_source_view = row.data_source_view
    result_type = row.result_type
    sql = ''

    if data_source_view is None:
        # Parsed-XML path: call the stored procedure with
        # (@Project_Name, @groupName, @resultType, null, @sdate, @edate, @with_file_date).
        group_name = row.group_name or ''
        group_literal = group_name.replace("'", "''")
        with_file_date = with_file_date_by_group.get(group_name, 0)
        sql = (f"exec BIFC_ETL.PRC_SEL_XML_PARSER_RESULT '{PROJECT_NAME}', '{group_literal}', "
               f"{result_type}, null, '{START_DATE}', '{END_DATE}', {with_file_date}")
    elif result_type == 1:
        # TODO: port the name/value-pair unpivot over a source view. Original T-SQL:
        # select @sql = @sql + IIF(len(@sql) = 0, '', '
        # , ') + 'cast([' + column_name + '] as nvarchar(max)) as [' + column_name + ']'
        # , @columns = @columns + IIF(len(@columns) = 0, '', ', ') + '[' + column_name + ']'
        # from BIF_CONNECT_RAW.INFORMATION_SCHEMA.COLUMNS
        # where QUOTENAME(TABLE_SCHEMA) + '.' + QUOTENAME(TABLE_NAME) = @data_source_view
        # and COLUMN_NAME not in ('file_id', 'file_date')
        # order by ORDINAL_POSITION;
        #
        # select @sql = 'select m.[file_id], ' + IIF(@with_file_date = 1,'m.[file_date], ' , '') + 'newid() as [row_id], ' + @sql + '
        # from ' + @data_source_view + ' m
        # where m.[file_date] >= ''' + format(@StartDate, 'yyyy-MM-dd') + ''' and m.[file_date] < ''' + format(DateAdd(day, 1, @EndDate), 'yyyy-MM-dd') + '''
        # and not exists(select top 1 1 from ' + @fullTableName + ' r with (nolock) where m.[file_id] = r.[file_id])'
        #
        # select @sql = 'select [file_id], ' + IIF(@with_file_date = 1,'m.[file_date], ' , '') + '''' +  @groupName + ''' as [group], [row_id], [column_name], [column_value]
        # from (
        # ' + @sql + '
        # ) p unpivot ([column_value] for [column_name] in (' + @columns + ')) x'
        pass
    elif result_type == 2:
        # Flat table fed from a view: rows in the date window whose file_id is not loaded yet.
        sql = (f"select * from {data_source_view} m with (nolock) "
               f"where m.[file_date] >= '{START_DATE}' and m.[file_date] < '{end_exclusive}' "
               f"and not exists(select top 1 1 from {fullTableName} r with (nolock) "
               f"where m.[file_id] = r.[file_id])")

    insert_sql_list.append((fullTableName, sql))
    print(sql)

In [0]:
# Synapse access: the storage-account key for the connector's tempDir and the JDBC URL,
# both from the Databricks secret scope. `_url` is also read by the table-existence cell.
spark.conf.set(
    'fs.azure.account.key.gdwpocdls.dfs.core.windows.net',
    dbutils.secrets.get(scope='BIFC-asagdwpocws-etl', key='StorageAccount-gdwpocdls-AccessKey'),
)
_url = dbutils.secrets.get(scope='BIFC-asagdwpocws-etl', key='Synapse-asagdwpocetl-ConnectionString')

# Smoke test: read ACC_MART.Dim_Suite through the Synapse connector.
_sqldw_options = {
    'url': _url,
    'tempDir': 'abfss://bifconnect@gdwpocdls.dfs.core.windows.net/tempDirs',
    'useAzureMSI': 'true',
    'query': 'select * from ACC_MART.Dim_Suite',
}
_df_test = spark.read.format('com.databricks.spark.sqldw').options(**_sqldw_options).load()

display(_df_test)

Suite_ID9,Suite_Description,Suite_ID6,Overall_Level0,Suite_Group_Level1,Suite_Group_Level2,BIF_Product_Group_Category,Product_Segment,Is_Excluded,Form5_Product_Group_Category,Suite_Solution,Suite_Group_Level2_ID,Suite_Group_Level1_ID,Overall_Level0_ID,Suite_Solution_ID,Is_SaaS_Suite,Is_Subscription
CXC6ZZMSC,Cloud One - Application Security - per container,CXC6ZZ,Cloud One Portfolio,Cloud One,Cloud One - Application Security,Cloud One,Enterprise,0,Cloud One,Cloud One Portfolio,119,27,1,1,Yes,Yes
DCDAZZE3X,Damage Cleanup Services,DCDAZZ,Workforce One Portfolio,Enterprise EndPoint Standalone Sales,Damage Cleanup Service & Outbreak Prevention Service,Damage Cleanup Services,Enterprise,0,Damage Cleanup Services,Workforce One Portfolio,93,35,3,3,No,Yes
DDZYZBE5X,"Deep Discovery Inspector (Virtual Appliance model, 250 Mbps) w Analyzer as a Service Add-on",DDZYZB,Network One Portfolio,Deep Discovery,Deep Discovery Inspector,Deep Discovery,Enterprise,0,Deep Discovery,Network One Portfolio,12,3,2,2,No,No
DX3PMMJ75,Deep Security Full Bundle Pack - Manager plus Agent appliance no AV,DX3PMM,Cloud One Portfolio,Deep Security,Deep Security Software,Deep Security,Enterprise,0,Deep Security,Cloud One Portfolio,76,4,1,1,No,No
EIEYWWM5X,Trend Micro Endpoint Encryption - Full Disk Encryption and File Encryption,EIEYWW,Workforce One Portfolio,Enterprise EndPoint Standalone Sales,Endpoint Encryption Standalone Sales,Endpoint Encryption,Enterprise,0,Endpoint Encryption,Workforce One Portfolio,19,35,3,3,No,No
GPZZVSJ1X,Trend Micro Enterprise Security for Gateways Japanese,GPZZVS,Workforce One Portfolio,Legacy Enterprise Suites,Enterprise Security for Gateway,Enterprise Security for Gateway,Enterprise,0,Enterprise Security for Gateway,Workforce One Portfolio,26,7,3,3,No,No
MSDTMMM8X,Trend Micro Mobile Security - Suite Edition,MSDTMM,Workforce One Portfolio,User Protection - Others,Mobile Security Standalone Sales,Mobile Security,Enterprise,0,Mobile Security,Workforce One Portfolio,39,11,3,3,No,No
NAZZZZE3X,NeatSuite Enterprise Edition III for Lotus Notes (NeatSuite II + DCS + OPS; Asia South),NAZZZZ,Workforce One Portfolio,Legacy Enterprise Suites,NeatSuite Legacy,NeatSuite Legacy,Enterprise,0,NeatSuite Legacy,Workforce One Portfolio,40,7,3,3,No,No
POZBLLMXX,Policy Manager option Universal Controller,POZBLL,Workforce One Portfolio,User Protection - Others,Policy Manager,Policy Manager,Enterprise,0,Policy Manager,Workforce One Portfolio,49,11,3,3,No,No
PSTSTCZ2X,Technical Account Management Services Additional Contact,PSTSTC,Service One Portfolio,PSP,Premium Support,TAM Services,Enterprise,0,TAM Services,Service One Portfolio,51,40,4,4,No,Yes


In [0]:

# select @sql = 'Create Table ' + @fullTableName + '(
# ' + @sql + ') on [BIF_Connect_Data]
# with (Data_Compression = Page);
# CREATE CLUSTERED INDEX [cx_' + @groupName + '] ON ' + @fullTableName + '
# (
# 	[file_id] DESC
# ) WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, SORT_IN_TEMPDB = OFF, DROP_EXISTING = OFF, ONLINE = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON)
# '

In [0]:
# select @sql = '[File_ID] [bigint]' + IIF(@with_file_date = 1, ', [File_Date] [date]', '')
# , @cols = '[File_ID]' + IIF(@with_file_date = 1, ', [File_Date]', '')

# if (@data_source_view = '')


# select @sql = @sql + ', ' + QUOTENAME(m.column_name) + ' ' + QUOTENAME(m.data_type)
# + IIF(len(m.data_length) > 0, '(' + m.data_length + ')', '') + '
# ' , @cols = @cols + ', ' + QUOTENAME(COLUMN_NAME)
# from (
# select ROW_NUMBER() over (partition by p.column_name order by data_type desc) as ix ,p.column_name, p.data_type, p.data_length, s.sort
# from BIFC_ETL.TBL_XML_PARSER_CONFIG p
# join (
# 	select project_group_id, column_name, min(sort) as sort from BIFC_ETL.TBL_XML_PARSER_CONFIG group by project_group_id, column_name
# ) s on p.project_group_id=s.project_group_id and p.column_name=s.column_name
# join BIFC_ETL.TBL_PROJECT_GROUP g 
# 	on p.project_group_id = g.Project_Group_id
# 	and g.group_name = 'OSCE_1.0_Feedback_Main'
# 	and g.Project_Name = 'BIFC_OSCE'
# ) m where m.ix = 1
# order by m.Sort 




Group_Name,Table_Schema,ResultType,Name_Value_Pair_Table_Name,Data_Source_View,With_File_Date
OSCE_2.0_Endpoint_Setting,OSCE,1,OSCE_Endpoint_Setting,,True
OSCE_2.0_Endpoint_BM,OSCE,1,OSCE_Endpoint_BM,,True
OSCE_2.0_Server_Integrated_Server,OSCE,1,OSCE_Server_Integrated_Server,,True
OSCE_2.0_WR_External,OSCE,1,OSCE_WR_External,,True
OSCE_2.0_Server_Agent_Global_Setting,OSCE,1,OSCE_Server_Agent_Global_Setting,,True
OSCE_2.0_Server_OPP,OSCE,1,OSCE_Server_OPP,,True
OSCE_2.0_Server_Smart_Protection,OSCE,1,OSCE_Server_Smart_Protection,,True
OSCE_2.0_Server_BT_SO,OSCE,1,OSCE_Server_BT_SO,,True
OSCE_2.0_WR_Internal,OSCE,1,OSCE_WR_Internal,,True
OSCE_2.0_Server_Edge_Agent,OSCE,1,OSCE_Server_Edge_Agent,,True


In [0]:
# select ROW_NUMBER() over (order by isnull(Data_Source_View, ''), group_name) as [ID], * 
# from (
#  select Group_Name
# , Table_Schema
# , 1 as ResultType
# , Name_Value_Pair_Table_Name as [Table_Name]
# , Data_Source_View
# , isnull(With_File_Date,0) With_File_Date
# from BIFC_ETL.TBL_PROJECT_GROUP
# where Project_Name = 'BIFC_OSCE'
# and Name_Value_Pair_Table_Name is not null
# union 
# --flat 
# select Group_Name
# , Table_Schema
# , 2 as ResultType
# , Flat_Table_Name as [Table_Name]
# , Data_Source_View
# , isnull(With_File_Date,0) With_File_Date
# from BIFC_ETL.TBL_PROJECT_GROUP
# where Project_Name = 'BIFC_OSCE'
# and Flat_Table_Name is not null
# )aaa
# where ResultType = 1