In [1]:
# Install mlflow from PyPI through the Databricks library utility, restart the
# Python interpreter, then import the package.
# NOTE(review): no version is pinned, so a rerun may resolve a different mlflow
# release — consider passing an explicit version.
dbutils.library.installPyPI("mlflow")
# NOTE(review): restartPython() resets the Python process state; confirm that the
# import placed after it in this same cell still runs as intended on this runtime.
dbutils.library.restartPython()
import mlflow

In [2]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [3]:
# Location of the curated tweet data (parquet files, sentiment already attached).
dirPath = "/mnt/root/COVID19_TWEETS/v3/REFINED/WITH_SENTIMENT/"

In [4]:
# Batch-load every parquet file under dirPath into a single DataFrame.
tweets = spark.read.format("parquet").load(dirPath)

In [5]:
# Sanity check on data volume: how many tweets have user_location exactly equal to 'US'.
tweets.filter(tweets.user_location == "US").count()

In [6]:
# Number of tweets whose text is the empty string.
tweets.filter(tweets.text == "").count()

In [7]:
# Number of distinct user_location values (the cardinality of that categorical column).
tweets.select("user_location").dropDuplicates().count()

In [8]:
# Keep only tweets with non-empty text. Rows whose text is null are dropped as
# well, because comparing null with '' yields null and the filter keeps only
# rows where the predicate is true.
tweets = tweets.filter(tweets.text != "")

In [9]:
# Number of tweets whose text is null.
tweets.filter(tweets.text.isNull()).count()

In [10]:
# Render a 10-row preview of the filtered tweets.
tweets_preview = tweets.limit(10)
display(tweets_preview)

# Leveraging Spark ML
Data wrangling, feature extraction and normalization:

When tuning with pyspark.ml in a Databricks environment, metrics and models are automatically logged to MLflow.

In [12]:
# Arrange the data to fit our needs: cast the sentiment scores, drop incomplete
# rows and fill defaults for the user statistics.
from pyspark.sql.types import DoubleType

# Cast the sentiment scores to double BEFORE dropping nulls: a value that cannot
# be converted becomes null in the cast, and a null reaching the VectorAssembler
# later in the notebook would make the pipeline fail. Casting first lets the
# dropna below remove those rows too.
tweets = tweets.withColumn("negative_sentiment", tweets["negative_sentiment"].cast(DoubleType()))
tweets = tweets.withColumn("positive_sentiment", tweets["positive_sentiment"].cast(DoubleType()))

# Drop rows that miss any of the columns the model cannot do without.
tweets = tweets.dropna(subset=["positive_sentiment", "negative_sentiment", "is_retweet", "user_location", "hashtags"])

# Fill the remaining optional columns with default values.
# NOTE(review): fillna ignores subset columns whose data type does not match the
# fill value — confirm the three user_* columns are numeric and user_verified is
# boolean, otherwise nulls remain in them.
tweets = tweets.fillna(0.0, subset=["user_followers", "user_friends", "user_favourites"])
tweets = tweets.fillna(False, subset=["user_verified"])

In [13]:


from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier, DecisionTreeClassificationModel
from pyspark.ml.feature import Normalizer, StringIndexer, VectorAssembler
from pyspark.mllib.linalg import Vectors

# Columns merged into the single "features" vector, in this exact order.
FEATURE_COLUMNS = [
    "indexedLocations",
    "indexedhashtags",
    "user_followers",
    "user_friends",
    "user_favourites",
    "positive_sentiment",
    "negative_sentiment",
    "user_verified",
]

# Encode the categorical inputs (location, hashtags) as numeric indices.
# handleInvalid="keep" assigns values not seen during fitting to an extra index
# instead of raising an error.
indexLocation = StringIndexer(inputCol="user_location", outputCol="indexedLocations", handleInvalid="keep")
indexHashtages = StringIndexer(inputCol="hashtags", outputCol="indexedhashtags", handleInvalid="keep")

# Encode the target column "is_retweet" as a numeric label index.
indexLabel = StringIndexer(inputCol="is_retweet", outputCol="indexedRetweetLabel", handleInvalid="keep")

# Gather all feature columns into one vector column, the input shape Spark ML
# estimators expect.
assembler = VectorAssembler(inputCols=FEATURE_COLUMNS, outputCol="features")

# Rescale each row's feature vector to unit L1 norm (p=1.0).
# NOTE(review): this normalizes per row, mixing the categorical indices with the
# numeric features, and tree models do not generally need scaled inputs —
# confirm this stage is intended.
normalizer = Normalizer(inputCol="features", outputCol="normFeatures", p=1.0)

# Decision tree that learns "indexedRetweetLabel" from the "normFeatures" vector.
dtc = DecisionTreeClassifier(labelCol="indexedRetweetLabel", featuresCol="normFeatures")

# Stage order matters: indexers -> assembler -> label indexer -> normalizer -> classifier.
PIPELINE_STAGES = [indexLocation, indexHashtages, assembler, indexLabel, normalizer, dtc]
pipeline = Pipeline(stages=PIPELINE_STAGES)

# Fit the whole pipeline on the tweets.
model = pipeline.fit(tweets)
# Demo only: score the same data the model was trained on. A real evaluation
# would use separate training and test datasets.
prediction = model.transform(tweets)


In [14]:
# Show the schema of the scored DataFrame, including the columns added by the pipeline stages.
prediction.printSchema()

In [15]:
# Print, for every scored tweet, the actual retweet flag next to the predicted label.
selected = prediction.select("is_retweet","user_location","hashtags","positive_sentiment","prediction")
# toLocalIterator() streams the rows to the driver partition by partition, so the
# full result set is never held in driver memory at once (collect() would load
# all of it).
for row in selected.toLocalIterator():
    # The loop variable is deliberately NOT named `prediction`: that name is the
    # scored DataFrame, and overwriting it with a float would break any re-run of
    # this cell or of the printSchema cell.
    is_retweet, user_location, hashtags, positive_sentiment, predicted_label = row
    print("is_retweet=%s , user_location=%s , positive_sentiment=%s , hashtags=%s --> prediction=%f" % (str(is_retweet),str(user_location),str(positive_sentiment) ,str(hashtags), predicted_label))
    


In [16]:
# Persist the fitted pipeline model to disk.
# A plain save() raises when the target path already exists, which makes the
# notebook fail on every run after the first. write().overwrite() replaces the
# previously saved model so the notebook can be re-run end to end.
model.write().overwrite().save("/mnt/root/COVID19_TWEETS/ML-Models/V1")