In [21]:
import mysql.connector
import requests
import pandas as pd
import yfinance as yf
from sqlalchemy import create_engine
from mysql.connector import Error  # Import Error for exception handling

# creating mysql connection
def get_mysql_connection():
    try:
        connection = mysql.connector.connect(
            host="localhost",
            user="root",
            password="Aher@123"
        )
        if connection.is_connected():
            print("Connected to MySQL Database")
            return connection
    except Error as e:
        print("Error while connecting to MySQL", e)
        return None

def create_database_if_not_exists(cursor, db):
    try:
        cursor.execute(f"SHOW DATABASES LIKE '{db}'")
        result = cursor.fetchone()
        if not result:
            cursor.execute(f"CREATE DATABASE {db}")
            print(f"Database {db} created.")
        else:
            print(f"Database {db} already exists.")
    except Error as e:
        print(f"Error creating/checking database: {e}")

# Create table if it doesn't exist
def create_table_if_not_exists(cursor, db, symbol):
    try:
        cursor.execute(f"USE {db}")  # Ensure we're using the correct database
        cursor.execute(f"SHOW TABLES LIKE '{symbol}'")
        result = cursor.fetchone()
        if not result:
            cursor.execute(
                f"""
            CREATE TABLE {symbol} (
                id INT AUTO_INCREMENT PRIMARY KEY,
                date DATETIME,
                open FLOAT,
                high FLOAT,
                low FLOAT,
                close FLOAT,
                volume INT
            );
            """
            )
            print(f"Table {symbol} created.")
        else:
            print(f"Table {symbol} already exists.")
    except Error as e:
        print(f"Error creating/checking symbol: {e}")


# fetch historical data
def fetch_historical_data(symbol):
    try:
        if symbol:
            histdata = yf.download(symbol)
            histdata.reset_index(inplace=True)
            histdata.rename(columns={'Date': 'date'}, inplace=True) #rename Date column
            print(f"symbol data is being fetched for {symbol}")
            return histdata
        else:
            print("Symbol is not provided.")
            return None
    except Exception as e:  # Using Exception to catch any error from yfinance
        print("Error fetching historical data", e)
        return None


def fetch_livedata(symbol, api_key):
    try:
        if symbol and api_key:
            url = "https://www.alphavantage.co/query"
            params = {
                "function": "TIME_SERIES_DAILY",
                "symbol": symbol,
                "apikey": api_key,
                "outputsize": "full",
            }
            response = requests.get(url, params=params)
            livedata = response.json()
            print(f"live data is being fetched for {symbol}")

            # check for errors
            if "Time Series (Daily)" not in livedata:
                print("Error fetching live data")
                return pd.DataFrame()

            daily_data = livedata["Time Series (Daily)"]
            df = pd.DataFrame.from_dict(daily_data, orient="index")
            df.reset_index(inplace=True)
            df.columns = ["Date", "Open", "High", "Low", "Close", "Volume"]
            df["Date"] = pd.to_datetime(df["Date"])
            df.rename(columns={'Date': 'date', 'Open': 'open', 'High':'high', 'Low':'low', 'Close':'close', 'Volume':'volume'}, inplace=True) # rename columns to match database
            df = df.sort_values(by="date", ascending=True)
            if 'id' in df.columns:
                df.drop(columns=['id'], inplace=True)
            return df
        else:
            print("Symbol or API key is not provided.")
            return None
    except Exception as e:
        print("Error fetching live data:", e)
        return None


def insert_data_insert_into_table(cursor, db, table, data):
    if data is None or data.empty:  #
        print("No data to insert")
        return

    encoded_password = "Aher%40123"
    engine = create_engine(f"mysql+mysqlconnector://root:{encoded_password}@localhost/{db}")

    try:
        df = pd.DataFrame(data)
        df.columns = map(str.lower, df.columns) 

        df.rename(columns={'date': 'date', 'open': 'open', 'high':'high', 'low':'low', 'close':'close', 'volume':'volume'}, inplace=True) # rename columns to match database

        
        if 'date' in df.columns:
            df['date'] = pd.to_datetime(df['date'])  # Convert to datetime

        
        df.drop_duplicates(subset=['date'], keep='last', inplace=True)
        
        if 'id' in df.columns:
            df = df.drop(columns=['id'])



        df.to_sql(name=table, con=engine, if_exists='append', index=False)  # use to_sql

        print(f"{len(df)} rows inserted into {table}.")

    except Exception as e:
        print(f"Error inserting data into {table}: {e}")


def run_pipeline(db, symbol, api_key):
    # Connect to the database
    conn = get_mysql_connection()
    if not conn:  
        return
    cursor = conn.cursor()

    create_database_if_not_exists(cursor, db)
    create_table_if_not_exists(cursor, db, symbol)

    historical_data = fetch_historical_data(symbol)
    if historical_data is None or historical_data.empty:
        print("No historical data available for the symbol")
    else:
        insert_data_insert_into_table(cursor, db, symbol, historical_data)

    livedata = fetch_livedata(symbol, api_key)
    if livedata is None or livedata.empty:
        print("No live data available for the symbol.  Falling back to historical data.")
        historical_data = fetch_historical_data(symbol)
        if historical_data is None or historical_data.empty:
            print("No historical data available either.  Cannot proceed.")
        else:
            insert_data_insert_into_table(cursor, db, symbol, historical_data)
    else:
        insert_data_insert_into_table(cursor, db, symbol, livedata)

    conn.commit()

    try:
        query = f"SELECT * FROM {db}.{symbol}"
        df = pd.read_sql(query, conn)
        print(df.head()) 

    except Exception as e:
        print(f"Error reading the data: {e}")

    finally: 
        cursor.close()
        conn.close()



def get_valid_symbol():
    while True:
        symbol = input("Enter the stock symbol (e.g., AAPL): ").upper()
        if symbol:  
            return symbol
        else:
            print("Invalid symbol. Please enter a valid symbol.")


if __name__ == "__main__":
    symbol = get_valid_symbol()
    run_pipeline(db="stock_data", symbol=symbol, api_key="771O3VPDZ5UH78E3")

Connected to MySQL Database
Database stock_data already exists.
Table HINDUNILVR created.


[*********************100%***********************]  1 of 1 completed

1 Failed download:
['HINDUNILVR']: YFTzMissingError('$%ticker%: possibly delisted; no timezone found')


symbol data is being fetched for HINDUNILVR
No historical data available for the symbol


[*********************100%***********************]  1 of 1 completed

1 Failed download:
['HINDUNILVR']: YFTzMissingError('$%ticker%: possibly delisted; no timezone found')


live data is being fetched for HINDUNILVR
Error fetching live data
No live data available for the symbol.  Falling back to historical data.
symbol data is being fetched for HINDUNILVR
No historical data available either.  Cannot proceed.
Empty DataFrame
Columns: [id, timestamp, open, high, low, close, volume]
Index: []


  df = pd.read_sql(query, conn)


In [16]:
import mysql.connector
import requests
import pandas as pd
import yfinance as yf
from sqlalchemy import create_engine
from mysql.connector import Error, ProgrammingError # Import Error for exception handling
# from yfinance import YFTzMissingError  # Import the specific exception
import datetime

# creating mysql connection
def get_mysql_connection():
    try:
        connection = mysql.connector.connect(
            host="localhost",
            user="root",
            password="Aher@123"
        )
        if connection.is_connected():
            print("Connected to MySQL Database")
            return connection
    except Error as e:
        print("Error while connecting to MySQL", e)
        return None

def create_database_if_not_exists(cursor, db):
    try:
        cursor.execute(f"SHOW DATABASES LIKE '{db}'")
        result = cursor.fetchone()
        if not result:
            cursor.execute(f"CREATE DATABASE {db}")
            print(f"Database {db} created.")
        else:
            print(f"Database {db} already exists.")
    except Error as e:
        print(f"Error creating/checking database: {e}")

# Create table if it doesn't exist
def create_table_if_not_exists(cursor, db, table):
    try:
        cursor.execute(f"USE `{db}`")  # Ensure we're using the correct database; quote the DB name
        cursor.execute(f"SHOW TABLES LIKE '{table}'")
        result = cursor.fetchone()
        if not result:
            cursor.execute(
                f"""
            CREATE TABLE `{table}` (  # quote the table name
                id INT AUTO_INCREMENT PRIMARY KEY,
                date DATETIME,
                open FLOAT,
                high FLOAT,
                low FLOAT,
                close FLOAT,
                volume INT
            );
            """
            )
            print(f"Table {table} created.")
        else:
            print(f"Table {table} already exists.")
    except ProgrammingError as e:
        print(f"Error creating/checking table: {e}")


# fetch historical data
def fetch_historical_data(symbol):
    try:
        if symbol:
            histdata = yf.download(symbol)
            histdata.reset_index(inplace=True)
            histdata.rename(columns={'Date': 'date'}, inplace=True) #rename Date column
            print(f"symbol data is being fetched for {symbol}")
            return histdata
        else:
            print("Symbol is not provided.")
            return None
    # except YFTzMissingError as e:  # Catch the specific YFTzMissingError
    #     print(f"YFTzMissingError: Data not found for symbol {symbol}.  Possibly delisted or incorrect symbol.")
    #     return None
    except Exception as e:  # Using Exception to catch any other error from yfinance
        print("Error fetching historical data", e)
        return None


def fetch_livedata(symbol, api_key):
    try:
        if symbol and api_key:
            url = "https://www.alphavantage.co/query"
            params = {
                "function": "TIME_SERIES_DAILY",
                "symbol": symbol,
                "apikey": api_key,
                "outputsize": "full",
            }
            response = requests.get(url, params=params)
            livedata = response.json()
            print(f"live data is being fetched for {symbol}")

            # check for errors
            if "Time Series (Daily)" not in livedata:
                print("Error fetching live data")
                return pd.DataFrame()

            daily_data = livedata["Time Series (Daily)"]
            df = pd.DataFrame.from_dict(daily_data, orient="index")
            df.reset_index(inplace=True)
            df.columns = ["Date", "Open", "High", "Low", "Close", "Volume"]
            df["Date"] = pd.to_datetime(df["Date"])
            df.rename(columns={'Date': 'date', 'Open': 'open', 'High':'high', 'Low':'low', 'Close':'close', 'Volume':'volume'}, inplace=True) # rename columns to match database
            df = df.sort_values(by="date", ascending=True)
            return df
        else:
            print("Symbol or API key is not provided.")
            return None
    except Exception as e:
        print("Error fetching live data:", e)
        return pd.DataFrame() # Return an empty dataframe


# def insert_data_insert_into_table(cursor, db, table, data):
#     if data is None or data.empty:  # check if the dataframe is empty or None
#         print("No data to insert")
#         return

#     encoded_password = "Aher%40123"
#     engine = create_engine(f"mysql+mysqlconnector://root:{encoded_password}@localhost/{db}")

#     try:
#         # Prepare the dataframe: Convert column names to lowercase, handle date if necessary.
#         df = pd.DataFrame(data)
#         # Flatten column names and ensure they are all lowercase strings
#         df.columns = [
#             " ".join(col).lower() if isinstance(col, tuple) else str(col).lower() for col in df.columns
#         ]


#         # Standardize column names to match the database table
#         df.rename(columns={'date': 'date', 'open': 'open', 'high':'high', 'low':'low', 'close':'close', 'volume':'volume'}, inplace=True) # rename columns to match database

#         # Handle date column: Ensure it's in datetime format and named 'date'
#         if 'date' in df.columns:
#             df['date'] = pd.to_datetime(df['date'])  # Convert to datetime

#         # Drop duplicates based on date, before inserting
#         df.drop_duplicates(subset=['date'], keep='last', inplace=True)

#         #Change data types if needed
#         df['open'] = df['open'].astype(float)
#         df['high'] = df['high'].astype(float)
#         df['low'] = df['low'].astype(float)
#         df['close'] = df['close'].astype(float)
#         df['volume'] = df['volume'].astype(int)


#         # Use SQLAlchemy to insert data. This is the best approach for performance and type handling.
#         df.to_sql(name=table, con=engine, if_exists='append', index=False)  # use to_sql

#         print(f"{len(df)} rows inserted into {table}.")

#     except Exception as e:
#         print(f"Error inserting data into {table}: {e}")
def insert_data_insert_into_table(cursor, db, table, data):
    if data is None or data.empty:  # check if the dataframe is empty or None
        print("No data to insert")
        return

    encoded_password = "Aher%40123"
    engine = create_engine(f"mysql+mysqlconnector://root:{encoded_password}@localhost/{db}")

    try:
        # Prepare the dataframe: Convert column names to lowercase, handle date if necessary.
        df = pd.DataFrame(data)
        # Flatten column names and ensure they are all lowercase strings
        df.columns = [
            " ".join(col).lower() if isinstance(col, tuple) else str(col).lower() for col in df.columns
        ]


        # Standardize column names to match the database table
        df.rename(columns={'date': 'date', 'open': 'open', 'high':'high', 'low':'low', 'close':'close', 'volume':'volume'}, inplace=True) # rename columns to match database

        # Handle date column: Ensure it's in datetime format and named 'date'
        if 'date' in df.columns:
            df['date'] = pd.to_datetime(df['date'])  # Convert to datetime

        # Drop duplicates based on date, before inserting
        df.drop_duplicates(subset=['date'], keep='last', inplace=True)

        # Print column data types before insertion for debugging
        print("Dataframe before insertion:", df.dtypes)

        #Change data types if needed
        try:
            df['open'] = df['open'].astype(float)
            df['high'] = df['high'].astype(float)
            df['low'] = df['low'].astype(float)
            df['close'] = df['close'].astype(float)
            df['volume'] = df['volume'].astype(int)
        except Exception as e:
            print(f"Error converting data types: {e}")
            return

        # Use SQLAlchemy to insert data. This is the best approach for performance and type handling.
        df.to_sql(name=table, con=engine, if_exists='append', index=False)  # use to_sql

        print(f"{len(df)} rows inserted into {table}.")

    except Exception as e:
        print(f"Error inserting data into {table}: {e}")


def run_pipeline(db, symbol, api_key):
    # Connect to the database
    conn = get_mysql_connection()
    if not conn:  # Exit if connection fails
        return
    cursor = conn.cursor()

    create_database_if_not_exists(cursor, db)
    create_table_if_not_exists(cursor, db, symbol)

    livedata = fetch_livedata(symbol, api_key)

    if livedata is None or livedata.empty:
        print("No live data available for the symbol.  Falling back to historical data.")
        historical_data = fetch_historical_data(symbol)
        if historical_data is None or historical_data.empty:
            print("No historical data available either.  Cannot proceed.")
        else:
            insert_data_insert_into_table(cursor, db, symbol, historical_data)
    else:
        insert_data_insert_into_table(cursor, db, symbol, livedata)



    conn.commit()

    try:
        # Quote the table name to handle dots in the symbol
        quoted_table = f"`{symbol}`"  # Use backticks for quoting identifiers
        query = f"SELECT * FROM `{db}`.{quoted_table}"  # Correctly quote database and table names.
        df = pd.read_sql(query, conn)
        print(df.head())  # Show a preview of the data

    except Exception as e:
        print(f"Error reading the data: {e}")

    finally:  # Ensures resources are closed even if errors occur
        cursor.close()
        conn.close()


# Input validation
def get_valid_symbol():
    while True:
        symbol = input("Enter the stock symbol (e.g., AAPL): ").upper()
        if symbol:  # Checks for non-empty input
            return symbol
        else:
            print("Invalid symbol. Please enter a valid symbol.")

# Example Usage:
if __name__ == "__main__":
    symbol = get_valid_symbol()
    run_pipeline(db="stock_data", symbol=symbol, api_key="771O3VPDZ5UH78E3")

Connected to MySQL Database
Database stock_data already exists.
Table HINDUNILVR.NS already exists.
live data is being fetched for HINDUNILVR.NS
Error fetching live data
No live data available for the symbol.  Falling back to historical data.


[*********************100%***********************]  1 of 1 completed

symbol data is being fetched for HINDUNILVR.NS
Error inserting data into HINDUNILVR.NS: Index(['date'], dtype='object')
Empty DataFrame
Columns: [id, timestamp, open, high, low, close, volume]
Index: []



  df = pd.read_sql(query, conn)


In [5]:
df=yf.download("HINDUNILVR.NS")
df.head()

[*********************100%***********************]  1 of 1 completed


Price,Close,High,Low,Open,Volume
Ticker,HINDUNILVR.NS,HINDUNILVR.NS,HINDUNILVR.NS,HINDUNILVR.NS,HINDUNILVR.NS
Date,Unnamed: 1_level_2,Unnamed: 2_level_2,Unnamed: 3_level_2,Unnamed: 4_level_2,Unnamed: 5_level_2
1996-01-01,34.524017,34.954136,34.524017,34.744663,11000
1996-01-02,34.892696,35.023967,34.535195,34.524023,203500
1996-01-03,34.686005,34.828445,34.521218,34.892684,58000
1996-01-04,34.688805,34.800524,34.577087,34.632946,111500
1996-01-05,34.632935,34.856374,34.353637,34.688793,39500
