## Test phase: read back one ticker's news file from GCS

In [None]:
import pandas as pd
import pyarrow.parquet as pq
import pyarrow as pa
import gcsfs

# Replace with your actual GCS bucket / ticker; the path layout matches the
# `news/parquet/{ticker}.parquet` files written by the "Obtain news" section.
test_bucket = 'mixedstorage-gary-ba870'
test_ticker = 'A'
gcs_path = f'gs://{test_bucket}/news/parquet/{test_ticker}.parquet'

# Use GCSFS to access the file
fs = gcsfs.GCSFileSystem()

# Read Parquet file into Pandas DataFrame
with fs.open(gcs_path, 'rb') as f:
    df = pd.read_parquet(f)

# fetch_news_for_ticker writes exactly these columns; fail loudly if the file differs.
expected_columns = {'ticker', 'title', 'teaser', 'published', 'url'}
assert expected_columns <= set(df.columns), f'unexpected schema: {list(df.columns)}'

# Rich (HTML) display instead of print(), which truncates wide text columns.
df.head()


  ticker                                              title  \
0      A           Stocks That Hit 52-Week Highs On Tuesday   
1      A          Stocks That Hit 52-Week Highs On Thursday   
2      A            Stocks That Hit 52-Week Highs On Friday   
3      A            Stocks That Hit 52-Week Highs On Monday   
4      A  Agilent Wins Trade Secret, Patent Infringement...   

                                              teaser  \
0   \n\nBefore 10 a.m. ET Tuesday, 106 stocks hit...   
1   \n\n \n\nThursday&#39;s morning session saw 6...   
2   \n\n \n\nThis morning 470 companies set new 5...   
3   \n\nThis morning 127 companies reached new 52...   
4                                          -Reuters    

                         published  \
0  Tue, 05 Jan 2021 11:26:27 -0400   
1  Thu, 07 Jan 2021 15:19:48 -0400   
2  Fri, 08 Jan 2021 11:22:53 -0400   
3  Mon, 11 Jan 2021 11:19:47 -0400   
4  Mon, 01 Feb 2021 12:02:28 -0400   

                                                 url  


In [None]:
# First five rows of the test-phase DataFrame, shown with rich display.
df.head(n=5)

Unnamed: 0,ticker,title,teaser,published,url
0,A,Stocks That Hit 52-Week Highs On Tuesday,"\n\nBefore 10 a.m. ET Tuesday, 106 stocks hit...","Tue, 05 Jan 2021 11:26:27 -0400",https://www.benzinga.com/news/21/01/19019861/s...
1,A,Stocks That Hit 52-Week Highs On Thursday,\n\n \n\nThursday&#39;s morning session saw 6...,"Thu, 07 Jan 2021 15:19:48 -0400",https://www.benzinga.com/news/21/01/19065209/s...
2,A,Stocks That Hit 52-Week Highs On Friday,\n\n \n\nThis morning 470 companies set new 5...,"Fri, 08 Jan 2021 11:22:53 -0400",https://www.benzinga.com/news/21/01/19078174/s...
3,A,Stocks That Hit 52-Week Highs On Monday,\n\nThis morning 127 companies reached new 52...,"Mon, 11 Jan 2021 11:19:47 -0400",https://www.benzinga.com/news/21/01/19102854/s...
4,A,"Agilent Wins Trade Secret, Patent Infringement...",-Reuters,"Mon, 01 Feb 2021 12:02:28 -0400",https://www.benzinga.com/news/21/02/19423085/a...


## Obtain news from the Benzinga API and store it in GCS

In [None]:
!pip install benzinga

In [None]:
import os
import io
import time
import json
import pandas as pd
from datetime import datetime, timedelta
from benzinga import news_data
from concurrent.futures import ThreadPoolExecutor, as_completed
from google.cloud import storage



In [None]:
!pip install lxml

In [None]:
# S&P 500 constituents as listed on Wikipedia at the time this cell runs.
# NOTE(review): presumably the current member list only, so companies that left the
# index after 2021 are absent (survivorship bias) -- confirm this is acceptable.
url_2024 = 'https://en.wikipedia.org/wiki/List_of_S%26P_500_companies'
sp500_2024_df = pd.read_html(url_2024)[0]
sp500_2024_df['Date added'] = pd.to_datetime(sp500_2024_df['Date added'], errors='coerce')

# Keep companies added to the index on or before 2021-01-01 (the news START_DATE).
# Rows whose 'Date added' could not be parsed are NaT, compare False, and are dropped.
added_by_2021 = sp500_2024_df['Date added'] <= pd.Timestamp('2021-01-01')
sp500_ticks = sp500_2024_df.loc[added_by_2021, 'Symbol'].tolist()

In [None]:
# Number of tickers that passed the "added on or before 2021-01-01" filter.
len(sp500_ticks)

437

In [None]:
# Peek at the first five selected symbols.
sp500_ticks[0:5]

In [None]:
# Sanity check: is GOOG among the selected tickers? Displays True/False, so a
# missing ticker is reported explicitly instead of by the absence of output.
'GOOG' in sp500_ticks

In [None]:
# ------------------- setting -------------------
# Never hard-code credentials in a notebook: read the Benzinga key from the
# environment and fall back to an interactive prompt.
# NOTE(review): the key previously hard-coded here should be treated as leaked and rotated.
from getpass import getpass

API_KEY = os.environ.get('BENZINGA_API_KEY') or getpass('Benzinga API key: ')
TICKERS = sp500_ticks                # from the Wikipedia cell above
START_DATE = datetime(2021, 1, 1)
END_DATE = datetime(2024, 12, 31)
INTERVAL_DAYS = 180                  # length of each request window; also the checkpoint granularity
MAX_WORKERS = 4                      # NOTE(review): not referenced in the visible notebook; main() runs tickers sequentially

# GCS setting
GCS_BUCKET = 'mixedstorage-gary-ba870'
GCS_CHECKPOINT_PREFIX = 'news/checkpoints/'
GCS_PARQUET_PREFIX = 'news/parquet/'


# initiation
news_client = news_data.News(API_KEY)
storage_client = storage.Client()
bucket = storage_client.bucket(GCS_BUCKET)


In [None]:
# Exploratory single request: first page (up to 100 items) of GOOG news
# between 2021-01-01 and 2021-01-15, as abstracts.
sample_from = datetime(2021, 1, 1).strftime('%Y-%m-%d')
sample_to = datetime(2021, 1, 15).strftime('%Y-%m-%d')
a = news_client.news(
    company_tickers='GOOG',
    date_from=sample_from,
    date_to=sample_to,
    pagesize=100,
    page=0,
    display_output='abstract',
)

a

In [None]:
# Flatten the sample response into one row per article (author kept for inspection).
all_news = pd.DataFrame([
    {
        'ticker': 'GOOG',
        'title': article.get('title'),
        'author': article.get('author'),
        'teaser': article.get('teaser'),
        'published': article.get('created'),
    }
    for article in a
])
all_news.head()

In [None]:
# Non-null values per column, to see which fields are missing in the sample.
all_news.count(axis="index")

In [None]:
# Number of sample articles per author.
all_news['author'].value_counts()

In [None]:
def read_checkpoint(ticker: str) -> set:
    """
    Load the processed date-range keys for ``ticker`` from its GCS checkpoint.

    The checkpoint is the JSON list stored at
    ``{GCS_CHECKPOINT_PREFIX}{ticker}.json`` in ``bucket``; an empty set is
    returned when that object does not exist yet.
    """
    blob = bucket.blob(f'{GCS_CHECKPOINT_PREFIX}{ticker}.json')
    if not blob.exists():
        return set()
    return set(json.loads(blob.download_as_text()))


def write_checkpoint(ticker: str, processed_ranges: set):
    """
    Overwrite ``ticker``'s GCS checkpoint with ``processed_ranges``.

    The keys are stored as a sorted JSON list at
    ``{GCS_CHECKPOINT_PREFIX}{ticker}.json`` so the object content is deterministic.
    """
    payload = json.dumps(sorted(processed_ranges))
    target = bucket.blob(f'{GCS_CHECKPOINT_PREFIX}{ticker}.json')
    target.upload_from_string(payload)


def _iter_date_ranges(start: datetime, end: datetime, interval_days: int):
    """
    Yield consecutive ``(window_start, window_end)`` pairs covering ``start`` .. ``end``.

    Each window is ``interval_days`` long (the last one is clipped to ``end``) and
    starts where the previous one ended, so adjacent windows share their boundary date.
    """
    if interval_days <= 0:
        raise ValueError(f"interval_days must be positive, got {interval_days}")
    window_start = start
    while window_start < end:
        window_end = min(window_start + timedelta(days=interval_days), end)
        yield window_start, window_end
        window_start = window_end


def _fetch_range(ticker: str, date_from: str, date_to: str, date_key: str):
    """
    Page through Benzinga news for ``ticker`` between ``date_from`` and ``date_to``.

    Returns the list of article records that have a non-empty teaser, or ``None``
    if any page request raised (the caller must then not checkpoint the window).
    Paging stops at the first empty page.
    """
    articles = []
    page = 0
    while True:
        try:
            response = news_client.news(
                company_tickers=ticker,
                date_from=date_from,
                date_to=date_to,
                pagesize=100,
                page=page,
                display_output='abstract'
            )
        except Exception as e:
            print(f"❌ error [{ticker}] {date_key} page{page}：{e}")
            return None

        if not response:
            print(f"[{ticker}] {date_key} page{page}no more news, end.")
            return articles

        # Articles are kept even when ``ticker`` is not listed in the article's
        # 'stocks' field; no such filter is applied.
        valid_articles = []
        for article in response:
            teaser = article.get('teaser')
            if teaser:
                valid_articles.append({
                    'ticker': ticker,
                    'title': article.get('title'),
                    'teaser': teaser,
                    'published': article.get('created'),
                    'url': article.get('url')
                })

        articles.extend(valid_articles)
        print(f"[{ticker}] {date_key} page{page} get {len(valid_articles)} valid news (original {len(response)})")

        page += 1
        time.sleep(0.4)  # pause between page requests (presumably to respect API rate limits)


def _load_existing_news(parquet_path: str):
    """Return the news already stored at ``parquet_path`` in ``bucket``, or an empty DataFrame."""
    blob = bucket.blob(parquet_path)
    if not blob.exists():
        return pd.DataFrame()
    return pd.read_parquet(io.BytesIO(blob.download_as_bytes()))


def _upload_news(df, parquet_path: str):
    """Write ``df`` as Parquet to ``parquet_path`` in ``bucket``, replacing any existing object."""
    parquet_buf = io.BytesIO()
    df.to_parquet(parquet_buf, engine='pyarrow', index=False)
    parquet_buf.seek(0)
    bucket.blob(parquet_path).upload_from_file(parquet_buf, content_type='application/octet-stream')


def fetch_news_for_ticker(ticker: str):
    """
    Fetch news for ``ticker`` over START_DATE..END_DATE and store it in GCS.

    The period is split into INTERVAL_DAYS windows. Windows already listed in
    the ticker's checkpoint are skipped. After a window is fetched successfully,
    its articles are merged into ``{GCS_PARQUET_PREFIX}{ticker}.parquet``
    (exact duplicate rows dropped), the file is uploaded, and only then is the
    window added to the checkpoint. As a result:

    * a crash part-way through a ticker cannot leave checkpointed windows whose
      data was never uploaded, and rows stored by earlier runs are kept rather
      than overwritten;
    * a window whose request failed is not checkpointed, so it is retried on
      the next run instead of being skipped forever.
    """
    print(f"\n========== begin：{ticker} ==========")
    processed_ranges = read_checkpoint(ticker)
    parquet_path = f'{GCS_PARQUET_PREFIX}{ticker}.parquet'
    stored = None      # ticker's stored news; loaded lazily when the first new data arrives
    added_rows = 0

    for window_start, window_end in _iter_date_ranges(START_DATE, END_DATE, INTERVAL_DAYS):
        date_key = f"{window_start.date()}_{window_end.date()}"
        if date_key in processed_ranges:
            print(f"[{ticker}] skip the period:{date_key}")
            continue

        print(f"[{ticker}] obtaining period：{date_key}")
        articles = _fetch_range(
            ticker,
            window_start.strftime('%Y-%m-%d'),
            window_end.strftime('%Y-%m-%d'),
            date_key,
        )
        if articles is None:
            print(f"[{ticker}] {date_key} not checkpointed, it will be retried on the next run")
            continue

        if articles:
            if stored is None:
                stored = _load_existing_news(parquet_path)
            new_df = pd.DataFrame(articles)
            # Adjacent windows share their boundary date and re-runs may refetch
            # rows that are already stored, so drop exact duplicate rows.
            merged = new_df if stored.empty else pd.concat([stored, new_df], ignore_index=True)
            merged = merged.drop_duplicates(ignore_index=True)
            _upload_news(merged, parquet_path)
            added_rows += len(merged) - len(stored)
            stored = merged

        # Checkpoint only once the window's data is safely in GCS.
        processed_ranges.add(date_key)
        write_checkpoint(ticker, processed_ranges)

    if stored is not None:
        print(f"[{ticker}] ✅ uploaded {added_rows} new news ({len(stored)} total)：gs://{GCS_BUCKET}/{parquet_path}")
    else:
        print(f"[{ticker}] ⚠ no more ")

    print(f"[{ticker}] end")

In [None]:

def main(tickers=None):
    """
    Run fetch_news_for_ticker for every ticker (defaults to ``TICKERS``).

    An exception for one ticker is reported and the loop continues, so a single
    transient API/GCS error does not abort the whole batch. Date ranges already
    recorded in a ticker's checkpoint are skipped on a re-run.

    Returns the list of tickers whose fetch raised.
    """
    failed = []
    for ticker in (TICKERS if tickers is None else tickers):
        try:
            fetch_news_for_ticker(ticker)
        except Exception as e:
            print(f"❌ [{ticker}] failed: {e}")
            failed.append(ticker)
    if failed:
        print(f"\n⚠ {len(failed)} ticker(s) failed, re-run to retry: {failed}")
    print("\n✅ process done！")
    return failed


# NOTE: in a Jupyter kernel __name__ is normally '__main__', so this runs when the cell executes.
if __name__ == '__main__':
    main()

## Merge the per-ticker news files into one Parquet file

In [None]:
import pyarrow.parquet as pq
import pyarrow as pa
import gcsfs
import os

bucket_path = 'mixedstorage-gary-ba870'
folder_path = 'news/parquet/'
output_file = 'merged_stocknews.parquet'
output_path = f'{folder_path}{output_file}'

fs = gcsfs.GCSFileSystem()
# fsspec reuses filesystem instances along with their cached directory listings;
# drop the cache so objects uploaded by the cells above are visible.
# NOTE(review): the recorded output of this cell shows `ls` returning the
# `news/checkpoints` and `news/parquet` prefixes instead of the files inside
# `news/parquet/`; refreshing the cache and globbing for `*.parquet` is meant to
# avoid that -- confirm on the next run.
fs.invalidate_cache()

file_paths = fs.glob(f'{bucket_path}/{folder_path}*.parquet')
# The merged file is written into the same folder it reads from; skip it so a
# re-run does not fold the previous merge back in and duplicate every row.
parquet_files = [
    f'gs://{path}' for path in file_paths
    if path.endswith('.parquet') and path.rsplit('/', 1)[-1] != output_file
]
if not parquet_files:
    raise FileNotFoundError(f'no per-ticker .parquet files under gs://{bucket_path}/{folder_path}')

tables = []
for file in parquet_files:
    with fs.open(file, 'rb') as f:
        tables.append(pq.read_table(f))

# NOTE(review): pa.concat_tables requires identical schemas; a ticker file whose
# column was entirely null may have been written with a different type and make this raise.
combined_table = pa.concat_tables(tables)

with fs.open(f'gs://{bucket_path}/{output_path}', 'wb') as f:
    pq.write_table(combined_table, f)




In [None]:
# Debug listing: everything the merge cell found vs. the parquet files it kept.
for label, paths in (("Found files:", file_paths), ("Filtered parquet files:", parquet_files)):
    print(label, paths)


Found files: ['mixedstorage-gary-ba870/news/checkpoints', 'mixedstorage-gary-ba870/news/parquet']
Filtered parquet files: []
