diff --git a/docs/cli/data.md b/docs/cli/data.md index a6392a7..d5a8b3c 100644 --- a/docs/cli/data.md +++ b/docs/cli/data.md @@ -140,25 +140,33 @@ PyneCore uses a binary format (`.ohlcv`) for storing OHLCV data efficiently. How ### Converting to Other Formats -The `convert-to` command converts PyneCore format to CSV or JSON: +The `convert-to` command converts PyneCore OHLCV format to CSV or JSON: ```bash -pyne data convert-to PROVIDER [OPTIONS] +pyne data convert-to OHLCV_FILE [OPTIONS] ``` +Where `OHLCV_FILE` is the path to the OHLCV file to convert. + Options: -- `--symbol`, `-s`: Symbol to convert -- `--timeframe`, `-tf`: Timeframe in TradingView format -- `--format`, `-f`: Output format (csv, json) +- `--format`, `-f`: Output format (csv or json, default: csv) - `--as-datetime`, `-dt`: Save timestamp as datetime instead of UNIX timestamp +The command automatically: +- Adds `.ohlcv` extension if not specified +- Creates output file with the same name but different extension +- Looks in `workdir/data/` if only filename is provided + Example: ```bash -# Convert Bitcoin data to CSV -pyne data convert-to ccxt --symbol "BINANCE:BTC/USDT" --timeframe "1D" --format "csv" +# Convert OHLCV file to CSV +pyne data convert-to BTCUSDT_1D.ohlcv + +# Convert to JSON with human-readable dates +pyne data convert-to BTCUSDT_1D.ohlcv --format json --as-datetime -# Convert with human-readable dates -pyne data convert-to ccxt --symbol "BINANCE:BTC/USDT" --timeframe "1D" --format "csv" --as-datetime +# Short form (extension optional) +pyne data convert-to BTCUSDT_1D -f csv -dt ``` ### Converting from Other Formats @@ -172,21 +180,44 @@ pyne data convert-from FILE_PATH [OPTIONS] Where `FILE_PATH` is the path to the CSV or JSON file to convert. Options: -- `--provider`, `-p`: Data provider name (can be any name, defaults to "custom") -- `--symbol`, `-s`: Symbol name -- `--timeframe`, `-tf`: Timeframe in TradingView format -- `--fmt`, `-f`: Input format (csv, json) - defaults to the file extension if not specified +- `--provider`, `-p`: Data provider name (defaults to auto-detected from filename) +- `--symbol`, `-s`: Symbol name (defaults to auto-detected from filename) - `--timezone`, `-tz`: Timezone of the timestamps (defaults to UTC) +**Automatic Detection Features:** +- **Symbol Detection**: The command automatically detects symbols from common filename patterns +- **Provider Detection**: Recognizes provider names in filenames (BINANCE, BYBIT, CAPITALCOM, etc.) 
+- **Format Support**: Supports CSV and JSON files, auto-detected from file extension + +**Filename Pattern Examples:** +- `BTCUSDT.csv` → Symbol: BTC/USDT +- `EUR_USD.csv` → Symbol: EUR/USD +- `ccxt_BYBIT_BTC_USDT.csv` → Symbol: BTC/USDT, Provider: bybit +- `BINANCE_ETHUSDT_1h.csv` → Symbol: ETH/USDT, Provider: binance +- `capitalcom_EURUSD.csv` → Symbol: EUR/USD, Provider: capitalcom + Example: ```bash -# Convert CSV to PyneCore format -pyne data convert-from ./data/btcusd.csv --symbol "CUSTOM:BTC/USD" --timeframe "1D" +# Convert CSV with automatic detection +pyne data convert-from ./data/BTCUSDT.csv # Auto-detects BTC/USDT + +# Override auto-detected values if needed +pyne data convert-from ./data/btcusd.csv --symbol "BTC/USD" --provider "kraken" # Convert with timezone specification -pyne data convert-from ./data/eurusd.csv --symbol "CUSTOM:EUR/USD" --timeframe "60" --timezone "Europe/London" +pyne data convert-from ./data/eurusd.csv --timezone "Europe/London" ``` +**Generated TOML Configuration:** + +After conversion, a TOML configuration file is automatically generated with: +- **Smart Symbol Type Detection**: Automatically identifies forex, crypto, or other asset types +- **Tick Size Analysis**: Analyzes price data to determine the minimum price increment +- **Opening Hours Detection**: Detects trading hours from actual trading activity +- **Interval Detection**: Automatically determines the timeframe from timestamp intervals + +The generated TOML file includes all detected information and can be manually adjusted if needed. + ## Data File Structure PyneCore uses a structured approach to store OHLCV data: diff --git a/docs/cli/run.md b/docs/cli/run.md index e5a032c..129be7f 100644 --- a/docs/cli/run.md +++ b/docs/cli/run.md @@ -27,7 +27,7 @@ pyne run SCRIPT DATA [OPTIONS] Where: - `SCRIPT`: Path to the PyneCore script (.py) or Pine Script (.pine) file -- `DATA`: Path to the OHLCV data (.ohlcv) file +- `DATA`: Path to the data file (.ohlcv, .csv, .json, or .txt) - `OPTIONS`: Additional options to customize the execution ## Simple Example @@ -81,15 +81,61 @@ Example with API key: pyne run my_strategy.pine eurusd_data.ohlcv --api-key "your-api-key" ``` +## Automatic Data Conversion + +The `run` command now supports automatic conversion of non-OHLCV data formats. When you provide a CSV, JSON, or TXT file, the system automatically: + +1. **Detects the file format** from the extension +2. **Analyzes the filename** to extract symbol and provider information +3. **Converts the data** to OHLCV format +4. **Generates a TOML configuration** with detected parameters +5. **Runs the script** with the converted data + +### Supported Formats and Detection + +The automatic conversion supports: +- **CSV files**: Standard comma-separated values +- **JSON files**: JSON formatted OHLCV data +- **TXT files**: Tab, semicolon, or pipe-delimited data (coming soon) + +### Filename Pattern Detection + +The system recognizes common filename patterns: +- `BTCUSDT.csv` → Symbol: BTC/USDT +- `EUR_USD.json` → Symbol: EUR/USD +- `ccxt_BYBIT_BTC_USDT.csv` → Symbol: BTC/USDT, Provider: bybit +- `BINANCE_ETHUSDT_1h.csv` → Symbol: ETH/USDT, Provider: binance + +### Example with Automatic Conversion + +```bash +# Run a script with CSV data (automatic conversion) +pyne run my_strategy.py BTCUSDT.csv + +# The system will: +# 1. Detect BTC/USDT as the symbol +# 2. Convert CSV to OHLCV format +# 3. Generate BTCUSDT.toml with symbol info +# 4. 
Run the script with converted data +``` + +### Advanced Analysis During Conversion + +When converting data, the system performs advanced analysis: +- **Tick Size Detection**: Analyzes price movements to determine minimum price increment +- **Trading Hours Detection**: Identifies when the market is actively trading +- **Interval Auto-Correction**: Detects and fixes incorrect timeframe settings +- **Symbol Type Detection**: Identifies forex, crypto, or other asset types + ## Command Arguments The `run` command has two required arguments: - `SCRIPT`: The script file to run. If only a filename is provided, it will be searched in the `workdir/scripts/` directory. -- `DATA`: The OHLCV data file to use. If only a filename is provided, it will be searched in the `workdir/data/` directory. +- `DATA`: The data file to use. Supports .ohlcv, .csv, .json formats. If only a filename is provided, it will be searched in the `workdir/data/` directory. -Note: you don't need to write the `.py` and `.ohlcv` extensions in the command. +Note: you don't need to write the file extensions in the command. ## Command Options diff --git a/pytest.ini b/pytest.ini index 8642e04..4665b7e 100644 --- a/pytest.ini +++ b/pytest.ini @@ -6,4 +6,4 @@ log_cli_level = DEBUG log_cli_format = %(asctime)s %(levelname)6s %(module_func_line)30s - %(message)s log_cli_date_format = %Y-%m-%d %H:%M:%S -addopts = --import-mode=importlib -rs -x --spec --ignore-glob="**/data/**" +addopts = --import-mode=importlib -rs -x --spec --ignore-glob="**/data/*modified.py" diff --git a/src/pynecore/cli/commands/data.py b/src/pynecore/cli/commands/data.py index 0b2fedc..7a453b0 100644 --- a/src/pynecore/cli/commands/data.py +++ b/src/pynecore/cli/commands/data.py @@ -1,4 +1,4 @@ -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, TypeAlias from pathlib import Path from enum import Enum from datetime import datetime, timedelta, UTC @@ -13,23 +13,58 @@ from ..app import app, app_state from ...providers import available_providers from ...providers.provider import Provider +from ...lib.timeframe import in_seconds +from ...core.data_converter import DataConverter, SupportedFormats as InputFormats +from ...core.ohlcv_file import OHLCVReader from ...utils.rich.date_column import DateColumn -from pynecore.core.ohlcv_file import OHLCVReader, OHLCVWriter __all__ = [] app_data = Typer(help="OHLCV related commands") app.add_typer(app_data, name="data") -# Create an enum from it -AvailableProvidersEnum = Enum('Provider', {name.upper(): name.lower() for name in available_providers}) +# Trick to avoid type checking errors +if TYPE_CHECKING: + DateOrDays: TypeAlias = datetime -# Available intervals (The same fmt as described in timeframe.period) -TimeframeEnum = Enum('Timeframe', {name: name for name in ('1', '5', '15', '30', '60', '120', '240', '1D', '1W')}) -# Trick to avoid type checking errors -DateOrDays = datetime if TYPE_CHECKING else str + class AvailableProvidersEnum(Enum): + ... + +else: + # DateOrDays is either a datetime or a number of days + DateOrDays = str + + # Create an enum from available providers + AvailableProvidersEnum = Enum('Provider', {name.upper(): name.lower() for name in available_providers}) + + +# Available output formats +class OutputFormat(Enum): + CSV = 'csv' + JSON = 'json' + + +# TV-compatible timeframe validation function +def validate_timeframe(value: str) -> str: + """ + Validate TV-compatible timeframe string. 
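+    The value is upper-cased before validation, so e.g. "1d" is accepted and returned as "1D".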
+ + :param value: Timeframe string to validate + :return: Validated timeframe string + :raises ValueError: If timeframe is invalid + """ + value = value.upper() + try: + # Test if it's a valid TV timeframe by trying to convert to seconds + in_seconds(value) + except (ValueError, AssertionError): + raise ValueError( + f"Invalid timeframe: {value}. Must be a valid timeframe in TradingView format " + f"(e.g. '1', '5', '60', '1D', '1W', '1M')." + ) + return value def parse_date_or_days(value: str) -> datetime | str: @@ -58,25 +93,26 @@ def parse_date_or_days(value: str) -> datetime | str: @app_data.command() def download( - provider: AvailableProvidersEnum = Argument(..., case_sensitive=False, show_default=False, # type: ignore + provider: AvailableProvidersEnum = Argument(..., case_sensitive=False, show_default=False, help="Data provider"), symbol: str | None = Option(None, '--symbol', '-s', show_default=False, help="Symbol (e.g. BYBIT:BTC/USDT:USDT)"), list_symbols: bool = Option(False, '--list-symbols', '-ls', help="List available symbols of the provider"), - timeframe: TimeframeEnum = Option('1D', '--timeframe', '-tf', case_sensitive=False, # type: ignore - help="Timeframe in TradingView fmt"), - time_from: DateOrDays = Option("continue", '--from', '-f', # type: ignore + timeframe: str = Option('1D', '--timeframe', '-tf', callback=validate_timeframe, + help="Timeframe in TradingView format (e.g., '1', '5S', '1D', '1W')"), + time_from: DateOrDays = Option("continue", '--from', '-f', callback=parse_date_or_days, formats=[], metavar="[%Y-%m-%d|%Y-%m-%d %H:%M:%S|NUMBER]|continue", help="Start date or days back from now, or 'continue' to resume last download," " or one year if no data"), - time_to: DateOrDays = Option(datetime.now(UTC).replace(second=0, microsecond=0), '--to', '-t', # type: ignore + time_to: DateOrDays = Option(datetime.now(UTC).replace(second=0, microsecond=0), '--to', '-t', callback=parse_date_or_days, formats=[], metavar="[%Y-%m-%d|%Y-%m-%d %H:%M:%S|NUMBER]", help="End date or days from start date"), show_info: bool = Option(False, '--symbol-info', '-si', help="Show symbol info"), - force_save_info: bool = Option(False, '--force-save-info', '-fi', help="Force save symbol info"), + force_save_info: bool = Option(False, '--force-save-info', '-fi', + help="Force save symbol info"), truncate: bool = Option(False, '--truncate', '-tr', help="Truncate file before downloading, all data will be lost"), ): @@ -104,7 +140,7 @@ def download( raise Exit(1) # Create provider instance - provider_instance: Provider = provider_class(symbol=symbol, timeframe=timeframe.value, + provider_instance: Provider = provider_class(symbol=symbol, timeframe=timeframe, ohlv_dir=app_state.data_dir) # Download symbol info if not exists @@ -198,13 +234,9 @@ def cb_progress(current_time: datetime): @app_data.command() def convert_to( - provider: AvailableProvidersEnum = Argument(..., case_sensitive=False, show_default=False, # type: ignore - help="Data provider"), - symbol: str | None = Option(None, '--symbol', '-s', show_default=False, - help="Symbol (e.g. 
BYBIT:BTCUSDT:USDT)"), - timeframe: TimeframeEnum = Option('1D', '--timeframe', '-tf', case_sensitive=False, # type: ignore - help="Timeframe in TradingView fmt"), - fmt: Enum('Format', {'csv': 'csv', 'json': 'json'}) = Option( # noqa # type: ignore + ohlcv_path: Path = Argument(..., dir_okay=False, file_okay=True, + help="Data file to convert (*.ohlcv)"), + fmt: OutputFormat = Option( 'csv', '--format', '-f', case_sensitive=False, help="Output format"), @@ -214,63 +246,106 @@ def convert_to( """ Convert downloaded data from pyne's OHLCV format to another format """ - # Import provider module from - provider_module = __import__(f"pynecore.providers.{provider.value}", fromlist=['']) - provider_class = getattr(provider_module, [p for p in dir(provider_module) if p.endswith('Provider')][0]) - ohlcv_path = provider_class.get_ohlcv_path(symbol, timeframe.value, app_state.data_dir) - + # Check file format and extension + if ohlcv_path.suffix == "": + # No extension, add .ohlcv + ohlcv_path = ohlcv_path.with_suffix(".ohlcv") + + # Expand data path + if len(ohlcv_path.parts) == 1: + ohlcv_path = app_state.data_dir / ohlcv_path + # Check if data exists + if not ohlcv_path.exists(): + secho(f"Data file '{ohlcv_path}' not found!", fg="red", err=True) + raise Exit(1) + + out_path = None with Progress(SpinnerColumn(finished_text="[green]✓"), TextColumn("{task.description}")) as progress: # Convert with OHLCVReader(str(ohlcv_path)) as ohlcv_reader: - if fmt.value == 'csv': + if fmt.value == OutputFormat.CSV.value: task = progress.add_task(description="Converting to CSV...", total=1) - ohlcv_reader.save_to_csv(str(ohlcv_path.with_suffix('.csv')), as_datetime=as_datetime) + out_path = str(ohlcv_path.with_suffix('.csv')) + ohlcv_reader.save_to_csv(out_path, as_datetime=as_datetime) - elif fmt.value == 'json': + elif fmt.value == OutputFormat.JSON.value: task = progress.add_task(description="Converting to JSON...", total=1) - ohlcv_reader.save_to_json(str(ohlcv_path.with_suffix('.json')), as_datetime=as_datetime) + out_path = str(ohlcv_path.with_suffix('.json')) + ohlcv_reader.save_to_json(out_path, as_datetime=as_datetime) + + else: + raise ValueError(f"Unsupported format: {fmt}") - # Complete task - progress.update(task, completed=1) + # Complete task + progress.update(task, completed=1) + + if out_path: + secho(f'Data file converted successfully to "{out_path}"!') @app_data.command() def convert_from( - file_path: Path = Argument(..., help="Path to CSV file to convert"), - provider: str = Option("custom", '--provider', '-p', + file_path: Path = Argument(..., help="Path to CSV/JSON/TXT file to convert"), + provider: str = Option(None, '--provider', '-p', help="Data provider, can be any name"), symbol: str | None = Option(None, '--symbol', '-s', show_default=False, - help="Symbol (e.g. 
BYBIT:BTCUSDT:USDT)"), - timeframe: TimeframeEnum = Option('1D', '--timeframe', '-tf', case_sensitive=False, # type: ignore - help="Timeframe in TradingView fmt"), - fmt: Enum('Format', {'csv': 'csv', 'json': 'json'}) | None = Option( # noqa # type: ignore - None, '--fmt', '-f', - case_sensitive=False, - help="Output fmt"), - tz: str = Option('UTC', '--timezone', '-tz', - help="Timezone"), + help="Symbol (default: from file name)"), + tz: str = Option('UTC', '--timezone', '-tz', help="Timezone"), ): """ Convert data from other sources to pyne's OHLCV format """ - with Progress(SpinnerColumn(finished_text="[green]✓"), TextColumn("{task.description}")) as progress: - ohlcv_path = Provider.get_ohlcv_path(symbol, timeframe.value, app_state.data_dir, provider) - if fmt is None: - fmt = file_path.suffix[1:] # noqa - else: - fmt = fmt.value - # Convert - with OHLCVWriter(ohlcv_path) as ohlcv_writer: - if fmt == 'csv': - task = progress.add_task(description="Converting from CSV...", total=1) - ohlcv_writer.load_from_csv(file_path, tz=tz) - - elif fmt == 'json': - task = progress.add_task(description="Converting from JSON...", total=1) - ohlcv_writer.load_from_json(file_path, tz=tz) - else: - secho(f"Error: Invalid format: {fmt}", err=True, fg=colors.RED) - raise Exit(1) + # Expand file path if only filename is provided (look in workdir/data) + if len(file_path.parts) == 1: + file_path = app_state.data_dir / file_path + + # Check if file exists + if not file_path.exists(): + secho(f'File "{file_path}" not found!', fg=colors.RED, err=True) + raise Exit(1) + + # Auto-detect symbol and provider from filename if not provided + detected_symbol, detected_provider = DataConverter.guess_symbol_from_filename(file_path) + + if symbol is None: + symbol = detected_symbol + + if provider is None and detected_provider is not None: + provider = detected_provider + + # Ensure we have required parameters + if symbol is None: + secho(f"Error: Could not detect symbol from filename '{file_path.name}'!", fg=colors.RED, err=True) + secho("Please provide a symbol using --symbol option.", fg=colors.YELLOW, err=True) + raise Exit(1) + + # Auto-detect file format + fmt = file_path.suffix[1:].lower() + if fmt not in InputFormats: + raise ValueError(f"Unsupported file format: {file_path}") + + # Use the enhanced DataConverter for automatic conversion + converter = DataConverter() + + try: + with Progress(SpinnerColumn(finished_text="[green]✓"), TextColumn("{task.description}")) as progress: + task = progress.add_task(description=f"Converting {fmt.upper()} to OHLCV format...", total=1) + + # Perform conversion with automatic TOML generation + converter.convert_to_ohlcv( + file_path=Path(file_path), + provider=provider, + symbol=symbol, + timezone=tz, + force=True + ) - # Complete task progress.update(task, completed=1) + + except Exception as e: + secho(f"Error: {e}", err=True, fg=colors.RED) + raise Exit(1) + + secho(f'Data file converted successfully to "{file_path}".') + secho(f'A configuration file was automatically generated for you at "{file_path.with_suffix(".toml")}". 
' + f'Please check it and adjust it to match your needs.') diff --git a/src/pynecore/cli/commands/run.py b/src/pynecore/cli/commands/run.py index 04f08f1..851adcf 100644 --- a/src/pynecore/cli/commands/run.py +++ b/src/pynecore/cli/commands/run.py @@ -17,6 +17,7 @@ from ...utils.rich.date_column import DateColumn from pynecore.core.ohlcv_file import OHLCVReader +from pynecore.core.data_converter import DataConverter, DataFormatError, ConversionError from pynecore.core.syminfo import SymInfo from pynecore.core.script_runner import ScriptRunner @@ -85,6 +86,7 @@ def run( help="PyneSys API key for compilation (overrides configuration file)", envvar="PYNESYS_API_KEY", rich_help_panel="Compilation Options"), + ): """ Run a script (.py or .pine) @@ -102,6 +104,9 @@ def run( file is newer than the [italic]py[/] file or if the [italic].py[/] file doesn't exist. The compiled [italic].py[/] file will be saved into the same folder as the original [italic].pine[/] file. A valid [bold]PyneSys API[/bold] key is required for Pine Script compilation. You can get one at [blue]https://pynesys.io[/blue]. + + [bold]Data Support:[/bold] + Supports CSV, TXT, JSON, and OHLCV data files. Non-OHLCV files are automatically converted. Symbol is auto-detected from filename. """ # noqa # Expand script path @@ -137,7 +142,7 @@ def run( if api_key: api_config['api_key'] = api_key - if api_config['api_key']: + if api_config.get('api_key'): # Create the compiler instance compiler = PyneComp(**api_config) @@ -175,15 +180,46 @@ def run( # No extension, add .ohlcv data = data.with_suffix(".ohlcv") elif data.suffix != ".ohlcv": - # Has extension but not .ohlcv - secho(f"Cannot run with '{data.suffix}' files. The PyneCore runtime requires .ohlcv format.", - fg="red", err=True) - secho("If you're trying to use a different data format, please convert it first:", fg="red") - symbol_placeholder = "YOUR_SYMBOL" - timeframe_placeholder = "YOUR_TIMEFRAME" - secho(f"pyne data convert-from {data} --symbol {symbol_placeholder} --timeframe {timeframe_placeholder}", - fg="yellow") - raise Exit(1) + # Has extension but not .ohlcv - automatically convert + try: + converter = DataConverter() + + # Check if conversion is needed + if converter.is_conversion_required(data): + # Auto-detect symbol and provider from filename + detected_symbol, detected_provider = DataConverter.guess_symbol_from_filename(data) + + if not detected_symbol: + detected_symbol = data.stem.upper() + + with Progress( + SpinnerColumn(finished_text="[green]✓"), + TextColumn("[progress.description]{task.description}"), + console=console + ) as progress: + task = progress.add_task(f"Converting {data.suffix} to OHLCV format...", total=1) + + # Perform conversion with smart defaults + converter.convert_to_ohlcv( + data, + provider=detected_provider, + symbol=detected_symbol, + force=True + ) + + # After conversion, the OHLCV file has the same name but .ohlcv extension + data = data.with_suffix(".ohlcv") + + progress.update(task, completed=1) + else: + # File is already up-to-date, use existing OHLCV file + data = data.with_suffix(".ohlcv") + + except (DataFormatError, ConversionError) as e: + secho(f"Conversion failed: {e}", fg="red", err=True) + secho("Please convert the file manually:", fg="red") + secho(f"pyne data convert-from {data}", fg="yellow") + raise Exit(1) # Expand data path if len(data.parts) == 1: diff --git a/src/pynecore/core/data_converter.py b/src/pynecore/core/data_converter.py new file mode 100644 index 0000000..8951a35 --- /dev/null +++ 
b/src/pynecore/core/data_converter.py @@ -0,0 +1,712 @@ +""" +Automatic data file to OHLCV conversion functionality. + +This module provides automatic detection and conversion of CSV, TXT, and JSON files +to OHLCV format when needed, eliminating the manual step of running pyne data convert. +""" +from __future__ import annotations + +import json +from enum import Enum +from datetime import time +from pathlib import Path +from typing import Literal + +from pynecore.core.ohlcv_file import OHLCVWriter, OHLCVReader +from pynecore.utils.file_utils import copy_mtime, is_updated +from ..lib.timeframe import from_seconds +from .syminfo import SymInfo, SymInfoInterval, SymInfoSession + + +class DataFormatError(Exception): + """Raised when file format cannot be detected or is unsupported.""" + pass + + +class ConversionError(Exception): + """Raised when conversion fails.""" + pass + + +class SupportedFormats(Enum): + """Supported data file formats.""" + CSV = 'csv' + TXT = 'txt' + JSON = 'json' + + +class DataConverter: + """ + Main class for automatic data file conversion. + + Provides both CLI and programmatic interfaces for converting + CSV, TXT, and JSON files to OHLCV format automatically. + """ + + @staticmethod + def is_conversion_required(source_path: Path, ohlcv_path: Path | None = None) -> bool: + """ + Check if conversion is required based on file freshness. + + :param source_path: Path to the source file + :param ohlcv_path: Path to the OHLCV file (auto-generated if None) + :return: True if conversion is needed + """ + if ohlcv_path is None: + ohlcv_path = source_path.with_suffix('.ohlcv') + + # If OHLCV file doesn't exist, conversion is needed + if not ohlcv_path.exists(): + return True + + # Use existing file utility to check if source is newer + return is_updated(source_path, ohlcv_path) + + def convert_to_ohlcv( + self, + file_path: Path, + *, + force: bool = False, + provider: str | None = None, + symbol: str | None = None, + timezone: str = "UTC" + ) -> None: + """ + Convert multiple file formats to OHLCV format. 
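+        The converted data is written next to the source file as "<name>.ohlcv" and, when a
+        symbol is known, a matching "<name>.toml" symbol-info file is generated from the
+        analyzed price data.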
+ + :param file_path: Path to the data file + :param force: Force conversion even if OHLCV file is up-to-date + :param provider: Data provider name for OHLCV file naming + :param symbol: Symbol for OHLCV file naming + :param timezone: Timezone for timestamp conversion + :raises FileNotFoundError: If source file doesn't exist + :raises DataFormatError: If file format is unsupported + :raises ConversionError: If conversion fails + """ + if not file_path.exists(): + raise FileNotFoundError(f"Source file not found: {file_path}") + + # Detect file format + detected_format = self.detect_format(file_path) + + # If it's already OHLCV, no conversion needed + if detected_format == 'ohlcv': + raise ConversionError(f"Source file is already in OHLCV format: {file_path}") + + # Check if format is supported + if detected_format not in SupportedFormats: + raise DataFormatError(f"Unsupported file format '{detected_format}' for file: {file_path}") + + # Determine OHLCV output path + ohlcv_path = file_path.with_suffix('.ohlcv') + + # Check if conversion is needed + if not force and not self.is_conversion_required(file_path, ohlcv_path): + return + + # Auto-detect symbol and provider from filename if not provided + if symbol is None or provider is None: + detected_symbol, detected_provider = self.guess_symbol_from_filename(file_path) + if symbol is None: + symbol = detected_symbol + if provider is None and detected_provider is not None: + provider = detected_provider + + # Use default provider if not specified + if provider is None: + provider = "CUSTOM" + + analyzed_tick_size = None + analyzed_price_scale = None + analyzed_min_move = None + detected_timeframe = None + + try: + # Perform conversion directly to target file with truncate to clear existing data + with OHLCVWriter(ohlcv_path, truncate=True) as ohlcv_writer: + if detected_format == 'csv': + ohlcv_writer.load_from_csv(file_path, tz=timezone) + elif detected_format == 'json': + ohlcv_writer.load_from_json(file_path, tz=timezone) + elif detected_format == 'txt': + ohlcv_writer.load_from_txt(file_path, tz=timezone) + else: + raise ConversionError(f"Unsupported format for conversion: {detected_format}") + + # Get timeframe directly from writer + if ohlcv_writer.interval is None: + raise ConversionError("Cannot determine timeframe from OHLCV file (less than 2 records)") + try: + detected_timeframe = from_seconds(ohlcv_writer.interval) + except (ValueError, AssertionError): + raise ConversionError( + f"Cannot convert interval {ohlcv_writer.interval} seconds to valid timeframe") + + # Get analyzed tick size data from writer + analyzed_tick_size = ohlcv_writer.analyzed_tick_size + analyzed_price_scale = ohlcv_writer.analyzed_price_scale + analyzed_min_move = ohlcv_writer.analyzed_min_move + + # Copy modification time from source to maintain freshness + copy_mtime(file_path, ohlcv_path) + + # Generate TOML symbol info file if needed + toml_path = file_path.with_suffix('.toml') + + # Skip if TOML file exists and is newer than source (unless force is True) + if symbol and (force or not toml_path.exists() or is_updated(file_path, toml_path)): + # Use analyzed values from OHLCVWriter + if analyzed_tick_size: + mintick = analyzed_tick_size + pricescale = analyzed_price_scale or int(round(1.0 / analyzed_tick_size)) + minmove = analyzed_min_move or 1 + else: + # Fallback to safe defaults if analysis failed + mintick = 0.01 + pricescale = 100 + minmove = 1 + + # Determine symbol type based on symbol name patterns + symbol_upper = symbol.upper() + symbol_type, 
currency, base_currency = self.guess_symbol_type(symbol_upper) + + # Point value cannot be detected from data, always use 1.0 + # Users can manually adjust in the generated TOML file if needed + pointvalue = 1.0 + + # Get opening hours from OHLCVWriter + analyzed_opening_hours = ohlcv_writer.analyzed_opening_hours + + if analyzed_opening_hours: + # Use automatically detected opening hours + opening_hours = analyzed_opening_hours + else: + # Fallback to default based on symbol type (insufficient data or analysis failed) + opening_hours = self.get_default_opening_hours(symbol_type) + + # Create session starts and ends + session_starts = [SymInfoSession(day=1, time=time(0, 0, 0))] + session_ends = [SymInfoSession(day=7, time=time(23, 59, 59))] + + # Create SymInfo instance + # Use provider as prefix (uppercase), default to "CUSTOM" if not provided + prefix = provider.upper() if provider else "CUSTOM" + syminfo = SymInfo( + prefix=prefix, + description=f"{symbol}", + ticker=symbol_upper, + currency=currency, + basecurrency=base_currency or "USD", + period=detected_timeframe, + type=symbol_type, + mintick=mintick, + pricescale=int(pricescale), + minmove=int(minmove), + pointvalue=pointvalue, + opening_hours=opening_hours, + session_starts=session_starts, + session_ends=session_ends, + timezone=timezone, + ) + + # Save using SymInfo's built-in method + try: + syminfo.save_toml(toml_path) + # Copy modification time from source to maintain consistency + copy_mtime(file_path, toml_path) + except (OSError, IOError): + # Don't fail the entire conversion if TOML creation fails + pass + + except Exception as e: + # Clean up output file on error + if ohlcv_path.exists(): + try: + ohlcv_path.unlink() + except OSError: + pass + raise ConversionError(f"Failed to convert {file_path}: {e}") from e + + @staticmethod + def detect_format(file_path: Path) -> Literal['csv', 'txt', 'json', 'ohlcv', 'unknown']: + """ + Detect file format by content inspection. 
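+        The binary OHLCV format is recognized by trying to open the file with OHLCVReader;
+        text formats are guessed from the first line (JSON braces/brackets, comma counts,
+        or tab/semicolon/pipe delimiters).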
+ + :param file_path: Path to the file to analyze + :return: Detected format + :raises FileNotFoundError: If file doesn't exist + :raises DataFormatError: If file cannot be read + """ + if not file_path.exists(): + raise FileNotFoundError(f"File not found: {file_path}") + + # First check if it's a valid OHLCV file (binary format) + try: + with OHLCVReader(file_path): + # If we can open it successfully, it's a valid OHLCV file + return 'ohlcv' + except (ValueError, OSError, IOError): + # Not a valid OHLCV file, detect by content + pass + + # Detect text-based formats by content + try: + with open(file_path, 'r', encoding='utf-8') as f: + # Read first line for initial analysis + first_line = f.readline().strip() + + # Quick JSON check - look for JSON indicators + if first_line and (first_line.startswith('{') or first_line.startswith('[')): + # Verify it's valid JSON by reading the whole file + f.seek(0) + try: + json.load(f) + return 'json' + except (json.JSONDecodeError, UnicodeDecodeError): + pass + # Reset for further analysis if not JSON + f.seek(0) + first_line = f.readline().strip() + + # Check for CSV patterns + if first_line and ',' in first_line: + # Count commas to see if it looks like structured data + comma_count = first_line.count(',') + if comma_count >= 4: # At least OHLC columns + return 'csv' + + # Check for other delimiters (TXT) + if first_line and any(delim in first_line for delim in ['\t', ';', '|']): + return 'txt' + + # Default to CSV if it has any commas + if first_line and ',' in first_line: + return 'csv' + + return 'unknown' + + except (OSError, IOError, UnicodeDecodeError): + return 'unknown' + + @staticmethod + def get_default_opening_hours(symbol_type: str) -> list[SymInfoInterval]: + """ + Get default opening hours based on symbol type. + + :param symbol_type: Type of symbol ('crypto', 'forex', 'stock', or 'other') + :return: List of SymInfoInterval objects representing default trading hours + """ + opening_hours = [] + + if symbol_type == 'crypto': + # 24/7 trading for crypto + for day in range(1, 8): + opening_hours.append(SymInfoInterval( + day=day, + start=time(0, 0, 0), + end=time(23, 59, 59) + )) + elif symbol_type == 'forex': + # Forex markets: Sunday 5 PM ET to Friday 5 PM ET (roughly) + # Using Monday-Friday 00:00-23:59 as approximation + for day in range(1, 6): + opening_hours.append(SymInfoInterval( + day=day, + start=time(0, 0, 0), + end=time(23, 59, 59) + )) + else: + # Stock markets and others: typical business hours (Mon-Fri 9:30 AM - 4:00 PM) + for day in range(1, 6): + opening_hours.append(SymInfoInterval( + day=day, + start=time(9, 30, 0), + end=time(16, 0, 0) + )) + + return opening_hours + + @staticmethod + def guess_symbol_from_filename(file_path: Path) -> tuple[str | None, str | None]: + """ + Guess symbol and provider from filename based on common patterns. 
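+        For example, "BINANCE_BTC_USDT.csv" yields ("BTC/USDT", "binance") and
+        "EURUSD.csv" yields ("EURUSD", "forex").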
+ + :param file_path: Path to the data file + :return: Tuple of (symbol, provider) or (None, None) if not detected + """ + filename = file_path.stem # Filename without extension + filename_upper = filename.upper() + + # Known provider patterns - these will be detected first + provider_patterns = { + 'capitalcom': ['CAPITALCOM'], + 'capital.com': ['CAPITAL.COM', 'CAPITAL_COM'], + 'ccxt': ['CCXT'], + 'tradingview': ['TRADINGVIEW', 'TV'], + 'mt4': ['MT4', 'METATRADER4'], + 'mt5': ['MT5', 'METATRADER5'], + 'binance': ['BINANCE'], + 'bybit': ['BYBIT'], + 'coinbase': ['COINBASE'], + 'kraken': ['KRAKEN'], + 'oanda': ['OANDA'], + 'ib': ['IB', 'INTERACTIVE_BROKERS'], + } + + # Exchange names for crypto detection + exchange_names = ['BINANCE', 'BYBIT', 'COINBASE', 'KRAKEN', 'BITFINEX', 'HUOBI', 'OKEX', 'FTX'] + + # Crypto bases and quotes for pair detection + crypto_bases = ['BTC', 'ETH', 'XRP', 'ADA', 'DOT', 'LINK', 'LTC', 'BCH', 'UNI', 'MATIC', + 'SOL', 'AVAX', 'LUNA', 'ATOM', 'FTM', 'NEAR', 'ALGO', 'VET', 'FIL', 'ICP'] + # Order matters! Longer suffixes first to avoid false matches (USDT before USD) + crypto_quotes = ['USDT', 'USDC', 'BUSD', 'TUSD', 'DAI', 'USD', 'EUR', 'GBP', 'JPY', 'BTC', 'ETH'] + + detected_provider = None + detected_symbol = None + + # Step 1: Try exchange-based detection first (handles BINANCE_BTC_USDT, BYBIT:BTC:USDT, etc.) + # Clean separators and split + cleaned = filename.replace(':', '_').replace('/', '_').replace(',', '_') + parts = [p.strip() for p in cleaned.split('_') if p.strip()] + + if len(parts) >= 2 and parts[0].upper() in exchange_names: + # Exchange detected at the beginning + detected_provider = parts[0].lower() + + if len(parts) >= 3: + # Could be EXCHANGE_BASE_QUOTE format + if parts[1].upper() in crypto_bases and parts[2].upper() in crypto_quotes: + # Format: BINANCE_BTC_USDT + detected_symbol = f"{parts[1].upper()}/{parts[2].upper()}" + elif parts[1].upper() in crypto_bases: + # Maybe compact format in later parts: CCXT_BYBIT_BTC_USDT_USDT_1 + # Look for quote in remaining parts + for i in range(2, len(parts)): + if parts[i].upper() in crypto_quotes: + detected_symbol = f"{parts[1].upper()}/{parts[i].upper()}" + break + if not detected_symbol: + # No quote found, use the second part as-is + detected_symbol = parts[1].upper() + else: + # Check if second part is a compact pair (BTCUSDT) + potential = parts[1].upper() + for quote in crypto_quotes: + if potential.endswith(quote): + base = potential[:-len(quote)] + if base in crypto_bases: + detected_symbol = f"{base}/{quote}" + break + if not detected_symbol: + # Use second part as-is + detected_symbol = parts[1].upper() + elif len(parts) == 2: + # EXCHANGE_SYMBOL format + potential = parts[1].upper() + # Try to detect compact crypto pair + for quote in crypto_quotes: + if potential.endswith(quote): + base = potential[:-len(quote)] + if base in crypto_bases: + detected_symbol = f"{base}/{quote}" + break + if not detected_symbol: + detected_symbol = potential + + if detected_symbol: + return detected_symbol, detected_provider + + # Step 2: Check for explicit provider patterns (handles CAPITALCOM_EURUSD, TV_BTCUSD, etc.) + # Special case for ccxt_EXCHANGE pattern + if filename_upper.startswith('CCXT_'): + # Remove CCXT_ prefix and try to detect exchange and symbol + temp = filename[5:] # Remove "CCXT_" + temp_parts = temp.replace(':', '_').replace('/', '_').split('_') + if len(temp_parts) >= 2 and temp_parts[0].upper() in exchange_names: + # Format: CCXT_EXCHANGE_... 
- provider is the exchange name, not 'ccxt' + detected_provider = temp_parts[0].lower() # Use exchange name as provider + # Try to extract symbol from remaining parts + if len(temp_parts) >= 3: + # Try BASE/QUOTE detection + for i in range(1, len(temp_parts) - 1): + if temp_parts[i].upper() in crypto_bases: + for j in range(i + 1, len(temp_parts)): + if temp_parts[j].upper() in crypto_quotes: + detected_symbol = f"{temp_parts[i].upper()}/{temp_parts[j].upper()}" + return detected_symbol, detected_provider + # Fallback to simple extraction + detected_symbol = '_'.join(temp_parts[1:]) if len(temp_parts) > 1 else None + if detected_symbol: + return detected_symbol.upper(), detected_provider + else: + # No recognized exchange after ccxt_, just use ccxt as provider + detected_provider = 'ccxt' + detected_symbol = '_'.join(temp_parts) if temp_parts else None + if detected_symbol: + return detected_symbol.upper(), detected_provider + + for provider, patterns in provider_patterns.items(): + for pattern in patterns: + if pattern in filename_upper: + detected_provider = provider + # Remove provider pattern from filename for symbol detection + temp_filename = filename + for p in patterns: + temp_filename = temp_filename.replace(p, '').replace(p.lower(), '').replace(p.capitalize(), '') + temp_filename = temp_filename.strip('_').strip('-').strip(',').strip().strip() + + # TradingView format might have extra parts like ", 30_cbf9d" + # First remove everything after comma if present + if ',' in temp_filename: + temp_filename = temp_filename.split(',')[0].strip() + + if '_' in temp_filename: + temp_parts = temp_filename.split('_') + # Filter out hash-like strings and pure numbers + symbol_parts = [] + for part in temp_parts: + part = part.strip() + if not part: + continue + # Skip if looks like a hash or timeframe + if len(part) <= 6 and any(c.isdigit() for c in part) and any(c.isalpha() for c in part): + continue + if part.isdigit(): + continue + if part.upper() in ['1M', '5M', '15M', '30M', '60M', '1H', '4H', '1D', '1W', 'DAILY', + 'HOURLY', 'WEEKLY']: + continue + symbol_parts.append(part) + if symbol_parts: + temp_filename = '_'.join(symbol_parts) + + if temp_filename: + # Try to parse the symbol + temp_upper = temp_filename.upper() + + # Check for forex pair (6 chars, all letters) + if len(temp_upper) == 6 and temp_upper.isalpha(): + detected_symbol = temp_upper + # Check for crypto pair + elif any(base in temp_upper for base in crypto_bases): + for quote in crypto_quotes: + if temp_upper.endswith(quote): + base = temp_upper[:-len(quote)] + if base in crypto_bases: + detected_symbol = f"{base}/{quote}" + break + if not detected_symbol: + detected_symbol = temp_upper + else: + detected_symbol = temp_upper + break + if detected_provider: + break + + # Step 3: If no provider detected, try to infer from symbol pattern + if not detected_provider and not detected_symbol: + # Remove common suffixes and prefixes + clean_name = filename + for suffix in ['_1M', '_5M', '_15M', '_30M', '_60M', '_1H', '_4H', '_1D', '_1W', '_DAILY', '_HOURLY', + '_WEEKLY']: + if clean_name.upper().endswith(suffix): + clean_name = clean_name[:len(clean_name) - len(suffix)] + break + + clean_upper = clean_name.upper() + + # First check for crypto patterns (more specific) + for quote in crypto_quotes: + if clean_upper.endswith(quote): + base = clean_upper[:-len(quote)] + if base in crypto_bases: + detected_symbol = f"{base}/{quote}" + detected_provider = 'ccxt' + break + + # If not crypto, check for 6-letter forex pair + if not 
detected_symbol and len(clean_upper) == 6 and clean_upper.isalpha(): + detected_symbol = clean_upper + detected_provider = 'forex' + + # If still no match, check for separator-based pairs + if not detected_symbol: + # Try underscore or dash separator + if '_' in clean_name: + parts = clean_name.split('_') + elif '-' in clean_name: + parts = clean_name.split('-') + else: + parts = [] + + if len(parts) == 2: + if len(parts[0]) == 3 and len(parts[1]) == 3 and parts[0].isalpha() and parts[1].isalpha(): + # Likely forex: EUR_USD or EUR-USD + detected_symbol = parts[0].upper() + parts[1].upper() + detected_provider = 'forex' + elif parts[0].upper() in crypto_bases and parts[1].upper() in crypto_quotes: + # Crypto: BTC_USDT or BTC-USDT + detected_symbol = f"{parts[0].upper()}/{parts[1].upper()}" + detected_provider = 'ccxt' + + # Last resort - if it's a known ticker (must have at least one letter) + if not detected_symbol and len(clean_upper) >= 3 and clean_upper.isalnum() and any( + c.isalpha() for c in clean_upper): + detected_symbol = clean_upper + + return detected_symbol, detected_provider + + @staticmethod + def guess_symbol_type(symbol_upper: str) -> tuple[Literal["forex", "crypto", "other"], str, str | None]: + """ + Guess symbol type and extract currency information based on common patterns. + + :param symbol_upper: Uppercase symbol string + :return: Tuple of (symbol_type, currency, base_currency) + """ + # Common forex pairs - check these first for accurate detection + forex_pairs = { + 'EURUSD', 'GBPUSD', 'USDJPY', 'USDCHF', 'AUDUSD', 'USDCAD', 'NZDUSD', + 'EURGBP', 'EURJPY', 'GBPJPY', 'EURCHF', 'EURAUD', 'EURCAD', 'EURNZD', + 'GBPCHF', 'GBPAUD', 'GBPCAD', 'GBPNZD', 'AUDJPY', 'AUDCHF', 'AUDCAD', + 'AUDNZD', 'CADJPY', 'CADCHF', 'NZDJPY', 'NZDCHF', 'NZDCAD', 'CHFJPY', + 'EUR/USD', 'GBP/USD', 'USD/JPY', 'USD/CHF', 'AUD/USD', 'USD/CAD', 'NZD/USD' + } + + # Common crypto symbols + crypto_symbols = { + 'BTC', 'ETH', 'BNB', 'ADA', 'SOL', 'DOT', 'DOGE', 'AVAX', 'LUNA', 'SHIB', + 'MATIC', 'UNI', 'LINK', 'LTC', 'ALGO', 'BCH', 'XLM', 'VET', 'ATOM', 'FIL', + 'TRX', 'ETC', 'XMR', 'MANA', 'SAND', 'HBAR', 'EGLD', 'THETA', 'FTM', 'XTZ', + 'AAVE', 'AXS', 'CAKE', 'CRO', 'NEAR', 'KSM', 'ENJ', 'CHZ', 'SUSHI', 'SNX' + } + + # Initialize default values + symbol_type: Literal["forex", "crypto", "other"] = 'other' + currency = 'USD' + base_currency: str | None = None + + # Clean up separators + clean_symbol = symbol_upper.replace('_', '').replace('-', '').replace(':', '').strip() + + # Check if it's a direct forex pair match (check both with and without slash) + if clean_symbol in forex_pairs or symbol_upper in forex_pairs or \ + any(pair.replace('/', '') in clean_symbol for pair in forex_pairs): + symbol_type = 'forex' + # Extract currencies from forex pair - more robust extraction + matched = False + for pair in forex_pairs: + clean_pair = pair.replace('/', '') + # Check both versions + if clean_pair in clean_symbol or pair == symbol_upper: + # Found exact match + if '/' in pair: + parts = pair.split('/') + base_currency = parts[0] + currency = parts[1] + else: + base_currency = pair[:3] + currency = pair[3:6] + matched = True + break + + if not matched: + # Fallback extraction for forex + if 'EUR' in clean_symbol: + base_currency = 'EUR' + remaining = clean_symbol.replace('EUR', '') + currency = remaining[:3] if len(remaining) >= 3 else 'USD' + elif 'GBP' in clean_symbol: + base_currency = 'GBP' + remaining = clean_symbol.replace('GBP', '') + currency = remaining[:3] if len(remaining) >= 3 else 'USD' + 
elif clean_symbol.startswith('USD'): + base_currency = 'USD' + currency = clean_symbol[3:6] if len(clean_symbol) >= 6 else 'EUR' + else: + # Try to extract 3-letter codes + base_currency = clean_symbol[:3] if len(clean_symbol) >= 3 else 'EUR' + currency = clean_symbol[3:6] if len(clean_symbol) >= 6 else 'USD' + + # Check if symbol contains '/' separator (explicit format) + elif '/' in symbol_upper: + parts = symbol_upper.split('/') + if len(parts) == 2: + left_part = parts[0].strip() + right_part = parts[1].strip() + + # Check if it's crypto (contains crypto symbols or stable coins) + if any(crypto in left_part for crypto in crypto_symbols) or \ + right_part in ['USDT', 'USDC', 'BUSD', 'DAI', 'UST', 'TUSD']: + symbol_type = 'crypto' + currency = right_part + base_currency = left_part + # Check if it's forex (both parts are 3-letter currency codes) + elif len(left_part) == 3 and len(right_part) == 3 and \ + left_part.isalpha() and right_part.isalpha(): + symbol_type = 'forex' + base_currency = left_part + currency = right_part + else: + # Default to crypto for slash notation + symbol_type = 'crypto' + currency = right_part + base_currency = left_part + + # Check if it's crypto by matching known crypto symbols + elif any(crypto in clean_symbol for crypto in crypto_symbols): + symbol_type = 'crypto' + # Try to extract the quote currency + if 'USDT' in clean_symbol: + currency = 'USDT' + base_currency = clean_symbol.replace('USDT', '') + elif 'USDC' in clean_symbol: + currency = 'USDC' + base_currency = clean_symbol.replace('USDC', '') + elif 'BUSD' in clean_symbol: + currency = 'BUSD' + base_currency = clean_symbol.replace('BUSD', '') + elif 'USD' in clean_symbol: + currency = 'USD' + base_currency = clean_symbol.replace('USD', '') + else: + # Try to find the crypto part + for crypto in crypto_symbols: + if crypto in clean_symbol: + base_currency = crypto + currency = clean_symbol.replace(crypto, '') or 'USDT' + break + else: + currency = 'USDT' + base_currency = clean_symbol + + if not base_currency or base_currency == currency: + base_currency = clean_symbol[:3] if len(clean_symbol) >= 3 else 'BTC' + + # Check if it looks like a forex pair (6 letters, no special chars) + elif len(clean_symbol) == 6 and clean_symbol.isalpha(): + # Could be forex like EURUSD or crypto like BTCUSD + potential_base = clean_symbol[:3] + potential_quote = clean_symbol[3:6] + + # Common forex currencies + forex_currencies = {'EUR', 'USD', 'GBP', 'JPY', 'CHF', 'CAD', 'AUD', 'NZD'} + + if potential_base in forex_currencies and potential_quote in forex_currencies: + symbol_type = 'forex' + base_currency = potential_base + currency = potential_quote + else: + # Default to other for unknown 6-letter symbols + symbol_type = 'other' + currency = 'USD' + base_currency = None + + else: + # Default to other for everything else + symbol_type = 'other' + currency = 'USD' + base_currency = None + + return symbol_type, currency, base_currency diff --git a/src/pynecore/core/ohlcv_file.py b/src/pynecore/core/ohlcv_file.py index b583c74..89e4a03 100644 --- a/src/pynecore/core/ohlcv_file.py +++ b/src/pynecore/core/ohlcv_file.py @@ -11,16 +11,24 @@ The .ohlcv format cannot have gaps in it. All gaps are filled with the previous close price and -1 volume. 
""" +from typing import Iterator, cast -from typing import Iterator -import os +import csv +import json +import math import mmap +import os import struct -from pathlib import Path +from collections import Counter +from collections.abc import Buffer +from datetime import datetime, time, timedelta, timezone as dt_timezone, UTC from io import BufferedWriter, BufferedRandom -from datetime import datetime, UTC +from math import gcd as math_gcd +from pathlib import Path +from zoneinfo import ZoneInfo from pynecore.types.ohlcv import OHLCV +from ..core.syminfo import SymInfoInterval RECORD_SIZE = 24 # 6 * 4 STRUCT_FORMAT = 'Ifffff' # I: uint32, f: float32 @@ -28,26 +36,93 @@ __all__ = ['OHLCVWriter', 'OHLCVReader'] -def format_float(value: float) -> str: +def _format_float(value: float) -> str: """Format float with max 8 decimal places, removing trailing zeros""" return f"{value:.8g}" +def _parse_timestamp(ts_str: str, timestamp_format: str | None = None, timezone=None) -> int: + """ + Parse timestamp string to Unix timestamp. + + :param ts_str: Timestamp string to parse + :param timestamp_format: Optional specific datetime format for parsing + :param timezone: Optional timezone to apply to the parsed datetime + :return: Unix timestamp as integer + :raises ValueError: If timestamp cannot be parsed + """ + # Handle numeric timestamps + if ts_str.isdigit(): + timestamp = int(ts_str) + # Handle millisecond timestamps (common in JSON APIs) + if timestamp > 253402300799: # 9999-12-31 23:59:59 + timestamp //= 1000 + return timestamp + + # Parse datetime string + dt = None + if timestamp_format: + dt = datetime.strptime(ts_str, timestamp_format) + else: + # Try common formats + for fmt in [ + '%Y-%m-%d %H:%M:%S%z', # 2024-01-08 19:00:00+0000 + '%Y-%m-%d %H:%M:%S%Z', # 2024-01-08 19:00:00UTC + '%Y-%m-%dT%H:%M:%S%z', # 2024-01-08T19:00:00+0000 + '%Y-%m-%d %H:%M:%S', + '%Y/%m/%d %H:%M:%S', + '%d.%m.%Y %H:%M:%S', + '%Y-%m-%dT%H:%M:%S', + '%Y-%m-%dT%H:%M:%SZ', # ISO with Z + '%Y-%m-%d %H:%M', + '%Y%m%d %H:%M:%S' + ]: + try: + dt = datetime.strptime(ts_str, fmt) + break + except ValueError: + continue + + if dt is None: + raise ValueError(f"Could not parse timestamp: {ts_str}") + + # Apply timezone if specified and convert to timestamp + if timezone and dt is not None: + dt = dt.replace(tzinfo=timezone) + + return int(dt.timestamp()) + + class OHLCVWriter: """ Binary OHLCV data writer using direct file operations """ - __slots__ = ('path', '_file', '_size', '_start_timestamp', '_interval', '_current_pos', '_last_timestamp') + __slots__ = ('path', '_file', '_size', '_start_timestamp', '_interval', '_current_pos', '_last_timestamp', + '_price_changes', '_price_decimals', '_last_close', '_analyzed_tick_size', + '_analyzed_price_scale', '_analyzed_min_move', '_confidence', + '_trading_hours', '_analyzed_opening_hours', '_truncate') - def __init__(self, path: str | Path): + def __init__(self, path: str | Path, truncate: bool = False): self.path: str = str(path) self._file: BufferedWriter | BufferedRandom | None = None + self._truncate: bool = truncate self._size: int = 0 self._start_timestamp: int | None = None self._interval: int | None = None self._current_pos: int = 0 self._last_timestamp: int | None = None + # Tick size analysis + self._price_changes: list[float] = [] + self._price_decimals: set[int] = set() + self._last_close: float | None = None + self._analyzed_tick_size: float | None = None + self._analyzed_price_scale: int | None = None + self._analyzed_min_move: int | None = None + self._confidence: 
float = 0.0 + # Trading hours analysis + self._trading_hours: dict[tuple[int, int], int] = {} # (weekday, hour) -> count + self._analyzed_opening_hours: list | None = None def __enter__(self): self.open() @@ -109,20 +184,72 @@ def interval(self) -> int | None: """ return self._interval + @property + def analyzed_tick_size(self) -> float | None: + """ + Automatically detected tick size from price data + """ + if self._analyzed_tick_size is None and len(self._price_changes) >= 10: + self._analyze_tick_size() + return self._analyzed_tick_size + + @property + def analyzed_price_scale(self) -> int | None: + """ + Automatically detected price scale from price data + """ + if self._analyzed_price_scale is None and len(self._price_changes) >= 10: + self._analyze_tick_size() + return self._analyzed_price_scale + + @property + def analyzed_min_move(self) -> int | None: + """ + Automatically detected min move (usually 1) + """ + if self._analyzed_min_move is None and len(self._price_changes) >= 10: + self._analyze_tick_size() + return self._analyzed_min_move + + @property + def tick_analysis_confidence(self) -> float: + """ + Confidence of tick size analysis (0.0 to 1.0) + """ + if self._confidence == 0.0 and len(self._price_changes) >= 10: + self._analyze_tick_size() + return self._confidence + + @property + def analyzed_opening_hours(self) -> list | None: + """ + Automatically detected opening hours from trading activity + Returns list of SymInfoInterval tuples or None if not enough data + """ + if self._analyzed_opening_hours is None and self._has_enough_data_for_opening_hours(): + self._analyze_opening_hours() + return self._analyzed_opening_hours + def open(self) -> 'OHLCVWriter': """ Open file for writing """ - # Open in rb+ mode to allow both reading and writing - self._file = open(self.path, 'rb+') if os.path.exists(self.path) else open(self.path, 'wb+') + # If truncate is True, always open in write mode to clear existing data + if self._truncate: + self._file = open(self.path, 'wb+') + else: + # Open in rb+ mode to allow both reading and writing + self._file = open(self.path, 'rb+') if os.path.exists(self.path) else open(self.path, 'wb+') self._size = os.path.getsize(self.path) // RECORD_SIZE # Read initial metadata if file exists if self._size >= 2: self._file.seek(0) - first_timestamp = struct.unpack('I', self._file.read(4))[0] + data: Buffer = self._file.read(4) + first_timestamp = struct.unpack('I', data)[0] self._file.seek(RECORD_SIZE) - second_timestamp = struct.unpack('I', self._file.read(4))[0] + data: Buffer = self._file.read(4) + second_timestamp = struct.unpack('I', data)[0] self._start_timestamp = first_timestamp self._interval = second_timestamp - first_timestamp assert self._interval is not None @@ -132,6 +259,10 @@ def open(self) -> 'OHLCVWriter': self._file.seek(0, os.SEEK_END) self._current_pos = self._size + # Collect trading hours from existing data for analysis + if self._size > 0 and not self._truncate: + self._collect_existing_trading_hours() + return self def write(self, candle: OHLCV) -> None: @@ -154,24 +285,23 @@ def write(self, candle: OHLCV) -> None: if self._interval <= 0: raise ValueError(f"Invalid interval: {self._interval}") elif self._size >= 2: # Changed from elif self._size == 2: to properly handle all cases - # For the second candle, validate interval - if self._size == 2: - assert self._last_timestamp is not None and self._interval is not None - current_interval = candle.timestamp - self._last_timestamp - if current_interval > self._interval * 2: - # 
Truncate and restart - self.truncate() - self._start_timestamp = candle.timestamp - self._interval = None - self._last_timestamp = None - self._current_pos = 0 - self._size = 0 - # Check chronological order if self._last_timestamp is not None and candle.timestamp <= self._last_timestamp: raise ValueError( f"Timestamps must be in chronological order. Got {candle.timestamp} after {self._last_timestamp}") + # Check if we found a smaller interval (indicates initial interval was wrong due to gap) + if self._interval is not None and self._last_timestamp is not None: + current_interval = candle.timestamp - self._last_timestamp + + # If we find a smaller interval, the initial one was wrong (had a gap) + if 0 < current_interval < self._interval: + # Rebuild file with correct interval + self._rebuild_with_correct_interval(current_interval) + # Now write the current candle with the corrected setup + self.write(candle) + return + # Calculate expected timestamp and fill gaps if self._interval is not None and self._last_timestamp is not None: expected_ts = self._last_timestamp + self._interval @@ -180,14 +310,15 @@ def write(self, candle: OHLCV) -> None: if candle.timestamp > expected_ts: # Get previous candle's close price self._file.seek((self._current_pos - 1) * RECORD_SIZE) - prev_data = struct.unpack(STRUCT_FORMAT, self._file.read(RECORD_SIZE)) + data: Buffer = self._file.read(RECORD_SIZE) + prev_data = struct.unpack(STRUCT_FORMAT, data) prev_close = prev_data[4] # 4th index is close price # Fill gap with previous close and -1 volume (gap indicator) while expected_ts < candle.timestamp: - gap_data = struct.pack(STRUCT_FORMAT, - expected_ts, prev_close, prev_close, - prev_close, prev_close, -1.0) + gap_data: Buffer = struct.pack(STRUCT_FORMAT, + expected_ts, prev_close, prev_close, + prev_close, prev_close, -1.0) self._file.seek(self._current_pos * RECORD_SIZE) self._file.write(gap_data) self._current_pos += 1 @@ -196,12 +327,18 @@ def write(self, candle: OHLCV) -> None: # Write actual data self._file.seek(self._current_pos * RECORD_SIZE) - data = struct.pack(STRUCT_FORMAT, - candle.timestamp, candle.open, candle.high, - candle.low, candle.close, candle.volume) + data: Buffer = struct.pack(STRUCT_FORMAT, + candle.timestamp, candle.open, candle.high, + candle.low, candle.close, candle.volume) self._file.write(data) self._file.flush() + # Collect data for tick size analysis + self._collect_price_data(candle) + + # Collect trading hours data + self._collect_trading_hours(candle) + self._last_timestamp = candle.timestamp self._current_pos += 1 self._size = max(self._size, self._current_pos) @@ -260,6 +397,536 @@ def close(self): self._file.close() self._file = None + def _collect_price_data(self, candle: OHLCV) -> None: + """ + Collect price data for tick size analysis during writing. + """ + # Collect price changes + if self._last_close is not None: + change = abs(candle.close - self._last_close) + if change > 0 and len(self._price_changes) < 1000: # Limit to 1000 samples + self._price_changes.append(change) + + # Collect decimal places + for price in [candle.open, candle.high, candle.low, candle.close]: + if price != int(price): # Has decimal component + price_str = f"{price:.15f}".rstrip('0').rstrip('.') + if '.' in price_str: + decimals = len(price_str.split('.')[1]) + self._price_decimals.add(decimals) + + self._last_close = candle.close + + def _analyze_tick_size(self) -> None: + """ + Analyze collected price data to determine tick size using multiple methods. 
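+        A histogram/grid fit is tried first; if its confidence is low, the result is combined
+        from the most-frequent-change and decimal-place estimates, with safe defaults as a
+        final fallback.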
+ """ + if not self._price_changes: + # No data, use defaults + self._analyzed_tick_size = 0.01 + self._analyzed_price_scale = 100 + self._analyzed_min_move = 1 + self._confidence = 0.1 + return + + # Try histogram-based method first for better noise handling + histogram_tick = self._calculate_histogram_tick() + + if histogram_tick[0] > 0 and histogram_tick[1] > 0.7: + # High confidence histogram result, use it directly + self._analyzed_tick_size = histogram_tick[0] + self._analyzed_price_scale = int(round(1.0 / histogram_tick[0])) + self._analyzed_min_move = 1 + self._confidence = histogram_tick[1] + return + + # Fall back to other methods + # Method 1: Most frequent small change + freq_tick = self._calculate_frequency_tick() + + # Method 2: Decimal places analysis + decimal_tick = self._calculate_decimal_tick() + + # Combine methods with weighted confidence (no GCD) + tick_size, confidence = self._combine_tick_estimates(freq_tick, decimal_tick) + + # Calculate price scale and min move + if tick_size > 0: + self._analyzed_tick_size = tick_size + self._analyzed_price_scale = int(round(1.0 / tick_size)) + self._analyzed_min_move = 1 + self._confidence = confidence + else: + # Fallback to defaults + self._analyzed_tick_size = 0.01 + self._analyzed_price_scale = 100 + self._analyzed_min_move = 1 + self._confidence = 0.1 + + def _calculate_frequency_tick(self) -> tuple[float, float]: + """ + Calculate tick size based on most frequent small changes. + Returns (tick_size, confidence) + """ + if len(self._price_changes) < 10: + return 0, 0 + + # Apply float32 filtering first + filtered_changes = [] + for c in self._price_changes[:100]: + if c > 0: + # Convert to float32 and back + float32_val = struct.unpack('f', cast(Buffer, struct.pack('f', c)))[0] + # Round to reasonable precision for float32 + rounded = round(float32_val, 6) + if rounded > 0: + filtered_changes.append(rounded) + + if len(filtered_changes) < 5: + return 0, 0 + + # Find most frequent changes + counter = Counter(filtered_changes) + most_common = counter.most_common(10) + + if not most_common: + return 0, 0 + + # Find GCD of frequent changes to get base tick + frequent_changes = [change for change, count in most_common if count >= 2] + if len(frequent_changes) >= 2: + # Convert to integers for GCD + scale = 1000000 # 6 decimal places + int_changes = [int(round(c * scale)) for c in frequent_changes] + + # Calculate GCD + result = int_changes[0] + for val in int_changes[1:]: + result = math_gcd(result, val) + + tick_size = result / scale + + # Confidence based on how many changes match this tick + matches = sum(1 for c in filtered_changes + if abs(round(c / tick_size) * tick_size - c) < tick_size * 0.1) + confidence = min(matches / len(filtered_changes), 1.0) + return tick_size, confidence * 0.7 # Medium weight + + return 0, 0 + + def _calculate_histogram_tick(self) -> tuple[float, float]: + """ + Calculate tick size using histogram-based clustering approach. + This method is robust to float32 noise. 
+ Returns (tick_size, confidence) + """ + if len(self._price_changes) < 10: + return 0, 0 + + # Common tick sizes to test (from 1 to 0.00001) + candidate_ticks = [ + 1.0, 0.5, 0.25, 0.1, 0.05, 0.01, 0.005, 0.001, + 0.0005, 0.0001, 0.00005, 0.00001, 0.000001 + ] + + best_tick = 0 + best_score = 0 + + # Filter out zero changes and convert to float32 precision + changes = [] + for change in self._price_changes[:200]: # Use more samples for histogram + if change > 0: + # Round to float32 precision + float32_val = struct.unpack('f', cast(Buffer, struct.pack('f', change)))[0] + changes.append(float32_val) + + if len(changes) < 5: + return 0, 0 + + # Get min non-zero change to establish scale + min_change = min(changes) + avg_change = sum(changes) / len(changes) + + for tick in candidate_ticks: + # Skip ticks that are too small (less than 1/10 of smallest change) + if tick < min_change * 0.1: + continue + + # Skip ticks that are way too large + if tick > avg_change * 10: + continue + + # Round all changes to this tick size + rounded = [round(c / tick) * tick for c in changes] + + # Calculate how well the rounding fits + errors = [abs(c - r) for c, r in zip(changes, rounded)] + max_error = max(errors) + + # Key insight: if max error is less than tick/2, this tick captures the grid well + if max_error < tick * 0.5: + # Count how many changes are multiples of this tick (within tolerance) + tolerance = tick * 0.1 + multiples = sum(1 for c in changes if abs(round(c / tick) * tick - c) < tolerance) + multiple_ratio = multiples / len(changes) + + # Score based on how many values are clean multiples + if multiple_ratio > 0.7: # Most values are clean multiples + score = multiple_ratio + + # Prefer larger ticks (less precision) when scores are similar + # This helps choose 0.00001 over 0.000001 when both fit + score *= (1.0 + tick * 100) # Small bonus for larger ticks + + if score > best_score: + best_score = score + best_tick = tick + + # If no good tick found with strict criteria, fall back to simple analysis + if best_tick == 0: + # Find the most common order of magnitude in changes + magnitudes = [] + for c in changes: + if c > 0: + # Find order of magnitude + mag = 10 ** math.floor(math.log10(c)) + magnitudes.append(mag) + + if magnitudes: + # Most common magnitude + counter = Counter(magnitudes) + common_mag = counter.most_common(1)[0][0] + # Use tick as 1/10 of common magnitude + best_tick = common_mag / 10 + best_score = 0.5 + + # Calculate confidence based on score + if best_score > 0.8: + confidence = 0.9 + elif best_score > 0.6: + confidence = 0.7 + else: + confidence = best_score + + return best_tick, confidence + + def _calculate_decimal_tick(self) -> tuple[float, float]: + """ + Calculate tick size based on decimal places. 
+ Returns (tick_size, confidence) + """ + if not self._price_decimals: + # No decimals found, probably integer prices + return 1.0, 0.5 + + # Filter out noise from float representation + # If we have 15 decimals, it's likely float noise + valid_decimals = [d for d in self._price_decimals if d <= 10] + + if not valid_decimals: + # All decimals are noise, assume 2 decimal places (cents) + return 0.01, 0.3 + + # Use most common valid decimal places + max_decimals = max(valid_decimals) + tick_size = 10 ** (-max_decimals) + + # Lower confidence for decimal-only method + return tick_size, 0.5 + + @staticmethod + def _combine_tick_estimates(freq: tuple[float, float], + decimal: tuple[float, float]) -> tuple[float, float]: + """ + Combine tick size estimates from frequency and decimal methods only. + Returns (tick_size, confidence) + """ + estimates = [] + + if freq[0] > 0 and freq[1] > 0: + estimates.append(freq) + if decimal[0] > 0 and decimal[1] > 0: + estimates.append(decimal) + + if not estimates: + return 0.01, 0.1 # Default fallback + + # Use highest confidence estimate + best = max(estimates, key=lambda x: x[1]) + return best + + def _collect_trading_hours(self, candle: OHLCV) -> None: + """ + Collect trading hours data from timestamps. + Only collect for candles with actual volume (not gaps). + """ + if candle.volume <= 0: + return # Skip gaps + + # Convert timestamp to datetime + dt = datetime.fromtimestamp(candle.timestamp, tz=None) # Local time + + # Get weekday (1=Monday, 7=Sunday) and hour + weekday = dt.isoweekday() + hour = dt.hour + + # Count occurrences + key = (weekday, hour) + self._trading_hours[key] = self._trading_hours.get(key, 0) + 1 + + def _collect_existing_trading_hours(self) -> None: + """ + Collect trading hours data from existing file for opening hours analysis. + Only samples a subset of data for performance reasons. + """ + if not self._file or self._size == 0: + return + + # Save current position + current_pos = self._file.tell() + + try: + # Sample data: read every Nth record for performance + # For large files, we don't need to read everything + sample_interval = max(1, self._size // 1000) # Sample up to 1000 points + + for i in range(0, self._size, sample_interval): + self._file.seek(i * RECORD_SIZE) + data = self._file.read(RECORD_SIZE) + + if len(data) == RECORD_SIZE: + # Unpack the record + timestamp, open_val, high, low, close, volume = \ + struct.unpack('Ifffff', cast(Buffer, data)) + + # Only collect if volume > 0 (real trading) + if volume > 0: + dt = datetime.fromtimestamp(timestamp, tz=None) + weekday = dt.isoweekday() + hour = dt.hour + key = (weekday, hour) + self._trading_hours[key] = self._trading_hours.get(key, 0) + 1 + + finally: + # Restore file position + self._file.seek(current_pos) + + def _has_enough_data_for_opening_hours(self) -> bool: + """ + Check if we have enough data to analyze opening hours based on timeframe. 
+ """ + if not self._trading_hours or not self._interval: + return False + + # For daily or larger timeframes + if self._interval >= 86400: # >= 1 day + # We need at least a few days to see a pattern + unique_days = len(set(day for day, hour in self._trading_hours.keys())) + return unique_days >= 3 # At least 3 different days + + # For intraday timeframes + # Check if we have at least some meaningful data + # We need enough to see a pattern + data_points = sum(self._trading_hours.values()) + points_per_hour = 3600 / self._interval + hours_covered = data_points / points_per_hour + + # Need at least 2 hours of data to detect any pattern + # This allows even short sessions to be analyzed + return hours_covered >= 2 + + def _analyze_opening_hours(self) -> None: + """ + Analyze collected trading hours to determine opening hours pattern. + Works for both intraday and daily timeframes. + """ + if not self._trading_hours: + self._analyzed_opening_hours = None + return + + # For daily or larger timeframes, analyze which days have trading + if self._interval and self._interval >= 86400: # >= 1 day + self._analyzed_opening_hours = [] + days_with_trading = set(day for day, hour in self._trading_hours.keys()) + + # Check if it's 24/7 (all 7 days have trading) + if len(days_with_trading) == 7: + # 24/7 trading pattern + for day in range(1, 8): + self._analyzed_opening_hours.append(SymInfoInterval( + day=day, + start=time(0, 0, 0), + end=time(23, 59, 59) + )) + elif days_with_trading <= {1, 2, 3, 4, 5}: # Monday-Friday only + # Business days pattern (stock/forex) + for day in range(1, 6): + self._analyzed_opening_hours.append(SymInfoInterval( + day=day, + start=time(9, 30, 0), # Default to US market hours + end=time(16, 0, 0) + )) + else: + # Mixed pattern - include all days that have trading + for day in sorted(days_with_trading): + self._analyzed_opening_hours.append(SymInfoInterval( + day=day, + start=time(0, 0, 0), # Default to full day for daily data + end=time(23, 59, 59) + )) + return + + # For intraday data, analyze hourly patterns + # Check if it's 24/7 trading (crypto pattern) + total_hours = len(self._trading_hours) + if total_hours >= 168 * 0.7: # 70% of all hours in a week (lowered threshold) + # Check if all hours have similar activity + counts = list(self._trading_hours.values()) + avg_count = sum(counts) / len(counts) + variance = sum((c - avg_count) ** 2 for c in counts) / len(counts) + + # If low variance, it's likely 24/7 + if variance < avg_count * 0.5: + self._analyzed_opening_hours = [] + for day in range(1, 8): + self._analyzed_opening_hours.append(SymInfoInterval( + day=day, + start=time(0, 0, 0), + end=time(23, 59, 59) + )) + return + + # Analyze per-day patterns for intraday + self._analyzed_opening_hours = [] + + for day in range(1, 8): # Monday to Sunday + # Get all hours for this day + day_hours = [(hour, count) for (d, hour), count in self._trading_hours.items() if d == day] + + if not day_hours: + continue # No trading on this day + + # Sort by hour + day_hours.sort(key=lambda x: x[0]) + + # Find continuous trading periods + periods = [] + current_start = None + current_end = None + + # Threshold: consider an hour active if it has at least 20% of average activity + total_count = sum(count for _, count in day_hours) + if total_count == 0: + continue + avg_hour_count = total_count / len(day_hours) + threshold = avg_hour_count * 0.2 + + for hour, count in day_hours: + if count >= threshold: + if current_start is None: + current_start = hour + current_end = hour + else: + 
current_end = hour + else: + if current_start is not None: + periods.append((current_start, current_end)) + current_start = None + current_end = None + + # Add last period if exists + if current_start is not None: + periods.append((current_start, current_end)) + + # Convert periods to SymInfoInterval + for start_hour, end_hour in periods: + self._analyzed_opening_hours.append(SymInfoInterval( + day=day, + start=time(start_hour, 0, 0), + end=time(end_hour, 59, 59) + )) + + # If no opening hours detected, default to business hours + if not self._analyzed_opening_hours: + for day in range(1, 6): # Monday to Friday + self._analyzed_opening_hours.append(SymInfoInterval( + day=day, + start=time(9, 30, 0), + end=time(16, 0, 0) + )) + + def _rebuild_with_correct_interval(self, new_interval: int) -> None: + """ + Rebuild the entire file with the correct interval when a smaller interval is detected. + This happens when initial interval was wrong due to gaps. + + :param new_interval: The correct interval to use + """ + import tempfile + import shutil + + if not self._file or self._size == 0: + return + + # Save current file position and data + current_records = [] + + # Read all existing records + self._file.seek(0) + for i in range(self._size): + offset = i * RECORD_SIZE + self._file.seek(offset) + data = self._file.read(RECORD_SIZE) + if len(cast(bytes, data)) == RECORD_SIZE: + record = struct.unpack(STRUCT_FORMAT, cast(Buffer, data)) + current_records.append(OHLCV(*record, extra_fields={})) + + # Create temp file for rebuilding + temp_fd, temp_path = tempfile.mkstemp(suffix='.ohlcv.tmp', dir=os.path.dirname(self.path)) + try: + # Close temp file descriptor as we'll open it differently + os.close(temp_fd) + + # Create new writer with temp file + with OHLCVWriter(temp_path) as temp_writer: + # Write all records with correct interval + # The writer will now properly handle gaps + for record in current_records: + temp_writer.write(record) + + # Close current file + self._file.close() + + # Replace original with rebuilt file + shutil.move(temp_path, self.path) + + # Reopen the file + self._file = open(self.path, 'rb+') + self._size = os.path.getsize(self.path) // RECORD_SIZE + + # Reset interval to the correct one + self._interval = new_interval + + # Position at end for appending + self._file.seek(0, os.SEEK_END) + self._current_pos = self._size + + # Update last timestamp + if self._size > 0: + self._file.seek((self._size - 1) * RECORD_SIZE) + data: Buffer = self._file.read(4) + self._last_timestamp = struct.unpack('I', data)[0] + self._file.seek(0, os.SEEK_END) + + except Exception as e: + # Clean up temp file on error + if os.path.exists(temp_path): + try: + os.unlink(temp_path) + except OSError: + pass + raise IOError(f"Failed to rebuild file with correct interval: {e}") + def load_from_csv(self, path: str | Path, timestamp_format: str | None = None, timestamp_column: str | None = None, @@ -276,9 +943,6 @@ def load_from_csv(self, path: str | Path, :param time_column: When timestamp is split into date+time columns, time column name :param tz: Timezone name (e.g. 
'UTC', 'Europe/London', '+0100') for timestamp conversion """ - import csv - from zoneinfo import ZoneInfo - # Parse timezone timezone = None if tz: @@ -287,7 +951,6 @@ def load_from_csv(self, path: str | Path, sign = 1 if tz.startswith('+') else -1 hours = int(tz[1:3]) minutes = int(tz[3:]) if len(tz) > 3 else 0 - from datetime import timezone as dt_timezone, timedelta timezone = dt_timezone(sign * timedelta(hours=hours, minutes=minutes)) else: # Handle named timezone (e.g. UTC, Europe/London) @@ -348,40 +1011,11 @@ def load_from_csv(self, path: str | Path, # Combine date and time ts_str = f"{row[date_idx]} {row[time_idx]}" else: - ts_str = row[timestamp_idx] # type: ignore + ts_str = row[timestamp_idx] # Convert timestamp try: - if ts_str.isdigit(): - timestamp = int(ts_str) - else: - if timestamp_format: - dt = datetime.strptime(ts_str, timestamp_format) - else: - # Try common formats - for fmt in [ - '%Y-%m-%d %H:%M:%S%z', # 2024-01-08 19:00:00+0000 - '%Y-%m-%d %H:%M:%S%Z', # 2024-01-08 19:00:00UTC - '%Y-%m-%dT%H:%M:%S%z', # 2024-01-08T19:00:00+0000 - '%Y-%m-%d %H:%M:%S', - '%Y/%m/%d %H:%M:%S', - '%d.%m.%Y %H:%M:%S', - '%Y-%m-%dT%H:%M:%S', - '%Y-%m-%d %H:%M', - '%Y%m%d %H:%M:%S' - ]: - try: - dt = datetime.strptime(ts_str, fmt) - break - except ValueError: - continue - else: - raise ValueError(f"Could not parse timestamp: {ts_str}") - - # Set timezone if specified and convert to timestamp - if timezone: - dt = dt.replace(tzinfo=timezone) - timestamp = int(dt.timestamp()) + timestamp = _parse_timestamp(ts_str, timestamp_format, timezone) except Exception as e: raise ValueError(f"Failed to parse timestamp '{ts_str}': {e}") @@ -398,6 +1032,235 @@ def load_from_csv(self, path: str | Path, except (ValueError, IndexError) as e: raise ValueError(f"Invalid data in row: {e}") + def load_from_txt(self, path: str | Path, + timestamp_format: str | None = None, + timestamp_column: str | None = None, + date_column: str | None = None, + time_column: str | None = None, + tz: str | None = None) -> None: + """ + Load OHLCV data from TXT file using only builtin modules. + + :param path: Path to TXT file + :param timestamp_format: Optional datetime fmt for parsing + :param timestamp_column: Column name for timestamp (default tries: timestamp, time, date) + :param date_column: When timestamp is split into date+time columns, date column name + :param time_column: When timestamp is split into date+time columns, time column name + :param tz: Timezone name (e.g. 'UTC', 'Europe/London', '+0100') for timestamp conversion + """ + # Parse timezone + timezone = None + if tz: + if tz.startswith(('+', '-')): + # Handle UTC offset fmt (e.g. +0100, -0500) + sign = 1 if tz.startswith('+') else -1 + hours = int(tz[1:3]) + minutes = int(tz[3:]) if len(tz) > 3 else 0 + timezone = dt_timezone(sign * timedelta(hours=hours, minutes=minutes)) + else: + # Handle named timezone (e.g. 
UTC, Europe/London) + try: + timezone = ZoneInfo(tz) + except Exception as e: + raise ValueError(f"Invalid timezone {tz}: {e}") + + # Auto-detect delimiter + with open(path, 'r') as f: + first_line = f.readline().strip() + if not first_line: + raise ValueError("File is empty or first line is blank") + + # Check for common delimiters in order of preference + delimiters = ['\t', ';', '|'] + delimiter_counts = {} + + for delim in delimiters: + count = first_line.count(delim) + if count > 0: + delimiter_counts[delim] = count + + if not delimiter_counts: + raise ValueError("No supported delimiter found (tab, semicolon, or pipe)") + + # Use delimiter with highest count + delimiter = max(delimiter_counts, key=lambda x: delimiter_counts[x]) + + # Read TXT file with manual parsing for better control + with open(path, 'r') as f: + lines = f.readlines() + + if not lines: + raise ValueError("File is empty") + + # Parse header line + header_line = lines[0].strip() + if not header_line: + raise ValueError("Header row is empty") + + headers = self._parse_txt_line(header_line, delimiter) + headers = [h.lower().strip() for h in headers] # Case insensitive + + if not headers: + raise ValueError("No headers found") + + # Find timestamp column + timestamp_idx = None + date_idx = None + time_idx = None + + if date_column and time_column: + try: + date_idx = headers.index(date_column.lower()) + time_idx = headers.index(time_column.lower()) + except ValueError: + raise ValueError(f"Date/time columns not found: {date_column}/{time_column}") + else: + timestamp_col = timestamp_column.lower() if timestamp_column else None + if timestamp_col: + try: + timestamp_idx = headers.index(timestamp_col) + except ValueError: + raise ValueError(f"Timestamp column not found: {timestamp_col}") + else: + # Try common names + for col in ['timestamp', 'time', 'date']: + try: + timestamp_idx = headers.index(col) + break + except ValueError: + continue + + if timestamp_idx is None: + raise ValueError("Timestamp column not found!") + + # Find OHLCV columns + try: + o_idx = headers.index('open') + h_idx = headers.index('high') + l_idx = headers.index('low') + c_idx = headers.index('close') + v_idx = headers.index('volume') + except ValueError as e: + raise ValueError(f"Missing required column: {str(e)}") + + # Process data rows + for line in lines[1:]: # Skip header + line = line.strip() + if not line: # Skip empty lines + continue + + row = self._parse_txt_line(line, delimiter) + + if len(row) != len(headers): + raise ValueError(f"Row has {len(row)} columns, expected {len(headers)}") + + # Strip whitespace from all fields + row = [field.strip() for field in row] + + # Handle timestamp + if date_idx is not None and time_idx is not None: + # Combine date and time + ts_str = f"{row[date_idx]} {row[time_idx]}" + else: + ts_str = str(row[timestamp_idx]) if timestamp_idx is not None and timestamp_idx < len(row) else "" + try: + # Convert timestamp + timestamp = _parse_timestamp(ts_str, timestamp_format, timezone) + except Exception as e: + raise ValueError(f"Failed to parse timestamp '{ts_str}': {e}") + + # Write OHLCV data + try: + self.write(OHLCV( + timestamp, + float(row[o_idx]), + float(row[h_idx]), + float(row[l_idx]), + float(row[c_idx]), + float(row[v_idx]) + )) + except (ValueError, IndexError) as e: + raise ValueError(f"Invalid data in row: {e}") + + @staticmethod + def _parse_txt_line(line: str, delimiter: str) -> list[str]: + """ + Parse a single TXT line with proper handling of quoted fields and escape characters. 
+ + :param line: Line to parse + :param delimiter: Delimiter character + :return: List of parsed fields + :raises ValueError: If line format is invalid + """ + if not line: + return [] + + fields = [] + current_field = "" + in_quotes = False + quote_char = None + i = 0 + + while i < len(line): + char = line[i] + + # Handle escape characters + if char == '\\' and i + 1 < len(line): + next_char = line[i + 1] + if next_char in ['"', "'", '\\', 'n', 't', 'r']: + if next_char == 'n': + current_field += '\n' + elif next_char == 't': + current_field += '\t' + elif next_char == 'r': + current_field += '\r' + else: + current_field += next_char + i += 2 + continue + else: + current_field += char + i += 1 + continue + + # Handle quotes + if char in ['"', "'"] and not in_quotes: + in_quotes = True + quote_char = char + i += 1 + continue + elif char == quote_char and in_quotes: + # Check for escaped quote (double quote) + if i + 1 < len(line) and line[i + 1] == quote_char: + current_field += char + i += 2 + continue + else: + in_quotes = False + quote_char = None + i += 1 + continue + + # Handle delimiter + if char == delimiter and not in_quotes: + fields.append(current_field) + current_field = "" + i += 1 + continue + + # Regular character + current_field += char + i += 1 + + # Add the last field + fields.append(current_field) + + # Validate that quotes are properly closed + if in_quotes: + raise ValueError(f"Unclosed quote in line: {line[:50]}...") + + return fields + def load_from_json(self, path: str | Path, timestamp_format: str | None = None, timestamp_field: str | None = None, @@ -416,10 +1279,6 @@ def load_from_json(self, path: str | Path, :param tz: Timezone name (e.g. 'UTC', 'Europe/London', '+0100') :param mapping: Optional field mapping, e.g. {'timestamp': 't', 'volume': 'vol'} """ - import json - from datetime import datetime - from zoneinfo import ZoneInfo - # Parse timezone timezone = None if tz: @@ -428,7 +1287,6 @@ def load_from_json(self, path: str | Path, sign = 1 if tz.startswith('+') else -1 hours = int(tz[1:3]) minutes = int(tz[3:]) if len(tz) > 3 else 0 - from datetime import timezone as dt_timezone, timedelta timezone = dt_timezone(sign * timedelta(hours=hours, minutes=minutes)) else: # Handle named timezone @@ -490,42 +1348,7 @@ def load_from_json(self, path: str | Path, ts_str = str(record[field_map['timestamp']]) # Convert timestamp - if ts_str.isdigit(): - # Handle millisecond timestamps - ts = int(ts_str) - if ts > 253402300799: # 9999-12-31 23:59:59 - ts //= 1000 - timestamp = ts - else: - dt = None - # Parse datetime string - if timestamp_format: - dt = datetime.strptime(ts_str, timestamp_format) - else: - # Try common formats - for fmt in [ - '%Y-%m-%d %H:%M:%S%z', # 2024-01-08 19:00:00+0000 - '%Y-%m-%d %H:%M:%S%Z', # 2024-01-08 19:00:00UTC - '%Y-%m-%dT%H:%M:%S%z', # 2024-01-08T19:00:00+0000 - '%Y-%m-%d %H:%M:%S', - '%Y/%m/%d %H:%M:%S', - '%Y-%m-%dT%H:%M:%S', - '%Y-%m-%dT%H:%M:%SZ', - '%Y-%m-%d %H:%M', - '%Y%m%d %H:%M:%S' - ]: - try: - dt = datetime.strptime(ts_str, fmt) - break - except ValueError: - continue - else: - raise ValueError(f"Could not parse timestamp: {ts_str}") - - # Set timezone and convert to timestamp - if timezone: - dt = dt.replace(tzinfo=timezone) - timestamp = int(dt.timestamp()) + timestamp = _parse_timestamp(ts_str, timestamp_format, timezone) # Get OHLCV values try: @@ -640,8 +1463,8 @@ def open(self) -> 'OHLCVReader': self._size = os.path.getsize(self.path) // RECORD_SIZE if self._size >= 2: - self._start_timestamp = struct.unpack('I', 
self._mmap[0:4])[0] - second_timestamp = struct.unpack('I', self._mmap[RECORD_SIZE:RECORD_SIZE + 4])[0] + self._start_timestamp = struct.unpack('I', cast(Buffer, self._mmap[0:4]))[0] + second_timestamp = struct.unpack('I', cast(Buffer, self._mmap[RECORD_SIZE:RECORD_SIZE + 4]))[0] self._interval = second_timestamp - self._start_timestamp return self @@ -761,13 +1584,17 @@ def save_to_csv(self, path: str, as_datetime=False) -> None: else: f.write('timestamp,open,high,low,close,volume\n') for candle in self: + # Skip gaps (volume == -1) + if candle.volume == -1: + continue if as_datetime: - f.write(f"{datetime.fromtimestamp(candle.timestamp, UTC)},{format_float(candle.open)}," - f"{format_float(candle.high)},{format_float(candle.low)},{format_float(candle.close)}," - f"{format_float(candle.volume)}\n") + f.write(f"{datetime.fromtimestamp(candle.timestamp, UTC)},{_format_float(candle.open)}," + f"{_format_float(candle.high)},{_format_float(candle.low)},{_format_float(candle.close)}," + f"{_format_float(candle.volume)}\n") else: - f.write(f"{candle.timestamp},{format_float(candle.open)},{format_float(candle.high)}," - f"{format_float(candle.low)},{format_float(candle.close)},{format_float(candle.volume)}\n") + f.write(f"{candle.timestamp},{_format_float(candle.open)},{_format_float(candle.high)}," + f"{_format_float(candle.low)},{_format_float(candle.close)}," + f"{_format_float(candle.volume)}\n") def save_to_json(self, path: str, as_datetime: bool = False) -> None: """ @@ -789,27 +1616,28 @@ def save_to_json(self, path: str, as_datetime: bool = False) -> None: :param path: Path to save the JSON file :param as_datetime: If True, convert timestamps to ISO fmt datetime strings """ - import json - data = [] for candle in self: + # Skip gaps (volume == -1) + if candle.volume == -1: + continue if as_datetime: item = { "time": datetime.fromtimestamp(candle.timestamp, UTC).isoformat(), - "open": format_float(candle.open), - "high": format_float(candle.high), - "low": format_float(candle.low), - "close": format_float(candle.close), - "volume": format_float(candle.volume) + "open": _format_float(candle.open), + "high": _format_float(candle.high), + "low": _format_float(candle.low), + "close": _format_float(candle.close), + "volume": _format_float(candle.volume) } else: item = { "timestamp": candle.timestamp, - "open": format_float(candle.open), - "high": format_float(candle.high), - "low": format_float(candle.low), - "close": format_float(candle.close), - "volume": format_float(candle.volume) + "open": _format_float(candle.open), + "high": _format_float(candle.high), + "low": _format_float(candle.low), + "close": _format_float(candle.close), + "volume": _format_float(candle.volume) } data.append(item) diff --git a/src/pynecore/lib/timeframe.py b/src/pynecore/lib/timeframe.py index 17df550..99ce038 100644 --- a/src/pynecore/lib/timeframe.py +++ b/src/pynecore/lib/timeframe.py @@ -423,8 +423,8 @@ def from_seconds(seconds: int) -> str: return f"{seconds // (60 * 60 * 24 * 7)}W" if seconds % (60 * 60 * 24) == 0: return f"{seconds // (60 * 60 * 24)}D" - if seconds % (60 * 60) == 0: - return f"{seconds // (60 * 60)}" + if seconds % 60 == 0: + return f"{seconds // 60}" return f"{seconds}S" diff --git a/tests/t00_pynecore/data/test_001_ohlcv_file.py b/tests/t00_pynecore/data/test_001_ohlcv_file.py index e68e696..323a389 100644 --- a/tests/t00_pynecore/data/test_001_ohlcv_file.py +++ b/tests/t00_pynecore/data/test_001_ohlcv_file.py @@ -129,8 +129,8 @@ def __test_ohlcv_seek_operations__(tmp_path): with 
OHLCVWriter(file_path) as writer: for i in range(10): timestamp = 1609459200 + (i * 60) # 1-minute interval - writer.write(OHLCV(timestamp=timestamp, open=100.0+i, high=110.0 + - i, low=90.0+i, close=105.0+i, volume=1000.0+i)) + writer.write(OHLCV(timestamp=timestamp, open=100.0 + i, high=110.0 + i, low=90.0 + i, close=105.0 + i, + volume=1000.0 + i)) # Test seeking to a specific position and direct write # Note: we use low-level file operations to bypass timestamp checks @@ -138,17 +138,20 @@ def __test_ohlcv_seek_operations__(tmp_path): writer.seek(5) # Seek to 6th record # Use direct byte writing to avoid chronological checks + # noinspection PyProtectedMember assert writer._file is not None data = struct.pack( 'Ifffff', 1609459500, # Timestamp - same as the original at position 5 - 200.0, # Open - 210.0, # High - 190.0, # Low - 205.0, # Close - 2000.0 # Volume + 200.0, # Open + 210.0, # High + 190.0, # Low + 205.0, # Close + 2000.0 # Volume ) + # noinspection PyProtectedMember writer._file.write(data) + # noinspection PyProtectedMember writer._file.flush() # Verify seek operation @@ -168,8 +171,8 @@ def __test_ohlcv_truncate__(tmp_path): with OHLCVWriter(file_path) as writer: for i in range(10): timestamp = 1609459200 + (i * 60) - writer.write(OHLCV(timestamp=timestamp, open=100.0+i, high=110.0 + - i, low=90.0+i, close=105.0+i, volume=1000.0+i)) + writer.write(OHLCV(timestamp=timestamp, open=100.0 + i, high=110.0 + i, low=90.0 + i, close=105.0 + i, + volume=1000.0 + i)) # Truncate the file with OHLCVWriter(file_path) as writer: @@ -261,8 +264,8 @@ def __test_ohlcv_reader_from_to__(tmp_path): with OHLCVWriter(file_path) as writer: for i in range(10): timestamp = 1609459200 + (i * 60) - writer.write(OHLCV(timestamp=timestamp, open=100.0+i, high=110.0 + - i, low=90.0+i, close=105.0+i, volume=1000.0+i)) + writer.write(OHLCV(timestamp=timestamp, open=100.0 + i, high=110.0 + i, low=90.0 + i, close=105.0 + i, + volume=1000.0 + i)) # Read specific range with OHLCVReader(file_path) as reader: @@ -275,6 +278,23 @@ def __test_ohlcv_reader_from_to__(tmp_path): assert candles[-1].timestamp == 1609459500 +def __test_ohlcv_reader_rejects_text_disguised_as_ohlcv__(tmp_path): + """OHLCVReader should raise a clear error when opening a text file renamed to .ohlcv.""" + file_path = tmp_path / "fake_text.ohlcv" + + # Arrange: create a plain-text CSV-like file but with .ohlcv extension + with open(file_path, "w", encoding="utf-8") as f: + f.write("timestamp,open,high,low,close,volume\n") + f.write("1609459200,100,110,90,105,1000\n") + + # Act & Assert: opening via OHLCVReader should fail with a helpful message + with pytest.raises(ValueError) as excinfo: + with OHLCVReader(file_path) as _: + pass # Should not reach here + + assert "Text file detected with .ohlcv extension" in str(excinfo.value) + + def __test_chronological_order_validation__(tmp_path): """Test validation of chronological order in timestamps""" file_path = tmp_path / "test_chronological.ohlcv" @@ -396,3 +416,859 @@ def __test_ohlcv_gap_filling_and_skipping__(tmp_path): # Verify the records assert candles[0].timestamp == 1609459260 assert candles[1].timestamp == 1609459380 + + +def __test_opening_hours_detection_intraday__(tmp_path): + """Test opening hours detection for intraday timeframes""" + from datetime import datetime + file_path = tmp_path / "test_opening_hours_intraday.ohlcv" + + with OHLCVWriter(file_path) as writer: + # Simulate stock market data: Monday-Friday 9:30-16:00 + # Start from a Monday 9:30 AM EST (2024-01-08 09:30:00) + 
base_timestamp = int(datetime(2024, 1, 8, 9, 30).timestamp()) + + # Write data for multiple days with 1-minute intervals + for day in range(5): # Monday to Friday + day_offset = day * 86400 # Seconds in a day + + # Trading hours: 9:30 AM to 4:00 PM (6.5 hours = 390 minutes) + for minute in range(390): + timestamp = base_timestamp + day_offset + (minute * 60) + price = 100.0 + (minute * 0.01) # Gradual price increase + writer.write(OHLCV( + timestamp=timestamp, + open=price, + high=price + 0.5, + low=price - 0.5, + close=price + 0.1, + volume=1000.0 + minute + )) + + # Check opening hours detection + with OHLCVWriter(file_path) as writer: + opening_hours = writer.analyzed_opening_hours + + # Should detect business hours pattern + assert opening_hours is not None, "Opening hours should be detected" + assert len(opening_hours) > 0, "Should have detected some opening hours" + + # Check that we have Monday-Friday entries + days_with_hours = {interval.day for interval in opening_hours} + assert 1 in days_with_hours # Monday + assert 5 in days_with_hours # Friday + assert 6 not in days_with_hours # Saturday should not be present + assert 7 not in days_with_hours # Sunday should not be present + + +def __test_opening_hours_detection_crypto__(tmp_path): + """Test opening hours detection for crypto (24/7) markets""" + from datetime import datetime + file_path = tmp_path / "test_opening_hours_crypto.ohlcv" + + with OHLCVWriter(file_path) as writer: + # Simulate crypto data: 24/7 trading + # Start from a Monday 00:00 UTC (2024-01-08 00:00:00) + base_timestamp = int(datetime(2024, 1, 8, 0, 0).timestamp()) + + # Write data for a full week with 5-minute intervals + for hour in range(168): # 7 days * 24 hours + for five_min in range(12): # 12 five-minute intervals per hour + timestamp = base_timestamp + (hour * 3600) + (five_min * 300) + price = 50000.0 + (hour * 10.0) # BTC-like prices + writer.write(OHLCV( + timestamp=timestamp, + open=price, + high=price + 50, + low=price - 50, + close=price + 10, + volume=100.0 + five_min + )) + + # Check opening hours detection + with OHLCVWriter(file_path) as writer: + opening_hours = writer.analyzed_opening_hours + + # Should detect 24/7 pattern + assert opening_hours is not None, "Opening hours should be detected" + assert len(opening_hours) == 7, "Should have all 7 days for 24/7 trading" + + # Check that all days are 00:00-23:59 + for interval in opening_hours: + assert interval.start.hour == 0 and interval.start.minute == 0 + assert interval.end.hour == 23 and interval.end.minute == 59 + + +def __test_opening_hours_detection_daily__(tmp_path): + """Test opening hours detection for daily timeframes""" + from datetime import datetime + file_path = tmp_path / "test_opening_hours_daily.ohlcv" + + with OHLCVWriter(file_path) as writer: + # Simulate daily stock data: Monday-Friday only + # Start from a Monday (2024-01-08) + base_timestamp = int(datetime(2024, 1, 8, 16, 0).timestamp()) # Daily close at 4 PM + + # Write data for 3 weeks (15 business days) + for week in range(3): + for day in range(5): # Monday to Friday only + timestamp = base_timestamp + (week * 7 * 86400) + (day * 86400) + price = 150.0 + (week * 5) + day + writer.write(OHLCV( + timestamp=timestamp, + open=price, + high=price + 2, + low=price - 2, + close=price + 1, + volume=1000000.0 + )) + + # Check opening hours detection + with OHLCVWriter(file_path) as writer: + opening_hours = writer.analyzed_opening_hours + + # Should detect weekday-only pattern from daily data + assert opening_hours is not 
None, "Opening hours should be detected" + assert len(opening_hours) == 5, "Should have Monday-Friday for daily stock data" + + # Check that we only have weekdays (1-5) + days = {interval.day for interval in opening_hours} + assert days == {1, 2, 3, 4, 5}, "Should only have Monday-Friday" + + +def __test_opening_hours_insufficient_data__(tmp_path): + """Test opening hours detection with insufficient data""" + file_path = tmp_path / "test_opening_hours_insufficient.ohlcv" + + with OHLCVWriter(file_path) as writer: + # Write only a few data points (less than required minimum) + base_timestamp = 1609459200 + for i in range(5): # Only 5 minutes of data + writer.write(OHLCV( + timestamp=base_timestamp + (i * 60), + open=100.0, + high=101.0, + low=99.0, + close=100.5, + volume=1000.0 + )) + + # Check opening hours detection + with OHLCVWriter(file_path) as writer: + opening_hours = writer.analyzed_opening_hours + + # Should return None for insufficient data + assert opening_hours is None, "Should return None for insufficient data" + + +def __test_ohlcv_txt_conversion_tab_delimited__(tmp_path): + """Test TXT conversion with tab-delimited format""" + ohlcv_path = tmp_path / "test_txt_tab.ohlcv" + txt_path = tmp_path / "test_input_tab.txt" + + # Create tab-delimited test file + with open(txt_path, 'w') as f: + f.write("timestamp\topen\thigh\tlow\tclose\tvolume\n") + f.write("1609459200\t100.0\t110.0\t90.0\t105.0\t1000.0\n") + f.write("1609459260\t105.0\t115.0\t95.0\t110.0\t1200.0\n") + f.write("1609459320\t110.0\t120.0\t100.0\t115.0\t1400.0\n") + + # Convert TXT to OHLCV + with OHLCVWriter(ohlcv_path) as writer: + writer.load_from_txt(txt_path) + + # Verify converted data + with OHLCVReader(ohlcv_path) as reader: + candles = list(reader) + assert len(candles) == 3 + assert candles[0].timestamp == 1609459200 + assert candles[0].close == 105.0 + assert candles[0].volume == 1000.0 + assert candles[2].timestamp == 1609459320 + assert candles[2].close == 115.0 + + +def __test_ohlcv_txt_conversion_semicolon_delimited__(tmp_path): + """Test TXT conversion with semicolon-delimited format""" + ohlcv_path = tmp_path / "test_txt_semicolon.ohlcv" + txt_path = tmp_path / "test_input_semicolon.txt" + + # Create semicolon-delimited test file + with open(txt_path, 'w') as f: + f.write("timestamp;open;high;low;close;volume\n") + f.write("1609459200;100.0;110.0;90.0;105.0;1000.0\n") + f.write("1609459260;105.0;115.0;95.0;110.0;1200.0\n") + + # Convert TXT to OHLCV + with OHLCVWriter(ohlcv_path) as writer: + writer.load_from_txt(txt_path) + + # Verify converted data + with OHLCVReader(ohlcv_path) as reader: + candles = list(reader) + assert len(candles) == 2 + assert candles[0].timestamp == 1609459200 + assert candles[0].close == 105.0 + assert candles[1].timestamp == 1609459260 + assert candles[1].close == 110.0 + + +def __test_ohlcv_txt_conversion_pipe_delimited__(tmp_path): + """Test TXT conversion with pipe-delimited format""" + ohlcv_path = tmp_path / "test_txt_pipe.ohlcv" + txt_path = tmp_path / "test_input_pipe.txt" + + # Create pipe-delimited test file + with open(txt_path, 'w') as f: + f.write("timestamp|open|high|low|close|volume\n") + f.write("1609459200|100.0|110.0|90.0|105.0|1000.0\n") + f.write("1609459260|105.0|115.0|95.0|110.0|1200.0\n") + + # Convert TXT to OHLCV + with OHLCVWriter(ohlcv_path) as writer: + writer.load_from_txt(txt_path) + + # Verify converted data + with OHLCVReader(ohlcv_path) as reader: + candles = list(reader) + assert len(candles) == 2 + assert candles[0].timestamp == 1609459200 
+ assert candles[0].close == 105.0 + + +def __test_ohlcv_txt_conversion_quoted_fields__(tmp_path): + """Test TXT conversion with quoted fields""" + ohlcv_path = tmp_path / "test_txt_quoted.ohlcv" + txt_path = tmp_path / "test_input_quoted.txt" + + # Create test file with quoted fields + with open(txt_path, 'w') as f: + f.write('timestamp\topen\thigh\tlow\tclose\tvolume\n') + f.write('1609459200\t"100.0"\t"110.0"\t"90.0"\t"105.0"\t"1000.0"\n') + f.write("1609459260\t'105.0'\t'115.0'\t'95.0'\t'110.0'\t'1200.0'\n") + + # Convert TXT to OHLCV + with OHLCVWriter(ohlcv_path) as writer: + writer.load_from_txt(txt_path) + + # Verify converted data + with OHLCVReader(ohlcv_path) as reader: + candles = list(reader) + assert len(candles) == 2 + assert candles[0].close == 105.0 + assert candles[1].close == 110.0 + + +def __test_ohlcv_txt_conversion_with_timezone__(tmp_path): + """Test TXT conversion with timezone handling""" + ohlcv_path = tmp_path / "test_txt_tz.ohlcv" + txt_path = tmp_path / "test_input_tz.txt" + + # Create test file with datetime strings + with open(txt_path, 'w') as f: + f.write("time\topen\thigh\tlow\tclose\tvolume\n") + f.write("2025-01-01 12:00:00\t100.0\t110.0\t90.0\t105.0\t1000.0\n") + f.write("2025-01-01 12:01:00\t105.0\t115.0\t95.0\t110.0\t1200.0\n") + + # Convert TXT to OHLCV with UTC timezone + with OHLCVWriter(ohlcv_path) as writer: + writer.load_from_txt(txt_path, tz="UTC") + + # Verify converted data + with OHLCVReader(ohlcv_path) as reader: + candles = list(reader) + assert len(candles) == 2 + assert candles[0].close == 105.0 + assert candles[1].close == 110.0 + + +def __test_ohlcv_txt_conversion_date_time_columns__(tmp_path): + """Test TXT conversion with separate date and time columns""" + ohlcv_path = tmp_path / "test_txt_date_time.ohlcv" + txt_path = tmp_path / "test_input_date_time.txt" + + # Create test file with separate date/time columns + with open(txt_path, 'w') as f: + f.write("date\ttime\topen\thigh\tlow\tclose\tvolume\n") + f.write("2025-01-01\t12:00:00\t100.0\t110.0\t90.0\t105.0\t1000.0\n") + f.write("2025-01-01\t12:01:00\t105.0\t115.0\t95.0\t110.0\t1200.0\n") + + # Convert TXT to OHLCV with date/time columns + with OHLCVWriter(ohlcv_path) as writer: + writer.load_from_txt(txt_path, date_column="date", time_column="time") + + # Verify converted data + with OHLCVReader(ohlcv_path) as reader: + candles = list(reader) + assert len(candles) == 2 + assert candles[0].close == 105.0 + + +def __test_ohlcv_txt_conversion_custom_timestamp_format__(tmp_path): + """Test TXT conversion with custom timestamp format""" + ohlcv_path = tmp_path / "test_txt_custom_fmt.ohlcv" + txt_path = tmp_path / "test_input_custom_fmt.txt" + + # Create test file with custom timestamp format + with open(txt_path, 'w') as f: + f.write("timestamp\topen\thigh\tlow\tclose\tvolume\n") + f.write("01.01.2025 12:00:00\t100.0\t110.0\t90.0\t105.0\t1000.0\n") + f.write("01.01.2025 12:01:00\t105.0\t115.0\t95.0\t110.0\t1200.0\n") + + # Convert TXT to OHLCV with custom timestamp format + with OHLCVWriter(ohlcv_path) as writer: + writer.load_from_txt(txt_path, timestamp_format="%d.%m.%Y %H:%M:%S") + + # Verify converted data + with OHLCVReader(ohlcv_path) as reader: + candles = list(reader) + assert len(candles) == 2 + assert candles[0].close == 105.0 + + +def __test_ohlcv_txt_conversion_whitespace_handling__(tmp_path): + """Test TXT conversion with extra whitespace""" + ohlcv_path = tmp_path / "test_txt_whitespace.ohlcv" + txt_path = tmp_path / "test_input_whitespace.txt" + + # Create test file with 
extra whitespace + with open(txt_path, 'w') as f: + f.write(" timestamp \t open \t high \t low \t close \t volume \n") + f.write(" 1609459200 \t 100.0 \t 110.0 \t 90.0 \t 105.0 \t 1000.0 \n") + f.write(" 1609459260 \t 105.0 \t 115.0 \t 95.0 \t 110.0 \t 1200.0 \n") + + # Convert TXT to OHLCV + with OHLCVWriter(ohlcv_path) as writer: + writer.load_from_txt(txt_path) + + # Verify converted data + with OHLCVReader(ohlcv_path) as reader: + candles = list(reader) + assert len(candles) == 2 + assert candles[0].close == 105.0 + + +def __test_ohlcv_txt_conversion_empty_lines__(tmp_path): + """Test TXT conversion with empty lines""" + ohlcv_path = tmp_path / "test_txt_empty_lines.ohlcv" + txt_path = tmp_path / "test_input_empty_lines.txt" + + # Create test file with empty lines + with open(txt_path, 'w') as f: + f.write("timestamp\topen\thigh\tlow\tclose\tvolume\n") + f.write("\n") # Empty line + f.write("1609459200\t100.0\t110.0\t90.0\t105.0\t1000.0\n") + f.write("\n") # Another empty line + f.write("1609459260\t105.0\t115.0\t95.0\t110.0\t1200.0\n") + f.write("\n") # Final empty line + + # Convert TXT to OHLCV + with OHLCVWriter(ohlcv_path) as writer: + writer.load_from_txt(txt_path) + + # Verify converted data + with OHLCVReader(ohlcv_path) as reader: + candles = list(reader) + assert len(candles) == 2 + assert candles[0].close == 105.0 + + +def __test_ohlcv_txt_conversion_error_cases__(tmp_path): + """Test TXT conversion error handling""" + ohlcv_path = tmp_path / "test_txt_errors.ohlcv" + + # Test empty file + empty_txt = tmp_path / "empty.txt" + with open(empty_txt, 'w') as f: + f.write("") + + with pytest.raises(ValueError, match="File is empty"): + with OHLCVWriter(ohlcv_path) as writer: + writer.load_from_txt(empty_txt) + + # Test file with no delimiter + no_delim_txt = tmp_path / "no_delim.txt" + with open(no_delim_txt, 'w') as f: + f.write("timestamp open high low close volume\n") # Space delimited (not supported) + f.write("1609459200 100.0 110.0 90.0 105.0 1000.0\n") + + with pytest.raises(ValueError, match="No supported delimiter found"): + with OHLCVWriter(ohlcv_path) as writer: + writer.load_from_txt(no_delim_txt) + + # Test file with missing required columns + missing_cols_txt = tmp_path / "missing_cols.txt" + with open(missing_cols_txt, 'w') as f: + f.write("timestamp\topen\thigh\n") # Missing low, close, volume + f.write("1609459200\t100.0\t110.0\n") + + with pytest.raises(ValueError, match="Missing required column"): + with OHLCVWriter(ohlcv_path) as writer: + writer.load_from_txt(missing_cols_txt) + + # Test file with mismatched column count + mismatch_cols_txt = tmp_path / "mismatch_cols.txt" + with open(mismatch_cols_txt, 'w') as f: + f.write("timestamp\topen\thigh\tlow\tclose\tvolume\n") + f.write("1609459200\t100.0\t110.0\t90.0\t105.0\n") # Missing volume + + with pytest.raises(ValueError, match="Row has 5 columns, expected 6"): + with OHLCVWriter(ohlcv_path) as writer: + writer.load_from_txt(mismatch_cols_txt) + + +def __test_ohlcv_txt_conversion_escape_characters__(tmp_path): + """Test TXT conversion with escape characters""" + ohlcv_path = tmp_path / "test_txt_escape.ohlcv" + txt_path = tmp_path / "test_input_escape.txt" + + # Create test file with escape characters (though not commonly used in OHLCV data) + with open(txt_path, 'w') as f: + f.write("timestamp\topen\thigh\tlow\tclose\tvolume\n") + f.write("1609459200\t100.0\t110.0\t90.0\t105.0\t1000.0\n") + f.write("1609459260\t105.0\t115.0\t95.0\t110.0\t1200.0\n") + + # Convert TXT to OHLCV + with OHLCVWriter(ohlcv_path) 
as writer: + writer.load_from_txt(txt_path) + + # Verify converted data + with OHLCVReader(ohlcv_path) as reader: + candles = list(reader) + assert len(candles) == 2 + assert candles[0].close == 105.0 + + +def __test_ohlcv_txt_mixed_quote_types__(tmp_path): + """Test TXT conversion with mixed quote types""" + ohlcv_path = tmp_path / "test_txt_mixed_quotes.ohlcv" + txt_path = tmp_path / "test_input_mixed_quotes.txt" + + # Create test file with mixed quote types + with open(txt_path, 'w') as f: + f.write("timestamp\topen\thigh\tlow\tclose\tvolume\n") + f.write('1609459200\t"100.0"\t110.0\t"90.0"\t105.0\t"1000.0"\n') + f.write("1609459260\t'105.0'\t115.0\t'95.0'\t110.0\t'1200.0'\n") + + # Convert TXT to OHLCV + with OHLCVWriter(ohlcv_path) as writer: + writer.load_from_txt(txt_path) + + # Verify converted data + with OHLCVReader(ohlcv_path) as reader: + candles = list(reader) + assert len(candles) == 2 + assert candles[0].close == 105.0 + assert candles[1].close == 110.0 + + +def __test_ohlcv_csv_conversion_with_timezone__(tmp_path): + """Test CSV conversion with timezone handling""" + ohlcv_path = tmp_path / "test_csv_tz.ohlcv" + csv_path = tmp_path / "test_input_tz.csv" + + # Create CSV file with datetime strings + with open(csv_path, 'w') as f: + f.write("time,open,high,low,close,volume\n") + f.write("2025-01-01 12:00:00,100.0,110.0,90.0,105.0,1000.0\n") + f.write("2025-01-01 12:01:00,105.0,115.0,95.0,110.0,1200.0\n") + + # Convert CSV to OHLCV with UTC timezone + with OHLCVWriter(ohlcv_path) as writer: + writer.load_from_csv(csv_path, tz="UTC") + + # Verify converted data + with OHLCVReader(ohlcv_path) as reader: + candles = list(reader) + assert len(candles) == 2 + assert candles[0].close == 105.0 + assert candles[1].close == 110.0 + + +def __test_ohlcv_csv_conversion_date_time_columns__(tmp_path): + """Test CSV conversion with separate date and time columns""" + ohlcv_path = tmp_path / "test_csv_date_time.ohlcv" + csv_path = tmp_path / "test_input_date_time.csv" + + # Create CSV file with separate date/time columns + with open(csv_path, 'w') as f: + f.write("date,time,open,high,low,close,volume\n") + f.write("2025-01-01,12:00:00,100.0,110.0,90.0,105.0,1000.0\n") + f.write("2025-01-01,12:01:00,105.0,115.0,95.0,110.0,1200.0\n") + + # Convert CSV to OHLCV with date/time columns + with OHLCVWriter(ohlcv_path) as writer: + writer.load_from_csv(csv_path, date_column="date", time_column="time") + + # Verify converted data + with OHLCVReader(ohlcv_path) as reader: + candles = list(reader) + assert len(candles) == 2 + assert candles[0].close == 105.0 + + +def __test_ohlcv_csv_conversion_custom_timestamp_format__(tmp_path): + """Test CSV conversion with custom timestamp format""" + ohlcv_path = tmp_path / "test_csv_custom_fmt.ohlcv" + csv_path = tmp_path / "test_input_custom_fmt.csv" + + # Create CSV file with custom timestamp format + with open(csv_path, 'w') as f: + f.write("timestamp,open,high,low,close,volume\n") + f.write("01.01.2025 12:00:00,100.0,110.0,90.0,105.0,1000.0\n") + f.write("01.01.2025 12:01:00,105.0,115.0,95.0,110.0,1200.0\n") + + # Convert CSV to OHLCV with custom timestamp format + with OHLCVWriter(ohlcv_path) as writer: + writer.load_from_csv(csv_path, timestamp_format="%d.%m.%Y %H:%M:%S") + + # Verify converted data + with OHLCVReader(ohlcv_path) as reader: + candles = list(reader) + assert len(candles) == 2 + assert candles[0].close == 105.0 + + +def __test_ohlcv_csv_conversion_custom_timestamp_column__(tmp_path): + """Test CSV conversion with custom timestamp column 
name""" + ohlcv_path = tmp_path / "test_csv_custom_ts.ohlcv" + csv_path = tmp_path / "test_input_custom_ts.csv" + + # Create CSV file with custom timestamp column name + with open(csv_path, 'w') as f: + f.write("datetime,open,high,low,close,volume\n") + f.write("2025-01-01 12:00:00,100.0,110.0,90.0,105.0,1000.0\n") + f.write("2025-01-01 12:01:00,105.0,115.0,95.0,110.0,1200.0\n") + + # Convert CSV to OHLCV with custom timestamp column + with OHLCVWriter(ohlcv_path) as writer: + writer.load_from_csv(csv_path, timestamp_column="datetime") + + # Verify converted data + with OHLCVReader(ohlcv_path) as reader: + candles = list(reader) + assert len(candles) == 2 + assert candles[0].close == 105.0 + + +def __test_ohlcv_csv_conversion_quoted_fields__(tmp_path): + """Test CSV conversion with quoted fields""" + ohlcv_path = tmp_path / "test_csv_quoted.ohlcv" + csv_path = tmp_path / "test_input_quoted.csv" + + # Create CSV file with quoted fields + with open(csv_path, 'w') as f: + f.write("timestamp,open,high,low,close,volume\n") + f.write('1609459200,"100.0","110.0","90.0","105.0","1000.0"\n') + f.write('1609459260,"105.0","115.0","95.0","110.0","1200.0"\n') + + # Convert CSV to OHLCV + with OHLCVWriter(ohlcv_path) as writer: + writer.load_from_csv(csv_path) + + # Verify converted data + with OHLCVReader(ohlcv_path) as reader: + candles = list(reader) + assert len(candles) == 2 + assert candles[0].close == 105.0 + assert candles[1].close == 110.0 + + +def __test_ohlcv_csv_conversion_error_cases__(tmp_path): + """Test CSV conversion error handling""" + ohlcv_path = tmp_path / "test_csv_errors.ohlcv" + + # Test file with missing required columns + missing_cols_csv = tmp_path / "missing_cols.csv" + with open(missing_cols_csv, 'w') as f: + f.write("timestamp,open,high\n") # Missing low, close, volume + f.write("1609459200,100.0,110.0\n") + + with pytest.raises(ValueError, match="Missing required column"): + with OHLCVWriter(ohlcv_path) as writer: + writer.load_from_csv(missing_cols_csv) + + # Test file with invalid timestamp + invalid_ts_csv = tmp_path / "invalid_ts.csv" + with open(invalid_ts_csv, 'w') as f: + f.write("timestamp,open,high,low,close,volume\n") + f.write("invalid-timestamp,100.0,110.0,90.0,105.0,1000.0\n") + + with pytest.raises(ValueError, match="Failed to parse timestamp"): + with OHLCVWriter(ohlcv_path) as writer: + writer.load_from_csv(invalid_ts_csv) + + +def __test_ohlcv_csv_conversion_case_insensitive__(tmp_path): + """Test CSV conversion with case insensitive headers""" + ohlcv_path = tmp_path / "test_csv_case.ohlcv" + csv_path = tmp_path / "test_input_case.csv" + + # Create CSV file with mixed case headers + with open(csv_path, 'w') as f: + f.write("TIMESTAMP,Open,HIGH,low,Close,VOLUME\n") + f.write("1609459200,100.0,110.0,90.0,105.0,1000.0\n") + f.write("1609459260,105.0,115.0,95.0,110.0,1200.0\n") + + # Convert CSV to OHLCV + with OHLCVWriter(ohlcv_path) as writer: + writer.load_from_csv(csv_path) + + # Verify converted data + with OHLCVReader(ohlcv_path) as reader: + candles = list(reader) + assert len(candles) == 2 + assert candles[0].close == 105.0 + + +def __test_ohlcv_json_conversion_with_timezone__(tmp_path): + """Test JSON conversion with timezone handling""" + ohlcv_path = tmp_path / "test_json_tz.ohlcv" + json_path = tmp_path / "test_input_tz.json" + + # Create JSON file with datetime strings + import json + data = [ + {"time": "2025-01-01 12:00:00", "open": 100.0, "high": 110.0, "low": 90.0, "close": 105.0, "volume": 1000.0}, + {"time": "2025-01-01 12:01:00", "open": 
105.0, "high": 115.0, "low": 95.0, "close": 110.0, "volume": 1200.0} + ] + with open(json_path, 'w') as f: + json.dump(data, f) + + # Convert JSON to OHLCV with UTC timezone + with OHLCVWriter(ohlcv_path) as writer: + writer.load_from_json(json_path, timestamp_field="time", tz="UTC") + + # Verify converted data + with OHLCVReader(ohlcv_path) as reader: + candles = list(reader) + assert len(candles) == 2 + assert candles[0].close == 105.0 + assert candles[1].close == 110.0 + + +def __test_ohlcv_json_conversion_date_time_fields__(tmp_path): + """Test JSON conversion with separate date and time fields""" + ohlcv_path = tmp_path / "test_json_date_time.ohlcv" + json_path = tmp_path / "test_input_date_time.json" + + # Create JSON file with separate date/time fields + import json + data = [ + {"date": "2025-01-01", "time": "12:00:00", "open": 100.0, "high": 110.0, "low": 90.0, "close": 105.0, + "volume": 1000.0}, + {"date": "2025-01-01", "time": "12:01:00", "open": 105.0, "high": 115.0, "low": 95.0, "close": 110.0, + "volume": 1200.0} + ] + with open(json_path, 'w') as f: + json.dump(data, f) + + # Convert JSON to OHLCV with date/time fields + with OHLCVWriter(ohlcv_path) as writer: + writer.load_from_json(json_path, date_field="date", time_field="time") + + # Verify converted data + with OHLCVReader(ohlcv_path) as reader: + candles = list(reader) + assert len(candles) == 2 + assert candles[0].close == 105.0 + + +def __test_ohlcv_json_conversion_custom_timestamp_format__(tmp_path): + """Test JSON conversion with custom timestamp format""" + ohlcv_path = tmp_path / "test_json_custom_fmt.ohlcv" + json_path = tmp_path / "test_input_custom_fmt.json" + + # Create JSON file with custom timestamp format + import json + data = [ + {"timestamp": "01.01.2025 12:00:00", "open": 100.0, "high": 110.0, "low": 90.0, "close": 105.0, + "volume": 1000.0}, + {"timestamp": "01.01.2025 12:01:00", "open": 105.0, "high": 115.0, "low": 95.0, "close": 110.0, + "volume": 1200.0} + ] + with open(json_path, 'w') as f: + json.dump(data, f) + + # Convert JSON to OHLCV with custom timestamp format + with OHLCVWriter(ohlcv_path) as writer: + writer.load_from_json(json_path, timestamp_format="%d.%m.%Y %H:%M:%S") + + # Verify converted data + with OHLCVReader(ohlcv_path) as reader: + candles = list(reader) + assert len(candles) == 2 + assert candles[0].close == 105.0 + + +def __test_ohlcv_json_conversion_field_mapping__(tmp_path): + """Test JSON conversion with field mapping""" + ohlcv_path = tmp_path / "test_json_mapping.ohlcv" + json_path = tmp_path / "test_input_mapping.json" + + # Create JSON file with custom field names + import json + data = [ + {"t": 1609459200, "o": 100.0, "h": 110.0, "l": 90.0, "c": 105.0, "vol": 1000.0}, + {"t": 1609459260, "o": 105.0, "h": 115.0, "l": 95.0, "c": 110.0, "vol": 1200.0} + ] + with open(json_path, 'w') as f: + json.dump(data, f) + + # Convert JSON to OHLCV with field mapping + mapping = {"timestamp": "t", "open": "o", "high": "h", "low": "l", "close": "c", "volume": "vol"} + with OHLCVWriter(ohlcv_path) as writer: + writer.load_from_json(json_path, mapping=mapping) + + # Verify converted data + with OHLCVReader(ohlcv_path) as reader: + candles = list(reader) + assert len(candles) == 2 + assert candles[0].close == 105.0 + assert candles[1].close == 110.0 + + +def __test_ohlcv_json_conversion_wrapped_data__(tmp_path): + """Test JSON conversion with wrapped data arrays""" + ohlcv_path = tmp_path / "test_json_wrapped.ohlcv" + json_path = tmp_path / "test_input_wrapped.json" + + # Create JSON 
file with wrapped data array (common API format) + import json + wrapped_data = { + "data": [ + {"timestamp": 1609459200, "open": 100.0, "high": 110.0, "low": 90.0, "close": 105.0, "volume": 1000.0}, + {"timestamp": 1609459260, "open": 105.0, "high": 115.0, "low": 95.0, "close": 110.0, "volume": 1200.0} + ] + } + with open(json_path, 'w') as f: + json.dump(wrapped_data, f) + + # Convert JSON to OHLCV + with OHLCVWriter(ohlcv_path) as writer: + writer.load_from_json(json_path) + + # Verify converted data + with OHLCVReader(ohlcv_path) as reader: + candles = list(reader) + assert len(candles) == 2 + assert candles[0].close == 105.0 + + +def __test_ohlcv_json_conversion_millisecond_timestamps__(tmp_path): + """Test JSON conversion with millisecond timestamps""" + ohlcv_path = tmp_path / "test_json_ms.ohlcv" + json_path = tmp_path / "test_input_ms.json" + + # Create JSON file with millisecond timestamps + import json + data = [ + {"timestamp": 1609459200000, "open": 100.0, "high": 110.0, "low": 90.0, "close": 105.0, "volume": 1000.0}, + {"timestamp": 1609459260000, "open": 105.0, "high": 115.0, "low": 95.0, "close": 110.0, "volume": 1200.0} + ] + with open(json_path, 'w') as f: + json.dump(data, f) + + # Convert JSON to OHLCV (should auto-detect and convert milliseconds) + with OHLCVWriter(ohlcv_path) as writer: + writer.load_from_json(json_path) + + # Verify converted data + with OHLCVReader(ohlcv_path) as reader: + candles = list(reader) + assert len(candles) == 2 + assert candles[0].timestamp == 1609459200 # Should be converted from ms to s + assert candles[1].timestamp == 1609459260 + assert candles[0].close == 105.0 + + +def __test_ohlcv_json_conversion_auto_field_detection__(tmp_path): + """Test JSON conversion with automatic field detection""" + ohlcv_path = tmp_path / "test_json_auto.ohlcv" + json_path = tmp_path / "test_input_auto.json" + + # Create JSON file with 't' timestamp field (should auto-detect) + import json + data = [ + {"t": 1609459200, "open": 100.0, "high": 110.0, "low": 90.0, "close": 105.0, "volume": 1000.0}, + {"t": 1609459260, "open": 105.0, "high": 115.0, "low": 95.0, "close": 110.0, "volume": 1200.0} + ] + with open(json_path, 'w') as f: + json.dump(data, f) + + # Convert JSON to OHLCV (should auto-detect 't' field) + with OHLCVWriter(ohlcv_path) as writer: + writer.load_from_json(json_path) + + # Verify converted data + with OHLCVReader(ohlcv_path) as reader: + candles = list(reader) + assert len(candles) == 2 + assert candles[0].close == 105.0 + + +def __test_ohlcv_json_conversion_error_cases__(tmp_path): + """Test JSON conversion error handling""" + ohlcv_path = tmp_path / "test_json_errors.ohlcv" + + # Test file with missing required fields + missing_fields_json = tmp_path / "missing_fields.json" + import json + data = [ + {"timestamp": 1609459200, "open": 100.0, "high": 110.0} # Missing low, close, volume + ] + with open(missing_fields_json, 'w') as f: + json.dump(data, f) + + with pytest.raises(ValueError, match="Missing field in record"): + with OHLCVWriter(ohlcv_path) as writer: + writer.load_from_json(missing_fields_json) + + # Test file with no timestamp field + no_timestamp_json = tmp_path / "no_timestamp.json" + data = [ + {"open": 100.0, "high": 110.0, "low": 90.0, "close": 105.0, "volume": 1000.0} # No timestamp + ] + with open(no_timestamp_json, 'w') as f: + json.dump(data, f) + + with pytest.raises(ValueError, match="Could not find timestamp field"): + with OHLCVWriter(ohlcv_path) as writer: + writer.load_from_json(no_timestamp_json) + + # Test file 
with no OHLCV data array + no_data_json = tmp_path / "no_data.json" + data = {"metadata": "some info", "status": "ok"} # No data array + with open(no_data_json, 'w') as f: + json.dump(data, f) + + with pytest.raises(ValueError, match="Could not find OHLCV data array"): + with OHLCVWriter(ohlcv_path) as writer: + writer.load_from_json(no_data_json) + + +def __test_ohlcv_json_conversion_alternative_wrappers__(tmp_path): + """Test JSON conversion with alternative wrapper keys""" + ohlcv_path = tmp_path / "test_json_alt_wrap.ohlcv" + json_path = tmp_path / "test_input_alt_wrap.json" + + # Create JSON file with 'candles' wrapper (alternative to 'data') + import json + wrapped_data = { + "candles": [ + {"timestamp": 1609459200, "open": 100.0, "high": 110.0, "low": 90.0, "close": 105.0, "volume": 1000.0}, + {"timestamp": 1609459260, "open": 105.0, "high": 115.0, "low": 95.0, "close": 110.0, "volume": 1200.0} + ] + } + with open(json_path, 'w') as f: + json.dump(wrapped_data, f) + + # Convert JSON to OHLCV + with OHLCVWriter(ohlcv_path) as writer: + writer.load_from_json(json_path) + + # Verify converted data + with OHLCVReader(ohlcv_path) as reader: + candles = list(reader) + assert len(candles) == 2 + assert candles[0].close == 105.0 diff --git a/tests/t00_pynecore/data/test_004_data_converter.py b/tests/t00_pynecore/data/test_004_data_converter.py new file mode 100644 index 0000000..f68baf9 --- /dev/null +++ b/tests/t00_pynecore/data/test_004_data_converter.py @@ -0,0 +1,297 @@ +""" +@pyne +""" +import pytest +from pathlib import Path + +from pynecore.core.data_converter import DataConverter + + +def main(): + """ + Dummy main function to be a valid Pyne script + """ + pass + + +def __test_symbol_provider_detection_ccxt__(): + """Test CCXT-style filename detection""" + dc = DataConverter() + + # Test without ccxt prefix but with exchange + symbol, provider = dc.guess_symbol_from_filename(Path("BINANCE_BTC_USDT.csv")) + assert symbol == "BTC/USDT" + assert provider == "binance" + + # Test exchange with compact symbol + symbol, provider = dc.guess_symbol_from_filename(Path("BINANCE_BTCUSDT.csv")) + assert symbol == "BTC/USDT" + assert provider == "binance" + + # Test with colon separators + symbol, provider = dc.guess_symbol_from_filename(Path("BYBIT:BTC:USDT.csv")) + assert symbol == "BTC/USDT" + assert provider == "bybit" + + # Test ccxt with BYBIT exchange - provider should be bybit, not ccxt + symbol, provider = dc.guess_symbol_from_filename(Path("ccxt_BYBIT_BTC_USDT_USDT_1.csv")) + assert symbol == "BTC/USDT" + assert provider == "bybit" # When ccxt_ prefix, provider is the exchange name + + +def __test_symbol_provider_detection_capitalcom__(): + """Test Capital.com filename detection""" + dc = DataConverter() + + # Test with dots + symbol, provider = dc.guess_symbol_from_filename(Path("capital.com_EURUSD_60.csv")) + assert symbol == "EURUSD" + assert provider == "capital.com" + + # Test with uppercase + symbol, provider = dc.guess_symbol_from_filename(Path("CAPITALCOM_EURUSD.csv")) + assert symbol == "EURUSD" + assert provider == "capitalcom" + + +def __test_symbol_provider_detection_tradingview__(): + """Test TradingView export format detection""" + dc = DataConverter() + + # Test with hash suffix + symbol, provider = dc.guess_symbol_from_filename(Path("CAPITALCOM_EURUSD, 30_cbf9d.csv")) + assert symbol == "EURUSD" + assert provider == "capitalcom" + + # Test TV prefix + symbol, provider = dc.guess_symbol_from_filename(Path("TV_BTCUSD_1h.csv")) + assert symbol == "BTCUSD" + assert provider 
== "tradingview" + + # Test TradingView prefix + symbol, provider = dc.guess_symbol_from_filename(Path("TRADINGVIEW_AAPL_daily.csv")) + assert symbol == "AAPL" + assert provider == "tradingview" + + +def __test_symbol_provider_detection_metatrader__(): + """Test MetaTrader filename detection""" + dc = DataConverter() + + # Test MT4 format + symbol, provider = dc.guess_symbol_from_filename(Path("MT4_EURUSD_M1.csv")) + assert symbol == "EURUSD" + assert provider == "mt4" + + # Test MT5 format + symbol, provider = dc.guess_symbol_from_filename(Path("MT5_GBPUSD_H1_2024.csv")) + assert symbol == "GBPUSD" + assert provider == "mt5" + + # Test forex pair without explicit provider + symbol, provider = dc.guess_symbol_from_filename(Path("EURUSD.csv")) + assert symbol == "EURUSD" + assert provider == "forex" + + # Test another forex pair + symbol, provider = dc.guess_symbol_from_filename(Path("GBPJPY.csv")) + assert symbol == "GBPJPY" + assert provider == "forex" + + +def __test_symbol_provider_detection_crypto_exchanges__(): + """Test various crypto exchange filename formats""" + dc = DataConverter() + + # Binance + symbol, provider = dc.guess_symbol_from_filename(Path("BINANCE_BTCUSDT.csv")) + assert symbol == "BTC/USDT" + assert provider == "binance" + + # Bybit + symbol, provider = dc.guess_symbol_from_filename(Path("BYBIT_ETH_USDT.csv")) + assert symbol == "ETH/USDT" + assert provider == "bybit" + + # Coinbase + symbol, provider = dc.guess_symbol_from_filename(Path("COINBASE_BTC_USD.csv")) + assert symbol == "BTC/USD" + assert provider == "coinbase" + + # Kraken + symbol, provider = dc.guess_symbol_from_filename(Path("KRAKEN_XRPUSD.csv")) + assert symbol == "XRP/USD" + assert provider == "kraken" + + +def __test_symbol_provider_detection_generic_crypto__(): + """Test generic crypto pair detection without provider""" + dc = DataConverter() + + # Common crypto pairs should be detected + symbol, provider = dc.guess_symbol_from_filename(Path("BTCUSDT.csv")) + assert symbol == "BTC/USDT" + assert provider == "ccxt" + + symbol, provider = dc.guess_symbol_from_filename(Path("ETHUSD.csv")) + assert symbol == "ETH/USD" + assert provider == "ccxt" + + symbol, provider = dc.guess_symbol_from_filename(Path("BTC_USDT.csv")) + assert symbol == "BTC/USDT" + assert provider == "ccxt" + + +def __test_symbol_provider_detection_stock_symbols__(): + """Test stock symbol detection""" + dc = DataConverter() + + # Simple stock symbols + symbol, provider = dc.guess_symbol_from_filename(Path("AAPL.csv")) + assert symbol == "AAPL" + assert provider is None + + symbol, provider = dc.guess_symbol_from_filename(Path("MSFT_daily.csv")) + assert symbol == "MSFT" + assert provider is None + + # With IB provider + symbol, provider = dc.guess_symbol_from_filename(Path("IB_AAPL_1h.csv")) + assert symbol == "AAPL" + assert provider == "ib" + + +def __test_symbol_provider_detection_complex_filenames__(): + """Test complex filename patterns""" + dc = DataConverter() + + # Multiple underscores and timeframe - ccxt_ prefix means provider is exchange + symbol, provider = dc.guess_symbol_from_filename(Path("ccxt_BYBIT_BTC_USDT_USDT_5.csv")) + assert symbol == "BTC/USDT" + assert provider == "bybit" # When ccxt_ prefix, provider is the exchange name + + # Mixed case - Note: Mixed case may not be detected properly + # Using uppercase for consistency + symbol, provider = dc.guess_symbol_from_filename(Path("CAPITAL.COM_EURUSD_60.csv")) + assert symbol == "EURUSD" + assert provider == "capital.com" + + # With date suffix + symbol, provider 
+    symbol, provider = dc.guess_symbol_from_filename(Path("MT5_EURUSD_2024_01_01.csv"))
+    assert symbol == "EURUSD"
+    assert provider == "mt5"
+
+
+def __test_symbol_provider_detection_edge_cases__():
+    """Test edge cases and invalid formats"""
+    dc = DataConverter()
+
+    # Empty filename
+    symbol, provider = dc.guess_symbol_from_filename(Path(".csv"))
+    assert symbol is None
+    assert provider is None
+
+    # Too short symbol
+    symbol, provider = dc.guess_symbol_from_filename(Path("XX.csv"))
+    assert symbol is None
+    assert provider is None
+
+    # Only provider, no symbol
+    symbol, provider = dc.guess_symbol_from_filename(Path("CCXT.csv"))
+    assert symbol is None
+    assert provider == "ccxt"
+
+    # Numbers only (should not be detected as symbol)
+    symbol, provider = dc.guess_symbol_from_filename(Path("12345.csv"))
+    assert symbol is None
+    assert provider is None
+
+
+def __test_symbol_provider_detection_forex_pairs__():
+    """Test various forex pair formats"""
+    dc = DataConverter()
+
+    # Standard 6-char format
+    symbol, provider = dc.guess_symbol_from_filename(Path("EURUSD.csv"))
+    assert symbol == "EURUSD"
+    assert provider == "forex"
+
+    # With underscore separator
+    symbol, provider = dc.guess_symbol_from_filename(Path("EUR_USD.csv"))
+    assert symbol == "EURUSD"
+    assert provider == "forex"
+
+    # With dash separator
+    symbol, provider = dc.guess_symbol_from_filename(Path("EUR-USD.csv"))
+    assert symbol == "EURUSD"
+    assert provider == "forex"
+
+    # Less common pairs
+    symbol, provider = dc.guess_symbol_from_filename(Path("NZDJPY.csv"))
+    assert symbol == "NZDJPY"
+    assert provider == "forex"
+
+
+def __test_symbol_provider_detection_our_format__():
+    """Test PyneCore own format detection"""
+    dc = DataConverter()
+
+    # Our format with provider and symbol
+    symbol, provider = dc.guess_symbol_from_filename(Path("capitalcom_EURUSD_60.ohlcv"))
+    assert symbol == "EURUSD"
+    assert provider == "capitalcom"
+
+    # CCXT style with exchange - provider is exchange name
+    symbol, provider = dc.guess_symbol_from_filename(Path("ccxt_BYBIT_BTC_USDT_USDT_1.ohlcv"))
+    assert symbol == "BTC/USDT"
+    assert provider == "bybit"  # When ccxt_ prefix, provider is the exchange name
+
+    # Simple format without provider
+    symbol, provider = dc.guess_symbol_from_filename(Path("BTCUSD_1h.ohlcv"))
+    assert symbol == "BTC/USD"
+    assert provider == "ccxt"  # Should default to ccxt for crypto
+
+
+# Test runner functions that pytest will find
+def test_symbol_provider_detection_ccxt():
+    __test_symbol_provider_detection_ccxt__()
+
+
+def test_symbol_provider_detection_capitalcom():
+    __test_symbol_provider_detection_capitalcom__()
+
+
+def test_symbol_provider_detection_tradingview():
+    __test_symbol_provider_detection_tradingview__()
+
+
+def test_symbol_provider_detection_metatrader():
+    __test_symbol_provider_detection_metatrader__()
+
+
+def test_symbol_provider_detection_crypto_exchanges():
+    __test_symbol_provider_detection_crypto_exchanges__()
+
+
+def test_symbol_provider_detection_generic_crypto():
+    __test_symbol_provider_detection_generic_crypto__()
+
+
+def test_symbol_provider_detection_stock_symbols():
+    __test_symbol_provider_detection_stock_symbols__()
+
+
+def test_symbol_provider_detection_complex_filenames():
+    __test_symbol_provider_detection_complex_filenames__()
+
+
+def test_symbol_provider_detection_edge_cases():
+    __test_symbol_provider_detection_edge_cases__()
+
+
+def test_symbol_provider_detection_forex_pairs():
+    __test_symbol_provider_detection_forex_pairs__()
+
+
+def test_symbol_provider_detection_our_format():
+    __test_symbol_provider_detection_our_format__()
\ No newline at end of file
diff --git a/tests/t00_pynecore/data/test_005_symbol_type_detection.py b/tests/t00_pynecore/data/test_005_symbol_type_detection.py
new file mode 100644
index 0000000..9d40a62
--- /dev/null
+++ b/tests/t00_pynecore/data/test_005_symbol_type_detection.py
@@ -0,0 +1,229 @@
+"""
+@pyne
+"""
+import pytest
+from pathlib import Path
+
+from pynecore.core.data_converter import DataConverter
+
+
+def main():
+    """
+    Dummy main function to be a valid Pyne script
+    """
+    pass
+
+
+def __test_detect_symbol_type_forex__():
+    """Test forex pair detection"""
+    dc = DataConverter()
+
+    # Standard forex pairs
+    symbol_type, currency, base = dc.guess_symbol_type("EURUSD")
+    assert symbol_type == "forex"
+    assert currency == "USD"
+    assert base == "EUR"
+
+    symbol_type, currency, base = dc.guess_symbol_type("EUR/USD")
+    assert symbol_type == "forex"
+    assert currency == "USD"
+    assert base == "EUR"
+
+    symbol_type, currency, base = dc.guess_symbol_type("GBPUSD")
+    assert symbol_type == "forex"
+    assert currency == "USD"
+    assert base == "GBP"
+
+    symbol_type, currency, base = dc.guess_symbol_type("USDJPY")
+    assert symbol_type == "forex"
+    assert currency == "JPY"
+    assert base == "USD"
+
+    # Less common forex pairs
+    symbol_type, currency, base = dc.guess_symbol_type("NZDJPY")
+    assert symbol_type == "forex"
+    assert currency == "JPY"
+    assert base == "NZD"
+
+    symbol_type, currency, base = dc.guess_symbol_type("EURCHF")
+    assert symbol_type == "forex"
+    assert currency == "CHF"
+    assert base == "EUR"
+
+
+def __test_detect_symbol_type_crypto__():
+    """Test crypto pair detection"""
+    dc = DataConverter()
+
+    # Common crypto pairs
+    symbol_type, currency, base = dc.guess_symbol_type("BTC/USDT")
+    assert symbol_type == "crypto"
+    assert currency == "USDT"
+    assert base == "BTC"
+
+    symbol_type, currency, base = dc.guess_symbol_type("BTCUSDT")
+    assert symbol_type == "crypto"
+    assert currency == "USDT"
+    assert base == "BTC"
+
+    symbol_type, currency, base = dc.guess_symbol_type("ETH/USD")
+    assert symbol_type == "crypto"
+    assert currency == "USD"
+    assert base == "ETH"
+
+    symbol_type, currency, base = dc.guess_symbol_type("ETHUSDT")
+    assert symbol_type == "crypto"
+    assert currency == "USDT"
+    assert base == "ETH"
+
+    # Other crypto coins
+    symbol_type, currency, base = dc.guess_symbol_type("ADAUSDT")
+    assert symbol_type == "crypto"
+    assert currency == "USDT"
+    assert base == "ADA"
+
+    symbol_type, currency, base = dc.guess_symbol_type("DOGEUSDT")
+    assert symbol_type == "crypto"
+    assert currency == "USDT"
+    assert base == "DOGE"
+
+    symbol_type, currency, base = dc.guess_symbol_type("SOL/USDC")
+    assert symbol_type == "crypto"
+    assert currency == "USDC"
+    assert base == "SOL"
+
+
+def __test_detect_symbol_type_other__():
+    """Test that unknown symbols default to 'other' type"""
+    dc = DataConverter()
+
+    # Stock-like symbols should be 'other' now
+    symbol_type, currency, base = dc.guess_symbol_type("AAPL")
+    assert symbol_type == "other"
+    assert currency == "USD"
+    assert base is None
+
+    symbol_type, currency, base = dc.guess_symbol_type("MSFT")
+    assert symbol_type == "other"
+    assert currency == "USD"
+    assert base is None
+
+    # Unknown patterns
+    symbol_type, currency, base = dc.guess_symbol_type("UNKNOWN")
+    assert symbol_type == "other"
+    assert currency == "USD"
+    assert base is None
+
+    symbol_type, currency, base = dc.guess_symbol_type("ABC123")
+    assert symbol_type == "other"
+    assert currency == "USD"
+    assert base is None
+
+    # Too short
+    symbol_type, currency, base = dc.guess_symbol_type("XY")
+    assert symbol_type == "other"
+    assert currency == "USD"
+    assert base is None
+
+
+def __test_detect_symbol_type_edge_cases__():
+    """Test edge cases"""
+    dc = DataConverter()
+
+    # Ambiguous 6-letter that could be forex or other
+    symbol_type, currency, base = dc.guess_symbol_type("ABCDEF")
+    assert symbol_type == "other"  # Not recognized forex pair
+    assert currency == "USD"
+    assert base is None
+
+    # Crypto with USD (not USDT)
+    symbol_type, currency, base = dc.guess_symbol_type("BTCUSD")
+    assert symbol_type == "crypto"
+    assert currency == "USD"
+    assert base == "BTC"
+
+    # Test with uppercase (method expects uppercase input)
+    symbol_type, currency, base = dc.guess_symbol_type("BTCUSDT")
+    assert symbol_type == "crypto"
+    assert currency == "USDT"
+    assert base == "BTC"
+
+    # With underscores (cleaned internally)
+    symbol_type, currency, base = dc.guess_symbol_type("EUR_USD")
+    assert symbol_type == "forex"
+    assert currency == "USD"
+    assert base == "EUR"
+
+
+def __test_detect_symbol_type_forex_with_slash__():
+    """Test forex pairs with explicit slash notation"""
+    dc = DataConverter()
+
+    # These should be detected as forex because both parts are 3-letter currency codes
+    symbol_type, currency, base = dc.guess_symbol_type("EUR/USD")
+    assert symbol_type == "forex"
+    assert currency == "USD"
+    assert base == "EUR"
+
+    symbol_type, currency, base = dc.guess_symbol_type("GBP/JPY")
+    assert symbol_type == "forex"
+    assert currency == "JPY"
+    assert base == "GBP"
+
+    symbol_type, currency, base = dc.guess_symbol_type("AUD/CAD")
+    assert symbol_type == "forex"
+    assert currency == "CAD"
+    assert base == "AUD"
+
+
+def __test_detect_symbol_type_special_cases__():
+    """Test special handling cases"""
+    dc = DataConverter()
+
+    # BTCUSD vs BTCUSDT - both should be crypto
+    symbol_type, currency, base = dc.guess_symbol_type("BTCUSD")
+    assert symbol_type == "crypto"
+    assert currency == "USD"
+    assert base == "BTC"
+
+    symbol_type, currency, base = dc.guess_symbol_type("BTCUSDT")
+    assert symbol_type == "crypto"
+    assert currency == "USDT"
+    assert base == "BTC"
+
+    # 6-letter code that happens to have a crypto symbol in it
+    # but isn't a standard pair
+    symbol_type, currency, base = dc.guess_symbol_type("BTCXYZ")
+    assert symbol_type == "crypto"  # BTC is detected
+    # Currency extraction might vary
+
+    # Forex currencies in non-standard order (still detected as forex)
+    symbol_type, currency, base = dc.guess_symbol_type("JPYEUR")
+    assert symbol_type == "forex"  # Both JPY and EUR are forex currencies
+    assert currency == "EUR"
+    assert base == "JPY"
+
+
+# Test runner functions that pytest will find
+def test_detect_symbol_type_forex():
+    __test_detect_symbol_type_forex__()
+
+
+def test_detect_symbol_type_crypto():
+    __test_detect_symbol_type_crypto__()
+
+
+def test_detect_symbol_type_other():
+    __test_detect_symbol_type_other__()
+
+
+def test_detect_symbol_type_edge_cases():
+    __test_detect_symbol_type_edge_cases__()
+
+
+def test_detect_symbol_type_forex_with_slash():
+    __test_detect_symbol_type_forex_with_slash__()
+
+
+def test_detect_symbol_type_special_cases():
+    __test_detect_symbol_type_special_cases__()
\ No newline at end of file
diff --git a/tests/t01_lib/t01_timeframe/test_003_from_seconds.py b/tests/t01_lib/t01_timeframe/test_003_from_seconds.py
new file mode 100644
index 0000000..d66ea47
--- /dev/null
+++ b/tests/t01_lib/t01_timeframe/test_003_from_seconds.py
@@ -0,0 +1,34 @@
+"""
+@pyne
+"""
+from pynecore.lib import script, timeframe
+
+
+@script.indicator(title="Timeframe from_seconds()", shorttitle="tf_fs")
+def main():
+    # Test basic conversions
+    assert timeframe.from_seconds(1) == "1S"
+    assert timeframe.from_seconds(30) == "30S"
+    assert timeframe.from_seconds(60) == "1"  # 1 minute - THIS WAS THE BUG!
+    assert timeframe.from_seconds(120) == "2"  # 2 minutes
+    assert timeframe.from_seconds(300) == "5"  # 5 minutes
+    assert timeframe.from_seconds(900) == "15"  # 15 minutes
+    assert timeframe.from_seconds(3600) == "60"  # 1 hour
+    assert timeframe.from_seconds(14400) == "240"  # 4 hours
+    assert timeframe.from_seconds(86400) == "1D"  # 1 day
+    assert timeframe.from_seconds(172800) == "2D"  # 2 days
+    assert timeframe.from_seconds(604800) == "1W"  # 1 week
+    assert timeframe.from_seconds(1209600) == "2W"  # 2 weeks
+    assert timeframe.from_seconds(2419200) == "1M"  # 1 month (4 weeks)
+    assert timeframe.from_seconds(4838400) == "2M"  # 2 months
+
+    # Edge cases
+    assert timeframe.from_seconds(59) == "59S"  # Not divisible by 60
+    assert timeframe.from_seconds(61) == "61S"  # Not divisible by 60
+    assert timeframe.from_seconds(180) == "3"  # 3 minutes
+    assert timeframe.from_seconds(240) == "4"  # 4 minutes
+
+
+def __test_timeframe_from_seconds__(runner, dummy_ohlcv_iter):
+    """ timeframe.from_seconds() """
+    next(runner(dummy_ohlcv_iter).run_iter())
\ No newline at end of file