# Data Wrangling on Streaming Data
Stream data from Azure Event Hubs into Azure Databricks, then
leverage Spark functionality to transform, clean, and normalize the data
to prepare it for machine-learning modeling, tuning, and classification.

In [2]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Event Hubs connection configuration.
# For more details -> https://bit.ly/2Zw4qED
# The connection string is read from the Databricks secret scope rather than
# being hard-coded in the notebook.
eventhubs_connection_string = dbutils.secrets.get(
  scope="mle2ebigdatakv",
  key="twitterstreamingkey",
)
ehConf = {'eventhubs.connectionString': eventhubs_connection_string}

In [3]:
# Open a streaming read against Event Hubs and expose the event payload
# ("body" column) as a string.
inputStream = (
  spark.readStream
  .format("eventhubs")
  .options(**ehConf)
  .load()
  .withColumn("body", col("body").cast("string"))
)


In [4]:
# Schema of the parsed event body: one nullable field per value, in the
# order the values are listed here.
expectedSchema = StructType([
  StructField(field_name, field_type(), True)
  for field_name, field_type in [
    ("user_name", StringType),
    ("user_location", StringType),
    ("user_description", StringType),
    ("user_created", StringType),
    ("user_followers", FloatType),
    ("user_friends", FloatType),
    ("user_favourites", FloatType),
    ("user_verified", BooleanType),
    ("date", StringType),
    ("text", StringType),
    ("hashtags", StringType),
    ("source", StringType),
    ("is_retweet", BooleanType),
  ]
])

In [5]:
# When running on Azure Databricks, uncomment this call to visualize the data
#display(inputStream)

# Break the body into an array of raw string values, one per comma-separated part.
# NOTE(review): a plain split on ',' presumably mis-aligns columns when a free-text
# field (e.g. the tweet text) itself contains commas - confirm the producer's format.
body_parts = split(inputStream.body.cast('string'), ',')

# Keep exactly one column per schema field, taken positionally from the array.
# The enqueued time and the intermediate array are not carried forward.
inputStream = inputStream.select(
  *[
    body_parts.getItem(position).alias(field.name)
    for position, field in enumerate(expectedSchema)
  ]
)



In [6]:


# Set data types - cast every column to the type declared in expectedSchema.
# Driving the casts from the schema keeps a single source of truth for the
# column types and avoids a long backslash-continued chain (a trailing "\" on
# the last line of such a chain is a syntax error when it ends the cell).
for field in expectedSchema:
  inputStream = inputStream.withColumn(
    field.name, inputStream[field.name].cast(field.dataType)
  )




In [7]:
# Read the sentiment service endpoint and access key from the secret scope
sentimentEndpoint = dbutils.secrets.get(scope="mle2ebigdatakv", key="sentimentEndpoint")
sentimentAccessKeys = dbutils.secrets.get(scope="mle2ebigdatakv", key="sentimentAccessKeys")

# Text Analytics provides multiple tools for us to use; here we ask specifically for sentiment.
# Strip any trailing "/" from the stored endpoint so the joined URL never contains "//".
language_api_url = sentimentEndpoint.rstrip("/") + "/text/analytics/v3.0/sentiment"
headers = {"Ocp-Apim-Subscription-Key": sentimentAccessKeys}



In [8]:
# Discard rows whose "text" column is null
inputStream = inputStream.na.drop(subset=['text'])

In [9]:
import os
from pyspark.sql.functions import udf
import pandas as pd
import requests
from pprint import pprint

# Sentiment endpoint URL and auth header used by the request helpers below.
# NOTE(review): these two names are also assigned in the cell that reads the
# secrets; consider keeping a single definition.
# Strip any trailing "/" from the endpoint so the joined URL never contains "//".
language_api_url = sentimentEndpoint.rstrip("/") + "/text/analytics/v3.0/sentiment"
headers = {"Ocp-Apim-Subscription-Key": sentimentAccessKeys}

import json
def constractDocRequest(text):
  """Wrap a single text in the request payload sent to the sentiment API.

  Args:
    text: the text to analyze; it is used both as the document id and as
      the document text.

  Returns:
    A dict with a "documents" list holding exactly one document.
  """
  return {"documents": [{"id": text, "text": text}]}


def extractSentiment(doc, sentimentType):
  """Pull one confidence score out of a sentiment API response.

  Args:
    doc: the decoded JSON response.
    sentimentType: key to read from the first document's "confidenceScores"
      (e.g. 'positive').

  Returns:
    The score as a float, or 0.0 when the response carries no analyzed
    document (not a dict, no "documents" key, or an empty "documents" list).

  Raises:
    KeyError: if the first document has no "confidenceScores" entry for
      sentimentType.
  """
  documents = doc.get('documents') if isinstance(doc, dict) else None
  # An empty list must be treated like a missing key; indexing [0] on it
  # would raise IndexError.
  if not documents:
    return 0.0
  return float(documents[0]['confidenceScores'][sentimentType])



def getPositiveSentiment(text):
  """Return the 'positive' confidence score for a text via the sentiment API.

  Args:
    text: the text to score; may be None or blank.

  Returns:
    The positive score as a float, or 0.0 when the text is None/blank, the
    HTTP call fails or times out, or the response body is not valid JSON.
  """
  # Guard None as well as blank strings so a null row cannot raise
  # AttributeError inside the UDF.
  if text is None or not text.strip():
    return 0.0
  docRequest = constractDocRequest(text)
  try:
    # Bound the call so a stalled connection cannot block the task indefinitely.
    response = requests.post(language_api_url, headers=headers, json=docRequest, timeout=30)
    sentiment = response.json()
  except (requests.RequestException, ValueError):
    # Same fallback the helper uses for responses without a scored document;
    # one failed call should not abort the whole stream.
    return 0.0
  return extractSentiment(sentiment, 'positive')

# Register the scorer as a Spark UDF. getPositiveSentiment returns a Python
# float, so the declared return type is DoubleType rather than StringType;
# this makes 'positive_sentiment' a numeric column.
# NOTE(review): this changes the column type written downstream - confirm that
# any existing output written with the string type is handled.
get_positive_sentiment = udf(getPositiveSentiment, DoubleType())

# Add the positive-sentiment score of each row's "text" as a new column
inputStream = inputStream.withColumn('positive_sentiment', get_positive_sentiment(inputStream["text"]))
  




In [10]:
# Write processed streaming data to storage.
# Stream processed data to parquet for the Data Science team to explore and build ML models.
# The processed stream built in the cells above is `inputStream`; the name
# `comments_stream` is not defined in this notebook and raised NameError on a
# fresh kernel.
inputStream.writeStream \
  .trigger(processingTime = "30 seconds") \
  .format("parquet") \
  .outputMode("append") \
  .partitionBy("user_location") \
  .option("compression", "none") \
  .option("checkpointLocation", "/mnt/stream/_checkpoints/covid19twitter") \
  .start("/mnt/root/COVID19_TWEETS/REFINED/WITH_SENTIMENT/")