In [None]:
import os
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LinearSVC
import json
from pyspark.ml.classification import LinearSVCModel
from pyspark.sql import SparkSession
from pyspark.sql.functions import when, col
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import SQLContext
from json import loads, dumps
from kafka import KafkaProducer
import uuid
from pyspark.sql.types import IntegerType

# Put the Kafka 0-8 streaming integration JAR on the classpath. PySpark reads
# PYSPARK_SUBMIT_ARGS when it launches the JVM, i.e. when the SparkContext below
# is created, so this must stay above that line.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars /home/jovyan/spark-streaming-kafka-0-8-assembly_2.11-2.0.0-preview.jar pyspark-shell'

# Initialize a SparkContext
sc = SparkContext(appName="Kafka Message Processing")

# Initialize a StreamingContext with a batch interval of 15 seconds
ssc = StreamingContext(sc, 15)

# Build (or reuse) a Hive-enabled SparkSession on top of the SparkContext above.
spark = (
    SparkSession.builder.appName("Kafka Message Processing")
    .config("spark.driver.allowMultipleContexts", "true")
    .config("hive.metastore.uris", "thrift://hive-metastore:9083")
    .config("spark.sql.warehouse.dir", "hdfs://namenode:9000/hive")
    .enableHiveSupport()
    .getOrCreate()
)
# SQLContext's signature is (sparkContext, sparkSession): the SparkContext goes
# first and the session second.
sqlContext = SQLContext(sc, spark)
# Create the Hive table if it does not already exist.
sqlContext.sql("""
    CREATE TABLE IF NOT EXISTS WeatherData (
        time int,
        day int,
        month int,
        temperature DOUBLE,
        feelslike DOUBLE,
        wind DOUBLE,
        cloud DOUBLE,
        rain DOUBLE,
        pressure DOUBLE,
        weather STRING
    )
    USING HIVE
""")
# Kafka topic(s) to consume and the broker to read them from.
topics = ["WeatherData"]
kafka_params = {
    "bootstrap.servers": "kafka:9093"
}

# Create a direct (receiver-less) stream from Kafka; each element is a
# (key, value) pair.
direct_stream = KafkaUtils.createDirectStream(ssc, topics, kafka_params)

def publish_message(kafka_producer, topic_name, key, value, *, timeout=None):
    """Send one key/value message to a Kafka topic and wait until it is delivered.

    Best-effort by design: any error (encoding, send, flush, or a broker-side
    delivery failure) is printed rather than raised, so a caller inside a
    streaming loop keeps running.

    Args:
        kafka_producer: A ``kafka.KafkaProducer`` (anything exposing
            ``send``/``flush`` with the same semantics).
        topic_name: Destination topic.
        key: Message key as ``str`` (UTF-8 encoded), ``bytes``, or ``None``
            for a message without a key.
        value: Message payload as ``str`` (UTF-8 encoded), ``bytes``, or ``None``.
        timeout: Seconds to wait for the flush and the delivery confirmation;
            ``None`` waits indefinitely, matching a plain ``flush()`` call.

    Returns:
        ``True`` if the broker acknowledged the message, ``False`` otherwise.
    """
    def _to_bytes(text):
        # Kafka keys/values are raw bytes; None passes through as a null key/value.
        if text is None or isinstance(text, bytes):
            return text
        return text.encode('utf-8')

    try:
        record_future = kafka_producer.send(
            topic_name, key=_to_bytes(key), value=_to_bytes(value)
        )
        kafka_producer.flush(timeout=timeout)
        # send() is asynchronous: delivery errors are reported only through the
        # returned future, so confirm it before claiming success.
        record_future.get(timeout=timeout)
    except Exception as ex:  # deliberate best-effort boundary: report and carry on
        print(str(ex))
        return False
    print('Message published successfully.')
    return True

# Define your data processing logic
def process_data(rdd):
    """Score one micro-batch of Kafka weather messages and publish the predictions.

    Called by ``direct_stream.foreachRDD`` once per batch. Each RDD element is a
    Kafka ``(key, value)`` pair whose value is a JSON object. The seven feature
    fields listed below are read from it, the LinearSVC model saved at
    ``/home/jovyan/model`` is applied, and for every scored message one JSON
    message ``{"prediction": <int>}``, keyed by a random UUID, is sent to the
    ``WeatherPredictData`` topic.

    Messages that are not JSON objects, lack a feature field, or hold a
    non-numeric feature are printed and skipped instead of failing the batch.

    NOTE: keep this function to a single positional parameter. ``foreachRDD``
    inspects the argument count to decide between calling ``func(rdd)`` and
    ``func(time, rdd)``.
    """
    # Presumably the order the model was trained with -- VectorAssembler is
    # positional, so do not reorder.
    feature_columns = ["temperature", "feelslike", "wind", "cloud", "pressure", "time", "month"]

    feature_rows = []
    for _key, raw_value in rdd.collect():
        try:
            data = json.loads(raw_value)
            # float() gives every row identical column types, so schema inference
            # cannot fail when one message sends 20 and another 20.5.
            features = tuple(float(data[name]) for name in feature_columns)
        except (ValueError, KeyError, TypeError) as ex:
            print("Skipping malformed message %r: %s" % (raw_value, ex))
            continue
        print(data)
        feature_rows.append(features)

    if not feature_rows:
        return  # nothing to score: skip loading the model and opening a producer

    # Load the model once per batch rather than once per message.
    loaded_model = LinearSVCModel.load("/home/jovyan/model")

    # Score the whole batch with a single DataFrame instead of one per message.
    new_data = spark.createDataFrame(feature_rows, feature_columns)
    assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
    predictions_new_data = loaded_model.transform(assembler.transform(new_data))
    predictions_new_data.show()

    # Cast inside the select so the integer cast is what actually gets collected.
    rs = predictions_new_data.select(col("prediction").cast(IntegerType()).alias("prediction"))
    predictions = [row["prediction"] for row in rs.collect()]

    kafka_producer = KafkaProducer(bootstrap_servers=['kafka:9093'], api_version=(0, 10))
    try:
        futures = []
        for prediction in predictions:
            key = str(uuid.uuid4())
            value = json.dumps({'prediction': prediction})
            futures.append(
                kafka_producer.send(
                    'WeatherPredictData',
                    key=key.encode('utf-8'),
                    value=value.encode('utf-8'),
                )
            )
        # One flush per batch; flush() blocks until every send above has completed,
        # after which each future reports success or failure.
        kafka_producer.flush()
        failed = [future for future in futures if future.failed()]
        for future in failed:
            print('Failed to publish prediction: %s' % future.exception)
        print('Published %d of %d prediction message(s).' % (len(futures) - len(failed), len(futures)))
    finally:
        # Release the producer's connections even if send/flush raised.
        kafka_producer.close()
        

# Run process_data on every micro-batch produced by the Kafka stream.
direct_stream.foreachRDD(process_data)

# Start the Spark Streaming context
ssc.start()

# Block until the streaming context terminates (interrupt / Ctrl+C to stop).
# Stop the StreamingContext and its SparkContext in `finally`, so this also
# happens when awaitTermination() is interrupted. Otherwise a leftover
# SparkContext blocks re-running the notebook.
try:
    ssc.awaitTermination()
finally:
    ssc.stop(stopSparkContext=True, stopGraceFully=False)


<pyspark.sql.context.SQLContext object at 0x7efd9bb916d8>


In [None]:
import os
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LinearSVCModel
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType
from kafka import KafkaProducer
import json
import uuid
import os
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LinearSVC
import json
from pyspark.ml.classification import LinearSVCModel
from pyspark.sql import SparkSession
from pyspark.sql.functions import when, col
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import SQLContext
from json import loads, dumps
from kafka import KafkaProducer
import uuid
import datetime

from pyspark.sql.types import IntegerType
# Put the Kafka 0-8 streaming integration JAR on the classpath. PySpark reads
# PYSPARK_SUBMIT_ARGS when it launches the JVM, i.e. when the SparkContext below
# is created, so this must stay above that line.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars /home/jovyan/spark-streaming-kafka-0-8-assembly_2.11-2.0.0-preview.jar pyspark-shell'

# Initialize a SparkContext
sc = SparkContext(appName="Kafka Message Processing")

# Initialize a StreamingContext with a batch interval of 15 seconds
ssc = StreamingContext(sc, 15)

# Build (or reuse) a Hive-enabled SparkSession on top of the SparkContext above.
spark = (
    SparkSession.builder.appName("Kafka Message Processing")
    .config("spark.driver.allowMultipleContexts", "true")
    .config("hive.metastore.uris", "thrift://hive-metastore:9083")
    .config("spark.sql.warehouse.dir", "hdfs://namenode:9000/hive")
    .enableHiveSupport()
    .getOrCreate()
)
# SQLContext's signature is (sparkContext, sparkSession): the SparkContext goes
# first and the session second.
sqlContext = SQLContext(sc, spark)
# Create the Hive table if it does not already exist.
sqlContext.sql("""
    CREATE TABLE IF NOT EXISTS WeatherStreamingData (
        time int,
        date STRING,
        month int,
        temperature DOUBLE,
        feelslike DOUBLE,
        wind DOUBLE,
        cloud DOUBLE,
        pressure DOUBLE,
        weather STRING
    )
    USING HIVE
""")
# Kafka topic(s) to consume and the broker to read them from.
topics = ["WeatherData"]
kafka_params = {
    "bootstrap.servers": "kafka:9093"
}

# Create a direct (receiver-less) stream from Kafka; each element is a
# (key, value) pair.
direct_stream = KafkaUtils.createDirectStream(ssc, topics, kafka_params)

# A single producer shared by every batch; it is closed when the stream stops.
kafka_producer = KafkaProducer(bootstrap_servers=['kafka:9093'], api_version=(0, 10))

def process_data(rdd):
    """Archive one micro-batch of weather messages to Hive and publish predictions.

    Called by ``direct_stream.foreachRDD`` once per batch. Each RDD element is a
    Kafka ``(key, value)`` pair whose value is a JSON object. For every valid
    message this function:

    * appends one row (with the driver's current time as ``date``) to the Hive
      table ``WeatherStreamingData``, as a single insert for the whole batch;
    * scores the message with the LinearSVC model saved at
      ``/home/jovyan/model``, and sends ``{"prediction": <int>}``, keyed by a
      random UUID, to ``WeatherPredictData`` via the module-level
      ``kafka_producer``.

    Messages that are not JSON objects, lack a required field, or hold a value
    of the wrong type are printed and skipped instead of failing the batch.

    NOTE: keep this function to a single positional parameter. ``foreachRDD``
    inspects the argument count to decide between calling ``func(rdd)`` and
    ``func(time, rdd)``.
    """
    # Local import keeps this block self-contained.
    from pyspark.sql.types import (
        DoubleType,
        IntegerType,
        StringType,
        StructField,
        StructType,
        TimestampType,
    )

    # Presumably the order the model was trained with -- VectorAssembler is
    # positional, so do not reorder.
    feature_columns = ["temperature", "feelslike", "wind", "cloud", "pressure", "time", "month"]

    # insertInto() resolves columns by position, so this must follow the column
    # order of the WeatherStreamingData table. `date` stays a timestamp here and
    # is cast to the table's STRING column on insert.
    hive_schema = StructType([
        StructField("time", IntegerType()),
        StructField("date", TimestampType()),
        StructField("month", IntegerType()),
        StructField("temperature", DoubleType()),
        StructField("feelslike", DoubleType()),
        StructField("wind", DoubleType()),
        StructField("cloud", DoubleType()),
        StructField("pressure", DoubleType()),
        StructField("weather", StringType()),
    ])

    hive_rows = []
    feature_rows = []
    for _key, raw_value in rdd.collect():
        try:
            data = json.loads(raw_value)
            weather = data["weather"]
            hive_row = (
                int(data["time"]),
                datetime.datetime.now(),
                int(data["month"]),
                float(data["temperature"]),
                float(data["feelslike"]),
                float(data["wind"]),
                float(data["cloud"]),
                float(data["pressure"]),
                None if weather is None else str(weather),
            )
            feature_row = tuple(float(data[name]) for name in feature_columns)
        except (ValueError, KeyError, TypeError) as ex:
            print("Skipping malformed message %r: %s" % (raw_value, ex))
            continue
        # Append only after both rows are built, so the Hive archive and the
        # predictions always cover the same messages.
        hive_rows.append(hive_row)
        feature_rows.append(feature_row)

    if not hive_rows:
        return  # empty batch: no Hive job, no model load

    # One append job per batch instead of one insert per message.
    spark.createDataFrame(hive_rows, hive_schema).write.mode("append").insertInto("WeatherStreamingData")

    # Load the model once per batch rather than once per message.
    loaded_model = LinearSVCModel.load("/home/jovyan/model")

    new_data = spark.createDataFrame(feature_rows, feature_columns)
    assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
    predictions_new_data = loaded_model.transform(assembler.transform(new_data))
    rs = predictions_new_data.select(col("prediction").cast(IntegerType()).alias("prediction"))
    rs.show()

    futures = []
    for row in rs.collect():
        key = str(uuid.uuid4())
        value = json.dumps({'prediction': row["prediction"]})
        futures.append(
            kafka_producer.send(
                'WeatherPredictData',
                key=key.encode('utf-8'),
                value=value.encode('utf-8'),
            )
        )
    # One flush per batch; flush() blocks until every send above has completed,
    # after which each future reports success or failure.
    kafka_producer.flush()
    failed = [future for future in futures if future.failed()]
    for future in failed:
        print('Failed to publish prediction: %s' % future.exception)
    print('Published %d of %d prediction message(s)' % (len(futures) - len(failed), len(futures)))

# Run process_data on every micro-batch produced by the Kafka stream.
direct_stream.foreachRDD(process_data)

# Start the Spark Streaming context
ssc.start()

# Block until the streaming context terminates (interrupt / Ctrl+C to stop).
# Cleanup runs in `finally`, so it also happens when awaitTermination() is
# interrupted. Streaming is stopped first, so no batch is still using the
# shared producer when it is closed.
try:
    ssc.awaitTermination()
finally:
    try:
        ssc.stop(stopSparkContext=True, stopGraceFully=False)
    finally:
        kafka_producer.close()
