In [None]:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
import yfinance as yf
import pandas as pd
from pymongo import MongoClient

def extract_stock_data():
    """Download two years of daily prices for a fixed ticker list and save them as CSV.

    For every ticker the history is fetched with ``yf.download`` and enriched with:

    * ``Date`` -- parsed to datetime and normalized to midnight
      (unparseable values become ``NaT``),
    * ``5-days Moving Averages`` / ``30-days Moving Averages`` -- rolling
      means of ``Close`` (NaN until the window is full),
    * ``Daily_Return`` -- day-over-day percentage change of ``Close``, with
      the undefined first row filled with 0,
    * ``company_name`` -- the display name paired with the ticker.

    The per-ticker frames are concatenated and written, without the index, to
    ``/tmp/stock_data.csv``.

    Raises:
        ValueError: if no ticker returned any rows, so there is nothing to save.
    """
    # Ticker and display name live side by side so the two can never drift
    # out of alignment the way two parallel lists can.
    companies = [
        ('AAPL', "APPLE"),
        ('NVDA', "NVIDIA"),
        ('MSFT', "Microsoft"),
        ('GOOG', "Alphabet(Google)"),
        ('AMZN', "Amazon"),
        ('META', "Meta Platforms"),
        ('BRK-B', "Berkshire Hathaway"),
        ('AVGO', "Broadcom"),
        ('LLY', "Eli Lilly"),
        ('TSLA', "Tesla"),
        ('WMT', "Walmart"),
        ('JPM', "JPMorgan Chase"),
        ('V', "Visa"),
        ('XOM', "Exxon Mobil"),
        ('UNH', "UnitedHealth"),
        ('ORCL', "Oracle"),
        ('MA', "Mastercard"),
    ]

    end = datetime.now()
    try:
        start = datetime(end.year - 2, end.month, end.day)
    except ValueError:
        # Only reachable on Feb 29: the year two years back has no such day,
        # so fall back to Feb 28 instead of failing the whole task.
        start = datetime(end.year - 2, end.month, 28)

    company_list = []
    for stock, name in companies:
        data = yf.download(stock, start=start, end=end)
        if data is None or data.empty:
            # A failed or empty download should not poison the other tickers.
            print(f"No data returned for {stock}; skipping")
            continue

        # Some yfinance releases return (field, ticker) MultiIndex columns even
        # for a single ticker; keep only the field level so 'Close' is a Series
        # and the CSV gets a single header row.
        if isinstance(data.columns, pd.MultiIndex):
            data.columns = data.columns.get_level_values(0)

        data = data.reset_index()
        data['Date'] = pd.to_datetime(data['Date'], errors='coerce').dt.normalize()
        data['5-days Moving Averages'] = data['Close'].rolling(window=5).mean()
        data['30-days Moving Averages'] = data['Close'].rolling(window=30).mean()
        # The first row has no previous close, so its return is set to 0.
        data['Daily_Return'] = (data['Close'].pct_change() * 100).fillna(0)
        data["company_name"] = name
        company_list.append(data)

    if not company_list:
        raise ValueError("No stock data could be downloaded for any ticker")

    df = pd.concat(company_list, axis=0)
    df.to_csv('/tmp/stock_data.csv', index=False)
    print("Stock data extracted and saved to CSV")

def load_to_mongodb():
    """Replace the MongoDB collection contents with the rows of the extracted CSV.

    Reads ``/tmp/stock_data.csv``, then empties the
    ``stock_prices.stock_prices_5Y_17Com`` collection and inserts one document
    per CSV row.

    The connection string is taken from the ``MONGODB_URI`` environment
    variable when it is set; otherwise the placeholder URI embedded below is
    used, exactly as before.

    Raises:
        ValueError: if the CSV contains no rows. This is checked before the
            collection is touched, so existing data is not wiped for nothing.
    """
    import os  # local import: only this function needs it

    # Read and validate the input first; deleting the collection before
    # knowing there is something to insert would lose data on a bad run.
    df = pd.read_csv('/tmp/stock_data.csv')
    data_dict = df.to_dict(orient="records")
    if not data_dict:
        raise ValueError("/tmp/stock_data.csv contains no rows; collection left unchanged")

    uri = os.environ.get(
        'MONGODB_URI',
        'mongodb+srv://<username>:<password>@cluster0.mongodb.net',
    )
    client = MongoClient(uri)
    try:
        collection = client['stock_prices']['stock_prices_5Y_17Com']
        collection.delete_many({})
        collection.insert_many(data_dict)
    finally:
        # Always release the connection pool, even if the load fails.
        client.close()
    print("Data loaded into MongoDB")

def print_success():
    """Write the end-of-pipeline confirmation message to stdout."""
    message = "ETL process completed successfully!"
    print(message)

# Settings handed to the DAG as its default task arguments
default_args = dict(
    owner='airflow',
    depends_on_past=False,
    email_on_failure=False,
    email_on_retry=False,
    retries=1,
    retry_delay=timedelta(minutes=5),
)

# Create the DAG object up front, then register its tasks inside its context
dag = DAG(
    dag_id='stock_price_etl',
    description='A simple ETL pipeline for stock data using Apache Airflow',
    default_args=default_args,
    start_date=datetime(2025, 1, 1),
    schedule_interval='@daily',
    catchup=False,
)

with dag:
    # One PythonOperator per pipeline stage; each wraps a function defined above
    extract_task = PythonOperator(
        python_callable=extract_stock_data,
        task_id='extract_stock_data',
    )
    load_task = PythonOperator(
        python_callable=load_to_mongodb,
        task_id='load_to_mongodb',
    )
    success_task = PythonOperator(
        python_callable=print_success,
        task_id='print_success',
    )

    # Execution order: extract, then load, then the success message
    extract_task.set_downstream(load_task)
    load_task.set_downstream(success_task)
