In [None]:
import logging
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, BooleanType, MapType
from define_schema.tweets import TweetDocument
def create_spark_connection(app_name="KafkaToHDFS",
                            kafka_package="org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.3",
                            log_level="ERROR"):
    """Create (or reuse) the SparkSession used by this notebook.

    The session uses simple (non-Kerberos) Hadoop authentication, an HDFS
    replication factor of 1, and pulls the Spark Kafka connector package.
    ``getOrCreate()`` returns an already-running session if there is one, in
    which case static configs set here are not re-applied.

    Args:
        app_name: Spark application name.
        kafka_package: Maven coordinate passed to ``spark.jars.packages``.
        log_level: log level applied to the SparkContext.

    Returns:
        The SparkSession, or ``None`` if it could not be created (the error is
        logged together with its traceback).
    """
    try:
        spark_conn = (
            SparkSession.builder
            .appName(app_name)
            .config("spark.hadoop.hadoop.security.authentication", "simple")
            # Presumably a single-DataNode dev cluster, hence replication 1 — confirm.
            .config("spark.hadoop.dfs.replication", "1")
            .config("spark.jars.packages", kafka_package)
            .getOrCreate()
        )

        spark_conn.sparkContext.setLogLevel(log_level)
        logging.info("Spark connection created successfully!")
        # Only a Spark session exists at this point; nothing has contacted HDFS yet.
        print("Spark session created successfully")
        return spark_conn
    except Exception as e:
        # exc_info keeps the traceback, matching the other helpers in this notebook.
        logging.error(f"Couldn't create the spark session due to exception: {e}", exc_info=True)
        return None
    
spark_conn = create_spark_connection()
# create_spark_connection() logs and returns None on failure; stop here instead of
# letting every later cell fail with an unrelated-looking AttributeError.
if spark_conn is None:
    raise RuntimeError("Spark session could not be created; see the logged error for the cause.")

In [None]:
# Batch-read the tweet JSON files already stored in HDFS and show the inferred schema.
# Uses `spark_conn`, the session created in the first cell (the name `spark` is not
# defined until a later cell, so it fails on a fresh kernel).
# NOTE(review): this cell addresses the NameNode as `namenode:9000`, while the streaming
# sink and the export cell use `localhost:9000` — confirm which host the driver can reach.
df = spark_conn.read.format("json").option("multiline", "true").load("hdfs://namenode:9000/user/hdfs/tweets/*.json")
df.printSchema()

In [None]:
def connect_to_kafka(spark_conn, bootstrap_servers="broker:9092", topic="tweets_topic"):
    """Create a streaming DataFrame subscribed to a Kafka topic.

    Args:
        spark_conn: active SparkSession; the Kafka source needs the
            spark-sql-kafka connector on its classpath.
        bootstrap_servers: value for ``kafka.bootstrap.servers``.
        topic: topic name passed to ``subscribe``.

    Returns:
        The (not yet started) streaming DataFrame, or ``None`` if it could not be
        created (the error is logged together with its traceback).
    """
    try:
        # No `multiline` option here: that is a JSON file-source option and the Kafka
        # source does not use it. Message payloads arrive in the binary `value` column.
        kafka_df = (
            spark_conn.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", bootstrap_servers)
            .option("subscribe", topic)
            .load()
        )
        logging.info("Kafka DataFrame created successfully")
        print("Kafka DataFrame created successfully")

        return kafka_df
    except Exception as e:
        logging.error(f"Kafka DataFrame could not be created: {e}", exc_info=True)
        return None
kafka_df = connect_to_kafka(spark_conn)
# connect_to_kafka() logs and returns None on failure; fail in this cell rather than
# with an AttributeError when the next cell calls kafka_df.select(...).
if kafka_df is None:
    raise RuntimeError("Kafka DataFrame could not be created; see the logged error for the cause.")

In [None]:
def write_to_hdfs(df,
                  path="hdfs://localhost:9000/user/hdfs/tweets/",
                  checkpoint_location="hdfs://localhost:9000/user/hdfs/tweets_checkpoint/",
                  await_termination=True):
    """Stream ``df`` into HDFS as Parquet files in append mode.

    Args:
        df: streaming DataFrame to persist.
        path: output directory for the Parquet files.
        checkpoint_location: directory Spark uses for this query's checkpoint.
        await_termination: if True (the default, as before), block until the query
            stops; if False, return immediately with the running query.

    Returns:
        The StreamingQuery, or ``None`` if starting or running it failed (the error
        is logged together with its traceback).

    Raises:
        KeyboardInterrupt: re-raised after stopping the query when the blocking
            wait is interrupted.
    """
    # NOTE(review): the default `path` is the directory the batch cells read as `*.json`;
    # the Parquet part-files written here do not match that glob — confirm intent.
    try:
        streaming_query = (
            df.writeStream
            .outputMode("append")
            .format("parquet")
            .option("path", path)
            .option("checkpointLocation", checkpoint_location)
            .start()
        )

        logging.info("Writing data to HDFS")
        print("Writing data to HDFS")
        if await_termination:
            try:
                streaming_query.awaitTermination()
            except KeyboardInterrupt:
                # Interrupting the cell would otherwise leave the query running in the
                # background, and a re-run would likely fail on the shared checkpoint.
                streaming_query.stop()
                raise
        return streaming_query
    except Exception as e:
        logging.error(f"Failed to write to HDFS: {e}", exc_info=True)
        print("Failed to write to HDFS")
        return None

# Decode the Kafka `value` bytes to a JSON string, parse it with the tweet schema,
# flatten the parsed struct into top-level columns, then stream the result to HDFS.
if kafka_df is None:
    raise RuntimeError("kafka_df is None: the Kafka source could not be created (see the logged error).")
tweet_schema = TweetDocument().get_schema()
# Assign to a new name so `kafka_df` stays the raw source and this cell can be re-run
# (overwriting it would drop the `value` column the select depends on).
json_df = kafka_df.select(col("value").cast("string").alias("json_data"))
parsed_df = json_df.select(from_json(col("json_data"), tweet_schema).alias("data")).select("data.*")

# write_to_hdfs waits on the streaming query, so this cell blocks while the stream runs.
write_to_hdfs(parsed_df)

In [2]:
from pyspark.sql import SparkSession

# Standalone batch export: copy the tweet JSON files from HDFS to a local directory.
# getOrCreate() hands back an already-running session if one exists; Spark then warns
# that only runtime SQL configurations from this builder take effect.
spark = (
    SparkSession.builder
    .appName("KafkaToHDFS")
    .config("spark.hadoop.hadoop.security.authentication", "simple")
    .config("spark.hadoop.dfs.replication", "1")
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.3")
    .getOrCreate()
)

TWEETS_JSON_GLOB = "hdfs://localhost:9000/user/hdfs/tweets/*.json"
# NOTE(review): absolute, machine-specific path; also, the default save mode raises if
# this directory already exists, so the cell cannot be re-run as-is.
EXPORT_DIR = "/mnt/d/hust/bigdata/20241/test/files"

df = spark.read.format("json").load(TWEETS_JSON_GLOB)
df.write.format("json").save(EXPORT_DIR)


24/12/09 16:19:37 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
24/12/09 16:20:08 WARN BlockReaderFactory: I/O error constructing remote block reader.
org.apache.hadoop.net.ConnectTimeoutException: 60000 millis timeout while waiting for channel to be ready for connect. ch : java.nio.channels.SocketChannel[connection-pending remote=/172.18.0.2:9866]
	at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:589)
	at org.apache.hadoop.hdfs.DFSClient.newConnectedPeer(DFSClient.java:3033)
	at org.apache.hadoop.hdfs.client.impl.BlockReaderFactory.nextTcpPeer(BlockReaderFactory.java:829)
	at org.apache.hadoop.hdfs.client.impl.BlockReaderFactory.getRemoteBlockReaderFromTcp(BlockReaderFactory.java:754)
	at org.apache.hadoop.hdfs.client.impl.BlockReaderFactory.build(BlockReaderFactory.java:381)
	at org.apache.hadoop.hdfs.DFSInputStream.getBlockReader(DFSInputStream.java:755)
	at org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStrea

KeyboardInterrupt: 

24/12/09 16:23:23 WARN BlockReaderFactory: I/O error constructing remote block reader.
org.apache.hadoop.net.ConnectTimeoutException: 60000 millis timeout while waiting for channel to be ready for connect. ch : java.nio.channels.SocketChannel[connection-pending remote=/172.18.0.2:9866]
	at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:589)
	at org.apache.hadoop.hdfs.DFSClient.newConnectedPeer(DFSClient.java:3033)
	at org.apache.hadoop.hdfs.client.impl.BlockReaderFactory.nextTcpPeer(BlockReaderFactory.java:829)
	at org.apache.hadoop.hdfs.client.impl.BlockReaderFactory.getRemoteBlockReaderFromTcp(BlockReaderFactory.java:754)
	at org.apache.hadoop.hdfs.client.impl.BlockReaderFactory.build(BlockReaderFactory.java:381)
	at org.apache.hadoop.hdfs.DFSInputStream.getBlockReader(DFSInputStream.java:755)
	at org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:685)
	at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:884)
	at org.apache.hadoop.hd