<a href="https://colab.research.google.com/github/MazharSaeedCCT/Integrated-CA1-Sem-2-MSc-/blob/main/Spark_Analysis_using_SQL.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install pyspark



In [2]:
from pyspark.sql import SparkSession

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Twitter Sentiment Analysis using mysql") \
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY") \
    .getOrCreate()

# Read CSV into Spark DataFrame
tweets_df = spark.read.csv("/content/drive/MyDrive/ProjectTweets.csv", header=True, inferSchema=True)


In [3]:
tweets_df.show()

+---+----------+----------------------------+--------+---------------+-------------------------------------------------------------------------------------------------------------------+
|  0|1467810369|Mon Apr 06 22:19:45 PDT 2009|NO_QUERY|_TheSpecialOne_|@switchfoot http://twitpic.com/2y1zl - Awww, that's a bummer.  You shoulda got David Carr of Third Day to do it. ;D|
+---+----------+----------------------------+--------+---------------+-------------------------------------------------------------------------------------------------------------------+
|  1|1467810672|        Mon Apr 06 22:19:...|NO_QUERY|  scotthamilton|                                                                                               is upset that he ...|
|  2|1467810917|        Mon Apr 06 22:19:...|NO_QUERY|       mattycus|                                                                                               @Kenichan I dived...|
|  3|1467811184|        Mon Apr 06 22:19:...|NO_QUERY|        Ell

In [4]:
tweets_df.printSchema()

root
 |-- 0: integer (nullable = true)
 |-- 1467810369: long (nullable = true)
 |-- Mon Apr 06 22:19:45 PDT 2009: string (nullable = true)
 |-- NO_QUERY: string (nullable = true)
 |-- _TheSpecialOne_: string (nullable = true)
 |-- @switchfoot http://twitpic.com/2y1zl - Awww, that's a bummer.  You shoulda got David Carr of Third Day to do it. ;D: string (nullable = true)



In [5]:
# Rename columns to meaningful names
from pyspark.sql.functions import col
# Rename columns to meaningful names
tweets_df = tweets_df.select(
    col("1467810369").alias("tweet_id"),
    col("Mon Apr 06 22:19:45 PDT 2009").alias("timestamp"),
    col("NO_QUERY").alias("query"),
    col("_TheSpecialOne_").alias("username"),
    col("`@switchfoot http://twitpic.com/2y1zl - Awww, that's a bummer.  You shoulda got David Carr of Third Day to do it. ;D`").alias("text")
)

# Display the schema and first few rows of the DataFrame
tweets_df.printSchema()
tweets_df.show(5, truncate=False)

root
 |-- tweet_id: long (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- query: string (nullable = true)
 |-- username: string (nullable = true)
 |-- text: string (nullable = true)

+----------+----------------------------+--------+-------------+---------------------------------------------------------------------------------------------------------------+
|tweet_id  |timestamp                   |query   |username     |text                                                                                                           |
+----------+----------------------------+--------+-------------+---------------------------------------------------------------------------------------------------------------+
|1467810672|Mon Apr 06 22:19:49 PDT 2009|NO_QUERY|scotthamilton|is upset that he can't update his Facebook by texting it... and might cry as a result  School today also. Blah!|
|1467810917|Mon Apr 06 22:19:53 PDT 2009|NO_QUERY|mattycus     |@Kenichan I dived many times f

In [6]:
tweets_df.show()

+----------+--------------------+--------+---------------+--------------------+
|  tweet_id|           timestamp|   query|       username|                text|
+----------+--------------------+--------+---------------+--------------------+
|1467810672|Mon Apr 06 22:19:...|NO_QUERY|  scotthamilton|is upset that he ...|
|1467810917|Mon Apr 06 22:19:...|NO_QUERY|       mattycus|@Kenichan I dived...|
|1467811184|Mon Apr 06 22:19:...|NO_QUERY|        ElleCTF|my whole body fee...|
|1467811193|Mon Apr 06 22:19:...|NO_QUERY|         Karoli|@nationwideclass ...|
|1467811372|Mon Apr 06 22:20:...|NO_QUERY|       joy_wolf|@Kwesidei not the...|
|1467811592|Mon Apr 06 22:20:...|NO_QUERY|        mybirch|         Need a hug |
|1467811594|Mon Apr 06 22:20:...|NO_QUERY|           coZZ|@LOLTrish hey  lo...|
|1467811795|Mon Apr 06 22:20:...|NO_QUERY|2Hood4Hollywood|@Tatiana_K nope t...|
|1467812025|Mon Apr 06 22:20:...|NO_QUERY|        mimismo|@twittera que me ...|
|1467812416|Mon Apr 06 22:20:...|NO_QUER

In [7]:
tweets_df.printSchema()

root
 |-- tweet_id: long (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- query: string (nullable = true)
 |-- username: string (nullable = true)
 |-- text: string (nullable = true)



In [9]:
# Register the DataFrame as a temporary table
tweets_df.createOrReplaceTempView("tweets_table")

# Define the target database name
target_database = "spark"

# Save the DataFrame as a table in the target database
spark.sql(f"CREATE DATABASE IF NOT EXISTS {target_database}")
spark.sql(f"USE {target_database}")
tweets_df.write.saveAsTable("data", mode="overwrite")



In [10]:
# Define the database and table names
database = "spark"
table = "data"

# Switch to the specified database
spark.sql(f"USE {database}")

# Show the contents of the table
table_contents = spark.sql(f"SELECT * FROM {table}")
table_contents.show()

+----------+--------------------+--------+---------------+--------------------+
|  tweet_id|           timestamp|   query|       username|                text|
+----------+--------------------+--------+---------------+--------------------+
|1467810672|Mon Apr 06 22:19:...|NO_QUERY|  scotthamilton|is upset that he ...|
|1467810917|Mon Apr 06 22:19:...|NO_QUERY|       mattycus|@Kenichan I dived...|
|1467811184|Mon Apr 06 22:19:...|NO_QUERY|        ElleCTF|my whole body fee...|
|1467811193|Mon Apr 06 22:19:...|NO_QUERY|         Karoli|@nationwideclass ...|
|1467811372|Mon Apr 06 22:20:...|NO_QUERY|       joy_wolf|@Kwesidei not the...|
|1467811592|Mon Apr 06 22:20:...|NO_QUERY|        mybirch|         Need a hug |
|1467811594|Mon Apr 06 22:20:...|NO_QUERY|           coZZ|@LOLTrish hey  lo...|
|1467811795|Mon Apr 06 22:20:...|NO_QUERY|2Hood4Hollywood|@Tatiana_K nope t...|
|1467812025|Mon Apr 06 22:20:...|NO_QUERY|        mimismo|@twittera que me ...|
|1467812416|Mon Apr 06 22:20:...|NO_QUER

In [11]:
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType
from textblob import TextBlob
import re

In [12]:
# Define a user-defined function (UDF) for text cleaning
def clean_text(text):
    # Convert text to lowercase
    text = text.lower()
    # Remove special characters, punctuation, and links using regex
    text = re.sub(r'[^a-zA-Z\s]', '', text)
    # Remove extra whitespaces
    text = ' '.join(text.split())
    return text

In [13]:
# Register the UDF
clean_text_udf = udf(clean_text, StringType())

In [14]:
# Apply the UDF to the 'text' column to clean the text data
tweets_df = table_contents.withColumn("clean_text", clean_text_udf(col("text")))


In [15]:
# Filter out tweets with empty or null cleaned text
tweets_df = tweets_df.filter(col("clean_text").isNotNull() & (col("clean_text") != ""))


In [16]:
# Define a user-defined function (UDF) to perform sentiment analysis using TextBlob
def analyze_sentiment(text):
    # Perform sentiment analysis using TextBlob
    analysis = TextBlob(text)
    # Return the sentiment polarity as a string
    return "positive" if analysis.sentiment.polarity > 0 else "negative" if analysis.sentiment.polarity < 0 else "neutral"


In [17]:
# Register the UDF
sentiment_udf = udf(analyze_sentiment, StringType())

In [18]:
# Apply the UDF to the 'clean_text' column to add a new column 'sentiment'
tweets_df = tweets_df.withColumn("sentiment", sentiment_udf(col("clean_text")))


In [19]:
# Display the schema and first few rows of the DataFrame with sentiment analysis results
tweets_df.printSchema()
tweets_df.show(5)

root
 |-- tweet_id: long (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- query: string (nullable = true)
 |-- username: string (nullable = true)
 |-- text: string (nullable = true)
 |-- clean_text: string (nullable = true)
 |-- sentiment: string (nullable = true)

+----------+--------------------+--------+-------------+--------------------+--------------------+---------+
|  tweet_id|           timestamp|   query|     username|                text|          clean_text|sentiment|
+----------+--------------------+--------+-------------+--------------------+--------------------+---------+
|1467810672|Mon Apr 06 22:19:...|NO_QUERY|scotthamilton|is upset that he ...|is upset that he ...|  neutral|
|1467810917|Mon Apr 06 22:19:...|NO_QUERY|     mattycus|@Kenichan I dived...|kenichan i dived ...| positive|
|1467811184|Mon Apr 06 22:19:...|NO_QUERY|      ElleCTF|my whole body fee...|my whole body fee...| positive|
|1467811193|Mon Apr 06 22:19:...|NO_QUERY|       Karoli|@nationwi

In [20]:
# Display only the "timestamp" column
tweets_df.select("timestamp").show(5, truncate=False)


+----------------------------+
|timestamp                   |
+----------------------------+
|Mon Apr 06 22:19:49 PDT 2009|
|Mon Apr 06 22:19:53 PDT 2009|
|Mon Apr 06 22:19:57 PDT 2009|
|Mon Apr 06 22:19:57 PDT 2009|
|Mon Apr 06 22:20:00 PDT 2009|
+----------------------------+
only showing top 5 rows



In [21]:
# Calculate value counts for the 'name' column
value_counts = tweets_df.groupBy("sentiment").count().orderBy("sentiment", ascending=False)

# Show value counts
value_counts.show()

+---------+------+
|sentiment| count|
+---------+------+
| positive|687006|
|  neutral|569160|
| negative|343833|
+---------+------+



In [22]:
tweets_df.printSchema()

root
 |-- tweet_id: long (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- query: string (nullable = true)
 |-- username: string (nullable = true)
 |-- text: string (nullable = true)
 |-- clean_text: string (nullable = true)
 |-- sentiment: string (nullable = true)



In [23]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

# Step 1: Preprocess the data
tweets_df = tweets_df.withColumn("timestamp", F.to_timestamp("timestamp", "EEE MMM dd HH:mm:ss z yyyy"))

# Handle missing values if any
tweets_df = tweets_df.dropna()

In [24]:
# Display only the "timestamp" column
tweets_df.select("timestamp").show(5, truncate=False)


+-------------------+
|timestamp          |
+-------------------+
|2009-04-07 05:19:49|
|2009-04-07 05:19:53|
|2009-04-07 05:19:57|
|2009-04-07 05:19:57|
|2009-04-07 05:20:00|
+-------------------+
only showing top 5 rows



In [25]:
# Ensure the dataset is sorted by timestamp
tweets_df = tweets_df.orderBy("timestamp")

In [26]:
# Display only the "timestamp" column
tweets_df.select("timestamp").show(5, truncate=False)


+-------------------+
|timestamp          |
+-------------------+
|2009-04-07 05:19:49|
|2009-04-07 05:19:53|
|2009-04-07 05:19:57|
|2009-04-07 05:19:57|
|2009-04-07 05:20:00|
+-------------------+
only showing top 5 rows



In [27]:
!pip install pmdarima


Collecting pmdarima
  Downloading pmdarima-2.0.4-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl (2.1 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.1/2.1 MB[0m [31m10.3 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: pmdarima
Successfully installed pmdarima-2.0.4


In [28]:
# Convert timestamp to numerical value for linear regression
tweets_df = tweets_df.withColumn("timestamp_numeric", F.unix_timestamp("timestamp"))


In [41]:
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.regression import LinearRegression
import pyspark.sql.functions as F


In [42]:
# Convert sentiment labels to numeric indices
indexer = StringIndexer(inputCol="sentiment", outputCol="sentiment_index")
indexed_data = indexer.fit(tweets_df).transform(tweets_df)

In [55]:
# Assemble features
feature_columns = ["timestamp_numeric"]
vector_assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
# indexed_data = vector_assembler.transform(indexed_data)


In [44]:
tweets_df.show()

+----------+-------------------+--------+---------------+--------------------+--------------------+---------+-----------------+---------------+
|  tweet_id|          timestamp|   query|       username|                text|          clean_text|sentiment|timestamp_numeric|       features|
+----------+-------------------+--------+---------------+--------------------+--------------------+---------+-----------------+---------------+
|1467810672|2009-04-07 05:19:49|NO_QUERY|  scotthamilton|is upset that he ...|is upset that he ...|  neutral|       1239081589|[1.239081589E9]|
|1467810917|2009-04-07 05:19:53|NO_QUERY|       mattycus|@Kenichan I dived...|kenichan i dived ...| positive|       1239081593|[1.239081593E9]|
|1467811184|2009-04-07 05:19:57|NO_QUERY|        ElleCTF|my whole body fee...|my whole body fee...| positive|       1239081597|[1.239081597E9]|
|1467811193|2009-04-07 05:19:57|NO_QUERY|         Karoli|@nationwideclass ...|nationwideclass n...| negative|       1239081597|[1.239081

In [46]:
# Train-Test Split
(train_data, test_data) = indexed_data.randomSplit([0.8, 0.2], seed=42)


In [47]:
# Define Linear Regression model
lr = LinearRegression(featuresCol="features", labelCol="sentiment_index")


In [48]:
# Train the model
lr_model = lr.fit(train_data.select("features", "sentiment_index"))


In [49]:
# Make predictions
predictions_lr = lr_model.transform(test_data)

# Show predictions
predictions_lr.select("sentiment_index", "prediction").show()


+---------------+------------------+
|sentiment_index|        prediction|
+---------------+------------------+
|            0.0|0.6050889396270733|
|            0.0|0.6050891706995642|
|            1.0|0.6050894017720481|
|            0.0|0.6050898254049457|
|            0.0|0.6050904801103272|
|            1.0|0.6050906341586497|
|            0.0|0.6050916354727605|
|            2.0|0.6050924057143803|
|            1.0|0.6050939076855499|
|            0.0|0.6050939461976341|
|            2.0|0.6050941772701179|
|            2.0|0.6050945623909314|
|            2.0|0.6050946779271769|
|            0.0|0.6050954096567196|
|            0.0|0.6050954866808809|
|            0.0|0.6050955637050421|
|            1.0| 0.605095794777526|
|            0.0|0.6050964494829074|
|            2.0|0.6050966805553983|
|            1.0|0.6050967575795596|
+---------------+------------------+
only showing top 20 rows



In [56]:
tweets_df.show(5)

+----------+-------------------+--------+-------------+--------------------+--------------------+---------+-----------------+---------------+
|  tweet_id|          timestamp|   query|     username|                text|          clean_text|sentiment|timestamp_numeric|       features|
+----------+-------------------+--------+-------------+--------------------+--------------------+---------+-----------------+---------------+
|1467810672|2009-04-07 05:19:49|NO_QUERY|scotthamilton|is upset that he ...|is upset that he ...|  neutral|       1239081589|[1.239081589E9]|
|1467810917|2009-04-07 05:19:53|NO_QUERY|     mattycus|@Kenichan I dived...|kenichan i dived ...| positive|       1239081593|[1.239081593E9]|
|1467811184|2009-04-07 05:19:57|NO_QUERY|      ElleCTF|my whole body fee...|my whole body fee...| positive|       1239081597|[1.239081597E9]|
|1467811193|2009-04-07 05:19:57|NO_QUERY|       Karoli|@nationwideclass ...|nationwideclass n...| negative|       1239081597|[1.239081597E9]|
|14678

In [1]:
# import pandas as pd
# import pmdarima as pm

# # Convert Spark DataFrame to Pandas DataFrame
# tweets_pd_df = tweets_df.select("timestamp", "sentiment").toPandas()

# # Set timestamp as index
# tweets_pd_df["timestamp"] = pd.to_datetime(tweets_pd_df["timestamp"])
# tweets_pd_df.set_index("timestamp", inplace=True)



In [2]:
# # Map sentiment labels to numeric values
# sentiment_mapping = {"positive": 1, "neutral": 0, "negative": -1}
# tweets_pd_df["sentiment_numeric"] = tweets_pd_df["sentiment"].map(sentiment_mapping)

# # Fit ARIMA model
# arima_model = pm.auto_arima(tweets_pd_df["sentiment_numeric"], seasonal=False)

# # Forecast
# forecast, conf_int = arima_model.predict(n_periods=10, return_conf_int=True)

# # Print forecasts
# print("ARIMA Forecast:")
# print(forecast)
