In [1]:
import os
# Ask the PySpark launcher to pull in the Kafka connector packages
# (built for Scala 2.12 / Spark 3.0.0). This is set before any SparkSession
# is built in the cells below.
os.environ['PYSPARK_SUBMIT_ARGS'] = ' '.join([
    '--packages',
    ','.join([
        'org.apache.spark:spark-streaming-kafka-0-10_2.12:3.0.0',
        'org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0',
    ]),
    'pyspark-shell',
])

In [2]:
import pyspark
from pyspark.sql import SparkSession
# Obtain the notebook's Spark session (created on first use) under the
# application name "test".
spark = (
    SparkSession.builder
    .appName("test")
    .getOrCreate()
)

In [6]:
# Streaming DataFrame over the "crypto_rates" Kafka topic on broker
# 10.0.0.10:9092, starting from the earliest available offsets.
streaming_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "10.0.0.10:9092")
    .option("subscribe", "crypto_rates")
    .option("startingOffsets", "earliest")
    .load()
)

In [7]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StringType, DoubleType, LongType

# Expected layout of each JSON message: four string fields, a double-valued
# USD rate and a long-valued timestamp.
schema = (
    StructType()
    .add("id", StringType())
    .add("symbol", StringType())
    .add("currencySymbol", StringType())
    .add("type", StringType())
    .add("rateUsd", DoubleType())
    .add("timestamp", LongType())
)

# Turn each Kafka record into typed columns: cast `value` to a string, parse it
# as JSON against `schema`, then flatten the resulting struct into top-level columns.
parsed_df = (
    streaming_df
    .selectExpr("CAST(value AS STRING)")
    .select(from_json("value", schema).alias("data"))
    .select("data.*")
)



In [8]:
# Start a streaming query that appends the parsed records as Parquet files
# under "output_parquet", tracking progress in "checkpoint_dir_output_parquet".
# The chain must end with .start(): without it `query` is only a configured
# DataStreamWriter and no data is ever written.
query = (
    parsed_df.writeStream
    .format("parquet")
    .outputMode("append")  # stated explicitly, matching the JSON sink in the last cell
    .option("path", "output_parquet")
    .option("checkpointLocation", "checkpoint_dir_output_parquet")
    .start()
)


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StringType, DoubleType, LongType

# Get or create the Spark session, requesting the application name "KafkaReader".
spark = (
    SparkSession.builder
    .appName("KafkaReader")
    .getOrCreate()
)

# Schema used to parse the JSON payload of each Kafka message:
# string identifiers, a double USD rate and a long timestamp.
schema = (
    StructType()
    .add("id", StringType())
    .add("symbol", StringType())
    .add("currencySymbol", StringType())
    .add("type", StringType())
    .add("rateUsd", DoubleType())
    .add("timestamp", LongType())
)

# Kafka connection settings: broker address and the topic to subscribe to.
kafka_params = {
    "kafka.bootstrap.servers": "10.0.0.10:9092",
    "subscribe": "crypto_rates"
}

# Read the topic as a stream and parse each message with `schema`.
# The broker/topic settings come from `kafka_params` (previously the dict was
# defined but unused and the same values were repeated as literals).
kafka_df = (
    spark.readStream
    .format("kafka")
    .options(**kafka_params)
    .option("startingOffsets", "earliest")  # begin at the earliest available offsets
    .option("failOnDataLoss", "false")      # see the Spark Kafka integration guide for semantics
    .load()
    .selectExpr("CAST(value AS STRING)")    # cast the Kafka `value` column to a string
    .select(from_json("value", schema).alias("data"))
    .select("data.*")                       # flatten the parsed struct into columns
)

# Start the streaming query: append parsed records as JSON files under
# "output_json", with checkpoint state kept in "checkpoint_json".
query = (
    kafka_df.writeStream
    .format("json")
    .outputMode("append")
    .option("path", "output_json")
    .option("checkpointLocation", "checkpoint_json")
    .start()
)

# Block this cell until the streaming query terminates.
query.awaitTermination()