### DATA API FOR UPSTOX HISTORICAL

In [None]:
import requests
import pandas as pd
from datetime import datetime, timedelta
import logging
from typing import Dict, List, Optional, Union
from dotenv import load_dotenv
import os

# Load environment variables from .env file
load_dotenv()

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class UpstoxHistoricalData:
    def __init__(self):
        """
        Initialize the Upstox Historical Data client.
        
        Args:
            access_token (str): Upstox API access token
        """
        load_dotenv()
        self.access_token = os.getenv('UPSTOX_ACCESS_TOKEN')
        if not self.access_token:
            logger.error("UPSTOX_ACCESS_TOKEN not found in .env file")
            raise ValueError("Please set UPSTOX_ACCESS_TOKEN in your .env file")
    

        self.base_url = "https://api.upstox.com"
        self.headers = {
            'Accept': 'application/json',
            'Authorization': f'Bearer {self.access_token}'
        }
        
        # Define valid units and their interval constraints
        self.unit_constraints = {
            'minutes': {
                'valid_intervals': range(1, 301),  # 1 to 300
                'max_records': {
                    '1-15': 30,  # 1 month for 1-15 minute intervals
                    '16-300': 90  # 1 quarter for >15 minute intervals
                }
            },
            'hours': {
                'valid_intervals': range(1, 6),  # 1 to 5
                'max_records': 90  # 1 quarter
            },
            'days': {
                'valid_intervals': [1],
                'max_records': 3650  # 1 decade
            },
            'weeks': {
                'valid_intervals': [1],
                'max_records': None  # No limit
            },
            'months': {
                'valid_intervals': [1],
                'max_records': None  # No limit
            }
        }

    def _validate_dates(self, from_date: str, to_date: str) -> tuple:
        """
        Validate and parse dates.
        
        Args:
            from_date (str): Start date in YYYY-MM-DD format
            to_date (str): End date in YYYY-MM-DD format
            
        Returns:
            tuple: (from_dt, to_dt) as datetime objects
            
        Raises:
            ValueError: If dates are invalid
        """
        try:
            from_dt = datetime.strptime(from_date, '%Y-%m-%d')
            to_dt = datetime.strptime(to_date, '%Y-%m-%d')
            
            if from_dt > to_dt:
                raise ValueError("from_date must be before to_date")
                
            current_date = datetime.now()
            if from_dt > current_date or to_dt > current_date:
                raise ValueError("Cannot fetch data for future dates")
                
            return from_dt, to_dt
            
        except ValueError as e:
            logger.error(f"Date validation error: {str(e)}")
            raise

    def _validate_unit_interval(self, unit: str, interval: int) -> None:
        """
        Validate unit and interval combination.
        
        Args:
            unit (str): Time unit (minutes, hours, days, weeks, months)
            interval (int): Time interval
            
        Raises:
            ValueError: If unit or interval is invalid
        """
        if unit not in self.unit_constraints:
            raise ValueError(f"Invalid unit. Must be one of: {list(self.unit_constraints.keys())}")
            
        if interval not in self.unit_constraints[unit]['valid_intervals']:
            raise ValueError(f"Invalid interval for {unit}. Valid intervals: {list(self.unit_constraints[unit]['valid_intervals'])}")

    def _adjust_date_range(self, from_dt: datetime, to_dt: datetime, unit: str, interval: int) -> datetime:
        """
        Adjust date range based on unit and interval constraints.
        
        Args:
            from_dt (datetime): Start date
            to_dt (datetime): End date
            unit (str): Time unit
            interval (int): Time interval
            
        Returns:
            datetime: Adjusted start date
        """
        max_days = self.unit_constraints[unit]['max_records']
        
        if max_days is None:
            return from_dt
            
        if unit == 'minutes':
            if interval <= 15:
                max_days = max_days['1-15']
            else:
                max_days = max_days['16-300']
                
        date_diff = (to_dt - from_dt).days
        if date_diff > max_days:
            logger.warning(f"Date range exceeds {max_days} days limit. Adjusting start date.")
            return to_dt - timedelta(days=max_days)
            
        return from_dt

    def get_historical_candles(
        self,
        instrument_key: str,
        unit: str,
        interval: int,
        to_date: str,
        from_date: Optional[str] = None
    ) -> pd.DataFrame:
        """
        Fetch historical candle data from Upstox API.
        
        Args:
            instrument_key (str): Instrument key for the symbol
            unit (str): Time unit (minutes, hours, days, weeks, months)
            interval (int): Time interval
            to_date (str): End date in YYYY-MM-DD format
            from_date (str, optional): Start date in YYYY-MM-DD format
            
        Returns:
            pd.DataFrame: Historical candle data with columns:
                - timestamp: Start time of the candle
                - open: Opening price
                - high: Highest price
                - low: Lowest price
                - close: Closing price
                - volume: Trading volume
                - oi: Open Interest
                
        Raises:
            ValueError: If parameters are invalid
            requests.exceptions.RequestException: If API request fails
        """
        try:
            # Validate unit and interval
            self._validate_unit_interval(unit, interval)
            
            # Validate and parse dates
            to_dt = datetime.strptime(to_date, '%Y-%m-%d')
            if from_date:
                from_dt = datetime.strptime(from_date, '%Y-%m-%d')
            else:
                # If from_date not provided, use appropriate default based on unit
                if unit == 'minutes':
                    from_dt = to_dt - timedelta(days=1)  # Default to 1 day for minutes
                else:
                    from_dt = to_dt - timedelta(days=30)  # Default to 30 days for other units
                    
            # Adjust date range if needed
            from_dt = self._adjust_date_range(from_dt, to_dt, unit, interval)
            from_date = from_dt.strftime('%Y-%m-%d')
            
            # Construct API URL
            url = f"{self.base_url}/v3/historical-candle/{instrument_key}/{unit}/{interval}/{to_date}/{from_date}"
            
            # Make API request
            response = requests.get(url, headers=self.headers)
            response.raise_for_status()
            data = response.json()
            
            if data.get('status') == 'error':
                raise ValueError(f"API Error: {data.get('message', 'Unknown error')}")
                
            # Process candle data
            candles = data.get('data', {}).get('candles', [])
            if not candles:
                logger.warning("No candle data found in the response")
                return pd.DataFrame()
                
            # Convert to DataFrame
            df = pd.DataFrame(candles, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume', 'oi'])
            df['timestamp'] = pd.to_datetime(df['timestamp'])
            
            return candles
            # df.sort_values('timestamp')
            
        except requests.exceptions.RequestException as e:
            logger.error(f"API request failed: {str(e)}")
            if hasattr(e.response, 'text'):
                logger.error(f"API response: {e.response.text}")
            raise
        except Exception as e:
            logger.error(f"Error fetching historical data: {str(e)}")
            raise

    # Retrive Current Day Intraday candle data
    def get_intraday_candles(
        self,
        instrument_key: str,
        unit: str,
        interval: int) -> pd.DataFrame:
        """
        Fetch intraday candle data for the current trading day from Upstox API.
        
        Args:
            instrument_key (str): Instrument key for the symbol
            unit (str): Time unit (minutes, hours, days)
            interval (int): Time interval
                - For minutes: 1 to 300
                - For hours: 1 to 5
                - For days: 1
                
        Returns:
            pd.DataFrame: Intraday candle data with columns:
                - timestamp: Start time of the candle
                - open: Opening price
                - high: Highest price
                - low: Lowest price
                - close: Closing price
                - volume: Trading volume
                - oi: Open Interest
                
        Raises:
            ValueError: If parameters are invalid
            requests.exceptions.RequestException: If API request fails
        """
        try:
            # Define intraday-specific unit constraints
            intraday_constraints = {
                'minutes': {
                    'valid_intervals': range(1, 301),  # 1 to 300
                    'description': '1 to 300 minutes'
                },
                'hours': {
                    'valid_intervals': range(1, 6),  # 1 to 5
                    'description': '1 to 5 hours'
                },
                'days': {
                    'valid_intervals': [1],
                    'description': '1 day'
                }
            }
            
            # Validate unit
            if unit not in intraday_constraints:
                raise ValueError(
                    f"Invalid unit '{unit}'. Must be one of: {list(intraday_constraints.keys())}"
                )
                
            # Validate interval
            if interval not in intraday_constraints[unit]['valid_intervals']:
                raise ValueError(
                    f"Invalid interval {interval} for {unit}. "
                    f"Valid intervals: {intraday_constraints[unit]['description']}"
                )
                
            # Construct API URL
            url = f"{self.base_url}/v3/historical-candle/intraday/{instrument_key}/{unit}/{interval}"
            
            # Make API request
            response = requests.get(url, headers=self.headers)
            response.raise_for_status()
            data = response.json()
            
            if data.get('status') == 'error':
                raise ValueError(f"API Error: {data.get('message', 'Unknown error')}")
                
            # Process candle data
            candles = data.get('data', {}).get('candles', [])
            if not candles:
                logger.warning("No intraday candle data found in the response")
                return pd.DataFrame()
                
            # Convert to DataFrame
            df = pd.DataFrame(candles, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume', 'oi'])
            df['timestamp'] = pd.to_datetime(df['timestamp'])
            
            return candles
            # return df.sort_values('timestamp')
            
        except requests.exceptions.RequestException as e:
            logger.error(f"API request failed: {str(e)}")
            if hasattr(e.response, 'text'):
                logger.error(f"API response: {e.response.text}")
            raise
        except Exception as e:
            logger.error(f"Error fetching intraday data: {str(e)}")
            raise
        


In [None]:
# ... existing code ...

class InstrumentKeyFinder:
    def __init__(self, instruments_file: str = None):
        """
        Initialize the InstrumentKeyFinder with the path to the instruments JSON file.
        
        Args:
            instruments_file (str): Path to the active_instruments.json file
        """
        if instruments_file is None:
            # Smart path resolution that works in both notebooks and scripts
            self.instruments_file = self._find_instruments_file()
        else:
            self.instruments_file = instruments_file
            
        self.instruments_data = self._load_instruments()
    
    def _find_instruments_file(self) -> str:
        """
        Find the instruments file using multiple fallback strategies.
        
        Returns:
            str: Path to the active_instruments.json file
        """
        # Strategy 1: Try relative path from current working directory
        possible_paths = [
            # From project root
            os.path.join('src', 'broker_module', 'upstox', 'instruments', 'active_instruments.json'),
            # From src directory
            os.path.join('broker_module', 'upstox', 'instruments', 'active_instruments.json'),
            # From broker_module directory
            os.path.join('upstox', 'instruments', 'active_instruments.json'),
            # From upstox directory
            os.path.join('instruments', 'active_instruments.json'),
            # Absolute path fallback (adjust this path as needed)
            os.path.join(os.getcwd(), 'src', 'broker_module', 'upstox', 'instruments', 'active_instruments.json')
        ]
        
        for path in possible_paths:
            if os.path.exists(path):
                logger.info(f"Found instruments file at: {path}")
                return path
        
        # If none of the relative paths work, try to construct from current working directory
        current_dir = os.getcwd()
        logger.info(f"Current working directory: {current_dir}")
        
        # Try to find the project structure
        if 'Project_Shanmugar' in current_dir:
            # We're in the project directory
            project_root = current_dir
        elif 'src' in current_dir:
            # We're in the src directory
            project_root = os.path.dirname(current_dir)
        else:
            # Assume we're in the project root
            project_root = current_dir
        
        final_path = os.path.join(project_root, 'src', 'broker_module', 'upstox', 'instruments', 'active_instruments.json')
        
        if os.path.exists(final_path):
            logger.info(f"Found instruments file at: {final_path}")
            return final_path
        
        # If all else fails, raise an error with helpful information
        raise FileNotFoundError(
            f"Could not find active_instruments.json file. "
            f"Current directory: {current_dir}. "
            f"Tried paths: {possible_paths}. "
            f"Please specify the full path to the instruments file."
        )
        

# ... existing code ...

In [None]:

# Example usage
if __name__ == "__main__":

    client = UpstoxHistoricalData()
    
    try:
        # Example 1: Get historical 5-minute candles
        historical_df = client.get_historical_candles(
            instrument_key="NSE_FO|36702",
            unit="minutes",
            interval=5,
            to_date="2024-05-20",
            from_date="2024-04-01"
        )
        
        print("\nHistorical Candle Data:")
        print(historical_df)
#       print(historical_df.head())
        print(f"\nTotal historical candles: {len(historical_df)}")
        
        # Example 2: Get intraday 5-minute candles
        intraday_df = client.get_intraday_candles(
            instrument_key="NSE_FO|36702",
            unit="minutes",
            interval=5
        )
        
        print("\nIntraday Candle Data:")
        print(intraday_df.head())
        print(f"\nTotal intraday candles: {len(intraday_df)}")
    
    except Exception as e:
        print(f"Error: {str(e)}")


In [6]:
# ... existing code ...

def get_market_data_by_company_name(
    company_name: str,
    data_type: str = "historical",
    unit: str = "minutes",
    interval: int = 5,
    to_date: str = None,
    from_date: str = None,
    instruments_file: str = None
) -> pd.DataFrame:
    """
    Get market data for a company by searching for its instrument key and fetching data.
    
    Args:
        company_name (str): Company name to search for (e.g., "RELIANCE", "TCS", "INFY")
        data_type (str): Type of data to fetch - "historical" or "intraday"
        unit (str): Time unit for data (minutes, hours, days, weeks, months)
        interval (int): Time interval
        to_date (str): End date in YYYY-MM-DD format (required for historical data)
        from_date (str): Start date in YYYY-MM-DD format (optional for historical data)
        instruments_file (str): Optional path to instruments file
        
    Returns:
        pd.DataFrame: Market data with columns: timestamp, open, high, low, close, volume, oi
        
    Raises:
        ValueError: If parameters are invalid or company not found
        FileNotFoundError: If instruments file not found
        Exception: For other API or data errors
    """
    try:
        # Step 1: Find the instrument key for the company
        logger.info(f"Searching for instrument key for company: {company_name}")
        finder = InstrumentKeyFinder(instruments_file)
        instrument_key = finder.find_instrument_key(company_name)
        logger.info(f"Found instrument key: {instrument_key}")
        
        # Step 2: Initialize the data client
        client = UpstoxHistoricalData()
        
        # Step 3: Fetch data based on type
        if data_type.lower() == "historical":
            if not to_date:
                # Default to today if no date specified
                to_date = datetime.now().strftime('%Y-%m-%d')
                logger.info(f"No to_date specified, using today: {to_date}")
            
            logger.info(f"Fetching historical data for {instrument_key}")
            logger.info(f"Parameters: unit={unit}, interval={interval}, to_date={to_date}, from_date={from_date}")
            
            data = client.get_historical_candles(
                instrument_key=instrument_key,
                unit=unit,
                interval=interval,
                to_date=to_date,
                from_date=from_date
            )
            
        elif data_type.lower() == "intraday":
            logger.info(f"Fetching intraday data for {instrument_key}")
            logger.info(f"Parameters: unit={unit}, interval={interval}")
            
            data = client.get_intraday_candles(
                instrument_key=instrument_key,
                unit=unit,
                interval=interval
            )
            
        else:
            raise ValueError(f"Invalid data_type: {data_type}. Must be 'historical' or 'intraday'")
        
        # Step 4: Process and return the data
        if isinstance(data, list) and len(data) > 0:
            # Convert list to DataFrame if needed
            df = pd.DataFrame(data, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume', 'oi'])
            df['timestamp'] = pd.to_datetime(df['timestamp'])
            df = df.sort_values('timestamp')
            
            logger.info(f"Successfully fetched {len(df)} data points for {company_name}")
            return df
        else:
            logger.warning(f"No data returned for {company_name}")
            return pd.DataFrame()
            
    except Exception as e:
        logger.error(f"Error fetching data for {company_name}: {str(e)}")
        raise

def search_and_fetch_data_interactive():
    """
    Interactive function to search for companies and fetch their market data.
    This function provides a user-friendly interface for data retrieval.
    """
    print("=== Market Data Fetcher ===")
    print("This tool helps you find company instruments and fetch market data.\n")
    
    try:
        # Get company name from user
        company_name = input("Enter company name to search (e.g., RELIANCE, TCS, INFY): ").strip()
        if not company_name:
            print("Company name cannot be empty.")
            return
        
        # Get data type
        print("\nSelect data type:")
        print("1. Historical data")
        print("2. Intraday data")
        
        while True:
            try:
                choice = int(input("Enter your choice (1 or 2): "))
                if choice in [1, 2]:
                    data_type = "historical" if choice == 1 else "intraday"
                    break
                else:
                    print("Please enter 1 or 2.")
            except ValueError:
                print("Please enter a valid number.")
        
        # Get time parameters
        print(f"\nSelect time parameters for {data_type} data:")
        
        # Time unit selection
        if data_type == "historical":
            print("Time units: minutes, hours, days, weeks, months")
            unit = input("Enter time unit (default: minutes): ").strip() or "minutes"
        else:
            print("Time units: minutes, hours, days")
            unit = input("Enter time unit (default: minutes): ").strip() or "minutes"
        
        # Time interval selection
        try:
            interval = int(input("Enter time interval (default: 5): ") or "5")
        except ValueError:
            interval = 5
            print("Invalid interval, using default: 5")
        
        # Date parameters for historical data
        to_date = None
        from_date = None
        if data_type == "historical":
            to_date = input("Enter end date (YYYY-MM-DD, default: today): ").strip()
            if not to_date:
                to_date = datetime.now().strftime('%Y-%m-%d')
            
            from_date = input("Enter start date (YYYY-MM-DD, optional): ").strip()
            if not from_date:
                from_date = None
        
        print(f"\nFetching {data_type} data for {company_name}...")
        print(f"Parameters: unit={unit}, interval={interval}")
        if to_date:
            print(f"Date range: {from_date or 'default'} to {to_date}")
        
        # Fetch the data
        data = get_market_data_by_company_name(
            company_name=company_name,
            data_type=data_type,
            unit=unit,
            interval=interval,
            to_date=to_date,
            from_date=from_date
        )
        
        # Display results
        if not data.empty:
            print(f"\n✅ Successfully fetched {len(data)} data points!")
            print(f"Data columns: {list(data.columns)}")
            print(f"Date range: {data['timestamp'].min()} to {data['timestamp'].max()}")
            print(f"Price range: ₹{data['low'].min():.2f} to ₹{data['high'].max():.2f}")
            
            # Show sample data
            print(f"\nSample data (first 5 rows):")
            print(data.head())
            
            # Option to save data
            save_choice = input("\nWould you like to save this data to a CSV file? (y/n): ").strip().lower()
            if save_choice in ['y', 'yes']:
                filename = f"{company_name}_{data_type}_{unit}_{interval}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
                data.to_csv(filename, index=False)
                print(f"Data saved to: {filename}")
        else:
            print(f"\n❌ No data found for {company_name}")
            
    except KeyboardInterrupt:
        print("\n\nOperation cancelled by user.")
    except Exception as e:
        print(f"\n❌ Error: {str(e)}")
        logger.error(f"Interactive data fetch error: {str(e)}")

# Example usage functions
def example_usage():
    """
    Example usage of the get_market_data_by_company_name function.
    """
    print("=== Example Usage ===")
    
    # Example 1: Get historical data for RELIANCE
    try:
        print("\n1. Fetching historical data for RELIANCE...")
        reliance_data = get_market_data_by_company_name(
            company_name="RELIANCE",
            data_type="historical",
            unit="minutes",
            interval=15,
            to_date="2024-05-20",
            from_date="2024-05-19"
        )
        print(f"✅ Fetched {len(reliance_data)} data points for RELIANCE")
        
    except Exception as e:
        print(f"❌ Error fetching RELIANCE data: {str(e)}")
    
    # Example 2: Get intraday data for TCS
    try:
        print("\n2. Fetching intraday data for TCS...")
        tcs_data = get_market_data_by_company_name(
            company_name="TCS",
            data_type="intraday",
            unit="minutes",
            interval=5
        )
        print(f"✅ Fetched {len(tcs_data)} data points for TCS")
        
    except Exception as e:
        print(f"❌ Error fetching TCS data: {str(e)}")

# ... existing code ...

In [7]:
# Get historical data for a specific company
data = get_market_data_by_company_name(
    company_name="RELIANCE",
    data_type="historical",
    unit="minutes",
    interval=15,
    to_date="2024-05-20",
    from_date="2024-05-19"
)

INFO:__main__:Searching for instrument key for company: RELIANCE
INFO:__main__:Current working directory: c:\Users\Rajkumar\Desktop\Yuktrix_Algo_Trading\Project_Shanmugar\src\broker_module\upstox\data
ERROR:__main__:Error fetching data for RELIANCE: Could not find active_instruments.json file. Current directory: c:\Users\Rajkumar\Desktop\Yuktrix_Algo_Trading\Project_Shanmugar\src\broker_module\upstox\data. Tried paths: ['src\\broker_module\\upstox\\instruments\\active_instruments.json', 'broker_module\\upstox\\instruments\\active_instruments.json', 'upstox\\instruments\\active_instruments.json', 'instruments\\active_instruments.json', 'c:\\Users\\Rajkumar\\Desktop\\Yuktrix_Algo_Trading\\Project_Shanmugar\\src\\broker_module\\upstox\\data\\src\\broker_module\\upstox\\instruments\\active_instruments.json']. Please specify the full path to the instruments file.


FileNotFoundError: Could not find active_instruments.json file. Current directory: c:\Users\Rajkumar\Desktop\Yuktrix_Algo_Trading\Project_Shanmugar\src\broker_module\upstox\data. Tried paths: ['src\\broker_module\\upstox\\instruments\\active_instruments.json', 'broker_module\\upstox\\instruments\\active_instruments.json', 'upstox\\instruments\\active_instruments.json', 'instruments\\active_instruments.json', 'c:\\Users\\Rajkumar\\Desktop\\Yuktrix_Algo_Trading\\Project_Shanmugar\\src\\broker_module\\upstox\\data\\src\\broker_module\\upstox\\instruments\\active_instruments.json']. Please specify the full path to the instruments file.