In [1]:
### Notebook reads VNET flow logs by listing files recursively at source, explodes them into one row per flow tuple, and ingests the result into ADLS & Kusto

# Imports

In [1]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import datetime
import subprocess
from notebookutils import mssparkutils
import pyspark.sql.functions as F

In [2]:
%run ADFJobs/CommonUtilFunctions/LinkedService_UtilFunctions

# Parameters

In [25]:
# Run parameters. Kept as plain literal assignments.
# NOTE(review): presumably this is the cell a pipeline overrides per run — confirm.

# Hour slice to ingest, formatted 'YYYY-MM-DD HH'. It is parsed with
# '%Y-%m-%d %H' to build the y=/m=/d=/h= source folder suffix and is also the
# input from which the ETL_DATE output column is derived.
slice_date = '2024-06-26 02'
# Window bounds stamped on every output row as FLOWLOG_WINDOW_START_TIME and
# FLOWLOG_WINDOW_END_TIME (formatted to the hour).
# NOTE(review): these defaults do not bracket slice_date — presumably all three
# values are supplied together at run time; confirm.
slice_start_date = '2024-06-17 15:00:00'
slice_end_date = '2024-06-18 15:00:00'
# Source storage container and account that are listed for PT1H.json files.
# The Region output column is the account name with the "vnetflowstorage"
# prefix removed.
SOURCE_BLOB_CONTAINER_NAME = "insights-logs-flowlogflowevent"
SOURCE_BLOB_ACCOUNT_NAME = "vnetflowstorageeus" 
# Linked service used to obtain tokens for the source account.
# NOTE(review): the name refers to "...eus2" while the account above is
# "...eus" — confirm the pair is intentional.
SOURCE_LINKED_SERVICE_NAME = "AzureDataLakeStorage_vnetflowstorageeus2_LinkedService"
# Target ADLS container/account for the Delta output, and the linked service
# used to authenticate against that account.
TARGET_ADLS_CONTAINER_NAME = "prod"
TARGET_ADLS_ACCOUNT_NAME = "testadlsgen20"
TARGET_ADLS_LINKED_SERVICE_NAME = "AzureDataLakeStorage_testadlsgen20_LinkedService"
# Target Kusto linked service, database and table for the Kusto write.
TARGET_KUSTO_LINKED_SERVICE_NAME = "AzureDataExplorer_Sipvnetlogsprod_LinkedService"
TARGET_KUSTO_DATABASE_NAME = "vnetlogs"
TARGET_KUSTO_TABLE_NAME = "logs"

In [5]:
# SOURCE_BLOB_ACCOUNT_NAME = "vnetflowstoragewus2"
# SOURCE_BLOB_ACCOUNT_NAME = "vnetflowstorageeus"
# SOURCE_BLOB_ACCOUNT_NAME = "vnetflowstoragecus"
# SOURCE_BLOB_ACCOUNT_NAME = "vnetflowstoragescus"
# TARGET_ADLS_ACCOUNT_NAME = "testadlsgen20"

#TARGET_ADLS_LINKED_SERVICE_NAME = "AzureDataLakeStorage_testadlsgen20_LinkedService"


# Source Configuration

In [26]:
# Use the linked-service based token provider for OAuth access to storage.
spark.conf.set(
    "fs.azure.account.oauth.provider.type",
    "com.microsoft.azure.synapse.tokenlibrary.LinkedServiceBasedTokenProvider",
)

# DFS endpoint host name of the source storage account.
source_full_account_name = "%s.dfs.core.windows.net" % SOURCE_BLOB_ACCOUNT_NAME

# Bind the source account to the linked service that supplies its tokens.
spark.conf.set(
    "spark.storage.synapse.%s.linkedServiceName" % source_full_account_name,
    SOURCE_LINKED_SERVICE_NAME,
)

# Root folder in the source container under which the listing starts.
SOURCE_RELATIVE_PATH_TEMPLATE = "flowLogResourceID=/"
source_url = "abfss://%s@%s/%s" % (
    SOURCE_BLOB_CONTAINER_NAME,
    source_full_account_name,
    SOURCE_RELATIVE_PATH_TEMPLATE,
)
print(source_url)

# Listing source files recursively

In [27]:
# Build the list of PT1H.json files for the requested hour slice.
#
# Inputs (defined in earlier cells): slice_date ('YYYY-MM-DD HH') and source_url.
# Outputs: file_list (paths handed to the JSON reader) plus the counters
# ex_count / mac_count / file_count reported in the summary line.

# Folder suffix identifying the hour slice, e.g. '/y=2024/m=06/d=26/h=02/m=00'.
slice_time = datetime.strptime(slice_date, '%Y-%m-%d %H')
file_name_append_text = slice_time.strftime('/y=%Y/m=%m/d=%d/h=%H/m=00')

# List the source root once, then one level below each entry; every second-level
# folder gets the hour suffix appended to form a candidate hour folder.
fl_res_ids = mssparkutils.fs.ls(source_url)
hour_file_list = [
    vnet.path + file_name_append_text
    for res_id in fl_res_ids
    for vnet in mssparkutils.fs.ls(res_id.path)
]

file_list = []
ex_count = 0   # hour folders that could not be listed
mac_count = 0  # hour folders that were listed successfully
for hour in hour_file_list:
    try:
        macs_list = mssparkutils.fs.ls(hour)
    except Exception:
        # Best-effort: a folder that cannot be listed is skipped and counted.
        # NOTE(review): this also hides failures other than "path not found"
        # (e.g. permission errors) — confirm that is acceptable.
        ex_count += 1
        continue
    mac_count += 1
    # Each entry below the hour folder is expected to hold a PT1H.json file.
    file_list.extend(mac.path + "/PT1H.json" for mac in macs_list)

file_count = len(file_list)

print("Count of paths not found: %s found: %s file_count: %s" % (int(ex_count), int(mac_count), int(file_count))) 

# Source schema definition

In [28]:
# Schema applied when reading the source JSON, assembled from the innermost
# struct outwards so each nesting level has a name.

# Element of "flowGroups": the tuple strings and the accompanying rule string.
_flow_group_type = StructType([
    StructField("flowTuples", ArrayType(StringType(), True)),
    StructField("rule", StringType(), True),
])

# Element of "flows": an ACL id and its flow groups.
_flow_type = StructType([
    StructField("aclID", StringType(), True),
    StructField("flowGroups", ArrayType(_flow_group_type), True),
])

# The "flowRecords" struct wraps the array of flows.
_flow_records_type = StructType([
    StructField("flows", ArrayType(_flow_type), True),
])

# Element of the top-level "records" array.
_record_type = StructType([
    StructField("category", StringType(), True),
    StructField("flowLogGUID", StringType(), True),
    StructField("flowLogResourceID", StringType(), True),
    StructField("flowLogVersion", LongType(), True),
    StructField("flowRecords", _flow_records_type, True),
    StructField("macAddress", StringType(), True),
    StructField("operationName", StringType(), True),
    StructField("targetResourceID", StringType(), True),
    StructField("time", StringType(), True),
])

schema = StructType([
    StructField("records", ArrayType(_record_type), True),
])

# Read and Process

In [29]:
# Load every listed file for the hour with the explicit schema; each row also
# records the file it was read from.
hour_reader = spark.read.schema(schema).option("multiline", "true")
df_hour = (
    hour_reader
    .json(file_list)
    .withColumn("input_file_name", F.input_file_name())
)

In [30]:
# One row per element of "records", spread over 160 partitions and cached
# because the DataFrame is counted here and reused by the next stage.
df_records = (
    df_hour
    .withColumn("records", F.explode("records"))
    .repartition(160)
    .persist()
)
print(df_records.count())

In [15]:
# Display distribution across partitions
# rcrd_cnt_part = df_records.withColumn("partitionid",spark_partition_id()).groupBy("partitionid").count()
# print(rcrd_cnt_part.show(160))

In [31]:
# Flatten each record down to one row per flow tuple.

# (field inside "records", output column name)
_record_fields = [
    ("category", "category"),
    ("flowLogGUID", "flowLogGUID"),
    ("flowLogResourceID", "flowLogResourceID"),
    ("flowLogVersion", "flowLogVersion"),
    ("macAddress", "macAddress"),
    ("operationName", "operationName"),
    ("targetResourceID", "targetResourceID"),
    ("time", "flowlog_collected_time"),
]

# Final column order of df_exploded.
_exploded_columns = [
    "category",
    "flowLogGUID",
    "flowLogResourceID",
    "flowLogVersion",
    "aclID",
    "flowGroups",
    "rule",
    "macAddress",
    "operationName",
    "targetResourceID",
    "flowlog_collected_time",
    "input_file_name",
    "flowTuples",
]

df_exploded = (
    df_records
    # Lift the scalar record fields and produce one row per flow.
    .select(
        "input_file_name",
        *[F.col("records." + source).alias(target) for source, target in _record_fields],
        F.explode(F.col("records.flowRecords.flows")).alias("flowRecords"),
    )
    # One row per flow group within each flow.
    .select(
        "*",
        F.col("flowRecords.aclID").alias("aclID"),
        F.explode(F.col("flowRecords.flowGroups")).alias("flowGroups"),
    )
    # One row per tuple string within each flow group.
    .select(
        "*",
        F.explode(F.col("flowGroups.flowTuples")).alias("flowTuples"),
        F.col("flowGroups.rule").alias("rule"),
    )
    .select(*_exploded_columns)
)
               

In [32]:
# Split every flow tuple string into named columns and attach run metadata.

# Positions 0-8 of the comma-separated tuple are kept as strings.
_tuple_text_fields = [
    'Timestamp',
    'Source_IP',
    'Destination_IP',
    'Source_Port',
    'Destination_Port',
    'Protocol',
    'TrafficFlow',
    'Flow_State',
    'Flow_encryption_status',
]
# Positions 9-12 are cast to long.
_tuple_counter_fields = [
    'Packets_Source_to_Destination',
    'Bytes_Source_to_Destination',
    'Packets_Destination_to_Source',
    'Bytes_Destination_to_Source',
]

_tuple_parts = F.col("flowTupleslist")
_tuple_columns = [
    _tuple_parts[position].alias(name)
    for position, name in enumerate(_tuple_text_fields)
]
_tuple_columns += [
    _tuple_parts[position].cast("long").alias(name)
    for position, name in enumerate(_tuple_counter_fields, start=len(_tuple_text_fields))
]

df_exploded_full = (
    df_exploded
    .withColumn("flowTupleslist", F.split(F.col("flowTuples"), ","))
    .withColumn("ETL_DATE", F.date_format(F.lit(slice_date), 'yyyy-MM-dd'))
    # Region is the source account name without its "vnetflowstorage" prefix.
    .withColumn("Region", F.regexp_replace(F.lit(SOURCE_BLOB_ACCOUNT_NAME), "vnetflowstorage", ""))
    .withColumn("FLOWLOG_WINDOW_START_TIME", F.date_format(F.lit(slice_start_date), 'yyyy-MM-dd HH:00:00'))
    .withColumn("FLOWLOG_WINDOW_END_TIME", F.date_format(F.lit(slice_end_date), 'yyyy-MM-dd HH:00:00'))
    .select(
        "category",
        "flowLogGUID",
        "flowLogResourceID",
        "flowLogVersion",
        "aclID",
        "macAddress",
        "operationName",
        "targetResourceID",
        F.col("flowlog_collected_time").cast("timestamp"),
        "flowTuples",
        "rule",
        *_tuple_columns,
        "input_file_name",
        "Region",
        "ETL_DATE",
        "FLOWLOG_WINDOW_START_TIME",
        "FLOWLOG_WINDOW_END_TIME",
    )
)

# Cached because the result is counted here and then written by later cells.
df_exploded_full.persist()
print(df_exploded_full.count())

# Write to ADLS

In [33]:
# Append the exploded rows to the Delta location in ADLS, partitioned by
# ETL_DATE and Region.
TARGET_ADLS_PATH = "feeds/logs"

target_sink_url = (
    f"abfss://{TARGET_ADLS_CONTAINER_NAME}"
    f"@{TARGET_ADLS_ACCOUNT_NAME}.dfs.core.windows.net"
    f"/{TARGET_ADLS_PATH}"
)

# configure_account_token is not defined in this notebook; presumably it comes
# from the %run'd LinkedService_UtilFunctions notebook — confirm.
configure_account_token(
    TARGET_ADLS_ACCOUNT_NAME + ".dfs.core.windows.net",
    TARGET_ADLS_LINKED_SERVICE_NAME,
)

print(target_sink_url)
(
    df_exploded_full.write
    .format("delta")
    .option("mergeSchema", "true")
    .mode("append")
    .partitionBy("ETL_DATE", "Region")
    .save(target_sink_url)
)

In [14]:
# Echo the ADLS sink URL again; target_sink_url must already be defined by the
# ADLS write cell.
print(target_sink_url)

# Write to Kusto

In [14]:
# Append the exploded rows to the Kusto table through the Synapse linked service.
kusto_write_options = {
    "spark.synapse.linkedService": TARGET_KUSTO_LINKED_SERVICE_NAME,
    "kustoDatabase": TARGET_KUSTO_DATABASE_NAME,
    "kustoTable": TARGET_KUSTO_TABLE_NAME,
}

(
    df_exploded_full.write
    .format("com.microsoft.kusto.spark.synapse.datasource")
    .options(**kusto_write_options)
    .mode("Append")
    .save()
)