# In [None]:
# working on fixing bugs
# **Section 1: Set Up**
#------Imports--------#
import pandas as pd
import datetime
import pickle
import numpy as np
import requests
import time
import os
from tqdm import tqdm
from datetime import datetime, timedelta
import functions_framework
from google.cloud import storage
import io

# --- CONFIGURATION ---
# Default bucket for the pipeline; can be overridden with the GCS_BUCKET_NAME
# environment variable.
GCS_BUCKET_NAME = os.getenv('GCS_BUCKET_NAME', 'solar_system_bucket')
# CSV file extension constant (not referenced by the code shown here).
GCS_FILE_EXTENSION = ".csv"

####################################################################
# Read a CSV object from Google Cloud Storage into a pandas DataFrame
def load_data_from_gcs(GCS_BUCKET_NAME, GCS_EXACT_FILE_NAME):
    """
    Read one CSV object from Google Cloud Storage into a pandas DataFrame.

    Args:
        GCS_BUCKET_NAME: Name of the bucket to read from (shadows the
            module-level constant of the same name inside this function).
        GCS_EXACT_FILE_NAME: Full object name of the CSV inside the bucket.

    Returns:
        - The parsed DataFrame when the object exists and parses.
        - An empty DataFrame when the object does not exist.
        - For any other failure, a one-row DataFrame whose only column,
          'Error', holds "Failed to load data: <exception>". This error frame
          is NOT empty, so a caller that only checks ``.empty`` will not
          notice the failure.
    """
    print(f"Attempting to load data from gs://{GCS_BUCKET_NAME}/{GCS_EXACT_FILE_NAME}")

    try:
        target_blob = storage.Client().bucket(GCS_BUCKET_NAME).blob(GCS_EXACT_FILE_NAME)

        # A missing object is an expected case, not an error.
        if not target_blob.exists():
            print(f"File not found: gs://{GCS_BUCKET_NAME}/{GCS_EXACT_FILE_NAME}")
            return pd.DataFrame()

        with io.BytesIO() as raw_bytes:
            target_blob.download_to_file(raw_bytes)
            raw_bytes.seek(0)  # rewind so read_csv starts at the first byte
            frame = pd.read_csv(raw_bytes)

        print(f"Successfully loaded {len(frame)} rows from GCS into DataFrame.")
        return frame

    except Exception as e:
        import traceback
        print(f"Error loading data from GCS: {e}")
        traceback.print_exc()
        return pd.DataFrame({'Error': [f"Failed to load data: {e}"]}, index=[0])

# This function is now triggered by a Pub/Sub message, not HTTP

# Timeout (seconds) applied to every EODHD HTTP request. requests has no
# default timeout, so without one a stalled connection blocks until the
# Cloud Function itself is terminated.
_REQUEST_TIMEOUT_SECONDS = 60


def _redact(text, secret):
    """Return ``text`` with every occurrence of ``secret`` replaced by '***'.

    The API token is embedded in request URLs, and requests' exception
    messages echo the URL, so error text is redacted before it is logged.
    """
    return text.replace(secret, '***') if secret else text


def _upload_csv(bucket, blob_name, df, index=False):
    """Serialize ``df`` to CSV and upload it to ``bucket`` under ``blob_name``.

    Args:
        bucket: google.cloud.storage bucket handle.
        blob_name: Object name to write.
        df: DataFrame to serialize.
        index: Whether to include the DataFrame index in the CSV.
    """
    bucket.blob(blob_name).upload_from_string(df.to_csv(index=index), content_type='text/csv')


def _fetch_bulk_eod_rows(api_key, exchange, dates_to_fetch, seconds_between_calls):
    """Download one EODHD bulk end-of-day snapshot per date in ``dates_to_fetch``.

    A request or parsing failure on one date is logged (with the token
    redacted) and that date is skipped entirely. No partial rows are kept,
    so nothing is stored for that date.

    Args:
        api_key: EODHD API token.
        exchange: Exchange code used in the URL path (e.g. 'NASDAQ').
        dates_to_fetch: Iterable of ``datetime.date`` objects.
        seconds_between_calls: Pause after each successful request except
            the last one.

    Returns:
        List of dicts with keys date, ticker, open, high, low, close,
        adjusted_close, volume, one per entry in the API responses.
    """
    dates_to_fetch = list(dates_to_fetch)
    rows = []
    for i, date in enumerate(tqdm(dates_to_fetch, desc="Fetching EODHD data")):
        date_str = date.strftime('%Y-%m-%d')
        url = f'https://eodhd.com/api/eod-bulk-last-day/{exchange}?api_token={api_key}&fmt=json&date={date_str}'
        print(f"[{i+1}] Fetching {date_str}...")
        try:
            response = requests.get(url, timeout=_REQUEST_TIMEOUT_SECONDS)
            response.raise_for_status()
            day_rows = [
                {
                    'date': entry.get('date'), 'ticker': entry.get('code'),
                    'open': entry.get('open'), 'high': entry.get('high'),
                    'low': entry.get('low'), 'close': entry.get('close'),
                    'adjusted_close': entry.get('adjusted_close'), 'volume': entry.get('volume'),
                }
                for entry in response.json()
            ]
        except Exception as e:
            # Best effort per date: one failed day must not stop the others.
            print(f"⚠️ Error on {date_str}: {_redact(str(e), api_key)}")
            continue
        rows.extend(day_rows)
        if i < len(dates_to_fetch) - 1:
            time.sleep(seconds_between_calls)
    return rows


def _get_filtered_nasdaq_stocks(api_key, min_cap, exchange="NASDAQ", results_per_page=500):
    """Page through the EODHD screener for ``exchange`` stocks with market cap >= ``min_cap``.

    Results are requested sorted by market capitalization, descending.
    Paging stops at the first response without a non-empty 'data' list.

    Returns:
        DataFrame with one row per screener record (empty if none).
    """
    screener_all_data = []
    offset = 0
    while True:
        url = (
            "https://eodhd.com/api/screener"
            f"?api_token={api_key}&filters=[[\"exchange\",\"=\",\"{exchange}\"],"
            f"[\"market_capitalization\",\">=\",{min_cap}]]"
            f"&sort=market_capitalization.desc&limit={results_per_page}&offset={offset}&fmt=json"
        )
        response = requests.get(url, timeout=_REQUEST_TIMEOUT_SECONDS)
        batch = response.json().get("data", [])
        if not batch:
            break
        screener_all_data.extend(batch)
        # Step by the rows actually received, not the requested page size.
        # If the API returns fewer rows than `results_per_page`, stepping by
        # the requested size would silently skip records.
        offset += len(batch)
    return pd.DataFrame(screener_all_data)


def _calculate_lagged_correlation(df, lag_days, range_months):
    """Pairwise Spearman correlation of adjusted close prices with a lag.

    Bars are restricted to dates between ``datetime.now() - range_months``
    and ``datetime.now()`` (naive local time), and rows with volume <= 0 (or
    missing) are dropped. The data is pivoted to one column per ticker.
    Entry [a, b] is then the Spearman correlation between ticker a's
    adjusted close and ticker b's adjusted close shifted down by
    ``lag_days`` rows of that date table. The diagonal is left NaN.

    NOTE(review): this correlates price levels, not returns; trending price
    series tend to correlate strongly regardless of any lead/lag effect.

    Args:
        df: Long-format bars with 'date', 'ticker', 'volume' and
            'adjusted_close' columns. Not modified.
        lag_days: Number of rows by which ticker b's series is shifted.
        range_months: Length of the look-back window in months.

    Returns:
        Square float DataFrame indexed and columned by ticker.
    """
    print(f"Starting optimized correlation calculation for {range_months} months...")
    end_datetime = datetime.now()
    start_datetime = end_datetime - pd.DateOffset(months=range_months)

    # assign() works on a copy, so the caller's frame is left untouched.
    window = df.assign(date=pd.to_datetime(df['date']))
    window = window[(window['date'] >= start_datetime) & (window['date'] <= end_datetime)]
    window = window[window['volume'] > 0]
    # pivot() raises ValueError on repeated (date, ticker) pairs.
    window = window.drop_duplicates(subset=['date', 'ticker'], keep='last')

    # One row per date and one column per ticker, so pairs are pre-aligned.
    print("Pivoting data for faster processing...")
    pivot_df = window.pivot(index='date', columns='ticker', values='adjusted_close')
    lagged_df = pivot_df.shift(lag_days)  # shift once instead of once per pair

    tickers = pivot_df.columns
    values = np.full((len(tickers), len(tickers)), np.nan)
    for i, ticker_a in enumerate(tqdm(tickers, desc=f"Calculating {range_months}-month correlations")):
        series_a = pivot_df[ticker_a]
        for j, ticker_b in enumerate(tickers):
            if ticker_a != ticker_b:
                values[i, j] = series_a.corr(lagged_df[ticker_b], method='spearman')

    print(f"Finished correlation calculation for {range_months} months.")
    return pd.DataFrame(values, index=tickers, columns=tickers)


def _process_and_score_stocks(six_month_correlations, three_month_correlations, screener_data_df,
                              source_ticker, min_nodes, max_nodes, threshold_percent):
    """Score how strongly ``source_ticker``'s correlated peers pull on it.

    Steps:
      - Takes the six-month matrix row for ``source_ticker``.
      - Looks up the matching three-month value, using 0 when the pair is
        absent from that matrix.
      - Keeps targets whose six- and three-month correlations are both > 0.
      - Joins market cap and last-day change from ``screener_data_df``,
        dropping targets missing either.

    Then computes, per target:
      unified_correlation  = 0.6 * three_month + 0.4 * six_month
      market_cap_influence = ln(market cap), or the constant 20 when every
                             cap (targets plus source) has the same log
      gravitational_force  = unified_correlation * market_cap_influence

    Targets are kept when |force| >= threshold_percent * max|force|. If
    fewer than ``min_nodes`` pass, the top ``min_nodes`` by |force| are used
    instead; if more than ``max_nodes`` pass, only the top ``max_nodes`` are
    kept. A target's force is negated when its last-day change is negative.

    Returns:
        (processed_data_df, source_data_df). Both are empty DataFrames when
        nothing qualifies. source_data_df has one row with ticker,
        net_gravitational_force (sum of signed forces), max_potential_force
        (sum of market_cap_influence), gravitational_impact
        (net / max * 100, or 0), source_market_cap_influence and
        source_planet_radius.
    """
    empty = (pd.DataFrame(), pd.DataFrame())

    # Long (source, target, correlation) table from the six-month matrix.
    correlation_df = six_month_correlations.rename_axis('source', axis=0)
    grouped_correlation_data = correlation_df.stack().reset_index()
    grouped_correlation_data.columns = ['source', 'target', 'six_month_spearman_correlation']
    grouped_correlation_data = grouped_correlation_data[
        (grouped_correlation_data['source'] != grouped_correlation_data['target'])
        & (grouped_correlation_data['target'] != source_ticker)
    ].copy()
    source_connections = grouped_correlation_data[grouped_correlation_data['source'] == source_ticker].copy()
    if source_connections.empty:
        return empty

    source_connections['three_month_spearman_correlation'] = source_connections.apply(
        lambda row: three_month_correlations.loc[row['source'], row['target']]
        if row['source'] in three_month_correlations.index and row['target'] in three_month_correlations.columns
        else 0,
        axis=1,
    )
    positive_corr_group = source_connections[
        (source_connections['six_month_spearman_correlation'] > 0)
        & (source_connections['three_month_spearman_correlation'] > 0)
    ].copy()
    if positive_corr_group.empty:
        return empty

    screener_info = screener_data_df[['code', 'market_capitalization', 'last_day_change']].rename(
        columns={'code': 'target'})
    positive_corr_group = pd.merge(positive_corr_group, screener_info, on='target', how='left')
    positive_corr_group.dropna(subset=['market_capitalization', 'last_day_change'], inplace=True)
    if positive_corr_group.empty:
        return empty

    epsilon = 1e-9  # floor before taking logs
    w_3m, w_6m = 0.6, 0.4
    positive_corr_group['unified_correlation'] = (
        w_3m * positive_corr_group['three_month_spearman_correlation']
        + w_6m * positive_corr_group['six_month_spearman_correlation']
    )
    positive_corr_group['Market Cap'] = positive_corr_group['market_capitalization']

    source_screener_info = screener_data_df[screener_data_df['code'] == source_ticker]
    source_market_cap = (
        source_screener_info['market_capitalization'].iloc[0] if not source_screener_info.empty else epsilon
    )
    source_log_cap = np.log(max(source_market_cap, epsilon))

    # Spread of log caps over targets + source decides between ln(cap) and a constant.
    all_market_caps = positive_corr_group['Market Cap'].tolist()
    all_market_caps.append(source_market_cap)
    log_caps = np.log(pd.Series(all_market_caps).clip(lower=epsilon))
    if log_caps.max() - log_caps.min() > 0:
        positive_corr_group['market_cap_influence'] = np.log(positive_corr_group['Market Cap'].clip(lower=epsilon))
    else:
        positive_corr_group['market_cap_influence'] = 20

    correlation_weight_factor = 1.0
    positive_corr_group['gravitational_force'] = (
        (positive_corr_group['unified_correlation'] * correlation_weight_factor)
        * positive_corr_group['market_cap_influence']
    )
    max_abs_force = positive_corr_group['gravitational_force'].abs().max()
    if pd.isna(max_abs_force) or max_abs_force == 0:
        return empty

    force_threshold = max_abs_force * threshold_percent
    filtered_by_force_threshold = positive_corr_group[
        positive_corr_group['gravitational_force'].abs() >= force_threshold].copy()
    if len(filtered_by_force_threshold) < min_nodes:
        final_filtered_df = positive_corr_group.sort_values(
            by='gravitational_force', key=abs, ascending=False).head(min_nodes).copy()
    elif len(filtered_by_force_threshold) > max_nodes:
        final_filtered_df = filtered_by_force_threshold.sort_values(
            by='gravitational_force', key=abs, ascending=False).head(max_nodes).copy()
    else:
        final_filtered_df = filtered_by_force_threshold.copy()
    if final_filtered_df.empty:
        return empty

    final_filtered_df['Daily Change'] = final_filtered_df['last_day_change']
    # Force counts against the source when the target fell on the last day.
    final_filtered_df['signed_gravitational_force'] = np.where(
        final_filtered_df['Daily Change'] >= 0,
        final_filtered_df['gravitational_force'],
        -final_filtered_df['gravitational_force'],
    )
    net_gravitational_force = final_filtered_df['signed_gravitational_force'].sum()
    max_potential_force = final_filtered_df['market_cap_influence'].sum()

    # Strongest force -> radius 0, weakest -> 1. When all forces are equal the
    # range falls back to 1.0, which gives every node radius 1.0.
    min_force = final_filtered_df['gravitational_force'].min()
    max_force = final_filtered_df['gravitational_force'].max()
    force_range = max_force - min_force if max_force > min_force else 1.0
    final_filtered_df['Orbital Radius'] = 1 - ((final_filtered_df['gravitational_force'] - min_force) / force_range)

    # Planet radius: log market cap min-max scaled over the kept targets + source.
    all_caps = pd.concat([final_filtered_df['Market Cap'], pd.Series([source_market_cap])], ignore_index=True)
    log_all_caps = np.log(all_caps.clip(lower=epsilon))
    min_log_cap = log_all_caps.min()
    log_cap_range = log_all_caps.max() - min_log_cap
    if log_cap_range > 0:
        log_df_caps = np.log(final_filtered_df['Market Cap'].clip(lower=epsilon))
        final_filtered_df['Planet Radius'] = (log_df_caps - min_log_cap) / log_cap_range
        source_planet_radius = (source_log_cap - min_log_cap) / log_cap_range
    else:
        final_filtered_df['Planet Radius'] = 0.5
        source_planet_radius = 0.5

    final_filtered_df['gravitational_percent'] = (
        final_filtered_df['signed_gravitational_force'] / final_filtered_df['gravitational_force'].sum()
    ) * 100
    gravitational_impact = (net_gravitational_force / max_potential_force) * 100 if max_potential_force > 0 else 0
    source_market_cap_influence = 20 if log_cap_range <= 0 else source_log_cap
    source_data_df = pd.DataFrame([{
        'ticker': source_ticker,
        'net_gravitational_force': net_gravitational_force,
        'max_potential_force': max_potential_force,
        'gravitational_impact': gravitational_impact,
        'source_market_cap_influence': source_market_cap_influence,
        'source_planet_radius': source_planet_radius,
    }])

    final_columns = ['source', 'target', 'Daily Change', 'six_month_spearman_correlation',
                     'three_month_spearman_correlation', 'unified_correlation', 'Orbital Radius',
                     'Market Cap', 'Planet Radius', 'market_cap_influence', 'gravitational_force',
                     'signed_gravitational_force', 'gravitational_percent']
    for col in final_columns:
        if col not in final_filtered_df.columns:
            final_filtered_df[col] = np.nan
    processed_data_df = final_filtered_df[final_columns].copy()
    return processed_data_df, source_data_df


@functions_framework.cloud_event
def process_data_pipeline(cloud_event):
    """Run the NASDAQ data pipeline; triggered by a Pub/Sub CloudEvent.

    Each step writes a CSV to the GCS bucket:
      1. Download daily EODHD bulk bars for missing weekdays in the last
         180 days and append them to nasdaq_df.csv.
      2. Fetch screener rows (market cap >= 10B, common stock) plus each
         ticker's last-day change -> screener_data_df.csv.
      3. Restrict the bars to those tickers -> filtered_nasdaq_df.csv.
      4. Build 3- and 6-month lag-1 Spearman correlation matrices. They are
         recomputed only when new bars arrived or a matrix file is missing;
         otherwise they are reloaded.
      5. Score every ticker with the gravitational model ->
         gravitational_impact_df.csv.

    Args:
        cloud_event: The triggering CloudEvent; its payload is not read.

    Returns:
        "Pipeline finished." on success, or None when a step aborts early.
    """
    # The module-level name `datetime` is the class, so import timezone directly.
    from datetime import timezone

    print("Pub/Sub trigger received. Starting data pipeline...")
    storage_client = storage.Client()
    bucket = storage_client.bucket(GCS_BUCKET_NAME)
    actual_gcs_bucket_name = GCS_BUCKET_NAME

    ##########################################################
    # Step 1: incremental EODHD bulk end-of-day download -> nasdaq_df.csv
    # NOTE(security): this fallback token is committed to source control.
    # Set EODHD_API_KEY in the function's environment, then rotate the key
    # and delete the literal.
    API_KEY = os.environ.get('EODHD_API_KEY', '68433aff09ea73.10710364')
    EXCHANGE = 'NASDAQ'
    DAYS_BACK = 180
    MAX_CALLS_PER_RUN = 200
    SECONDS_BETWEEN_CALLS = 0

    # Weekdays in the look-back window, oldest first (UTC calendar date).
    today = datetime.now(timezone.utc).date()
    dates = sorted(
        day for day in (today - timedelta(days=i) for i in range(DAYS_BACK))
        if day.weekday() < 5
    )

    downloaded_dates = set()
    nasdaq_blob_name = 'nasdaq_df.csv'
    nasdaq_blob = bucket.blob(nasdaq_blob_name)
    df_existing = pd.DataFrame()

    if nasdaq_blob.exists():
        df_existing = load_data_from_gcs(GCS_BUCKET_NAME, nasdaq_blob_name)
        if not df_existing.empty and 'date' not in df_existing.columns:
            # load_data_from_gcs reports read failures as a non-empty frame
            # holding only an 'Error' column. Carrying on would re-download
            # the window and then overwrite the stored history (or crash
            # while merging), so stop here instead.
            print("Existing nasdaq_df could not be read. Aborting to avoid overwriting it.")
            return
        if not df_existing.empty:
            df_existing['date'] = pd.to_datetime(df_existing['date']).dt.date
            downloaded_dates = set(df_existing['date'])

    # Missing weekdays, oldest first, capped per run to bound API usage.
    dates_to_fetch = [d for d in dates if d not in downloaded_dates][:MAX_CALLS_PER_RUN]
    all_data = _fetch_bulk_eod_rows(API_KEY, EXCHANGE, dates_to_fetch, SECONDS_BETWEEN_CALLS)

    if all_data:
        df_new = pd.DataFrame(all_data)
        df_new['date'] = pd.to_datetime(df_new['date'])

        if not df_existing.empty:
            df_existing['date'] = pd.to_datetime(df_existing['date'])
            df_combined = pd.concat([df_existing, df_new], ignore_index=True)
        else:
            df_combined = df_new

        # "Already downloaded" is judged by the 'date' values the API returned,
        # not by the dates requested. A response stamped with an
        # already-stored date would therefore be appended again, and
        # duplicate (date, ticker) rows make DataFrame.pivot raise in step 4.
        df_combined = df_combined.drop_duplicates(subset=['date', 'ticker'], keep='last', ignore_index=True)

        _upload_csv(bucket, nasdaq_blob_name, df_combined)
        print(f"Successfully saved data to gs://{actual_gcs_bucket_name}/{nasdaq_blob_name}")
    elif downloaded_dates:
        print("⚠️ No new data fetched, but existing data found.")
    else:
        print("⚠️ No new data fetched and no existing data found. Aborting.")
        return

    nasdaq_df = load_data_from_gcs(GCS_BUCKET_NAME, nasdaq_blob_name)
    if nasdaq_df.empty or 'date' not in nasdaq_df.columns:
        print("Could not load nasdaq_df after processing. Aborting.")
        return
    nasdaq_df['date'] = pd.to_datetime(nasdaq_df['date'])
    # The stored file may still contain repeats written by earlier runs.
    nasdaq_df = nasdaq_df.drop_duplicates(subset=['date', 'ticker'], keep='last', ignore_index=True)

    #############################################################################
    # Step 2: large-cap common-stock screener -> screener_data_df.csv
    MIN_MARKET_CAP = 10_000_000_000
    RESULTS_PER_PAGE = 500

    screener_data_df = _get_filtered_nasdaq_stocks(
        API_KEY, MIN_MARKET_CAP, exchange=EXCHANGE, results_per_page=RESULTS_PER_PAGE)
    meta_url = f'https://eodhd.com/api/exchange-symbol-list/{EXCHANGE}?api_token={API_KEY}&fmt=json'
    meta_df = pd.DataFrame(requests.get(meta_url, timeout=_REQUEST_TIMEOUT_SECONDS).json())
    common_df = meta_df[meta_df['Type'] == 'Common Stock'].copy()
    # Inner join keeps only screener rows that the symbol list marks as common stock.
    screener_data_df = pd.merge(
        screener_data_df,
        common_df[['Code', 'Country', 'Exchange', 'Currency', 'Type']],
        left_on='code',
        right_on='Code',
        how='inner',
    ).drop('Code', axis=1)

    # Last-day % change in adjusted close: each ticker's latest row vs the row before it.
    bars_sorted = nasdaq_df.sort_values(by=['ticker', 'date']).copy()
    prev_close = bars_sorted.groupby('ticker')['adjusted_close'].shift(1)
    bars_sorted['daily_change'] = ((bars_sorted['adjusted_close'] - prev_close) / prev_close) * 100
    last_day_changes = bars_sorted.groupby('ticker').tail(1).set_index('ticker')['daily_change'].to_dict()
    screener_data_df['last_day_change'] = screener_data_df['code'].map(last_day_changes)

    screener_blob_name = "screener_data_df.csv"
    _upload_csv(bucket, screener_blob_name, screener_data_df)
    print(f"Successfully saved data to gs://{actual_gcs_bucket_name}/{screener_blob_name}")

    #################################################################################
    # Step 3: bars for screened tickers only -> filtered_nasdaq_df.csv
    common_codes = screener_data_df.loc[screener_data_df['Type'] == 'Common Stock', 'code']
    filtered_nasdaq_df = nasdaq_df[nasdaq_df['ticker'].isin(common_codes)].copy()

    filtered_nasdaq_blob_name = "filtered_nasdaq_df.csv"
    _upload_csv(bucket, filtered_nasdaq_blob_name, filtered_nasdaq_df)
    print(f"Successfully saved data to gs://{actual_gcs_bucket_name}/{filtered_nasdaq_blob_name}")

    ############################################################################
    # Step 4: lagged correlation matrices (recomputed or reloaded)
    base_file_name_three = "three_month_spearman_lagged_correlation.csv"
    base_file_name_six = "six_month_spearman_lagged_correlation.csv"

    run_correlations = (
        bool(all_data)
        or not bucket.blob(base_file_name_three).exists()
        or not bucket.blob(base_file_name_six).exists()
    )

    if run_correlations:
        print("New data fetched or correlation files missing. Calculating correlations...")
        three_month_spearman_lagged_correlations = _calculate_lagged_correlation(
            filtered_nasdaq_df, lag_days=1, range_months=3)
        six_month_spearman_lagged_correlations = _calculate_lagged_correlation(
            filtered_nasdaq_df, lag_days=1, range_months=6)

        # index=True: the matrix index holds the row tickers.
        _upload_csv(bucket, base_file_name_three, three_month_spearman_lagged_correlations, index=True)
        _upload_csv(bucket, base_file_name_six, six_month_spearman_lagged_correlations, index=True)
        print("Successfully saved correlation data.")
    else:
        print("No new data, loading existing correlations.")
        three_month_spearman_lagged_correlations = load_data_from_gcs(GCS_BUCKET_NAME, base_file_name_three)
        six_month_spearman_lagged_correlations = load_data_from_gcs(GCS_BUCKET_NAME, base_file_name_six)
        # The first CSV column holds the row tickers written from the index.
        if not three_month_spearman_lagged_correlations.empty:
            three_month_spearman_lagged_correlations = three_month_spearman_lagged_correlations.set_index(
                three_month_spearman_lagged_correlations.columns[0])
        if not six_month_spearman_lagged_correlations.empty:
            six_month_spearman_lagged_correlations = six_month_spearman_lagged_correlations.set_index(
                six_month_spearman_lagged_correlations.columns[0])

    if six_month_spearman_lagged_correlations.empty or three_month_spearman_lagged_correlations.empty:
        print("Correlation files are still not found or empty after processing. Aborting.")
        return

    ########################################################################
    # Step 5: per-ticker gravitational impact -> gravitational_impact_df.csv
    min_nodes, max_nodes, threshold_percent = 5, 30, 0.9
    top_gravitational_impacts = []

    for ticker in tqdm(six_month_spearman_lagged_correlations.index, desc="Processing tickers for gravitational impact"):
        try:
            _, source_data = _process_and_score_stocks(
                six_month_spearman_lagged_correlations, three_month_spearman_lagged_correlations,
                screener_data_df, ticker, min_nodes, max_nodes, threshold_percent)
            if not source_data.empty:
                top_gravitational_impacts.append({
                    'ticker': ticker,
                    'net_gravitational_force': source_data['net_gravitational_force'].iloc[0],
                    'max_potential_force': source_data['max_potential_force'].iloc[0],
                    'gravitational_impact': source_data['gravitational_impact'].iloc[0],
                })
        except Exception as e:
            # Best effort per ticker: one bad ticker must not sink the whole run.
            print(f"Error processing ticker {ticker}: {e}")

    gravitational_impact_df = pd.DataFrame(top_gravitational_impacts)

    impact_blob_name = "gravitational_impact_df.csv"
    _upload_csv(bucket, impact_blob_name, gravitational_impact_df)
    print(f"Successfully saved data to gs://{actual_gcs_bucket_name}/{impact_blob_name}")
    print('Data pipeline finished successfully.')

    return "Pipeline finished."
