In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

def main():
    # 创建sparksession
    spark = (
        SparkSession.builder
        .appName("IcelandWindStreamingSkeleton")
        .getOrCreate()
    )

    # TODO: update Kafka bootstrap servers and topic name
    # 从kafka读数据
    df = (
        spark.readStream
        .format("kafka") 
        .option("kafka.bootstrap.servers", "vm1:xxxx") # kafka集群地址
        .option("subscribe", "weather_iceland_raw") # 订阅kafka中的topic
        .option("startingOffsets", "latest") # 从最新的偏移量开始读
        .load()
    )

    # Just print raw JSON for now
    lines = df.selectExpr("CAST(value AS STRING) as json_str") 
    # df 里 value 列是二进制类型（binary），转成字符串让spark读
    # lines：只有一列的 DataFrame

    # 定义一个流式写入(sink): 把流写进控制台，即终端
    query = (
        lines.writeStream
        .outputMode("append")
        .format("console") # 输出到控制台（标准输出），这是 Spark 提供的一个简单调试用的 sink
        .option("truncate", False)
        .start()
    )

    query.awaitTermination() # 让主线程卡在这里，持续运行流处理

if __name__ == "__main__":
    main()
