# Inference 3, Consumer 2 from HDFS, giving the  prediction

In [1]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "./spark-3.5.3-bin-hadoop3"
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.2.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0 pyspark-shell'

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import StringType


In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf
from pyspark.sql.types import StructType, StructField, ArrayType, FloatType, BooleanType, StringType
from pyspark.sql import SparkSession
from pyspark.ml.classification import LogisticRegressionModel

# Initialize Spark session
spark = SparkSession.builder \
    .appName("Read from HDFS with Schema") \
    .getOrCreate()

# Define the data mode
schema = StructType([
    StructField("audio_features", ArrayType(FloatType()), True),
    StructField("processed", BooleanType(), True)
])

hdfs_data_path = "hdfs://localhost:9000/user/hadoop/Music/Inference"
lr_model_path = "hdfs://localhost:9000/user/hadoop/Music/Models/LogisticRegression_2"

# Load the pre-trained model
loaded_lr_model = LogisticRegressionModel.load(lr_model_path)

# Define UDF to convert array to vector
array_to_vector_udf = udf(lambda arr: Vectors.dense(arr), VectorUDT())

# Mapping from index to label
index_to_label = {
    0: 'classical',
    1: 'country',
    2: 'disco',
    3: 'hiphop',
    4: 'jazz',
    5: 'metal',
    6: 'pop',
    7: 'reggae',
    8: 'rock'
}

# Define UDF to convert prediction index to label
def index_to_label_func(index):
    return index_to_label.get(int(index), 'Unknown')

index_to_label_udf = udf(index_to_label_func, StringType())

# Define streaming function to read from HDFS and make predictions
def predict_audio_features(df, epoch_id):
    # Filter records where processed=True
    df = df.filter("processed = True")
    
    # Load data and convert 'audio_features' column to Vector format
    df = df.withColumn("audio_features_vector", array_to_vector_udf("audio_features"))

    # Use VectorAssembler to create the 'features' column
    assembler = VectorAssembler(inputCols=["audio_features_vector"], outputCol="features")
    df = assembler.transform(df)

    # Apply the prediction model
    predictions = loaded_lr_model.transform(df)
    
    # Use UDF to convert prediction index to label
    predictions_with_labels = predictions.withColumn("predictedLabel", index_to_label_udf("prediction"))
    
    # Show the prediction results
    predictions_with_labels.select("predictedLabel").show(truncate=False)
    print(f"Prediction completed for batch {epoch_id}")

# Define the streaming task to read feature data from HDFS
prediction_query = spark.readStream \
    .schema(schema) \
    .format("parquet") \
    .option("path", hdfs_data_path) \
    .load() \
    .writeStream \
    .foreachBatch(predict_audio_features) \
    .start()

prediction_query.awaitTermination()
# Stop the Spark session after processing is done
spark.stop()

:: loading settings :: url = jar:file:/root/music/spark-3.5.3-bin-hadoop3/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.spark#spark-streaming-kafka-0-10_2.12 added as a dependency
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-d4947728-bc3e-41ea-8883-ec60be06b21d;1.0
	confs: [default]
	found org.apache.spark#spark-streaming-kafka-0-10_2.12;3.2.0 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.2.0 in central
	found org.apache.kafka#kafka-clients;2.8.0 in central
	found org.lz4#lz4-java;1.7.1 in central
	found org.xerial.snappy#snappy-java;1.1.8.4 in central
	found org.slf4j#slf4j-api;1.7.30 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.1 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.hadoop#hadoop-client-api;3.3.1 in central
	found org.apache.htrace#htrace-core4;4.1.0-incubating in central
	found commons-logging#commons-logging;1.1.3 in 

+--------------+
|predictedLabel|
+--------------+
|jazz          |
|country       |
|hiphop        |
|hiphop        |
|rock          |
|classical     |
|classical     |
|rock          |
|metal         |
|jazz          |
|pop           |
|jazz          |
|disco         |
|classical     |
|hiphop        |
|jazz          |
|country       |
|rock          |
|disco         |
|rock          |
+--------------+
only showing top 20 rows

Prediction completed for batch 0
+--------------+
|predictedLabel|
+--------------+
|reggae        |
+--------------+

Prediction completed for batch 1


                                                                                

+--------------+
|predictedLabel|
+--------------+
|disco         |
+--------------+

Prediction completed for batch 2
+--------------+
|predictedLabel|
+--------------+
|disco         |
+--------------+

Prediction completed for batch 3


                                                                                

+--------------+
|predictedLabel|
+--------------+
|classical     |
+--------------+

Prediction completed for batch 4
+--------------+
|predictedLabel|
+--------------+
|reggae        |
+--------------+

Prediction completed for batch 5


In [3]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf
from pyspark.sql.types import StructType, StructField, ArrayType, FloatType, BooleanType, StringType
from pyspark.sql import SparkSession
from pyspark.ml.classification import OneVsRestModel, RandomForestClassificationModel

# Initialize Spark session
spark = SparkSession.builder \
    .appName("Read from HDFS with Schema") \
    .getOrCreate()

# Define the data mode
schema = StructType([
    StructField("audio_features", ArrayType(FloatType()), True),
    StructField("processed", BooleanType(), True)
])

hdfs_data_path = "hdfs://localhost:9000/user/hadoop/Music/Inference"
lr_model_path = "hdfs://localhost:9000/user/hadoop/Music/Models/OneVsRest_SVC_0"

# Load the pre-trained model
loaded_lr_model = OneVsRestModel.load(lr_model_path)

# Define UDF to convert array to vector
array_to_vector_udf = udf(lambda arr: Vectors.dense(arr), VectorUDT())

# Mapping from index to label
index_to_label = {
    0: 'classical',
    1: 'country',
    2: 'disco',
    3: 'hiphop',
    4: 'jazz',
    5: 'metal',
    6: 'pop',
    7: 'reggae',
    8: 'rock'
}

# Define UDF to convert prediction index to label
def index_to_label_func(index):
    return index_to_label.get(int(index), 'Unknown')

index_to_label_udf = udf(index_to_label_func, StringType())

# Define streaming function to read from HDFS and make predictions
def predict_audio_features(df, epoch_id):
    # Filter records where processed=True
    df = df.filter("processed = True")
    
    # Load data and convert 'audio_features' column to Vector format
    df = df.withColumn("audio_features_vector", array_to_vector_udf("audio_features"))

    # Use VectorAssembler to create the 'features' column
    assembler = VectorAssembler(inputCols=["audio_features_vector"], outputCol="features")
    df = assembler.transform(df)

    # Apply the prediction model
    predictions = loaded_lr_model.transform(df)
    
    # Use UDF to convert prediction index to label
    predictions_with_labels = predictions.withColumn("predictedLabel", index_to_label_udf("prediction"))
    
    # Show the prediction results
    predictions_with_labels.select("predictedLabel").show(truncate=False)
    print(f"Prediction completed for batch {epoch_id}")

# Define the streaming task to read feature data from HDFS
prediction_query = spark.readStream \
    .schema(schema) \
    .format("parquet") \
    .option("path", hdfs_data_path) \
    .load() \
    .writeStream \
    .foreachBatch(predict_audio_features) \
    .start()

prediction_query.awaitTermination()
# Stop the Spark session after processing is done
spark.stop()


:: loading settings :: url = jar:file:/root/music/spark-3.5.3-bin-hadoop3/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.spark#spark-streaming-kafka-0-10_2.12 added as a dependency
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-477f6b8b-83a9-4eee-88c0-adc89df9f8f0;1.0
	confs: [default]
	found org.apache.spark#spark-streaming-kafka-0-10_2.12;3.2.0 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.2.0 in central
	found org.apache.kafka#kafka-clients;2.8.0 in central
	found org.lz4#lz4-java;1.7.1 in central
	found org.xerial.snappy#snappy-java;1.1.8.4 in central
	found org.slf4j#slf4j-api;1.7.30 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.1 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.hadoop#hadoop-client-api;3.3.1 in central
	found org.apache.htrace#htrace-core4;4.1.0-incubating in central
	found commons-logging#commons-logging;1.1.3 in 

+--------------+
|predictedLabel|
+--------------+
|jazz          |
|country       |
|metal         |
|hiphop        |
|rock          |
|country       |
|classical     |
|jazz          |
|metal         |
|jazz          |
|pop           |
|jazz          |
|disco         |
|classical     |
|hiphop        |
|jazz          |
|country       |
|rock          |
|disco         |
|pop           |
+--------------+
only showing top 20 rows

Prediction completed for batch 0


ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/root/miniconda3/lib/python3.12/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
RuntimeError: reentrant call inside <_io.BufferedReader name=59>

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/root/miniconda3/lib/python3.12/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/root/miniconda3/lib/python3.12/site-packages/py4j/clientserver.py", line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/root/miniconda3/lib/python3.12/site-packages/py4j/clientserv

Py4JError: An error occurred while calling o697.awaitTermination

----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 36866)
Traceback (most recent call last):
  File "/root/miniconda3/lib/python3.12/socketserver.py", line 318, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/root/miniconda3/lib/python3.12/socketserver.py", line 349, in process_request
    self.finish_request(request, client_address)
  File "/root/miniconda3/lib/python3.12/socketserver.py", line 362, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/root/miniconda3/lib/python3.12/socketserver.py", line 761, in __init__
    self.handle()
  File "/root/miniconda3/lib/python3.12/site-packages/pyspark/accumulators.py", line 295, in handle
    poll(accum_updates)
  File "/root/miniconda3/lib/python3.12/site-packages/pyspark/accumulators.py", line 267, in poll
    if self.rfile in r and func():
                           ^^^^^^
  File "/root/miniconda3/lib/pytho