In [1]:
import numpy as np
from pyspark.sql import SparkSession

# Spark session & context
spark = (SparkSession
         .builder
         .master('local')
         .appName('kafka-streaming')
         .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.5")
         .getOrCreate())
sc = spark.sparkContext

# Khởi tạo Kafka consumer
kafka_bootstrap_servers = 'kafka1:19091,kafka2:19092'  # Địa chỉ Kafka của các container Kafka
kafka_topic = 'thao_loz'  # Tên topic Kafka

# Subscribe to 1 topic
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
  .option("subscribe", kafka_topic) \
  .load()

In [7]:

from pyspark.sql.functions import from_json, udf
from pyspark.sql.types import StructType, StructField, FloatType, ArrayType, IntegerType
from pyspark.sql.functions import col, udf
import numpy as np

In [12]:
schema = ArrayType(
    ArrayType(
        StructType([
            StructField("col1", FloatType(), nullable=False),
            StructField("col2", FloatType(), nullable=False),
            StructField("col3", FloatType(), nullable=False),
            StructField("col4", FloatType(), nullable=False),
            StructField("col5", IntegerType(), nullable=False)
        ])
    )
)

# Chuyển đổi cột value thành decoded_value (UTF-8 decoding)
decoded_df = df.withColumn("decoded_value", df["value"].cast("string").alias("decoded_value"))
# Hàm chuyển đổi dữ liệu
def transform_data(rowData):
    value = np.array(eval(rowData))
    print(value)
    return value.tolist()

# Đăng ký UDF với Spark
transform_data_udf = udf(transform_data, ArrayType(FloatType()))

# Áp dụng UDF để chuyển đổi cột decoded_value thành giá trị
decoded_df = decoded_df.withColumn("value", transform_data_udf(decoded_df["decoded_value"]))

# Chọn cột value
decoded_df = decoded_df.select("value")

# In dữ liệu ra màn hình console
query = decoded_df.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

In [None]:
# Start the streaming query
query = df.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()
query.awaitTermination()

In [None]:
# Parse dữ liệu JSON trong cột decoded_value
parsed_df = decoded_df.select(from_json(decoded_df.decoded_value, schema).alias("data"))

# Chuyển đổi dữ liệu thành mảng numpy và áp dụng reshape(-1)
@udf(returnType=ArrayType(FloatType()))
def to_numpy(data):
    return np.array(data).reshape(-1)

parsed_df = parsed_df.withColumn("numpy_data", to_numpy(parsed_df.data))
parsed_df = parsed_df.withColumn("numpy_data", parsed_df.numpy_data[0])

# In dữ liệu numpy ra màn hình console
query = parsed_df.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

# Chờ cho quá trình stream kết thúc
query.awaitTermination()