In [None]:
import os
import time
import json
import requests
import xml.etree.ElementTree as ET
import datetime
import yaml
from pyspark.streaming import StreamingContext
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, FloatType, StructField, StructType, DoubleType
from pyspark.sql.functions import *
from pyspark.sql.functions import udf, array, col
import mlflow
import mlflow.pyfunc
import pandas as pd
from pyspark.sql.functions import struct
from pyspark.sql import functions as F

# Extract the cloud storage root ("<scheme>://<authority>") from the Hive
# warehouse directory configured in hive-site.xml, print it, and export it
# through the STORAGE environment variable.
tree = ET.parse('/etc/hadoop/conf/hive-site.xml')
root = tree.getroot()

# Start unset so a missing property is reported explicitly below instead of
# surfacing as a NameError when `storage` is first used.
storage = None
for prop in root.findall('property'):
    # findtext() returns None when the child tag is absent, so a <property>
    # without a <name> element is skipped rather than raising AttributeError.
    if prop.findtext('name') != "hive.metastore.warehouse.dir":
        continue
    warehouse_dir = prop.findtext('value') or ""
    # "scheme://authority/path" splits into ["scheme:", "", "authority", ...].
    parts = warehouse_dir.split("/")
    if len(parts) < 3:
        raise ValueError(
            "hive.metastore.warehouse.dir value {!r} is not of the form "
            "'<scheme>://<authority>/...'".format(warehouse_dir)
        )
    # No break: if the property appears more than once, the last one wins.
    storage = parts[0] + "//" + parts[2]

if storage is None:
    raise RuntimeError(
        "Property 'hive.metastore.warehouse.dir' not found in "
        "/etc/hadoop/conf/hive-site.xml"
    )

print("The correct Cloud Storage URL is:{}".format(storage))

os.environ['STORAGE'] = storage

# Create (or reuse) the Spark session, passing the storage root from the
# STORAGE environment variable as spark.yarn.access.hadoopFileSystems.
spark = (
    SparkSession.builder
    .appName('MLFlow model inference')
    .config("spark.yarn.access.hadoopFileSystems", os.environ["STORAGE"])
    .getOrCreate()
)

# Expose the MLflow model stored at /home/cdsw/model as a Spark UDF.
predict = mlflow.pyfunc.spark_udf(spark, "/home/cdsw/model")


# Schema for the incoming CSV files: 25 nullable float columns named
# _c0 .. _c24.
customSchema = StructType(
    [StructField("_c{}".format(idx), FloatType(), True) for idx in range(25)]
)


# Streaming CSV source: header-less, comma-separated files under
# <STORAGE>/tmp/, parsed with customSchema.
df_data = (
    spark.readStream
    .format('csv')
    .option('header', False)
    .option("sep", ",")
    .schema(customSchema)
    .load(os.environ["STORAGE"] + "/tmp/")
)

# Pack the 25 input columns (selected by position) into a single struct
# column that is handed to the model UDF.
df = df_data.select(
    struct([df_data[pos] for pos in range(25)]).alias('domain_tokens')
)

# Score each row, stamp it with the current time, then sum the predictions
# per (domain_tokens, timestamp) group under a 3-second watermark.
df1 = (
    df.withColumn("prediction", predict(df.domain_tokens))
    .withColumn('timestamp', F.current_timestamp())
    .withWatermark("timestamp", "3 seconds")
    .groupBy("domain_tokens", "timestamp")
    .sum("prediction")
)

# Write the aggregated results as JSON under <STORAGE>/output/ in append mode,
# triggering every 2 seconds and checkpointing to <STORAGE>/chkpnt/. The final
# awaitTermination() call waits on the started query.
(
    df1.writeStream
    .format('json')
    .outputMode("append")
    .trigger(processingTime='2 seconds')
    .option("checkpointLocation", os.environ["STORAGE"] + "/chkpnt/")
    .option("path", os.environ["STORAGE"] + "/output/")
    .start()
    .awaitTermination()
)

The correct Cloud Storage URL is:s3a://hrong1-cdp-bucket


Setting spark.hadoop.yarn.resourcemanager.principal to hrongali
                                                                                