### 1. Mount buckets
Mounting the buckets will give easier access to the path that will be used to retrieve the data and the path that will be used to store files after processing and modeling.

In [None]:
# Step 1: Mount the  project bucket

# Use the mount script from intro_to_spark folder's s3_data_mounting function and keys

def mountBucket(accesskey, secretkey, bucketName, mountFolder):
  ACCESS_KEY_ID = accesskey
  SECRET_ACCESS_KEY = secretkey

  print ("Mounting", bucketName)

  try:
    # Unmount the data in case it was already mounted.
    dbutils.fs.unmount(mountFolder)
    
  except:
    # If it fails to unmount it most likely wasn't mounted in the first place
    print ("Directory not unmounted: ", mountFolder )
    
  finally:
    # Lastly, mount our bucket.
    dbutils.fs.mount("s3a://"+ ACCESS_KEY_ID + ":" + SECRET_ACCESS_KEY + "@" + bucketName, mountFolder)
    print ("The bucket", bucketName, "was mounted to", mountFolder, "\n")

# Set AWS programmatic access credentials

import os
from dotenv import load_dotenv, find_dotenv

# these expect to find a .env file at the directory above the project.                                                                                                                     # the format for that file is (without the comment)                                                                                                                                       #API_KEYNAME=AStringThatIsTheLongAPIKeyFromSomeService                                                                                                                                     
def load_env():
    _ = load_dotenv(find_dotenv())

load_env()
access_key_id = os.environ['AWS_KEY_ID'] 
secret_access_key = os.environ['SECRET']

# Mount WeCloudData bucket with project's data. This project will use AI tweets to make a sentiment analysis model
mountBucket(access_key_id, secret_access_key, "weclouddata/twitter/AI", "/mnt/AI_tweets")

# Mount personal S3 bucket for data storage
mountBucket(access_key_id, secret_access_key, "diogo-weclouddata/big_data_project/", "/mnt/project")

Mounting weclouddata/twitter/AI
/mnt/AI_tweets has been unmounted.
The bucket weclouddata/twitter/AI was mounted to /mnt/AI_tweets 

Mounting diogo-weclouddata/big_data_project/
/mnt/project has been unmounted.
The bucket diogo-weclouddata/big_data_project/ was mounted to /mnt/project 



In [None]:
%fs ls /mnt/AI_tweets

path,name,size,modificationTime
dbfs:/mnt/AI_tweets/2022/,2022/,0,0


### 2. Start spark session and get the spark context

In [None]:
# Start the spark session 

from pyspark.sql import SparkSession
spark = SparkSession \
        .builder \
        .appName('Sentiment Analysis Project') \
        .getOrCreate()
print('Session created')

# Get spark context

sc = spark.sparkContext

Session created


### 3. Load the dataset

In [None]:
# As I did not have access to the document column_definition.txt file that was shared in the Slack channel, the schema had to be infered and some columns may not be named properly. As we will only use the column refering to the tweets to build the ML model, this should not be a problem.

# Inspect first few rows before defining schema

# Define path to file
AI_tweets = "/mnt/AI_tweets/*/*/*/*/*"

# Read first 5 lines as text
test = sc.textFile(AI_tweets)
test.take(5)

Out[3]: ['1600919229729067008\tA2z Daily News\ta2zdailynews\tPursuing a practical approach to\xa0research https://t.co/IlJ4zuXTJI\t3\tNetherlands\tNone\tThu Dec 08 18:24:22 +0000 2022\t',
 '1600919272208928770\tJoshua Buah\tDJoshuaBuah\tRT @agristok: 🇺🇸 University of Florida is offering a scholarship to conduct PhD studies in plant breeding with emphasis in the fusion of ar…\t39\tNone\tNone\tThu Dec 08 18:24:32 +0000 2022\t',
 '1600919279431716864\tMK.KH2004\tKh2004Mk\t@Cryptogems223 #MVP #DeFi #Cryptocurency #BNB\u202f\u202f\u202f\u202f\u202f\u202f\u202f\u202f\u202f\u202f\u202f\u202f #Ethereum #IMPOSSIBLE #ARNC What is impossible robot? imp… https://t.co/jG5UY8HTWu\t2\tUnited States America 🇺🇸 \tNone\tThu Dec 08 18:24:34 +0000 2022\t',
 "1600919304576602113\tGanpal Ramanjaneya Reddy\tganpalramu\tRT @RajivMessage: In conversation with @GadSaad, Evolutionary Behavioral Scientist, on my latest books, 'Artificial Intelligence and the Fu…\t16\tNone\tNone\tThu Dec 08 18:24:40 +0000 2022\t",

In [None]:
# Define the data schema

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, LongType

my_schema = StructType([
  StructField("id", LongType(), True),
  StructField("name", StringType(), True),
  StructField("username", StringType(), True),
  StructField("tweet", StringType(), True),
  StructField("unknown1", IntegerType(), True),
  StructField("location", StringType(), True),
  StructField("unknown2", StringType(), True),
  StructField("datetime", StringType(), True),
  StructField("unknown3", StringType(), True)
])

# Read the data from the WeCloudData bucket

AI_tweets = "/mnt/AI_tweets/*/*/*/*/*"

data = (spark.read                        
   .option("header", "false")       
   .option("delimiter", "\t")      
   .schema(my_schema)
   .csv(AI_tweets) 
)   

# Display first 10 rows
display(data.limit(10))

id,name,username,tweet,unknown1,location,unknown2,datetime,unknown3
1601172099045158912,YUNUS HANBAL,HanbalYunus,@CryptoEmdarks There are many innovations and surprises in the future project #DXGM metaverse universe. You can par… https://t.co/R4PzLNEGFk,21,,,Fri Dec 09 11:09:11 +0000 2022,
1601172122730041344,ASLI HANBAL,HanbalAsli,"@Crypto__Diva #GPLEX with blockchain technology in the gaming world, with the unique #metaverse world waiting to be… https://t.co/scFwwFVAvz",152,,,Fri Dec 09 11:09:17 +0000 2022,
1601172161372491778,YUNUS HANBAL,HanbalYunus,@CryptoThro There are many innovations and surprises in the future project #DXGM metaverse universe. You can partic… https://t.co/HvRw4EbJZU,21,,,Fri Dec 09 11:09:26 +0000 2022,
1601172171602419712,ASLI HANBAL,HanbalAsli,"@belufrancese #GPLEX with blockchain technology in the gaming world, with the unique #metaverse world waiting to be… https://t.co/FltRhZlt3J",152,,,Fri Dec 09 11:09:28 +0000 2022,
1601172214056767489,YUNUS HANBAL,HanbalYunus,@cryptojack There are many innovations and surprises in the future project #DXGM metaverse universe. You can partic… https://t.co/n0FmdR4I81,21,,,Fri Dec 09 11:09:38 +0000 2022,
1601172226631311362,ASLI HANBAL,HanbalAsli,"@CryptoThro #GPLEX with blockchain technology in the gaming world, with the unique #metaverse world waiting to be d… https://t.co/D0TacL3rxu",152,,,Fri Dec 09 11:09:41 +0000 2022,
1601172266460454914,ASLI HANBAL,HanbalAsli,"@cryptoworld202 #GPLEX with blockchain technology in the gaming world, with the unique #metaverse world waiting to… https://t.co/eflMFNfp2K",152,,,Fri Dec 09 11:09:51 +0000 2022,
1601172313017581568,ASLI HANBAL,HanbalAsli,"@pascualprincipe #GPLEX with blockchain technology in the gaming world, with the unique #metaverse world waiting to… https://t.co/gWtYa8bIf6",152,,,Fri Dec 09 11:10:02 +0000 2022,
1601172334810783744,YUNUS HANBAL,HanbalYunus,@riccardogems There are many innovations and surprises in the future project #DXGM metaverse universe. You can part… https://t.co/MGNxYHe1cK,21,,,Fri Dec 09 11:10:07 +0000 2022,
1601172340188254208,Space ☆ Bruce,spacebruce,"The VF-0 Phoenix variable fighter was an prototype for the VF-1 Valkyrie, it served in 2008 as a front-line fighter… https://t.co/25Omth4G4a",952,68000 HEART ON FIRE /🔞 please,,Fri Dec 09 11:10:08 +0000 2022,


In [None]:
# Separate the tweets, which will be use as data to build the ML Model

tweets = data.select("tweet")

# Cache the dataset
tweets.cache()

# Consolidate the cache by using count function
tweets.count()

Out[5]: 10497

### 4. Prepare the data for building a ml Sentiment Analysis model

In [None]:
# Clean the text by removing URLs (http), names starting with @ (@example), special characters, substituting multiple spaces with single space, lowercasing all text, and triming the leading/trailing whitespaces

import pyspark.sql.functions as F

tweets_clean = tweets.withColumn('tweet', F.regexp_replace('tweet', r"http\S+", "")) \
                    .withColumn('tweet', F.regexp_replace('tweet', r"@[a-zA-z]+", "")) \
                    .withColumn('tweet', F.regexp_replace('tweet', r"[^a-zA-z]", " ")) \
                    .withColumn('tweet', F.regexp_replace('tweet', r"\s+", " ")) \
                    .withColumn('tweet', F.lower('tweet')) \
                    .withColumn('tweet', F.trim('tweet')) 

# Remove rows with missing values

tweets_clean = tweets_clean.dropna()

# Display resulting df

display(tweets_clean.limit(10))

tweet
there are many innovations and surprises in the future project dxgm metaverse universe you can par
gplex with blockchain technology in the gaming world with the unique metaverse world waiting to be
there are many innovations and surprises in the future project dxgm metaverse universe you can partic
gplex with blockchain technology in the gaming world with the unique metaverse world waiting to be
there are many innovations and surprises in the future project dxgm metaverse universe you can partic
gplex with blockchain technology in the gaming world with the unique metaverse world waiting to be d
gplex with blockchain technology in the gaming world with the unique metaverse world waiting to
gplex with blockchain technology in the gaming world with the unique metaverse world waiting to
there are many innovations and surprises in the future project dxgm metaverse universe you can part
the vf phoenix variable fighter was an prototype for the vf valkyrie it served in as a front line fighter


In [None]:
# Install textblob library that will be used to perform text analysis

!pip install textblob

Collecting textblob
  Downloading textblob-0.18.0.post0-py3-none-any.whl (626 kB)
[?25l[K     |▌                               | 10 kB 24.6 MB/s eta 0:00:01[K     |█                               | 20 kB 16.9 MB/s eta 0:00:01[K     |█▋                              | 30 kB 9.9 MB/s eta 0:00:01[K     |██                              | 40 kB 4.7 MB/s eta 0:00:01[K     |██▋                             | 51 kB 5.6 MB/s eta 0:00:01[K     |███▏                            | 61 kB 6.1 MB/s eta 0:00:01[K     |███▋                            | 71 kB 6.0 MB/s eta 0:00:01[K     |████▏                           | 81 kB 4.8 MB/s eta 0:00:01[K     |████▊                           | 92 kB 5.4 MB/s eta 0:00:01[K     |█████▎                          | 102 kB 4.7 MB/s eta 0:00:01[K     |█████▊                          | 112 kB 4.7 MB/s eta 0:00:01[K     |██████▎                         | 122 kB 4.7 MB/s eta 0:00:01[K     |██████▉                         | 133 kB 4.7 MB/s eta 0

In [None]:
# Define function to get sentiment labels from tweets

# Import TextBlob function to handle text analysis

from textblob import TextBlob

# Define functions to be used

def get_polarity(tweet):
  score = TextBlob(tweet).sentiment.polarity
  return score

def get_sentiment(tweet):
  score = TextBlob(tweet).sentiment.polarity
  if score < 0:
    return "negative"
  elif score == 0:
      return "neutral"
  else:
    return "positive"
  
# Convert functions to udf

from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType, FloatType

get_polarity_UDF = udf(lambda x: get_polarity(x), FloatType()) 
get_sentiment_UDF = udf(lambda x: get_sentiment(x), StringType()) 

In [None]:
# Create sentiment column

from pyspark.sql.functions import col

tweets_labeled = tweets_clean.withColumn("sentiment", get_sentiment_UDF(col("tweet"))) \
                             .withColumn("polarity", get_polarity_UDF(col("tweet")))

# Display first 10 rows

display(tweets_labeled.limit(10))

tweet,sentiment,polarity
there are many innovations and surprises in the future project dxgm metaverse universe you can par,positive,0.25
gplex with blockchain technology in the gaming world with the unique metaverse world waiting to be,positive,0.375
there are many innovations and surprises in the future project dxgm metaverse universe you can partic,positive,0.25
gplex with blockchain technology in the gaming world with the unique metaverse world waiting to be,positive,0.375
there are many innovations and surprises in the future project dxgm metaverse universe you can partic,positive,0.25
gplex with blockchain technology in the gaming world with the unique metaverse world waiting to be d,positive,0.375
gplex with blockchain technology in the gaming world with the unique metaverse world waiting to,positive,0.375
gplex with blockchain technology in the gaming world with the unique metaverse world waiting to,positive,0.375
there are many innovations and surprises in the future project dxgm metaverse universe you can part,positive,0.25
the vf phoenix variable fighter was an prototype for the vf valkyrie it served in as a front line fighter,neutral,0.0


### 5. Creating a Machine Learning model for Sentiment Analysis

In [None]:
# Import relevant packages

from pyspark.ml.feature import NGram, VectorAssembler, StopWordsRemover, CountVectorizer, HashingTF, IDF, Tokenizer, StringIndexer
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier, RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

# Split the data intro training, validation and test sets

train, validation, test = tweets_labeled.randomSplit([0.6, 0.2, 0.2], seed=2)

In [None]:
# Logistic regression model using 1gram_idf as feature

# Create transformers for the ML pipeline with Logistic Regression
tokenizer = Tokenizer(inputCol="tweet", outputCol="tokens")
stopword_remover = StopWordsRemover(inputCol="tokens", outputCol="filtered")
cv = CountVectorizer(vocabSize=2**16, inputCol="filtered", outputCol='cv')
idf = IDF(inputCol='cv', outputCol="1gram_idf", minDocFreq=5) #minDocFreq: remove sparse terms
assembler = VectorAssembler(inputCols=["1gram_idf"], outputCol="features")
label_encoder= StringIndexer(inputCol = "sentiment", outputCol = "label")

lr = LogisticRegression(maxIter=100)
pipeline = Pipeline(stages=[tokenizer, stopword_remover, cv, idf, assembler, label_encoder, lr])

pipeline_model = pipeline.fit(train) # Train Logistic Regression model
predictions = pipeline_model.transform(validation)

# Evaluate the model on the validation data

evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="label")
accuracy = evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"})
precision = evaluator.evaluate(predictions, {evaluator.metricName: "weightedPrecision"})
recall = evaluator.evaluate(predictions, {evaluator.metricName: "weightedRecall"})
f1 = evaluator.evaluate(predictions, {evaluator.metricName: "f1"})
log_loss = evaluator.evaluate(predictions, {evaluator.metricName: "logLoss"})

print("LR accuracy score: {0:.4f}".format(accuracy))
print("LR weighted precision: {0:.4f}".format(precision))
print("LR weighted recall: {0:.4f}".format(recall))
print("LR f1: {0:.4f}".format(f1))
print("LR log loss: {0:.4f}".format(log_loss))


LR accuracy score: 0.9026
LR weighted precision: 0.9034
LR weighted recall: 0.9026
LR f1: 0.9027
LR log loss: 2.1735


In [None]:
# Decision Tree model using 1gram_idf as feature

# Create transformers for the ML pipeline with Logistic Regression
tokenizer = Tokenizer(inputCol="tweet", outputCol="tokens")
stopword_remover = StopWordsRemover(inputCol="tokens", outputCol="filtered")
cv = CountVectorizer(vocabSize=2**16, inputCol="filtered", outputCol='cv')
idf = IDF(inputCol='cv', outputCol="1gram_idf", minDocFreq=5) #minDocFreq: remove sparse terms
assembler = VectorAssembler(inputCols=["1gram_idf"], outputCol="features")
label_encoder= StringIndexer(inputCol = "sentiment", outputCol = "label")

dt = DecisionTreeClassifier(maxDepth=5)
pipeline = Pipeline(stages=[tokenizer, stopword_remover, cv, idf, assembler, label_encoder, dt])

pipeline_model = pipeline.fit(train) # Train model
predictions = pipeline_model.transform(validation)

# Evaluate the model on the validation data

evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="label")
accuracy = evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"})
precision = evaluator.evaluate(predictions, {evaluator.metricName: "weightedPrecision"})
recall = evaluator.evaluate(predictions, {evaluator.metricName: "weightedRecall"})
f1 = evaluator.evaluate(predictions, {evaluator.metricName: "f1"})
log_loss = evaluator.evaluate(predictions, {evaluator.metricName: "logLoss"})

print("DT accuracy score: {0:.4f}".format(accuracy))
print("DT weighted precision: {0:.4f}".format(precision))
print("DT weighted recall: {0:.4f}".format(recall))
print("DT f1: {0:.4f}".format(f1))
print("DT log loss: {0:.4f}".format(log_loss))

DT accuracy score: 0.7892
DT weighted precision: 0.8353
DT weighted recall: 0.7892
DT f1: 0.7645
DT log loss: 0.4794


In [None]:
# Random Forest model using 1gram_idf as feature

from pyspark.ml.classification import RandomForestClassifier

# Create transformers for the ML pipeline with Logistic Regression
tokenizer = Tokenizer(inputCol="tweet", outputCol="tokens")
stopword_remover = StopWordsRemover(inputCol="tokens", outputCol="filtered")
cv = CountVectorizer(vocabSize=2**16, inputCol="filtered", outputCol='cv')
idf = IDF(inputCol='cv', outputCol="1gram_idf", minDocFreq=5) #minDocFreq: remove sparse terms
assembler = VectorAssembler(inputCols=["1gram_idf"], outputCol="features")
label_encoder= StringIndexer(inputCol = "sentiment", outputCol = "label")

rf = RandomForestClassifier(numTrees=100, maxDepth=5)
pipeline = Pipeline(stages=[tokenizer, stopword_remover, cv, idf, assembler, label_encoder, rf])

pipeline_model = pipeline.fit(train) # Train Logistic Regression model
predictions = pipeline_model.transform(validation)

evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="label")
accuracy = evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"})
precision = evaluator.evaluate(predictions, {evaluator.metricName: "weightedPrecision"})
recall = evaluator.evaluate(predictions, {evaluator.metricName: "weightedRecall"})
f1 = evaluator.evaluate(predictions, {evaluator.metricName: "f1"})
log_loss = evaluator.evaluate(predictions, {evaluator.metricName: "logLoss"})

print("RF accuracy score: {0:.4f}".format(accuracy))
print("RF weighted precision: {0:.4f}".format(precision))
print("RF weighted recall: {0:.4f}".format(recall))
print("RF f1: {0:.4f}".format(f1))
print("RF log loss: {0:.4f}".format(log_loss))


RF accuracy score: 0.6562
RF weighted precision: 0.7951
RF weighted recall: 0.6562
RF f1: 0.6075
RF log loss: 0.8240


Considering accuracy as the key metric to select the best model, the logistic regression that uses 1gram_idf as feature is the winner. The next step is to score the model using the test set.

In [None]:
# Score the best model in the test dataset

# Logistic regression model using 1gram_idf as feature

# Create transformers for the ML pipeline with Logistic Regression
tokenizer = Tokenizer(inputCol="tweet", outputCol="tokens")
stopword_remover = StopWordsRemover(inputCol="tokens", outputCol="filtered")
cv = CountVectorizer(vocabSize=2**16, inputCol="filtered", outputCol='cv')
idf = IDF(inputCol='cv', outputCol="1gram_idf", minDocFreq=5) #minDocFreq: remove sparse terms
assembler = VectorAssembler(inputCols=["1gram_idf"], outputCol="features")
label_encoder= StringIndexer(inputCol = "sentiment", outputCol = "label")

lr = LogisticRegression(maxIter=100)
pipeline = Pipeline(stages=[tokenizer, stopword_remover, cv, idf, assembler, label_encoder, lr])

pipeline_model = pipeline.fit(train) # Train Logistic Regression model
predictions_test = pipeline_model.transform(test)

# Score the model on the test data

evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="label")
accuracy = evaluator.evaluate(predictions_test, {evaluator.metricName: "accuracy"})
precision = evaluator.evaluate(predictions_test, {evaluator.metricName: "weightedPrecision"})
recall = evaluator.evaluate(predictions_test, {evaluator.metricName: "weightedRecall"})
f1 = evaluator.evaluate(predictions_test, {evaluator.metricName: "f1"})
log_loss = evaluator.evaluate(predictions_test, {evaluator.metricName: "logLoss"})

print("LR accuracy score: {0:.4f}".format(accuracy))
print("LR weighted precision: {0:.4f}".format(precision))
print("LR weighted recall: {0:.4f}".format(recall))
print("LR f1: {0:.4f}".format(f1))
print("LR log loss: {0:.4f}".format(log_loss))


LR accuracy score: 0.9047
LR weighted precision: 0.9045
LR weighted recall: 0.9047
LR f1: 0.9046
LR log loss: 2.1283


In [None]:
# Fit the best model using the whole dataset and use it to make predictions

# Logistic regression model using 1gram_idf as feature

# Create transformers for the ML pipeline with Logistic Regression
tokenizer = Tokenizer(inputCol="tweet", outputCol="tokens")
stopword_remover = StopWordsRemover(inputCol="tokens", outputCol="filtered")
cv = CountVectorizer(vocabSize=2**16, inputCol="filtered", outputCol='cv')
idf = IDF(inputCol='cv', outputCol="1gram_idf", minDocFreq=5) #minDocFreq: remove sparse terms
assembler = VectorAssembler(inputCols=["1gram_idf"], outputCol="features")
label_encoder= StringIndexer(inputCol = "sentiment", outputCol = "label")

lr = LogisticRegression(maxIter=100)
pipeline = Pipeline(stages=[tokenizer, stopword_remover, cv, idf, assembler, label_encoder, lr])

pipeline_model = pipeline.fit(tweets_labeled) # Train Logistic Regression model
predictions = pipeline_model.transform(tweets_labeled)

### Saving project files

In [None]:
display(tweets_labeled.limit(10))

tweet,sentiment,polarity
there are many innovations and surprises in the future project dxgm metaverse universe you can par,positive,0.25
gplex with blockchain technology in the gaming world with the unique metaverse world waiting to be,positive,0.375
there are many innovations and surprises in the future project dxgm metaverse universe you can partic,positive,0.25
gplex with blockchain technology in the gaming world with the unique metaverse world waiting to be,positive,0.375
there are many innovations and surprises in the future project dxgm metaverse universe you can partic,positive,0.25
gplex with blockchain technology in the gaming world with the unique metaverse world waiting to be d,positive,0.375
gplex with blockchain technology in the gaming world with the unique metaverse world waiting to,positive,0.375
gplex with blockchain technology in the gaming world with the unique metaverse world waiting to,positive,0.375
there are many innovations and surprises in the future project dxgm metaverse universe you can part,positive,0.25
the vf phoenix variable fighter was an prototype for the vf valkyrie it served in as a front line fighter,neutral,0.0


In [None]:
display(predictions.limit(10))

tweet,sentiment,polarity,tokens,filtered,cv,1gram_idf,features,label,rawPrediction,probability,prediction
there are many innovations and surprises in the future project dxgm metaverse universe you can par,positive,0.25,"List(there, are, many, innovations, and, surprises, in, the, future, project, dxgm, metaverse, universe, you, can, par)","List(many, innovations, surprises, future, project, dxgm, metaverse, universe, par)","Map(vectorType -> sparse, length -> 8485, indices -> List(43, 47, 111, 126, 140, 141, 306, 703, 3225), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))","Map(vectorType -> sparse, length -> 8485, indices -> List(43, 47, 111, 126, 140, 141, 306, 703, 3225), values -> List(3.6058791607181138, 3.736907423124518, 4.302541283385503, 4.422086434035286, 4.804021044733257, 4.5047781498804, 5.215317073152214, 6.167325887628449, 0.0))","Map(vectorType -> sparse, length -> 8485, indices -> List(43, 47, 111, 126, 140, 141, 306, 703), values -> List(3.6058791607181138, 3.736907423124518, 4.302541283385503, 4.422086434035286, 4.804021044733257, 4.5047781498804, 5.215317073152214, 6.167325887628449))",1.0,"Map(vectorType -> dense, length -> 3, values -> List(-12.827131833325156, 19.138761308105273, -6.311629474780116))","Map(vectorType -> dense, length -> 3, values -> List(1.3103550885926769E-14, 0.9999999999911351, 8.851884135562573E-12))",1.0
gplex with blockchain technology in the gaming world with the unique metaverse world waiting to be,positive,0.375,"List(gplex, with, blockchain, technology, in, the, gaming, world, with, the, unique, metaverse, world, waiting, to, be)","List(gplex, blockchain, technology, gaming, world, unique, metaverse, world, waiting)","Map(vectorType -> sparse, length -> 8485, indices -> List(9, 16, 18, 43, 83, 105, 132, 156), values -> List(1.0, 2.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))","Map(vectorType -> sparse, length -> 8485, indices -> List(9, 16, 18, 43, 83, 105, 132, 156), values -> List(3.0338099117114044, 7.03715085761506, 3.2594317790400815, 3.6058791607181138, 3.935358361848356, 4.1831945257529375, 4.446183985614347, 4.613977441845392))","Map(vectorType -> sparse, length -> 8485, indices -> List(9, 16, 18, 43, 83, 105, 132, 156), values -> List(3.0338099117114044, 7.03715085761506, 3.2594317790400815, 3.6058791607181138, 3.935358361848356, 4.1831945257529375, 4.446183985614347, 4.613977441845392))",1.0,"Map(vectorType -> dense, length -> 3, values -> List(-13.858940063528273, 17.117931068420994, -3.258991004892722))","Map(vectorType -> dense, length -> 3, values -> List(3.52302560946477E-14, 0.9999999985860761, 1.4138885737591142E-9))",1.0
there are many innovations and surprises in the future project dxgm metaverse universe you can partic,positive,0.25,"List(there, are, many, innovations, and, surprises, in, the, future, project, dxgm, metaverse, universe, you, can, partic)","List(many, innovations, surprises, future, project, dxgm, metaverse, universe, partic)","Map(vectorType -> sparse, length -> 8485, indices -> List(43, 47, 111, 126, 140, 141, 306, 703, 2547), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))","Map(vectorType -> sparse, length -> 8485, indices -> List(43, 47, 111, 126, 140, 141, 306, 703, 2547), values -> List(3.6058791607181138, 3.736907423124518, 4.302541283385503, 4.422086434035286, 4.804021044733257, 4.5047781498804, 5.215317073152214, 6.167325887628449, 0.0))","Map(vectorType -> sparse, length -> 8485, indices -> List(43, 47, 111, 126, 140, 141, 306, 703), values -> List(3.6058791607181138, 3.736907423124518, 4.302541283385503, 4.422086434035286, 4.804021044733257, 4.5047781498804, 5.215317073152214, 6.167325887628449))",1.0,"Map(vectorType -> dense, length -> 3, values -> List(-12.827131833325156, 19.138761308105273, -6.311629474780116))","Map(vectorType -> dense, length -> 3, values -> List(1.3103550885926769E-14, 0.9999999999911351, 8.851884135562573E-12))",1.0
gplex with blockchain technology in the gaming world with the unique metaverse world waiting to be,positive,0.375,"List(gplex, with, blockchain, technology, in, the, gaming, world, with, the, unique, metaverse, world, waiting, to, be)","List(gplex, blockchain, technology, gaming, world, unique, metaverse, world, waiting)","Map(vectorType -> sparse, length -> 8485, indices -> List(9, 16, 18, 43, 83, 105, 132, 156), values -> List(1.0, 2.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))","Map(vectorType -> sparse, length -> 8485, indices -> List(9, 16, 18, 43, 83, 105, 132, 156), values -> List(3.0338099117114044, 7.03715085761506, 3.2594317790400815, 3.6058791607181138, 3.935358361848356, 4.1831945257529375, 4.446183985614347, 4.613977441845392))","Map(vectorType -> sparse, length -> 8485, indices -> List(9, 16, 18, 43, 83, 105, 132, 156), values -> List(3.0338099117114044, 7.03715085761506, 3.2594317790400815, 3.6058791607181138, 3.935358361848356, 4.1831945257529375, 4.446183985614347, 4.613977441845392))",1.0,"Map(vectorType -> dense, length -> 3, values -> List(-13.858940063528273, 17.117931068420994, -3.258991004892722))","Map(vectorType -> dense, length -> 3, values -> List(3.52302560946477E-14, 0.9999999985860761, 1.4138885737591142E-9))",1.0
there are many innovations and surprises in the future project dxgm metaverse universe you can partic,positive,0.25,"List(there, are, many, innovations, and, surprises, in, the, future, project, dxgm, metaverse, universe, you, can, partic)","List(many, innovations, surprises, future, project, dxgm, metaverse, universe, partic)","Map(vectorType -> sparse, length -> 8485, indices -> List(43, 47, 111, 126, 140, 141, 306, 703, 2547), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))","Map(vectorType -> sparse, length -> 8485, indices -> List(43, 47, 111, 126, 140, 141, 306, 703, 2547), values -> List(3.6058791607181138, 3.736907423124518, 4.302541283385503, 4.422086434035286, 4.804021044733257, 4.5047781498804, 5.215317073152214, 6.167325887628449, 0.0))","Map(vectorType -> sparse, length -> 8485, indices -> List(43, 47, 111, 126, 140, 141, 306, 703), values -> List(3.6058791607181138, 3.736907423124518, 4.302541283385503, 4.422086434035286, 4.804021044733257, 4.5047781498804, 5.215317073152214, 6.167325887628449))",1.0,"Map(vectorType -> dense, length -> 3, values -> List(-12.827131833325156, 19.138761308105273, -6.311629474780116))","Map(vectorType -> dense, length -> 3, values -> List(1.3103550885926769E-14, 0.9999999999911351, 8.851884135562573E-12))",1.0
gplex with blockchain technology in the gaming world with the unique metaverse world waiting to be d,positive,0.375,"List(gplex, with, blockchain, technology, in, the, gaming, world, with, the, unique, metaverse, world, waiting, to, be, d)","List(gplex, blockchain, technology, gaming, world, unique, metaverse, world, waiting, d)","Map(vectorType -> sparse, length -> 8485, indices -> List(9, 16, 18, 43, 83, 105, 132, 150, 156), values -> List(1.0, 2.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))","Map(vectorType -> sparse, length -> 8485, indices -> List(9, 16, 18, 43, 83, 105, 132, 150, 156), values -> List(3.0338099117114044, 7.03715085761506, 3.2594317790400815, 3.6058791607181138, 3.935358361848356, 4.1831945257529375, 4.446183985614347, 4.7697319712546244, 4.613977441845392))","Map(vectorType -> sparse, length -> 8485, indices -> List(9, 16, 18, 43, 83, 105, 132, 150, 156), values -> List(3.0338099117114044, 7.03715085761506, 3.2594317790400815, 3.6058791607181138, 3.935358361848356, 4.1831945257529375, 4.446183985614347, 4.7697319712546244, 4.613977441845392))",1.0,"Map(vectorType -> dense, length -> 3, values -> List(-14.466180561266349, 16.458606155357113, -1.9924255940907654))","Map(vectorType -> dense, length -> 3, values -> List(3.711382970888799E-14, 0.9999999902989135, 9.701049521968727E-9))",1.0
gplex with blockchain technology in the gaming world with the unique metaverse world waiting to,positive,0.375,"List(gplex, with, blockchain, technology, in, the, gaming, world, with, the, unique, metaverse, world, waiting, to)","List(gplex, blockchain, technology, gaming, world, unique, metaverse, world, waiting)","Map(vectorType -> sparse, length -> 8485, indices -> List(9, 16, 18, 43, 83, 105, 132, 156), values -> List(1.0, 2.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))","Map(vectorType -> sparse, length -> 8485, indices -> List(9, 16, 18, 43, 83, 105, 132, 156), values -> List(3.0338099117114044, 7.03715085761506, 3.2594317790400815, 3.6058791607181138, 3.935358361848356, 4.1831945257529375, 4.446183985614347, 4.613977441845392))","Map(vectorType -> sparse, length -> 8485, indices -> List(9, 16, 18, 43, 83, 105, 132, 156), values -> List(3.0338099117114044, 7.03715085761506, 3.2594317790400815, 3.6058791607181138, 3.935358361848356, 4.1831945257529375, 4.446183985614347, 4.613977441845392))",1.0,"Map(vectorType -> dense, length -> 3, values -> List(-13.858940063528273, 17.117931068420994, -3.258991004892722))","Map(vectorType -> dense, length -> 3, values -> List(3.52302560946477E-14, 0.9999999985860761, 1.4138885737591142E-9))",1.0
gplex with blockchain technology in the gaming world with the unique metaverse world waiting to,positive,0.375,"List(gplex, with, blockchain, technology, in, the, gaming, world, with, the, unique, metaverse, world, waiting, to)","List(gplex, blockchain, technology, gaming, world, unique, metaverse, world, waiting)","Map(vectorType -> sparse, length -> 8485, indices -> List(9, 16, 18, 43, 83, 105, 132, 156), values -> List(1.0, 2.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))","Map(vectorType -> sparse, length -> 8485, indices -> List(9, 16, 18, 43, 83, 105, 132, 156), values -> List(3.0338099117114044, 7.03715085761506, 3.2594317790400815, 3.6058791607181138, 3.935358361848356, 4.1831945257529375, 4.446183985614347, 4.613977441845392))","Map(vectorType -> sparse, length -> 8485, indices -> List(9, 16, 18, 43, 83, 105, 132, 156), values -> List(3.0338099117114044, 7.03715085761506, 3.2594317790400815, 3.6058791607181138, 3.935358361848356, 4.1831945257529375, 4.446183985614347, 4.613977441845392))",1.0,"Map(vectorType -> dense, length -> 3, values -> List(-13.858940063528273, 17.117931068420994, -3.258991004892722))","Map(vectorType -> dense, length -> 3, values -> List(3.52302560946477E-14, 0.9999999985860761, 1.4138885737591142E-9))",1.0
there are many innovations and surprises in the future project dxgm metaverse universe you can part,positive,0.25,"List(there, are, many, innovations, and, surprises, in, the, future, project, dxgm, metaverse, universe, you, can, part)","List(many, innovations, surprises, future, project, dxgm, metaverse, universe, part)","Map(vectorType -> sparse, length -> 8485, indices -> List(43, 47, 111, 126, 140, 141, 151, 306, 703), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))","Map(vectorType -> sparse, length -> 8485, indices -> List(43, 47, 111, 126, 140, 141, 151, 306, 703), values -> List(3.6058791607181138, 3.736907423124518, 4.302541283385503, 4.422086434035286, 4.804021044733257, 4.5047781498804, 4.673400862316193, 5.215317073152214, 6.167325887628449))","Map(vectorType -> sparse, length -> 8485, indices -> List(43, 47, 111, 126, 140, 141, 151, 306, 703), values -> List(3.6058791607181138, 3.736907423124518, 4.302541283385503, 4.422086434035286, 4.804021044733257, 4.5047781498804, 4.673400862316193, 5.215317073152214, 6.167325887628449))",1.0,"Map(vectorType -> dense, length -> 3, values -> List(-15.380186271680214, 21.54660421111065, -6.166417939430435))","Map(vectorType -> dense, length -> 3, values -> List(9.181183318356128E-17, 0.9999999999990787, 9.212710055364739E-13))",1.0
the vf phoenix variable fighter was an prototype for the vf valkyrie it served in as a front line fighter,neutral,0.0,"List(the, vf, phoenix, variable, fighter, was, an, prototype, for, the, vf, valkyrie, it, served, in, as, a, front, line, fighter)","List(vf, phoenix, variable, fighter, prototype, vf, valkyrie, served, front, line, fighter)","Map(vectorType -> sparse, length -> 8485, indices -> List(98, 1428, 2111, 2154, 2610, 3265, 3277, 4182, 4272), values -> List(2.0, 1.0, 1.0, 1.0, 2.0, 1.0, 1.0, 1.0, 1.0))","Map(vectorType -> sparse, length -> 8485, indices -> List(98, 1428, 2111, 2154, 2610, 3265, 3277, 4182, 4272), values -> List(8.187164734126501, 6.860473068188393, 7.312458191931451, 7.312458191931451, 0.0, 0.0, 0.0, 0.0, 0.0))","Map(vectorType -> sparse, length -> 8485, indices -> List(98, 1428, 2111, 2154), values -> List(8.187164734126501, 6.860473068188393, 7.312458191931451, 7.312458191931451))",2.0,"Map(vectorType -> dense, length -> 3, values -> List(8.446717625389514, -26.241603544538044, 17.794885919148527))","Map(vectorType -> dense, length -> 3, values -> List(8.711727009536473E-5, 7.501667094655749E-20, 0.9999128827299048))",2.0


In [None]:
predictions.printSchema()

root
 |-- tweet: string (nullable = true)
 |-- sentiment: string (nullable = true)
 |-- polarity: float (nullable = true)
 |-- tokens: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- filtered: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- cv: vector (nullable = true)
 |-- 1gram_idf: vector (nullable = true)
 |-- features: vector (nullable = true)
 |-- label: double (nullable = false)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [None]:
my_bucket = "/mnt/project"

# Save data df to bucket

(tweets_labeled.write                      
  .option("delimiter", "\t")  
  .option("header", "false")
  .mode("overwrite")              
  .csv(my_bucket + "/data")              
)

# Drop array and vector columns from predictitons

predictions_final = predictions.drop("tokens", "filtered", "cv", "1gram_idf", "features", "rawPrediction", "probability")

# Save prediction_final df to bucket

(predictions_final.write                      
  .option("delimiter", "\t")  
  .option("header", "false")
  .mode("overwrite")              
  .csv(my_bucket + "/predictions")              
)


tweet,sentiment,polarity,label,prediction
there are many innovations and surprises in the future project dxgm metaverse universe you can par,positive,0.25,1.0,1.0
gplex with blockchain technology in the gaming world with the unique metaverse world waiting to be,positive,0.375,1.0,1.0
there are many innovations and surprises in the future project dxgm metaverse universe you can partic,positive,0.25,1.0,1.0
gplex with blockchain technology in the gaming world with the unique metaverse world waiting to be,positive,0.375,1.0,1.0
there are many innovations and surprises in the future project dxgm metaverse universe you can partic,positive,0.25,1.0,1.0
gplex with blockchain technology in the gaming world with the unique metaverse world waiting to be d,positive,0.375,1.0,1.0
gplex with blockchain technology in the gaming world with the unique metaverse world waiting to,positive,0.375,1.0,1.0
gplex with blockchain technology in the gaming world with the unique metaverse world waiting to,positive,0.375,1.0,1.0
there are many innovations and surprises in the future project dxgm metaverse universe you can part,positive,0.25,1.0,1.0
the vf phoenix variable fighter was an prototype for the vf valkyrie it served in as a front line fighter,neutral,0.0,2.0,2.0


In [None]:
my_bucket = "/mnt/project"

# Drop array and vector columns from predictitons_test

predictions_test = predictions_test.drop("tokens", "filtered", "cv", "1gram_idf", "features", "rawPrediction", "probability")

# Save prediction_test df to bucket

(predictions_test.write                      
  .option("delimiter", "\t")  
  .option("header", "false")
  .mode("overwrite")              
  .csv(my_bucket + "/predictions_testdata")              
)


In [None]:
# Create text data with filtered tweet for WordCloud visualization

from pyspark.ml.feature import StopWordsRemover, Tokenizer

tokenizer = Tokenizer(inputCol="tweet", outputCol="tokens")
tweets_tokenized = tokenizer.transform(tweets_clean)
stopword_remover = StopWordsRemover(inputCol="tokens", outputCol="filtered")
tweets_filtered = stopword_remover.transform(tweets_tokenized)

tweets_text = []

for row in tweets_filtered.toPandas()["filtered"]:
  for value in row:
      tweets_text.append(value)



In [None]:
my_bucket = "/mnt/project"

# Save text_file to bucket

df = spark.createDataFrame(tweets_text, StringType())

(df.write                      
  .option("delimiter", "\t")  
  .option("header", "false")
  .mode("overwrite")              
  .csv(my_bucket + "/tweets_text")              
)