<a href="https://colab.research.google.com/github/BaronVonBussin/NewTransit/blob/main/domains_20241229.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [3]:
import os
import pandas as pd

class DomainProcessor:
    """Build a table of price "domains" from OHLC bars.

    For every bar and every window length from 1 to ``domain_depth``, the
    high/low envelope of the bars that precede it is computed. A domain is
    recorded whenever the bar lies entirely inside that envelope.
    """

    def __init__(self, domain_depth=12):
        # Largest number of preceding bars used to form a rolling range.
        self.domain_depth = domain_depth

    @staticmethod
    def _percent_r(value, low, high):
        """Return where ``value`` sits between ``low`` and ``high`` as a fraction.

        A zero-width range has no defined position, so NaN is returned instead
        of dividing by zero.
        """
        width = high - low
        if width == 0:
            return float("nan")
        return (value - low) / width

    @staticmethod
    def _edge_bias(percent_r):
        """Return "U" when ``percent_r`` is at least 0.5, otherwise "D" (NaN gives "D")."""
        return "U" if percent_r >= 0.5 else "D"

    def read_csv_files(self, input_folder):
        """
        Reads all CSV files from the input folder. Assumes files are named TICKER_{temporal_period}.csv
        and contain columns: date, open, high, low, close.

        Returns a dict mapping each file name (without extension) to its DataFrame.
        Files are visited in sorted order so repeated runs process them identically.
        """
        data = {}
        for file in sorted(os.listdir(input_folder)):
            if file.endswith(".csv"):
                ticker = os.path.splitext(file)[0]
                data[ticker] = pd.read_csv(
                    os.path.join(input_folder, file),
                    parse_dates=["date"]
                )
        return data

    def calculate_domains(self, df):
        """
        Calculate domains based on the rolling range and domain persistence rules.

        Args:
            df: DataFrame with ``high``, ``low`` and ``close`` columns, one row
                per bar in chronological order. Any index is accepted.

        Returns:
            DataFrame with one row per domain. A domain starts as type "P" with
            duration 1. Whenever a later rolling range fits inside an existing
            domain, that domain is re-typed "N", its duration is incremented and
            its "last"/"active" fields are refreshed from the current bar.
        """
        # Window arithmetic below is positional, so work on a 0..n-1 index.
        bars = df.reset_index(drop=True)
        domains = []
        domain_id = 0

        for i, row in bars.iterrows():
            current_high = row["high"]
            current_low = row["low"]
            current_close = row["close"]

            for depth in range(1, self.domain_depth + 1):
                if i < depth:
                    break  # Deeper windows need even more prior bars

                # The range is built from the `depth` bars BEFORE the current
                # one; including the current bar would make the containment
                # test below trivially true.
                prior = bars.loc[i - depth:i - 1]
                rolling_high = prior["high"].max()
                rolling_low = prior["low"].min()
                rolling_close = bars.loc[i - 1, "close"]

                if not (current_high <= rolling_high and current_low >= rolling_low):
                    continue

                # Nesting is evaluated against domains that already exist, before
                # the new domain is stored, so a domain never nests inside itself.
                for earlier in domains:
                    earlier_high = earlier["domain_high"]
                    earlier_low = earlier["domain_low"]
                    if rolling_high <= earlier_high and rolling_low >= earlier_low:
                        earlier["domain_type"] = "N"
                        earlier["domain_duration"] += 1
                        earlier["domain_close_last"] = current_close
                        earlier["domain_percentr_active"] = self._percent_r(
                            current_close, earlier_low, earlier_high
                        )
                        earlier["domain_edge_bias_direction_active"] = self._edge_bias(
                            earlier["domain_percentr_active"]
                        )

                percentr_original = self._percent_r(rolling_close, rolling_low, rolling_high)
                percentr_active = self._percent_r(current_close, rolling_low, rolling_high)

                # Ids are unique, so every new domain is appended unconditionally.
                domain_id += 1
                domains.append({
                    "domain_id": domain_id,
                    "domain_type": "P",
                    "domain_high": rolling_high,
                    "domain_low": rolling_low,
                    "domain_close_original": rolling_close,
                    "domain_close_last": current_close,
                    "domain_range": rolling_high - rolling_low,
                    "domain_duration": 1,
                    "domain_percentr_original": percentr_original,
                    "domain_edge_bias_direction_original": self._edge_bias(percentr_original),
                    "domain_percentr_active": percentr_active,
                    "domain_edge_bias_direction_active": self._edge_bias(percentr_active),
                    "domain_true_duration": depth,
                })

        return pd.DataFrame(domains)

    def process_files(self, input_folder, output_folder):
        """
        Processes all CSV files in the input folder and generates detailed domain outputs.

        One ``{ticker}_Domains.csv`` file is written to ``output_folder`` per input
        file; the folder is created when it does not exist.
        """
        os.makedirs(output_folder, exist_ok=True)

        data = self.read_csv_files(input_folder)
        for ticker, df in data.items():
            print(f"Processing {ticker}...")
            domains_df = self.calculate_domains(df)

            # Save to output
            output_file = os.path.join(output_folder, f"{ticker}_Domains.csv")
            domains_df.to_csv(output_file, index=False)
            print(f"Domain file saved for {ticker}: {output_file}")


# Example usage: read CSVs from `input`, write domain tables to `domain_output`.
input_folder, output_folder = "input", "domain_output"

processor = DomainProcessor(domain_depth=12)
processor.process_files(input_folder=input_folder, output_folder=output_folder)


  df = pd.read_csv(


Processing MMM_D...
Domain file saved for MMM_D: domain_output/MMM_D_Domains.csv
Processing AAPL_D...
Domain file saved for AAPL_D: domain_output/AAPL_D_Domains.csv
Processing AFL_D...
Domain file saved for AFL_D: domain_output/AFL_D_Domains.csv
Processing WTI_D...
Domain file saved for WTI_D: domain_output/WTI_D_Domains.csv


In [5]:
import os
import pandas as pd

class DomainProcessor:
    """Track price "domains": rolling high/low ranges that stay open until breached.

    On each bar, the rolling range of every window length from 1 to
    ``domain_depth`` (ending at, and including, that bar) either opens a new
    domain or extends the open domain that has the same high and low. A domain
    closes on the first bar that trades above its high or below its low.
    """

    def __init__(self, domain_depth=12):
        # Largest rolling-window length (in bars) considered per bar.
        self.domain_depth = domain_depth

    @staticmethod
    def _percent_r(value, low, high):
        """Return where ``value`` sits between ``low`` and ``high`` as a fraction.

        A zero-width range has no defined position, so NaN is returned instead
        of dividing by zero.
        """
        width = high - low
        if width == 0:
            return float("nan")
        return (value - low) / width

    @staticmethod
    def _edge_bias(percent_r):
        """Return "U" when ``percent_r`` is at least 0.5, otherwise "D" (NaN gives "D")."""
        return "U" if percent_r >= 0.5 else "D"

    def read_csv_files(self, input_folder):
        """
        Reads all CSV files from the input folder. Assumes files are named TICKER_{temporal_period}.csv
        and contain columns: date, open, high, low, close.

        Returns a dict mapping each file name (without extension) to its DataFrame.
        Files are visited in sorted order so repeated runs process them identically.
        """
        data = {}
        for file in sorted(os.listdir(input_folder)):
            if file.endswith(".csv"):
                ticker = os.path.splitext(file)[0]
                data[ticker] = pd.read_csv(os.path.join(input_folder, file))
        return data

    def calculate_domains(self, df):
        """
        Calculate domains based on the rolling range and domain persistence rules.

        Args:
            df: DataFrame with ``date``, ``open``, ``high``, ``low`` and ``close``
                columns, one row per bar in chronological order. Any index is
                accepted.

        Returns:
            DataFrame with one row per domain: closed domains in the order they
            were closed, followed by the domains still open after the last bar.
            ``domain_duration`` counts the bars on which the domain's range was
            produced (at most one increment per bar).
        """
        # Window arithmetic below is positional, so work on a 0..n-1 index.
        bars = df.reset_index(drop=True)
        domains = []
        domain_id = 0
        open_domains = {}  # (high, low) -> open domain, in creation order

        # Processing starts once the deepest window is full. Bar 0 is always
        # skipped because a rolling range also needs the previous bar's close.
        first_bar = max(self.domain_depth - 1, 1)

        for i, row in bars.iterrows():
            if i < first_bar:
                continue

            current_high = row["high"]
            current_low = row["low"]
            current_close = row["close"]

            # Ranges are rebuilt for every bar; carrying them over would let a
            # stale range re-open a domain that has already been closed. Keying
            # by extent keeps the shallowest depth when several depths yield the
            # same high/low, so a domain is touched at most once per bar.
            rolling_ranges = {}
            for depth in range(1, self.domain_depth + 1):
                window = bars.loc[i - depth + 1:i]
                rolling_high = window["high"].max()
                rolling_low = window["low"].min()
                extent = (rolling_high, rolling_low)
                if extent in rolling_ranges:
                    continue
                rolling_ranges[extent] = {
                    "depth": depth,
                    "rolling_high": rolling_high,
                    "rolling_low": rolling_low,
                    "rolling_open": bars.loc[i - depth + 1, "open"],
                    "rolling_close": bars.loc[i - 1, "close"],
                    "start_date": bars.loc[i - depth + 1, "date"],
                }

            for extent, rolling_range in rolling_ranges.items():
                range_high = rolling_range["rolling_high"]
                range_low = rolling_range["rolling_low"]

                # NOTE(review): every window ends at the current bar, so this
                # holds for any bar without NaN highs/lows — confirm whether
                # the range was meant to exclude the current bar.
                if not (current_high <= range_high and current_low >= range_low):
                    continue

                domain = open_domains.get(extent)
                if domain is not None:
                    # Update existing domain
                    domain["domain_close_last"] = current_close
                    domain["domain_duration"] += 1
                    domain["domain_percentr_active"] = self._percent_r(
                        current_close, domain["domain_low"], domain["domain_high"]
                    )
                    domain["domain_edge_bias_direction_active"] = self._edge_bias(
                        domain["domain_percentr_active"]
                    )
                    continue

                # Create new domain
                percentr_original = self._percent_r(
                    rolling_range["rolling_close"], range_low, range_high
                )
                percentr_active = self._percent_r(current_close, range_low, range_high)
                domain_id += 1
                open_domains[extent] = {
                    "domain_id": domain_id,
                    "domain_start_date": rolling_range["start_date"],
                    "domain_type": "P",  # Default to primary
                    "domain_open": rolling_range["rolling_open"],
                    "domain_high": range_high,
                    "domain_low": range_low,
                    "domain_close_original": rolling_range["rolling_close"],
                    "domain_close_last": current_close,
                    "domain_range": range_high - range_low,
                    "domain_duration": 1,
                    "domain_percentr_original": percentr_original,
                    "domain_edge_bias_direction_original": self._edge_bias(percentr_original),
                    "domain_percentr_active": percentr_active,
                    "domain_edge_bias_direction_active": self._edge_bias(percentr_active),
                    "domain_true_duration": rolling_range["depth"],
                    "state": "open",
                }

            # Close domains that the current bar has broken out of
            still_open = {}
            for extent, domain in open_domains.items():
                if current_high > domain["domain_high"] or current_low < domain["domain_low"]:
                    domain["state"] = "closed"
                    domains.append(domain)
                else:
                    still_open[extent] = domain
            open_domains = still_open

        # Append any remaining open domains
        domains.extend(open_domains.values())
        return pd.DataFrame(domains)

    def process_files(self, input_folder, output_folder):
        """
        Processes all CSV files in the input folder and generates detailed domain outputs.

        One ``{ticker}_Domains.csv`` file is written to ``output_folder`` per input
        file; the folder is created when it does not exist.
        """
        os.makedirs(output_folder, exist_ok=True)

        data = self.read_csv_files(input_folder)
        for ticker, df in data.items():
            print(f"Processing {ticker}...")
            domains_df = self.calculate_domains(df)

            # Save to output
            output_file = os.path.join(output_folder, f"{ticker}_Domains.csv")
            domains_df.to_csv(output_file, index=False)
            print(f"Domain file saved for {ticker}: {output_file}")


# Example usage: read CSVs from `input`, write domain tables to `domain_output`.
input_folder, output_folder = "input", "domain_output"

processor = DomainProcessor(domain_depth=12)
processor.process_files(input_folder=input_folder, output_folder=output_folder)


Processing MMM_D...
Domain file saved for MMM_D: domain_output/MMM_D_Domains.csv
Processing WTI_M...
Domain file saved for WTI_M: domain_output/WTI_M_Domains.csv
Processing AAPL_D...
Domain file saved for AAPL_D: domain_output/AAPL_D_Domains.csv
Processing AFL_D...
Domain file saved for AFL_D: domain_output/AFL_D_Domains.csv


In [1]:
import os
import pandas as pd


class DomainProcessor:
    """Identify price "domains" from per-bar rolling high/low columns.

    ``calculate_rolling_ranges`` adds the rolling columns to a DataFrame and
    ``identify_domains`` turns them into domain records. A domain is closed, and
    written to the result, on the first bar that trades outside its range.
    """

    def __init__(self, domain_depth=12, domain_primary_check_duration=20):
        # Largest rolling-window length (in bars) considered per bar.
        self.domain_depth = domain_depth
        # Stored for callers; not read by any method of this class.
        self.domain_primary_check_duration = domain_primary_check_duration

    @staticmethod
    def _percent_r(value, low, high):
        """Return where ``value`` sits between ``low`` and ``high`` as a fraction.

        A zero-width range (e.g. a bar whose high equals its low) has no defined
        position, so NaN is returned instead of raising ZeroDivisionError.
        """
        width = high - low
        if width == 0:
            return float("nan")
        return (value - low) / width

    @staticmethod
    def _edge_bias(percent_r):
        """Return "U" when ``percent_r`` is at least 0.5, otherwise "D" (NaN gives "D")."""
        return "U" if percent_r >= 0.5 else "D"

    def read_csv_files(self, input_folder):
        """
        Reads all CSV files from the input folder. Assumes files are named TICKER_{TEMPORAL_PERIOD}.csv
        and contain columns: date, open, high, low, close.

        Column names are lower-cased and stripped. Returns a dict mapping each
        file name (without extension) to its DataFrame; files are visited in
        sorted order so repeated runs process them identically.
        """
        data = {}
        for file in sorted(os.listdir(input_folder)):
            if file.endswith(".csv"):
                ticker = os.path.splitext(file)[0]
                df = pd.read_csv(os.path.join(input_folder, file))
                df.columns = df.columns.str.lower().str.strip()  # Standardize column names
                data[ticker] = df
        return data

    def calculate_rolling_ranges(self, df):
        """
        Calculate rolling ranges for each bar up to the domain_depth.

        Adds ``rolling_high_{d}``, ``rolling_low_{d}``, ``rolling_open_{d}`` and
        ``rolling_close_{d}`` columns to ``df`` IN PLACE for every depth ``d``
        that fits in the data, and returns the list of those depths. The
        high/low windows end at (and include) the current bar; the rolling
        close is the previous bar's close.
        """
        rolling_ranges = []
        for depth in range(1, self.domain_depth + 1):
            if len(df) < depth:
                continue
            df[f"rolling_high_{depth}"] = df["high"].rolling(depth).max()
            df[f"rolling_low_{depth}"] = df["low"].rolling(depth).min()
            df[f"rolling_open_{depth}"] = df["open"].shift(depth - 1)
            df[f"rolling_close_{depth}"] = df["close"].shift(1)
            rolling_ranges.append(depth)
        return rolling_ranges

    def identify_domains(self, df, rolling_ranges):
        """
        Identify domains based on rolling ranges and classify them.

        Args:
            df: DataFrame already processed by ``calculate_rolling_ranges``.
            rolling_ranges: the depths returned by ``calculate_rolling_ranges``.

        Returns:
            DataFrame with one row per domain: closed domains in the order they
            were closed, followed by the domains still open after the last bar.
        """
        domains = []
        domain_id = 0
        open_domains = []

        # Count bars by position so the warm-up test works with any index.
        for position, (_, row) in enumerate(df.iterrows()):
            current_high = row["high"]
            current_low = row["low"]
            current_close = row["close"]

            if position < self.domain_depth - 1:
                continue  # Skip rows until there are enough bars

            for depth in rolling_ranges:
                rolling_high = row[f"rolling_high_{depth}"]
                rolling_low = row[f"rolling_low_{depth}"]
                rolling_open = row[f"rolling_open_{depth}"]
                rolling_close = row[f"rolling_close_{depth}"]

                # NOTE(review): the rolling window includes the current bar, so
                # this holds whenever the rolling values are not NaN — confirm
                # whether the range was meant to exclude the current bar.
                if current_high <= rolling_high and current_low >= rolling_low:
                    percentr_original = self._percent_r(rolling_close, rolling_low, rolling_high)
                    percentr_active = self._percent_r(current_close, rolling_low, rolling_high)
                    domain_id += 1
                    open_domains.append({
                        "domain_id": domain_id,
                        "domain_rolling_range_duration": depth,
                        "domain_start_date": row["date"],
                        "domain_type": "P",  # Default to primary
                        "domain_open": rolling_open,
                        "domain_high": rolling_high,
                        "domain_low": rolling_low,
                        "domain_close_original": rolling_close,
                        "domain_close_last": current_close,
                        "domain_range": rolling_high - rolling_low,
                        "domain_captive_count": 1,
                        "domain_captive_range_high": rolling_high,
                        "domain_captive_range_low": rolling_low,
                        "domain_true_flag": 1,  # Default to true
                        "domain_true_duration": depth,
                        "domain_percentr_original": percentr_original,
                        "domain_edge_bias_direction_original": self._edge_bias(percentr_original),
                        "domain_percentr_active": percentr_active,
                        "domain_edge_bias_direction_active": self._edge_bias(percentr_active),
                    })

            # Close domains that the current bar has broken out of. The breaching
            # bar is folded into the domain's statistics, then the domain is
            # moved to the result list so it is not lost.
            # NOTE(review): confirm that counting the breaching bar in
            # domain_captive_count / captive range is intended.
            still_open = []
            for domain in open_domains:
                if current_high > domain["domain_high"] or current_low < domain["domain_low"]:
                    domain["domain_captive_count"] += 1
                    domain["domain_captive_range_high"] = max(domain["domain_captive_range_high"], current_high)
                    domain["domain_captive_range_low"] = min(domain["domain_captive_range_low"], current_low)
                    domain["domain_close_last"] = current_close
                    domain["domain_percentr_active"] = self._percent_r(
                        current_close, domain["domain_low"], domain["domain_high"]
                    )
                    domain["domain_edge_bias_direction_active"] = self._edge_bias(
                        domain["domain_percentr_active"]
                    )
                    domains.append(domain)
                else:
                    still_open.append(domain)
            open_domains = still_open

        # Add all remaining open domains
        domains.extend(open_domains)
        return pd.DataFrame(domains)

    def process_files(self, input_folder, output_folder):
        """
        Process all files in the input folder and save domain outputs to the output folder.

        One ``{ticker}_Domains.csv`` file is written to ``output_folder`` per input
        file; the folder is created when it does not exist.
        """
        os.makedirs(output_folder, exist_ok=True)

        data = self.read_csv_files(input_folder)
        for ticker, df in data.items():
            print(f"Processing {ticker}...")

            # Calculate rolling ranges
            rolling_ranges = self.calculate_rolling_ranges(df)

            # Identify domains
            domains_df = self.identify_domains(df, rolling_ranges)

            # Save to output file
            output_file = os.path.join(output_folder, f"{ticker}_Domains.csv")
            domains_df.to_csv(output_file, index=False)
            print(f"Saved domains for {ticker} to {output_file}")


# Example usage: read CSVs from `input`, write domain tables to `domain_output`.
input_folder, output_folder = "input", "domain_output"

processor = DomainProcessor(domain_depth=12, domain_primary_check_duration=20)
processor.process_files(input_folder=input_folder, output_folder=output_folder)


Processing MMM_D...
Saved domains for MMM_D to domain_output/MMM_D_Domains.csv
Processing AAPL_D...
Saved domains for AAPL_D to domain_output/AAPL_D_Domains.csv
Processing AFL_D...
Saved domains for AFL_D to domain_output/AFL_D_Domains.csv


In [2]:
import os
import pandas as pd


class DomainProcessor:
    """Detect price "domains" one rolling-window length at a time.

    For a given window length, the high/low range of the preceding bars is
    attached to each bar; a domain opens when a bar sits inside that range and
    closes on the first bar that trades outside the domain's own high/low.
    """

    def __init__(self, domain_depth=12, domain_primary_check_duration=20):
        # Largest rolling-window length processed by process_files.
        self.domain_depth = domain_depth
        # Stored for callers; not read by any method of this class.
        self.domain_primary_check_duration = domain_primary_check_duration

    def read_csv_files(self, input_folder):
        """
        Load every ``*.csv`` file found in ``input_folder``.

        Files are expected to be named TICKER_{TEMPORAL_PERIOD}.csv and to hold
        date, open, high, low and close columns. Column names are lower-cased
        and stripped. Returns a dict keyed by file name without its extension.
        """
        data = {}
        for file_name in os.listdir(input_folder):
            if not file_name.endswith(".csv"):
                continue
            frame = pd.read_csv(os.path.join(input_folder, file_name))
            frame.columns = frame.columns.str.lower().str.strip()
            data[os.path.splitext(file_name)[0]] = frame
        return data

    def calculate_rolling_ranges(self, df, rolling_window):
        """
        Attach the rolling high/low of the previous ``rolling_window`` bars.

        The two columns are written to ``df`` in place (shifted by one bar so the
        current bar is excluded) and the same DataFrame is returned.
        """
        high_column = f"rolling_high_{rolling_window}"
        low_column = f"rolling_low_{rolling_window}"
        df[high_column] = df["high"].rolling(window=rolling_window).max().shift(1)
        df[low_column] = df["low"].rolling(window=rolling_window).min().shift(1)
        return df

    def identify_domains(self, df, rolling_window):
        """
        Walk the bars and collect domains for one rolling-window length.

        ``df`` must already carry the columns added by
        ``calculate_rolling_ranges`` for ``rolling_window``. Returns a DataFrame
        holding closed domains in closing order followed by any domain still
        open after the last bar.
        """
        high_column = f"rolling_high_{rolling_window}"
        low_column = f"rolling_low_{rolling_window}"

        domains = []
        open_domains = []
        domain_id = 0

        for _, bar in df.iterrows():
            # Bars without a complete preceding window have no range to test.
            if pd.isna(bar[high_column]) or pd.isna(bar[low_column]):
                continue

            current_high = bar["high"]
            current_low = bar["low"]
            current_close = bar["close"]
            current_date = bar["date"]

            rolling_high = bar[high_column]
            rolling_low = bar[low_column]

            bar_inside_range = current_high <= rolling_high and current_low >= rolling_low
            if bar_inside_range:
                if open_domains and open_domains[-1]["state"] != "closed":
                    # Extend the most recent open domain.
                    # NOTE(review): this runs before the breach check below, so a
                    # bar that breaks the domain can still be counted here.
                    latest = open_domains[-1]
                    latest["domain_captive_count"] += 1
                    latest["domain_close_last"] = current_close
                else:
                    # Nothing is open: start a domain on this bar.
                    domain_id += 1
                    open_domains.append({
                        "domain_id": domain_id,
                        "domain_rolling_range_duration": rolling_window,
                        "domain_start_date": current_date,
                        "domain_type": "P",  # Default to primary
                        "domain_open": bar["open"],
                        "domain_high": rolling_high,
                        "domain_low": rolling_low,
                        "domain_close_original": bar["close"],
                        "domain_close_last": bar["close"],
                        "domain_range": rolling_high - rolling_low,
                        "domain_captive_count": 1,
                        "state": "open"
                    })

            # Split open domains into those the bar broke out of and the rest.
            survivors = []
            for domain in open_domains:
                breached = current_high > domain["domain_high"] or current_low < domain["domain_low"]
                if breached:
                    domain["state"] = "closed"
                    domains.append(domain)
                else:
                    survivors.append(domain)
            open_domains = survivors

        # Whatever is still open at the end is reported as well.
        domains.extend(open_domains)
        return pd.DataFrame(domains)

    def process_files(self, input_folder, output_folder):
        """
        Run every window length from 1 to ``domain_depth`` over each input file.

        The per-window domain tables are concatenated and written to
        ``{ticker}_Domains.csv`` inside ``output_folder``, which is created when
        it does not exist.
        """
        if not os.path.exists(output_folder):
            os.makedirs(output_folder)

        for ticker, df in self.read_csv_files(input_folder).items():
            print(f"Processing {ticker}...")

            # One domain table per rolling-window length.
            per_window_domains = []
            for rolling_window in range(1, self.domain_depth + 1):
                df = self.calculate_rolling_ranges(df, rolling_window)
                per_window_domains.append(self.identify_domains(df, rolling_window))

            all_domains_df = pd.concat(per_window_domains, ignore_index=True)

            output_file = os.path.join(output_folder, f"{ticker}_Domains.csv")
            all_domains_df.to_csv(output_file, index=False)
            print(f"Saved domains for {ticker} to {output_file}")


# Example usage: read CSVs from `input`, write domain tables to `domain_output`.
input_folder, output_folder = "input", "domain_output"

processor = DomainProcessor(domain_depth=12, domain_primary_check_duration=20)
processor.process_files(input_folder=input_folder, output_folder=output_folder)


Processing MMM_D...
Saved domains for MMM_D to domain_output/MMM_D_Domains.csv
Processing AAPL_D...
Saved domains for AAPL_D to domain_output/AAPL_D_Domains.csv
Processing AFL_D...
Saved domains for AFL_D to domain_output/AFL_D_Domains.csv
