In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, explode
from pyspark.sql.types import StructType, StructField, StringType, LongType, ArrayType

In [2]:
spark = SparkSession.builder.appName("JSON to Parquet").getOrCreate()
df = spark.read.json("../data/xaa", multiLine=True)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/07 15:25:03 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [71]:
# df.write.parquet("data.parquet")

In [3]:
df.columns

['@timestamp',
 '@version',
 'agent',
 'datacenter',
 'ecs',
 'event.type',
 'fields',
 'host',
 'input',
 'log',
 'log.logger',
 'message',
 'process.pid',
 'tags',
 'timestamp']

In [4]:
print(type(df))

<class 'pyspark.sql.dataframe.DataFrame'>


In [5]:
df.show(truncate=False)

+------------------------+--------+---------------------------------------------------------------------------------------------------------------------------------------+--------------+-------+----------+---------------------+-----------------------------------------+-----+--------------------------------------------------------------------+---------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+--------+-----------------------+
|@timestamp              |@version|agent                                                                                                                                  |datacenter    |ecs    |event.type|fields               |host                                     |input|log                                                                 |log.log

In [6]:
df.limit(6).toPandas()

Unnamed: 0,@timestamp,@version,agent,datacenter,ecs,event.type,fields,host,input,log,log.logger,message,process.pid,tags,timestamp
0,2025-02-27T08:13:18.897Z,1,"(659873d3-3c20-4f2c-837d-0a1ff90964c5, 678fe5e...",s42-branchburg,"(8.0.0,)",info,"(neutron, production)","(computedpdkcl-1.vim4-7.bbg-1.vzwops.com,)","(log,)",((/var/log/containers/neutron/openvswitch-agen...,neutron.plugins.ml2.drivers.openvswitch.agent....,[req-f9fe7deb-2d8c-4799-b362-eb1b86ed8c65 - - ...,10547,[server],2025-02-27 08:13:18.136


In [7]:
df.head()

Row(@timestamp='2025-02-27T08:13:18.897Z', @version='1', agent=Row(ephemeral_id='659873d3-3c20-4f2c-837d-0a1ff90964c5', id='678fe5e2-7e69-4c93-bb69-c577f80d6f51', name='computedpdkcl-1.vim4-7.bbg-1.vzwops.com', type='filebeat', version='8.11.3'), datacenter='s42-branchburg', ecs=Row(version='8.0.0'), event.type='info', fields=Row(log_type='neutron', node_status='production'), host=Row(name='computedpdkcl-1.vim4-7.bbg-1.vzwops.com'), input=Row(type='log'), log=Row(file=Row(path='/var/log/containers/neutron/openvswitch-agent.log'), level='info', offset=7216250), log.logger='neutron.plugins.ml2.drivers.openvswitch.agent.ovs_neutron_agent', message="[req-f9fe7deb-2d8c-4799-b362-eb1b86ed8c65 - - - - -] Agent rpc_loop - iteration:345687 completed. Processed ports statistics: {'regular': {'added': 0, 'updated': 0, 'removed': 0}}. Elapsed:0.002", process.pid='10547', tags=['server'], timestamp='2025-02-27 08:13:18.136')

In [8]:
df.toPandas().head(6)

Unnamed: 0,@timestamp,@version,agent,datacenter,ecs,event.type,fields,host,input,log,log.logger,message,process.pid,tags,timestamp
0,2025-02-27T08:13:18.897Z,1,"(659873d3-3c20-4f2c-837d-0a1ff90964c5, 678fe5e...",s42-branchburg,"(8.0.0,)",info,"(neutron, production)","(computedpdkcl-1.vim4-7.bbg-1.vzwops.com,)","(log,)",((/var/log/containers/neutron/openvswitch-agen...,neutron.plugins.ml2.drivers.openvswitch.agent....,[req-f9fe7deb-2d8c-4799-b362-eb1b86ed8c65 - - ...,10547,[server],2025-02-27 08:13:18.136


In [9]:
df.printSchema()
# df.show(truncate=False)

root
 |-- @timestamp: string (nullable = true)
 |-- @version: string (nullable = true)
 |-- agent: struct (nullable = true)
 |    |-- ephemeral_id: string (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- type: string (nullable = true)
 |    |-- version: string (nullable = true)
 |-- datacenter: string (nullable = true)
 |-- ecs: struct (nullable = true)
 |    |-- version: string (nullable = true)
 |-- event.type: string (nullable = true)
 |-- fields: struct (nullable = true)
 |    |-- log_type: string (nullable = true)
 |    |-- node_status: string (nullable = true)
 |-- host: struct (nullable = true)
 |    |-- name: string (nullable = true)
 |-- input: struct (nullable = true)
 |    |-- type: string (nullable = true)
 |-- log: struct (nullable = true)
 |    |-- file: struct (nullable = true)
 |    |    |-- path: string (nullable = true)
 |    |-- level: string (nullable = true)
 |    |-- offset: long (nullable = true)
 |-- log

In [10]:
df_flattened = df.selectExpr(
    "`@timestamp`",
    "`@version` as version",
    "agent.ephemeral_id as agent_ephemeral_id",
    "agent.id as agent_id",
    "agent.name as agent_name",
    "agent.type as agent_type",
    "agent.version as agent_version",
    "datacenter",
    "ecs.version as ecs_version",
    "`event.type` as event_type",
    "fields.log_type as fields_log_type",
    "fields.node_status as fields_node_status",
    "host.name as host_name",
    "input.type as input_type",
    "log.file.path as log_file_path",
    "log.level as log_level",
    "log.offset as log_offset",
    "`log.logger` as log_logger",
    "message",
    "`process.pid` as process_pid",
    "tags",
    "timestamp"
)

In [11]:
df_flattened.printSchema()

root
 |-- @timestamp: string (nullable = true)
 |-- version: string (nullable = true)
 |-- agent_ephemeral_id: string (nullable = true)
 |-- agent_id: string (nullable = true)
 |-- agent_name: string (nullable = true)
 |-- agent_type: string (nullable = true)
 |-- agent_version: string (nullable = true)
 |-- datacenter: string (nullable = true)
 |-- ecs_version: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- fields_log_type: string (nullable = true)
 |-- fields_node_status: string (nullable = true)
 |-- host_name: string (nullable = true)
 |-- input_type: string (nullable = true)
 |-- log_file_path: string (nullable = true)
 |-- log_level: string (nullable = true)
 |-- log_offset: long (nullable = true)
 |-- log_logger: string (nullable = true)
 |-- message: string (nullable = true)
 |-- process_pid: string (nullable = true)
 |-- tags: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- timestamp: string (nullable = true)



### Trying as text file

In [18]:
# Read file as plain text
df_text = spark.read.text("../data/xaa")

df_text.show(truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [19]:

schema = StructType([
    StructField("@timestamp", StringType(), True),
    StructField("@version", StringType(), True),
    StructField("agent", StructType([
        StructField("ephemeral_id", StringType(), True),
        StructField("id", StringType(), True),
        StructField("name", StringType(), True),
        StructField("type", StringType(), True),
        StructField("version", StringType(), True)
    ]), True),
    StructField("datacenter", StringType(), True),
    StructField("ecs", StructType([
        StructField("version", StringType(), True)
    ]), True),
    StructField("event.type", StringType(), True),
    StructField("fields", StructType([
        StructField("log_type", StringType(), True),
        StructField("node_status", StringType(), True)
    ]), True),
    StructField("host", StructType([
        StructField("name", StringType(), True)
    ]), True),
    StructField("input", StructType([
        StructField("type", StringType(), True)
    ]), True),
    StructField("log", StructType([
        StructField("file", StructType([
            StructField("path", StringType(), True)
        ]), True),
        StructField("level", StringType(), True),
        StructField("offset", StringType(), True)
    ]), True),
    StructField("log.logger", StringType(), True),
    StructField("message", StringType(), True),
    StructField("process.pid", StringType(), True),
    StructField("tags", ArrayType(StringType()), True),
    StructField("timestamp", StringType(), True)
])

In [20]:
df_parsed = df_text.withColumn("json_data", from_json(col("value"), schema)).select("json_data.*")

df_parsed.show(truncate=False)
df_parsed.printSchema()

+------------------------+--------+--------------------------------------------------------------------------------------------------------------------------------------------------+--------------+-------+----------+---------------------+----------------------------------------------------+-----+--------------------------------------------------------------------+---------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+--------+-----------------------+
|@timestamp              |@version|agent                                                                                                                           

In [21]:
df_flattened = df_parsed.selectExpr(
    "`@timestamp`",
    "`@version` as version",
    "agent.ephemeral_id as agent_ephemeral_id",
    "agent.id as agent_id",
    "agent.name as agent_name",
    "agent.type as agent_type",
    "agent.version as agent_version",
    "datacenter",
    "ecs.version as ecs_version",
    "`event.type` as event_type",
    "fields.log_type as fields_log_type",
    "fields.node_status as fields_node_status",
    "host.name as host_name",
    "input.type as input_type",
    "log.file.path as log_file_path",
    "log.level as log_level",
    "log.offset as log_offset",
    "`log.logger` as log_logger",
    "message",
    "`process.pid` as process_pid",
    "tags",
    "timestamp"
)

In [22]:
df_flattened.printSchema()

root
 |-- @timestamp: string (nullable = true)
 |-- version: string (nullable = true)
 |-- agent_ephemeral_id: string (nullable = true)
 |-- agent_id: string (nullable = true)
 |-- agent_name: string (nullable = true)
 |-- agent_type: string (nullable = true)
 |-- agent_version: string (nullable = true)
 |-- datacenter: string (nullable = true)
 |-- ecs_version: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- fields_log_type: string (nullable = true)
 |-- fields_node_status: string (nullable = true)
 |-- host_name: string (nullable = true)
 |-- input_type: string (nullable = true)
 |-- log_file_path: string (nullable = true)
 |-- log_level: string (nullable = true)
 |-- log_offset: string (nullable = true)
 |-- log_logger: string (nullable = true)
 |-- message: string (nullable = true)
 |-- process_pid: string (nullable = true)
 |-- tags: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- timestamp: string (nullable = true)



In [24]:
df_flattened.select(col("tags")).limit(10).show()

+--------+
|    tags|
+--------+
|[server]|
|[server]|
|[server]|
|[server]|
|[server]|
|[server]|
|[server]|
|[server]|
|[server]|
|[server]|
+--------+



In [25]:
df_flattened.createOrReplaceTempView("serverdb")

In [26]:
count = spark.sql("SELECT COUNT(*) as total_rows FROM serverdb")
count.show()



+----------+
|total_rows|
+----------+
|  22268352|
+----------+



                                                                                

In [29]:
tags = spark.sql("SELECT distinct tags as T FROM serverdb")
tags.show(truncate=False)



+---------------------------+
|T                          |
+---------------------------+
|[server]                   |
|[server, _grokparsefailure]|
|[udp]                      |
|[udp, _grokparsefailure]   |
+---------------------------+



                                                                                

In [34]:
ecs_version = spark.sql("SELECT distinct ecs_version as T FROM serverdb")
ecs_version.show(truncate=False)



+-----+
|T    |
+-----+
|8.0.0|
|1.8.0|
|NULL |
+-----+



                                                                                

In [35]:
version = spark.sql("SELECT distinct version as T FROM serverdb")
version.show(truncate=False)



+---+
|T  |
+---+
|1  |
+---+



                                                                                

In [None]:
datacenter = spark.sql("SELECT count(datacenter) as T FROM serverdb")
datacenter.show()

