In [1]:
import pyspark
import os
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
from pyspark.streaming import *
from pyspark.sql.functions import *

conf = (SparkConf()
       .set("spark.driver.bindAddress", "0.0.0.0")\
       .set("spark.submit.deployMode", "client")\
       .set("spark.kubernetes.driver.pod.name", os.getenv("MY_POD_NAME"))\
       .set("spark.kubernetes.authenticate.subdmission.caCertFile", "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt")\
       .set("spark.kubernetes.authenticate.submission.oauthTokenFile", "/var/run/secrets/kubernetes.io/serviceaccount/token")\
       .set("spark.kubernetes.authenticate.driver.serviceAccountName", "spark-driver")\
       .set("spark.kubernetes.namespace", os.getenv("MY_POD_NAMESPACE"))\
       .set("spark.executor.instances", "1")\
       .set("spark.dynamicAllocation.maxExecutors", "3")\
       .set("spark.dynamicAllocation.enabled", "true")\
       .set("spark.dynamicAllocation.shuffleTracking.enabled", "true")\
       .set("spark.driver.host", "spark-driver")\
       .set("spark.driver.port", "20020")\
       .set("spark.kubernetes.executor.request.cores", "1")\
       .set("spark.kubernetes.executor.limit.cores", "2")\
       .set("spark.executor.memory", "4096m")\
       .set("spark.kubernetes.container.image", "klovercloud/airflow-spark-k8s-driver:3.1.2")\
       .set("spark.kubernetes.container.image.pullPolicy", "IfNotPresent")\
       .set("spark.sql.extensions","org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")\
       .set("spark.sql.catalog.spark_catalog","org.apache.iceberg.spark.SparkSessionCatalog")\
       .set("spark.sql.catalog.spark_catalog.type","hive")\
       .set("spark.hadoop.hive.metastore.uris","thrift://hive-metastore.analytics-dev:9083")\
       .set("spark.hadoop.hive.metastore.warehouse.dir","s3a://bigdata-dev-cmfcknil/warehouse/")\
       .set("spark.hadoop.fs.s3a.connection.ssl.enabled","true")\
       .set("spark.hadoop.fs.s3a.path.style.access","true")\
       .set("spark.hadoop.fs.s3a.endpoint","https://kcs3.bd-1.wpc.waltonelectronics.com")\
       .set("spark.hadoop.fs.s3a.access.key","30J9IFFY75NVQZVHG4D4")\
       .set("spark.hadoop.fs.s3a.secret.key","WMGk6kqq5erjqZjnqo0QItSQ2zuoNWEpHIphsTgR")\
       .set("spark.hadoop.fs.s3a.attempts.maximum","1")\
       .set("spark.hadoop.fs.s3a.connection.establish.timeout","500")\
       .set("spark.hadoop.datanucleus.autoCreateSchema","true")\
       .set("spark.hadoop.datanucleus.fixedDatastore","false"))\
       .set("spark.jars.packages","org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2");
       #.set("spark.jars.packages","org.apache.spark:spark-streaming-kafka-0-10_2.12:3.2.1")

       


spark = SparkSession.builder \
            .master("k8s://https://kubernetes.default:443") \
            .appName("spark-streaming") \
            .config(conf=conf).enableHiveSupport().getOrCreate()




:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/klovercloud/.ivy2/cache
The jars for the packages stored in: /home/klovercloud/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-20602d26-d774-4361-9cdf-b44532da3a82;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.1.2 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.1.2 in central
	found org.apache.kafka#kafka-clients;2.6.0 in central
	found com.github.luben#zstd-jni;1.4.8-1 in central
	found org.lz4#lz4-java;1.7.1 in central
	found org.xerial.snappy#snappy-java;1.1.8.2 in central
	found org.slf4j#slf4j-api;1.7.30 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.commons#commons-pool2;2.6.2 in central
:: resolution report :: resolve 371ms :: artifacts dl 7ms
	:: modules in use:
	com.github.luben#zstd-jni;1.4.8-1 from central in [default]
	org.apache.commons#commons-pool2;2.6.2 fro

In [2]:

batch_df = spark \
    .read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS_CONS) \
    .option("subscribe", KAFKA_TOPIC_NAME_CONS) \
    .option("group.id", "Structured-Streaming-kpi")\
    .load()
batch_df=batch_df.withColumn("key",batch_df.key.cast(StringType()))\
                 .withColumn("value",batch_df.value.cast(StringType()))\
                 .filter(batch_df.value.contains('{'))

json_schema = spark.read.json(batch_df.rdd.map(lambda row: row.value),multiLine=True).schema


                                                                                

In [None]:
input_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS_CONS) \
    .option("subscribe", KAFKA_TOPIC_NAME_CONS) \
    .option("group.id", "Structured-Streaming-kpi")\
    .load()




#json_schema = spark.read.json(batch_df.rdd.map(lambda row: row.value),multiLine=True).schema

parsed_df=input_df.withColumn("key",input_df.key.cast(StringType()))\
                 .withColumn("value",input_df.value.cast(StringType()))\
                 .filter(input_df.value.contains('{'))\
          .withColumn("value", from_json("value",json_schema))\
          .select(col('key'),col('value.*'),col('topic'),col('offset'),col('timestamp'))


query=parsed_df.writeStream\
     .format("iceberg")\
    .outputMode("append")\
    .trigger(processingTime="20 seconds")\
    .option("path", "iot.fridge__dev")\
    .option("checkpointLocation", "s3a://bigdata-dev-cmfcknil/raw/test/iotcpfridge")\
    .start()\
    .awaitTermination()

22/02/23 05:36:46 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
22/02/23 05:36:47 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
22/02/23 05:55:49 WARN WatchConnectionManager: Exec Failure                     
java.io.EOFException
	at okio.RealBufferedSource.require(RealBufferedSource.java:61)
	at okio.RealBufferedSource.readByte(RealBufferedSource.java:74)
	at okhttp3.internal.ws.WebSocketReader.readHeader(WebSocketReader.java:117)
	at okhttp3.internal.ws.WebSocketReader.processNextFrame(WebSocketReader.java:101)
	at okhttp3.internal.ws.RealWebSocket.loopReader(RealWebSocket.java:274)
	at okhttp3.internal.ws.RealWebSocket$2.onResponse(RealWebSocket.java:214)
	at okhttp3.RealCall$AsyncCall.execute(RealCall.java:203)
	at okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32)
	at java.base/java

In [None]:
spark.sql("DESCRIBE iot.fridge__dev").show(40)

In [None]:
query.show(3)