In [0]:
!pip install yfinance

[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m


In [0]:
dbutils.library.restartPython()


In [0]:
import yfinance as yf


In [0]:
import yfinance as yf
import pandas as pd
from datetime import datetime

tickers = {
    "AAPL": "Technology",
    "MSFT": "Technology",
    "JPM": "Financials",
    "BAC": "Financials"
}

log_entries = []

for ticker, sector in tickers.items():
    try:
        print(f"\n📥 Загружаємо даніс для: {ticker} ({sector})")
        
        # оримання даних через yfinance
        stock = yf.Ticker(ticker)
        data_pd = stock.history(period="1y").reset_index()
        
        # Додаємо колонки Ticker та Sector
        data_pd["Ticker"] = ticker
        data_pd["Sector"] = sector
        
        # Зміна назв колонок с пробілами "Stock Splits" на підчерк
        data_pd.columns = [col.replace(" ", "_").replace(".", "") for col in data_pd.columns]
        
        # Конверт в Spark DataFrame
        data_spark = spark.createDataFrame(data_pd)
        
        # Им'я Delta таблиці для цього тикера в bronze рівні
        table_name = f"bronze_{ticker.lower()}"
        
        # зберігаємо в Delta таблицу 
        data_spark.write.format("delta").mode("overwrite").saveAsTable(table_name)
        
        print(f"✅ дані {ticker} збережені в таблицу {table_name}")
        
        rows_loaded = data_pd.shape[0]
        status = "success"
        
    except Exception as e:
        print(f"❌ помилка загрузкі {ticker}: {e}")
        rows_loaded = 0
        status = f"error: {str(e)}"
    
    log_entries.append({
        "step": "ingestion",
        "layer": "bronze",
        "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "status": status,
        "rows_processed": rows_loaded,
        "comment": f"{ticker} ({sector})"
    })

# збереження логів пайплайна в окрему Delta таблицу
log_df = pd.DataFrame(log_entries)
spark_log_df = spark.createDataFrame(log_df)
spark_log_df.write.format("delta").mode("append").saveAsTable("pipeline_logs")

print("\n📄 Логи пайплайна оновлені и збережені в таблицу 'pipeline_logs'.")

display(spark.sql("SELECT * FROM pipeline_logs ORDER BY timestamp DESC"))



📥 Загружаємо даніс для: AAPL (Technology)
✅ дані AAPL збережені в таблицу bronze_aapl

📥 Загружаємо даніс для: MSFT (Technology)
✅ дані MSFT збережені в таблицу bronze_msft

📥 Загружаємо даніс для: JPM (Financials)
✅ дані JPM збережені в таблицу bronze_jpm

📥 Загружаємо даніс для: BAC (Financials)
✅ дані BAC збережені в таблицу bronze_bac

📄 Логи пайплайна оновлені и збережені в таблицу 'pipeline_logs'.


step,layer,timestamp,status,rows_processed,comment
ingestion,bronze,2025-05-31 12:12:54,success,250,BAC (Financials)
ingestion,bronze,2025-05-31 12:12:51,success,250,JPM (Financials)
ingestion,bronze,2025-05-31 12:12:49,success,250,MSFT (Technology)
ingestion,bronze,2025-05-31 12:12:47,success,250,AAPL (Technology)
aggregation,gold,2025-05-31 05:03:38,success,4,Агрегация среднего Close и суммарного Volume по тикеру и сектору
transformation,silver,2025-05-31 05:02:59,success,1000,Объединение и очистка данных из бронзового слоя
ingestion,bronze,2025-05-31 05:02:19,success,250,BAC (Financials)
ingestion,bronze,2025-05-31 05:02:17,success,250,JPM (Financials)
ingestion,bronze,2025-05-31 05:02:15,success,250,MSFT (Technology)
ingestion,bronze,2025-05-31 05:02:12,success,250,AAPL (Technology)
