In [1]:
# Print the driver's Python version; it should match the Python version used on the Spark nodes.
import sys

driver_python_version = sys.version
print(driver_python_version)


3.11.9 (main, Apr  6 2024, 17:59:24) [GCC 11.4.0]


In [2]:
# Print where the pyspark package is imported from, to confirm which installation the kernel uses.
import pyspark

pyspark_location = pyspark.__file__
print(pyspark_location)


/usr/local/lib/python3.11/dist-packages/pyspark/__init__.py


In [3]:
# Maven coordinates passed to `spark.jars.packages`.
# Keep the versions aligned with the installed PySpark:
#   * Scala artifacts use PySpark's Scala binary version; Spark modules use PySpark's Spark version.
#   * hadoop-aws should match the Hadoop client version that PySpark ships with.
#   * For Delta compatibility with Spark, check https://docs.delta.io/latest/releases.html
SCALA_BINARY_VERSION = "2.12"
SPARK_VERSION = "3.5.1"
HADOOP_VERSION = "3.3.4"
ABRIS_VERSION = "6.4.1"
DELTA_VERSION = "3.2.0"

packages = [
    # Hadoop-AWS: S3A filesystem used to reach MinIO
    f"org.apache.hadoop:hadoop-aws:{HADOOP_VERSION}",
    # Kafka source for Structured Streaming
    f"org.apache.spark:spark-sql-kafka-0-10_{SCALA_BINARY_VERSION}:{SPARK_VERSION}",
    # ABRiS: Avro (de)serialization integrated with Confluent Schema Registry.
    # For more information, see https://github.com/AbsaOSS/ABRiS
    f"za.co.absa:abris_{SCALA_BINARY_VERSION}:{ABRIS_VERSION}",
    # spark-avro is listed explicitly, pinned to the Spark version, to take precedence over the
    # conflicting spark-avro dependency of ABRiS; see https://github.com/AbsaOSS/ABRiS/issues/355
    f"org.apache.spark:spark-avro_{SCALA_BINARY_VERSION}:{SPARK_VERSION}",
    # Delta Lake
    f"io.delta:delta-spark_{SCALA_BINARY_VERSION}:{DELTA_VERSION}",
]

In [4]:
# MinIO endpoint and credentials.
# Secrets are not stored in the notebook: the keys come from the MINIO_ACCESS_KEY and
# MINIO_SECRET_KEY environment variables, with an interactive prompt when a variable is unset.
# MINIO_ENDPOINT optionally overrides the default endpoint.
# Rotate any key that has ever been committed in plain text.
import os
from getpass import getpass

minio_endpoint = os.environ.get("MINIO_ENDPOINT", "http://minio:9000")
minio_access_key = os.environ.get("MINIO_ACCESS_KEY") or getpass("MinIO access key: ")
minio_secret_key = os.environ.get("MINIO_SECRET_KEY") or getpass("MinIO secret key: ")

In [5]:
from pyspark import SparkConf
# Spark configuration: application name, extra packages/repositories, Delta Lake support and
# S3A access to MinIO.
conf = (
    SparkConf()
    # Name of the application
    .setAppName("test_spark_kafka_confluent_schema_avro_connection")
    # Packages to include in the application; their dependencies are resolved as well
    .set("spark.jars.packages", ",".join(packages))
    # Additional repository searched for packages and their transitive dependencies. The Confluent
    # repository provides io.confluent artifacts such as kafka-avro-serializer, required by ABRiS.
    # Runtime environment settings: https://spark.apache.org/docs/latest/configuration.html#runtime-environment
    # Repository search order: https://stackoverflow.com/a/51435038
    .set("spark.jars.repositories", "https://packages.confluent.io/maven")
    # SQL extension and catalog for the Delta format.
    # Static SQL settings: https://spark.apache.org/docs/latest/configuration.html#static-sql-configuration
    .set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    # Spark integration with MinIO through S3A:
    # https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html
    # Credentials for MinIO; for MinIO access keys see
    # https://min.io/docs/minio/linux/administration/identity-access-management/minio-user-management.html#id3
    .set("spark.hadoop.fs.s3a.access.key", minio_access_key)
    .set("spark.hadoop.fs.s3a.secret.key", minio_secret_key)
    .set("spark.hadoop.fs.s3a.endpoint", minio_endpoint)
    # Derive the SSL flag from the endpoint scheme so the two cannot disagree. A hardcoded "true"
    # does not enable TLS for an http:// endpoint: in hadoop-aws 3.3.x (AWS SDK v1) an explicit
    # scheme on the endpoint takes precedence over this flag.
    .set(
        "spark.hadoop.fs.s3a.connection.ssl.enabled",
        "true" if minio_endpoint.lower().startswith("https://") else "false",
    )
    # Path-style URLs (host/bucket/key), which MinIO serves without DNS-based virtual hosting
    .set("spark.hadoop.fs.s3a.path.style.access", "true")
    .set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    # Authenticate with the static access/secret key pair configured above
    .set("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
    # Fail fast if MinIO is unreachable: a single attempt, 5 s to connect, 10 s socket timeout (ms)
    .set("spark.hadoop.fs.s3a.attempts.maximum", "1")
    .set("spark.hadoop.fs.s3a.connection.establish.timeout", "5000")
    .set("spark.hadoop.fs.s3a.connection.timeout", "10000")
)

In [6]:
from pyspark.sql import SparkSession
# Create the SparkSession (or reuse an active one) against the standalone cluster master.
spark = (
    SparkSession.builder
    .config(conf=conf)
    .master("spark://spark-master:7077")
    .getOrCreate()
)

https://packages.confluent.io/maven added as a remote repository with the name: repo-1
Ivy Default Cache set to: /config/.ivy2/cache
The jars for the packages stored in: /config/.ivy2/jars
org.apache.hadoop#hadoop-aws added as a dependency
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
za.co.absa#abris_2.12 added as a dependency
org.apache.spark#spark-avro_2.12 added as a dependency
io.delta#delta-spark_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-1a9c8ebf-2bff-47a4-aa5d-36a5416222f5;1.0
	confs: [default]


:: loading settings :: url = jar:file:/usr/local/lib/python3.11/dist-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


	found org.apache.hadoop#hadoop-aws;3.3.4 in central
	found com.amazonaws#aws-java-sdk-bundle;1.12.262 in central
	found org.wildfly.openssl#wildfly-openssl;1.0.7.Final in central
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.5.1 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.5.1 in central
	found org.apache.kafka#kafka-clients;3.4.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.10.3 in central
	found org.slf4j#slf4j-api;2.0.7 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
	found org.apache.hadoop#hadoop-client-api;3.3.4 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.apache.commons#commons-pool2;2.11.1 in central
	found za.co.absa#abris_2.12;6.4.1 in central
	found io.confluent#kafka-avro-serializer;6.2.1 in repo-1
	found org.apache.avro#avro;1.10.2 in central
	found com.fasterxml.jackson.core#

In [7]:
from pyspark import SparkContext
from pyspark.sql.column import Column, _to_java_column

def from_avro(col, config):
    """
    Deserialize a Confluent-Avro-encoded column with ABRiS.

    :param col: column to decode, either a PySpark ``Column`` or a column name such as "key" or "value"
    :param config: ``za.co.absa.abris.config.FromAvroConfig`` JVM object, e.g. built by ``from_avro_abris_config``
    :return: PySpark ``Column`` holding the decoded record
    """
    abris_functions = SparkContext._active_spark_context._gateway.jvm.za.co.absa.abris.avro.functions
    decoded_java_column = abris_functions.from_avro(_to_java_column(col), config)
    return Column(decoded_java_column)

def from_avro_abris_config(config_map, topic, is_key):
    """
    Build an ABRiS reader config whose schema is fetched from Confluent Schema Registry.

    The latest registered schema version is downloaded, and the registry subject is derived from
    ``topic`` and ``is_key`` using the topic-name strategy.

    :param config_map: settings (``dict[str, str]``) forwarded to the deserializer,
        e.g. ``{'schema.registry.url': 'http://localhost:8081'}``
    :param topic: Kafka topic name
    :param is_key: ``True`` to resolve the message-key schema, ``False`` for the message-value schema
    :return: ``za.co.absa.abris.config.FromAvroConfig`` JVM object
    """
    jvm = SparkContext._active_spark_context._gateway.jvm
    registry_settings = jvm.PythonUtils.toScalaMap(config_map)

    confluent_step = jvm.za.co.absa.abris.config.AbrisConfig.fromConfluentAvro()
    schema_step = confluent_step.downloadReaderSchemaByLatestVersion()
    subject_step = schema_step.andTopicNameStrategy(topic, is_key)
    return subject_step.usingSchemaRegistry(registry_settings)

def to_avro(col, config):
    """
    Serialize a column to Confluent Avro with ABRiS.

    :param col: column to encode, either a PySpark ``Column`` or a column name such as "key" or "value"
    :param config: ``za.co.absa.abris.config.ToAvroConfig`` JVM object, e.g. built by ``to_avro_abris_config``
    :return: PySpark ``Column`` holding the Avro-encoded payload
    """
    abris_functions = SparkContext._active_spark_context._gateway.jvm.za.co.absa.abris.avro.functions
    encoded_java_column = abris_functions.to_avro(_to_java_column(col), config)
    return Column(encoded_java_column)

def to_avro_abris_config(config_map, topic, is_key):
    """
    Build an ABRiS writer config whose schema is fetched from Confluent Schema Registry.

    The latest registered schema version is downloaded, and the registry subject is derived from
    ``topic`` and ``is_key`` using the topic-name strategy.

    :param config_map: settings (``dict[str, str]``) forwarded to the serializer,
        e.g. ``{'schema.registry.url': 'http://localhost:8081'}``
    :param topic: Kafka topic name
    :param is_key: ``True`` to resolve the message-key schema, ``False`` for the message-value schema
    :return: ``za.co.absa.abris.config.ToAvroConfig`` JVM object
    """
    jvm = SparkContext._active_spark_context._gateway.jvm
    registry_settings = jvm.PythonUtils.toScalaMap(config_map)

    confluent_step = jvm.za.co.absa.abris.config.AbrisConfig.toConfluentAvro()
    schema_step = confluent_step.downloadSchemaByLatestVersion()
    subject_step = schema_step.andTopicNameStrategy(topic, is_key)
    return subject_step.usingSchemaRegistry(registry_settings)

In [8]:
# Stream the Kafka topic that receives Debezium change events for the table, from the earliest offset.
kafka_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka-broker-0:9092,kafka-broker-1:9092")
    .option("subscribe", "postgresql.devschema.person")
    .option("startingOffsets", "earliest")
    .load()
)


In [9]:

# Schema Registry settings and topic shared by the key and value deserializers, defined once so
# the two configs cannot drift apart.
schema_registry_config = {'schema.registry.url': 'http://kafka-schema-registry:8081'}
person_topic = 'postgresql.devschema.person'

# Reader config for the message value (is_key=False)
from_avro_abris_settings_key_false = from_avro_abris_config(schema_registry_config, person_topic, False)

# Reader config for the message key (is_key=True)
from_avro_abris_settings_key_true = from_avro_abris_config(schema_registry_config, person_topic, True)

# Decode the Avro-encoded value and key into new columns
kafka_df_avro_key_value_converted = (
    kafka_df
    .withColumn("value_parsed", from_avro("value", from_avro_abris_settings_key_false))
    .withColumn("key_parsed", from_avro("key", from_avro_abris_settings_key_true))
)

In [10]:
## Use this to test the pipeline with the in-memory sink.
# Results are registered as the in-memory table "postgresql_devschema_person_test".
# The memory sink collects the entire output in driver memory, so use it only on small test topics.
# For output sinks, please refer to https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-sinks
query = (
    kafka_df_avro_key_value_converted
    .writeStream
    # The memory sink documents Append and Complete modes; for this query without aggregations,
    # Update would behave exactly like Append anyway.
    .outputMode("append")
    .format("memory")
    .queryName("postgresql_devschema_person_test")
    .start()
)

24/07/16 09:28:48 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-7c6bf588-987e-4939-9c98-5d1045b0aa4b. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/07/16 09:28:48 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
24/07/16 09:28:49 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.


[Stage 0:>                                                          (0 + 1) / 1]

In [11]:
# List tables to confirm that the streaming query has registered its in-memory table
tables_df = spark.sql("show tables")
tables_df.show(truncate=False)

                                                                                

+---------+--------------------------------+-----------+
|namespace|tableName                       |isTemporary|
+---------+--------------------------------+-----------+
|         |postgresql_devschema_person_test|false      |
+---------+--------------------------------+-----------+



In [12]:
# Inspect the decoded key and value columns collected so far in the in-memory table
parsed_rows_df = spark.sql("select key_parsed,value_parsed from postgresql_devschema_person_test")
parsed_rows_df.show(truncate=False)

+----------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|key_parsed|value_parsed                                                                                                                                                                                                                                                      |
+----------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|{1}       |{NULL, {1, Alice, 30}, {2.7.0.Final, postgresql, postgresql, 1721036562977, first, devdatabase, [null,"31232792"], 1721036562977778, 1721036562977778000, devschema, person,

In [15]:
## Use this to test writing to a Delta Lake table stored in MinIO.
# Target paths have the form s3a://<bucket>/path/to/your/folder
# For output sinks, please refer to https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-sinks
delta_table_path = "s3a://test2/data"
delta_checkpoint_path = "s3a://test2/checkpoint"

query = (
    kafka_df_avro_key_value_converted
    .writeStream
    .format("delta")                                      # sink format ("parquet", "json", "csv", ... also possible)
    .option("path", delta_table_path)                     # table location
    .option("checkpointLocation", delta_checkpoint_path)  # offsets/progress used for recovery
    .outputMode("append")                                 # only newly arrived rows are written
    .start()
)

24/07/16 09:31:03 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


24/07/16 09:31:04 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
[Stage 3:>                                                          (0 + 1) / 1]

In [18]:
# Stop every active streaming query (memory and Delta sinks) explicitly before shutting down the
# session, rather than leaving them to be interrupted by the SparkContext shutdown.
for active_query in spark.streams.active:
    active_query.stop()
spark.stop()