# Spark analysis

In [1]:
# GENERAL requirements

import json

In [2]:
# environment

KAFKA_BOOTSTRAP_SERVER = 'localhost:9092'
INPUT_TOPIC  = 'cosmo-stream'
OUTPUT_TOPIC = 'cosmo-results'

SPARK_PATH = '/home/baronefr/storage/uni/courses/mapd_modB/repo/spark/spark-3.2.1-bin-hadoop3.2/'

In [3]:
# get Kafka ready
from kafka import KafkaProducer

cosmo_kafka = KafkaProducer(bootstrap_servers=KAFKA_BOOTSTRAP_SERVER)

In [4]:
## CAYDE LOCAL CLUSTER for streaming

import findspark
findspark.init('/home/baronefr/storage/uni/courses/mapd_modB/repo/spark/spark-3.2.1-bin-hadoop3.2/')

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructField, StructType, StringType, DoubleType, IntegerType, LongType
from pyspark.sql.functions import split

spark = SparkSession.builder \
        .master("spark://localhost:7077")\
        .appName("test realtime")\
        .config("spark.executor.memory", "512m")\
        .config("spark.sql.execution.arrow.pyspark.enabled", "true")\
        .config("spark.sql.execution.arrow.pyspark.fallback.enabled", "false")\
        .config("spark.jars.packages","org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1")\
        .getOrCreate()

KAFKA_BOOTSTRAP_SERVERS = 'localhost:9092'

sc = spark.sparkContext
sc

22/06/16 21:16:55 WARN Utils: Your hostname, cayde resolves to a loopback address: 127.0.1.1; using 192.168.0.117 instead (on interface wlp5s0)
22/06/16 21:16:55 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Ivy Default Cache set to: /home/baronefr/.ivy2/cache
The jars for the packages stored in: /home/baronefr/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-646d8b4d-27da-4987-ac90-86a4d1db5bf1;1.0
	confs: [default]


:: loading settings :: url = jar:file:/mnt/DATA/baronefr/storage/uni/courses/mapd_modB/repo/spark/spark-3.2.1-bin-hadoop3.2/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.1.1 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.1.1 in central
	found org.apache.kafka#kafka-clients;2.6.0 in central
	found com.github.luben#zstd-jni;1.4.8-1 in central
	found org.lz4#lz4-java;1.7.1 in central
	found org.xerial.snappy#snappy-java;1.1.8.2 in central
	found org.slf4j#slf4j-api;1.7.30 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.commons#commons-pool2;2.6.2 in central
:: resolution report :: resolve 206ms :: artifacts dl 5ms
	:: modules in use:
	com.github.luben#zstd-jni;1.4.8-1 from central in [default]
	org.apache.commons#commons-pool2;2.6.2 from central in [default]
	org.apache.kafka#kafka-clients;2.6.0 from central in [default]
	org.apache.spark#spark-sql-kafka-0-10_2.12;3.1.1 from central in [default]
	org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.1.1 from central in [default]
	org.lz4#lz4-java;1.7.1 from central in [default]
	org.slf4j#s

### parse input dataframe (streamed)

In [5]:
inputDF = spark.readStream.format("kafka")\
    .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS)\
    .option("kafkaConsumer.pollTimeoutMs", 4000)\
    .option('subscribe', INPUT_TOPIC)\
    .option("startingOffsets", "latest") \
    .load()
#inputDF.isStreaming  # to check if True
inputDF.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [6]:
valuesDF = inputDF.select( split( col("value").cast("String"), ",").alias("sv") )
valuesDF.printSchema()

root
 |-- sv: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [7]:
# cast to HEAD,FPGA,TDC_CHANNEL,ORBIT_CNT,BX_COUNTER,TDC_MEAS
cosmoDF = valuesDF.withColumn('HEAD', col('sv').getItem(0).cast("Integer"))\
                .withColumn('FPGA', col('sv').getItem(1).cast("Integer"))\
                .withColumn('TDC_CHANNEL', col('sv').getItem(2).cast("Integer"))\
                .withColumn('ORBIT_CNT', col('sv').getItem(3).cast("Long"))\
                .withColumn('BX_COUNTER', col('sv').getItem(4).cast("Integer"))\
                .withColumn('TDC_MEAS', col('sv').getItem(5).cast("Integer"))\
                .drop('sv')
cosmoDF.printSchema()

root
 |-- HEAD: integer (nullable = true)
 |-- FPGA: integer (nullable = true)
 |-- TDC_CHANNEL: integer (nullable = true)
 |-- ORBIT_CNT: long (nullable = true)
 |-- BX_COUNTER: integer (nullable = true)
 |-- TDC_MEAS: integer (nullable = true)



In [8]:
def just_show(df, batch_id):
    df.show()
#cosmoDF.writeStream.foreachBatch(just_show).start().awaitTermination()

### elaborator

In [9]:
def counter(df, batch_id):
    n = df.count()
    
    # buffer out!
    batch_statistics = { 'n' : n }
    cosmo_kafka.send(OUTPUT_TOPIC, json.dumps(batch_statistics).encode('utf-8'))
    return n

In [None]:
cosmoDF.writeStream.foreachBatch(counter).start().awaitTermination()

End of file