In [11]:
import json
from json import dumps
from time import sleep

import pandas as pd
from kafka import KafkaConsumer
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.functions import col, from_json, explode
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
from pyspark.sql.types import ArrayType

In [2]:
def _decode_json_value(raw_bytes):
    """Decode a UTF-8 encoded message payload and parse it as JSON."""
    return json.loads(raw_bytes.decode('utf-8'))


# Consumer subscribed to a single topic; every message value is passed through
# the JSON decoder above before it is handed to the caller.
consumer = KafkaConsumer(
    'test_kafka_aws',
    # NOTE(review): 'IP' looks like a redacted placeholder — substitute the real broker host.
    bootstrap_servers=['IP:9092'],
    value_deserializer=_decode_json_value,
)

In [None]:
# Echo every record received from the topic. Iterating the consumer does not
# end on its own here (no `consumer_timeout_ms` was configured), so this cell
# is normally stopped by interrupting the kernel.
try:
    for message in consumer:
        print(message)
finally:
    # Runs on normal exit and on interrupt alike, so the broker connections are
    # released even when no later cell gets executed. Calling close() again on
    # an already-closed consumer is a no-op.
    consumer.close()

In [3]:
# Create (or reuse) the Spark session. The Kafka source connector is requested
# through `spark.jars.packages`; the coordinate below names the Scala 2.12
# build, version 3.5.0.
spark = (
    SparkSession.builder
    .appName("Kafka2Parquet")
    .config(
        "spark.jars.packages",
        "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0",
    )
    .getOrCreate()
)

24/09/01 00:28:43 WARN Utils: Your hostname, Harshs-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 10.183.81.8 instead (on interface en0)
24/09/01 00:28:43 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Ivy Default Cache set to: /Users/harshwadhawe/.ivy2/cache
The jars for the packages stored in: /Users/harshwadhawe/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-ce8a51e4-6f9f-45ee-ad6f-fea2eedb1469;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.5.0 in central


:: loading settings :: url = jar:file:/opt/homebrew/lib/python3.11/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.5.0 in central
	found org.apache.kafka#kafka-clients;3.4.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.10.3 in central
	found org.slf4j#slf4j-api;2.0.7 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
	found org.apache.hadoop#hadoop-client-api;3.3.4 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.apache.commons#commons-pool2;2.11.1 in central
:: resolution report :: resolve 190ms :: artifacts dl 6ms
	:: modules in use:
	com.google.code.findbugs#jsr305;3.0.0 from central in [default]
	commons-logging#commons-logging;1.1.3 from central in [default]
	org.apache.commons#commons-pool2;2.11.1 from central in [default]
	org.apache.hadoop#hadoop-client-api;3.3.4 from central in [default]
	org.apache.hadoop#hadoop-client-runtime;3.3.4 from central in [default]
	org.apache.ka

In [12]:
# Schema of one JSON record: two string fields followed by three double fields,
# all nullable.
schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("symbol", StringType()),
        ("name", StringType()),
        ("change", DoubleType()),
        ("price", DoubleType()),
        ("changesPercentage", DoubleType()),
    )
])

In [13]:
# Streaming DataFrame backed by the Kafka source, subscribed to one topic.
# NOTE(review): 'IP' looks like a redacted placeholder — substitute the real broker host.
df_kafka = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "IP:9092")
    .option("subscribe", "test_kafka_aws")
    .load()
)

In [14]:
# Keep only the message payload, cast from the Kafka `value` column to a string
# column named `json_string` (the payload is assumed to be JSON text).
df_parsed = df_kafka.select(
    col("value").cast("string").alias("json_string")
)

In [15]:
# Each payload is parsed as a JSON array of records and exploded into one row
# per record. The element type is the `schema` StructType declared earlier in
# the notebook, so the field list is maintained in a single place instead of
# being repeated as a DDL string.
df_json = df_parsed.select(
    explode(from_json("json_string", ArrayType(schema))).alias("data")
)

# Flatten the struct: `data.*` expands to one top-level column per schema
# field, in schema order (symbol, name, change, price, changesPercentage).
df_flat = df_json.select("data.*")

In [16]:
# Stream the flattened rows to the console sink for debugging.
query = (
    df_flat.writeStream
    .outputMode("append")
    .format("console")
    .start()
)

# awaitTermination() blocks until the query ends. Interrupting the kernel only
# aborts this wait — the streaming query itself keeps running and printing
# batches in the background — so stop it explicitly on the way out. The
# original exception (if any) still propagates after the query is stopped.
try:
    query.awaitTermination()
finally:
    query.stop()

24/09/01 00:29:45 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/kf/8rzd89fx61l2r6vlmkhxqpw80000gn/T/temporary-9700d16a-822d-4984-8597-fe9232846128. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/09/01 00:29:45 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
24/09/01 00:29:45 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.


-------------------------------------------
Batch: 0
-------------------------------------------
+------+----+------+-----+-----------------+
|symbol|name|change|price|changesPercentage|
+------+----+------+-----+-----------------+
+------+----+------+-----+-----------------+



                                                                                

-------------------------------------------
Batch: 1
-------------------------------------------
+------+--------------------+------+------+-----------------+
|symbol|                name|change| price|changesPercentage|
+------+--------------------+------+------+-----------------+
|   WTO|       UTime Limited|-0.015|0.0756|         -16.5563|
|  NVDA|  NVIDIA Corporation|  1.78|119.37|           1.5137|
|  FCUV|Focus Universal Inc.|0.0787|0.2689|          41.3775|
|  BNRG|Brenmiller Energy...| 0.877|  1.55|          130.312|
|  INTC|   Intel Corporation|  1.91| 22.04|           9.4883|
|  SQQQ|ProShares UltraPr...| -0.27|   8.2|          -3.1877|
|    NU|    Nu Holdings Ltd.|  0.74| 14.97|           5.2003|
|  MAXN|Maxeon Solar Tech...|0.0038|0.1081|           3.6433|
|  SOXL|Direxion Daily Se...|   2.6| 38.79|           7.1843|
|  ATPC|Agape ATP Corpora...| 0.958|  2.55|          60.1759|
|   NIO|            NIO Inc.|   0.1|  4.04|           2.5381|
|  TSLA|         Tesla, Inc.|  7.83

                                                                                

-------------------------------------------
Batch: 2
-------------------------------------------
+------+--------------------+------+------+-----------------+
|symbol|                name|change| price|changesPercentage|
+------+--------------------+------+------+-----------------+
|   WTO|       UTime Limited|-0.015|0.0756|         -16.5563|
|  NVDA|  NVIDIA Corporation|  1.78|119.37|           1.5137|
|  FCUV|Focus Universal Inc.|0.0787|0.2689|          41.3775|
|  BNRG|Brenmiller Energy...| 0.877|  1.55|          130.312|
|  INTC|   Intel Corporation|  1.91| 22.04|           9.4883|
|  SQQQ|ProShares UltraPr...| -0.27|   8.2|          -3.1877|
|    NU|    Nu Holdings Ltd.|  0.74| 14.97|           5.2003|
|  MAXN|Maxeon Solar Tech...|0.0038|0.1081|           3.6433|
|  SOXL|Direxion Daily Se...|   2.6| 38.79|           7.1843|
|  ATPC|Agape ATP Corpora...| 0.958|  2.55|          60.1759|
|   NIO|            NIO Inc.|   0.1|  4.04|           2.5381|
|  TSLA|         Tesla, Inc.|  7.83

                                                                                

-------------------------------------------
Batch: 3
-------------------------------------------
+------+--------------------+------+------+-----------------+
|symbol|                name|change| price|changesPercentage|
+------+--------------------+------+------+-----------------+
|   WTO|       UTime Limited|-0.015|0.0756|         -16.5563|
|  NVDA|  NVIDIA Corporation|  1.78|119.37|           1.5137|
|  FCUV|Focus Universal Inc.|0.0787|0.2689|          41.3775|
|  BNRG|Brenmiller Energy...| 0.877|  1.55|          130.312|
|  INTC|   Intel Corporation|  1.91| 22.04|           9.4883|
|  SQQQ|ProShares UltraPr...| -0.27|   8.2|          -3.1877|
|    NU|    Nu Holdings Ltd.|  0.74| 14.97|           5.2003|
|  MAXN|Maxeon Solar Tech...|0.0038|0.1081|           3.6433|
|  SOXL|Direxion Daily Se...|   2.6| 38.79|           7.1843|
|  ATPC|Agape ATP Corpora...| 0.958|  2.55|          60.1759|
|   NIO|            NIO Inc.|   0.1|  4.04|           2.5381|
|  TSLA|         Tesla, Inc.|  7.83

                                                                                

-------------------------------------------
Batch: 4
-------------------------------------------
+------+--------------------+------+------+-----------------+
|symbol|                name|change| price|changesPercentage|
+------+--------------------+------+------+-----------------+
|   WTO|       UTime Limited|-0.015|0.0756|         -16.5563|
|  NVDA|  NVIDIA Corporation|  1.78|119.37|           1.5137|
|  FCUV|Focus Universal Inc.|0.0787|0.2689|          41.3775|
|  BNRG|Brenmiller Energy...| 0.877|  1.55|          130.312|
|  INTC|   Intel Corporation|  1.91| 22.04|           9.4883|
|  SQQQ|ProShares UltraPr...| -0.27|   8.2|          -3.1877|
|    NU|    Nu Holdings Ltd.|  0.74| 14.97|           5.2003|
|  MAXN|Maxeon Solar Tech...|0.0038|0.1081|           3.6433|
|  SOXL|Direxion Daily Se...|   2.6| 38.79|           7.1843|
|  ATPC|Agape ATP Corpora...| 0.958|  2.55|          60.1759|
|   NIO|            NIO Inc.|   0.1|  4.04|           2.5381|
|  TSLA|         Tesla, Inc.|  7.83

                                                                                

-------------------------------------------
Batch: 5
-------------------------------------------
+------+--------------------+------+------+-----------------+
|symbol|                name|change| price|changesPercentage|
+------+--------------------+------+------+-----------------+
|   WTO|       UTime Limited|-0.015|0.0756|         -16.5563|
|  NVDA|  NVIDIA Corporation|  1.78|119.37|           1.5137|
|  FCUV|Focus Universal Inc.|0.0787|0.2689|          41.3775|
|  BNRG|Brenmiller Energy...| 0.877|  1.55|          130.312|
|  INTC|   Intel Corporation|  1.91| 22.04|           9.4883|
|  SQQQ|ProShares UltraPr...| -0.27|   8.2|          -3.1877|
|    NU|    Nu Holdings Ltd.|  0.74| 14.97|           5.2003|
|  MAXN|Maxeon Solar Tech...|0.0038|0.1081|           3.6433|
|  SOXL|Direxion Daily Se...|   2.6| 38.79|           7.1843|
|  ATPC|Agape ATP Corpora...| 0.958|  2.55|          60.1759|
|   NIO|            NIO Inc.|   0.1|  4.04|           2.5381|
|  TSLA|         Tesla, Inc.|  7.83

                                                                                

-------------------------------------------
Batch: 6
-------------------------------------------
+------+--------------------+------+------+-----------------+
|symbol|                name|change| price|changesPercentage|
+------+--------------------+------+------+-----------------+
|   WTO|       UTime Limited|-0.015|0.0756|         -16.5563|
|  NVDA|  NVIDIA Corporation|  1.78|119.37|           1.5137|
|  FCUV|Focus Universal Inc.|0.0787|0.2689|          41.3775|
|  BNRG|Brenmiller Energy...| 0.877|  1.55|          130.312|
|  INTC|   Intel Corporation|  1.91| 22.04|           9.4883|
|  SQQQ|ProShares UltraPr...| -0.27|   8.2|          -3.1877|
|    NU|    Nu Holdings Ltd.|  0.74| 14.97|           5.2003|
|  MAXN|Maxeon Solar Tech...|0.0038|0.1081|           3.6433|
|  SOXL|Direxion Daily Se...|   2.6| 38.79|           7.1843|
|  ATPC|Agape ATP Corpora...| 0.958|  2.55|          60.1759|
|   NIO|            NIO Inc.|   0.1|  4.04|           2.5381|
|  TSLA|         Tesla, Inc.|  7.83

                                                                                

-------------------------------------------
Batch: 9
-------------------------------------------
+------+--------------------+------+------+-----------------+
|symbol|                name|change| price|changesPercentage|
+------+--------------------+------+------+-----------------+
|   WTO|       UTime Limited|-0.015|0.0756|         -16.5563|
|  NVDA|  NVIDIA Corporation|  1.78|119.37|           1.5137|
|  FCUV|Focus Universal Inc.|0.0787|0.2689|          41.3775|
|  BNRG|Brenmiller Energy...| 0.877|  1.55|          130.312|
|  INTC|   Intel Corporation|  1.91| 22.04|           9.4883|
|  SQQQ|ProShares UltraPr...| -0.27|   8.2|          -3.1877|
|    NU|    Nu Holdings Ltd.|  0.74| 14.97|           5.2003|
|  MAXN|Maxeon Solar Tech...|0.0038|0.1081|           3.6433|
|  SOXL|Direxion Daily Se...|   2.6| 38.79|           7.1843|
|  ATPC|Agape ATP Corpora...| 0.958|  2.55|          60.1759|
|   NIO|            NIO Inc.|   0.1|  4.04|           2.5381|
|  TSLA|         Tesla, Inc.|  7.83

                                                                                

-------------------------------------------
Batch: 10
-------------------------------------------
+------+--------------------+------+------+-----------------+
|symbol|                name|change| price|changesPercentage|
+------+--------------------+------+------+-----------------+
|   WTO|       UTime Limited|-0.015|0.0756|         -16.5563|
|  NVDA|  NVIDIA Corporation|  1.78|119.37|           1.5137|
|  FCUV|Focus Universal Inc.|0.0787|0.2689|          41.3775|
|  BNRG|Brenmiller Energy...| 0.877|  1.55|          130.312|
|  INTC|   Intel Corporation|  1.91| 22.04|           9.4883|
|  SQQQ|ProShares UltraPr...| -0.27|   8.2|          -3.1877|
|    NU|    Nu Holdings Ltd.|  0.74| 14.97|           5.2003|
|  MAXN|Maxeon Solar Tech...|0.0038|0.1081|           3.6433|
|  SOXL|Direxion Daily Se...|   2.6| 38.79|           7.1843|
|  ATPC|Agape ATP Corpora...| 0.958|  2.55|          60.1759|
|   NIO|            NIO Inc.|   0.1|  4.04|           2.5381|
|  TSLA|         Tesla, Inc.|  7.8

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/opt/homebrew/lib/python3.11/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/lib/python3.11/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/python@3.11/3.11.9_1/Frameworks/Python.framework/Versions/3.11/lib/python3.11/socket.py", line 706, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 

                                                                                

-------------------------------------------
Batch: 11
-------------------------------------------
+------+--------------------+------+------+-----------------+
|symbol|                name|change| price|changesPercentage|
+------+--------------------+------+------+-----------------+
|   WTO|       UTime Limited|-0.015|0.0756|         -16.5563|
|  NVDA|  NVIDIA Corporation|  1.78|119.37|           1.5137|
|  FCUV|Focus Universal Inc.|0.0787|0.2689|          41.3775|
|  BNRG|Brenmiller Energy...| 0.877|  1.55|          130.312|
|  INTC|   Intel Corporation|  1.91| 22.04|           9.4883|
|  SQQQ|ProShares UltraPr...| -0.27|   8.2|          -3.1877|
|    NU|    Nu Holdings Ltd.|  0.74| 14.97|           5.2003|
|  MAXN|Maxeon Solar Tech...|0.0038|0.1081|           3.6433|
|  SOXL|Direxion Daily Se...|   2.6| 38.79|           7.1843|
|  ATPC|Agape ATP Corpora...| 0.958|  2.55|          60.1759|
|   NIO|            NIO Inc.|   0.1|  4.04|           2.5381|
|  TSLA|         Tesla, Inc.|  7.8

In [None]:
# Close the Kafka consumer created earlier in the notebook.
consumer.close()