In [5]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from google.cloud import language_v1
import pandas as pd

# Create SparkSession
spark = SparkSession.builder.appName("SentimentAnalysis").getOrCreate()

# Read data from GCS
df = spark.read.csv("gs://datakimmy1/apple_news.csv", header=True, inferSchema=True)

# Show the first few rows of data
df.show(5)

# Define the sentiment analysis function
def analyze_sentiment(text):
    client = language_v1.LanguageServiceClient()
    document = language_v1.Document(content=text, type_=language_v1.Document.Type.PLAIN_TEXT)
    response = client.analyze_sentiment(request={'document': document})
    return response.document_sentiment.score   # Return sentiment score

# Use UDF to apply sentiment analysis to the Content column
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

# Create UDF
sentiment_udf = udf(analyze_sentiment, FloatType())

# Add a new column to store sentiment scores
df_with_sentiment = df.withColumn("sentiment", sentiment_udf(col("Content")))

# Progress tracking
num_rows = df.count()  # Get total number of rows
for i in range(0, num_rows, 10):
    print(f"Processing rows {i + 1} to {min(i + 10, num_rows)}...")

# Retain only the "Time" and "sentiment" columns
df_final = df_with_sentiment.select("Time", "sentiment")

# Convert the DataFrame with sentiment scores to a Pandas DataFrame for local saving
pandas_df = df_final.toPandas()

# Print the first few rows of the final result
print("Sentiment score result:")
print(pandas_df.head())

# Save to GCS
output_path = "gs://datakimmy1/sentiment_analysis_results.csv"
df_final.write.csv(output_path, header=True, mode='overwrite')

# Close Spark Session
spark.stop()



                                                                                

+--------------------+----------------+--------------------+
|               Title|            Time|             Content|
+--------------------+----------------+--------------------+
|Messi’s MLS Cup P...|October 23, 2024|['Lionel Messi ma...|
|The new iPad mini...|October 23, 2024|['Beginning today...|
|Apple celebrates ...|October 17, 2024|['When we started...|
|Apple expands too...|October 16, 2024|['For the first t...|
|Apple introduces ...|October 15, 2024|['CUPERTINO, CALI...|
+--------------------+----------------+--------------------+
only showing top 5 rows

Processing rows 1 to 10...
Processing rows 11 to 20...
Processing rows 21 to 30...
Processing rows 31 to 40...
Processing rows 41 to 50...
Processing rows 51 to 60...
Processing rows 61 to 70...
Processing rows 71 to 80...
Processing rows 81 to 90...
Processing rows 91 to 100...
Processing rows 101 to 110...
Processing rows 111 to 120...
Processing rows 121 to 130...
Processing rows 131 to 140...
Processing rows 141 to 150

                                                                                

Sentiment score result：
               Time  sentiment
0  October 23, 2024        0.4
1  October 23, 2024        0.3
2  October 17, 2024        0.4
3  October 16, 2024        0.2
4  October 15, 2024        0.5


24/10/30 01:37:50 WARN ApplicationMaster: Reporter thread fails 1 time(s) in a row.
java.io.InterruptedIOException: Call interrupted
	at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1560) ~[hadoop-client-api-3.3.6.jar:?]
	at org.apache.hadoop.ipc.Client.call(Client.java:1512) ~[hadoop-client-api-3.3.6.jar:?]
	at org.apache.hadoop.ipc.Client.call(Client.java:1409) ~[hadoop-client-api-3.3.6.jar:?]
	at org.apache.hadoop.ipc.ProtobufRpcEngine2$Invoker.invoke(ProtobufRpcEngine2.java:258) ~[hadoop-client-api-3.3.6.jar:?]
	at org.apache.hadoop.ipc.ProtobufRpcEngine2$Invoker.invoke(ProtobufRpcEngine2.java:139) ~[hadoop-client-api-3.3.6.jar:?]
	at com.sun.proxy.$Proxy40.allocate(Unknown Source) ~[?:?]
	at org.apache.hadoop.yarn.api.impl.pb.client.ApplicationMasterProtocolPBClientImpl.allocate(ApplicationMasterProtocolPBClientImpl.java:78) ~[hadoop-client-api-3.3.6.jar:?]
	at jdk.internal.reflect.GeneratedMethodAccessor69.invoke(Unknown Source) ~[?:?]
	at java.base/jdk.internal.reflec

In [None]:
from pyspark.sql import SparkSession 
import pandas as pd
import matplotlib.pyplot as plt

# Create SparkSession
spark = SparkSession.builder.appName("SentimentAnalysis").getOrCreate()

# Read the sentiment score table from GCS
sentiment_df = spark.read.csv("gs://datakimmy1/sentiment_analysis_results.csv", header=True, inferSchema=True)

# Convert the DataFrame to Pandas format and ensure the date column format is consistent
sentiment_pd_df = sentiment_df.select("Time", "sentiment").toPandas()

# Convert the sentiment column to float type
sentiment_pd_df['sentiment'] = sentiment_pd_df['sentiment'].astype(float)

# Generate a histogram of sentiment scores
plt.figure(figsize=(12, 6))
plt.hist(sentiment_pd_df['sentiment'].dropna(), bins=30, alpha=0.7, color='blue', edgecolor='black')
plt.title("Distribution of Apple News' Sentiment Scores")
plt.xlabel("Sentiment Score")
plt.ylabel("Frequency")
plt.grid(axis='y', alpha=0.75)
plt.show()


24/10/30 05:16:11 ERROR ClusterManager: Could not initialize cluster nodes=[cluster-adf4-w-0.us-central1-f.c.voltaic-athlete-440119-h1.internal, cluster-adf4-w-1.us-central1-f.c.voltaic-athlete-440119-h1.internal] nodeHostName=cluster-adf4-m.us-central1-f.c.voltaic-athlete-440119-h1.internal nodeHostAddress=10.128.0.34 currentNodeIndex=null
24/10/30 05:16:13 WARN YarnScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
24/10/30 05:16:28 WARN YarnScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources


In [None]:
from pyspark.sql import SparkSession 
from pyspark.sql.functions import col
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np

# Create SparkSession
spark = SparkSession.builder.appName("SentimentStockAnalysis").getOrCreate()

# Read the sentiment score table and Apple stock price table from GCS
sentiment_df = spark.read.csv("gs://datakimmy1/sentiment_analysis_results.csv", header=True, inferSchema=True)
stock_df = spark.read.csv("gs://datakimmy1/AppleStockPrice.csv", header=True, inferSchema=True)

# Rename the date column in the sentiment score table for merging
sentiment_df = sentiment_df.withColumnRenamed("Time", "Date")

# Convert DataFrames to Pandas format and ensure the date columns are consistent
sentiment_pd_df = sentiment_df.select("Date", "sentiment").toPandas()
stock_pd_df = stock_df.select("Date", "Close").toPandas()

# Convert the Date columns to datetime type
sentiment_pd_df['Date'] = pd.to_datetime(sentiment_pd_df['Date'], dayfirst=True, errors='coerce')
stock_pd_df['Date'] = pd.to_datetime(stock_pd_df['Date'], dayfirst=True, errors='coerce')

# Generate a complete date range
full_date_range = pd.DataFrame(pd.date_range(start=sentiment_pd_df["Date"].min(), end=stock_pd_df["Date"].max()), columns=["Date"])

# Merge data
merged_df = full_date_range.merge(sentiment_pd_df, on="Date", how="left").merge(stock_pd_df, on="Date", how="left")
print(merged_df.tail(5))

# Define a function to calculate the average future gain over a specified window
def calculate_avg_future_gain(df, window=3):
    future_gains = []
    for i, row in df.iterrows():
        current_price = row['Close']
        future_prices = df['Close'].iloc[i + 1:i + 1 + window]  # Future closing prices for the next few days
        if pd.notna(row['sentiment']) and len(future_prices) == window:
            # Only calculate if sentiment is not NaN
            gains = (future_prices.values - current_price) / current_price * 100  # Percentage change
            avg_gain = np.mean(gains)
            future_gains.append(avg_gain)
        else:
            # If sentiment score is NaN or there aren't enough future days, skip
            future_gains.append(np.nan)
    df['avg_future_gain'] = future_gains
    return df

# Calculate the average future gain for each date with a sentiment value
merged_df = calculate_avg_future_gain(merged_df)

# Calculate average future gains based on different sentiment scores
avg_gain_by_sentiment = merged_df.groupby('sentiment')['avg_future_gain'].mean().reset_index()

# Handle NaN values by filling with linear interpolation
avg_gain_by_sentiment['avg_future_gain'] = avg_gain_by_sentiment['avg_future_gain'].interpolate(method='linear')

# Plot a line graph
plt.figure(figsize=(12, 6))
plt.plot(avg_gain_by_sentiment['sentiment'], avg_gain_by_sentiment['avg_future_gain'], marker='o', linestyle='-', color='b')
plt.xlabel("Sentiment Score")
plt.ylabel("Average Future Gain (%)")
plt.title("Average Future Stock Price Change by Sentiment Score")
plt.grid(True)
plt.xticks(avg_gain_by_sentiment['sentiment'])  # Set x-axis ticks to sentiment scores
plt.show()

# Stop the Spark Session
spark.stop()