In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf, isnan
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.functions import vector_to_array
from pyspark.sql.types import DoubleType, StringType
import yfinance as yf
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
import plotly.graph_objects as go
import pandas as pd
import json

# Initialize Spark session
spark = SparkSession.builder.appName("StockRiskAssessment").getOrCreate()

# Global sentiment cache
sentiment_cache = {}

# Step 1: Load data from CSV file
def load_stock_data(filepath):
    print("Loading stock data...")
    stock_df = spark.read.csv(filepath, header=True, inferSchema=True)
    print(f"Data loaded: {stock_df.count()} records found.")
    return stock_df

# Step 2: Prepare features for clustering
def prepare_features(stock_df, broadcast_sentiment_cache):
    print("Preparing features...")
    stock_df = stock_df.dropna(subset=['Volatility', 'Avg_Volume', 'PE_Ratio', 'Dividend_Yield', 'Beta', 'Returns'])

    # Sentiment analysis using UDF
    analyzer = SentimentIntensityAnalyzer()

    @udf(DoubleType())
    def analyze_sentiment(ticker):
        sentiment_cache = broadcast_sentiment_cache.value  # Access the broadcasted sentiment_cache
        if ticker in sentiment_cache:
            return sentiment_cache[ticker]  # Use cached value
        news = yf.Ticker(ticker).news
        if not news:
            sentiment_cache[ticker] = 0.0
            return 0.0
        scores = [analyzer.polarity_scores(item['title'])['compound'] for item in news]
        sentiment_cache[ticker] = float(sum(scores) / len(scores)) if scores else 0.0
        return sentiment_cache[ticker]

    stock_df = stock_df.withColumn("Sentiment_Score", analyze_sentiment(col("Ticker")))

    # Select features
    feature_columns = ['Volatility', 'Avg_Volume', 'PE_Ratio', 'Dividend_Yield', 'Beta', 'Sentiment_Score']
    assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
    stock_df = assembler.transform(stock_df)
    return stock_df

# Step 3: Apply K-Means clustering
def apply_kmeans(stock_df, n_clusters=3):
    print("Standardizing features and applying K-Means clustering...")
    
    # Standardize the features
    scaler = StandardScaler(inputCol="features", outputCol="scaled_features")
    scaled_df = scaler.fit(stock_df).transform(stock_df)
    
    # Extracting the 'values' field from 'scaled_features' (vector) to an array using vector_to_array
    scaled_df = scaled_df.withColumn("scaled_values", vector_to_array(col("scaled_features")))

    # Check for any missing data in 'scaled_values'
    missing_data = scaled_df.filter(
        col("scaled_values").isNull()
    ).count()

    if missing_data > 0:
        print(f"Warning: {missing_data} rows with missing values in scaled_features detected. Dropping them.")
        scaled_df = scaled_df.filter(col("scaled_values").isNotNull())

    # Repartition the dataframe to ensure we have evenly distributed partitions for KMeans
    scaled_df = scaled_df.repartition(10)  # You can adjust the number of partitions depending on your system
    
    # Apply KMeans clustering
    kmeans = KMeans(featuresCol="scaled_features", k=n_clusters, seed=42)
    model = kmeans.fit(scaled_df)
    clustered_df = model.transform(scaled_df)
    
    return clustered_df

# Step 4: Add risk levels to the dataset
def label_risk_levels(clustered_df):
    print("Labeling risk levels...")
    risk_mapping = {0: 'Low', 1: 'Medium', 2: 'High'}
    risk_udf = udf(lambda x: risk_mapping[x], StringType())
    clustered_df = clustered_df.withColumn("Risk_Level", risk_udf(col("prediction")))
    return clustered_df

# Step 5: Recommend top 5 unique stocks
def recommend_top_stocks(clustered_df, risk_level='Low', top_n=5):
    print(f"Recommending top {top_n} unique {risk_level}-risk stocks...")
    filtered_df = clustered_df.filter(col("Risk_Level") == risk_level)
    unique_stocks = filtered_df.dropDuplicates(["Ticker"])

    if unique_stocks.count() < top_n:
        print(f"Not enough stocks for {risk_level} risk level. Providing all available stocks.")
        return unique_stocks

    unique_stocks = unique_stocks.withColumn("Composite_Score", 
        col("Volatility") * 0.7 + col("Returns") * -0.3)
    recommended_stocks = unique_stocks.orderBy(col("Composite_Score")).limit(top_n)

    # Rearranging columns for better readability
    recommended_stocks.show(truncate=False)
    return recommended_stocks

# Step 6: Save sentiment cache to HDFS
def save_sentiment_cache_to_hdfs(sentiment_cache, hdfs_path):
    print("Saving sentiment cache...")
    try:
        # Save the sentiment_cache dictionary to a JSON file
        with open(hdfs_path, 'w') as f:
            json.dump(sentiment_cache, f)
        print("Sentiment cache saved.")
    except Exception as e:
        print(f"Failed to save sentiment cache. ({e})")

# Step 7: Load sentiment cache from HDFS
def load_sentiment_cache_from_hdfs(hdfs_path):
    print("Loading sentiment cache...")
    try:
        # Read the JSON file as a dictionary
        with open(hdfs_path, 'r') as f:
            sentiment_dict = json.load(f)

        # Ensure all sentiment scores are floats to avoid type mismatch
        sentiment_dict = {ticker: float(score) for ticker, score in sentiment_dict.items()}

        # Update the global sentiment_cache dictionary
        global sentiment_cache
        sentiment_cache = sentiment_dict
        
        print("Sentiment cache loaded.")
        return sentiment_dict
    except Exception as e:
        print(f"Failed to load sentiment cache. Starting fresh. ({e})")
        return {}

# Step 8: Plot cumulative returns for recommended stocks
def plot_returns(recommended_stocks, start_date, end_date):
    print("Plotting returns...")
    # Extract ticker symbols into a list
    tickers = [row['Ticker'] for row in recommended_stocks.select('Ticker').collect()]
    
    sp500_data = yf.download('^GSPC', start=start_date, end=end_date)
    sp500_data['Returns'] = sp500_data['Adj Close'].pct_change()

    fig = go.Figure()
    fig.add_trace(go.Scatter(x=sp500_data.index, y=sp500_data['Returns'].cumsum(), mode='lines', name='S&P 500'))

    for ticker in tickers:
        stock_data = yf.download(ticker, start=start_date, end=end_date)
        stock_data['Returns'] = stock_data['Adj Close'].pct_change()
        fig.add_trace(go.Scatter(x=stock_data.index, y=stock_data['Returns'].cumsum(), mode='lines', name=ticker))

    fig.update_layout(
        title="Cumulative Returns of Top 5 Recommended Stocks vs. S&P 500",
        xaxis_title="Date",
        yaxis_title="Cumulative Returns"
    )
    
    return fig

# Step 9: Save output to HTML
def save_output_to_html(recommended_stocks, plot, html_file):
    print("Saving output to HTML...")
    recommended_stocks_pd = recommended_stocks.toPandas()
    table_html = recommended_stocks_pd.to_html(index=False, border=0, classes="table table-bordered table-striped")

    html_content = f"""
    <html>
    <head>
        <style>
            table {{ width: 100%; border-collapse: collapse; }}
            th, td {{ padding: 10px; text-align: left; border-bottom: 1px solid #ddd; }}
            th {{ background-color: #f2f2f2; }}
        </style>
    </head>
    <body>
        <h2>Top 5 Recommended Stocks for {recommended_stocks_pd['Risk_Level'][0]} Risk Level</h2>
        {table_html}
        {plot.to_html(full_html=False)}
    </body>
    </html>
    """
    with open(html_file, 'w') as f:
        f.write(html_content)
    print(f"HTML output saved to {html_file}")

# Main execution
if __name__ == "__main__":
    filepath = "/shared_data/all_stock_data_with_metrics.csv"
    hdfs_path = "/shared_data/sentiment_cache.json"
    html_output_path = "/shared_data/output.html"  # Updated for clarity

    sentiment_cache = load_sentiment_cache_from_hdfs(hdfs_path)
    
    # Broadcast sentiment cache to all worker nodes
    broadcast_sentiment_cache = spark.sparkContext.broadcast(sentiment_cache)

    stock_df = load_stock_data(filepath)
    stock_df = prepare_features(stock_df, broadcast_sentiment_cache)
    clustered_df = apply_kmeans(stock_df)
    clustered_df = label_risk_levels(clustered_df)

    print("Select your risk preference:")
    print("a. Low")
    print("b. Medium")
    print("c. High")
    choice = input("Enter your choice (a/b/c): ").lower()
    risk_mapping = {'a': 'Low', 'b': 'Medium', 'c': 'High'}
    risk_level = risk_mapping.get(choice, 'Low')

    recommended_stocks = recommend_top_stocks(clustered_df, risk_level, top_n=5)
    start_date = '2019-01-01'
    end_date = '2024-01-01'
    plot = plot_returns(recommended_stocks, start_date, end_date)

    save_sentiment_cache_to_hdfs(sentiment_cache, hdfs_path)
    save_output_to_html(recommended_stocks, plot, html_output_path)

# Stop Spark session
spark.stop()


Loading sentiment cache...
Sentiment cache loaded.
Loading stock data...
Data loaded: 1048575 records found.
Preparing features...
Standardizing features and applying K-Means clustering...
Labeling risk levels...
Select your risk preference:
a. Low
b. Medium
c. High


Enter your choice (a/b/c):  a


Recommending top 5 unique Low-risk stocks...
+----------+-----------+-----------+-----------+-----------+-----------+-------+------+------------+---------+--------------+-----+----------+-----------+-------------------+------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------+----------+----------+-------------------+
|Date      |Open       |High       |Low        |Close      |Adj Close  |Volume |Ticker|Returns     |PE_Ratio |Dividend_Yield|Beta |Avg_Volume|Volatility |Sentiment_Score    |features                                                          |scaled_features                                                                                                    |scaled_values                                                                

[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed


Saving sentiment cache...
Sentiment cache saved.
Saving output to HTML...
HTML output saved to /shared_data/output.html
