In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_timestamp

# Start (or reuse) the SparkSession. The LEGACY time parser policy is needed because
# Spark 3's default datetime parser does not accept the day-of-week letter 'E' in
# parse patterns, and the Date pattern used below starts with "EEE".
# NOTE(review): the Py4JError recorded for this cell ("Constructor
# org.apache.spark.sql.SparkSession(...) does not exist") usually means the installed
# pyspark package and the Spark/JVM it talks to are different versions — verify both.
spark = (
    SparkSession.builder
    .appName("PySparkSQL")
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY")
    .getOrCreate()
)

# Raw tweet export: a CSV file without a header row
file_path = "/home/hduser/Desktop/ProjectTweets.csv"

# With no header Spark names the columns _c0, _c1, ...; inferSchema makes an extra
# pass over the data to choose column types
tweets_df = (
    spark.read.format("csv")
    .option("header", "false")
    .option("inferSchema", "true")
    .load(file_path)
)

# Replace the positional column names with descriptive ones
tweets_df = (
    tweets_df
    .withColumnRenamed("_c0", "Sequencial")
    .withColumnRenamed("_c1", "Serial_Number")
    .withColumnRenamed("_c2", "Date")
    .withColumnRenamed("_c3", "NO_QUERY")
    .withColumnRenamed("_c4", "User_Name")
    .withColumnRenamed("_c5", "Tweet")
)

# Parse the Date strings (day-of-week, month name, day, time, zone, year) into
# timestamps; values that do not match the pattern become null
old_timestamp_format = "EEE MMM dd HH:mm:ss z yyyy"
tweets_df = tweets_df.withColumn("Date", to_timestamp(tweets_df["Date"], old_timestamp_format))

# Keep only the columns used further down
tweets_df = tweets_df.drop("Serial_Number", "NO_QUERY", "User_Name")

# Show the DataFrame to verify the columns are dropped
tweets_df.show()

2023-11-05 12:22:24,446 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2023-11-05 12:22:25,554 WARN util.Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


Py4JError: An error occurred while calling None.org.apache.spark.sql.SparkSession. Trace:
py4j.Py4JException: Constructor org.apache.spark.sql.SparkSession([class org.apache.spark.SparkContext, class java.util.HashMap]) does not exist
	at py4j.reflection.ReflectionEngine.getConstructor(ReflectionEngine.java:179)
	at py4j.reflection.ReflectionEngine.getConstructor(ReflectionEngine.java:196)
	at py4j.Gateway.invoke(Gateway.java:237)
	at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
	at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:750)



In [None]:
# Number of rows in tweets_df (presumably one row per tweet); count() runs a
# full Spark job over the data
tweet_count = tweets_df.count()
print(f"Total number of tweets: {tweet_count}")

In [None]:
from pyspark.sql.functions import col, lower

# Count tweets that mention the keyword. The text is lower-cased before matching
# so capitalised forms ("Nightmare", "NIGHTMARE") are counted too. contains() is a
# substring test, so e.g. "nightmares" also matches. Null tweets never match.
KEYWORD = "nightmare"
nightmare_tweets = tweets_df.filter(lower(col("Tweet")).contains(KEYWORD))
nightmare_tweet_count = nightmare_tweets.count()
print(f"Number of tweets containing '{KEYWORD}': {nightmare_tweet_count}")


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.types import IntegerType, StringType
from textblob import TextBlob

# getOrCreate() hands back the session already started in the first cell when one
# is active, so no second Spark application is launched here.
# NOTE(review): on an existing session the new app name likely has no effect.
spark = (
    SparkSession.builder
    .appName("SentimentAnalysis")
    .getOrCreate()
)

# Define a function to compute sentiment
def get_tweet_sentiment(tweet_text, threshold=0.0):
    """Classify a tweet's sentiment from TextBlob's polarity score.

    Args:
        tweet_text: The tweet text. ``None`` (a null ``Tweet`` value arriving
            from Spark) is passed through rather than handed to TextBlob,
            which raises TypeError for non-string input and would fail the job.
        threshold: Non-negative polarity cut-off. Polarity above ``threshold``
            is positive, at or above ``-threshold`` is neutral, and below that
            is negative. The default ``0.0`` is a plain sign test.

    Returns:
        1 (positive), 0 (neutral), -1 (negative), or ``None`` when
        ``tweet_text`` is ``None`` (a null score, which Spark's ``avg`` skips).
    """
    if tweet_text is None:
        return None
    # Read the polarity once instead of re-evaluating .sentiment in every branch
    polarity = TextBlob(tweet_text).sentiment.polarity
    if polarity > threshold:
        return 1
    elif polarity >= -threshold:
        return 0
    else:
        return -1

# Define the function to truncate the tweet text for better display
def truncate_string(s, length=50):
    """Shorten ``s`` to at most ``length`` characters for display.

    Strings no longer than ``length`` are returned unchanged. Longer strings are
    cut and end with ``'...'``, so the result is exactly ``length`` characters.
    ``None`` (a null Spark value) is returned as-is instead of failing on
    ``len(None)``. When ``length`` is too small to hold the ellipsis (< 3), the
    string is simply cut to ``length`` characters.
    """
    if s is None or len(s) <= length:
        return s
    if length < 3:
        # s[:length - 3] would slice from the end here and overshoot `length`
        return s[:max(length, 0)]
    return s[:length - 3] + '...'

# Wrap the plain Python functions as Spark UDFs with explicit return types
sentiment_udf = udf(get_tweet_sentiment, IntegerType())
truncate_udf = udf(truncate_string, StringType())

# Add a sentiment score and a shortened, display-friendly copy of each tweet.
# NOTE(review): these columns are computed lazily, so the Python UDFs run again on
# every action over tweets_df (this show() and the daily aggregation later);
# consider caching if that gets slow.
tweets_df = (
    tweets_df
    .withColumn('SentimentScore', sentiment_udf(col('Tweet')))
    .withColumn('TruncatedTweet', truncate_udf(col('Tweet')))
)

# Display score, date and shortened text (Sequencial and the full Tweet are omitted)
tweets_df.select('SentimentScore', 'Date', 'TruncatedTweet').show(truncate=False)

## Average Sentiment by Day

In [None]:
from pyspark.sql import functions as F

# Average the sentiment score per calendar day, aliasing the grouped date back to "Date".
# Rows whose Date failed to parse (to_timestamp yields null) are dropped first.
# Otherwise they would form an extra null-dated group, which Spark's default
# ascending sort places at the start of the series.
time_series_df = (
    tweets_df
    .where(F.col("Date").isNotNull())
    .groupBy(F.to_date("Date").alias("Date"))
    .agg(F.avg("SentimentScore").alias("AvgSentiment"))
    .orderBy("Date")
)

# Show the DataFrame to verify the time series data
time_series_df.show()


### Given the nature of the dataset (daily average sentiment scores over time), start with an LSTM to capture potential non-linear patterns, then fit ARIMA as a comparison to see how a more traditional linear method performs on the same data.

In [None]:
# Pull the aggregated Spark result (one row per day) onto the driver as a pandas DataFrame
pandas_df = time_series_df.toPandas()

In [None]:
# Persist the daily series; index=False keeps pandas' row index out of the file so
# only the Date and AvgSentiment columns are written
pandas_df.to_csv(path_or_buf='/home/hduser/Desktop/tweet_sentiment_time_series.csv', index=False)

In [None]:
import pandas as pd

# Reload the daily sentiment series written by the previous cell
dataset = pd.read_csv(filepath_or_buffer='/home/hduser/Desktop/tweet_sentiment_time_series.csv')


In [None]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import MinMaxScaler
import datetime as dt  # Import the datetime module

# Assuming 'dataset' is a pandas DataFrame that includes 'Date' and 'AvgSentiment'
# Convert date to ordinal to represent it numerically. The dtype guard makes the
# cell safe to re-run: after the first run the column already holds integers.
# pd.to_datetime would read those as nanosecond offsets from 1970-01-01 and
# silently collapse every row to the same ordinal.
if not pd.api.types.is_integer_dtype(dataset['Date']):
    dataset['Date'] = pd.to_datetime(dataset['Date']).map(dt.datetime.toordinal)

# Scale the AvgSentiment values into [0, 1]
# NOTE(review): the scaler is fit on the whole series, before the train/test split
# in the LSTM cell, so the test period's min/max leak into the scaling.
scaler = MinMaxScaler(feature_range=(0, 1))
scaled_data = scaler.fit_transform(dataset['AvgSentiment'].values.reshape(-1, 1))

# Create the time steps for LSTM (let's assume we're using 5 days to predict the next day)
def create_dataset(data, time_step=1):
    """Slice a series into sliding windows for supervised sequence learning.

    Args:
        data: 2-D array indexed as ``data[row, 0]``; only column 0 is used
            (e.g. the (n, 1) output of ``MinMaxScaler.fit_transform``).
        time_step: Window length, i.e. how many consecutive rows form one input.

    Returns:
        ``(X, y)``. ``X`` has shape ``(n_windows, time_step)`` and ``y`` has
        shape ``(n_windows,)``; ``y[i]`` is the value immediately after window
        ``X[i]``, and ``n_windows = max(len(data) - time_step, 0)``. A series
        too short for one window gives ``X`` of shape ``(0, time_step)``, so a
        later 3-D reshape still works.
    """
    # Every start index i with i + time_step < len(data) has both a full window
    # and a target; the old ``- 1`` bound silently dropped the last one.
    n_windows = max(len(data) - time_step, 0)
    starts = range(n_windows)
    X = np.array([data[i:i + time_step, 0] for i in starts]).reshape(n_windows, time_step)
    y = np.array([data[i + time_step, 0] for i in starts])
    return X, y

# Window length: the previous 5 rows (days) of scaled sentiment predict the next one
time_step = 5
X, y = create_dataset(scaled_data, time_step)

# LSTM layers expect input shaped [samples, time steps, features]; there is a
# single feature (scaled AvgSentiment), so append a trailing axis of size 1
X = X.reshape(X.shape[0], X.shape[1], 1)

In [None]:
from keras.models import Sequential
from keras.layers import Dense, LSTM
from sklearn.model_selection import train_test_split

# Split the data into training and testing sets. The windows are in date order
# (the daily series was sorted by Date and windowed sequentially), so split
# chronologically with shuffle=False. The test set is then the most recent 20%,
# and the model is never trained on days later than those it is scored on.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, shuffle=False)

# Define the LSTM model architecture: two stacked LSTM layers over the
# (time_step, 1) input windows, then a small dense head emitting one value
model = Sequential()
model.add(LSTM(units=50, return_sequences=True, input_shape=(X_train.shape[1], 1)))
model.add(LSTM(units=50, return_sequences=False))
model.add(Dense(units=25))
model.add(Dense(units=1))

# Compile the model (mean squared error on the MinMax-scaled target)
model.compile(optimizer='adam', loss='mean_squared_error')

# Train the model. batch_size=1 / epochs=1 are placeholder values; tune the LSTM
# units, batch size and epochs for this dataset.
# NOTE(review): no random seed is set for Keras, so results vary between runs.
model.fit(X_train, y_train, batch_size=1, epochs=1)

# Evaluate on the held-out (most recent) windows; the loss is MSE in scaled units
testing_loss = model.evaluate(X_test, y_test)

# Forecasting: model.predict returns values on the scaled [0, 1] range; map them
# back to the original AvgSentiment range with the fitted scaler, e.g.
#     predictions = scaler.inverse_transform(model.predict(X_test))
testing_loss
