In [None]:
from pyspark.sql import SparkSession
spark = SparkSession \
        .builder \
        .appName('WCD Big Data Course') \
        .config("spark.sql.legacy.timeParserPolicy", "LEGACY") \
        .getOrCreate()
print('Session created')
sc = spark.sparkContext


Session created


In [None]:
def mount_s3_bucket(access_key, secret_key, bucket_name, mount_folder):
  ACCESS_KEY_ID = access_key
  SECRET_ACCESS_KEY = secret_key
  ENCODED_SECRET_KEY = SECRET_ACCESS_KEY.replace("/", "%2F")

  print ("Mounting", bucket_name)

  try:
    # Unmount the data in case it was already mounted.
    dbutils.fs.unmount("/mnt/%s" % mount_folder)
    
  except:
    # If it fails to unmount it most likely wasn't mounted in the first place
    print ("Directory not unmounted: ", mount_folder)
    
  finally:
    # Lastly, mount our bucket.
    dbutils.fs.mount("s3a://%s:%s@%s" % (ACCESS_KEY_ID, ENCODED_SECRET_KEY, bucket_name), "/mnt/%s" % mount_folder)
    #dbutils.fs.mount("s3a://"+ ACCESS_KEY_ID + ":" + ENCODED_SECRET_KEY + "@" + bucket_name, mount_folder)
    print ("The bucket", bucket_name, "was mounted to", mount_folder, "\n")
    

In [None]:
# Set AWS programmatic access credentials
ACCESS_KEY = ""
SECRET_ACCESS_KEY = ""

In [None]:
mount_s3_bucket(ACCESS_KEY, SECRET_ACCESS_KEY, "weclouddata/", "WCD")

Mounting weclouddata/
/mnt/WCD has been unmounted.
The bucket weclouddata/ was mounted to WCD 



In [None]:
# %fs ls /mnt/WCD/twitter/BlackFriday/2022/11/25/

In [None]:
from pyspark.sql.types import StructType, StructField, IntegerType, LongType, FloatType, StringType

tweetSchema = StructType([
    StructField('id', LongType(), True),
    StructField('user_name', StringType(), True),
    StructField('user_screen_name', StringType(), True),
    StructField('text', StringType(), True),
    StructField('followers_count', IntegerType(), True),
    StructField('location', StringType(), True),
    StructField('geo', StringType(), True),
    StructField('created_at', StringType(), True) ])

In [None]:
file = 'mnt/WCD/twitter/BlackFriday/2022/11/25/*/*'

tweets = (spark.read
        .option('header', 'false')
        .option('delimiter', '\t')
        .schema(tweetSchema)
        .csv(file))

In [None]:
tweets.count()

Out[14]: 848241

In [None]:
# from pyspark.sql.functions import col

# # Assuming 'geo' is the name of the column
# non_null_geo_count = tweets.filter(col("geo") != "None").count()

# print("Number of rows where geo column value is not 'None':", non_null_geo_count)
# #431 is meaningless to visualize

Number of rows where geo column value is not 'None': 431


In [None]:
import pyspark.sql.functions as F
tweets = tweets.select('text', 'followers_count','created_at')


In [None]:
# from pyspark.sql.functions import col

# # Count null values in the 'created_at' column
# null_count = tweets.filter(col("created_at").isNull()).count()

# print("Number of null values in the 'created_at' column:", null_count)


Number of null values in the 'created_at' column: 3054


In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
import re


# Define the UDF
def is_retweet(text):
    if re.match(r"^RT\b", text):
        return 1
    else:
        return 0

# Register the UDF
is_retweet_udf = udf(is_retweet, IntegerType())

# Apply the UDF to create a new column 'is_retweet'
tweets = tweets.withColumn('is_retweet', is_retweet_udf(tweets['text']))
tweets = tweets.na.drop(subset=['text'])
tweets = tweets.na.drop(subset=['created_at'])
# Show the DataFrame with the new column
tweets.display()


In [None]:
from pyspark.sql.functions import to_timestamp, date_format

# Assuming 'created_at' is the name of the column
tweets_with_formatted_date = tweets.withColumn('created_at_ts', to_timestamp('created_at', 'EEE MMM dd HH:mm:ss Z yyyy')) \
                                    .withColumn('created_at_date', date_format('created_at_ts', 'yyyy-MM-dd HH:mm:ss'))

# Display the DataFrame with the formatted date
display(tweets_with_formatted_date)


In [None]:
from pyspark.sql.functions import regexp_replace, lower, trim

# Define regex patterns for cleaning
url_pattern = r"http\S+"
non_alpha_pattern = r"[^a-zA-Z\s]"
extra_space_pattern = r"\s+"

# Apply regex replacements
tweets_clean = tweets.withColumn('text', regexp_replace('text', url_pattern, "")) \
                    .withColumn('text', regexp_replace('text', non_alpha_pattern, " ")) \
                    .withColumn('text', regexp_replace('text', extra_space_pattern, " ")) \
                    .withColumn('text', lower('text')) \
                    .withColumn('text', trim('text'))

# Display the cleaned DataFrame
display(tweets_clean)


In [None]:
pip install TextBlob

Python interpreter will be restarted.
Collecting TextBlob
  Downloading textblob-0.18.0.post0-py3-none-any.whl (626 kB)
Collecting nltk>=3.8
  Downloading nltk-3.8.1-py3-none-any.whl (1.5 MB)
Collecting tqdm
  Downloading tqdm-4.66.4-py3-none-any.whl (78 kB)
Collecting regex>=2021.8.3
  Downloading regex-2024.4.28-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (773 kB)
Installing collected packages: tqdm, regex, nltk, TextBlob
Successfully installed TextBlob-0.18.0.post0 nltk-3.8.1 regex-2024.4.28 tqdm-4.66.4
Python interpreter will be restarted.


In [None]:
from textblob import TextBlob
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

# Define a function to calculate sentiment scores using TextBlob
def calculate_sentiment(text):
    blob = TextBlob(text)
    return blob.sentiment.polarity

# Register the UDF
sentiment_udf = udf(calculate_sentiment, FloatType())

# Apply the UDF to create a new column 'sentiment'
tweets_clean = tweets_clean.withColumn('sentiment', sentiment_udf(tweets_clean['text']))

# Display the DataFrame with sentiment scores
display(tweets_clean)


In [None]:
from pyspark.sql.types import IntegerType

def map_sentiment(score):
    if score >= 0.33:
        return 1  # Positive sentiment
    elif score <= -0.66:
        return -1  # Negative sentiment
    else:
        return 0  # Neutral sentiment

# Register the UDF
sentiment_integer_udf = udf(map_sentiment, IntegerType())

# Convert sentiment scores to integers
tweets_clean = tweets_clean.withColumn('sentiment_int', sentiment_integer_udf(tweets_clean['sentiment']))

# Display the DataFrame with integer sentiment values
display(tweets_clean)


text,followers_count,created_at,is_retweet,sentiment,sentiment_int
rt johnfugelsang black friday when the tells the to go save the economy they ve been looting for the last months,162,Fri Nov 25 16:59:12 +0000 2022,1,-0.083333336,0
rt arsenal black friday get up to off on arsenal direct,11,Fri Nov 25 16:59:12 +0000 2022,1,-0.033333335,0
some grounds around dublin town no black friday sales from me this year i m no jeff bezos lads but thank you to,5133,Fri Nov 25 16:59:12 +0000 2022,0,0.083333336,0
rt endymionva you want to get wet for the holidays black friday sale going on now a mega off all work sale ends on sunday,116,Fri Nov 25 16:59:12 +0000 2022,1,-0.13333334,0
rt angel funsized hours rt follow cryptocoincoach amp neblioteam be active on profile tweet on timeline nebl next gem o,99,Fri Nov 25 16:59:12 +0000 2022,1,-0.06666667,0
jeremyduda saw the video of the protest we had more people at our house yesterday than is present at the az cap,242,Fri Nov 25 16:59:12 +0000 2022,0,0.25,0
rt rhyheimx black friday phatrabbitkill rhyheimx alternativax,88,Fri Nov 25 16:59:13 +0000 2022,1,-0.16666667,0
rt africanowonline on black friday amazonworkers in countries strike and protest despicable treatment via,450,Fri Nov 25 16:59:13 +0000 2022,1,-0.16666667,0
ellinainthesky hi i m toni i graphic design tshirts sweatshirts and also make custom designs my whole shop is,3949,Fri Nov 25 16:59:12 +0000 2022,0,0.1,0
rt freydis moon black friday sale paperbacks amp stickers off poetry amp tarot off,294,Fri Nov 25 16:59:13 +0000 2022,1,-0.16666667,0


In [None]:
mount_s3_bucket(ACCESS_KEY, SECRET_ACCESS_KEY, "weclouddata-test/", "my_s3")

Mounting weclouddata-test/
/mnt/my_s3 has been unmounted.
The bucket weclouddata-test/ was mounted to my_s3 



In [None]:

cleanOut = "mnt/my_s3/tweets_clean_1.csv"

(tweets_clean.write                       # Our DataFrameWriter
  .option("delimiter", "\t")  
  .option("header", "true")
  .mode("overwrite")               # Replace existing files
  .csv(cleanOut)               # Write DataFrame to csv files
)

In [None]:

cleanIn = "/weclouddata-test/tweets_clean.csv"
cdr = (spark.read
       .option("header", "true")
       .option("delimiter", "\t")
       .csv(cleanIn)
      )

In [None]:
display(cdr)

In [None]:
from pyspark.ml.feature import NGram, VectorAssembler, StopWordsRemover, HashingTF, IDF, Tokenizer, StringIndexer, NGram, ChiSqSelector, CountVectorizer
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, DecisionTreeClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Filter out rows with null values in the 'text' column
tweets_clean = cdr.dropna(subset=["text"])
tweets_clean = tweets_clean.select('text', 'sentiment_int')
# Define data splitting
train, test = tweets_clean.randomSplit([0.9, 0.1], seed=20200819)
# re sample
# Define label encoding
label_encoder = StringIndexer(inputCol="sentiment_int", outputCol="label")

# Define pipeline stages
tokenizer = Tokenizer(inputCol="text", outputCol="tokens")
stopword_remover = StopWordsRemover(inputCol="tokens", outputCol="filtered")
cv = CountVectorizer(vocabSize=2**16, inputCol="filtered", outputCol='cv')
idf = IDF(inputCol='cv', outputCol="1gram_idf", minDocFreq=5)
ngram = NGram(n=2, inputCol="filtered", outputCol="2gram")
ngram_hashingtf = HashingTF(inputCol="2gram", outputCol="2gram_tf", numFeatures=20000)
ngram_idf = IDF(inputCol='2gram_tf', outputCol="2gram_idf", minDocFreq=5) 
assembler = VectorAssembler(inputCols=["1gram_idf", "2gram_tf"], outputCol="rawFeatures")
selector = ChiSqSelector(numTopFeatures=2**14,featuresCol='rawFeatures', outputCol="features")

# Define classifiers
rf = RandomForestClassifier(numTrees=100)


# Build pipeline with classifiers
pipeline_rf = Pipeline(stages=[label_encoder, tokenizer, stopword_remover, cv, idf, ngram, ngram_hashingtf, ngram_idf, assembler, selector, rf])

# Train the pipeline for RandomForestClassifier
pipeline_model_rf = pipeline_rf.fit(train)

# Make predictions for RandomForestClassifier
prediction_rf = pipeline_model_rf.transform(test)


In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Evaluate RandomForestClassifier
evaluator_rf = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="label", metricName="accuracy")
accuracy_rf = evaluator_rf.evaluate(prediction_rf)

# Precision
precision_rf = evaluator_rf.evaluate(prediction_rf, {evaluator_rf.metricName: "weightedPrecision"})

# Recall
recall_rf = evaluator_rf.evaluate(prediction_rf, {evaluator_rf.metricName: "weightedRecall"})

# F1-score
f1_score_rf = evaluator_rf.evaluate(prediction_rf, {evaluator_rf.metricName: "f1"})

# Print accuracy, precision, recall, and F1-score
print("Random Forest - Accuracy Score: {0:.4f}".format(accuracy_rf))
print("Random Forest - Precision: {0:.4f}".format(precision_rf))
print("Random Forest - Recall: {0:.4f}".format(recall_rf))
print("Random Forest - F1 Score: {0:.4f}".format(f1_score_rf))


Random Forest - Accuracy Score: 0.9290
Random Forest - Precision: 0.9335
Random Forest - Recall: 0.9290
Random Forest - F1 Score: 0.9031


In [None]:
from pyspark.ml.feature import NGram, VectorAssembler, StopWordsRemover, HashingTF, IDF, Tokenizer, StringIndexer, NGram, ChiSqSelector, CountVectorizer
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Filter out rows with null values in the 'text' column
tweets_clean = cdr.dropna(subset=["text"])
tweets_clean = tweets_clean.select('text', 'sentiment_int')
# Define data splitting
train, test = tweets_clean.randomSplit([0.9, 0.1], seed=20200819)

# Define label encoding
label_encoder = StringIndexer(inputCol="sentiment_int", outputCol="label")

# Define pipeline stages
tokenizer = Tokenizer(inputCol="text", outputCol="tokens")
stopword_remover = StopWordsRemover(inputCol="tokens", outputCol="filtered")
cv = CountVectorizer(vocabSize=2**16, inputCol="filtered", outputCol='cv')
idf = IDF(inputCol='cv', outputCol="1gram_idf", minDocFreq=5)
ngram = NGram(n=2, inputCol="filtered", outputCol="2gram")
ngram_hashingtf = HashingTF(inputCol="2gram", outputCol="2gram_tf", numFeatures=20000)
ngram_idf = IDF(inputCol='2gram_tf', outputCol="2gram_idf", minDocFreq=5) 
assembler = VectorAssembler(inputCols=["1gram_idf", "2gram_tf"], outputCol="rawFeatures")
selector = ChiSqSelector(numTopFeatures=2**14,featuresCol='rawFeatures', outputCol="features")

# Define classifier (Decision Tree)
dt = DecisionTreeClassifier()

# Build pipeline with Decision Tree classifier
pipeline_dt = Pipeline(stages=[label_encoder, tokenizer, stopword_remover, cv, idf, ngram, ngram_hashingtf, ngram_idf, assembler, selector, dt])

# Train the pipeline for Decision Tree classifier
pipeline_model_dt = pipeline_dt.fit(train)

# Make predictions for Decision Tree classifier
prediction_dt = pipeline_model_dt.transform(test)


In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Evaluate DecisionTreeClassifier
evaluator_dt = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="label", metricName="accuracy")
accuracy_dt = evaluator_dt.evaluate(prediction_dt)

# Print accuracy, precision, recall, and F1-score for Decision Tree classifier
print("Decision Tree - Accuracy Score: {0:.4f}".format(accuracy_dt))



Decision Tree - Accuracy Score: 0.9485


In [None]:
from pyspark.ml.classification import LogisticRegression
# Filter out rows with null values in the 'text' column
tweets_clean = cdr.dropna(subset=["text"])
tweets_clean = tweets_clean.select('text', 'sentiment_int')
# Define data splitting
train, test = tweets_clean.randomSplit([0.9, 0.1], seed=20200819)
# Define classifier (Logistic Regression)
lr = LogisticRegression(maxIter=100)

# Build pipeline with Logistic Regression classifier
pipeline_lr = Pipeline(stages=[label_encoder, tokenizer, stopword_remover, cv, idf, ngram, ngram_hashingtf, ngram_idf, assembler, selector, lr])

# Train the pipeline for Logistic Regression classifier
pipeline_model_lr = pipeline_lr.fit(train)

# Make predictions for Logistic Regression classifier
prediction_lr = pipeline_model_lr.transform(test)


In [None]:
# Evaluate Logistic Regression model
accuracy_lr = evaluator_dt.evaluate(prediction_lr)

# Print accuracy for Logistic Regression model
print("Logistic Regression - Accuracy Score: {0:.4f}".format(accuracy_lr))


Logistic Regression - Accuracy Score: 0.9884


In [None]:
#lr have highest score so pick it as base line model

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Evaluate LogisticRegression
evaluator_lr = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="label", metricName="accuracy")
accuracy_lr = evaluator_lr.evaluate(prediction_lr)

# Precision
precision_lr = evaluator_lr.evaluate(prediction_lr, {evaluator_lr.metricName: "weightedPrecision"})

# Recall
recall_lr = evaluator_lr.evaluate(prediction_lr, {evaluator_lr.metricName: "weightedRecall"})

# F1-score
f1_score_lr = evaluator_lr.evaluate(prediction_lr, {evaluator_lr.metricName: "f1"})

# Print accuracy, precision, recall, and F1-score
print("Logistic Regression - Accuracy Score: {0:.4f}".format(accuracy_lr))
print("Logistic Regression - Precision: {0:.4f}".format(precision_lr))
print("Logistic Regression - Recall: {0:.4f}".format(recall_lr))
print("Logistic Regression - F1 Score: {0:.4f}".format(f1_score_lr))


Logistic Regression - Accuracy Score: 0.9884
Logistic Regression - Precision: 0.9885
Logistic Regression - Recall: 0.9884
Logistic Regression - F1 Score: 0.9884


In [None]:
tweets_clean_lr = cdr.dropna(subset=["text"])
# transform for hole dataset
predictions_lr_all = pipeline_model_lr.transform(tweets_clean_lr)

In [None]:
# Select specific columns from the DataFrame
predictions_lr_to_csv = predictions_lr_all.select("text", "sentiment_int",'label','prediction', 'followers_count','created_at','is_retweet')

In [None]:

cleanOut = "mnt/my_s3/prediction_lr/prediction_lr.csv"

(predictions_lr_to_csv.write                       # Our DataFrameWriter
  .option("delimiter", "\t")  
  .option("header", "true")
  .mode("overwrite")               # Replace existing files
  .csv(cleanOut)               # Write DataFrame to csv files
)

In [None]:
#only one feature no need to check feature importances

In [None]:
# Select specific columns from the DataFrame
predictions_all_to_csv = predictions_all.select("text", "sentiment_int",'label','prediction', 'followers_count','created_at','is_retweet')


In [None]:

cleanOut = "mnt/my_s3/prediction_all_1.csv"

(predictions_all_to_csv.write                       # Our DataFrameWriter
  .option("delimiter", "\t")  
  .option("header", "true")
  .mode("overwrite")               # Replace existing files
  .csv(cleanOut)               # Write DataFrame to csv files
)

In [None]:
file = "mnt/my_s3/prediction/*"
prediction_all_1 = (spark.read
       .option("header", "true")
       .option("delimiter", "\t")
       .csv(file)
      )