In [0]:
# Import a libaray
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import *

In [0]:
# Creating DataFrame using Spark
df = spark.read.load('/FileStore/tables/twitter_training-1.csv', format='csv', sep=',', header='true', inferSchema='true')


In [0]:
df.show(1)

+----+-----------+--------+-----------------------------------------------------+
|2401|Borderlands|Positive|im getting on borderlands and i will murder you all ,|
+----+-----------+--------+-----------------------------------------------------+
|2401|Borderlands|Positive|                                 I am coming to th...|
+----+-----------+--------+-----------------------------------------------------+
only showing top 1 row



In [0]:
# Renaming DataFrame columns using withColumnRenamed()
df = df.withColumnRenamed("2401", "ID") \
       .withColumnRenamed("Borderlands", "Category") \
       .withColumnRenamed("Positive", "Sentiment") \
       .withColumnRenamed("im getting on borderlands and i will murder you all ,", "Text")


In [0]:
df.show(1)

+----+-----------+---------+--------------------+
|  ID|   Category|Sentiment|                Text|
+----+-----------+---------+--------------------+
|2401|Borderlands| Positive|I am coming to th...|
+----+-----------+---------+--------------------+
only showing top 1 row



In [0]:
# Group by 'Sentiment' column and count occurrences of each value
sentiment_counts = df.groupBy('Sentiment').count()

# Show the counts
sentiment_counts.show()


+----------+-----+
| Sentiment|count|
+----------+-----+
|Irrelevant|12990|
|  Positive|20831|
|   Neutral|18318|
|  Negative|22542|
+----------+-----+



In [0]:
# Replace "Irrelevant" with "Neutral" in the "Sentiment" column
df = df.withColumn("Sentiment", when(df["Sentiment"] == "Irrelevant", "Neutral").otherwise(df["Sentiment"]))


In [0]:
# Group by 'Sentiment' column and count occurrences of each value
sentiment_counts = df.groupBy('Sentiment').count()

# Show the counts
sentiment_counts.show()


+---------+-----+
|Sentiment|count|
+---------+-----+
| Positive|20831|
|  Neutral|31308|
| Negative|22542|
+---------+-----+



In [0]:
# Check for null values in a specific column
null_check_Sentiment = df.filter(df['Sentiment'].isNull())

# Show rows with null values in the specific column
null_check_Sentiment.show()


+---+--------+---------+----+
| ID|Category|Sentiment|Text|
+---+--------+---------+----+
+---+--------+---------+----+



In [0]:
# Check for null values in a specific column
null_check_Category = df.filter(df['Category'].isNull())

# Show rows with null values in the specific column
null_check_Category.show()


+---+--------+---------+----+
| ID|Category|Sentiment|Text|
+---+--------+---------+----+
+---+--------+---------+----+



In [0]:
# Text column has null value
# Check for null values in a specific column
null_check_Text = df.filter(df['Text'].isNull())

# Show rows with null values in the specific column
null_check_Text.show()


+----+--------------------+---------+----+
|  ID|            Category|Sentiment|Text|
+----+--------------------+---------+----+
|2411|         Borderlands|  Neutral|null|
|2496|         Borderlands|  Neutral|null|
|2503|         Borderlands|  Neutral|null|
|2532|         Borderlands| Positive|null|
|2595|         Borderlands| Positive|null|
|2595|         Borderlands| Positive|null|
|1602|CallOfDutyBlackop...|  Neutral|null|
|1602|CallOfDutyBlackop...|  Neutral|null|
|1602|CallOfDutyBlackop...|  Neutral|null|
|1613|CallOfDutyBlackop...|  Neutral|null|
|1622|CallOfDutyBlackop...| Negative|null|
|1632|CallOfDutyBlackop...|  Neutral|null|
|1632|CallOfDutyBlackop...|  Neutral|null|
|1708|CallOfDutyBlackop...| Positive|null|
|1717|CallOfDutyBlackop...|  Neutral|null|
|1719|CallOfDutyBlackop...| Positive|null|
|1719|CallOfDutyBlackop...| Positive|null|
|1719|CallOfDutyBlackop...| Positive|null|
|1731|CallOfDutyBlackop...| Negative|null|
|1737|CallOfDutyBlackop...| Negative|null|
+----+-----

In [0]:
# Count the number of null values in the 'Text' column
null_count_Text = df.filter(df['Text'].isNull()).count()

# Display the count of null values in the 'Text' column
print("Number of null values in the 'Text' column:", null_count_Text)


Number of null values in the 'Text' column: 686


In [0]:
# Drop rows with null values in the 'Text' column
df = df.dropna(subset=['Text'])


In [0]:
# Count the number of null values in the 'Text' column
null_count_Text = df.filter(df['Text'].isNull()).count()

# Display the count of null values in the 'Text' column
print("Number of null values in the 'Text' column:", null_count_Text)


Number of null values in the 'Text' column: 0


In [0]:
from pyspark.sql.functions import lower, regexp_replace, split
from pyspark.ml.feature import StopWordsRemover
from nltk.tokenize import word_tokenize
from nltk.stem import WordNetLemmatizer
from nltk.corpus import stopwords
import nltk
nltk.download('stopwords')
nltk.download('punkt')
nltk.download('wordnet')

# Function to perform text preprocessing
def preprocess_text_spark(df, input_col, output_col):
    # Convert text to lowercase
    df = df.withColumn(output_col, lower(df[input_col]))
    
    # Remove special characters and punctuation
    df = df.withColumn(output_col, regexp_replace(df[output_col], '[^a-zA-Z\s]', ''))
    
    # Tokenization
    df = df.withColumn(output_col, split(df[output_col], ' '))
    
    # Remove stop words
    stop_words = set(stopwords.words('english'))
    remover = StopWordsRemover(inputCol=output_col, outputCol=output_col + '_filtered', stopWords=list(stop_words))
    df = remover.transform(df).drop(output_col)
    
    # Lemmatization
    lemmatizer = WordNetLemmatizer()
    lemmatize_udf = udf(lambda tokens: [lemmatizer.lemmatize(token) for token in tokens], ArrayType(StringType()))
    df = df.withColumn(output_col, lemmatize_udf(df[output_col + '_filtered'])).drop(output_col + '_filtered')
    
    return df


[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package wordnet to /root/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!


In [0]:
# Preprocess text using the function
df_preprocessed = preprocess_text_spark(df, 'Text', 'Preprocessed_Text')

# Show the preprocessed DataFrame
df_preprocessed.show(truncate=False)

+----+-----------+---------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|ID  |Category   |Sentiment|Text                                                                                                                                                                                                                                                                                                 |Preprocessed_Text                                                                                                                  

In [0]:
df_preprocessed.show(1)

+----+-----------+---------+--------------------+--------------------+
|  ID|   Category|Sentiment|                Text|   Preprocessed_Text|
+----+-----------+---------+--------------------+--------------------+
|2401|Borderlands| Positive|I am coming to th...|[coming, border, ...|
+----+-----------+---------+--------------------+--------------------+
only showing top 1 row



In [0]:
df_preprocessed = df_preprocessed.withColumn('Preprocessed_Sentence', concat_ws(' ', df_preprocessed['Preprocessed_Text']))


In [0]:
df_preprocessed.show(1)

+----+-----------+---------+--------------------+--------------------+---------------------+
|  ID|   Category|Sentiment|                Text|   Preprocessed_Text|Preprocessed_Sentence|
+----+-----------+---------+--------------------+--------------------+---------------------+
|2401|Borderlands| Positive|I am coming to th...|[coming, border, ...|   coming border kill|
+----+-----------+---------+--------------------+--------------------+---------------------+
only showing top 1 row

