In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.feature import RegexTokenizer
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.sql.types import DoubleType
from pyspark.ml.linalg import Vectors
import re
from pymongo import MongoClient


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.feature import RegexTokenizer
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.sql.types import DoubleType
from pyspark.ml.linalg import Vectors
import re
from pymongo import MongoClient

class Preprocessor:
    """Text-cleaning helpers plus a Spark wrapper that applies them to a DataFrame.

    The string helpers (`remove_hashtags`, `remove_emojis`, `preprocess_text`,
    `stem`, `lemmatize`) work on a single Python string and return ``None``
    unchanged, because Spark passes ``None`` to a UDF for null cells.
    `preprocess_dataframe` wraps the helpers in UDFs and adds three columns.

    Args:
        stemmer: Optional object exposing ``stem(word)``. When omitted,
            ``PorterStemmer()`` is instantiated.
        lemmatizer: Optional object exposing ``lemmatize(word)``. When omitted,
            ``WordNetLemmatizer()`` is instantiated.
    """

    # Patterns are defined once on the class rather than rebuilt inside the
    # per-row helper methods.
    _HASHTAG_PATTERN = re.compile(r'#\w+')
    _EMOJI_PATTERN = re.compile("["
                                u"\U0001F600-\U0001F64F"  # emoticons
                                u"\U0001F300-\U0001F5FF"  # symbols & pictographs
                                u"\U0001F680-\U0001F6FF"  # transport & map symbols
                                u"\U0001F1E0-\U0001F1FF"  # flags (iOS)
                                "]+", flags=re.UNICODE)

    def __init__(self, stemmer=None, lemmatizer=None):
        # NOTE(review): PorterStemmer and WordNetLemmatizer are not imported in
        # any visible cell (presumably they come from nltk.stem -- confirm and
        # add the import to the imports cell). Until then the defaults raise
        # NameError; pass explicit instances to avoid relying on them.
        self.stemmer = stemmer if stemmer is not None else PorterStemmer()
        self.lemmatizer = lemmatizer if lemmatizer is not None else WordNetLemmatizer()

    def remove_hashtags(self, text):
        """Return `text` with every ``#word`` token removed (``None`` stays ``None``)."""
        if text is None:
            return None
        return self._HASHTAG_PATTERN.sub('', text)

    def remove_emojis(self, text):
        """Return `text` without characters in the emoji ranges of ``_EMOJI_PATTERN``."""
        if text is None:
            return None
        return self._EMOJI_PATTERN.sub(r'', text)

    def preprocess_text(self, text):
        """Strip hashtags, then emojis, from `text` (``None`` stays ``None``)."""
        if text is None:
            return None
        text = self.remove_hashtags(text)
        text = self.remove_emojis(text)
        # Add more preprocessing steps as per your requirement
        return text

    def stem(self, text):
        """Stem each whitespace-separated word and re-join with single spaces."""
        if text is None:
            return None
        return ' '.join(self.stemmer.stem(word) for word in text.split())

    def lemmatize(self, text):
        """Lemmatize each whitespace-separated word and re-join with single spaces."""
        if text is None:
            return None
        return ' '.join(self.lemmatizer.lemmatize(word) for word in text.split())

    def preprocess_dataframe(self, dataframe):
        """Add `preprocessed_text`, `stemmed_text` and `lemmatized_text` columns.

        Args:
            dataframe: Spark DataFrame that has a `text` column.

        Returns:
            A new DataFrame with the three derived columns appended. Both the
            stemmed and the lemmatized column are computed from
            `preprocessed_text`, not from each other.
        """
        # The UDFs wrap bound methods, so this instance (including its stemmer
        # and lemmatizer) is shipped to the executors along with the functions.
        preprocess_text_udf = udf(self.preprocess_text)
        stem_udf = udf(self.stem)
        lemmatize_udf = udf(self.lemmatize)

        # Remove hashtags and emojis
        dataframe = dataframe.withColumn('preprocessed_text', preprocess_text_udf(col('text')))

        # Stemming
        dataframe = dataframe.withColumn('stemmed_text', stem_udf(col('preprocessed_text')))

        # Lemmatization
        dataframe = dataframe.withColumn('lemmatized_text', lemmatize_udf(col('preprocessed_text')))

        return dataframe

class LabelEncoder:
    """Encode a string label column as numeric indices using a StringIndexer.

    Args:
        input_col: Name of the string column to index. Defaults to
            ``'sentiment'``.
        output_col: Name of the numeric column to create. Defaults to
            ``'label'``.
        handle_invalid: Forwarded to StringIndexer's ``handleInvalid``.
            Defaults to ``'error'``, which is StringIndexer's own default, so
            calling ``LabelEncoder()`` behaves as before. Other cells in this
            notebook index a ``sentiments`` column with ``"skip"``; pass those
            values here to reuse this class for that data.
    """

    def __init__(self, input_col='sentiment', output_col='label', handle_invalid='error'):
        self.label_indexer = StringIndexer(
            inputCol=input_col,
            outputCol=output_col,
            handleInvalid=handle_invalid,
        )

    def encode_labels(self, dataframe):
        """Fit the indexer on `dataframe` and return it with the index column added."""
        indexer_model = self.label_indexer.fit(dataframe)
        return indexer_model.transform(dataframe)

class SentimentAnalyzer:
    """Load a CSV, train and score a text classifier, and store per-date averages.

    The constructor obtains (or creates) a SparkSession named
    ``'SentimentAnalysis'`` and keeps it on ``self.spark``.
    """

    def __init__(self):
        self.spark = SparkSession.builder.appName('SentimentAnalysis').getOrCreate()

    def load_data_from_csv(self, file_path):
        """Read `file_path` as CSV with a header row and inferred column types."""
        return self.spark.read.csv(file_path, header=True, inferSchema=True)

    def create_pipeline(self):
        """Build the unfitted pipeline: tokenize -> stop words -> counts -> logistic regression.

        The pipeline reads a `lemmatized_text` column and a `label` column, so
        the input DataFrame must already contain both.
        """
        tokenizer = RegexTokenizer(inputCol='lemmatized_text', outputCol='tokens', pattern='\\W')
        stopword_remover = StopWordsRemover(inputCol='tokens', outputCol='filtered_tokens')
        count_vectorizer = CountVectorizer(inputCol='filtered_tokens', outputCol='features')
        logistic_regression = LogisticRegression(featuresCol='features', labelCol='label')

        return Pipeline(stages=[tokenizer, stopword_remover, count_vectorizer, logistic_regression])

    def train_model(self, dataframe):
        """Fit a fresh pipeline on `dataframe` and return the fitted model."""
        return self.create_pipeline().fit(dataframe)

    def evaluate_model(self, model, dataframe):
        """Return the area under the ROC curve of `model`'s predictions on `dataframe`.

        NOTE(review): a binary evaluator is used; if `label` can take more than
        two distinct values the metric is presumably not meaningful -- confirm
        the label cardinality of the input data.
        """
        evaluator = BinaryClassificationEvaluator(
            labelCol='label', rawPredictionCol='rawPrediction', metricName='areaUnderROC')
        predictions = model.transform(dataframe)
        return evaluator.evaluate(predictions)

    def save_datewise_average_sentiment(self, dataframe, collection_name,
                                        score_col='sentiment_score',
                                        mongo_uri=None,
                                        db_name='sentiment_analysis'):
        """Average `score_col` per `date` and insert the result into MongoDB.

        Args:
            dataframe: Spark DataFrame with a `date` column and `score_col`.
            collection_name: Target MongoDB collection.
            score_col: Numeric column to average. Defaults to
                ``'sentiment_score'`` (the original hard-coded name).
            mongo_uri: Connection URI. When omitted, ``MongoClient()`` is
                called without arguments, as before.
            db_name: Target database. Defaults to ``'sentiment_analysis'``.

        Each inserted document has the keys ``'date'`` and
        ``'average_sentiment'``. Nothing is inserted, and no connection is
        opened, when the aggregation yields no rows.
        """
        avg_sentiments = dataframe.groupBy('date').agg({score_col: 'avg'})
        # A dict-style agg names its output column "avg(<column>)".
        avg_col = 'avg({})'.format(score_col)
        sentiment_data = [
            {'date': row['date'], 'average_sentiment': row[avg_col]}
            for row in avg_sentiments.collect()
        ]

        # pymongo rejects an empty document list, so skip the round trip.
        if not sentiment_data:
            return

        client = MongoClient(mongo_uri) if mongo_uri else MongoClient()
        try:
            client[db_name][collection_name].insert_many(sentiment_data)
        finally:
            # Release the connection even when the insert fails.
            client.close()

    def run_sentiment_analysis(self, file_path, collection_name, score_col='sentiment_score'):
        """Run the whole workflow for one CSV file and print the ROC AUC.

        Steps: load CSV -> Preprocessor.preprocess_dataframe ->
        LabelEncoder.encode_labels -> train -> evaluate -> store per-date
        averages of `score_col` in `collection_name`.

        NOTE(review): the model is evaluated on the same DataFrame it was
        trained on, so the printed AUC is not a held-out estimate.
        NOTE(review): none of the steps above creates `score_col`; it has to be
        a column of the CSV (or pass e.g. ``score_col='label'``).
        """
        dataframe = self.load_data_from_csv(file_path)

        preprocessor = Preprocessor()
        dataframe = preprocessor.preprocess_dataframe(dataframe)

        label_encoder = LabelEncoder()
        dataframe = label_encoder.encode_labels(dataframe)

        model = self.train_model(dataframe)
        roc_auc = self.evaluate_model(model, dataframe)

        self.save_datewise_average_sentiment(dataframe, collection_name, score_col=score_col)

        print('ROC AUC:', roc_auc)




In [7]:
# Obtain the active SparkSession (or start one).
spark = SparkSession.builder.getOrCreate()

# Declare the three feature stages up front:
#   text -> tokens (split on non-word characters)
#   tokens -> filtered_tokens (stop words dropped)
#   sentiments -> encoded_label (string category to numeric index)
tokenizer = RegexTokenizer(pattern="\\W", inputCol="text", outputCol="tokens")
stopwords_remover = StopWordsRemover(outputCol="filtered_tokens", inputCol="tokens")
label_encoder = StringIndexer(outputCol="encoded_label", inputCol="sentiments")

# Load the CSV (header row, inferred types) and run the two stateless
# transformers back to back.
df = spark.read.csv("PFV.csv", inferSchema=True, header=True)
df = stopwords_remover.transform(tokenizer.transform(df))

# The indexer has to be fitted on the data before it can transform it.
df = label_encoder.fit(df).transform(df)



In [12]:
from pyspark.ml.feature import StringIndexer, RegexTokenizer, StopWordsRemover

# Reload the source rows; the header supplies column names, types are inferred.
df = spark.read.csv("PFV.csv", inferSchema=True, header=True)

# Stage definitions, in execution order:
#   1. split `text` on non-word characters into `words`
#   2. drop stop words into `filtered_words`
#   3. index the string `sentiments` column into a numeric `label`
#      (handleInvalid="skip" is requested for rows the indexer cannot handle)
tokenizer = RegexTokenizer(pattern="\\W", inputCol="text", outputCol="words")
remover = StopWordsRemover(outputCol="filtered_words", inputCol="words")
indexer = StringIndexer(handleInvalid="skip", inputCol="sentiments", outputCol="label")
stages = [tokenizer, remover, indexer]

# Fit the pipeline on the data, then apply the fitted model to the same data.
pipeline = Pipeline(stages=stages)
model = pipeline.fit(df)
transformed_data = model.transform(df)

# Preview the resulting columns.
transformed_data.show()


+---+----------+--------------------+----------+--------------------+--------------------+-----+
|_c0|      date|                text|sentiments|               words|      filtered_words|label|
+---+----------+--------------------+----------+--------------------+--------------------+-----+
|  0|2022-12-19|#Hungary had rece...|       Pos|[hungary, had, re...|[hungary, receive...|  0.0|
|  1|2023-03-17|This might help @...|       Pos|[this, might, hel...|[might, help, mic...|  0.0|
|  2|2022-07-03|#HongKong and #Ma...|       Neg|[hongkong, and, m...|[hongkong, macau,...|  1.0|
|  3|2023-01-13|I got the first d...|       Neg|[i, got, the, fir...|[got, first, dose...|  1.0|
|  4|2021-06-30|.@UPS delivers fi...|       Pos|[ups, delivers, f...|[ups, delivers, f...|  0.0|
|  5|2021-06-01|@BorisJohnson GIV...|       Pos|[borisjohnson, gi...|[borisjohnson, gi...|  0.0|
|  9|2021-06-24|Got my 1st #Pfize...|       Pos|[got, my, 1st, pf...|[got, 1st, pfizer...|  0.0|
| 10|2022-02-27|#Moderna and #

In [13]:
# Mean of the numeric `label` per calendar date; the result column is
# named "avg(label)".
# NOTE(review): the recorded output below contains a date whose average is
# above 1.0, so `label` takes values greater than 1 -- i.e. `sentiments` holds
# more than two distinct values. Confirm that is expected before reading this
# average as a Pos/Neg ratio.
grouped_by_date = transformed_data.groupBy('date')
avg_sentiments = grouped_by_date.agg({'label': 'avg'})
avg_sentiments.show()

+----------+-------------------+
|      date|         avg(label)|
+----------+-------------------+
|2023-01-21| 0.3333333333333333|
|2021-11-03|                0.5|
|2022-10-05|                0.4|
|2023-05-01| 0.6363636363636364|
|2023-05-18|0.45454545454545453|
|2023-04-21| 0.5833333333333334|
|2023-04-17| 0.4166666666666667|
|2022-10-07| 0.7857142857142857|
|2021-12-23|                0.5|
|2023-04-28| 0.5555555555555556|
|2022-05-17| 0.5384615384615384|
|2023-02-10|0.45454545454545453|
|2022-03-30|                0.4|
|2021-10-25|0.45454545454545453|
|2021-11-15| 0.5714285714285714|
|2021-08-30| 0.7777777777777778|
|2023-03-11|                0.5|
|2023-04-26| 2.3636363636363638|
|2023-05-04|                0.5|
|2023-03-10| 0.7142857142857143|
+----------+-------------------+
only showing top 20 rows



In [17]:
import pandas as pd

# Pull the per-date averages onto the driver as a pandas DataFrame, then
# write them to a local UTF-8 CSV without the pandas index column.
pandas_df = avg_sentiments.toPandas()
pandas_df.to_csv('PFV_op.csv', encoding='utf-8', index=False)

# SAVE TO MONGODB


In [18]:
from pymongo import MongoClient

# One document per DataFrame row, keyed by column name.
data_dict = pandas_df.to_dict(orient='records')

# Client for the MongoDB server on localhost, then the target
# database and collection.
client = MongoClient('mongodb://localhost:27017')
db = client['PFV']
collection = db['Sentiment_Date_wise']

# Bulk insert; kept as the cell's last expression so the InsertManyResult is
# displayed.
# NOTE(review): re-running this cell inserts the same rows again.
collection.insert_many(data_dict)

<pymongo.results.InsertManyResult at 0x10966bc8>