In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Create (or reuse) the Spark session used by every cell below. The Kafka
# connector is requested through `spark.jars.packages` at session creation.
# NOTE(review): the coordinate pins Scala 2.12 / connector 3.1.1 — presumably
# this matches the installed Spark build; confirm.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("pyspark_test")
    .config(
        "spark.jars.packages",
        "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1",
    )
    .getOrCreate()
)

In [None]:
from pyspark.sql.functions import from_json, get_json_object, explode
from pyspark.sql.types import BooleanType, IntegerType, LongType, StringType, ArrayType, FloatType, StructType, StructField, BinaryType, TimestampType, DateType, ByteType

# Envelope schema for the first JSON pass: both fields are read as strings,
# so "item" stays as text that the pipeline parses again with schema_fin.
schema_init = StructType([
    StructField("type", StringType()),
    StructField("item", StringType()),
])

# Schema of the inner "item" payload (second JSON pass).
# NOTE(review): "date" is declared as DateType — presumably the producer sends
# a date-formatted string rather than an epoch number; confirm against the
# upstream message format.
schema_fin = StructType([
    StructField("postId", IntegerType()),
    StructField("wallId", IntegerType()),
    StructField("authorId", IntegerType()),
    StructField("date", DateType()),
    StructField("text", StringType()),
    StructField("likes", IntegerType()),
    StructField("reports", IntegerType()),
    StructField("images", ArrayType(StringType())),
])

# Streaming source: subscribe to Kafka topic "t1" and unwrap each message in
# two JSON passes — the envelope (schema_init) first, then its "item" field
# (schema_fin). The resulting frame has a single struct column named "data".
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "t1")
    .load()
    # The Kafka source exposes `value` as binary. A single cast to string is
    # sufficient; the original cast it twice (selectExpr + select).
    .select(col("value").cast("string"))
    .select(from_json("value", schema_init).alias("json"))
    .select("json.item")
    .select(from_json("item", schema_fin).alias("data"))
)

# Streaming query that publishes image entries to the Kafka topic "images".
#
# The Kafka sink only accepts a `value` column of string or binary type, while
# `data.images` is array<string>; writing the array directly makes start() fail
# with an AnalysisException. `explode` flattens the array so every image string
# becomes its own Kafka record (rows with a null/empty array emit nothing).
# NOTE(review): if consumers expect one record per post instead, serialize the
# array (e.g. with to_json) rather than exploding it — confirm intended format.
df_images = (
    df
    .select(explode(col("data.images")).alias("value"))
    .writeStream
    .format("kafka")
    .outputMode("append")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("topic", "images")
    .option("kafka.group.id", "test_group")
    .option("checkpointLocation", "/home/jovyan/checkpoint_images")
    .start()
)

# Streaming query that publishes each post's text to the Kafka topic "text".
#
# The query handle is kept in `df_text`; previously `.awaitTermination()` was
# chained onto start(), which bound `df_text` to None and blocked on this one
# query only.
df_text = (
    df
    .select(col("data.text").alias("value"))
    .writeStream
    .format("kafka")
    .outputMode("append")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("topic", "text")
    .option("kafka.group.id", "test_group")
    .option("checkpointLocation", "/home/jovyan/checkpoint_text")
    .start()
)

# Block the cell until any active query on this session stops or fails, so an
# error in another running query is surfaced instead of being silently ignored.
spark.streams.awaitAnyTermination()