In [None]:
!pip install pyspark

import os

num_cores = os.cpu_count()
print(num_cores)

2


In [None]:
import nltk
import pyspark.sql.functions as F
import zipfile
import re
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from nltk.tokenize import word_tokenize
from nltk.stem import WordNetLemmatizer
from pyspark.sql.types import FloatType
from nltk.sentiment.vader import SentimentIntensityAnalyzer
from nltk.corpus import stopwords
from google.colab import drive


# Create the Spark session used by every DataFrame in this notebook, or reuse
# one that is already running.
spark = (
    SparkSession.builder
    .appName("DataPreprocessing")
    .getOrCreate()
)

# Expose Google Drive under /content/drive; the dataset archive is read from
# there and the final CSV is written back to it.
drive.mount('/content/drive')

# NLTK resources needed by the preprocessing cells, fetched in a fixed order.
for nltk_resource in ('stopwords', 'punkt', 'wordnet'):
    nltk.download(nltk_resource)

lemmatizer = WordNetLemmatizer()

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package wordnet to /root/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!


In [None]:
# Source archive on Drive and the runtime-local directory it is unpacked into.
zip_file_path = '/content/drive/MyDrive/Dataset.zip'
extracted_folder_path = '/content/extractedFiles'

# The context manager closes the archive even if extraction fails part-way.
with zipfile.ZipFile(zip_file_path, mode='r') as archive:
    archive.extractall(path=extracted_folder_path)

print("File has been successfully unzipped to:", extracted_folder_path)

File has been successfully unzipped to: /content/extractedFiles


In [None]:
# Directory holding the extracted CSV files; Spark reads every file in it.
dataset_path = '/content/extractedFiles'

# `multiLine` lets a quoted field span several physical lines. Without it each
# line is parsed as its own record, which would explain the preview rows whose
# `content` is NULL, a bare number, or starts with a stray double quote.
# NOTE(review): if the files escape embedded quotes by doubling them (`""`),
# `.option("escape", '"')` is also needed - confirm against the raw files.
# NOTE(review): multi-line parsing cannot split a single file across tasks, so
# loading may be slower; row counts will also differ from earlier runs.
df = (
    spark.read
    .option("header", "true")
    .option("multiLine", "true")
    .csv(dataset_path)
)
df.count()

2885735

In [None]:
# Quick look at the raw text column (first 20 rows, truncated).
df.select("content").show()

+--------------------+
|             content|
+--------------------+
|JOE BIDEN SAYS HO...|
|                NULL|
|I'm a truth seeke...|
|@ProfPaulPoast He...|
|@SemproniusT @Ric...|
|@jakehase @Grouta...|
|@2_4_5T_2_4_D @ca...|
|@juliet_turner6 N...|
|@BurnsideNotTosh ...|
|America forced Pu...|
|                NULL|
|                   2|
|@RussianEmbassy @...|
|Abi Russia don in...|
|Russia, China, Ir...|
|@SamEBrewster @sa...|
|@NeilLowenthal1 P...|
|@polukhina_sonya ...|
|"@Resist47231614 ...|
|Allows Russia to ...|
+--------------------+
only showing top 20 rows



In [None]:
# Remove exact duplicate rows, discard rows with no text, then lower-case the
# text so later matching (stop words, lexicon lookups) is case-insensitive.
df = (
    df.dropDuplicates()
      .dropna(subset=['content'])
      .withColumn("content", F.lower(F.col("content")))
)

def clean_text(text):
    """Strip URLs, @mentions and all non-letter characters from a piece of text.

    Args:
        text: The raw text, or ``None`` for a missing value.

    Returns:
        The text with ``http...``/``www...`` links and ``@handles`` removed,
        every character other than ASCII letters and whitespace dropped, and
        runs of whitespace collapsed to single spaces (leading/trailing
        whitespace removed). ``None`` is returned unchanged so a null row
        stays null instead of raising inside a Spark UDF.
    """
    if text is None:
        return None
    # Links and handles go first, while their punctuation is still present to
    # delimit them.
    text = re.sub(r'http\S+|www\S+|@\w+', '', text)
    # One pass drops digits, punctuation, emoji and any other non-letter; a
    # separate digit-only pass would be redundant.
    text = re.sub(r'[^a-zA-Z\s]', '', text)
    return ' '.join(text.split())

# Wrap the Python cleaner as a Spark UDF and overwrite `content` with its output.
clean_text_udf = udf(clean_text, StringType())
df = df.withColumn("content", clean_text_udf(df["content"]))

# Split the cleaned text into word tokens.
tokenizer = Tokenizer(inputCol="content", outputCol="tokens_unlemmatized")
df = tokenizer.transform(df)

# Stop-word list: NLTK's English list re-tokenised with word_tokenize (so
# multi-part entries contribute each of their pieces), de-duplicated, plus two
# dataset-specific additions.
stop_words = list(set(word_tokenize(' '.join(stopwords.words('english')))))
stop_words.extend(['russian', 'u'])
remover = StopWordsRemover(inputCol="tokens_unlemmatized", outputCol="tokens", stopWords=stop_words)
df = remover.transform(df)

lemmatizer = WordNetLemmatizer()


def _lemmatize_to_string(tokens):
    """Lemmatize each token and render the result as ``"[a, b, c]"``.

    The UDF is declared with ``StringType``, so it must return a string.
    Returning the raw list left the stored value to Spark's implicit
    list-to-string coercion. Building the string here makes the format
    explicit; it is intended to match what that coercion produced.
    NOTE(review): confirm the format against a previously exported file.

    The column stays a string on purpose: the CSV export at the end of the
    notebook cannot store array columns. If a real array is wanted, declare
    ``ArrayType(StringType())`` and export to Parquet/JSON instead.

    Args:
        tokens: List of token strings, or ``None``.

    Returns:
        The bracketed, comma-separated lemmas, or ``None`` for a null input.
    """
    if tokens is None:
        return None
    return "[" + ", ".join(lemmatizer.lemmatize(word) for word in tokens) + "]"


lemmatize_udf = udf(_lemmatize_to_string, StringType())
df = df.withColumn("tokens", lemmatize_udf(df["tokens"]))

df.select("content", "tokens").show(truncate=False)

+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|content                                                                                                                                                                                                                                                               |tokens                                                                                                                                                                                                              |
+-------------------------------------------

In [None]:
# Fetch the lexicon VADER scores words against, then build one analyzer that
# the sentiment function below reuses for every row.
nltk.download("vader_lexicon")

analyzer = SentimentIntensityAnalyzer()

def analyze_sentiment(text):
    """Return VADER's compound polarity score for ``text``.

    Uses the module-level ``analyzer`` (a ``SentimentIntensityAnalyzer``).

    Args:
        text: The text to score, or ``None`` for a missing value.

    Returns:
        The ``"compound"`` entry of ``analyzer.polarity_scores(text)`` as a
        Python ``float``, or ``None`` when ``text`` is ``None`` so that a null
        row yields a null score instead of an exception inside the Spark UDF.
    """
    if text is None:
        return None
    compound = analyzer.polarity_scores(text)["compound"]
    # Cast explicitly so the value is a plain Python float for the FloatType UDF.
    return float(compound)

# Expose the scorer to Spark as a float-returning UDF.
analyze_sentiment_udf = udf(analyze_sentiment, returnType=FloatType())

# Score the cleaned text and keep the result in a new `sentiment` column.
# NOTE(review): `content` has already been lower-cased and stripped of
# punctuation at this point - confirm that scoring the cleaned text rather
# than the raw text is intended.
df = df.withColumn("sentiment", analyze_sentiment_udf(F.col("content")))

df.select("content", "tokens", "sentiment").show(truncate=False)

[nltk_data] Downloading package vader_lexicon to /root/nltk_data...


+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------+
|content                                                                                                                                                                                                                                                               |tokens                                                                                                                                                                                                              |sentiment|
+-----------------------

In [None]:
from pyspark.sql.functions import when

# Sentiment bands, listed in the order they are tested; conditions[i] maps to
# values[i].
conditions = [
    (df['sentiment'] <= -0.5),
    (df['sentiment'] > -0.5) & (df['sentiment'] < 0.5),
    (df['sentiment'] >= 0.5)
]

values = ['Negative', 'Neutral', 'Positive']

# Fold the (condition, label) pairs into one chained CASE WHEN expression.
category = None
for cond, label in zip(conditions, values):
    category = when(cond, label) if category is None else category.when(cond, label)

# Rows matching no band (e.g. a null sentiment) get a null Category.
df = df.withColumn('Category', category.otherwise(None))

df.select("content", "tokens", "sentiment", "Category").show(truncate=False)


+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------+--------+
|content                                                                                                                                                                                                                                                               |tokens                                                                                                                                                                                                              |sentiment|Category|
+-----

In [None]:
# Keep only the columns needed downstream.
selected_columns = df.select("content", "tokens", "sentiment", "Category")

# Destination directory on Drive; Spark writes one part-file per partition.
output_path = "/content/drive/MyDrive/train_final"

# Replace any previous export and include a header row in each part-file.
(
    selected_columns.write
    .mode("overwrite")
    .option("header", True)
    .csv(output_path)
)
