# TextAnalysisProject Implementation Notebook

This notebook documents the steps that have been implemented so far in the **TextAnalysisProject**. It verifies:
1. Raw data files in S3.
2. Glue Data Catalog tables created by Crawlers.
3. The Glue ETL job for lexicon-based daily sentiment scoring.


## 1. Prerequisites

- AWS CLI must be configured with an IAM user or role that has access to the `TextDataProjectRole`.
- Required Python packages:
  ```bash
  pip install boto3 pyathena s3fs pandas
  ```
- The IAM role `TextDataProjectRole` should be able to access:
  - S3 buckets: `textdataproject-raw-snpnews-zitian`, `textdataproject-raw-lexicon-zitian`, `textdataproject-processed-features`, `textdataproject-query-results`
  - Glue Catalog and jobs
  - Athena queries


## 2. Download Data from Kaggle

Before uploading data to S3, we need to download the raw data from Kaggle. This section documents the process of downloading the required datasets.

In [None]:
aws configure
pip3 install kaggle
mkdir -p ~/.kaggle
mv ~/Downloads/kaggle.json ~/.kaggle/
chmod 600 ~/.kaggle/kaggle.json

kaggle datasets download -d dyutidasmahaptra/s-and-p-500-with-financial-news-headlines-20082024
unzip s-and-p-500-with-financial-news-headlines-20082024.zip -d snp_news

kaggle datasets download -d dyutidasmahaptra/financial-sentiment-lexicon
unzip financial-sentiment-lexicon.zip -d lexicon

## 3. Upload Data to S3

After downloading the datasets, we need to upload them to their respective S3 buckets for further processing.

In [None]:
aws s3 cp ./snp_news/ s3://textdataproject-raw-snpnews-zitian/ --recursive
aws s3 cp ./lexicon/ s3://textdataproject-raw-lexicon-zitian/ --recursive

## 4. Verify Raw Data in S3

We first check that the raw headline CSV (`sp500_headlines_2008_2024.csv`) and the lexicon CSV (`financial_sentiment_lexicon.csv`) are present in their respective buckets.


In [5]:
import boto3

s3 = boto3.client('s3')

# List objects in raw headlines bucket
print("Contents of textdataproject-raw-snpnews-zitian:")
resp1 = s3.list_objects_v2(Bucket="textdataproject-raw-snpnews-zitian", Prefix="")
for obj in resp1.get("Contents", []):
    print("-", obj["Key"])

# List objects in raw lexicon bucket
print("\nContents of textdataproject-raw-lexicon-zitian:")
resp2 = s3.list_objects_v2(Bucket="textdataproject-raw-lexicon-zitian", Prefix="")
for obj in resp2.get("Contents", []):
    print("-", obj["Key"])


Contents of textdataproject-raw-snpnews-zitian:
- sp500_headlines_2008_2024.csv

Contents of textdataproject-raw-lexicon-zitian:
- financial_sentiment_lexicon.csv


## 5. Verify Glue Data Catalog Tables

Glue Crawlers should have created:

- Database `snpnews_db` with table `sp500_headlines_2008_2024_csv`
- Database `lexicon_db` with table `financial_sentiment_lexicon_csv`

We use boto3 to list Glue databases and tables.


In [6]:
import boto3

glue = boto3.client('glue')

# List Glue databases
dbs = glue.get_databases()
print("Glue Databases:")
for db in dbs['DatabaseList']:
    print("-", db['Name'])

# List tables in snpnews_db
print("\nTables in snpnews_db:")
tables1 = glue.get_tables(DatabaseName="snpnews_db")
for t in tables1['TableList']:
    print("-", t['Name'])

# List tables in lexicon_db
print("\nTables in lexicon_db:")
tables2 = glue.get_tables(DatabaseName="lexicon_db")
for t in tables2['TableList']:
    print("-", t['Name'])


Glue Databases:
- lexicon_db
- snpnews_db

Tables in snpnews_db:
- sp500_headlines_2008_2024_csv

Tables in lexicon_db:
- financial_sentiment_lexicon_csv


## 6. Verify Glue ETL Job Output

The Glue ETL job `glue_job_news_sentiment_daily` should have written daily sentiment Parquet files to:
```
s3://textdataproject-processed-features/daily_sentiment_with_cp/
```
We list the contents of that prefix to confirm.


In [7]:
# List output files of Glue ETL job
resp3 = s3.list_objects_v2(
    Bucket="textdataproject-processed-features-zitian", 
    Prefix="daily_sentiment_with_cp/")
print("Contents of daily_sentiment_with_cp:")
for obj in resp3.get("Contents", []):
    print("-", obj["Key"])

resp4 = s3.list_objects_v2(
    Bucket="textdataproject-processed-features-zitian",
    Prefix="clean_headlines_for_finbert/"
)
print("Contents of clean_headlines_for_finbert:")
for obj in resp4.get("Contents", []):
    print("-", obj["Key"])


Contents of daily_sentiment_with_cp:
- daily_sentiment_with_cp/date=2008-01-02/part-00000-d0acb746-624d-4ede-b280-0e2172efda57.c000.snappy.parquet
- daily_sentiment_with_cp/date=2008-01-03/part-00000-d0acb746-624d-4ede-b280-0e2172efda57.c000.snappy.parquet
- daily_sentiment_with_cp/date=2008-01-07/part-00000-d0acb746-624d-4ede-b280-0e2172efda57.c000.snappy.parquet
- daily_sentiment_with_cp/date=2008-01-09/part-00000-d0acb746-624d-4ede-b280-0e2172efda57.c000.snappy.parquet
- daily_sentiment_with_cp/date=2008-01-10/part-00000-d0acb746-624d-4ede-b280-0e2172efda57.c000.snappy.parquet
- daily_sentiment_with_cp/date=2008-01-22/part-00000-d0acb746-624d-4ede-b280-0e2172efda57.c000.snappy.parquet
- daily_sentiment_with_cp/date=2008-01-29/part-00000-d0acb746-624d-4ede-b280-0e2172efda57.c000.snappy.parquet
- daily_sentiment_with_cp/date=2008-01-30/part-00000-d0acb746-624d-4ede-b280-0e2172efda57.c000.snappy.parquet
- daily_sentiment_with_cp/date=2008-02-01/part-00000-d0acb746-624d-4ede-b280-0e2172

## 7. Verify Athena Table for Daily Sentiment

We check that the Athena table `sentiment_db.daily_sentiment_with_cp` exists and can return sample rows.


In [12]:
from pyathena import connect
import pandas as pd

conn = connect(
    s3_staging_dir='s3://textdataproject-query-results-zitian/', 
    region_name='eu-north-1'
)

# list all table in sentiment_db
print("Athena Tables in sentiment_db:")
tables = pd.read_sql("SHOW TABLES IN sentiment_db", conn)
print(tables)

print("\nSample rows from daily_sentiment_with_cp:")
sample_sentiment = pd.read_sql("""
    SELECT date, daily_total_score, daily_pos_count, daily_neg_count, daily_cp
    FROM sentiment_db.daily_sentiment_with_cp
    ORDER BY date
    LIMIT 5
""", conn)
print(sample_sentiment)

print("\nSample rows from clean_headlines_for_finbert:")
sample_headlines = pd.read_sql("""
    SELECT date, title_clean
    FROM sentiment_db.clean_headlines_for_finbert
    ORDER BY date
    LIMIT 5
""", conn)
print(sample_headlines)


Athena Tables in sentiment_db:


  tables = pd.read_sql("SHOW TABLES IN sentiment_db", conn)


                      tab_name
0  clean_headlines_for_finbert
1      daily_sentiment_with_cp

Sample rows from daily_sentiment_with_cp:


  sample_sentiment = pd.read_sql("""


         date  daily_total_score  daily_pos_count  daily_neg_count  daily_cp
0  2008-01-02                0.0                0                0   1447.16
1  2008-01-03                0.0                0                0   1447.16
2  2008-01-07                0.0                0                0   1416.18
3  2008-01-09               -0.6                0                1   1409.13
4  2008-01-10                0.0                0                0   1420.33

Sample rows from clean_headlines_for_finbert:


  sample_headlines = pd.read_sql("""


         date                                        title_clean
0  2008-01-02                        predictions for the s p    
1  2008-01-02  dow tallies biggest first session of year poin...
2  2008-01-02   jpmorgan predicts      will be  nothing but net 
3  2008-01-03  u s  stocks higher after economic data  monsan...
4  2008-01-07  u s  stocks climb as hopes increase for more f...


## 8.

In [None]:
import pandas as pd
import boto3
from pyathena import connect
from transformers import AutoTokenizer, AutoModelForSequenceClassification
import torch

conn = connect(
    s3_staging_dir='s3://textdataproject-query-results-zitian/',
    region_name='eu-north-1'
)

query = """
SELECT date, title_clean
FROM sentiment_db.clean_headlines_for_finbert
"""
df = pd.read_sql(query, conn)

# load FinBERT model and tokenizer
tokenizer = AutoTokenizer.from_pretrained("yiyanghkust/finbert-tone")
model = AutoModelForSequenceClassification.from_pretrained("yiyanghkust/finbert-tone")

labels = ["negative", "neutral", "positive"]
def finbert_predict(text_list, batch_size=16):
    all_labels = []
    all_scores = []
    for i in range(0, len(text_list), batch_size):
        batch_texts = text_list[i:i+batch_size]
        enc = tokenizer(batch_texts, padding=True, truncation=True, return_tensors="pt")
        with torch.no_grad():
            outputs = model(**enc)
            probs = torch.nn.functional.softmax(outputs.logits, dim=-1).cpu().numpy()
        
        for p in probs:
            idx = p.argmax()
            all_labels.append(labels[idx])
            all_scores.append(float(p[idx]))
    return all_labels, all_scores

df["sentiment_label"], df["sentiment_score"] = finbert_predict(df["title_clean"].tolist())

print(df.head())


---
**Next Steps (not yet implemented here):**
1. Run SageMaker Notebook for FinBERT sentiment analysis.
2. Aggregate FinBERT results into `daily_sentiment_finbert`.
3. Prepare quarterly documents and run EMR Spark LDA.
4. (Optional) Extract entities/events.
5. Merge all features in Athena into `daily_full_features`.
6. Visualize in QuickSight and model in SageMaker.
