In [1]:
import os
# Ask the PySpark launcher to pull in the Spark-Kafka connector. This must be set before
# the SparkSession below is built, because it is read when the JVM is launched.
# NOTE(review): the connector's Scala (2.12) and Spark (3.0.2) versions must match the
# installed PySpark — confirm.
os.environ["PYSPARK_SUBMIT_ARGS"] = " ".join([
    "--packages",
    "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.2",
    "pyspark-shell",
])

In [2]:
from pyspark.sql.session import SparkSession
import json

In [3]:
# Create the Spark session, or reuse the one already running in this kernel.
spark = SparkSession.builder.appName("test").getOrCreate()

Read the `bey` stream (Kafka topic `lol1`) and parse its JSON payload.

In [43]:
# Subscribe to the Kafka topic as an unbounded streaming DataFrame, replaying the topic
# from its earliest retained offset.
# NOTE(review): the saved traceback further down references topic `lol`, not `lol1`;
# that output predates this cell's current topic name.
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:19092")
    .option("subscribe", "lol1")
    .option("includeHeaders", "true")
    .option("startingOffsets", "earliest")
    .load()
)

from pyspark.sql.functions import col, from_json
from pyspark.sql.types import ArrayType, FloatType, IntegerType, StringType, StructField, StructType

# The Kafka source exposes `value` as binary; decode it to a string so it can be parsed as JSON.
string_df = df.selectExpr("CAST(value AS STRING)")

# Print out the new DataFrame schema
# string_df.printSchema()

# Expected JSON payload: {"columns": [<str>, ...], "data": [[<float>, ...], ...], "id": <int>}
schema = StructType([
    StructField("columns", ArrayType(StringType())),
    StructField("data", ArrayType(ArrayType(FloatType()))),
    StructField("id", IntegerType()),
])

# Parse the JSON in `value` with the schema above and flatten its fields into top-level
# columns. In from_json's default PERMISSIVE mode, malformed payloads yield nulls rather
# than failing the query.
# The star-select uses the exact alias spelling so it also resolves when
# spark.sql.caseSensitive is enabled.
json_df = string_df.withColumn("jsonData", from_json(col("value"), schema)).select("jsonData.*")

# Print out the DataFrame schema
json_df.printSchema()

# Write output to the terminal
# json_df.writeStream.format("console").outputMode("append").start().awaitTermination()

# Write output to a Kafka topic (the Kafka sink requires `key` to be STRING or BINARY)
# json_df.selectExpr("CAST(id AS STRING) AS key", "to_json(struct(*)) AS value")

root
 |-- columns: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- data: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: float (containsNull = true)
 |-- id: integer (nullable = true)



Now read the `hey` stream (Kafka topic `lmao`) and parse its JSON payload.

In [5]:
# Subscribe to the second Kafka topic, again replaying it from the earliest retained offset.
other_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:19092")
    .option("subscribe", "lmao")
    .option("includeHeaders", "true")
    .option("startingOffsets", "earliest")
    .load()
)

from pyspark.sql.functions import col, from_json
from pyspark.sql.types import ArrayType, FloatType, IntegerType, StringType, StructField, StructType

# The Kafka source exposes `value` as binary; decode it to a string so it can be parsed as JSON.
other_string_df = other_df.selectExpr("CAST(value AS STRING)")

# Print out the new DataFrame schema
# other_string_df.printSchema()

# Expected JSON payload: {"stuff": <str>, "id": <int>}
other_schema = StructType([
    StructField("stuff", StringType()),
    StructField("id", IntegerType()),
])

# Parse the JSON in `value` with the schema above and flatten its fields into top-level
# columns. The star-select uses the exact alias spelling so it also resolves when
# spark.sql.caseSensitive is enabled.
other_json_df = other_string_df.withColumn("jsonData", from_json(col("value"), other_schema)).select("jsonData.*")

# Print out the DataFrame schema
other_json_df.printSchema()

# Write output to the terminal
# other_json_df.writeStream.format("console").outputMode("append").start().awaitTermination()

# Write output to a Kafka topic (the Kafka sink requires `key` to be STRING or BINARY)
# other_json_df.selectExpr("CAST(id AS STRING) AS key", "to_json(struct(*)) AS value")

root
 |-- stuff: string (nullable = true)
 |-- id: integer (nullable = true)



In [6]:
# Stream-stream inner join on the shared `id` field. The result has `id` first, followed by
# the remaining columns of each side.
# NOTE(review): neither input declares a watermark, so Spark must keep every row of both
# streams in join state indefinitely. Consider adding event-time watermarks.
joined_df = json_df.join(other_json_df, on=["id"])

In [7]:
# joined_df.writeStream.foreach(lambda row: print(int(row.stuff) * 100)).start().awaitTermination()

In [8]:
import mlflow
import mlflow.pyfunc
import os

# Registered MLflow model to score with.
model_name = "cc_model"
model_version = 1

# MLflow endpoints:
# - the tracking server, which also resolves the `models:/` registry URI below;
# - the S3-compatible endpoint that model artifacts are fetched from
#   (presumably a local MinIO — confirm).
mlflow.set_tracking_uri("http://localhost:5000")
os.environ["MLFLOW_S3_ENDPOINT_URL"] = "http://localhost:9000"

# Load the model into the driver process as a generic pyfunc.
model = mlflow.pyfunc.load_model(
    model_uri="models:/{}/{}".format(model_name, model_version)
)

In [89]:
# Wrap the same registered model as a Spark UDF that yields an array of ints per row.
# NOTE(review): applying this UDF to the nested `data` column made the streaming write
# abort (see the saved traceback). The UDF hands the model a pandas input built from its
# argument columns; confirm the model accepts a single column of nested arrays.
predict = mlflow.pyfunc.spark_udf(
    spark,
    model_uri="models:/{}/{}".format(model_name, model_version),
    result_type=ArrayType(IntegerType()),
)

In [90]:
# Add the model scores for each row's `data` matrix as a `predictions` column.
# Re-running this cell is safe: withColumn replaces an existing column of the same name.
joined_df = joined_df.withColumn(colName="predictions", col=predict("data"))

In [91]:
# Print each micro-batch of the joined, scored stream to the console. This blocks the
# cell until the query stops or fails.
(
    joined_df.writeStream
    .outputMode("append")
    .format("console")
    .start()
    .awaitTermination()
)

StreamingQueryException: Writing job aborted.
=== Streaming Query ===
Identifier: [id = f69a5ded-c5e6-4b9d-a53e-bfec3c75e33f, runId = 830a7e66-0c83-4eea-af01-30ebd989a986]
Current Committed Offsets: {}
Current Available Offsets: {KafkaV2[Subscribe[lmao]]: {"lmao":{"0":5}},KafkaV2[Subscribe[lol]]: {"lol":{"0":10}}}

Current State: ACTIVE
Thread State: RUNNABLE

Logical Plan:
WriteToMicroBatchDataSource ConsoleWriter[numRows=20, truncate=true]
+- Project [id#31, columns#29, data#30, stuff#64, predict(data#30) AS predictions#683]
   +- Project [id#31, columns#29, data#30, stuff#64]
      +- Join Inner, (id#31 = id#65)
         :- Project [jsondata#26.columns AS columns#29, jsondata#26.data AS data#30, jsondata#26.id AS id#31]
         :  +- Project [value#24, from_json(StructField(columns,ArrayType(StringType,true),true), StructField(data,ArrayType(ArrayType(FloatType,true),true),true), StructField(id,IntegerType,true), value#24, Some(America/Toronto)) AS jsonData#26]
         :     +- Project [cast(value#9 as string) AS value#24]
         :        +- StreamingDataSourceV2Relation [key#8, value#9, topic#10, partition#11, offset#12L, timestamp#13, timestampType#14, headers#15], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@135ce89d, KafkaV2[Subscribe[lol]]
         +- Project [jsondata#61.stuff AS stuff#64, jsondata#61.id AS id#65]
            +- Project [value#59, from_json(StructField(stuff,StringType,true), StructField(id,IntegerType,true), value#59, Some(America/Toronto)) AS jsonData#61]
               +- Project [cast(value#44 as string) AS value#59]
                  +- StreamingDataSourceV2Relation [key#43, value#44, topic#45, partition#46, offset#47L, timestamp#48, timestampType#49, headers#50], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@1a982635, KafkaV2[Subscribe[lmao]]


In [92]:
def pred(data):
    """Score one row's feature matrix with the driver-loaded MLflow pyfunc ``model``.

    Used as the body of a row-at-a-time Spark UDF, so it receives the value of a single
    row's ``data`` column.

    Args:
        data: Sequence of feature rows (presumably a list of lists of floats, per the
            ``ArrayType(ArrayType(FloatType()))`` parsing schema), or ``None`` when the
            column is null, e.g. because the JSON payload did not parse.

    Returns:
        The model's predictions as a plain Python list of native scalars, ``[]`` for an
        empty matrix, or ``None`` for a null input (Spark stores it as a null cell).
    """
    # Local import: this function is pickled and executed on Spark workers.
    import numpy as np

    if data is None:
        return None
    if len(data) == 0:
        # Nothing to score; most models raise on an empty input.
        return []

    # NOTE(review): `model` is the global pyfunc model loaded on the driver. Spark
    # serializes it into this UDF's closure; confirm it pickles cleanly.
    result = model.predict(data)
    # The pyfunc may return an ndarray, a pandas Series or a plain list. Normalize all of
    # them to a list of native Python scalars that Spark can convert.
    return np.asarray(result).tolist()

In [93]:
from pyspark.sql import functions as F
# Register `pred` as a row-at-a-time Spark UDF returning an array of ints.
# NOTE(review): Spark converts Python values that don't match IntegerType (e.g. floats)
# to nulls rather than raising. Confirm the model emits integer labels.
pred_udf = F.udf(pred, returnType=ArrayType(IntegerType()))

In [None]:
# Score the `bey` stream directly with the Python UDF (no join) and print each
# micro-batch to the console. This blocks the cell until the query stops or fails.
(
    json_df
    .withColumn("predictions", pred_udf("data"))
    .writeStream
    .outputMode("append")
    .format("console")
    .start()
    .awaitTermination()
)
