In [1]:
import pandas as pd

In [10]:
df = pd.read_csv("Tweets.csv")

df = df.iloc[:, 1:]

df.columns = ["id", "date", "flag", "user", "text"]

df.to_csv("Ready_Tweets.csv", index=False)

In [11]:
df2 = pd.read_csv("Ready_Tweets.csv")

In [12]:
df2

Unnamed: 0,id,date,flag,user,text
0,1467810672,Mon Apr 06 22:19:49 PDT 2009,NO_QUERY,scotthamilton,is upset that he can't update his Facebook by ...
1,1467810917,Mon Apr 06 22:19:53 PDT 2009,NO_QUERY,mattycus,@Kenichan I dived many times for the ball. Man...
2,1467811184,Mon Apr 06 22:19:57 PDT 2009,NO_QUERY,ElleCTF,my whole body feels itchy and like its on fire
3,1467811193,Mon Apr 06 22:19:57 PDT 2009,NO_QUERY,Karoli,"@nationwideclass no, it's not behaving at all...."
4,1467811372,Mon Apr 06 22:20:00 PDT 2009,NO_QUERY,joy_wolf,@Kwesidei not the whole crew
...,...,...,...,...,...
1599994,2193601966,Tue Jun 16 08:40:49 PDT 2009,NO_QUERY,AmandaMarie1028,Just woke up. Having no school is the best fee...
1599995,2193601969,Tue Jun 16 08:40:49 PDT 2009,NO_QUERY,TheWDBoards,TheWDB.com - Very cool to hear old Walt interv...
1599996,2193601991,Tue Jun 16 08:40:49 PDT 2009,NO_QUERY,bpbabe,Are you ready for your MoJo Makeover? Ask me f...
1599997,2193602064,Tue Jun 16 08:40:49 PDT 2009,NO_QUERY,tinydiamondz,Happy 38th Birthday to my boo of alll time!!! ...


In [23]:
from pyspark.sql import SparkSession

# Create a Spark session
spark = SparkSession.builder.appName("TwitterSentimentAnalysis").getOrCreate()


In [64]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType, FloatType, DecimalType, DoubleType

# Define the schema for your data
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("date", StringType(), True),
    StructField("flag", StringType(), True),
    StructField("user", StringType(), True),
    StructField("text", StringType(), True)
])

# Load the CSV data into a DataFrame
tweets_df = spark.read.csv("Ready_Tweets.csv", header=True, schema=schema)


In [52]:
# Create a temporary table for your DataFrame
tweets_df.createOrReplaceTempView("tweets")

In [53]:
# Example: Select top 10 tweets
top_10_tweets = spark.sql("SELECT * FROM tweets LIMIT 10")
top_10_tweets.show()

+----------+--------------------+--------+---------------+--------------------+
|        id|                date|    flag|           user|                text|
+----------+--------------------+--------+---------------+--------------------+
|1467810672|Mon Apr 06 22:19:...|NO_QUERY|  scotthamilton|is upset that he ...|
|1467810917|Mon Apr 06 22:19:...|NO_QUERY|       mattycus|@Kenichan I dived...|
|1467811184|Mon Apr 06 22:19:...|NO_QUERY|        ElleCTF|my whole body fee...|
|1467811193|Mon Apr 06 22:19:...|NO_QUERY|         Karoli|@nationwideclass ...|
|1467811372|Mon Apr 06 22:20:...|NO_QUERY|       joy_wolf|@Kwesidei not the...|
|1467811592|Mon Apr 06 22:20:...|NO_QUERY|        mybirch|         Need a hug |
|1467811594|Mon Apr 06 22:20:...|NO_QUERY|           coZZ|@LOLTrish hey  lo...|
|1467811795|Mon Apr 06 22:20:...|NO_QUERY|2Hood4Hollywood|@Tatiana_K nope t...|
|1467812025|Mon Apr 06 22:20:...|NO_QUERY|        mimismo|@twittera que me ...|
|1467812416|Mon Apr 06 22:20:...|NO_QUER

In [54]:
total_rows = tweets_df.count()
print("Total rows in the dataset:", total_rows)

[Stage 16:>                                                         (0 + 2) / 2]

Total rows in the dataset: 1599999


                                                                                

In [55]:
tweets_df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- date: string (nullable = true)
 |-- flag: string (nullable = true)
 |-- user: string (nullable = true)
 |-- text: string (nullable = true)



In [56]:
from pyspark.sql.functions import lower, regexp_replace
from nltk.tokenize import word_tokenize

# Example text preprocessing
tweets_df = tweets_df.withColumn("text", lower(tweets_df["text"]))
tweets_df = tweets_df.withColumn("text", regexp_replace(tweets_df["text"], "[^a-zA-Z\\s]", ""))

In [65]:
from pyspark.sql.functions import udf
from textblob import TextBlob

# Define a UDF to perform sentiment analysis
def get_sentiment(text):
    analysis = TextBlob(text)
    return analysis.sentiment.polarity

sentiment_udf = udf(get_sentiment, DoubleType())
tweets_df = tweets_df.withColumn("sentiment", sentiment_udf(tweets_df["text"]))

# Show the results
tweets_df.select("text", "sentiment").show()

[Stage 22:>                                                         (0 + 1) / 1]

+--------------------+--------------------+
|                text|           sentiment|
+--------------------+--------------------+
|is upset that he ...|                 0.0|
|@Kenichan I dived...|                 0.5|
|my whole body fee...|                 0.2|
|@nationwideclass ...|              -0.625|
|@Kwesidei not the...|                 0.2|
|         Need a hug |                 0.0|
|@LOLTrish hey  lo...| 0.27083333333333337|
|@Tatiana_K nope t...|                 0.0|
|@twittera que me ...|                 0.0|
|spring break in p...|-0.21428571428571427|
|I just re-pierced...|                 0.0|
|@caregiving I cou...|                 0.0|
|@octolinz16 It it...|                 0.0|
|@smarrison i woul...|               0.075|
|@iamjazzyfizzle I...|                 0.0|
|Hollis' death sce...|                 0.0|
|about to file taxes |                 0.0|
|@LettyA ahh ive a...|             0.78125|
|@FakerPattyPattz ...|                 0.0|
|@alydesigns i was...|          

                                                                                

In [41]:
import nltk
nltk.download('vader_lexicon')

[nltk_data] Downloading package vader_lexicon to
[nltk_data]     /home/muhammad/nltk_data...


True

In [42]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from nltk.sentiment.vader import SentimentIntensityAnalyzer

# Initialize the NLTK SentimentIntensityAnalyzer
analyzer = SentimentIntensityAnalyzer()

# Define a User Defined Function (UDF) to perform sentiment analysis
def analyze_sentiment(text):
    sentiment = analyzer.polarity_scores(text)
    if sentiment['compound'] >= 0.05:
        return "Positive"
    elif sentiment['compound'] <= -0.05:
        return "Negative"
    else:
        return "Neutral"

sentiment_udf = udf(analyze_sentiment, StringType())

# Add a new column 'sentiment' to the DataFrame
tweets_df = tweets_df.withColumn("sentiment", sentiment_udf(tweets_df["text"]))

# Show the results
tweets_df.select("text", "sentiment").show()




+--------------------+---------+
|                text|sentiment|
+--------------------+---------+
|is upset that he ...| Negative|
|kenichan i dived ...| Positive|
|my whole body fee...| Negative|
|nationwideclass n...| Negative|
|kwesidei not the ...|  Neutral|
|         need a hug | Positive|
|loltrish hey  lon...| Positive|
|tatianak nope the...|  Neutral|
|twittera que me m...|  Neutral|
|spring break in p...|  Neutral|
|i just repierced ...|  Neutral|
|caregiving i coul...| Negative|
|octolinz it it co...| Negative|
|smarrison i would...| Positive|
|iamjazzyfizzle i ...| Positive|
|hollis death scen...| Negative|
|about to file taxes |  Neutral|
|lettya ahh ive al...| Positive|
|fakerpattypattz o...| Positive|
|alydesigns i was ...|  Neutral|
+--------------------+---------+
only showing top 20 rows



                                                                                

In [6]:
spark.stop()