## Setting the configuration for Azure Storage Account access

In [0]:
# Register the access key for the `twittergenstorage` blob account with Spark so
# that wasbs:// paths on that account can be read.
#
# The key is fetched from a Databricks secret scope instead of being written in
# the notebook: anything typed here ends up in revision history and exports.
# TODO(review): the scope/key names below are placeholders -- point them at the
# secret that actually holds the storage account key, and rotate the key that
# was previously committed in this cell.
storage_account_key = dbutils.secrets.get(
    scope="twitter-streaming",
    key="twittergenstorage-account-key",
)

spark.conf.set(
    "fs.azure.account.key.twittergenstorage.blob.core.windows.net",
    storage_account_key,
)

## Copying the model from the Azure Storage Account to DBFS

In [0]:
# Recursively copy the saved model directory out of the blob container into the
# FileStore location that the model is later loaded from.
model_source = "wasbs://realtimetwitterdata@twittergenstorage.blob.core.windows.net/models/logistic-regression-hash"
model_destination = "FileStore/models/logistic-regression-hash"
dbutils.fs.cp(model_source, model_destination, recurse=True)

Out[2]: True

## Importing the libraries, creating the transformers, estimators and the pipeline, and loading the fitted model

In [0]:
from pyspark.ml import PipelineModel

!pip install spark-nlp==4.0.1
import sparknlp
from sparknlp.base import *
from sparknlp.annotator import *

import pyspark.ml.feature as feats
from pyspark.ml.feature import StringIndexer, IDF, HashingTF
from pyspark.ml.classification import LogisticRegression

# ---------------------------------------------------------------------------
# Stage definitions
#
# These objects describe the layout of the text-classification pipeline:
# Spark NLP cleaning stages -> Finisher -> Spark ML tokenizer -> TF-IDF ->
# label indexing -> logistic regression.
#
# NOTE: none of the objects below are fitted here, and they are NOT what
# produces `pipelineFit` -- see the load call at the bottom of this block.
# ---------------------------------------------------------------------------

# Wrap the raw "text" column into a Spark NLP document annotation.
documentAssembler = (
    DocumentAssembler()
    .setInputCol("text")
    .setOutputCol("document")
)

# Spark NLP tokenizer: document -> token annotations.
tokenizer = Tokenizer().setInputCols("document").setOutputCol("token")

# Lowercase the tokens and strip punctuation (keep alphanumeric chars).
# If we don't set CleanupPatterns, it will only keep alphabet letters ([^A-Za-z]).
# The pattern is a raw string so that \w, \d and \s reach the regex engine
# verbatim instead of being parsed as (invalid) Python string escapes.
normalizer = (
    Normalizer()
    .setInputCols(["token"])
    .setOutputCol("normalized")
    .setLowercase(True)
    .setCleanupPatterns([r"[^\w\d\s]"])
)

# Drop stop words from the normalized tokens, ignoring case.
stopwords_cleaner = (
    StopWordsCleaner()
    .setInputCols("normalized")
    .setOutputCol("cleanTokens")
    .setCaseSensitive(False)
)

# Turn the cleaned token annotations back into one space-separated string
# column named "output".
finisher = (
    Finisher()
    .setInputCols("cleanTokens")
    .setOutputCols("output")
    .setOutputAsArray(False)
    .setAnnotationSplitSymbol(' ')
)

# Spark ML (not Spark NLP) tokenizer: splits the "output" string into an array
# column that HashingTF can consume.
tokenizer2 = feats.Tokenizer().setInputCol("output").setOutputCol("token_tweet")

hashtf = HashingTF(numFeatures=2**16, inputCol="token_tweet", outputCol='tf')
idf = IDF(inputCol='tf', outputCol="features", minDocFreq=5)  # minDocFreq: remove sparse terms
label_stringIdx = StringIndexer(inputCol="target", outputCol="label")

lr = LogisticRegression(maxIter=100, labelCol="label", featuresCol="features", predictionCol="prediction")

# NOTE(review): this wraps unfitted estimators (idf, label_stringIdx, lr) in a
# PipelineModel. It is kept only so the `pipeline` name stays defined; it is
# not used for scoring.
pipeline = PipelineModel(stages=[documentAssembler, tokenizer, normalizer, stopwords_cleaner, finisher, tokenizer2, hashtf, idf, label_stringIdx, lr])

# `load` is a classmethod: it rebuilds the fitted pipeline from the files saved
# at this path. Calling it on the class (rather than on `pipeline`) makes it
# explicit that the stage objects defined above do not influence the result.
pipelineFit = PipelineModel.load("dbfs:/FileStore/models/logistic-regression-hash")

You should consider upgrading via the '/local_disk0/.ephemeral_nfs/envs/pythonEnv-18b8da7c-6425-486f-9925-06c45f4f6e0c/bin/python -m pip install --upgrade pip' command.[0m


## Creating configuration for Azure Event Hub

In [0]:
# Build the Event Hubs connector configuration.
#
# The connection string (Endpoint, SharedAccessKeyName, SharedAccessKey and
# EntityPath) is a credential, so it is read from a Databricks secret scope
# rather than being written in the notebook.
# TODO(review): the scope/key names below are placeholders -- point them at the
# secret holding the Event Hub connection string, and rotate the shared access
# key that was previously committed in this cell.
connectionString = dbutils.secrets.get(
    scope="twitter-streaming",
    key="eventhubs-connection-string",
)

ehConf = {
    # Only the encrypted form is stored in the config; the plaintext value is
    # never placed in ehConf.
    'eventhubs.connectionString': sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connectionString),
    # Consumer group to read from.
    'eventhubs.consumerGroup': "$Default",
}

In [0]:
# Open a streaming reader on the "eventhubs" source and load it with the
# settings collected in ehConf.
eventhub_reader = spark.readStream.format("eventhubs")
df = eventhub_reader.options(**ehConf).load()

## Decoding the input stream

In [0]:
from pyspark.sql.types import *

import  pyspark.sql.functions as F

# Schema of the JSON document carried in each event body: a nullable integer
# "target" and a nullable string "text".
payload_fields = [
    StructField("target", IntegerType(), True),
    StructField("text", StringType(), True),
]
events_schema = StructType(payload_fields)

# Decode step by step: body -> JSON string -> parsed struct -> flat columns.
json_df = df.selectExpr("cast (body as string) as json")
parsed_df = json_df.select(F.from_json("json", events_schema).alias("data"))
decoded_df = parsed_df.select("data.*")

In [0]:
# Run the loaded pipeline model over the decoded stream. The resulting frame is
# what the writer cell reads the "output" and "prediction" columns from.
result = pipelineFit.transform(decoded_df)

## Creating the Databricks Delta table

In [0]:
%sql
-- Create the Delta table that holds the model output, unless it already exists.
CREATE TABLE IF NOT EXISTS default.modelOutput (
  timestamp TIMESTAMP,  -- time attached to the row
  target TINYINT,       -- label that arrived with the tweet
  prediction TINYINT,   -- label predicted by the model
  clean_text STRING     -- tweet text after cleaning
) USING DELTA

## Starting the Stream consumer and writing to the Delta table

In [0]:
from pyspark.sql.functions import col
from pyspark.sql.functions import current_timestamp

In [0]:
# Shape the scored stream into the columns of the Delta table:
# target and prediction narrowed to bytes, the cleaned text under the name
# "clean_text", and the processing time as "timestamp".
delta_rows = result.select(
    col("target").cast(ByteType()).alias("target"),
    col("prediction").cast(ByteType()).alias("prediction"),
    col("output").alias("clean_text"),
    current_timestamp().alias("timestamp"),
)

# Start the streaming query that appends those rows to the Delta location.
# Left as the last expression so the StreamingQuery handle is displayed.
(
    delta_rows.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/tmp/delta/_checkpoints/")
    .start("/user/hive/warehouse/modeloutput")
)


Out[11]: <pyspark.sql.streaming.query.StreamingQuery at 0x7fcb75207400>

## Fetching sample records from the Databricks Delta table

In [0]:
%sql
-- Show the 10 rows with the most recent timestamp.
select * from default.modelOutput order by timestamp desc limit 10;

timestamp,target,prediction,clean_text
2022-08-17T08:23:31.607+0000,0,0,monday morning still four days week
2022-08-17T08:23:31.607+0000,0,0,concerts hurt back lol
2022-08-17T08:23:31.607+0000,0,0,cant sleep miss ipod amongst things another devoured comic need heaps stupid dishevled comic sectionborders
2022-08-17T08:23:31.607+0000,0,0,waiting brownies get doneheading chadleys later fun wish see baby today
2022-08-17T08:23:31.607+0000,0,0,rawr dont want go school tomorrow listening song makes sad
2022-08-17T08:23:31.607+0000,1,0,bios compromise made security mail servers user accounts vulnerable bad passwords jc
2022-08-17T08:23:31.607+0000,1,1,haha first sitemodel icon httpi686photobucketcomalbumsvv223lautnerx3sitemodelx3png
2022-08-17T08:23:31.607+0000,1,1,nice work today wife order driving father make radiohosting long eyes open lol take look
2022-08-17T08:23:31.607+0000,1,1,admiring beautiful farm ft going onto rc
2022-08-17T08:23:31.607+0000,1,1,reneetakeover phoebe friends course
