In [1]:
import pandas as pd
from pyspark.sql.functions import from_json, col, concat_ws, lit
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, ArrayType, TimestampType
from pyspark.sql import SparkSession

KAFKA_BOOTSTRAP_SERVERS = "localhost:9092"
KAFKA_TOPIC = "api1"

spark = SparkSession.builder.appName("read_test_straeam").getOrCreate()
spark.conf.set("spark.sql.shuffle.partitions", 1)

# Reduce logging
spark.sparkContext.setLogLevel("WARN")


df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS) \
    .option("subscribe", KAFKA_TOPIC) \
    .option("startingOffsets", "earliest") \
    .load()



# Define nested structures first
coordinates_schema = StructType([
    StructField("latitude", StringType(), True),
    StructField("longitude", StringType(), True)
])

timezone_schema = StructType([
    StructField("offset", StringType(), True),
    StructField("description", StringType(), True)
])

street_schema = StructType([
    StructField("number", IntegerType(), True),
    StructField("name", StringType(), True)
])

location_schema = StructType([
    StructField("street", street_schema, True),
    StructField("city", StringType(), True),
    StructField("state", StringType(), True),
    StructField("country", StringType(), True),
    StructField("postcode", IntegerType(), True),
    StructField("coordinates", coordinates_schema, True),
    StructField("timezone", timezone_schema, True)
])

name_schema = StructType([
    StructField("title", StringType(), True),
    StructField("first", StringType(), True),
    StructField("last", StringType(), True)
])

login_schema = StructType([
    StructField("uuid", StringType(), True),
    StructField("username", StringType(), True),
    StructField("password", StringType(), True),
    StructField("salt", StringType(), True),
    StructField("md5", StringType(), True),
    StructField("sha1", StringType(), True),
    StructField("sha256", StringType(), True)
])

picture_schema = StructType([
    StructField("large", StringType(), True),
    StructField("medium", StringType(), True),
    StructField("thumbnail", StringType(), True)
])


# Define the main schema based on the results array
results_schema = StructType([
    StructField("gender", StringType(), True),
    StructField("name", name_schema, True),
    StructField("location", location_schema, True),
    StructField("email", StringType(), True),
    StructField("login", login_schema, True),
    StructField("dob", StructType([StructField("date", TimestampType(), True), StructField("age", IntegerType(), True)]), True),
    StructField("registered", StructType([StructField("date", TimestampType(), True), StructField("age", IntegerType(), True)]), True),
    StructField("phone", StringType(), True),
    StructField("cell", StringType(), True),
    StructField("id", StructType([StructField("name", StringType(), True), StructField("value", StringType(), True)]), True),
    StructField("picture", picture_schema, True),
    StructField("nat", StringType(), True)
])

final_schema = StructType([
    StructField("results", ArrayType(results_schema), True),
    StructField("info", StructType([
        StructField("seed", StringType(), True),
        StructField("results", IntegerType(), True),
        StructField("page", IntegerType(), True),
        StructField("version", StringType(), True)
    ]), True)
])



parsed_df = df.select(
    col("key").cast("string"),
    from_json(col("value").cast("string"), final_schema).alias("parsed_value")
)


flattened_df = parsed_df.select(
    col("key").alias("Id"),
    col("parsed_value.results.gender").getItem(0).alias("gender"),
    col("parsed_value.results.name.first").getItem(0).alias("first_name"),
    col("parsed_value.results.name.last").getItem(0).alias("last_name"),
    col("parsed_value.results.location.city").getItem(0).alias("city"),
    col("parsed_value.results.email").getItem(0).alias("email"),
    col("parsed_value.results.registered.date").getItem(0).alias("registered_date"),
    col("parsed_value.results.phone").getItem(0).alias("phone"),
    col("parsed_value.results.dob.date").getItem(0).alias("birth_date"),
    col("parsed_value.results.login.uuid").getItem(0).alias("login_id"),
    col("parsed_value.results.login.username").getItem(0).alias("username"),
    col("parsed_value.results.login.password").getItem(0).alias("password"),
    col("parsed_value.results.cell").getItem(0).alias("cell_phone"),
    col("parsed_value.results.id").getItem(0).alias("pic_id"),
    col("parsed_value.results.picture").getItem(0).alias("picture"),
    col("parsed_value.results.nat").getItem(0).alias("nat"),
    col("parsed_value.results.dob.age").getItem(0).alias("age")
)



single_line_df = flattened_df.select(
    col("Id"),
    col("first_name"),
    col("last_name"),
    col("gender"),
    col("age"),
    #col("birth_date"),
    col("email"),
    col("cell_phone"),
    col("phone"),
    col("city"),
    col("username"),
    col("password")
)



query = single_line_df.writeStream \
    .format("memory") \
    .queryName("Table1") \
    .outputMode("append") \
    .option("truncate", False) \
    .start()



24/02/10 04:01:54 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
24/02/10 04:01:56 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-13b63dfa-ced3-4bce-ad11-fe5961a4c897. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/02/10 04:01:57 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
24/02/10 04:01:58 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
[Stage 0:>                                                          (0 + 1) / 1]

In [3]:
spark.sql("SELECT * FROM Table1").show()

+---+-------------+----------+------+---+--------------------+--------------+--------------+--------------------+-----------------+--------+
| Id|   first_name| last_name|gender|age|               email|    cell_phone|         phone|                city|         username|password|
+---+-------------+----------+------+---+--------------------+--------------+--------------+--------------------+-----------------+--------+
|  0|        Cathy|  Phillips|female| 54|cathy.phillips@ex...|  07509 183719|   01126 94446|              Bangor|   blueladybug208| optimus|
|  1|         Arlo|      Wang|  male| 28|arlo.wang@example...|(852)-974-3436|(230)-823-7407|            Hamilton|     bluegoose551|    jojo|
|  2|       Eileen|   Coleman|female| 59|eileen.coleman@ex...|  0400-784-048|  07-0329-3647|         Rockhampton|       lazycat382|   jazzy|
|  3|Jean-François|     Marie|  male| 77|jean-francois.mar...| 078 740 41 43| 079 603 49 13|         Saint-Brais|    crazykoala327|    test|
|  4|       M

                                                                                

In [5]:
spark.sql("SELECT * FROM my_streaming_table where age between 45 and 70").show(5)

AnalysisException: [TABLE_OR_VIEW_NOT_FOUND] The table or view `my_streaming_table` cannot be found. Verify the spelling and correctness of the schema and catalog.
If you did not qualify the name with a schema, verify the current_schema() output, or qualify the name with the correct schema and catalog.
To tolerate the error on drop use DROP VIEW IF EXISTS or DROP TABLE IF EXISTS.; line 1 pos 14;
'Project [*]
+- 'Filter (('age >= 45) AND ('age <= 70))
   +- 'UnresolvedRelation [my_streaming_table], [], false


                                                                                

In [6]:
spark.sql("SELECT * FROM my_streaming_table where age between 45 and 60").show(5, truncate=False)

+---+----------+-------------+------+---+-------------------------------+--------------+----------------+---------+-------------------+--------+
|Id |first_name|last_name    |gender|age|email                          |cell_phone    |phone           |city     |username           |password|
+---+----------+-------------+------+---+-------------------------------+--------------+----------------+---------+-------------------+--------+
|3  |Stanko    |Bekić        |male  |60 |stanko.bekic@example.com       |064-9686-814  |027-3802-277    |Žitorađa |ticklishelephant425|nikki1  |
|5  |Curtis    |Bell         |male  |46 |curtis.bell@example.com        |07806 959639  |016974 66989    |Lisburn  |happypeacock592    |a123456 |
|7  |Afşar     |Taşlı        |female|58 |afsar.tasli@example.com        |(287)-864-1501|(883)-205-4106  |Batman   |blackgoose782      |zebra   |
|13 |Eleanor   |Garcia       |female|52 |eleanor.garcia@example.com     |07032 223959  |0116216 462 1897|Bath     |heavymeercat386

                                                                                

In [7]:
spark.table("my_streaming_table").show(1000)

+---+--------------+------------------+------+---+--------------------+--------------+----------------+--------------------+--------------------+------------------+
| Id|    first_name|         last_name|gender|age|               email|    cell_phone|           phone|                city|            username|          password|
+---+--------------+------------------+------+---+--------------------+--------------+----------------+--------------------+--------------------+------------------+
|  0|         Helge|             Eckel|  male| 24|helge.eckel@examp...|  0175-4904064|    0412-3944932|           Freystadt|       crazykoala642|           derrick|
|  1|        Victor|           Lefevre|  male| 78|victor.lefevre@ex...|06-53-54-15-37|  01-75-62-12-93|           Montreuil|     orangerabbit792|              1812|
|  2|   Klaus Peter|              Daum|  male| 22|klauspeter.daum@e...|  0171-2489902|    0518-9877063|      Haltern am See|       purpleduck217|            clarke|
|  3|     

                                                                                

In [1]:
spark = SparkSession.builder.getOrCreate()
spark.stop()
