In [0]:
!pip install textblob

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from textblob import TextBlob
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, StructField, StructType, IntegerType
import pyspark.sql.functions as F
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.feature import NGram, VectorAssembler, StopWordsRemover, HashingTF, IDF, Tokenizer, StringIndexer, ChiSqSelector, VectorAssembler, CountVectorizer
from pyspark.ml.classification import LogisticRegression,DecisionTreeClassifier
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
import boto3

In [0]:
def mount_s3_bucket(access_key, secret_key, bucket_name, mount_folder):
  """Mount an S3 location under ``/mnt/<mount_folder>`` via ``dbutils.fs``.

  Any existing mount at that mount point is unmounted first, so calling this
  again with the same ``mount_folder`` replaces the previous mount.

  Args:
    access_key: AWS access key ID placed in the ``s3a://`` URI.
    secret_key: AWS secret access key; every "/" is encoded as "%2F" before
      it is placed in the URI.
    bucket_name: Text placed after the "@" in the ``s3a://`` URI.
    mount_folder: Folder name under ``/mnt/`` to mount to.

  Raises:
    Whatever ``dbutils.fs.mount`` raises; a failing unmount is only reported.
  """
  mount_point = "/mnt/%s" % mount_folder
  encoded_secret_key = secret_key.replace("/", "%2F")

  print ("Mounting", bucket_name)

  try:
    # Unmount the data in case it was already mounted.
    dbutils.fs.unmount(mount_point)
  except Exception:
    # Best effort: a failure here most likely means nothing was mounted yet.
    # Only Exception is caught so that interrupts still abort the call.
    print ("Directory not unmounted: ", mount_folder)

  # Mount outside the try block so it only runs when the unmount step has
  # either succeeded or failed with an ordinary error.
  dbutils.fs.mount("s3a://%s:%s@%s" % (access_key, encoded_secret_key, bucket_name), mount_point)
  print ("The bucket", bucket_name, "was mounted to", mount_folder, "\n")
    

In [0]:
# Load AWS programmatic access credentials from a Databricks secret scope
# instead of embedding them in the notebook source.
# NOTE(review): the key pair that used to be hardcoded in this cell was exposed
# in the notebook and should be revoked/rotated. The scope and key names below
# are placeholders - create them (or adjust the names) before running.
AWS_SECRET_SCOPE = "aws"
ACCESS_KEY = dbutils.secrets.get(scope=AWS_SECRET_SCOPE, key="access-key-id")
SECRET_ACCESS_KEY = dbutils.secrets.get(scope=AWS_SECRET_SCOPE, key="secret-access-key")

In [0]:
# Mount the source tweet data under /mnt/big_data_project.
mount_s3_bucket(
    access_key=ACCESS_KEY,
    secret_key=SECRET_ACCESS_KEY,
    bucket_name="weclouddata/twitter",
    mount_folder="big_data_project",
)

In [0]:
# List the contents of the mounted folder.
display(dbutils.fs.ls("/mnt/big_data_project/"))

In [0]:
# List one leaf directory of the StudentLoanRelief data to inspect its files.
display(dbutils.fs.ls("/mnt/big_data_project/StudentLoanRelief/2022/12/13/02/"))


In [0]:
# Glob over everything five levels below StudentLoanRelief. The levels
# presumably correspond to year/month/day/hour/file, matching directories such
# as StudentLoanRelief/2022/12/13/02/ - TODO confirm against the bucket layout.
path = '/mnt/big_data_project/StudentLoanRelief/*/*/*/*/*'

In [0]:
from pyspark.sql import SparkSession

# Get (or create) the Spark session used by the rest of the notebook.
spark = (
    SparkSession.builder
    .appName('Big_data_project')
    .getOrCreate()
)
print('Session created')

# Keep a handle on the underlying SparkContext as well.
sc = spark.sparkContext

In [0]:
# Explicit schema for the tweet files: eight columns in file order, all
# nullable; only followers_count is an integer.
tweetsSchema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in (
        ('id', StringType()),
        ('user_name', StringType()),
        ('screen_name', StringType()),
        ('text', StringType()),
        ('followers_count', IntegerType()),
        ('location', StringType()),
        ('geo', StringType()),
        ('created_at', StringType()),
    )
])

In [0]:
# Load every file matched by `path` as headerless, tab-delimited text, applying
# the explicit tweet schema instead of inferring one.
df = (
    spark.read
    .option('header', False)
    .option('delimiter', '\t')
    .schema(tweetsSchema)
    .csv(path)
)


In [0]:
# Mark the DataFrame as cached, then run count() as an action so the cache is
# actually populated; the row count is this cell's output.
df.cache().count()

In [0]:
# Render the raw tweets DataFrame for a visual check of the load.
display(df)

## 2. Text Cleaning and Preprocessing

In [0]:
# Normalise the 'text' column in five steps:
#   1. strip URLs,
#   2. replace every non-letter character with a space,
#   3. collapse runs of whitespace,
#   4. lowercase,
#   5. trim leading/trailing spaces.
# The character class in step 2 must be A-Z, not A-z: the range A-z also
# covers the ASCII characters [ \ ] ^ _ ` and would leave them in the text.
df_clean = (
    df
    .withColumn('text', F.regexp_replace('text', r"http\S+", ""))
    .withColumn('text', F.regexp_replace('text', r"[^a-zA-Z]", " "))
    .withColumn('text', F.regexp_replace('text', r"\s+", " "))
    .withColumn('text', F.lower('text'))
    .withColumn('text', F.trim('text'))
)
display(df_clean)

## 3. Creating a Sentiment Label

In [0]:
def get_sentiment(text):
    """Classify text as 'positive', 'negative' or 'neutral'.

    The label is derived from the sign of TextBlob's polarity score.

    Args:
        text: The text to score. ``None`` or an empty string is treated as
            carrying no sentiment.

    Returns:
        'positive' if polarity > 0, 'negative' if polarity < 0, otherwise
        'neutral' (including for missing or empty text).
    """
    # Missing text would make TextBlob raise, which would fail the whole Spark
    # job when this function runs as a UDF; treat it as neutral instead.
    if not text:
        return 'neutral'

    polarity = TextBlob(text).sentiment.polarity
    if polarity > 0:
        return 'positive'
    elif polarity < 0:
        return 'negative'
    else:
        return 'neutral'

In [0]:
# Wrap get_sentiment as a string-returning Spark UDF and use it to add a
# 'sentiment' column computed from the cleaned text.
sentiment_udf = udf(get_sentiment, returnType=StringType())
df_clean = df_clean.withColumn(
    'sentiment',
    sentiment_udf(F.col('text')),
)

In [0]:
# Render the cleaned tweets together with their new 'sentiment' column.
display(df_clean)

In [0]:
# Mount the output bucket.
# NOTE(review): this reuses the "big_data_project" mount folder, and
# mount_s3_bucket unmounts that folder first, so the input location that `path`
# points to is replaced by this bucket. Any later recomputation of `df` that
# is not served from the cache would presumably read from the wrong bucket -
# consider a separate mount folder for outputs.
mount_s3_bucket(
    access_key=ACCESS_KEY,
    secret_key=SECRET_ACCESS_KEY,
    bucket_name="wcd-jayanth/",
    mount_folder="big_data_project",
)

In [0]:
# List everything currently under /mnt/.
display(dbutils.fs.ls("/mnt/"))

In [0]:
# Persist the cleaned, sentiment-labelled tweets as headerless, tab-delimited
# CSV, overwriting whatever is already at the target location.
(
    df_clean.write
    .option('header', False)
    .option('delimiter', '\t')
    .mode('overwrite')
    .csv('/mnt/big_data_project/raw_data.csv')
)

## 3. Feature Transformer: Tokenizer

In [0]:
from pyspark.ml.feature import Tokenizer

# Turn the cleaned 'text' column into a 'tokens' column.
tokenizer = Tokenizer(
    inputCol='text',
    outputCol='tokens',
)
tweets_tokenized = tokenizer.transform(df_clean)
display(tweets_tokenized)

## 3. Feature Transformer: Stopword Removal

In [0]:
from pyspark.ml.feature import StopWordsRemover

# Drop stop words from each tweet's token list; the result goes to 'filtered'.
stopword_remover = StopWordsRemover(
    inputCol="tokens",
    outputCol="filtered",
)
tweets_stopword = stopword_remover.transform(tweets_tokenized)
display(tweets_stopword)

## 4. Feature Transformer: CountVectorizer (TF - Term Frequency)

In [0]:
from pyspark.ml.feature import CountVectorizer

# Learn a vocabulary (capped at 2**16 terms) from the filtered tokens and turn
# each tweet into a term-count vector in column 'cv'.
cv = CountVectorizer(
    vocabSize=2**16,
    inputCol="filtered",
    outputCol='cv',
)
cv_model = cv.fit(tweets_stopword)
tweets_cv = cv_model.transform(tweets_stopword)
display(tweets_cv)

## 5. Feature Transformer: TF-IDF Vectorization

In [0]:
from pyspark.ml.feature import HashingTF, IDF

# Re-weight the term counts by inverse document frequency; minDocFreq=5 is
# used to remove sparse terms. The output column is named 'features' so the
# classifier can consume it directly.
idf = IDF(
    inputCol='cv',
    outputCol="features",
    minDocFreq=5,
)
idf_model = idf.fit(tweets_cv)
tweets_idf = idf_model.transform(tweets_cv)
display(tweets_idf)

## 6. Label Encoder

In [0]:
from pyspark.ml.feature import StringIndexer

# Encode the string 'sentiment' column as a numeric 'label' column.
label_encoder = StringIndexer(
    inputCol="sentiment",
    outputCol="label",
)
le_model = label_encoder.fit(tweets_idf)
tweets_label = le_model.transform(tweets_idf)
display(tweets_label)

## 7. Model Training: Logistic Regression Classifier

In [0]:
from pyspark.ml.classification import LogisticRegression

# Logistic regression over the 'features' / 'label' columns built above (these
# are the estimator's default column names, spelled out here for clarity).
lr = LogisticRegression(
    featuresCol="features",
    labelCol="label",
    maxIter=100,
)

# The model is fitted and then applied to the same DataFrame, so these
# predictions are in-sample.
lr_model = lr.fit(tweets_label)
predictions = lr_model.transform(tweets_label)
display(predictions)

## 8. Model Evaluation

In [0]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

# MulticlassClassificationEvaluator reports F1 by default, not ROC-AUC, so the
# metric is named explicitly and labelled as F1 in the output.
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="f1",
)
f1 = evaluator.evaluate(predictions)
# Same evaluator, metric overridden for this one call: the fraction of rows
# whose prediction equals the label.
accuracy = evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"})

print("Accuracy Score: {0:.4f}".format(accuracy))
print("F1 Score: {0:.4f}".format(f1))


In [0]:
# Save the predictions as parquet, replacing any previous output.
# NOTE(review): the target sits inside the 'raw_data.csv' output directory
# written earlier in this notebook; re-running that overwrite would presumably
# remove this parquet output too - confirm whether a sibling path was intended.
predictions.write.parquet(
    '/mnt/big_data_project/raw_data.csv/big_data_project_predictions.parquet',
    mode="overwrite",
)

In [0]:
# Render the predictions DataFrame produced by the logistic regression model.
display(predictions)

## 9. Putting a pipeline together

In [0]:
from pyspark.ml.feature import NGram, VectorAssembler, StopWordsRemover, HashingTF, IDF, Tokenizer, StringIndexer, CountVectorizer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

# Use 90% cases for training, 10% cases for testing
train, test = df_clean.randomSplit([0.9, 0.1], seed=20200819)

# Create transformers for the ML pipeline:
# text -> tokens -> stop-word-filtered tokens -> term counts -> TF-IDF.
tokenizer = Tokenizer(inputCol="text", outputCol="tokens")
stopword_remover = StopWordsRemover(inputCol="tokens", outputCol="filtered")
cv = CountVectorizer(vocabSize=2**16, inputCol="filtered", outputCol='cv')
idf = IDF(inputCol='cv', outputCol="1gram_idf", minDocFreq=5) #minDocFreq: remove sparse terms
assembler = VectorAssembler(inputCols=["1gram_idf"], outputCol="features")
label_encoder = StringIndexer(inputCol="sentiment", outputCol="label")
lr = LogisticRegression(maxIter=100)
pipeline = Pipeline(stages=[tokenizer, stopword_remover, cv, idf, assembler, label_encoder, lr])

# Fit every stage on the training split only, then score the held-out split.
pipeline_model = pipeline.fit(train)
predictions = pipeline_model.transform(test)

# MulticlassClassificationEvaluator reports F1 by default, not ROC-AUC, so the
# metric is named explicitly and labelled as F1 in the output.
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="f1",
)
# Same evaluator, metric overridden for this one call.
accuracy = evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"})
f1 = evaluator.evaluate(predictions)

print("Accuracy Score: {0:.4f}".format(accuracy))
print("F1 Score: {0:.4f}".format(f1))

## 10. Ngram Features

In [0]:
from pyspark.ml.feature import NGram, VectorAssembler, StopWordsRemover, HashingTF, IDF, Tokenizer, StringIndexer, ChiSqSelector, CountVectorizer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

# Use 90% cases for training, 10% cases for testing
train, test = df_clean.randomSplit([0.9, 0.1], seed=20200819)

# label
label_encoder = StringIndexer(inputCol="sentiment", outputCol="label")

# Create transformers for the ML pipeline
# Unigram branch: tokens -> filtered tokens -> term counts -> TF-IDF.
tokenizer = Tokenizer(inputCol="text", outputCol="tokens")
stopword_remover = StopWordsRemover(inputCol="tokens", outputCol="filtered")
cv = CountVectorizer(vocabSize=2**16, inputCol="filtered", outputCol='cv')
idf = IDF(inputCol='cv', outputCol="1gram_idf", minDocFreq=5) #minDocFreq: remove sparse terms
# Bigram branch: filtered tokens -> bigrams -> hashed term frequencies -> TF-IDF.
ngram = NGram(n=2, inputCol="filtered", outputCol="2gram")
ngram_hashingtf = HashingTF(inputCol="2gram", outputCol="2gram_tf", numFeatures=20000)
ngram_idf = IDF(inputCol='2gram_tf', outputCol="2gram_idf", minDocFreq=5)

# Assemble all text features. The bigram input is the IDF-weighted column
# ("2gram_idf"), so the ngram_idf stage is actually used and both branches are
# weighted the same way.
assembler = VectorAssembler(inputCols=["1gram_idf", "2gram_idf"], outputCol="rawFeatures")

# Chi-square variable selection
selector = ChiSqSelector(numTopFeatures=2**14, featuresCol='rawFeatures', outputCol="features")

# Regression model estimator
lr = LogisticRegression(maxIter=100)

# Build the pipeline
pipeline = Pipeline(stages=[label_encoder, tokenizer, stopword_remover, cv, idf, ngram, ngram_hashingtf, ngram_idf, assembler, selector, lr])

# Pipeline model fitting: fit on the training split, score the held-out split.
pipeline_model = pipeline.fit(train)
predictions = pipeline_model.transform(test)

# MulticlassClassificationEvaluator reports F1 by default, not ROC-AUC, so the
# metric is named explicitly and labelled as F1 in the output.
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="f1",
)
# Same evaluator, metric overridden for this one call.
accuracy = evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"})
f1 = evaluator.evaluate(predictions)

print("Accuracy Score: {0:.4f}".format(accuracy))
print("F1 Score: {0:.4f}".format(f1))