In [2]:
pip install pyspark

In [None]:
import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from nltk.sentiment.vader import SentimentIntensityAnalyzer
import pandas as pd
import re
# Fetch the NLTK data packages used by the stop-word, VADER and tokenizer imports above.
for _nltk_resource in ('stopwords', 'vader_lexicon', 'punkt'):
    nltk.download(_nltk_resource)
from pyspark.sql.functions import col
from pyspark.sql.functions import regexp_replace
from pyspark.sql.functions import udf, lower, split, when, col
from pyspark.sql.functions import coalesce, lit, trim
from pyspark.sql.functions import concat_ws
from pyspark.sql.types import ArrayType, StringType, MapType, FloatType

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, HashingTF, IDF
from pyspark.ml.classification import LogisticRegression, NaiveBayes
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import StringIndexer, Tokenizer, HashingTF, IDF, VectorAssembler
from pyspark.ml.linalg import Vectors
from wordcloud import WordCloud
import matplotlib.pyplot as plt
import seaborn as sns

In [None]:
from pyspark.sql import SparkSession

# Reuse the active SparkSession if there is one; otherwise start one named for this notebook.
spark = (
    SparkSession.builder
    .appName("Twitter BTC Sentiment Analysis")
    .getOrCreate()
)

In [None]:
# Load the datasets (paths are relative to the notebook's working directory).
DATA_DIR = "data"


def load_tweet_csv(filename):
    """Read one tweet CSV from DATA_DIR into a Spark DataFrame.

    The file is read with a header row and inferred column types.

    Args:
        filename: CSV file name inside DATA_DIR.

    Returns:
        The loaded Spark DataFrame.

    Raises:
        ValueError: if the file has no ``text`` column; the preprocessing step
            reads that column, so fail here with a clear message instead.
    """
    df = spark.read.csv(f"{DATA_DIR}/{filename}", header=True, inferSchema=True)
    if "text" not in df.columns:
        raise ValueError(f"{filename}: expected a 'text' column, found {df.columns}")
    return df


df_small = load_tweet_csv("balanced_twitter_btc_small.csv")
df_big = load_tweet_csv("balanced_twitter_btc_big.csv")
df_unbalanced = load_tweet_csv("unbalanced_twitter_btc_big.csv")

In [None]:
# Data preprocessing
def preprocess_text(df, text_col="text", output_col="cleaned_text"):
    """Add a normalised copy of a tweet-text column to a Spark DataFrame.

    Cleaning steps, applied in order:
      1. null text becomes the empty string, so downstream tokenisers never see null;
      2. URLs are removed (``http://``, ``https://`` and ``www.`` forms);
      3. @mentions and #hashtags are removed (the whole token, word included);
      4. the text is lowercased;
      5. every character that is not an ASCII letter or whitespace is removed
         (digits, punctuation, emoji, non-ASCII letters);
      6. whitespace runs are collapsed to a single space and the ends trimmed.
         The removals above leave gaps behind; without this step a tokenizer
         that splits on single whitespace would emit empty-string tokens.

    Args:
        df: Input DataFrame containing ``text_col``.
        text_col: Name of the raw text column. Defaults to ``"text"``.
        output_col: Name of the column to write the cleaned text to. Defaults to
            ``"cleaned_text"``; an existing column with that name is replaced.

    Returns:
        A new DataFrame with ``output_col`` added; ``text_col`` is left unchanged.
    """
    cleaned = coalesce(col(text_col), lit(""))
    # Raw strings: these are Java regexes, and "\S", "\w", "\s", "\." are not
    # valid Python string escapes (SyntaxWarning on Python 3.12+).
    cleaned = regexp_replace(cleaned, r"(https?://\S+|www\.\S+)", "")
    cleaned = regexp_replace(cleaned, r"@\w+|#\w+", "")
    cleaned = lower(cleaned)
    cleaned = regexp_replace(cleaned, r"[^a-zA-Z\s]", "")
    # After collapsing, only single spaces remain, so trim() (which strips spaces) cleans both ends.
    cleaned = trim(regexp_replace(cleaned, r"\s+", " "))
    return df.withColumn(output_col, cleaned)

# Apply the same cleaning to all three datasets, rebinding each name to its cleaned frame.
df_small, df_big, df_unbalanced = map(preprocess_text, (df_small, df_big, df_unbalanced))

In [None]:
# Feature extraction pipeline: tokens -> stop-word filtering -> hashed term frequencies -> TF-IDF.
NUM_HASH_FEATURES = 1000  # HashingTF bucket count; fewer buckets means more colliding terms

# RegexTokenizer splits on runs of whitespace and drops empty tokens (minTokenLength=1).
# Tokenizer splits on single whitespace characters, so it would emit "" tokens
# wherever the cleaning step left consecutive spaces.
tokenizer = RegexTokenizer(
    inputCol="cleaned_text",
    outputCol="words",
    pattern=r"\s+",
    gaps=True,
    minTokenLength=1,
    toLowercase=True,
)
remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")
hashingTF = HashingTF(inputCol="filtered_words", outputCol="raw_features", numFeatures=NUM_HASH_FEATURES)
idf = IDF(inputCol="raw_features", outputCol="features")

pipeline = Pipeline(stages=[tokenizer, remover, hashingTF, idf])

In [None]:
# Hold out 20% of the small balanced dataset for evaluation; the seed makes the split reproducible.
train_data, test_data = df_small.randomSplit(weights=[0.8, 0.2], seed=42)

# Fit the featurisation pipeline (including its IDF weights) on the training split only,
# then apply that same fitted model to both splits.
pipeline_model = pipeline.fit(train_data)
train_features, test_features = (
    pipeline_model.transform(part) for part in (train_data, test_data)
)

In [None]:
# Train a logistic-regression classifier on the TF-IDF feature vectors.
lr = LogisticRegression().setFeaturesCol("features").setLabelCol("label")
lr_model = lr.fit(train_features)

# Score the held-out split.
predictions = lr_model.transform(test_features)

# Accuracy = fraction of test rows whose predicted class equals the label.
evaluator = (
    MulticlassClassificationEvaluator()
    .setLabelCol("label")
    .setPredictionCol("prediction")
    .setMetricName("accuracy")
)
accuracy = evaluator.evaluate(predictions)
print(f"Accuracy: {accuracy}")