Connected to Python 3.11.4

In [34]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import from_json, col, unix_timestamp

In [25]:
from pyspark.sql.types import StructType, StructField, StringType, MapType

# Schema of the event JSON: three top-level string fields plus the nested
# "data", "extra" and "session" objects. Every field is declared nullable.
json_schema = (
    StructType()
    .add("id", StringType(), True)
    .add("name", StringType(), True)
    .add("type", StringType(), True)
    .add(
        "data",
        StructType()
        .add("event", StringType(), True)
        .add("result", StructType().add("message", StringType(), True), True),
        True,
    )
    .add(
        "extra",
        # Header names are not fixed, so they are modelled as a string -> string map.
        StructType().add("headers", MapType(StringType(), StringType()), True),
        True,
    )
    .add(
        "session",
        # start/end are kept as raw strings here; they are interpreted as
        # timestamps later, where the duration is computed.
        StructType()
        .add("id", StringType(), True)
        .add("start", StringType(), True)
        .add("end", StringType(), True),
        True,
    )
)


In [40]:
# Create (or reuse) the Spark session.
# The LEGACY time-parser policy is kept from the original session setup; the
# duration computed below casts the strings to timestamps and does not depend
# on pattern-based parsing.
spark = (
    SparkSession.builder
    .appName("Example")
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY")
    .getOrCreate()
)

# Sample JSON data (one event document)
json_data = """
{
    "id": "rfc-recommendation-model-prod-id",
    "name": "rfc-recommendation-model-prod",
    "type": "EVENT",
    "data": {
        "event": "PREDICT_COMPLETED",
        "result": {
        "message": "pricejump endpoint completed"
        }
    },
    "session": {
        "id": "788b0cb26eb0469eb6c37b598948a2e4",
        "start": "2023-11-28 21:24:11.480165",
        "end": "2023-11-28 21:24:20.480165"
    },
    "extra": {
        "headers": {
        "Host": "svc-rfc-recommendation-v2.gvd-services.svc.cluster.local:5000",
        "Project-ID": "project-id-3"
        }
    }
}
"""

# Read JSON data with the specified schema; the whole document is a single
# RDD element, so it is parsed as one record.
df = spark.read.json(spark.sparkContext.parallelize([json_data]), schema=json_schema)

# Columns to flatten out of the nested record. "session.id" is aliased because
# it would otherwise surface as a second column named "id", making any later
# by-name reference to "id" ambiguous.
selected_columns = [
    "id",
    "name",
    "type",
    col("session.id").alias("session_id"),
    "session.start",
    "session.end",
    "data.event",
    "extra.headers.Project-ID",
]

# Session duration in milliseconds.
# unix_timestamp() yields whole seconds, which would drop the fractional part
# of the start/end strings (they carry microseconds) and could report e.g.
# 1000 ms for a 200 ms session that crosses a second boundary. Casting to
# timestamp and then to double keeps the fractional seconds.
start_seconds = col("start").cast("timestamp").cast("double")
end_seconds = col("end").cast("timestamp").cast("double")
# round() before the integer cast so floating-point noise (e.g. 199.9999)
# does not truncate to the wrong millisecond.
duration_ms = F.round((end_seconds - start_seconds) * 1000).cast("int")

final_stream_df = df.select(*selected_columns).withColumn("duration", duration_ms)

final_stream_df.show()


# Access data within the "headers" field under "extra"
# headers_df = df.select("extra.headers")

# Show the result
# headers_df.show(truncate=False)


+--------------------+--------------------+-----+--------------------+--------------------+--------------------+-----------------+------------+--------+
|                  id|                name| type|                  id|               start|                 end|            event|  Project-ID|duration|
+--------------------+--------------------+-----+--------------------+--------------------+--------------------+-----------------+------------+--------+
|rfc-recommendatio...|rfc-recommendatio...|EVENT|788b0cb26eb0469eb...|2023-11-28 21:24:...|2023-11-28 21:24:...|PREDICT_COMPLETED|project-id-3|    9000|
+--------------------+--------------------+-----+--------------------+--------------------+--------------------+-----------------+------------+--------+



23/11/29 00:32:58 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 260302 ms exceeds timeout 120000 ms
23/11/29 00:32:58 WARN SparkContext: Killing executors is not supported by current scheduler.
23/11/29 00:32:59 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$