In [3]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m2.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=1a5721300fadd1126f0956ec5da2e1d660ff61ad83bd74ee93130fc9d4e2fc94
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [None]:
from google.colab import drive
drive.mount('/content/drive')

hdfs_path = "hdfs://user/hadoop/data.csv"

In [4]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("TweetClassification").getOrCreate()



In [5]:
df0 = spark.read.csv("drive/MyDrive/TwitterData/bigboy.csv", header=True, inferSchema=True)
df1 = spark.read.csv("drive/MyDrive/TwitterData/datum1.csv", header=True, inferSchema=True)
df2 = spark.read.csv("drive/MyDrive/TwitterData/datum2.csv", header=True, inferSchema=True)
df3 = spark.read.csv("drive/MyDrive/TwitterData/datum3.csv", header=True, inferSchema=True)

In [6]:
from functools import reduce
from pyspark.sql import DataFrame

df0 = df0.drop("number")
dfs = [df0,df1, df2, df3]

df = reduce(DataFrame.union, dfs)



In [77]:
df.printSchema()
print("Columns: ", df.columns)
print("Rows: ", df.count())

root
 |-- id: string (nullable = true)
 |-- full_text: string (nullable = true)
 |-- hashtags: string (nullable = true)
 |-- lang: string (nullable = true)
 |-- user_name: string (nullable = true)
 |-- tokens: string (nullable = true)
 |-- sentiment: integer (nullable = true)

Columns:  ['id', 'full_text', 'hashtags', 'lang', 'user_name', 'tokens', 'sentiment']
Rows:  149


In [76]:

df = df.dropDuplicates()

df = df.na.drop(subset=["full_text"])


In [72]:
df = df.select("id", "full_text", "hashtags", "lang", "user_name")

In [73]:
df.show()

+-------------------+--------------------+--------------------+----+---------------+
|                 id|           full_text|            hashtags|lang|      user_name|
+-------------------+--------------------+--------------------+----+---------------+
|1469001500462620679|#Kholi #DLM #Liqo...|Kholi,DLM,Liqouro...|  en|        TiitweN|
|1469002575668301828|The crew @spiderm...|SpiderManNoWayHom...|  en|      erathrim_|
|1469001504019390473|#RT @houstonisdwa...|             RT,HISD|  en|  HISD_Outreach|
|1469001491436478464|@classicallygia H...|Survivor41,Surviv...|  en|        seb1959|
|1469001500919840770|Signing day nears...|Rutgers,RFootball...|  en|  TodderickHunt|
|1469001489444216838|#HIDAYAT_THE_RAPI...|  HIDAYAT_THE_RAPIST|  en|x5XvlQYxBYrYykF|
|1469001497845321730|@Greene5952 Love ...|              Crypto|  en|   AP_Crypto_TX|
|1469001496255614976|This is why #MMA ...|          MMA,UFC269|  en|  ChiranjitOjha|
|1469001498877116425|What’s up with th...|     AndJustLikeThat|  

In [81]:
from nltk.sentiment.vader import SentimentIntensityAnalyzer
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
from pyspark.ml.feature import Tokenizer, StopWordsRemover

nltk.download('stopwords')
nltk.download('punkt')
nltk.download('vader_lexicon')

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package vader_lexicon to /root/nltk_data...
[nltk_data]   Package vader_lexicon is already up-to-date!


True

In [94]:
from pyspark.sql.functions import *
from pyspark.sql.types import StringType, BooleanType
import re

def remove_emojis(text):
    return text.encode('ascii', 'ignore').decode('ascii')

remove_emojis_udf = udf(remove_emojis, StringType())
df = df.withColumn("full_text", remove_emojis_udf("full_text"))

def is_english(text):
    try:
        text.encode(encoding='utf-8').decode('ascii')
        return True
    except UnicodeDecodeError:
        return False

is_english_udf = udf(is_english, BooleanType())
df = df.filter(is_english_udf(df['full_text']))

tokenizer = Tokenizer(inputCol="full_text", outputCol="tokens")
df = tokenizer.transform(df)

remover = StopWordsRemover(inputCol="tokens", outputCol="filtered_tokens")
df = remover.transform(df)

df.drop("tokens")


def remove_special_characters(text):
    cleaned_text = re.sub(r'[^a-zA-Z0-9\s]', '', text)
    return cleaned_text

remove_special_characters_udf = udf(remove_special_characters, StringType())

def remove_urls(text):
    cleaned_text = re.sub(r'http\S+', '', text)
    return cleaned_text
remove_urls_udf = udf(remove_urls, StringType())

df = df.withColumn("full_text", remove_special_characters_udf(df["full_text"]))
df = df.withColumn("full_text", remove_urls_udf(df["full_text"]))

df.show()


+--------------------+--------------------+--------------------+----+--------------------+---------+--------------------+--------------------+--------------------+
|                  id|           full_text|            hashtags|lang|           user_name|sentiment|        tokens_no_sw|              tokens|     filtered_tokens|
+--------------------+--------------------+--------------------+----+--------------------+---------+--------------------+--------------------+--------------------+
| 1469001496255614976|This is why MMA i...|          MMA,UFC269|  en|       ChiranjitOjha|        2|[mma, unique, eve...|[this, is, why, m...|[mma, unique, eve...|
|For more informat...|                   0|                  en|   3|Thu Dec 09 17:50:...|        1|                 [0]|                 [0]|                 [0]|
| 1469001491847524355|Early interventio...|LearntheSigns,Act...|  en|            Excelby8|        2|[early, intervent...|[early, intervent...|[early, intervent...|
|#BackTheBlue 👮‍

In [95]:
sid = SentimentIntensityAnalyzer()

def analyze_sentiment(text):
    sentiment_score = sid.polarity_scores(text)
    if sentiment_score['compound'] > 0.05:
        return 2
    elif sentiment_score['compound'] < -0.05:
        return 0
    else:
        return 1

sentiment_udf = udf(analyze_sentiment, IntegerType())

df = df.withColumn('sentiment', sentiment_udf(df['full_text']))

df.show()


+--------------------+--------------------+--------------------+----+--------------------+---------+--------------------+--------------------+--------------------+
|                  id|           full_text|            hashtags|lang|           user_name|sentiment|        tokens_no_sw|              tokens|     filtered_tokens|
+--------------------+--------------------+--------------------+----+--------------------+---------+--------------------+--------------------+--------------------+
| 1469001496255614976|This is why MMA i...|          MMA,UFC269|  en|       ChiranjitOjha|        2|[mma, unique, eve...|[this, is, why, m...|[mma, unique, eve...|
|For more informat...|                   0|                  en|   3|Thu Dec 09 17:50:...|        1|                 [0]|                 [0]|                 [0]|
| 1469001491847524355|Early interventio...|LearntheSigns,Act...|  en|            Excelby8|        2|[early, intervent...|[early, intervent...|[early, intervent...|
|#BackTheBlue 👮‍