In [1]:
import pyspark.sql.functions as f
import sys

## Network Logs ETL

Reads the data from JSON files and transforms the `message` field, which itself contains JSON, into a structured column so that it can be analyzed with Spark.

In [2]:
# Load the raw network log records (JSON files) into a DataFrame.
network_logs_source = 'filebeat/logs'
df_network_logs_transient = spark.read.json(network_logs_source)

                                                                                

In [3]:
# Infer the schema of the JSON carried in the `message` column by letting
# Spark read the raw message strings as a JSON dataset of their own.
message_strings = df_network_logs_transient.rdd.map(lambda record: record.message)
json_schema = spark.read.json(message_strings).schema

# Parse `message` with the inferred schema into a new `message_json` column,
# then drop the original string column.
parsed_message = f.from_json(f.col('message'), json_schema)
df_network_logs_final = (
    df_network_logs_transient
    .withColumn('message_json', parsed_message)
    .drop(f.col('message'))
)

21/11/15 19:33:38 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [4]:
# Optional debugging aid: uncomment the next line to print the schema of df_network_logs_final.
#df_network_logs_final.printSchema()

Writes the logs into the data lake in Parquet format (more performant and smaller than the raw JSON).

In [5]:
# Write the transformed network logs to the data lake as Parquet,
# partitioned on disk by the `dt` and `hr` columns.
# NOTE(review): no save mode is set, so the DataFrameWriter default applies
# (documented as failing when the target path already exists) -- confirm that
# this is the intended behaviour on re-runs.
(
    df_network_logs_final.write
    .partitionBy('dt', 'hr')
    .parquet('datalake/network_logs')
)

                                                                                

Data quality check: verifies that the resulting data lake table has rows and raises an error if it does not.

In [6]:
# Data quality gate: read back what was just written and fail loudly when the
# Parquet dataset contains no rows.
df = spark.read.parquet('datalake/network_logs')
row_count = df.count()
if row_count < 1:
    # `raise` requires an exception class or instance. Raising a bare string
    # fails with "TypeError: exceptions must derive from BaseException",
    # which would hide the actual data quality message.
    raise RuntimeError("Data Quality Failed, number of rows lesser than 1")
print(f"[+] Data Quality Check Passed for Network Logs, row_count: {row_count} [+]")

[+] Data Quality Check Passed for Network Logs, row_count: 1000560 [+]


Cleans dataframes from memory

In [7]:
# Release the network-log DataFrames while stdout is redirected to /dev/null.
# The file handle is deliberately NOT named `f`: that name is the
# pyspark.sql.functions alias imported at the top of the notebook, and
# rebinding it would break any later (or re-run) cell that calls f.col / f.from_json.
# NOTE(review): unpersist() only releases data previously marked with
# cache()/persist(); no visible cell caches these frames -- confirm this cell is needed.
old_stdout = sys.stdout
with open('/dev/null', 'w') as devnull:
    sys.stdout = devnull
    try:
        df_network_logs_transient.unpersist()
        df_network_logs_final.unpersist()
        df.unpersist()
    finally:
        # Restore stdout even if an unpersist() call raises, so later cells
        # do not silently lose their printed output.
        sys.stdout = old_stdout

## Host Logs ETL

In [8]:
# Load the raw host log records (JSON files) into a DataFrame.
host_logs_source = 'auditbeat/logs'
df_host_logs = spark.read.json(host_logs_source)

                                                                                

In [9]:
# Optional debugging aid: uncomment the next line to print the schema of df_host_logs.
#df_host_logs.printSchema()

Writes the logs into the data lake in Parquet format (more performant and smaller than the raw JSON).

In [10]:
# Write the host logs to the data lake as Parquet, partitioned on disk by
# the `dt` and `hr` columns.
# NOTE(review): no save mode is set, so the DataFrameWriter default applies
# (documented as failing when the target path already exists) -- confirm that
# this is the intended behaviour on re-runs.
(
    df_host_logs.write
    .partitionBy('dt', 'hr')
    .parquet('datalake/host_logs')
)

                                                                                

Data quality check: verifies that the resulting data lake table has rows and raises an error if it does not.

In [11]:
# Data quality gate: read back what was just written and fail loudly when the
# Parquet dataset contains no rows.
df = spark.read.parquet('datalake/host_logs')
row_count = df.count()
if row_count < 1:
    # `raise` requires an exception class or instance. Raising a bare string
    # fails with "TypeError: exceptions must derive from BaseException",
    # which would hide the actual data quality message.
    raise RuntimeError("Data Quality Failed, number of rows lesser than 1")
print(f"[+] Data Quality Check Passed for Host Logs, row_count: {row_count} [+]")

[+] Data Quality Check Passed for Host Logs, row_count: 84856 [+]


Cleans dataframes from memory

In [12]:
# Release the host-log DataFrames while stdout is redirected to /dev/null.
# The file handle is deliberately NOT named `f`: that name is the
# pyspark.sql.functions alias imported at the top of the notebook, and
# rebinding it would break any later (or re-run) cell that calls f.col / f.from_json.
# NOTE(review): unpersist() only releases data previously marked with
# cache()/persist(); no visible cell caches these frames -- confirm this cell is needed.
old_stdout = sys.stdout
with open('/dev/null', 'w') as devnull:
    sys.stdout = devnull
    try:
        df_host_logs.unpersist()
        df.unpersist()
    finally:
        # Restore stdout even if an unpersist() call raises, so later cells
        # do not silently lose their printed output.
        sys.stdout = old_stdout