In [45]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import col
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF
from pyspark.ml import Pipeline
import re
from pyspark.ml.feature import StringIndexer, IndexToString
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


In [49]:
# Build the SparkSession used by every later cell; getOrCreate() returns an
# already-running session instead of starting a second one.
spark = (
    SparkSession.builder
    .appName("TwitterSentimentAnalysis")
    .master('local[*]')
    .getOrCreate()
)

ConnectionRefusedError: [Errno 61] Connection refused

In [None]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Explicit schema for both CSV files: an integer id followed by three string
# columns, every one of them nullable.
schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("id", IntegerType()),
        ("branch", StringType()),
        ("sentiment", StringType()),
        ("tweet", StringType()),
    )
])

# Both files are read with the same schema; header=True makes the reader treat
# the first line of each file as a header row rather than data.
training = spark.read.csv("twitter_training.csv", schema=schema, header=True)
validation = spark.read.csv("twitter_validation.csv", schema=schema, header=True)


In [47]:
# Stop the Spark session
# NOTE(review): `training` and `validation` are read through this session and
# the cells below keep using them, so running this cell at this position breaks
# everything after it on a top-to-bottom run. Presumably it belongs at the very
# end of the notebook — confirm and move.
spark.stop()


ConnectionRefusedError: [Errno 61] Connection refused

In [None]:
# Print the column names and types of both DataFrames to confirm the schema
# passed to the CSV reader was applied.
training.printSchema()
validation.printSchema()

In [None]:
# Preview the first rows of the training set.
training.show()

In [None]:
# Preview the first rows of the validation set.
validation.show()

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns

# Number of tweets per sentiment label, sorted by label
sentiment_counts = (
    training
    .groupBy('sentiment')
    .count()
    .orderBy('sentiment')
)

# The aggregate has one row per label, so collecting it to the driver is cheap
sentiment_counts_collect = sentiment_counts.collect()

# Split the collected rows into parallel label / count lists for matplotlib
sentiments = [record['sentiment'] for record in sentiment_counts_collect]
counts = [record['count'] for record in sentiment_counts_collect]

# Horizontal bar chart drawn through the current Axes object
ax = plt.gca()
ax.barh(sentiments, counts, color=sns.color_palette('Dark2'))
ax.spines[['top', 'right']].set_visible(False)  # drop the top/right frame lines
ax.set_xlabel('Count')
ax.set_ylabel('Sentiment')
ax.set_title('Sentiment Distribution')
plt.show()


In [None]:
# Use the functions module through an alias instead of importing `sum` by name:
# `from pyspark.sql.functions import sum` rebinds Python's built-in `sum` for
# every cell that runs afterwards.
from pyspark.sql import functions as F
from pyspark.sql.functions import col

# Count null values in each column: the null test is cast to 0/1 and summed,
# producing a single row with one null count per column.
missing_values1 = training.agg(
    *[F.sum(col(c).isNull().cast("int")).alias(c) for c in training.columns]
)

# Show the result
missing_values1.show()


In [None]:
# Keep only complete rows: a row is discarded as soon as any of its columns is null.
training = training.na.drop(how="any")

# Display the first rows of the cleaned DataFrame
training.show()


In [None]:
import matplotlib.pyplot as plt

# Calculate sentiment distribution: one row per label, sorted by label, as a
# small pandas DataFrame on the driver.
sentiment_distribution = training.groupBy('sentiment').count().orderBy('sentiment').toPandas()

# Size the per-class styling from the data instead of assuming exactly four
# classes: `pie` raises a ValueError when `explode` and the data differ in
# length. With four classes the result is identical to the hard-coded lists.
n_classes = len(sentiment_distribution)
base_colors = ['red', 'green', 'blue', 'gray']
colors = [base_colors[i % len(base_colors)] for i in range(n_classes)]
explode = [0.1] * n_classes

# Create subplots: pie chart on the left, bar chart on the right
fig, axs = plt.subplots(1, 2, figsize=(16, 6))

# Plotting pie chart for sentiment distribution with custom colors
axs[0].pie(sentiment_distribution['count'], labels=sentiment_distribution['sentiment'], autopct='%1.1f%%',
            startangle=90, wedgeprops={'linewidth': 0.5}, textprops={'fontsize': 12},
            explode=explode, colors=colors, shadow=True)
axs[0].set_title('Sentiment Distribution - Pie Chart')

# Plotting bar plot for sentiment distribution
axs[1].bar(sentiment_distribution['sentiment'], sentiment_distribution['count'], color=colors)
axs[1].set_title('Sentiment Distribution - Bar Plot')
axs[1].set_xlabel('Sentiment')
axs[1].set_ylabel('Count')
axs[1].tick_params(axis='x', rotation=45)
axs[1].grid(axis='y', linestyle='--', alpha=0.7)
# Add text on top of each bar in the bar plot (bar i sits at x position i)
for i, count in enumerate(sentiment_distribution['count']):
    axs[1].text(i, count + 0.5, str(count), ha='center', va='bottom', fontsize=10)

plt.tight_layout()
plt.show()


In [None]:
def filter_non_string(df, column):
    """Drop rows whose `column` is null, then cast `column` to a string type.

    Despite the name, values are not inspected for their type: null rows are
    removed and whatever remains is cast with `StringType()`.

    Args:
        df: DataFrame containing `column`.
        column: name of the column to clean.

    Returns:
        A new DataFrame; `df` itself is not modified.
    """
    non_null = df.filter(df[column].isNotNull())
    return non_null.withColumn(column, non_null[column].cast(StringType()))

In [None]:
def normalize_text(text: str) -> str:
    """Return `text` with every character converted to lowercase."""
    lowered = text.lower()
    return lowered

In [None]:
def remove_html_tags(text: str) -> str:
    """Delete every `<...>` span from `text`.

    The match is non-greedy, so each tag is removed separately and the text
    between tags is kept. No DOTALL flag is set, so a `<`/`>` pair separated
    by a newline is left untouched.
    """
    tag_pattern = re.compile(r'<.*?>')
    return tag_pattern.sub('', text)

In [None]:
def remove_urls(text):
    """Delete `http://`, `https://` and `www.` links from `text`.

    A link runs from its prefix up to the next whitespace character. The
    prefixes are matched strictly (scheme followed by `://`, or `www.` at a
    word boundary) so that ordinary words which merely contain the letters
    "http" or "www" — e.g. "awwww" or "httpd" — are left intact. Matching is
    case-insensitive.

    Args:
        text: the string to clean.

    Returns:
        `text` with every link removed; surrounding whitespace is kept.
    """
    url_pattern = re.compile(r'https?://\S+|\bwww\.\S+', flags=re.IGNORECASE)
    return url_pattern.sub('', text)

In [None]:
def remove_numbers(text: str) -> str:
    """Delete every run of digits from `text`, leaving all other characters in place."""
    digit_run = re.compile(r'\d+')
    return digit_run.sub('', text)

In [None]:
def remove_punctuation(text: str) -> str:
    """Delete every character that is neither a word character nor whitespace.

    Underscores count as word characters in the regex, so they are kept.
    """
    non_word_non_space = re.compile(r'[^\w\s]')
    return non_word_non_space.sub('', text)

In [None]:
def remove_emojis(text):
    """Delete every character that falls in the code-point ranges listed below.

    The ranges are deliberately broad. Besides emoji blocks they include
    U+24C2..U+1F251 and the whole range U+10000..U+10FFFF, so a lot of
    non-emoji text (CJK ideographs, for example) is removed as well.
    """
    code_point_ranges = (
        "\U0001F600-\U0001F64F",  # emoticons
        "\U0001F300-\U0001F5FF",  # symbols & pictographs
        "\U0001F680-\U0001F6FF",  # transport & map symbols
        "\U0001F1E0-\U0001F1FF",  # regional-indicator (flag) letters
        "\U00002500-\U00002BEF",
        "\U00002702-\U000027B0",
        "\U00002702-\U000027B0",  # repeated in the original list; harmless
        "\U000024C2-\U0001F251",  # very wide range, covers far more than emoji
        "\U0001f926-\U0001f937",
        "\U00010000-\U0010ffff",  # everything outside the Basic Multilingual Plane
        "\u2640-\u2642",
        "\u2600-\u2B55",
        "\u200d",  # zero-width joiner
        "\u23cf",
        "\u23e9",
        "\u231a",
        "\ufe0f",  # variation selector-16
        "\u3030",
    )
    # One character class containing all ranges; "+" removes a whole run at once.
    matcher = re.compile("[" + "".join(code_point_ranges) + "]+", flags=re.UNICODE)
    return matcher.sub('', text)


In [None]:
from pyspark.ml.feature import RegexTokenizer

def tokenize_text(dataframe, input_col, output_col):
    """Add a token column to `dataframe` using a RegexTokenizer.

    Args:
        dataframe: DataFrame holding the text column.
        input_col: name of the text column to tokenize.
        output_col: name of the column that receives the tokens.

    Returns:
        The transformed DataFrame. The tokenizer is configured with the
        non-word-character regex as its pattern and otherwise uses
        RegexTokenizer's defaults.
    """
    return RegexTokenizer(
        inputCol=input_col,
        outputCol=output_col,
        pattern="\\W",
    ).transform(dataframe)

In [None]:
from pyspark.ml.feature import StopWordsRemover
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

def remove_stopwords(tokens):
    """Filter stop words out of a tokenized DataFrame.

    Args:
        tokens: despite the name, a DataFrame; it must contain a column
            called "tokens", which StopWordsRemover reads.

    Returns:
        A DataFrame with a single column, "filtered_tokens"; all other
        columns of the input are dropped by the final select.
    """
    stopword_filter = StopWordsRemover(inputCol="tokens", outputCol="filtered_tokens")
    filtered = stopword_filter.transform(tokens)
    return filtered.select("filtered_tokens")

In [None]:
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.feature import IDF
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType

def vectorize_data(text_data):
    """Vectorize tokenized documents with TF-IDF (CountVectorizer followed by IDF).

    Uses the notebook-level `spark` session to build the intermediate DataFrame.

    Args:
        text_data: iterable of token sequences, one per document. Each
            sequence is joined with spaces and re-tokenized by `Tokenizer`.

    Returns:
        A tuple `(tfidf_df, cv_model)`:
        * `tfidf_df` has an integer "id" column (the document's position in
          `text_data`) and a "tfidf_vectors" column holding the TF-IDF vector
          as a plain array of doubles.
        * `cv_model` is the fitted CountVectorizer model.
    """
    # Imported locally so the function does not depend on which cell-level
    # imports happen to have run.
    from pyspark.sql.types import DoubleType

    # Join the tokenized text into strings
    text_data_strings = [" ".join(tokens) for tokens in text_data]

    # Create DataFrame from text data; the id is the document's index
    text_df = spark.createDataFrame(list(enumerate(text_data_strings)), ["id", "text"])

    # Tokenize text
    tokenizer = Tokenizer(inputCol="text", outputCol="tokens")
    tokenized_df = tokenizer.transform(text_df)

    # Count Vectorizer (term frequencies)
    cv = CountVectorizer(inputCol="tokens", outputCol="raw_features")
    cv_model = cv.fit(tokenized_df)
    count_vectorized_df = cv_model.transform(tokenized_df)

    # Compute IDF
    idf = IDF(inputCol="raw_features", outputCol="features")
    idf_model = idf.fit(count_vectorized_df)
    tfidf_vectorized_df = idf_model.transform(count_vectorized_df)

    # Convert the ML vector to a plain list of floats. ArrayType requires an
    # element type; calling it with no argument raises a TypeError.
    vector_to_list_udf = udf(lambda features: features.toArray().tolist(), ArrayType(DoubleType()))

    # Name the output column directly rather than renaming the auto-generated
    # "<lambda>(features)" column, whose name is an implementation detail.
    tfidf_df = tfidf_vectorized_df.select(
        "id", vector_to_list_udf("features").alias("tfidf_vectors")
    )
    return tfidf_df, cv_model


In [None]:
# Wrap each pure-Python cleaning function as a Spark UDF that returns a string.
(
    normalize_text_udf,
    remove_html_tags_udf,
    remove_urls_udf,
    remove_numbers_udf,
    remove_punctuation_udf,
    remove_emojis_udf,
) = [
    udf(cleaner, StringType())
    for cleaner in (
        normalize_text,
        remove_html_tags,
        remove_urls,
        remove_numbers,
        remove_punctuation,
        remove_emojis,
    )
]

def preprocess_text(df, column):
    """Clean the text in `column` and return the resulting DataFrame.

    Null rows are removed first (via `filter_non_string`). The column is then
    rewritten in place by each cleaning UDF in turn: lowercase, strip HTML
    tags, strip URLs, strip digits, strip punctuation, strip emojis.

    Args:
        df: input DataFrame.
        column: name of the text column to clean.

    Returns:
        A new DataFrame with the same columns and `column` cleaned.
    """
    cleaned = filter_non_string(df, column)
    # Order matters: each step sees the output of the previous one.
    for cleaning_udf in (
        normalize_text_udf,
        remove_html_tags_udf,
        remove_urls_udf,
        remove_numbers_udf,
        remove_punctuation_udf,
        remove_emojis_udf,
    ):
        cleaned = cleaned.withColumn(column, cleaning_udf(col(column)))
    return cleaned

# Usage: clean the 'tweet' column of the training set.
# NOTE(review): `df1_processed` is not referenced by any later visible cell —
# confirm whether this call is still needed.
df1_processed = preprocess_text(training, 'tweet')

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.feature import Tokenizer

def preprocess_text(df, column="tweet"):
    """Clean, tokenize and stop-word-filter a text column.

    This definition replaces the earlier two-argument `preprocess_text`. The
    optional `column` parameter keeps calls written for that earlier version,
    such as `preprocess_text(training, 'tweet')`, working after this cell has
    run.

    Steps: drop null rows, lowercase, strip HTML tags, URLs, digits and
    punctuation, tokenize into "tokens", then remove stop words into
    "filtered_tokens". The emoji UDF the original built here was never
    applied, so it is omitted.

    Args:
        df: input DataFrame; it must also contain "branch" and "sentiment"
            columns, which the final select keeps.
        column: name of the text column to process (default "tweet").

    Returns:
        DataFrame with the columns "branch", `column`, "sentiment" and
        "filtered_tokens".
    """
    # Remove null rows and make sure the column is a string
    df = filter_non_string(df, column)

    # Apply the cleaning functions in order; each one rewrites the column.
    cleaning_steps = (
        normalize_text,
        remove_html_tags,
        remove_urls,
        remove_numbers,
        remove_punctuation,
    )
    for step in cleaning_steps:
        df = df.withColumn(column, udf(step, StringType())(column))

    # Tokenize text
    tokenizer = Tokenizer(inputCol=column, outputCol="tokens")
    df = tokenizer.transform(df)

    # Remove stopwords and keep only the columns used downstream
    remover = StopWordsRemover(inputCol="tokens", outputCol="filtered_tokens")
    df = remover.transform(df).select("branch", column, 'sentiment', "filtered_tokens")

    return df

# Run the full preprocessing (clean, tokenize, remove stop words) on the
# training DataFrame; the modelling cells below use `df_processed`.
df_processed = preprocess_text(training)


In [None]:
# Preview the preprocessed rows.
df_processed.show()

In [None]:
from pyspark.sql.functions import concat_ws

# Overwrite 'tweet' with the stop-word-filtered tokens joined by single spaces,
# so the text column matches what is left in 'filtered_tokens'.
rejoined_tweet = concat_ws(" ", "filtered_tokens")
df_processed = df_processed.withColumn("tweet", rejoined_tweet)

# Display the first rows of the result
df_processed.show()

In [None]:
# Preview the DataFrame again (same call as the end of the previous cell).
df_processed.show()


In [None]:
from pyspark.ml.feature import HashingTF, IDF
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline

# Split data into training and testing sets (fixed seed for a repeatable split)
train_df, test_df = df_processed.randomSplit([0.8, 0.2], seed=42)

# Index the 'sentiment' strings as numeric labels.
# The indexer is fitted ONCE, on the training split, and that fitted model is
# applied to both splits. Fitting a second indexer on the test split can give
# the same sentiment a different index there, which would make the comparison
# between predictions and test labels meaningless.
indexer = StringIndexer(inputCol="sentiment", outputCol="label")
indexer_model = indexer.fit(train_df)
train_df = indexer_model.transform(train_df)
test_df = indexer_model.transform(test_df)

# Convert text data to TF-IDF features
hashingTF = HashingTF(inputCol="filtered_tokens", outputCol="raw_features", numFeatures=10000)
idf = IDF(inputCol="raw_features", outputCol="features")
lr = LogisticRegression(featuresCol="features", labelCol="label", maxIter=10, regParam=0.01)

pipeline = Pipeline(stages=[hashingTF, idf, lr])

# Train Logistic Regression classifier (all stages are fitted on train only)
lr_model = pipeline.fit(train_df)

# Predict on the testing data
predictions = lr_model.transform(test_df)

# Evaluate the model
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="label", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Accuracy:", accuracy)


In [None]:
from pyspark.ml.feature import HashingTF, CountVectorizer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline

# Split data into training and testing sets (fixed seed for a repeatable split)
train_df, test_df = df_processed.randomSplit([0.8, 0.2], seed=42)

# Index the 'sentiment' strings as numeric labels.
# The indexer is fitted ONCE, on the training split, and that fitted model is
# applied to both splits. Fitting a second indexer on the test split can give
# the same sentiment a different index there, which would make the comparison
# between predictions and test labels meaningless.
indexer = StringIndexer(inputCol="sentiment", outputCol="label")
indexer_model = indexer.fit(train_df)
train_df = indexer_model.transform(train_df)
test_df = indexer_model.transform(test_df)

# Convert text data to Bag of Words features with CountVectorizer
# (HashingTF with numFeatures=10000 was the alternative tried here).
cv = CountVectorizer(inputCol="filtered_tokens", outputCol="features", vocabSize=50000)

lr = LogisticRegression(featuresCol="features", labelCol="label", maxIter=1000, regParam=0.01)

pipeline = Pipeline(stages=[cv, lr])

# Train Logistic Regression classifier (vocabulary is learned from train only)
lr_model = pipeline.fit(train_df)

# Predict on the testing data
predictions = lr_model.transform(test_df)

# Evaluate the model
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="label", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Accuracy:", accuracy)


In [None]:
# Convert text data to TF-IDF features and fit a Logistic Regression with its
# default hyper-parameters. Uses `train_df` / `test_df` (with their "label"
# column) from the preceding modelling cell.
hashingTF = HashingTF(inputCol="filtered_tokens", outputCol="raw_features", numFeatures=10000)
idf = IDF(inputCol="raw_features", outputCol="features")
lr2 = LogisticRegression(featuresCol='features', labelCol='label')

pipeline2 = Pipeline(stages=[hashingTF, idf, lr2])
# Fit the model on the training data
lr2_model = pipeline2.fit(train_df)

# Make predictions on the test data with the pipeline fitted in THIS cell.
# The original scored `lr_model`, i.e. the model from the previous cell.
predictions1 = lr2_model.transform(test_df)

# Evaluate the model
evaluator1 = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy1 = evaluator1.evaluate(predictions1)

# Report this cell's accuracy; the original printed the previous cell's `accuracy`.
print("Logistic Regression Accuracy: %f" % accuracy1)

In [None]:
# Save the fitted pipeline model.
# The original called `lrmodel.save(...)`, but no visible cell defines
# `lrmodel`, so that line raises NameError.
# NOTE(review): `lr_model` is assumed because it is the closest defined name;
# switch to `lr2_model` if the default-parameter TF-IDF model is the one to keep.
# write().overwrite() lets the cell be re-run when the "lrmodel" path already
# exists; a plain save() fails in that case.
lr_model.write().overwrite().save("lrmodel")


In [None]:
# Save the fitted pipeline model.
# The original called `lrModel.save(...)`, but no visible cell defines
# `lrModel`, so that line raises NameError.
# NOTE(review): `lr_model` is assumed because it is the closest defined name;
# switch to `lr2_model` if the default-parameter TF-IDF model is the one to keep.
# write().overwrite() lets the cell be re-run when the "lrmodel" path already
# exists; a plain save() fails in that case.
lr_model.write().overwrite().save("lrmodel")
