# Create silver tables


In [0]:
__CATALOG = "btv_dc30"
__BRONZE_SCHEMA = "bronze"
__SILVER_SCHEMA = "silver"
__BRONZE_BUCKET = "btv-dc30-bronze-t7rrh"
__SILVER_BUCKET = "btv-dc30-silver-t7rrh"

# Utils

This code defines two functions for processing PySpark DataFrames with struct columns. The `infer_type()` inspects a sample string value and optionally its column name to infer its type as `boolean`, `long`, `double`, `timestamp`, `string`, or etc , treating columns like `ts` or containing `time` as `timestamps`. 

The `denullify_and_cast_struct()` takes a DataFrame and a struct column, removes any fields that are always `null`, and infers the type of each remaining field using a non-null sample. It then rebuilds the struct, casting each field to the appropriate PySpark type (`boolean`, `long`, `double`, `timestamp`, `string`, or etc) while leaving others as `strings`. The result is a cleaned and type-consistent struct column.

In [0]:
from pyspark.sql.functions import col, struct, first, from_unixtime, to_timestamp
from pyspark.sql import DataFrame
import re

long_regex = re.compile(r"-?\d+")
double_regex = re.compile(r"-?\d+\.\d+")
iso8601_regex = re.compile(r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}.\d+(\+\d{2}:\d{2})?")

def infer_type(value: str, column_name) -> str:
    """
    Infer the type of a string value, optionally using the column name.
    
    Args:
        value (str): Sample string value.
        column_name (str, optional): Name of the column for context.
        
    Returns:
        str: One of "boolean", "long", "double", "datetime", or "string".
    """
    # Check column name first for time-related columns
    if column_name.lower() in ("ts", "timestamp") or "time" in column_name.lower():
        if iso8601_regex.match(value):
            return "datetime"
        return "timestamp"
 

    if value is None:
        return "string"

    value = str(value).strip().lower()

    if value in ("true", "false"):
        return "boolean"

    if long_regex.fullmatch(value):
        return "long"

    if double_regex.fullmatch(value):
        return "double"

    return "string"


def denullify_and_cast_struct(df: DataFrame, table_name: str, struct_cols: list[str]) -> DataFrame:
    """
    Process multiple struct columns: remove null-only fields and cast types based on inferred types.

    Args:
        df (DataFrame): Input PySpark DataFrame.
        table_name (str): Name of the table for logging.
        struct_cols (list[str]): List of struct column names to process.

    Returns:
        DataFrame: Updated DataFrame with processed struct columns.
    """
    for struct_col in struct_cols:
        # Get fields in the struct
        struct_fields = df.schema[struct_col].dataType.fieldNames()

        # Determine fields that are always null
        # null_fields = []
        # for f in struct_fields:
        #     if df.filter(col(f"{struct_col}.{f}").isNotNull()).count() == 0:
        #         null_fields.append(f)

        # Keep only fields that have some non-null values
        fields_to_keep = [f for f in struct_fields]

        # Get one non-null sample for type inference
        samples = df.select(struct_col).dropna().limit(1000).collect()

        inferred_types = {}
        if samples:
            # Initialize dictionary to store lists of observed types for each field
            type_observations = {f: [] for f in fields_to_keep}

            for row in samples:
                row_dict = row[struct_col].asDict()
                for f in fields_to_keep:
                    value = row_dict.get(f)
                    if value is not None:
                        type_observations[f].append(infer_type(value, f))

            # Decide final type for each field (e.g., most common type)
            for f, types in type_observations.items():
                if types:
                    inferred_types[f] = max(set(types), key=types.count)  # most frequent type
                else:
                    inferred_types[f] = "string"
        else:
            inferred_types = {f: "string" for f in fields_to_keep}
        #print(f"{table_name} - {struct_col} inferred types:", inferred_types)
        

        # Build struct with type casting
        new_struct_cols = []
        for f in fields_to_keep:
            col_expr = col(f"{struct_col}.{f}")
            match inferred_types[f]:
                case "long":
                    new_struct_cols.append(col_expr.cast("long").alias(f))
                case "double":
                    new_struct_cols.append(col_expr.cast("double").alias(f))
                case "datetime":
                    new_struct_cols.append(to_timestamp(col_expr, "yyyy-MM-dd'T'HH:mm:ss.SSSSSSXXX").alias(f))
                case "timestamp":
                    new_struct_cols.append((col_expr.cast("bigint")).cast("timestamp").alias(f))
                case "boolean":
                    new_struct_cols.append(col_expr.cast("boolean").alias(f))
                case _:
                    new_struct_cols.append(col_expr.alias(f))

        # Replace original struct column with cleaned struct
        df = df.withColumn(struct_col, struct(*new_struct_cols))

    return df

# Create Sysmon Silver tables
This PySpark script orchestrates a standard Bronze-to-Silver transformation across multiple `sysmon_*` tables in a data lakehouse architecture. First, it lists all Bronze tables whose names match `sysmon_%`, then iterates through each one. For each table:

* It reads the Bronze Delta table into a DataFrame.
* It drops noisy or irrelevant fields (like `@timestamp`, `ecs`, `message`, etc.).
* It extracts and casts the nested string timestamp (`event.created`) into a proper timestamp column called `ts`.
* It creates a `hostname` root-level field from `host.name`.
* The script also conditionally adds more human-readable columns—like `rule`, `filename`, `registry_path`, `process_name`, and `username`, only if those nested fields exist in the Bronze schema.
* Finally, it writes the cleaned and enhanced data to the Silver layer as a Delta table, enabling schema evolution (mergeSchema) and organizing output by stripping the sysmon_ prefix and placing it into the appropriate Silver schema in S3.

This approach follows the medallion architecture’s Silver layer pattern—cleaning, standardizing, and enriching raw Bronze data to make it more analysis-ready


In [0]:
from pyspark.sql.functions import to_timestamp, col

tables = spark.sql(f"SHOW TABLES IN {__CATALOG}.{__BRONZE_SCHEMA}").filter("tableName LIKE 'sysmon_%'").collect()

for t in tables:
    bronze_table_name = f"{__CATALOG}.{__BRONZE_SCHEMA}.{t.tableName}"
    print(f"Query table: {bronze_table_name}")

    
    df = spark.read.table(bronze_table_name) \
        .drop("@timestamp","@version","agent", "ecs", "log", "message", "tags") \
        .withColumn("ts", to_timestamp(col("event.created"), "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")) \
        .withColumn("hostname", col("host.name")) 

    column_names =  df.columns

    if "rule" in column_names:
        df = df.withColumn("rule", col("rule.name"))

    if "file" in column_names:
        df = df.withColumn("filename", col("file.name"))
    
    if "registry" in column_names:
        df = df.withColumn("registry_path", col("registry.key")) 

    if "process" in column_names:
        df = df.withColumn("process_name", col("process.name"))

    if "user" in column_names:
        df = df.withColumn("username", col("user.name"))

    df.write.format("delta") \
        .mode("overwrite") \
        .option("mergeSchema", "true") \
        .option("path", f"s3://{__SILVER_BUCKET}/sysmon/{t.tableName.replace('sysmon_', '')}") \
        .saveAsTable(f"{__CATALOG}.{__SILVER_SCHEMA}.{t.tableName}")


    

Query table: btv_dc30.bronze.sysmon_clipboardchange
Query table: btv_dc30.bronze.sysmon_createremotethread
Query table: btv_dc30.bronze.sysmon_dnseventdnsquery
Query table: btv_dc30.bronze.sysmon_driverloaded
Query table: btv_dc30.bronze.sysmon_error
Query table: btv_dc30.bronze.sysmon_filecreate
Query table: btv_dc30.bronze.sysmon_filecreatestreamhash
Query table: btv_dc30.bronze.sysmon_imageloaded
Query table: btv_dc30.bronze.sysmon_networkconnection
Query table: btv_dc30.bronze.sysmon_pipeeventpipeconnected
Query table: btv_dc30.bronze.sysmon_pipeeventpipecreated
Query table: btv_dc30.bronze.sysmon_processaccess
Query table: btv_dc30.bronze.sysmon_processchangedafilecreationtime
Query table: btv_dc30.bronze.sysmon_processcreation
Query table: btv_dc30.bronze.sysmon_processtampering
Query table: btv_dc30.bronze.sysmon_processterminated
Query table: btv_dc30.bronze.sysmon_registryeventobjectcreateanddelete
Query table: btv_dc30.bronze.sysmon_registryeventvalueset
Query table: btv_dc30

In [0]:
tables = spark.sql(f"SHOW TABLES IN {__CATALOG}.{__SILVER_SCHEMA}").filter("tableName LIKE 'sysmon_%'").collect()

for t in tables:
    table_name = f"{__CATALOG}.{__SILVER_SCHEMA}.{t.tableName}"
    print(f"Query table: {table_name}")
    spark.sql(f"""SELECT * FROM {table_name} LIMIT 3;""")

Query table: btv_dc30.silver.sysmon_clipboardchange
Query table: btv_dc30.silver.sysmon_createremotethread
Query table: btv_dc30.silver.sysmon_dnseventdnsquery
Query table: btv_dc30.silver.sysmon_driverloaded
Query table: btv_dc30.silver.sysmon_error
Query table: btv_dc30.silver.sysmon_filecreate
Query table: btv_dc30.silver.sysmon_filecreatestreamhash
Query table: btv_dc30.silver.sysmon_imageloaded
Query table: btv_dc30.silver.sysmon_networkconnection
Query table: btv_dc30.silver.sysmon_pipeeventpipeconnected
Query table: btv_dc30.silver.sysmon_pipeeventpipecreated
Query table: btv_dc30.silver.sysmon_processaccess
Query table: btv_dc30.silver.sysmon_processchangedafilecreationtime
Query table: btv_dc30.silver.sysmon_processcreation
Query table: btv_dc30.silver.sysmon_processtampering
Query table: btv_dc30.silver.sysmon_processterminated
Query table: btv_dc30.silver.sysmon_registryeventobjectcreateanddelete
Query table: btv_dc30.silver.sysmon_registryeventvalueset
Query table: btv_dc30

# Create Osquery Silver tables

The code iterates over all tables in a specified Bronze schema whose names start with `osquery_`, reads each table into a PySpark DataFrame, and performs a series of transformations to normalize and prepare the data for storage in a Silver schema. 

It drops unnecessary metadata columns, extracts relevant fields from nested JSON structures, converts a Unix timestamp to a proper timestamp column, and flattens the JSON columns struct into individual columns. 

For the `osquery_iptables` table specifically, it safely casts `src_port` and `dst_port` to integers, defaulting to `0` if the values are empty or non-numeric. Finally, it writes each transformed DataFrame as a Delta table to S3, overwriting existing data and allowing schema merging.


In [0]:
from pyspark.sql.functions import to_timestamp, col, timestamp_seconds, when


tables = spark.sql(f"SHOW TABLES IN {__CATALOG}.{__BRONZE_SCHEMA}").filter("tableName LIKE 'osquery_%'").collect()

for t in tables:
    bronze_table_name = f"{__CATALOG}.{__BRONZE_SCHEMA}.{t.tableName}"
    print(f"Query table: {bronze_table_name}")

    df = spark.read.table(bronze_table_name) \
        .drop("@timestamp","@version","agent", "ecs", "event", "fileset", "input", "log", "service", "tags") \
        .withColumn("columns", col("json.columns")) \
        .withColumn("ts", timestamp_seconds("json.unixTime")) \
        .withColumn("hostname", col("host.name")) \
        .withColumn("change", col("json.action")) 
    
    df = denullify_and_cast_struct(df, t.tableName, ["columns"])
    df = df.drop("json")


    columns_fields = df.select("columns.*").columns
    df = df.select(
        col("ts"),
        col("hostname"),
        col("change"),
        *[col(f"columns.{c}").alias(c) for c in columns_fields]
    )

    if t.tableName == "osquery_iptables":
        df = df \
            .withColumn(
                "src_port",
                when(col("src_port").rlike("^\d+$"), col("src_port").cast("int")).otherwise(0)
            ) \
            .withColumn(
                "dst_port",
                when(col("dst_port").rlike("^\d+$"), col("dst_port").cast("int")).otherwise(0)
            )

    df.write.format("delta") \
        .mode("overwrite") \
        .option("mergeSchema", "true") \
        .option("path", f"s3://{__SILVER_BUCKET}/osquery/{t.tableName.replace('osquery_', '')}") \
        .saveAsTable(f"{__CATALOG}.{__SILVER_SCHEMA}.{t.tableName}")

    

Query table: btv_dc30.bronze.osquery_cpu_time
osquery_cpu_time - columns inferred types: {'core': 'long', 'guest': 'long', 'guest_nice': 'long', 'idle': 'long', 'iowait': 'long', 'irq': 'long', 'nice': 'long', 'softirq': 'long', 'steal': 'long', 'system': 'long', 'user': 'long'}
Query table: btv_dc30.bronze.osquery_device_nodes
osquery_device_nodes - columns inferred types: {'atime': 'timestamp', 'block_size': 'long', 'ctime': 'timestamp', 'gid': 'long', 'mode': 'long', 'mtime': 'timestamp', 'path': 'string', 'type': 'string', 'uid': 'long'}
Query table: btv_dc30.bronze.osquery_iptables
osquery_iptables - columns inferred types: {'bytes': 'long', 'chain': 'string', 'dst_ip': 'string', 'dst_mask': 'string', 'dst_port': 'string', 'filter_name': 'string', 'iniface': 'string', 'iniface_mask': 'string', 'match': 'string', 'outiface': 'string', 'outiface_mask': 'string', 'packets': 'long', 'policy': 'string', 'protocol': 'string', 'src_ip': 'string', 'src_mask': 'string', 'src_port': 'string

In [0]:
tables = spark.sql(f"SHOW TABLES IN {__CATALOG}.{__SILVER_SCHEMA}").filter("tableName LIKE 'osquery_%'").collect()

for t in tables:
    table_name = f"{__CATALOG}.{__SILVER_SCHEMA}.{t.tableName}"
    print(f"Query table: {table_name}")
    spark.sql(f"""SELECT * FROM {table_name} LIMIT 3;""")

Query table: btv_dc30.silver.osquery_cpu_time
Query table: btv_dc30.silver.osquery_device_nodes
Query table: btv_dc30.silver.osquery_iptables
Query table: btv_dc30.silver.osquery_kernel_modules
Query table: btv_dc30.silver.osquery_last
Query table: btv_dc30.silver.osquery_listening_ports
Query table: btv_dc30.silver.osquery_logged_in_users
Query table: btv_dc30.silver.osquery_memory_info
Query table: btv_dc30.silver.osquery_mounts
Query table: btv_dc30.silver.osquery_open_sockets
Query table: btv_dc30.silver.osquery_osquery_info
Query table: btv_dc30.silver.osquery_process_events
Query table: btv_dc30.silver.osquery_runtime_perf
Query table: btv_dc30.silver.osquery_schedule
Query table: btv_dc30.silver.osquery_smbios_tables
Query table: btv_dc30.silver.osquery_socket_events
Query table: btv_dc30.silver.osquery_syslog_events


# Create Zeek silver tables

The code iterates over all tables in a specified Bronze schema whose names start with `zeek_`, reads each table into a PySpark DataFrame, and converts the `ts` column from a Unix timestamp to a proper timestamp type. 

If the table contains Zeek network log fields such as `id.orig_h` and `id.resp_h`, it renames them to more descriptive names (`src_ip`, `src_port`, `dest_ip`, `dest_port`). After these transformations, each DataFrame is written as a Delta table to a Silver schema in S3, overwriting existing data and allowing schema merging.

In [0]:
from pyspark.sql.functions import to_timestamp, col, timestamp_seconds, when


tables = spark.sql(f"SHOW TABLES IN {__CATALOG}.{__BRONZE_SCHEMA}").filter("tableName LIKE 'zeek_%'").collect()

for t in tables:
    table_name = f"{__CATALOG}.{__BRONZE_SCHEMA}.{t.tableName}"
    print(f"Query table: {table_name}")

    df = spark.read.table(table_name)
    df = df.withColumn("ts", timestamp_seconds("ts"))

    column_names =  df.columns
    if "id.orig_h" in column_names:
        df = df.withColumnRenamed("id.orig_h", "src_ip") \
            .withColumnRenamed("id.orig_p", "src_port") \
            .withColumnRenamed("id.resp_h", "dest_ip") \
            .withColumnRenamed("id.resp_p", "dest_port")
    
    df.write.format("delta") \
        .mode("overwrite") \
        .option("mergeSchema", "true") \
        .option("path", f"s3://{__SILVER_BUCKET}/zeek/{t.tableName.replace('zeek_', '')}") \
        .saveAsTable(f"{__CATALOG}.{__SILVER_SCHEMA}.{t.tableName}")



Query table: btv_dc30.bronze.zeek_cluster
['message', 'node', 'ts']
+--------------------+------+--------------------+
|             message|  node|                  ts|
+--------------------+------+--------------------+
|node down: worker...|logger|2022-02-19 23:19:...|
|node down: worker...|logger|2022-02-19 23:19:...|
|node down: worker...| proxy|2022-02-19 23:19:...|
+--------------------+------+--------------------+
only showing top 3 rows
Query table: btv_dc30.bronze.zeek_conn
['community_id', 'conn_state', 'duration', 'history', 'id.orig_h', 'id.orig_p', 'id.resp_h', 'id.resp_p', 'local_orig', 'local_resp', 'missed_bytes', 'orig_bytes', 'orig_ip_bytes', 'orig_pkts', 'proto', 'resp_bytes', 'resp_ip_bytes', 'resp_pkts', 'service', 'ts', 'tunnel_parents', 'uid']
+--------------------+----------+------------------+-------+-------------+--------+-----------+---------+----------+----------+------------+----------+-------------+---------+-----+----------+-------------+---------+-------

In [0]:
tables = spark.sql(f"SHOW TABLES IN {__CATALOG}.{__SILVER_SCHEMA}").filter("tableName LIKE 'zeek_%'").collect()

for t in tables:
    table_name = f"{__CATALOG}.{__SILVER_SCHEMA}.{t.tableName}"
    print(f"Query table: {table_name}")
    spark.sql(f"""SELECT * FROM {table_name} LIMIT 3;""")

Query table: btv_dc30.silver.zeek_cluster
Query table: btv_dc30.silver.zeek_conn
Query table: btv_dc30.silver.zeek_dce_rpc
Query table: btv_dc30.silver.zeek_dns
Query table: btv_dc30.silver.zeek_dpd
Query table: btv_dc30.silver.zeek_files
Query table: btv_dc30.silver.zeek_http
Query table: btv_dc30.silver.zeek_kerberos
Query table: btv_dc30.silver.zeek_known_certs
Query table: btv_dc30.silver.zeek_known_services
Query table: btv_dc30.silver.zeek_notice
Query table: btv_dc30.silver.zeek_ntlm
Query table: btv_dc30.silver.zeek_ntp
Query table: btv_dc30.silver.zeek_pe
Query table: btv_dc30.silver.zeek_rdp
Query table: btv_dc30.silver.zeek_reporter
Query table: btv_dc30.silver.zeek_smb_files
Query table: btv_dc30.silver.zeek_smb_mapping
Query table: btv_dc30.silver.zeek_smtp
Query table: btv_dc30.silver.zeek_software
Query table: btv_dc30.silver.zeek_ssl
Query table: btv_dc30.silver.zeek_stats
Query table: btv_dc30.silver.zeek_tunnel
Query table: btv_dc30.silver.zeek_weird


# Create hMail silver tables

## hMail SMTPD logs

In [0]:
from pyspark.sql.functions import to_timestamp, col, split, regexp_replace
from pyspark.sql.types import IntegerType

hmail_smtpd_df = spark.read.table(f"{__CATALOG}.{__BRONZE_SCHEMA}.hmail_smtpd") \
    .select("message", "host") \
    .withColumnRenamed("message", "raw_msg") \
    .withColumn("hostname", col("host.name")) \
    .drop("host")
    
hmail_smtpd_df_transformed = hmail_smtpd_df.withColumn("parts", split(col("raw_msg"), "\t")) \
    .withColumn(
        "ts",
        to_timestamp(
            regexp_replace(col("parts")[3], '^"|"$', ''),  # Remove leading and trailing double quotes
            "yyyy-MM-dd HH:mm:ss.SSS"  # Define the timestamp format
        )
    ) \
    .withColumn("service", regexp_replace(col("parts")[0], '"', '')) \
    .withColumn("client_ip", regexp_replace(col("parts")[4], '"', '')) \
    .withColumn("message", regexp_replace(col("parts")[5], '"', '')) \
    .drop("raw_msg", "parts")

# hmail_smtpd_df_transformed.printSchema()
# hmail_smtpd_df_transformed.show(truncate=False)

hmail_smtpd_df_transformed.write.format("delta") \
  .mode("overwrite") \
  .option("mergeSchema", "true") \
  .option("path", f"s3://{__SILVER_BUCKET}/hmail/smtpd") \
  .saveAsTable(f"{__CATALOG}.{__SILVER_SCHEMA}.hmail_smtpd")


## hMail SMTP logs

In [0]:
from pyspark.sql.functions import to_timestamp, col, split, regexp_replace
from pyspark.sql.types import IntegerType

hmail_smtp_df = spark.read.table(f"{__CATALOG}.{__BRONZE_SCHEMA}.hmail_smtp") \
    .select("message", "host") \
    .withColumnRenamed("message", "raw_msg") \
    .withColumn("hostname", col("host.name")) \
    .drop("host")

hmail_smtp_df_transformed = hmail_smtp_df.withColumn("parts", split(col("raw_msg"), "\t")) \
    .withColumn(
        "ts",
        to_timestamp(
            regexp_replace(col("parts")[0], '^"|"$', ''),  # Remove leading and trailing double quotes
            "yyyy-MM-dd HH:mm:ss"  # Define the timestamp format
        )
    ) \
    .withColumn("sender", col("parts")[1]) \
    .withColumn("recipient", col("parts")[2]) \
    .withColumn("client_ip", col("parts")[3]) \
    .withColumn("server_ip", col("parts")[4]) \
    .withColumn("protocol", col("parts")[5]) \
    .withColumn("status_code", col("parts")[7].cast(IntegerType())) \
    .withColumn("session_id", col("parts")[8].cast(IntegerType())) \
    .drop("raw_msg", "parts", "message")


hmail_smtp_df_transformed.write.format("delta") \
  .mode("overwrite") \
  .option("mergeSchema", "true") \
  .option("path", f"s3://{__SILVER_BUCKET}/hmail/smtp") \
  .saveAsTable(f"{__CATALOG}.{__SILVER_SCHEMA}.hmail_smtp")


root
 |-- hostname: string (nullable = true)
 |-- ts: timestamp (nullable = true)
 |-- sender: string (nullable = true)
 |-- recipient: string (nullable = true)
 |-- client_ip: string (nullable = true)
 |-- server_ip: string (nullable = true)
 |-- protocol: string (nullable = true)
 |-- status_code: integer (nullable = true)
 |-- session_id: integer (nullable = true)

+--------+-------------------+------------------------------------+------------------------------------------+-------------+---------+--------+-----------+----------+
|hostname|ts                 |sender                              |recipient                                 |client_ip    |server_ip|protocol|status_code|session_id|
+--------+-------------------+------------------------------------+------------------------------------------+-------------+---------+--------+-----------+----------+
|files   |2022-02-19 19:37:01|kama.suppetia@magnumtempus.financial|clarie.insigni@magnumtempusfinancial.com  |172.16.50.135|127.

## hMail IMAPD logs

In [0]:
from pyspark.sql.functions import to_timestamp, col, split, regexp_replace
from pyspark.sql.types import IntegerType

hmail_impad_df = spark.read.table(f"{__CATALOG}.{__BRONZE_SCHEMA}.hmail_impad") \
    .select("message", "host") \
    .withColumnRenamed("message", "raw_msg") \
    .withColumn("hostname", col("host.name")) \
    .drop("host")

imapd_df_transformed = hmail_impad_df.withColumn("parts", split(col("raw_msg"), "\t")) \
    .withColumn(
        "ts",
        to_timestamp(
            regexp_replace(col("parts")[3], '^"|"$', ''),  # Remove leading and trailing double quotes
            "yyyy-MM-dd HH:mm:ss.SSS"  # Define the timestamp format
        )
    ) \
    .withColumn("service", regexp_replace(col("parts")[0], '"', '')) \
    .withColumn("session_id", col("parts")[1].cast(IntegerType())) \
    .withColumn("client_ip", regexp_replace(col("parts")[4], '"', '')) \
    .withColumn("message", regexp_replace(col("parts")[5], '"', '')) \
    .drop("raw_msg", "parts")

#imapd_df_transformed.printSchema()
#imapd_df_transformed.show(truncate=False)

imapd_df_transformed.write.format("delta") \
  .mode("overwrite") \
  .option("mergeSchema", "true") \
  .option("path", f"s3://{__SILVER_BUCKET}/hmail/imapd") \
  .saveAsTable(f"{__CATALOG}.{__SILVER_SCHEMA}.hmail_imapd")



## hMail Application logs

In [0]:
from pyspark.sql.functions import to_timestamp, col, split, regexp_replace, regexp_extract
from pyspark.sql.types import IntegerType

hmail_app_df = spark.read.table(f"{__CATALOG}.{__BRONZE_SCHEMA}.hmail_app") \
    .select("message", "host") \
    .withColumnRenamed("message", "raw_msg") \
    .withColumn("hostname", col("host.name")) \
    .drop("host")

hmail_app_df_transformed = hmail_app_df.withColumn("parts", split(col("raw_msg"), "\t")) \
    .withColumn(
        "ts",
        to_timestamp(
            regexp_replace(col("parts")[2], '^"|"$', ''),  # Remove leading and trailing double quotes
            "yyyy-MM-dd HH:mm:ss.SSS"  # Define the timestamp format
        )
    ) \
    .withColumn("service", regexp_replace(col("parts")[0], '"', '')) \
    .withColumn("message", regexp_replace(col("parts")[3], '"', '')) \
    .drop("raw_msg", "parts")

# hmail_app_df_transformed.printSchema()
# hmail_app_df_transformed.show(truncate=False)

imapd_df_transformed.write.format("delta") \
  .mode("overwrite") \
  .option("mergeSchema", "true") \
  .option("path", f"s3://{__SILVER_BUCKET}/hmail/app") \
  .saveAsTable(f"{__CATALOG}.{__SILVER_SCHEMA}.hmail_app")


## hMail TCPIP logs

In [0]:
hmail_tcpip_df = spark.read.table(f"{__CATALOG}.{__BRONZE_SCHEMA}.hmail_tcpip") \
    .select("message", "host") \
    .withColumnRenamed("message", "raw_msg") \
    .withColumn("hostname", col("host.name")) \
    .drop("host")

hmail_tcpip_df_transformed = hmail_tcpip_df.withColumn("parts", split(col("raw_msg"), "\t")) \
    .withColumn(
        "ts",
        to_timestamp(
            regexp_replace(col("parts")[2], '^"|"$', ''),  # Remove leading and trailing double quotes
            "yyyy-MM-dd HH:mm:ss.SSS"  # Define the timestamp format
        )
    ) \
    .withColumn("service", regexp_replace(col("parts")[0], '"', '')) \
    .withColumn("message", regexp_replace(col("parts")[3], '"', '')) \
    .drop("raw_msg", "parts")

# hmail_tcpip_df_transformed.printSchema()
# hmail_tcpip_df_transformed.show(truncate=False)


hmail_tcpip_df_transformed.write.format("delta") \
  .mode("overwrite") \
  .option("mergeSchema", "true") \
  .option("path", f"s3://{__SILVER_BUCKET}/hmail/tcpip") \
  .saveAsTable(f"{__CATALOG}.{__SILVER_SCHEMA}.hmail_tcpip")

In [0]:
tables = spark.sql(f"SHOW TABLES IN {__CATALOG}.{__SILVER_SCHEMA}").filter("tableName LIKE 'hmail_%'").collect()

for t in tables:
    table_name = f"{__CATALOG}.{__SILVER_SCHEMA}.{t.tableName}"
    print(f"Query table: {table_name}")
    spark.sql(f"""SELECT * FROM {table_name} LIMIT 3;""")

Query table: btv_dc30.silver.hmail_app
Query table: btv_dc30.silver.hmail_imapd
Query table: btv_dc30.silver.hmail_smtp
Query table: btv_dc30.silver.hmail_smtpd
Query table: btv_dc30.silver.hmail_tcpip


# Create Wineventlogs silver tables

In [0]:
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType


tables = spark.sql(f"SHOW TABLES IN {__CATALOG}.{__BRONZE_SCHEMA}").filter("tableName LIKE 'wineventlogs_%'").collect()


for t in tables:
    table_name = f"{__CATALOG}.{__BRONZE_SCHEMA}.{t.tableName}"
    print(f"Query table: {table_name}")

    df = spark.read.table(table_name) \
        .drop("@timestamp","@version","agent", "ecs", "log", "tags") \
        .withColumn("ts", to_timestamp(col("event.created"), "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")) \
        .withColumn("hostname", col("host.name")) \

    df = df.select(
        "*",  # Include all existing columns
        *[col(f"winlog.{c}").alias(c) for c in df.select("winlog.*").columns]  # Flatten 'winlog' struct
    ).drop("winlog", "api", "computer_name")

    df.write.format("delta") \
        .mode("overwrite") \
        .option("mergeSchema", "true") \
        .option("path", f"s3://{__SILVER_BUCKET}/wineventlogs/{t.tableName.replace('wineventlogs_', '')}") \
        .saveAsTable(f"{__CATALOG}.{__SILVER_SCHEMA}.{t.tableName}")



Query table: btv_dc30.bronze.wineventlogs_application
Query table: btv_dc30.bronze.wineventlogs_microsoft_windows_powershell
Query table: btv_dc30.bronze.wineventlogs_microsoft_windows_smbserver
Query table: btv_dc30.bronze.wineventlogs_microsoft_windows_sysmon
Query table: btv_dc30.bronze.wineventlogs_microsoft_windows_terminalservices_localsessionmanager
Query table: btv_dc30.bronze.wineventlogs_microsoft_windows_terminalservices_remoteconnectionmanager
