In [None]:
# Databricks notebook source
import requests
import json
import os
from dotenv import load_dotenv
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [None]:
def get_secret(key_name, env_var_name, credentials_path="/dbfs/FileStore/configs/credentials.json"):
    """Look up a secret, trying a JSON credentials file first and the environment second.

    In the credentials file, ``key_name`` is looked up with dashes replaced by
    underscores (``alpha-vantage-api-key`` -> ``alpha_vantage_api_key``).
    If the file cannot be opened or parsed, or the key is absent, ``load_dotenv()``
    is called and ``env_var_name`` is read from the environment instead.

    Args:
        key_name: Secret name as used in the credentials file (dashes allowed).
        env_var_name: Environment variable consulted on fallback.
        credentials_path: JSON credentials file tried first. Defaults to the
            DBFS location this notebook has always used.

    Returns:
        The secret value.

    Raises:
        ValueError: If the secret is found in neither source. The reason the
            file lookup failed is attached as ``__cause__``.
    """
    file_key = key_name.replace("-", "_")
    try:
        with open(credentials_path, 'r') as f:
            credentials = json.load(f)
        print(f"Loaded credentials from {credentials_path}")

        if file_key not in credentials:
            raise ValueError(f"Key {key_name} not found in credentials file")
        return credentials[file_key]

    except Exception as file_error:
        # Deliberately broad: any problem with the file means "try the environment".
        load_dotenv()
        env_value = os.getenv(env_var_name)

        if not env_value:
            # Chain the file error so the real cause (missing file, bad JSON, missing key) is visible.
            raise ValueError(
                f"Could not find secret {key_name} in credentials or environment"
            ) from file_error

        return env_value
    
# Load configuration
def load_config(config_path="/dbfs/FileStore/configs/config.json"):
    """Read the pipeline configuration JSON file and return the decoded value.

    The notebook reads ``stock_symbols`` and ``data_storage.bronze_path`` from
    the returned object.

    Args:
        config_path: Path to the JSON config file. Defaults to the DBFS
            location this notebook has always used.

    Returns:
        The parsed JSON content.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file is not valid JSON.
    """
    # Explicit encoding so parsing does not depend on the cluster's locale.
    with open(config_path, 'r', encoding="utf-8") as f:
        return json.load(f)

In [None]:
# # test cell
# Smoke test: confirm the API key resolves and Alpha Vantage answers a small
# ("compact") request. Note that running this cell spends one API request.
api_key = None
try:
    api_key = get_secret("alpha-vantage-api-key", "ALPHA_VANTAGE_API_KEY")
    # Show only the tail of the key so saved notebook output does not expose it.
    print(f"Successfully retrieved API key: ...{api_key[-4:]}")

    # Pass query values via `params` (URL-encoded by requests) and bound the wait with a timeout.
    response = requests.get(
        "https://www.alphavantage.co/query",
        params={
            "function": "TIME_SERIES_DAILY",
            "symbol": "MSFT",
            "outputsize": "compact",
            "apikey": api_key,
        },
        timeout=30,
    )

    if response.status_code == 200:
        data = response.json()
        if "Time Series (Daily)" in data:
            print("API test successful! Received time series data.")
            print(f"Data points: {len(data['Time Series (Daily)'])}")
        else:
            print(f"API returned: {data}")
    else:
        print(f"API request failed with status code: {response.status_code}")

except Exception as e:
    error_text = str(e)
    if api_key:
        # Exception messages (e.g. from requests) may embed the request URL, API key included.
        error_text = error_text.replace(api_key, "****")
    print(f"Error testing credentials: {error_text}")


In [None]:
# Load configuration; the symbol list and bronze path feed the ingestion step.
config = load_config()
print("Config loaded:\n {}".format(config))
symbols, bronze_path = (
    config["stock_symbols"],
    config["data_storage"]["bronze_path"],
)

In [None]:
# Define Schema
# Bronze-layer schema for daily stock price rows plus the ingestion timestamp.
# Each entry below is (column name, Spark type, nullable).
stock_schema = StructType([
    StructField(column, spark_type, nullable)
    for column, spark_type, nullable in (
        ("symbol", StringType(), False),
        ("date", DateType(), False),
        ("open", DoubleType(), True),
        ("high", DoubleType(), True),
        ("low", DoubleType(), True),
        ("close", DoubleType(), True),
        ("volume", LongType(), True),
        ("ingestion_date", TimestampType(), False),
    )
])

In [None]:
# API Data Fetching
# COMMAND ----------
def fetch_alpha_vantage_data(symbol, api_key, outputsize="full", timeout=30):
    """Fetch daily price bars for one symbol from Alpha Vantage's TIME_SERIES_DAILY endpoint.

    Args:
        symbol: Ticker to request (e.g. "MSFT").
        api_key: Alpha Vantage API key.
        outputsize: Passed straight through as the ``outputsize`` query value
            ("full" by default, as before; "compact" requests less history).
        timeout: Seconds to wait for the HTTP response before giving up.

    Returns:
        A list of tuples in ``stock_schema`` column order:
        (symbol, date, open, high, low, close, volume, ingestion_date).
        Every row from one call shares the same ``ingestion_date``.
        Returns an empty list, after printing the reason, in these cases:
        network errors, non-200 status, non-JSON bodies, API error or limit
        messages, or a response without a daily time series.
    """
    try:
        # `params` lets requests URL-encode the symbol and key.
        response = requests.get(
            "https://www.alphavantage.co/query",
            params={
                "function": "TIME_SERIES_DAILY",
                "symbol": symbol,
                "outputsize": outputsize,
                "apikey": api_key,
            },
            timeout=timeout,
        )
    except requests.RequestException as exc:
        # Print only the exception type: its message may contain the URL with the API key.
        print(f"Error fetching data for {symbol}: {type(exc).__name__}")
        return []

    # Check for valid response
    if response.status_code != 200:
        print(f"Error fetching data for {symbol}: {response.status_code}")
        return []

    try:
        data = response.json()
    except ValueError:
        # e.g. an HTML error page instead of JSON
        print(f"Non-JSON response for {symbol}")
        return []

    # Check for API errors or limits
    if "Error Message" in data:
        print(f"API Error for {symbol}: {data['Error Message']}")
        return []

    if "Note" in data:  # API limit reached
        print(f"API Limit reached: {data['Note']}")
        return []

    time_series = data.get("Time Series (Daily)")
    if time_series is None:
        # Alpha Vantage may also return its notice under "Information"
        # (presumably rate-limit / premium-endpoint messages) — surface it instead of hiding it.
        if "Information" in data:
            print(f"API Information for {symbol}: {data['Information']}")
        else:
            print(f"No data found for {symbol}")
        return []

    # One timestamp per fetch so the whole batch has a single, consistent ingestion_date.
    ingestion_time = datetime.now()
    return [
        (
            symbol,
            datetime.strptime(date_str, "%Y-%m-%d").date(),
            float(values["1. open"]),
            float(values["2. high"]),
            float(values["3. low"]),
            float(values["4. close"]),
            int(values["5. volume"]),
            ingestion_time,
        )
        for date_str, values in time_series.items()
    ]

In [None]:
# Testing timestamp types
# Scratch check comparing Spark's current_timestamp() with Python's datetime.now()
# as sources for a TimestampType column.
from datetime import datetime
from pyspark.sql.functions import current_timestamp
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

# What kind of object does each API hand back?
print("current_timestamp() is type: {}".format(type(current_timestamp())))
current_time = datetime.now()
print("datetime.now() is type: {}".format(type(current_time)))

# Minimal two-column schema with a non-nullable timestamp.
test_schema = StructType([
    StructField("id", StringType(), False),
    StructField("timestamp", TimestampType(), False),
])

# Case 1: a Python datetime value placed directly in the row data.
python_rows = [("test1", datetime.now())]
python_df = spark.createDataFrame(python_rows, test_schema)
print("DataFrame created with Python datetime:")
python_df.printSchema()
python_df.show(truncate=False)

# Case 2: current_timestamp() is not supplied as a row value; it is attached
# with withColumn to a DataFrame that already exists.
spark_df = (
    spark.createDataFrame([("test2",)], ["id"])
    .withColumn("timestamp", current_timestamp())
)
print("DataFrame created with Spark current_timestamp():")
spark_df.printSchema()
spark_df.show(truncate=False)

In [None]:
# Bronze Layer Ingestion Function
def ingest_to_bronze(delay_seconds=12):
    """Fetch every configured symbol from Alpha Vantage and append the rows to the bronze Delta table.

    Uses the notebook-level ``symbols``, ``bronze_path``, ``stock_schema`` and
    ``spark``. Symbols for which no rows come back are skipped, but the write
    still happens, so an all-empty run appends an empty DataFrame (as before).

    Args:
        delay_seconds: Pause between consecutive API requests. The default of
            12 s matches the 5-requests-per-minute free-tier limit this notebook
            assumes (NOTE(review): confirm against the current Alpha Vantage plan).

    Returns:
        The DataFrame that was appended to ``bronze_path``.
    """
    import time

    # The key comes from get_secret(): the credentials JSON file first, then the environment.
    api_key = get_secret("alpha-vantage-api-key", "ALPHA_VANTAGE_API_KEY")

    # Collect plain tuples and build one DataFrame at the end, instead of chaining a
    # union per symbol and running a Spark count() job just to report row counts.
    all_rows = []
    for position, symbol in enumerate(symbols):
        if position > 0:
            # Space requests out to respect API limits; no pointless sleep after the last symbol.
            time.sleep(delay_seconds)

        print(f"Fetching data for {symbol}...")
        rows = fetch_alpha_vantage_data(symbol, api_key)

        if rows:
            all_rows.extend(rows)
            print(f"Added {len(rows)} rows for {symbol}")

    bronze_df = spark.createDataFrame(all_rows, stock_schema)

    # Write to bronze layer
    (
        bronze_df.write.format("delta")
        .mode("append")
        .partitionBy("symbol")
        .save(bronze_path)
    )

    print(f"Bronze layer updated with {len(all_rows)} records")
    return bronze_df