In [1]:
import os
import sys
import pandas as pd
from tqdm import tqdm
import requests
from sqlalchemy.orm import Session
from dotenv import load_dotenv

# Load variables from a .env file (if present) into the process environment.
load_dotenv()

# News_API_KEYS is read as a comma-separated list of API tokens. Blank entries
# (e.g. from a trailing comma) are dropped so they are never sent as a token.
News_API_KEYS = [
    key.strip()
    for key in os.environ.get('News_API_KEYS', '').split(',')
    if key.strip()
]
if not News_API_KEYS:
    # Fail early with an actionable message instead of an AttributeError on None.
    raise RuntimeError(
        "Environment variable 'News_API_KEYS' is missing or empty; "
        "expected a comma-separated list of API keys."
    )

# The project modules imported below are resolved relative to the main repo
# path, taken here as two levels above the current working directory. Put that
# directory at the front of sys.path (once) so those imports succeed.
parent_dir = os.path.abspath(os.path.join(os.getcwd(), os.pardir, os.pardir))
if parent_dir not in sys.path:
    sys.path.insert(0, parent_dir)

from db.db_conn import engine, session_scope, ping_db
from db.models import DimensionOrganization, DimensionNews

In [2]:
# Run the project's ping_db helper against the shared engine before doing any work
# (presumably a connectivity check; see db/db_conn.py to confirm what it verifies).
ping_db(engine)

True

In [8]:
def get_organisation(organisation: str, next_api_key_index: int, timeout: float = 30.0) -> "tuple[pd.DataFrame, int]":
    """Fetch news articles matching an organisation name from thenewsapi.com.

    Queries the ``/v1/news/all`` endpoint for English articles published after
    2024-07-01, using the key at ``News_API_KEYS[next_api_key_index]``. When
    the API answers with HTTP 402 (treated here as that key's quota being
    exhausted) the next key in ``News_API_KEYS`` is tried.

    Args:
        organisation: Search term sent to the API.
        next_api_key_index: Index of the first API key to try.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        A ``(DataFrame, key_index)`` tuple. The DataFrame holds the ``data``
        list of the JSON response, or is empty when the request failed or all
        keys are exhausted. ``key_index`` is the index of the key the caller
        should use for its next request.

    Raises:
        ValueError: If ``organisation`` is None.
    """
    if organisation is None:
        raise ValueError('No organisation provided.')

    # The endpoint is fixed; only the token changes between attempts.
    url = "https://api.thenewsapi.com/v1/news/all"

    while next_api_key_index < len(News_API_KEYS):
        params = {
            'api_token': News_API_KEYS[next_api_key_index],
            'language': 'en',
            'search': organisation,
            'published_after': '2024-07-01'
        }

        try:
            response = requests.get(url, params=params, timeout=timeout)
        except requests.RequestException as e:
            # No response exists for this attempt (connection error, timeout, ...),
            # so there is no status code to inspect: report and give up.
            print(f"Could not get news for {organisation} due to error: {e}")
            break

        if response.status_code == 402:
            print(f"API rate limit exceeded for key {next_api_key_index}. Switching to next API key.")
            next_api_key_index += 1
            continue

        try:
            response.raise_for_status()  # Raises an error for bad responses
            news_data = response.json().get('data', [])
            return pd.DataFrame(news_data), next_api_key_index
        except Exception as e:
            print(f"Could not get news for {organisation} due to error: {e}")
            break

    return pd.DataFrame(), next_api_key_index  # Return empty DataFrame on error
    
def collect_news_data(session):
    """Gather news articles for every organisation stored in the database.

    Loads all ``DimensionOrganization`` rows through ``session``, asks
    ``get_organisation`` for the articles matching each organisation name
    (carrying the current API-key index from one call to the next), and
    flattens the results into one record per article.

    Any exception raised along the way is printed and collection stops; the
    records gathered up to that point are still returned.

    Args:
        session: Database session used to query ``DimensionOrganization``.

    Returns:
        A DataFrame with one row per article, holding ``organization_id``
        plus the article fields listed in ``article_fields``.
    """
    # Article attributes copied from each API row; absent ones become None.
    article_fields = (
        "uuid",
        "title",
        "description",
        "keywords",
        "snippet",
        "url",
        "image_url",
        "language",
        "published_at",
        "source",
        "categories",
    )
    collected = []
    key_index = 0

    try:
        organisations = session.query(DimensionOrganization).all()
        for org in tqdm(organisations, total=len(organisations), desc="Collecting news data..."):
            articles, key_index = get_organisation(org.organization_name, key_index)

            # Nothing came back for this organisation; move on to the next one
            if articles.empty:
                continue

            for _, article in articles.iterrows():
                collected.append({
                    "organization_id": org.organization_id,  # links the article to its organisation
                    **{field: article.get(field) for field in article_fields},
                })

    except Exception as e:
        print(e)
        print("Error occurred while collecting news data.")

    return pd.DataFrame(collected)

def add_news_data_to_db(news_df, session: Session):
    """Insert collected news rows into the DimensionNews table, skipping duplicates.

    A row is inserted only if no ``DimensionNews`` entry with the same
    ``uuid`` already exists in the database and the same ``uuid`` has not
    already been handled earlier in ``news_df``. All new rows are committed
    together; on any error the session is rolled back and the error printed.

    Args:
        news_df: DataFrame with an ``organization_id`` column and the article
            columns read below (``uuid``, ``title``, ``description``, ...).
        session: Database session used for the lookups, inserts and commit.
    """
    try:
        # uuids already processed in this call. The database lookup alone only
        # sees rows added earlier in this batch if the session autoflushes, so
        # in-batch duplicates are tracked explicitly.
        seen_uuids = set()

        for _, row in news_df.iterrows():
            article_uuid = row.get("uuid")
            if article_uuid in seen_uuids:
                continue
            seen_uuids.add(article_uuid)

            # Check if the news entry already exists using the uuid
            existing_entry = session.query(DimensionNews).filter_by(uuid=article_uuid).first()

            # Only add the entry if it does not already exist
            if not existing_entry:
                news_entry = DimensionNews(
                    organization_id=row["organization_id"],
                    uuid=article_uuid,  # Storing uuid from API
                    title=row.get("title"),
                    description=row.get("description"),
                    keywords=row.get("keywords"),
                    snippet=row.get("snippet"),
                    url=row.get("url"),
                    image_url=row.get("image_url"),
                    language=row.get("language"),
                    published_at=row.get("published_at"),
                    source=row.get("source"),
                    categories=row.get("categories")
                )
                session.add(news_entry)  # Add each news item to the session

        # Commit all entries at once after adding them to the session
        session.commit()
        print("News data added to DimensionNews table successfully.")

    except Exception as e:
        session.rollback()  # Rollback in case of error
        print(f"Error occurred while adding news data to database: {e}")

In [9]:
# Collect news data for every organisation stored in the database.
# collect_news_data calls the news API at least once per organisation, so this
# cell can take a long time; interrupting it leaves `news_df` unassigned.
with session_scope() as session:
    news_df = collect_news_data(session)

Collecting news data...:   0%|          | 12/2593 [00:05<20:50,  2.06it/s]


KeyboardInterrupt: 

In [None]:
# Preview the first rows for verification; a bare last expression lets Jupyter
# render the DataFrame as a table instead of plain printed text.
news_df.head()

In [None]:
# Insert the collected news rows into the DimensionNews table (rows whose uuid
# already exists are skipped by add_news_data_to_db).
with session_scope() as session:
    add_news_data_to_db(news_df, session)