In [0]:
%pip install -r ../../requirements.txt
%restart_python

In [0]:
import yfinance as yf
from datetime import datetime, date
import os
import time
from pyspark.sql.functions import lit
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.utils import AnalysisException
from pyspark.sql import functions as F
from pyspark.sql.types import DateType, TimestampType
import yaml
from pyspark.sql.functions import to_date, col
import sys

# Adding the project root folder to sys.path so that imports work
sys.path.append(os.path.abspath('../../'))
from tests.bronze_tests import verify_bronze_batch


# --- DELTA LAKE IMPORT ---
from delta.tables import DeltaTable
from pyspark.sql import types as T
#logger
from libs.logger import log_execution

# --- 1. Environment Configuration ---
# ENV selects which top-level section of config.yaml this run uses.
try:
    # dbutils.widgets.get() is the standard method for reading Job parameters
    ENV = dbutils.widgets.get("env_name")
    print(f"Environment configured by Job (ENV): {ENV}")
except Exception:
    # Fallback for interactive/manual execution (typically: the widget is not defined)
    ENV = 'TEST'
    print(f"Interactive/Manual execution. Using default ENV: {ENV}")

# --- 2. Load Configuration ---
try:
    # Path is relative to the notebook's working directory; adjust if the repo layout changes.
    with open('../../config/config.yaml', 'r') as file:
        full_config = yaml.safe_load(file)
except FileNotFoundError:
    print("ERROR: 'config.yaml' file not found! Check the path.")
    raise

# yaml.safe_load returns None for an empty file (or a non-mapping for a malformed one);
# treat both as "no environments defined" so the ValueError below explains the problem
# instead of an AttributeError on `.get`.
CFG = full_config.get(ENV) if isinstance(full_config, dict) else None
if not CFG:
    raise ValueError(f"Configuration not found for environment: {ENV} in YAML file.")

# Define key configuration variables
catalog_name = CFG['catalog_name']
schema_name = CFG['schema_name']
volume_name = CFG['volume_name']
sql_table = CFG['sql_table_name']
# Both paths live inside the configured Unity Catalog volume.
LOGS_PATH = f"/Volumes/{catalog_name}/{schema_name}/{volume_name}/bronze_execution_logs/"
BRONZE_TABLE_PATH = f"/Volumes/{catalog_name}/{schema_name}/{volume_name}/yfinance_bronze_data/"


In [0]:
# --- 3. Get Tickers from Databricks SQL Table ---
# Produces `company_info_dict` ({Ticket: company_name}) and `gpw_ticket_list` (its keys).
# Any failure is reported and both fall back to empty containers, so later cells still run
# but have nothing to download.
try:
    sql_query = f"SELECT Ticket, company_name FROM {sql_table}"
    spark_df_tickets = spark.sql(sql_query)
    df_tickets_list = spark_df_tickets.collect()
    company_info_dict = dict(
        (ticker_row['Ticket'], ticker_row['company_name']) for ticker_row in df_tickets_list
    )
    gpw_ticket_list = [*company_info_dict]
    print("Successfully created Spark DataFrame from SQL table.")
    print("Tickers to process:", gpw_ticket_list)
except Exception as e:
    print(f"Error reading from SQL table: {e}.")
    gpw_ticket_list, company_info_dict = [], {}

In [0]:
# Create the base directory for the bronze table.
# dbutils.fs.mkdirs also succeeds when the directory already exists, so the except
# branch below only runs on a genuine error (e.g. permissions / invalid volume).
try:
    dbutils.fs.mkdirs(BRONZE_TABLE_PATH)
    print(f"Successfully created directory for the SINGLE BRONZE TABLE: {BRONZE_TABLE_PATH}")
except Exception as e:
    # Best effort: a real problem will surface again when the table is written.
    print(f"ERROR: could not create directory {BRONZE_TABLE_PATH}: {e}")

# Set end date for yfinance download.
# yfinance treats `end` as exclusive, so downloads cover data up to yesterday.
end_date = date.today()

if not gpw_ticket_list:
    # There is no fallback data source: with an empty list the ingestion cell downloads nothing.
    print("No company data to process: the ticker list is empty, so no data will be downloaded.")

In [0]:
import pandas as pd
from datetime import date
from pyspark.sql import functions as F
from pyspark.sql import types as T
from delta.tables import DeltaTable
import os
import time

# --- Helper function to check if a path is a Delta Table ---
def is_delta_table(path):
    """Return True if ``path`` is recognised as a Delta table by the Delta Lake API.

    Delegates to ``DeltaTable.isDeltaTable`` instead of only testing that a
    ``_delta_log`` folder exists. A bare folder check also reports True for a log
    directory left behind by an interrupted first write. ``DeltaTable.forPath``
    would then fail on every subsequent run.

    Args:
        path: filesystem path (e.g. a /Volumes/... directory) to inspect.

    Returns:
        bool: True for a Delta table, False otherwise or if the check itself errors.
    """
    try:
        return bool(DeltaTable.isDeltaTable(spark, path))
    except Exception:
        # Treat any error (missing path, permissions, ...) as "not a Delta table",
        # matching the previous best-effort behaviour.
        return False

# --- Schema Definition ---
# Explicit schema for the bronze batch; every field is nullable.
# Field order matters: it must line up with the pandas columns passed to createDataFrame.
bronze_schema = T.StructType([
    T.StructField(column_name, column_type, True)
    for column_name, column_type in (
        ("Date", T.DateType()),
        ("Open", T.DoubleType()),
        ("High", T.DoubleType()),
        ("Low", T.DoubleType()),
        ("Close", T.DoubleType()),
        ("Volume", T.LongType()),
        ("Ticket", T.StringType()),
        ("company_name", T.StringType()),
    )
])

def _to_bronze_frame(data, ticker, company_name):
    """Reshape one ``yf.download`` result into the bronze column layout.

    Args:
        data: non-empty pandas DataFrame returned by ``yf.download`` for a single
            ticker, indexed by date; its columns may be a MultiIndex.
        ticker: value written to the ``Ticket`` column.
        company_name: value written to the ``company_name`` column.

    Returns:
        A new DataFrame with the bronze columns that are present, ordered as
        Date, Open, High, Low, Close, Volume, Ticket, company_name, with ``Date``
        converted to ``datetime.date`` values.
    """
    # Drop repeated index labels before reset_index (original fix for an InvalidIndexError).
    deduped = data[~data.index.duplicated(keep='last')]
    frame = deduped.reset_index()

    # Flatten MultiIndex columns by keeping the first level of each label.
    if isinstance(frame.columns, pd.MultiIndex):
        frame.columns = frame.columns.map(lambda x: x[0] if isinstance(x, tuple) else x)

    # Flattening can yield repeated labels (e.g. "Date"); keep the first occurrence.
    # .copy() gives an independent frame for the column assignments below.
    frame = frame.loc[:, ~frame.columns.duplicated()].copy()

    # Add Metadata
    frame['Ticket'] = ticker
    frame['company_name'] = company_name
    frame['Date'] = pd.to_datetime(frame['Date']).dt.date

    # Keep only schema columns, in schema order, to avoid "Extra column" errors during concat.
    cols_to_keep = ['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Ticket', 'company_name']
    return frame[[c for c in cols_to_keep if c in frame.columns]]


try:
    print(f"Starting Bronze Ingestion for environment: {ENV}")
    log_execution(spark, "01_BRONZE_INGESTION", "STARTED", LOGS_PATH)

    is_full_table_existing = is_delta_table(BRONZE_TABLE_PATH)
    all_new_data_list = []
    failed_tickers = []

    # --- 1. OPTIMIZED METADATA FETCHING ---
    # One aggregate query returns the latest stored Date per ticker.
    last_dates_dict = {}
    if is_full_table_existing:
        try:
            print("Fetching current state of Bronze Table...")
            max_dates_df = spark.read.format("delta").load(BRONZE_TABLE_PATH) \
                                .groupBy("Ticket") \
                                .agg(F.max(F.col("Date").cast("date")).alias("max_date")) \
                                .collect()
            last_dates_dict = {row['Ticket']: row['max_date'] for row in max_dates_df}
        except Exception as e:
            # Best effort: without this map every ticker is re-downloaded in full, which the
            # MERGE below tolerates because it upserts on (Date, Ticket).
            print(f"Warning: Metadata fetch failed: {e}")

    print("\n--- PHASE 1: COLLECTING DATA ---")

    for t in gpw_ticket_list:
        yfinance_ticker = f"{t}.WA"
        last_download_date = last_dates_dict.get(t)
        start_date_for_update = None

        if is_full_table_existing and last_download_date:
            next_missing_day = (pd.to_datetime(last_download_date) + pd.Timedelta(days=1)).date()
            # yfinance treats `end` as exclusive, so [next_missing_day, end_date) is empty once
            # next_missing_day >= end_date: skip instead of sending a zero-length request.
            if next_missing_day >= end_date:
                continue
            start_date_for_update = next_missing_day.strftime('%Y-%m-%d')

        try:
            # Full history for first load / newly added tickers, otherwise only the missing range.
            if start_date_for_update is None:
                data = yf.download(yfinance_ticker, period='max', end=end_date)
            else:
                data = yf.download(yfinance_ticker, start=start_date_for_update, end=end_date)

            if data is not None and not data.empty:
                df_tmp = _to_bronze_frame(data, t, company_info_dict.get(t, 'N/A'))
                all_new_data_list.append(df_tmp)
                print(f"Collected {t}")

        except Exception as e:
            # Best effort per ticker: record the failure and continue with the others.
            failed_tickers.append(t)
            print(f"Error downloading {t}: {e}")

    if failed_tickers:
        print(f"Download failed for {len(failed_tickers)} ticker(s): {failed_tickers}")

    # --- 2. PHASE 2: BATCH MERGE ---
    if all_new_data_list:
        print("\n--- PHASE 2: BATCH MERGE ---")

        # Combined DataFrame with unique columns
        combined_pandas_df = pd.concat(all_new_data_list, ignore_index=True)

        # Safety: Final check for duplicates in the combined batch
        combined_pandas_df = combined_pandas_df.drop_duplicates(subset=['Date', 'Ticket'])

        spark_batch_df = spark.createDataFrame(combined_pandas_df, schema=bronze_schema)

        if not is_full_table_existing:
            spark_batch_df.write.format("delta").mode("overwrite").partitionBy("Ticket").save(BRONZE_TABLE_PATH)
            # Constraints
            spark.sql(f"ALTER TABLE delta.`{BRONZE_TABLE_PATH}` ADD CONSTRAINT date_not_null CHECK (Date IS NOT NULL)")
            spark.sql(f"ALTER TABLE delta.`{BRONZE_TABLE_PATH}` ADD CONSTRAINT ticket_not_null CHECK (Ticket IS NOT NULL)")
            print(f"Successfully created Bronze table with {len(all_new_data_list)} tickers.")
        else:
            # Upsert keyed on (Date, Ticket): existing rows are refreshed, new rows inserted.
            deltaTable = DeltaTable.forPath(spark, BRONZE_TABLE_PATH)
            deltaTable.alias("target").merge(
                source=spark_batch_df.alias("source"),
                condition="target.Date = source.Date AND target.Ticket = source.Ticket"
            ).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
            print(f"Successfully merged {len(all_new_data_list)} tickers in one transaction.")
    else:
        print("Nothing to update.")

    if failed_tickers:
        # Surface partial failures in the execution log instead of only in stdout.
        failure_note = f"Download failed for: {', '.join(map(str, failed_tickers))}"
        log_execution(spark, "01_BRONZE_INGESTION", "SUCCESS", LOGS_PATH, message=failure_note[:500])
    else:
        log_execution(spark, "01_BRONZE_INGESTION", "SUCCESS", LOGS_PATH)

except Exception as e:
    try:
        log_execution(spark, "01_BRONZE_INGESTION", "FAILED", LOGS_PATH, message=str(e)[:500])
    except Exception as log_error:
        # Never let a logging problem replace the real failure.
        print(f"Warning: could not write FAILED log entry: {log_error}")
    raise