In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, BooleanType, IntegerType, LongType
from pyspark.sql.functions import from_json, col, lit, from_unixtime
from pyspark.sql.functions import year, month, dayofmonth, hour

In [None]:
# Create (or reuse) the Spark session that runs this streaming job on YARN.
# Resources are deliberately small: a 1-core / 1 GB driver and a single
# 1-core / 1 GB executor, submitted to the YARN queue "streaming".
# spark.jars.packages fetches the Kafka source connector plus an explicitly
# pinned kafka-clients version when the session starts.
# NOTE(review): static settings (driver memory, packages, ...) presumably only
# take effect when getOrCreate() launches a new session — restart the kernel
# after changing them.
spark = (
    SparkSession.builder
    .appName("stream_wiki_from_kafka_to_hdfs")
    .master("yarn")
    .config("spark.driver.memory", "1g")
    .config("spark.driver.cores", "1")
    .config("spark.executor.memory", "1g")
    .config("spark.executor.cores", "1")
    .config("spark.executor.instances", "1")
    .config("spark.yarn.queue", "streaming")
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.5,org.apache.kafka:kafka-clients:3.9.1")
    .getOrCreate()
)
# Keep notebook output readable: only WARN and above from Spark.
spark.sparkContext.setLogLevel("WARN")

In [None]:
# Options passed to the Spark Kafka source (spark.readStream.format("kafka")).
kafka_options = {
    # Where to connect.
    "kafka.bootstrap.servers": "kafka:9092",
    # NOTE(review): a fixed consumer group id is shared by every query that uses
    # these options. Spark's Kafka guide advises caution here: concurrent queries
    # with the same group id can interfere and each read only part of the data.
    # When unset, Spark generates a unique group id per query.
    "kafka.group.id": "wiki_group",
    # What to read: the "wiki" topic.
    "subscribe": "wiki",
    # Only consulted when the query starts for the first time; a restarted query
    # resumes from the offsets recorded in its checkpoint.
    "startingOffsets": "latest",
}

In [None]:
# Unbounded streaming DataFrame over the Kafka topic configured in
# kafka_options; each row is one Kafka record (binary key/value plus metadata).
df = (
    spark.readStream
    .format("kafka")
    .options(**kafka_options)
    .load()
)

In [None]:
# Expected structure of the JSON document carried in each Kafka message value.
# NOTE(review): field names/types are assumed to match whatever produces to the
# "wiki" topic — confirm against the producer.
schema = StructType([
    StructField("id", StringType()),
    StructField("wiki", StringType()),
    # Epoch seconds (fed to from_unixtime downstream). LongType instead of
    # IntegerType: a 32-bit int cannot hold instants after 2038-01-19, and an
    # out-of-range number fails to parse, so the value would come back null.
    StructField("timestamp", LongType()),
    StructField("bot", BooleanType()),
])

# Decode the binary Kafka value as a string and parse it into a `data` struct.
# Only the value is used downstream, so the message key is not projected (it
# was previously cast to STRING and then dropped by this very select).
parsed_df = df.select(
    from_json(col("value").cast("string"), schema).alias("data")
)

In [None]:
# Flatten the parsed struct and derive the hour-level partition columns.
# The epoch-seconds `timestamp` is rendered by from_unixtime as a date-time
# string (default pattern "yyyy-MM-dd HH:mm:ss"), and year/month/day/hour are
# extracted from that same expression.
# NOTE(review): from_unixtime uses the Spark session time zone, so partition
# boundaries depend on spark.sql.session.timeZone — confirm, or pin it to UTC.
event_time = from_unixtime(col("data.timestamp"))
final_df = parsed_df.select(
    col("data.id").alias("id"),
    col("data.wiki").alias("wiki"),
    event_time.alias("timestamp"),
    col("data.bot").alias("bot"),
    year(event_time).alias("year"),
    month(event_time).alias("month"),
    dayofmonth(event_time).alias("day"),
    hour(event_time).alias("hour"),
)

In [None]:
# Every 5 seconds, append the new micro-batch as Parquet under
# /datalake/data/wiki, laid out as year=/month=/day=/hour= directories.
# The checkpoint directory stores the consumed Kafka offsets and sink metadata,
# which lets a restarted query resume where it stopped.
# NOTE(review): a 5 s trigger combined with hourly partitions is likely to
# produce many small files; consider periodic compaction or a longer interval.
query = (
    final_df.writeStream
    .format("parquet")
    .outputMode("append")
    .trigger(processingTime="5 seconds")
    .partitionBy("year", "month", "day", "hour")
    .options(
        path="/datalake/data/wiki",
        checkpointLocation="/datalake/checkpoint/wiki",
    )
    .start()
)
# Blocks this cell (and the kernel) until the query stops; if the query fails,
# its exception is re-raised here.
query.awaitTermination()