In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json
from pyspark.sql.types import *

# Schema of the JSON payload carried in each Kafka message value.
# Fields are appended one by one; StructType.add() creates nullable fields,
# the same default StructField uses.
schema = (
    StructType()
    .add("coin_marketcap", DecimalType(18, 12))
    .add("coin_name", StringType())
    .add("coin_price", DecimalType(18, 12))
    .add("coin_flag", StringType())
)

# Obtain the SparkSession for this notebook (reuses an existing one if present,
# otherwise creates it) under the application name "KafkaStreaming".
session_builder = SparkSession.builder.appName("KafkaStreaming")
spark = session_builder.getOrCreate()

# Kafka connection settings: replace both placeholders with real values
# (broker address as host:port, and the topic to subscribe to).
kafka_topic = "your kafka topic name"
bootstrap_servers = "aws ec2 public ip :9092"

# Options handed to the Kafka source; "earliest" makes the stream start from
# the oldest offsets available in the topic.
kafka_options = dict(
    [
        ("kafka.bootstrap.servers", bootstrap_servers),
        ("subscribe", kafka_topic),
        ("startingOffsets", "earliest"),
    ]
)

# Open the Kafka topic as a streaming DataFrame.
raw_stream = (
    spark.readStream
    .format("kafka")
    .options(**kafka_options)
    .load()
)

# Decode the message value to a string, parse it as JSON using `schema`,
# and keep only the parsed fields as top-level columns.
df = (
    raw_stream
    .withColumn("value", raw_stream["value"].cast(StringType()))
    .withColumn("data", from_json("value", schema))
    .select("data.*")
)

# Kept so that any later cell relying on `time` being imported here still works.
import time

# How long to let the stream collect data before stopping it (2 minutes).
STREAM_DURATION_SECONDS = 120

# Start the streaming query, writing parsed rows to an in-memory table that
# is queried afterwards under the name "coins_stream".
query = (
    df.writeStream
    .format("memory")
    .queryName("coins_stream")
    .start()
)

try:
    # Block for at most STREAM_DURATION_SECONDS. Unlike a plain sleep, this
    # returns early and re-raises the error if the query itself fails
    # (e.g. the broker is unreachable), instead of silently waiting and then
    # producing an empty result table.
    query.awaitTermination(STREAM_DURATION_SECONDS)
finally:
    # Always stop the query, even if the wait was interrupted or failed,
    # so no streaming job is left running in the background.
    query.stop()

# Load everything the streaming query collected in the in-memory table.
collected_rows_query = "SELECT * FROM coins_stream"
result_df = spark.sql(collected_rows_query)

# Preview the collected rows and expose them to SQL as the view "final_df".
result_df.show()
result_df.createOrReplaceTempView("final_df")

# Persist the rows as JSON on DBFS, replacing any previous output at that path.
json_output_path = "dbfs:/FileStore/tables/coins.json"
result_df.write.mode("overwrite").json(json_output_path)

# Ten rows with the highest coin_price, queried through the temp view.
top_ten_query = 'select coin_marketcap, coin_price, coin_name from final_df order by coin_price desc limit 10;'
top_ten_coins = spark.sql(top_ten_query)
top_ten_coins.show()


+-------------------+--------------------+----------------+---------+
|     coin_marketcap|           coin_name|      coin_price|coin_flag|
+-------------------+--------------------+----------------+---------+
|518734.000000000000|              dexIRA|  0.001288000000|  Million|
|     1.470000000000|           WebDollar|  0.000095780000|  Million|
|     4.270000000000|           PaintSwap|  0.014920000000|  Million|
|     4.260000000000|             Ridotto|  0.020890000000|  Million|
|     8.150000000000|Shib Original Vision|       7.6730E-8|  Million|
|     7.000000000000|            WigoSwap|  0.005088000000|  Million|
|     5.050000000000|           Yield Yak|504.760000000000|  Million|
|     5.540000000000|        Cake Monster|  0.000879200000|  Million|
|    73.470000000000|                FLEX|  0.744700000000|  Million|
|     3.040000000000|              Bumper|  0.052630000000|  Million|
|    28.200000000000|   Velodrome Finance|  0.119500000000|  Million|
|     3.340000000000

In [0]:
# Preview the collected rows (first 20 rows, long cell values truncated).
result_df.show(n=20, truncate=True)

+-------------------+--------------------+----------------+---------+
|     coin_marketcap|           coin_name|      coin_price|coin_flag|
+-------------------+--------------------+----------------+---------+
|518734.000000000000|              dexIRA|  0.001288000000|  Million|
|     1.470000000000|           WebDollar|  0.000095780000|  Million|
|     4.270000000000|           PaintSwap|  0.014920000000|  Million|
|     4.260000000000|             Ridotto|  0.020890000000|  Million|
|     8.150000000000|Shib Original Vision|       7.6730E-8|  Million|
|     7.000000000000|            WigoSwap|  0.005088000000|  Million|
|     5.050000000000|           Yield Yak|504.760000000000|  Million|
|     5.540000000000|        Cake Monster|  0.000879200000|  Million|
|    73.470000000000|                FLEX|  0.744700000000|  Million|
|     3.040000000000|              Bumper|  0.052630000000|  Million|
|    28.200000000000|   Velodrome Finance|  0.119500000000|  Million|
|     3.340000000000

In [0]:
import boto3
import os

# Configure AWS credentials.
# Secrets are read from the standard AWS environment variables rather than
# being hardcoded in the notebook, where they could leak when it is shared or
# committed. Any value that is unset stays None, in which case boto3 falls
# back to its default credential/region resolution chain.
aws_access_key_id = os.environ.get("AWS_ACCESS_KEY_ID")
aws_secret_access_key = os.environ.get("AWS_SECRET_ACCESS_KEY")
region_name = os.environ.get("AWS_DEFAULT_REGION")

# Create a Boto3 S3 client
s3_client = boto3.client(
    "s3",
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key,
    region_name=region_name,
)

# Convert the result_df DataFrame to a list of JSON strings (one per row).
# collect() brings every row to the driver, so this is only suitable for
# results that fit in driver memory.
result_json = result_df.toJSON().collect()

# Destination in S3 and the local scratch file used for the upload
bucket_name = "kafka-spark-streaming-project"
object_key = "result.json"
temp_file_path = "/tmp/result.json"

try:
    # Write one JSON document per line; the encoding is explicit so the
    # output does not depend on the platform default.
    with open(temp_file_path, "w", encoding="utf-8") as f:
        f.writelines(row + "\n" for row in result_json)

    # Upload the file to S3 bucket
    s3_client.upload_file(temp_file_path, bucket_name, object_key)
finally:
    # Delete the temporary file even when writing or uploading fails, so a
    # failed run does not leave data behind on the driver's disk.
    if os.path.exists(temp_file_path):
        os.remove(temp_file_path)
