In [None]:
# Kafka connection settings, read from the Databricks notebook widgets
# "ip", "port" and "topicname" (in that order).
KafkaServerIP, KafkaServerPort, srcTopicName = (
    dbutils.widgets.get(widget_name)
    for widget_name in ("ip", "port", "topicname")
)


In [None]:
import base64
import io
import struct

import numpy as np
from PIL import Image
import pyspark.sql.functions as f
from pyspark.sql.types import *

In [0]:
# Subscribe to the source topic. Broker address and topic name come from the
# notebook widgets read at the top of the notebook.
dfraw = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", f'{KafkaServerIP}:{KafkaServerPort}')
    .option("subscribe", f'{srcTopicName}')
    .load()
)

# The Kafka source exposes `value` as binary; cast it to STRING once.
df = dfraw.selectExpr("CAST(value AS STRING)")

# `df` already holds the STRING column, so a second identical cast is a no-op.
kafka_df = df

# Spark refuses to start a query whose name is already active. Stop any
# leftover "storeKafka" query so this cell can be re-run without error.
for active_query in spark.streams.active:
    if active_query.name == "storeKafka":
        active_query.stop()

# Collect incoming records in an in-memory table named after the query.
output_query = (
    kafka_df.writeStream
    .queryName("storeKafka")
    .format("memory")
    .start()
)
# Block for at most 10 seconds so some records can arrive; the query itself
# keeps running afterwards.
output_query.awaitTermination(10)

sample_df = spark.sql("select * from storeKafka")

In [0]:
import pyspark.sql.functions as f
from pyspark.sql.types import *
import struct
import numpy as np

def b64_to_arr(value):
    """Decode a base64-encoded image into a nested list of pixel values.

    Args:
        value: Base64 text of an image file that PIL can open, or None / an
            empty string when the record carries no payload.

    Returns:
        A one-element tuple (one value per field of the UDF's struct schema)
        containing ``np.array(image).tolist()``, or ``(None,)`` when ``value``
        is missing.

    Raises:
        Whatever ``base64.b64decode`` or ``PIL.Image.open`` raise for a
        non-empty payload that is not valid base64 / not a readable image.
    """
    # A null or empty Kafka value would otherwise raise inside the UDF and
    # fail the whole streaming query; emit a null matrix for that row instead.
    if not value:
        return (None,)

    # Image.open is lazy; np.array() forces the pixel data to be read while
    # the image is still open.
    with Image.open(io.BytesIO(base64.b64decode(value))) as image:
        image_np = np.array(image)

    # NOTE(review): the schema registered for this UDF is a 2-level array of
    # integers, which only fits single-channel images - confirm the producer
    # sends grayscale data.
    return (image_np.tolist(),)


# Result type of the UDF: a struct with one field, `matrix`, holding a
# 2-level array of integers.
m_schema = StructType(
    [StructField('matrix', ArrayType(ArrayType(IntegerType())))]
)

# Wrap the decoder so it can be applied to a DataFrame column.
b64_to_arr_udf = f.udf(b64_to_arr, m_schema)

# Read the topic, cast the binary payload to text, decode each record and
# keep only the decoded struct column.
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", f'{KafkaServerIP}:{KafkaServerPort}')
    .option("subscribe", f'{srcTopicName}')
    .load()
    .selectExpr("CAST(value AS STRING)")
    .withColumn('decoded', b64_to_arr_udf(f.col("value")))
    .select("decoded")
)

display(df)

In [0]:

def poolingOverlap(value, f=2, stride=None, method='max', pad=False, return_max_pos=False):
    """Decode a base64 image and apply sliding-window pooling to it.

    Args:
        value: Base64 text of an image file that PIL can open, or None / an
            empty string when the record carries no payload.
        f: Side length of the square pooling window.
        stride: Step between windows along both axes; defaults to ``f``.
        method: ``'max'`` pools with ``np.nanmax``; any other value pools
            with ``np.nanmean``.
        pad: If True, the image is copied into a zero-filled array large
            enough for ``size // stride + 1`` windows per axis. If False,
            trailing rows/columns that do not fill a complete window are
            dropped.
        return_max_pos: If True, also compute a 0/1 mask marking the
            positions inside each window that equal the pooled value.

    Returns:
        ``(None, None)`` when ``value`` is missing. Otherwise
        ``(image_as_list, pooled_as_list)`` when ``return_max_pos`` is False,
        or ``(pooled_as_list, [[mask_ndarray]])`` when it is True.
    """
    # A null or empty Kafka value would otherwise raise inside the UDF and
    # fail the whole streaming query; emit null fields for that row instead.
    if not value:
        return None, None

    # Image.open is lazy; np.array() forces the pixel data to be read while
    # the image is still open.
    with Image.open(io.BytesIO(base64.b64decode(value))) as image:
        mat = np.array(image)

    m, n = mat.shape[:2]
    if stride is None:
        stride = f

    if pad:
        # Number of windows per axis; this always adds one window, even when
        # the size is an exact multiple of the stride.
        ny = m // stride + 1
        nx = n // stride + 1
        size = ((ny - 1) * stride + f, (nx - 1) * stride + f) + mat.shape[2:]
        # NOTE(review): the padding value is 0, so the nan-aware reductions
        # below treat padded cells as real zeros - confirm this is intended.
        mat_pad = np.full(size, 0)
        mat_pad[:m, :n, ...] = mat
    else:
        # Crop to the largest extent covered by complete windows.
        mat_pad = mat[:(m - f) // stride * stride + f, :(n - f) // stride * stride + f, ...]

    # Build a read-only strided view whose axes 2 and 3 span one f x f window
    # for every window position given by axes 0 and 1.
    s0, s1 = mat_pad.strides[:2]
    m1, n1 = mat_pad.shape[:2]
    view_shape = (1 + (m1 - f) // stride, 1 + (n1 - f) // stride, f, f) + mat_pad.shape[2:]
    strides = (stride * s0, stride * s1, s0, s1) + mat_pad.strides[2:]
    view = np.lib.stride_tricks.as_strided(mat_pad, view_shape, strides=strides, writeable=False)

    # keepdims is only needed when the result is broadcast against the view
    # to locate the pooled positions.
    if method == 'max':
        result = np.nanmax(view, axis=(2, 3), keepdims=return_max_pos)
    else:
        result = np.nanmean(view, axis=(2, 3), keepdims=return_max_pos)

    if return_max_pos:
        pos = np.where(result == view, 1, 0)
        result = np.squeeze(result)
        # NOTE(review): the mask is returned as an ndarray wrapped in two
        # lists, unlike the plain nested lists of the other branch - confirm
        # callers of this branch expect that shape.
        return result.tolist(), [[pos]]
    else:
        return mat.tolist(), result.tolist()


# Result type of the pooling UDF: the decoded image and its pooled version,
# each declared as a 2-level array of integers.
# NOTE(review): integer element types fit the default method='max' on
# single-channel images; confirm before calling the UDF with other settings.
m_schema = StructType([
    StructField('matrix', ArrayType(ArrayType(IntegerType()))),
    StructField('max_pool_matrix', ArrayType(ArrayType(IntegerType()))),
])

poolingOverlap_udf = f.udf(poolingOverlap, m_schema)

# Read from the broker and topic configured through the notebook widgets,
# like the other streaming cells, instead of a hard-coded address and topic.
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", f'{KafkaServerIP}:{KafkaServerPort}')
    .option("subscribe", srcTopicName)
    .load()
    .selectExpr("CAST(value AS STRING)")
    .withColumn('parsed', poolingOverlap_udf(f.col("value")))
    .select("parsed.matrix", "parsed.max_pool_matrix")
)

# Spark refuses to start a query whose name is already active. An earlier
# cell starts a "storeKafka" query and never stops it, so stop any active
# query of that name before starting this one.
for active_query in spark.streams.active:
    if active_query.name == "storeKafka":
        active_query.stop()

# Collect the pooled results in an in-memory table named after the query.
df.writeStream \
  .queryName("storeKafka") \
  .format("memory") \
  .start()

display(df)