# Spark configuration

In [1]:
from constants import *

import json
import numpy as np
import struct as sstruct

from pyspark.sql.functions import udf, col, explode, mean, stddev, count, to_json, struct
from pyspark.sql.types import ArrayType, FloatType, IntegerType, StructType, StructField, StringType
from pyspark.sql import SparkSession

from kafka import KafkaProducer,KafkaConsumer

## Spark Session

In [2]:
# Build (or reuse) the Spark session used by the whole notebook.
# The master URL, executor memory and shuffle partition count are fixed here;
# the Kafka SQL connector is requested through spark.jars.packages.
spark = (
    SparkSession.builder
    .master("spark://10.67.22.8:7077")
    .appName("Spark structured streaming application")
    .config("spark.executor.memory", "1000m")
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    .config("spark.sql.adaptive.enabled", "false")
    .config("spark.sql.execution.arrow.pyspark.fallback.enabled", "false")
    .config("spark.sql.shuffle.partitions", 12)
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.2")
    .getOrCreate()
)

# Keep a handle on the SparkContext and display it as the cell output.
sc = spark.sparkContext
sc

:: loading settings :: url = jar:file:/usr/local/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/lupi/.ivy2/cache
The jars for the packages stored in: /home/lupi/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-24f981f7-ebd8-4bc3-8ae4-488fc6b12e7a;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.3.2 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.3.2 in central
	found org.apache.kafka#kafka-clients;2.8.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.8.4 in central
	found org.slf4j#slf4j-api;1.7.32 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.2 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.hadoop#hadoop-client-api;3.3.2 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.apache.commons#commons-pool2;2.11.1 in central
:: resoluti

23/09/18 20:51:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


## Functions

In [3]:
# Function to convert a byte array into a list of float values
def bytes_to_float32_list(bytes_value):
    """Decode a byte buffer into a list of floats, four bytes at a time.

    Each consecutive 4-byte slice is unpacked with the struct format 'f'
    (native byte order and size).

    Parameters
    ----------
    bytes_value : bytes-like
        Buffer holding the packed values.

    Returns
    -------
    list of float
        One value per 4-byte slice, in buffer order. Empty input gives [].

    Raises
    ------
    struct.error
        If the final slice is shorter than four bytes.
    """
    starts = range(0, len(bytes_value), 4)
    return [sstruct.unpack('f', bytes_value[start:start + 4])[0] for start in starts]

# Function to combine two concatenated arrays (real part, then imaginary part)
# into complex numbers
def to_complex(x, half_len=3072):
    """Combine a concatenated [real | imaginary] array into complex samples.

    Parameters
    ----------
    x : array-like
        The first ``half_len`` entries are taken as real parts and the
        remaining entries as imaginary parts.
    half_len : int, optional
        Number of real-part entries at the start of ``x``. Defaults to 3072,
        the value that was previously hard-coded.

    Returns
    -------
    numpy.ndarray
        Complex array ``real + 1j * imag``.

    Raises
    ------
    ValueError
        If the real and imaginary parts do not have the same shape.
    """
    x = np.asarray(x)  # also accept plain lists / tuples
    real = x[:half_len]
    imag = x[half_len:]

    # Without this check a length-1 imaginary part would be silently broadcast
    # over every real sample instead of being reported as malformed input.
    if real.shape != imag.shape:
        raise ValueError(
            f"expected equal real/imaginary parts, got shapes {real.shape} and {imag.shape}"
        )

    return real + 1j * imag

# Function to compute the normalized, shifted FFT power spectrum of an array
def Fourie(x):
    """Return the fft-shifted, normalized power spectrum of ``x`` as a list.

    ``x`` is converted to a NumPy array and turned into complex samples with
    ``to_complex``. The squared magnitude of its FFT is divided by
    ``n_bins * fft_bandwidth * sqrt(2)`` and reordered with ``fftshift``.

    ``n_bins`` and ``fft_bandwidth`` are not defined in this notebook;
    presumably they come from ``from constants import *`` -- confirm.

    Parameters
    ----------
    x : sequence of numbers
        Input passed to ``to_complex`` after conversion to an array.

    Returns
    -------
    list
        The shifted, normalized power values as plain Python numbers.
    """
    samples = to_complex(np.array(x))

    # Squared magnitude of the FFT
    spectrum = np.fft.fft(samples)
    power = np.abs(spectrum) ** 2

    # Normalization factor
    scale = n_bins * fft_bandwidth * np.sqrt(2)

    # Normalize first, then shift, then hand back a plain list
    return np.fft.fftshift(power / scale).tolist()

# Function to index elements in a list with file numbers
def indexing(x, file_num):
    """Pair every element of ``x`` with a ``"<file_num>_<position>"`` label.

    Parameters
    ----------
    x : sequence
        Values to label.
    file_num :
        Value interpolated in front of the position in each label.

    Returns
    -------
    list of tuple
        ``(label, value)`` pairs in the order of ``x``.
    """
    return [(f'{file_num}_{i}', value) for i, value in enumerate(x)]

# Function to extract the file number from a byte array (big-endian unsigned short)
def extract_file_num(key_bytes):
    """Decode the first two bytes of ``key_bytes`` as a big-endian unsigned 16-bit integer.

    Parameters
    ----------
    key_bytes : bytes-like
        Buffer whose first two bytes hold the number; extra bytes are ignored.

    Returns
    -------
    int
        The decoded value.

    Raises
    ------
    struct.error
        If fewer than two bytes are available.
    """
    header = key_bytes[:2]
    (file_num,) = sstruct.unpack('>H', header)
    return file_num

# Function to count elements in each batch and print batch size
def batches_count(batch_df, batch_id):
    """Print the size of one batch.

    Has the ``(batch_df, batch_id)`` shape of a ``foreachBatch`` callback.

    Parameters
    ----------
    batch_df :
        Object exposing ``count()``; its result is reported as the size.
    batch_id :
        Identifier shown in the printed line.
    """
    batch_count = batch_df.count()
    report = f"Batch {batch_id}: Size = {batch_count}"
    print(report)

# Function to send data to Kafka as JSON messages
def send_to_kafka(batch_df, batch_id):
    """Publish one batch to the Kafka topic "results" as a single JSON array.

    Every row of ``batch_df`` is collected to the driver as a JSON string,
    the rows are combined into one JSON list, and that list is sent UTF-8
    encoded as one message. An empty batch is sent as ``[]``.

    Parameters
    ----------
    batch_df :
        Batch DataFrame handed over by ``foreachBatch``.
    batch_id :
        Batch identifier supplied by ``foreachBatch``; not used here.
    """
    batch_json = batch_df.toJSON().collect()
    all_data_json = json.dumps([json.loads(row) for row in batch_json])

    producer = KafkaProducer(bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS)
    try:
        producer.send("results", value=all_data_json.encode("utf-8"))
    finally:
        # Always release the producer's connections, even if send() raises;
        # previously a failed send left the producer open.
        producer.close()

## Query 

In [4]:
# Read streaming data from Kafka into a DataFrame: subscribe to the
# "chunk_data" topic, start from the latest offsets, and read at most
# 2000 offsets per trigger.
inputDF = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS)
    .option("kafkaConsumer.pollTimeoutMs", 30_000)
    .option("startingOffsets", "latest")
    .option("maxOffsetsPerTrigger", 2000)
    .option("subscribe", "chunk_data")
    .load()
)


# Wrap the plain Python helpers as Spark UDFs.

# bytes -> list of floats, and list of floats -> power spectrum
bytes_to_float32_udf = udf(bytes_to_float32_list, returnType=ArrayType(FloatType()))
fft_udf = udf(Fourie, returnType=ArrayType(FloatType()))

# One labelled spectrum value: ("<file_num>_<position>", value)
schema = StructType([
    StructField("index", StringType()),
    StructField("x", FloatType()),
])

# Spectrum + file number -> array of labelled values
indexing_udf = udf(indexing, returnType=ArrayType(schema))

# Message key -> file number
extract_file_num_udf = udf(extract_file_num, returnType=IntegerType())


# Decode each message and attach its spectrum, file number and labelled spectrum:
#   value -> 'float' (decoded floats) -> 'fft' (power spectrum)
#   key   -> 'file_num'
#   ('fft', 'file_num') -> 'indexed_fft' (array of (index, x) structs)
streaming_df = (
    inputDF
    .select('key', 'value')
    .withColumn('float', bytes_to_float32_udf(col('value')))
    .withColumn('fft', fft_udf(col('float')))
    .withColumn('file_num', extract_file_num_udf(col('key')))
    .withColumn('indexed_fft', indexing_udf(col('fft'), col('file_num')))
)

# One row per labelled spectrum value
exploded_df = streaming_df.select('key', explode('indexed_fft').alias('indexed_fft'))

# Per-label statistics over all rows sharing the same 'indexed_fft.index'
result_df = (
    exploded_df
    .groupBy("indexed_fft.index")
    .agg(
        mean("indexed_fft.x").alias("mean_x"),
        stddev("indexed_fft.x").alias("stddev_x"),
        count("indexed_fft.x").alias("count_x"),
    )
)


# Debugging aid, kept disabled. It refers to result_json_df, so it can only
# be enabled after the definition below has run.
# result_json_df.writeStream \
#    .outputMode("update") \
#    .format('console')\
#    .foreachBatch(batches_count)\
#    .start()\
#    .awaitTermination()

# Keep only the groups whose statistics were computed from a full pair of
# files (count_x == 2731) and pack the columns into a single struct column
# named "data", ready to be serialized as JSON.
result_json_df = (
    result_df
    .where(col('count_x') == 2731)
    .select(struct("index", "mean_x", "stddev_x", "count_x").alias("data"))
)

# Every 12 seconds, hand the updated rows to send_to_kafka, which publishes
# them as a single JSON message.
query = (
    result_json_df.writeStream
    .trigger(processingTime="12 seconds")
    .outputMode("update")
    .foreachBatch(send_to_kafka)
    .start()
)

# Block this cell until the streaming query stops (or the kernel is interrupted).
query.awaitTermination()

23/09/18 20:51:11 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-9405aa74-26d3-4811-aca9-cb71da056187. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/09/18 20:51:12 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-69a1a8ee-7420-4746-aaa3-ee5a8182ae64-300708440-driver-0-1, groupId=spark-kafka-source-69a1a8ee-7420-4746-aaa3-ee5a8182ae64-300708440-driver-0] Error while fetching metadata with correlation id 2 : {chunk_data=LEADER_NOT_AVAILABLE}
23/09/18 20:51:12 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-69a1a8ee-7420-4746-aaa3-ee5a8182ae64-300708440-driver-0-1, groupId=spark-kafka-source-69a1a8ee-7420-4746-aaa3-ee5a8182ae64-300708440-driver-0] Error while fetching metadata with correlation id 4 : {chunk_data=LEADE

                                                                                

23/09/18 20:51:23 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 12000 milliseconds, but spent 12471 milliseconds


ERROR:root:KeyboardInterrupt while sending command.                             
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/usr/local/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/lib/python3.10/socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

In [5]:
# Shut down the Spark session, then the SparkContext handle taken from it.
# NOTE(review): SparkSession.stop() is documented to stop the underlying
# SparkContext as well, so the second call is likely redundant -- confirm.
spark.stop()
sc.stop()