# Create PySpark session

In [1]:
from delta import *
import pyspark
import os


# Versions shared by several jars below; keeping them in one place makes it
# harder for related jars to drift apart when one of them is upgraded.
HADOOP_VERSION = "3.3.4"
AWS_SDK_VERSION = "1.12.599"
DELTA_VERSION = "3.0.0"

# Jar file names handed to Spark through the `spark.jars` setting.
external_jars = [
    f"hadoop-aws-{HADOOP_VERSION}.jar",
    f"aws-java-sdk-core-{AWS_SDK_VERSION}.jar",
    f"aws-java-sdk-s3-{AWS_SDK_VERSION}.jar",
    f"delta-storage-{DELTA_VERSION}.jar",
    f"delta-spark_2.12-{DELTA_VERSION}.jar",
    f"aws-java-sdk-dynamodb-{AWS_SDK_VERSION}.jar",
    f"hadoop-common-{HADOOP_VERSION}.jar",
]


def env_value_by_suffix(suffix, environ=None):
    """Return the value of the environment variable whose name ends with ``suffix``.

    Args:
        suffix: Trailing part of the variable name to look for.
        environ: Mapping of variable names to values; defaults to ``os.environ``.

    Returns:
        The matching value, or ``None`` when no variable name ends with
        ``suffix``. If several names match, the last one in iteration order
        wins.
    """
    import warnings  # local import: only needed on the failure path

    if environ is None:
        environ = os.environ
    matches = [value for name, value in environ.items() if name.endswith(suffix)]
    if not matches:
        # Surface the failed lookup instead of silently handing back nothing.
        warnings.warn(f"No environment variable ending with {suffix!r} was found")
        return None
    return matches[-1]


# Spark master URL, built from the host found in the environment.
# Stays an empty string when the host cannot be discovered.
spark_master_host = env_value_by_suffix("SPARK_MASTER_SVC_SERVICE_HOST")
SPARK_ENDPOINT = str() if spark_master_host is None else f"spark://{spark_master_host}:7077"
print(f"SPARK_ENDPOINT: {SPARK_ENDPOINT}")


# MinIO endpoint as host:port; the variable value carries a "tcp://" prefix
# that is stripped here. Stays an empty string when nothing is discovered.
minio_port_value = env_value_by_suffix("_MINIO_PORT")
MINIO_ENDPOINT = str() if minio_port_value is None else minio_port_value.replace("tcp://", "")
print(f"MINIO_ENDPOINT: {MINIO_ENDPOINT}")


# Fail fast with an actionable message: an empty endpoint means the service
# discovery above found nothing, and Spark/S3A would otherwise fail later
# with a much less obvious error.
if not SPARK_ENDPOINT:
    raise RuntimeError(
        "SPARK_ENDPOINT is empty: no environment variable ending with "
        "'SPARK_MASTER_SVC_SERVICE_HOST' was found"
    )
if not MINIO_ENDPOINT:
    raise RuntimeError(
        "MINIO_ENDPOINT is empty: no environment variable ending with "
        "'_MINIO_PORT' was found"
    )

# Prefer credentials supplied through the environment so real secrets never
# have to live in the notebook. The fallbacks are the values this notebook
# used originally, kept so it still runs unchanged where nothing is set.
S3_ACCESS_KEY = os.environ.get("AWS_ACCESS_KEY_ID", "analyst")
S3_SECRET_KEY = os.environ.get("AWS_SECRET_ACCESS_KEY", "analyst123")

builder = (
    pyspark.sql.SparkSession.builder.appName("Myapp")
    # Spark master URL to connect to
    .master(SPARK_ENDPOINT)
    # JARs on disk to load into our Spark session
    .config("spark.jars", ",".join(external_jars))
    # k8s service for Jupyter driver
    .config("spark.driver.host", "jupyter-driver")
    # Port for Jupyter driver
    .config("spark.driver.port", 2222)
    # Extending the capabilities of SQL searching with Delta tables
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    ####### S3 (MinIO) setup and creds #######
    .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
    .config("spark.hadoop.fs.s3a.access.key", S3_ACCESS_KEY)
    .config("spark.hadoop.fs.s3a.secret.key", S3_SECRET_KEY)
    .config("spark.hadoop.fs.s3a.endpoint", MINIO_ENDPOINT)
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.attempts.maximum", "1")
    .config("spark.hadoop.fs.s3a.connection.establish.timeout", "5000")
    .config("spark.hadoop.fs.s3a.connection.timeout", "10000")
)
# Let the delta helper finish configuring the builder, then create (or reuse) the session.
spark = configure_spark_with_delta_pip(builder).getOrCreate()

print("done")

SPARK_ENDPOINT: spark://10.152.183.194:7077
MINIO_ENDPOINT: 10.152.183.94:9000
done


# Osquery bronze schema

This cell reads the raw osquery log data from S3 into a DataFrame using an explicit schema.

In [2]:
from pyspark.sql.types import (
    StructField,
    StringType,
    MapType,
    LongType,
    IntegerType,
    ArrayType,
    StructType
)

def _string_struct(*field_names):
    """Build a StructType whose fields are all nullable strings, in the given order."""
    return StructType(
        [StructField(field_name, StringType(), True) for field_name in field_names]
    )


# Schema applied when reading the raw osquery log files: top-level event
# fields, several small string-only structs, and the `json` struct whose
# fields are selected into the silver table further down.
osquery_bronze_schema = StructType([
    StructField("@timestamp", StringType(), True),
    StructField("@version", StringType(), True),
    StructField("agent", _string_struct("ephemeral_id", "hostname", "name", "type", "version")),
    StructField("ecs", _string_struct("version")),
    StructField("event", _string_struct("dataset", "module")),
    StructField("fileset", _string_struct("name")),
    StructField("host", _string_struct("name")),
    StructField("input", _string_struct("type")),
    StructField("json", StructType([
        StructField("action", StringType(), True),
        StructField("calendarTime", StringType(), True),
        # Free-form key/value pairs; both keys and values are read as strings.
        StructField("columns", MapType(StringType(), StringType()), True),
        StructField("counter", IntegerType(), True),
        StructField("epoch", LongType(), True),
        StructField("unixTime", LongType(), True),
        # NOTE(review): osquery appears to log `numerics` as a boolean flag;
        # confirm IntegerType is what is intended here.
        StructField("numerics", IntegerType(), True),
        StructField("name", StringType(), True),
        StructField("hostIdentifier", StringType(), True),
    ])),
    StructField("log", StructType([
        StructField("file", _string_struct("path")),
        StructField("offset", LongType(), True),
    ])),
    StructField("service", _string_struct("type")),
    StructField("tags", ArrayType(StringType()), True),
])

# Load every matching osquery log object from the bronze bucket, parsing each
# JSON record against the schema defined above.
df = spark.read.json(
    "s3a://logs-bronze/osquery/osquery-*log",
    schema=osquery_bronze_schema,
)
df.show()

+--------------------+--------+--------------------+--------+--------------------+--------+------------------+-----+--------------------+--------------------+---------+--------------------+
|          @timestamp|@version|               agent|     ecs|               event| fileset|              host|input|                json|                 log|  service|                tags|
+--------------------+--------+--------------------+--------+--------------------+--------+------------------+-----+--------------------+--------------------+---------+--------------------+
|2022-02-12T18:21:...|       1|{1f678581-8e27-4b...|{1.12.0}|{osquery.result, ...|{result}|{ip-172-16-55-120}|{log}|{removed, Sat Feb...|{{/var/log/osquer...|{osquery}|[beats_input_raw_...|
|2022-02-12T18:21:...|       1|{1f678581-8e27-4b...|{1.12.0}|{osquery.result, ...|{result}|{ip-172-16-55-120}|{log}|{added, Sat Feb 1...|{{/var/log/osquer...|{osquery}|[beats_input_raw_...|
|2022-02-12T18:21:...|       1|{1f678581-8e27-4b..

# Drop metadata columns
Filebeat includes metadata about the agent sending the logs that we don't need. This section drops those unnecessary fields.

In [3]:
# Give the raw frame a stage-specific name before transforming it.
osquery_bronze = df

# Top-level columns that only describe the log shipper / event envelope and
# are not needed downstream.
drop_metadata_columns = [
    "@timestamp", "@version",
    "agent", "ecs", "input",
    "log", "tags",
    "fileset", "service",
]

# `drop` takes column names as separate arguments, hence the unpacking.
osquery_silver_no_metadata = osquery_bronze.drop(*drop_metadata_columns)


# Rename columns and pull data to top level

In [4]:
from pyspark.sql.functions import col


# (field inside the `json` struct, name of the resulting top-level column)
silver_column_map = [
    ("hostIdentifier", "hostname"),
    ("action", "action"),
    ("name", "table"),
    ("unixTime", "unixTime"),
    ("columns", "columns"),
]

# Promote the nested fields to top-level columns, renaming where needed.
osquery_silver = osquery_silver_no_metadata.select(
    *[col(f"json.{source}").alias(target) for source, target in silver_column_map]
)

osquery_silver.show()

+----------------+-------+--------------------+----------+--------------------+
|        hostname| action|               table|  unixTime|             columns|
+----------------+-------+--------------------+----------+--------------------+
|ip-172-16-55-120|removed|            cpu_time|1644690067|{user -> 26696, s...|
|ip-172-16-55-120|  added|            cpu_time|1644690067|{user -> 26699, s...|
|ip-172-16-55-120|  added|       syslog_events|1644690073|{severity -> 6, t...|
|ip-172-16-55-120|removed|            cpu_time|1644690067|{user -> 27158, s...|
|ip-172-16-55-120|  added|            cpu_time|1644690067|{user -> 27165, s...|
|ip-172-16-55-120|removed|pack_incident-res...|1644690077|{pid -> 28733, ty...|
|ip-172-16-55-120|  added|      process_events|1644690086|{path -> /usr/bin...|
|ip-172-16-55-120|removed|         memory_info|1644690094|{buffers -> 62951...|
|ip-172-16-55-120|  added|         memory_info|1644690094|{buffers -> 62959...|
|ip-172-16-55-120|  added|       syslog_

# Write dataframe as Delta Lake table

In [5]:
# Persist the silver frame in Delta format, replacing any data already at the target path.
(
    osquery_silver.write
    .format("delta")
    .mode("overwrite")
    .save("s3a://logs-silver/osquery/osquery.delta")
)


In [6]:
# Stop the Spark session created in the first cell; this is the last step of the notebook.
spark.stop()