In [1]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m4.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=b2b524966824960db09086abb9a733ac6feee848e5efc495ac2e59e35f0cbf54
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [2]:
# Create the Spark Session
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Streaming from Kafka") \
    .config("spark.streaming.stopGracefullyOnShutdown", True) \
    .config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0') \
    .config("spark.sql.shuffle.partitions", 4) \
    .master("local[*]") \
    .getOrCreate()

spark

In [3]:
# Create the streaming_df to read from kafka
df = spark.readStream\
    .format("kafka") \
    .option("kafka.bootstrap.servers", "telemetry-kafka-external-bootstrap-observability-kafka.apps.zagaopenshift.zagaopensource.com:443") \
    .option("subscribe", "apmlogs") \
    .option("startingOffsets", "earliest") \
    .load()

In [4]:
df_parsed = df.selectExpr("CAST(value AS STRING) as json_string")

In [5]:
df_parsed

DataFrame[json_string: string]

In [6]:
from pyspark.sql.functions import explode, col, from_json
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType, ArrayType

In [7]:
schema = StructType([
    StructField("resourceSpans", ArrayType(StructType([
        StructField("resource", StructType([
            StructField("attributes", ArrayType(StructType([
                StructField("key", StringType()),
                StructField("value", StructType([
                    StructField("stringValue", StringType()),
                    StructField("intValue", IntegerType())
                ]))
            ])))
        ])),
        StructField("scopeSpans", ArrayType(StructType([
            StructField("scope", StructType([
                StructField("name", StringType()),
                StructField("version", StringType())
            ])),
            StructField("spans", ArrayType(StructType([
                StructField("traceId", StringType()),
                StructField("spanId", StringType()),
                StructField("parentSpanId", StringType()),
                StructField("flags", IntegerType()),
                StructField("name", StringType()),
                StructField("kind", IntegerType()),
                StructField("startTimeUnixNano", LongType()),
                StructField("endTimeUnixNano", LongType()),
                StructField("attributes", ArrayType(StructType([
                    StructField("key", StringType()),
                    StructField("value", StructType([
                        StructField("stringValue", StringType()),
                        StructField("intValue", IntegerType())
                    ]))
                ]))),
                StructField("status", StructType())
            ])))
        ]))),
        StructField("schemaUrl", StringType())
    ])))
])


In [8]:
df_parsed = df_parsed.withColumn("data", from_json(col("json_string"), schema))

In [9]:
df_parsed

DataFrame[json_string: string, data: struct<resourceSpans:array<struct<resource:struct<attributes:array<struct<key:string,value:struct<stringValue:string,intValue:int>>>>,scopeSpans:array<struct<scope:struct<name:string,version:string>,spans:array<struct<traceId:string,spanId:string,parentSpanId:string,flags:int,name:string,kind:int,startTimeUnixNano:bigint,endTimeUnixNano:bigint,attributes:array<struct<key:string,value:struct<stringValue:string,intValue:int>>>,status:struct<>>>>>,schemaUrl:string>>>]

In [10]:
query = df_parsed.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

In [11]:
query

<pyspark.sql.streaming.query.StreamingQuery at 0x7f2cbe609660>

In [12]:
query.awaitTermination()