In [3]:
!pip install pyspark
!pip install spark-nlp



In [4]:
from pyspark.sql import SparkSession
import sparknlp
from sparknlp.base import DocumentAssembler
from sparknlp.annotator import Tokenizer, Normalizer, LemmatizerModel, StopWordsCleaner
from pyspark.ml.feature import HashingTF, IDF
from pyspark.ml.classification import NaiveBayes
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import col, udf
from pyspark.sql.types import ArrayType, StringType, IntegerType
import requests
import json
import time
import random

# Start Spark session with Spark NLP.
# The resulting `spark` object is used below to build the news DataFrame.
spark = sparknlp.start()

def _fetch_story_titles(api_url, ticker, n, timeout):
    """Return the story titles for one ticker, or an empty list on any failure."""
    try:
        response = requests.get(
            api_url,
            params={'q': f"tt:{ticker}", 'n': n},
            timeout=timeout,
        )
    except requests.RequestException as exc:
        # Connection errors and timeouts should not abort the remaining tickers.
        print(f"Failed to fetch data for {ticker}, Error: {exc}")
        return []

    if response.status_code != 200:
        print(f"Failed to fetch data for {ticker}, Status: {response.status_code}")
        return []

    try:
        data = response.json()
    except ValueError:
        # The body was not valid JSON (e.g. an HTML error page).
        print(f"Failed to fetch data for {ticker}, Error: response was not valid JSON")
        return []

    stories = data.get('stories') if isinstance(data, dict) else None
    # Keep only well-formed stories that carry a string title.
    return [
        story['title']
        for story in stories or []
        if isinstance(story, dict) and isinstance(story.get('title'), str)
    ]


# Function to fetch news data from TickerTick API
def fetch_news_from_ticker_tick(tickers, n=10, timeout=10):
    """Fetch story titles for each ticker from the TickerTick feed endpoint.

    Args:
        tickers: Iterable of ticker symbols; each is queried as ``tt:<ticker>``.
        n: Number of stories requested per ticker (sent as the ``n`` query
            parameter).
        timeout: Seconds to wait on each HTTP request before giving up.

    Returns:
        A list of ``(ticker, title)`` tuples. A ticker whose request fails,
        returns a non-200 status, or yields no usable stories contributes
        nothing; a message is printed for failures.
    """
    api_url = "https://api.tickertick.com/feed"
    news_data = []
    for ticker in tickers:
        titles = _fetch_story_titles(api_url, ticker, n, timeout)
        news_data.extend((ticker, title) for title in titles)
        time.sleep(1)  # Sleep to avoid hitting rate limits
    return news_data

# Tickers to use now
ticker_list = ["AAPL", "MSFT", "AMZN", "GOOGL", "TSLA"]

# Fetch news data
news_data = fetch_news_from_ticker_tick(ticker_list)

# Without any stories there is nothing to fit the pipeline or classifier on,
# so stop here with a clear message instead of failing later in Spark.
if not news_data:
    raise RuntimeError(
        "No news stories were fetched for any ticker; cannot build the DataFrame."
    )

# Create DataFrame with an explicit schema so column types do not depend on
# inference from the fetched rows.
df = spark.createDataFrame(news_data, schema="ticker: string, text: string")

# Set up the NLP pipeline: each stage reads the previous stage's output column.
document_assembler = (
    DocumentAssembler()
    .setInputCol("text")
    .setOutputCol("document")
)
tokenizer = (
    Tokenizer()
    .setInputCols(["document"])
    .setOutputCol("token")
)
normalizer = (
    Normalizer()
    .setInputCols(["token"])
    .setOutputCol("normalized")
)
stop_words_cleaner = (
    StopWordsCleaner()
    .setInputCols(["normalized"])
    .setOutputCol("cleanTokens")
)
lemmatizer = (
    LemmatizerModel.pretrained("lemma_antbnc", lang="en")
    .setInputCols(["cleanTokens"])
    .setOutputCol("lemmas")
)

# Chain the stages in order, fit on the news DataFrame, then annotate it.
pipeline = Pipeline(
    stages=[
        document_assembler,
        tokenizer,
        normalizer,
        stop_words_cleaner,
        lemmatizer,
    ]
)
model = pipeline.fit(df)
result = model.transform(df)

# Convert column to string array type: an identity UDF declared as
# array<string> copies the lemma strings into a plain "tokens" column.
convert_to_string_array = udf(lambda tokens: tokens, ArrayType(StringType()))
lemma_strings = col("lemmas.result")
result = result.withColumn("tokens", convert_to_string_array(lemma_strings))

# Feature transformation: hashed term frequencies, then IDF weighting.
hashingTF = HashingTF(inputCol="tokens", outputCol="rawFeatures")
idf = IDF(inputCol="rawFeatures", outputCol="features")

featurizedData = hashingTF.transform(result)
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)

# Prepare data for modeling
# NOTE: the labels are random 0/1 placeholders, not real sentiment annotations,
# so the classifier below learns nothing meaningful and its accuracy should be
# read as chance level. Replace this with real labels before trusting output.
# The UDF is marked non-deterministic so Spark does not assume repeated
# invocations yield the same value.
random_label_udf = udf(lambda: random.randint(0, 1), IntegerType()).asNondeterministic()
# Cache the labelled frame so both splits (and later actions) read the same
# labels instead of re-drawing them each time the lineage is recomputed.
rescaledData = rescaledData.withColumn("label", random_label_udf()).cache()

# Split data and train the model
train, test = rescaledData.randomSplit([0.8, 0.2], seed=12345)
nb = NaiveBayes(featuresCol="features", labelCol="label")
nbModel = nb.fit(train)
predictions = nbModel.transform(test)

# Evaluate the model
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
nbAccuracy = evaluator.evaluate(predictions)
print(f"Naive Bayes Test set accuracy: {nbAccuracy:.2f}")

# Define a UDF to classify sentiment based on probabilities
def classify_sentiment(probabilities):
    """Map a class-probability vector to a sentiment description.

    The element at index 1 is treated as the positive-class probability.
    Values above 0.6 are labelled positive, values below 0.4 negative, and
    everything else (including exactly 0.4 and 0.6) neutral.

    Args:
        probabilities: Indexable sequence with at least two elements.

    Returns:
        One of three fixed description strings.
    """
    likelihood = probabilities[1]
    # Start from the middle band and override only for a clear lean.
    verdict = 'Neutral - News is seen as neither clearly good nor bad.'
    if likelihood > 0.6:
        verdict = 'Positive - News is likely viewed favorably.'
    elif likelihood < 0.4:
        verdict = 'Negative - News is likely viewed unfavorably.'
    return verdict

# Wrap classify_sentiment as a UDF with a string return type so it can be
# applied to a DataFrame column.
classify_sentiment_udf = udf(classify_sentiment, StringType())

# Function to fetch sentiment for a given ticker
def get_sentiment_for_ticker(ticker):
    """Summarise predicted sentiment for the fetched articles of one ticker.

    Filters the module-level ``df`` to rows for ``ticker``, runs them through
    the fitted NLP pipeline, TF-IDF stages and Naive Bayes model, then counts
    how many articles fall into each sentiment description.

    Args:
        ticker: Ticker symbol to look up in ``df``.

    Returns:
        The string ``"No data available for ticker: <ticker>"`` when no rows
        match. Otherwise a dict with:
          * ``'sentiment_summary'``: sentiment description -> text with the
            article count for that description;
          * ``'opinion_score'``: formatted share of positive articles with
            the positive and total counts.
    """
    rows_for_ticker = df.filter(df.ticker == ticker)
    if rows_for_ticker.count() == 0:
        return "No data available for ticker: " + ticker

    # Re-apply the already-fitted stages in the same order used for training.
    annotated = model.transform(rows_for_ticker).withColumn(
        "tokens", convert_to_string_array(col("lemmas.result"))
    )
    weighted = idfModel.transform(hashingTF.transform(annotated))
    scored = nbModel.transform(weighted)
    labelled = scored.withColumn("sentiment", classify_sentiment_udf(scored.probability))

    tallies = labelled.groupBy("sentiment").count().collect()

    sentiment_summary = {}
    for tally in tallies:
        label = tally['sentiment']
        n_articles = tally['count']
        # The text after " - " is the explanation half of the label.
        explanation = label.split(' - ')[1]
        sentiment_summary[label] = f"{n_articles} articles, likely viewed as {explanation}"

    # Calculate the opinion score: share of articles labelled positive.
    total_articles = sum(tally['count'] for tally in tallies)
    positive_articles = next(
        (tally['count'] for tally in tallies if 'Positive' in tally['sentiment']),
        0,
    )
    opinion_score = positive_articles / total_articles if total_articles > 0 else 0

    score_text = (
        f"{opinion_score:.2f} (Positive Articles: {positive_articles}, "
        f"Total Articles: {total_articles})"
    )
    return {
        'sentiment_summary': sentiment_summary,
        'opinion_score': score_text,
    }

# Example usage
# Strip surrounding whitespace so input like " aapl " still matches a ticker.
ticker_input = input("Enter a stock ticker: ").strip().upper()
sentiment_results = get_sentiment_for_ticker(ticker_input)
if isinstance(sentiment_results, str):
    # get_sentiment_for_ticker returns a plain message when it has no rows
    # for the ticker; indexing that string by key would raise TypeError.
    print(sentiment_results)
else:
    print(f"Sentiment summary for {ticker_input}: {sentiment_results['sentiment_summary']}")
    print(f"Opinion Score: {sentiment_results['opinion_score']}")


lemma_antbnc download started this may take some time.
Approximate size to download 907.6 KB
[ | ]

24/11/23 08:00:00 WARN S3AbortableInputStream: Not all bytes were read from the S3ObjectInputStream, aborting HTTP connection. This is likely an error and may result in sub-optimal behavior. Request only the bytes you need via a ranged GET or drain the input stream after use.


[OK!]


24/11/23 08:00:04 WARN DAGScheduler: Broadcasting large task binary with size 4.2 MiB
24/11/23 08:00:05 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
24/11/23 08:00:06 WARN DAGScheduler: Broadcasting large task binary with size 8.2 MiB
                                                                                

Naive Bayes Test set accuracy: 0.40


24/11/23 08:00:15 WARN DAGScheduler: Broadcasting large task binary with size 8.1 MiB

Sentiment summary for AAPL: {'Positive - News is likely viewed favorably.': '3 articles, likely viewed as News is likely viewed favorably.', 'Negative - News is likely viewed unfavorably.': '6 articles, likely viewed as News is likely viewed unfavorably.', 'Neutral - News is seen as neither clearly good nor bad.': '1 articles, likely viewed as News is seen as neither clearly good nor bad.'}
Opinion Score: 0.30 (Positive Articles: 3, Total Articles: 10)


24/11/23 08:00:17 WARN DAGScheduler: Broadcasting large task binary with size 8.1 MiB
                                                                                