In [0]:
%pip install pandas nltk


In [0]:
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import ArrayType, StringType
import pandas as pd
import numpy as np
import os

NLTK_INITIALIZED = False

# (lookup path for nltk.data.find, package id for nltk.download) for every NLTK
# resource used by the UDFs in this notebook. sent_tokenize needs "punkt_tab" in
# newer NLTK releases and "punkt" in older ones; SentimentIntensityAnalyzer needs
# "vader_lexicon".
_NLTK_RESOURCES = (
    ("tokenizers/punkt_tab", "punkt_tab"),
    ("tokenizers/punkt", "punkt"),
    ("sentiment/vader_lexicon.zip", "vader_lexicon"),
)


def initialize_nltk():
    """Make the NLTK resources listed in ``_NLTK_RESOURCES`` available in this process.

    Adds ``/tmp/nltk_data`` to NLTK's search path (once) and downloads every resource
    that ``nltk.data.find`` cannot locate. The work runs at most once per process,
    guarded by the module-level ``NLTK_INITIALIZED`` flag, because the UDFs call this
    at the start of every batch.

    Downloads are best-effort. A failed download is reported with ``warnings.warn``
    instead of being raised, so the failure surfaces where the resource is actually
    used.
    """
    global NLTK_INITIALIZED
    import warnings

    import nltk

    if NLTK_INITIALIZED:
        return

    nltk_data_path = '/tmp/nltk_data'
    os.makedirs(nltk_data_path, exist_ok=True)
    if nltk_data_path not in nltk.data.path:
        nltk.data.path.append(nltk_data_path)

    # Probe each resource individually. A data directory that already has punkt
    # must still get punkt_tab / vader_lexicon if those are missing.
    for resource_path, package_id in _NLTK_RESOURCES:
        try:
            nltk.data.find(resource_path)
            continue
        except LookupError:
            pass
        try:
            downloaded = nltk.download(package_id, download_dir=nltk_data_path, quiet=True)
        except Exception as exc:
            warnings.warn(f"Could not download NLTK resource {package_id!r}: {exc}")
            continue
        if not downloaded:
            warnings.warn(f"Could not download NLTK resource {package_id!r}")

    NLTK_INITIALIZED = True

@pandas_udf(ArrayType(StringType()))
def filter_relevant_sentences(content_series: pd.Series, company_name_series: pd.Series) -> pd.Series:
    """Keep, for each article, only the sentences that mention its paired company.

    Args:
        content_series: Article texts, one per row.
        company_name_series: Company name to look for in the same row's article.

    Returns:
        A Series, the same length as the inputs, of lists of sentences. A row yields
        ``[]`` when the article is missing, empty or non-string, when the company name
        is missing or blank, or when sentence tokenization fails for that article.

    Raises:
        LookupError: if NLTK's sentence tokenizer data is unavailable. This is an
            environment problem, not a per-article one, so it is not silently
            turned into empty results.

    Matching is case-insensitive and on whole words. The name must not be
    preceded or followed by a letter, digit or underscore, so "meta" does not match
    "metadata". Any run of whitespace is accepted between the words of a
    multi-word name.
    """
    import re

    initialize_nltk()
    from nltk.tokenize import sent_tokenize

    output_rows = []

    for article, company in zip(content_series, company_name_series):
        if not isinstance(article, str) or not article or pd.isna(company):
            output_rows.append([])
            continue

        # split() drops surrounding/duplicate whitespace. A blank name (e.g. one that
        # consisted only of legal suffixes) would otherwise reduce to "", which is
        # contained in every sentence.
        name_words = str(company).lower().split()
        if not name_words:
            output_rows.append([])
            continue

        mention = re.compile(
            r"(?<!\w)" + r"\s+".join(re.escape(word) for word in name_words) + r"(?!\w)"
        )

        try:
            sentences = sent_tokenize(article)
        except LookupError:
            raise
        except Exception:
            # Best-effort: an article that cannot be tokenized contributes no sentences.
            output_rows.append([])
            continue

        output_rows.append([sentence for sentence in sentences if mention.search(sentence.lower())])

    # dtype=object keeps each cell as a list and avoids a float dtype for empty batches.
    return pd.Series(output_rows, dtype=object)

In [0]:
from pyspark.sql.functions import pandas_udf
import pandas as pd
import nltk
from nltk.sentiment.vader import SentimentIntensityAnalyzer

# Fetch the NLTK VADER lexicon (small download) into /tmp/nltk_data for this process.
# nltk.data.path is per-process state, so this configures the driver only. Spark's
# Python worker processes do not see this change; UDFs must set up NLTK data themselves.
VADER_DATA_DIR = '/tmp/nltk_data'
if VADER_DATA_DIR not in nltk.data.path:  # keep the cell idempotent across re-runs
    nltk.data.path.append(VADER_DATA_DIR)
nltk.download('vader_lexicon', download_dir=VADER_DATA_DIR, quiet=True)

@pandas_udf('double')
def vader_sentiment(text_series: pd.Series) -> pd.Series:
    """Score each text with VADER's compound polarity.

    Args:
        text_series: Texts to score, one per row.

    Returns:
        A float64 Series aligned with ``text_series``. The 'compound' score ranges
        from -1 (negative) to +1 (positive). Missing, empty or non-string values
        score 0.0.
    """
    # This body runs in a Spark Python worker. The driver-side nltk.data.path /
    # download setup is per-process, so make sure the lexicon is reachable here.
    initialize_nltk()
    analyzer = SentimentIntensityAnalyzer()

    scores = [
        analyzer.polarity_scores(text)['compound'] if isinstance(text, str) and text else 0.0
        for text in text_series
    ]

    return pd.Series(scores, dtype='float64')

In [0]:
import pyspark.sql.functions as sf

# Legal-form suffixes stripped from company names. The pattern is lowercase because
# names are lowercased first, so "APPLE INC" and "Apple Inc." normalise the same way.
regex_pattern = r"\b(inc|corporation|incorporated|corp|ltd|co)\b"

# Build a lowercase, punctuation-free, suffix-free "clean_name" used to find company
# mentions in article text. Whitespace runs are collapsed and the result trimmed, so
# "Apple Inc." becomes "apple" rather than "apple ". A trailing space would make the
# downstream substring match miss mentions such as "Apple's".
companies_df = (
    spark.read.table("stock_prediction.default.companies")
    .withColumn("clean_name", sf.lower(sf.col("name")))
    .withColumn("clean_name", sf.regexp_replace(sf.col("clean_name"), r"[^a-z0-9\s]", ""))
    .withColumn("clean_name", sf.regexp_replace(sf.col("clean_name"), regex_pattern, ""))
    .withColumn("clean_name", sf.trim(sf.regexp_replace(sf.col("clean_name"), r"\s+", " ")))
    .withColumnRenamed("id", "company_id")
)

display(companies_df)

In [0]:
from pyspark.sql.functions import col, expr

articles_df = spark.read.table("stock_prediction.default.articles")

# Only companies with a non-blank name take part in matching. A blank name would
# match every article and blow up the cross join below.
matchable_companies_df = companies_df.filter(expr("length(trim(clean_name)) > 0"))

# Pair every article with every matchable company, then keep pairs whose article text
# contains the company name (case-insensitive). The name is trimmed so a stray trailing
# space (e.g. "apple ") cannot hide mentions such as "Apple's". instr is a plain
# substring test, unlike LIKE, which treats '%' and '_' as wildcards.
joined_df = (
    articles_df.crossJoin(matchable_companies_df)
    .filter(expr("instr(lower(content_cleaned), lower(trim(clean_name))) > 0"))
    .select("id", "company_id", "name", "title", "clean_name", "content_cleaned", "published_at")
)

# Narrow each candidate pair down to the sentences that mention the company.
articles_with_relevant_sentences = joined_df.withColumn(
    "filtered_sentences",
    filter_relevant_sentences(col("content_cleaned"), col("clean_name"))
)

display(articles_with_relevant_sentences.limit(100))

In [0]:
from pyspark.sql.functions import col, explode, when, avg, expr

# One row per (article, company, relevant sentence). explode() emits no rows for an
# empty array, so pairs without any relevant sentence receive no score.
sentences_df = articles_with_relevant_sentences.select(
    col("id").alias("article_id"),
    "company_id",
    explode("filtered_sentences").alias("sentence"),
)

# VADER compound score (-1 negative .. +1 positive) for each sentence.
scored_sentences_df = sentences_df.withColumn(
    "numeric_score",
    vader_sentiment(col("sentence"))
)

# Mean sentence score per article/company pair. Only the columns consumed by the
# MERGE into article_sentiment_scored are computed.
final_df = (
    scored_sentences_df
    .groupBy("article_id", "company_id")
    .agg(avg("numeric_score").alias("score"))
)

final_df.createOrReplaceTempView("articles_sentiment_scored_view")
display(final_df)

In [0]:
%sql
-- Upsert per-article, per-company sentiment scores from the temp view built above:
-- an existing (article_id, company_id) row has its score overwritten, and a new pair
-- is inserted.
MERGE INTO stock_prediction.default.article_sentiment_scored AS tgt
USING articles_sentiment_scored_view AS src
   ON tgt.article_id = src.article_id
  AND tgt.company_id = src.company_id
WHEN MATCHED THEN
  UPDATE SET score = src.score
WHEN NOT MATCHED THEN
  INSERT (article_id, company_id, score)
  VALUES (src.article_id, src.company_id, src.score)