In [None]:
!pip install graphviz

In [None]:
def extract_sql_and_physical_plan(file_path):
    result = []
    with open(file_path, 'r', encoding='utf-8') as file:
        content = file.read()
    
    # Split the content by the separator
    entries = content.split('------------------------------------------------------------------------------------------------------------------')

    for entry in entries:
        current_entry = {}
        capture_mode = None
        buffer = []
        
        lines = entry.splitlines()
        
        for line in lines:
            stripped_line = line.strip()

            if stripped_line.startswith("[SQL Query]"):
                # Save previous entry if complete
                if 'SQL Query' in current_entry and 'Physical plan' in current_entry:
                    result.append(current_entry)
                current_entry = {}
                capture_mode = 'SQL Query'
                buffer = []
                continue
            elif stripped_line.startswith("[Physical plan]"):
                if capture_mode and buffer:
                    current_entry[capture_mode] = '\n'.join(buffer).strip()
                capture_mode = 'Physical plan'
                buffer = []
                continue
            elif stripped_line.startswith("[") and stripped_line.endswith("]"):
                # Save current buffer if switching to a new section
                if capture_mode and buffer:
                    current_entry[capture_mode] = '\n'.join(buffer).strip()
                capture_mode = None
                buffer = []
                continue
            
            if capture_mode:
                buffer.append(line.rstrip())

        # Catch the last entry if needed
        if capture_mode and buffer:
            current_entry[capture_mode] = '\n'.join(buffer).strip()
        if 'SQL Query' in current_entry and 'Physical plan' in current_entry:
            result.append(current_entry)

    return result


In [None]:
file_path = 'physical_plans.txt'
data = extract_sql_and_physical_plan(file_path)

for entry in data:
    print("SQL Query:\n", entry['SQL Query'])
    print("Physical plan:\n", entry['Physical plan'])
    print('-' * 80)

In [None]:
import re
from typing import List, Dict
from dataclasses import dataclass, field

@dataclass
class PlanNode:
    op_type: str
    attributes: Dict[str, str]
    node_id: int
    children: List["PlanNode"] = field(default_factory=list)

def parse_plan(plan_text: str) -> PlanNode:
    lines = [line.rstrip() for line in plan_text.strip().split("\n") if line.strip()]
    stack: List[Tuple[int, PlanNode]] = []

    node_pattern = re.compile(r"(\w+)\((.*?)\), id\s*=\s*(\d+)")
    attr_pattern = re.compile(r"(\w+)=\[(.*?)\]")

    for line in lines:
        indent = len(line) - len(line.lstrip())
        match = node_pattern.match(line.strip())
        if not match:
            continue

        op_type, raw_attrs, node_id = match.groups()
        node_id = int(node_id)

        # Parse attributes
        attributes = {k: v for k, v in attr_pattern.findall(raw_attrs)}

        node = PlanNode(op_type, attributes, node_id)

        # Attach to parent
        while stack and stack[-1][0] >= indent:
            stack.pop()
        if stack:
            stack[-1][1].children.append(node)

        stack.append((indent, node))

    # Root node is the first one in the stack
    return stack[0][1] if stack else None


In [None]:
with open("outputs.txt") as f:
    text = f.read()

# Split on physical plan sections
plans = text.split("Physical plan:")
for plan_text in plans[1:]:
    root = parse_plan(plan_text)
    print(root)


In [None]:
from graphviz import Digraph
from typing import Optional
import html

def escape_label(s):
    return html.escape(str(s))

def visualize_plan_tree(root: PlanNode, filename: Optional[str] = "plan_tree") -> Digraph:
    dot = Digraph(format='png')
    dot.attr(rankdir='TB')  # Top to bottom

    def add_nodes_edges(node: PlanNode):
        node_label = f"{node.op_type}\\n(id={node.node_id})"
        for k, v in node.attributes.items():
            node_label += f"\\n{k}=[{v}]"
        node_label = escape_label(node_label)
        dot.node(str(node.node_id), label=node_label, shape="box", style="filled", fillcolor="lightgray")

        for child in node.children:
            dot.edge(str(node.node_id), str(child.node_id))
            add_nodes_edges(child)

    add_nodes_edges(root)
    dot.render(filename, view=True)
    return dot


In [None]:
root_node = parse_plan("""BindableAggregate(group=[{}], ALTERNATIVE_NAME=[MIN($0)], VOICED_CHAR_NAME=[MIN($1)], VOICING_ACTRESS=[MIN($2)], AMERICAN_MOVIE=[MIN($3)]), id = 37720614
  BindableProject(name=[$2], name0=[$9], name2=[$35], title=[$46]), id = 37720613
    BindableJoin(condition=[=($17, $45)], joinType=[inner]), id = 37720612
      BindableJoin(condition=[=($21, $43)], joinType=[inner]), id = 37720611
        BindableJoin(condition=[=($16, $34)], joinType=[inner]), id = 37720609
          BindableJoin(condition=[AND(=($17, $30), =($31, $22))], joinType=[inner]), id = 37720607
            BindableJoin(condition=[true], joinType=[inner]), id = 37720606
              BindableProject(id=[$0], person_id=[$1], name=[$2], imdb_index=[$3], name_pcode_cf=[$4], name_pcode_nf=[$5], surname_pcode=[$6], md5sum=[$7], id1=[$15], name0=[$16], imdb_index0=[$17], imdb_id=[$18], name_pcode_nf0=[$19], surname_pcode0=[$20], md5sum0=[$21], id0=[$8], person_id0=[$9], movie_id=[$10], person_role_id=[$11], note=[$12], nr_order=[$13], role_id=[$14]), id = 37720604
                BindableJoin(condition=[=($15, $11)], joinType=[inner]), id = 37720603
                  BindableJoin(condition=[=($1, $9)], joinType=[inner]), id = 37720602
                    BindableTableScan(table=[[aka_name]]), id = 37716643
                    BindableFilter(condition=[SEARCH($4, Sarg['(voice)':VARCHAR, '(voice) (uncredited)':VARCHAR, '(voice: English version)':VARCHAR, '(voice: Japanese version)':VARCHAR]:VARCHAR)]), id = 37720601
                      BindableTableScan(table=[[cast_info]]), id = 37716659
                  BindableTableScan(table=[[char_name]]), id = 37716647
              BindableFilter(condition=[=($2, '[us]')]), id = 37720605
                BindableTableScan(table=[[company_name]]), id = 37716671
            BindableTableScan(table=[[movie_companies]]), id = 37716683
          BindableFilter(condition=[=($4, 'f')]), id = 37720608
            BindableTableScan(table=[[name]]), id = 37716695
        BindableFilter(condition=[=($1, 'actress')]), id = 37720610
          BindableTableScan(table=[[role_type]]), id = 37716707
      BindableTableScan(table=[[title]]), id = 37716719
""")
root_node
visualize_plan_tree(root_node, filename="example_plan")

In [20]:
from typing import Dict, List
from io import StringIO

class PlanCodeGenerator:
    def __init__(self, root):
        self.root = root
        self.code_lines = []
        self.visited = set()

    def generate(self) -> str:
        self.code_lines = [
            "import pandas as pd",
            "# Load CSVs (you may need to adjust paths)",
        ]
        self._gen_node_code(self.root)
        self.code_lines.append(f"\nresult = df_{self.root.node_id}")
        self.code_lines.append("print(result.head())")
        return "\n".join(self.code_lines)

    def _gen_node_code(self, node):
        if node.node_id in self.visited:
            return
        self.visited.add(node.node_id)

        # Generate code for children first
        for child in node.children:
            self._gen_node_code(child)

        op = node.op_type
        df_name = f"df_{node.node_id}"

        if op == "BindableTableScan":
            table = self._extract_table_name(node.attributes.get("table", ""))
            self.code_lines.append(f"{df_name} = pd.read_csv('{table}.csv')")

        elif op == "BindableFilter":
            child_df = f"df_{node.children[0].node_id}"
            condition = self._translate_condition(node.attributes.get("condition", ""))
            self.code_lines.append(f"{df_name} = {child_df}.query({condition})")

        elif op == "BindableProject":
            child_df = f"df_{node.children[0].node_id}"
            cols = self._extract_projection_cols(node.attributes)
            self.code_lines.append(f"{df_name} = {child_df}[[{', '.join(cols)}]]")

        elif op == "BindableJoin":
            left = f"df_{node.children[0].node_id}"
            right = f"df_{node.children[1].node_id}"
            join_type = node.attributes.get("joinType", "inner").strip('[]')
            on_cols = self._translate_join_condition(node.attributes.get("condition", ""))
            self.code_lines.append(f"{df_name} = pd.merge({left}, {right}, how='{join_type}', {on_cols})")

        elif op == "BindableAggregate":
            child_df = f"df_{node.children[0].node_id}"
            aggs = self._translate_aggregates(node.attributes)
            self.code_lines.append(f"{df_name} = {child_df}.agg({aggs}).reset_index(drop=True)")

        elif op == "BindableValues":
            self.code_lines.append(f"{df_name} = pd.DataFrame([[]])")

        else:
            self.code_lines.append(f"# Unsupported node type: {op}")
            self.code_lines.append(f"{df_name} = pd.DataFrame()  # placeholder")

    def _extract_table_name(self, table_attr):
        # table=[[movie_companies]] => movie_companies
        return table_attr.replace("[[", "").replace("]]", "").split(".")[-1]

    def _extract_projection_cols(self, attrs: Dict[str, str]) -> List[str]:
        # Extract projection columns like name=[$1], title=[$2] -> return ['"name"', '"title"']
        return [f"'{k}'" for k in attrs.keys()]

    def _translate_condition(self, cond: str) -> str:
        # Minimal translator for now; could be improved with regex parsing
        cond = cond.replace("LIKE", "str.contains")
        cond = cond.replace("AND", "and").replace("OR", "or")
        cond = cond.replace("=", "==")
        cond = cond.replace("<>", "!=")
        cond = cond.replace("true", "True").replace("false", "False")
        return f'""" {cond} """'

    def _translate_join_condition(self, cond: str) -> str:
        # Example: =($0, $1) => on=['col0', 'col1']
        matches = re.findall(r"\$([0-9]+)", cond)
        if len(matches) == 2:
            return f"left_on='col{matches[0]}', right_on='col{matches[1]}'"
        elif "AND" in cond:
            cols = re.findall(r"\$([0-9]+)", cond)
            return f"left_on={[f'col{c}' for c in cols[::2]]}, right_on={[f'col{c}' for c in cols[1::2]]}"
        else:
            return "# TODO: Complex join condition"

    def _translate_aggregates(self, attrs: Dict[str, str]) -> str:
        agg_map = {}
        for k, v in attrs.items():
            match = re.search(r'MIN\(\$(\d+)\)', v)
            if match:
                col = f"col{match.group(1)}"
                agg_map[k.lower()] = f"('{col}', 'min')"
            # You can add other aggregate support here (e.g. MAX, SUM, etc.)
    
        return "{" + ", ".join(f"'{k}': {v}" for k, v in agg_map.items()) + "}"


In [22]:
plan_text = """BindableAggregate(group=[{}], ALTERNATIVE_NAME=[MIN($0)], VOICED_CHAR_NAME=[MIN($1)], VOICING_ACTRESS=[MIN($2)], AMERICAN_MOVIE=[MIN($3)]), id = 37720614
  BindableProject(name=[$2], name0=[$9], name2=[$35], title=[$46]), id = 37720613
    BindableJoin(condition=[=($17, $45)], joinType=[inner]), id = 37720612
      BindableJoin(condition=[=($21, $43)], joinType=[inner]), id = 37720611
        BindableJoin(condition=[=($16, $34)], joinType=[inner]), id = 37720609
          BindableJoin(condition=[AND(=($17, $30), =($31, $22))], joinType=[inner]), id = 37720607
            BindableJoin(condition=[true], joinType=[inner]), id = 37720606
              BindableProject(id=[$0], person_id=[$1], name=[$2], imdb_index=[$3], name_pcode_cf=[$4], name_pcode_nf=[$5], surname_pcode=[$6], md5sum=[$7], id1=[$15], name0=[$16], imdb_index0=[$17], imdb_id=[$18], name_pcode_nf0=[$19], surname_pcode0=[$20], md5sum0=[$21], id0=[$8], person_id0=[$9], movie_id=[$10], person_role_id=[$11], note=[$12], nr_order=[$13], role_id=[$14]), id = 37720604
                BindableJoin(condition=[=($15, $11)], joinType=[inner]), id = 37720603
                  BindableJoin(condition=[=($1, $9)], joinType=[inner]), id = 37720602
                    BindableTableScan(table=[[aka_name]]), id = 37716643
                    BindableFilter(condition=[SEARCH($4, Sarg['(voice)':VARCHAR, '(voice) (uncredited)':VARCHAR, '(voice: English version)':VARCHAR, '(voice: Japanese version)':VARCHAR]:VARCHAR)]), id = 37720601
                      BindableTableScan(table=[[cast_info]]), id = 37716659
                  BindableTableScan(table=[[char_name]]), id = 37716647
              BindableFilter(condition=[=($2, '[us]')]), id = 37720605
                BindableTableScan(table=[[company_name]]), id = 37716671
            BindableTableScan(table=[[movie_companies]]), id = 37716683
          BindableFilter(condition=[=($4, 'f')]), id = 37720608
            BindableTableScan(table=[[name]]), id = 37716695
        BindableFilter(condition=[=($1, 'actress')]), id = 37720610
          BindableTableScan(table=[[role_type]]), id = 37716707
      BindableTableScan(table=[[title]]), id = 37716719"""
root = parse_plan(plan_text)
generator = PlanCodeGenerator(root)
code = generator.generate()

print(code)

import pandas as pd
# Load CSVs (you may need to adjust paths)
df_37716643 = pd.read_csv('[aka_name.csv')
df_37716659 = pd.read_csv('[cast_info.csv')
df_37720601 = df_37716659.query(""" SEARCH($4, Sarg['(voice)':VARCHAR, '(voice) (uncredited)':VARCHAR, '(voice: English version)':VARCHAR, '(voice: Japanese version)':VARCHAR """)
df_37720602 = pd.merge(df_37716643, df_37720601, how='inner', left_on='col1', right_on='col9')
df_37716647 = pd.read_csv('[char_name.csv')
df_37720603 = pd.merge(df_37720602, df_37716647, how='inner', left_on='col15', right_on='col11')
df_37720604 = df_37720603[['id', 'person_id', 'name', 'imdb_index', 'name_pcode_cf', 'name_pcode_nf', 'surname_pcode', 'md5sum', 'id1', 'name0', 'imdb_index0', 'imdb_id', 'name_pcode_nf0', 'surname_pcode0', 'md5sum0', 'id0', 'person_id0', 'movie_id', 'person_role_id', 'note', 'nr_order', 'role_id']]
df_37716671 = pd.read_csv('[company_name.csv')
df_37720605 = df_37716671.query(""" ==($2, '[us """)
df_37720606 = pd.merge(df_3772060

In [1]:
import pandas as pd
from collections import defaultdict

# example DataFrames
# R1 has column 'a'
# R2 has columns 'a', 'b'
# R3 has column 'b'
# replace these with your actual DataFrames
df1 = pd.DataFrame({'a': [1,2,2,3]})
df2 = pd.DataFrame({'a': [1,2,2,4], 'b': ['x','y','z','y']})
df3 = pd.DataFrame({'b': ['x','y','w'], 'c': [100,200,300]})

# 1) build hash table for R2 on key = a
ht2 = defaultdict(list)
for _, row in df2.iterrows():
    ht2[row['a']].append(row)

# 2) build hash table for R3 on key = b
ht3 = defaultdict(list)
for _, row in df3.iterrows():
    ht3[row['b']].append(row)

# 3) nested‐iterator join
results = []
for _, r1 in df1.iterrows():
    # find all R2 rows where R2.a == R1.a
    for r2 in ht2.get(r1['a'], []):
        # for each, find all R3 rows where R3.b == R2.b
        for r3 in ht3.get(r2['b'], []):
            # combine the three rows into one result row.
            # here we prefix columns to avoid name clashes.
            out = {}
            out.update({f"R1_{col}": r1[col] for col in df1.columns})
            out.update({f"R2_{col}": r2[col] for col in df2.columns})
            out.update({f"R3_{col}": r3[col] for col in df3.columns})
            results.append(out)

# 4) materialize as a DataFrame
joined_df = pd.DataFrame(results)

print(joined_df)


   R1_a  R2_a R2_b R3_b  R3_c
0     1     1    x    x   100
1     2     2    y    y   200
2     2     2    y    y   200


In [11]:
from concurrent.futures import ThreadPoolExecutor, as_completed
import pandas as pd
import numpy as np
from collections import defaultdict

def build_ht(df, key):
    ht = defaultdict(list)
    for _, row in df.iterrows():
        ht[row[key]].append(row)
    return ht

def join_chunk(chunk_df1, ht2, ht3):
    out = []
    for _, r1 in chunk_df1.iterrows():
        for r2 in ht2.get(r1['a'], ()):
            for r3 in ht3.get(r2['b'], ()):
                row = {f"R1_{c}": r1[c] for c in chunk_df1.columns}
                row.update({f"R2_{c}": r2[c] for c in r2.index})
                row.update({f"R3_{c}": r3[c] for c in r3.index})
                out.append(row)
    return out

# Pre-build once
ht2 = build_ht(df2, 'a')
ht3 = build_ht(df3, 'b')

# Split R1 into N roughly even chunks
num_threads = 4
chunks = np.array_split(df1, num_threads)

results = []
with ThreadPoolExecutor(max_workers=num_threads) as exe:
    futures = [exe.submit(join_chunk, chunk, ht2, ht3) for chunk in chunks]
    for f in as_completed(futures):
        results.extend(f.result())

joined = pd.DataFrame(results)
joined

  return bound(*args, **kwds)


Unnamed: 0,R1_a,R2_a,R2_b,R3_b,R3_c
0,2,2,y,y,200
1,2,2,y,y,200
2,1,1,x,x,100


In [5]:
!pip install numpy

Defaulting to user installation because normal site-packages is not writeable


In [5]:
import pandas as pd
import numpy as np
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import psutil
import os

# Example DataFrames (replace with your real ones)
df1 = pd.DataFrame({'a': np.random.randint(0, 100, size=10000)})
df2 = pd.DataFrame({
    'a': np.random.randint(0, 100, size=10000),
    'b': np.random.randint(0, 100, size=10000)
})
df3 = pd.DataFrame({
    'b': np.random.randint(0, 100, size=10000),
    'c': np.random.randn(10000)
})

proc = psutil.Process(os.getpid())

def mem_mb():
    return proc.memory_info().rss / 1024**2

def build_ht(df, key):
    ht = defaultdict(list)
    for row in df.itertuples(index=False):
        ht[getattr(row, key)].append(row)
    return ht

def join_chunk(chunk_df1, ht2, ht3):
    out = []
    for r1 in chunk_df1.itertuples(index=False):
        for r2 in ht2.get(r1.a, ()):
            for r3 in ht3.get(r2.b, ()):
                out.append((r1, r2, r3))
    return out

def parallel_hash_join(df1, df2, df3, num_threads=4):
    # 1) record mem before anything
    mem_before = mem_mb()
    t0 = time.perf_counter()

    # 2) build hash tables
    ht2 = build_ht(df2, 'a')
    ht3 = build_ht(df3, 'b')
    t1 = time.perf_counter()
    mem_after_build = mem_mb()

    # 3) split and dispatch
    chunks = np.array_split(df1, num_threads)
    results = []
    with ThreadPoolExecutor(max_workers=num_threads) as exe:
        futures = [exe.submit(join_chunk, chunk, ht2, ht3) for chunk in chunks]
        for f in as_completed(futures):
            results.extend(f.result())

    t2 = time.perf_counter()
    mem_after_join = mem_mb()

    # 4) stats
    total_time = t2 - t0
    build_time = t1 - t0
    join_time  = t2 - t1
    n_tuples   = len(results)
    ht_memory  = mem_after_build - mem_before
    join_memory = mem_after_join - mem_after_build
    total_memory = mem_after_join - mem_before

    # 5) report
    print(f"=== Timing ===")
    print(f" Total elapsed       : {total_time:.3f} s")
    print(f"  - Hash-build phase : {build_time :.3f} s")
    print(f"  - Join phase       : {join_time  :.3f} s")
    print()
    print(f"=== Output ===")
    print(f" Total tuples produced: {n_tuples}")
    print()
    print(f"=== Memory (RSS) ===")
    print(f" Before              : {mem_before   :.1f} MB")
    print(f" After build         : {mem_after_build:.1f} MB")
    print(f" After join          : {mem_after_join :.1f} MB")
    print(f" -------------------------")
    print(f" Memory for ht build : {ht_memory  :.1f} MB")
    print(f" Memory for join     : {join_memory:.1f} MB")
    print(f" Total bump          : {total_memory:.1f} MB")

    return pd.DataFrame(results,
                        columns=['R1_row','R2_row','R3_row'])

# Run it
joined_df = parallel_hash_join(df1, df2, df3, num_threads=4)


  return bound(*args, **kwds)


=== Timing ===
 Total elapsed       : 71.434 s
  - Hash-build phase : 0.024 s
  - Join phase       : 71.410 s

=== Output ===
 Total tuples produced: 99916364

=== Memory (RSS) ===
 Before              : 186.7 MB
 After build         : 188.2 MB
 After join          : 7838.2 MB
 -------------------------
 Memory for ht build : 1.5 MB
 Memory for join     : 7650.0 MB
 Total bump          : 7651.5 MB


In [1]:
import pandas as pd
import numpy as np
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import psutil
import os

# Example DataFrames (replace with your real ones)
df1 = pd.DataFrame({'a': np.random.randint(0, 100, size=10000)})
df2 = pd.DataFrame({
    'a': np.random.randint(0, 100, size=10000),
    'b': np.random.randint(0, 100, size=10000)
})
df3 = pd.DataFrame({
    'b': np.random.randint(0, 100, size=10000),
    'c': np.random.randn(10000)
})

proc = psutil.Process(os.getpid())

def build_ht_chunk(rows, key):
    """Build a partial hash‐table from an iterable of rows."""
    partial = defaultdict(list)
    for r in rows:
        partial[getattr(r, key)].append(r)
    return partial

def merge_hash_tables(dicts):
    """Merge a list of defaultdict(list) into one."""
    ht = defaultdict(list)
    for part in dicts:
        for k, lst in part.items():
            ht[k].extend(lst)
    return ht

def parallel_build_ht(df, key, num_workers=4):
    # 1) split the DataFrame into DataFrame‐chunks (not iterators)
    df_chunks = np.array_split(df, num_workers)

    # 2) in each thread, build a partial hash‐table over its chunk
    partials = []
    with ThreadPoolExecutor(max_workers=num_workers) as exe:
        futures = [
            exe.submit(build_ht_chunk, chunk.itertuples(index=False), key)
            for chunk in df_chunks
        ]
        for f in as_completed(futures):
            partials.append(f.result())

    # 3) merge them into one big hash‐table
    return merge_hash_tables(partials)

def mem_mb():
    return proc.memory_info().rss / 1024**2

def build_ht(df, key):
    ht = defaultdict(list)
    for row in df.itertuples(index=False):
        ht[getattr(row, key)].append(row)
    return ht

def join_chunk(chunk_df1, ht2, ht3):
    out = []
    for r1 in chunk_df1.itertuples(index=False):
        for r2 in ht2.get(r1.a, ()):
            for r3 in ht3.get(r2.b, ()):
                out.append((r1, r2, r3))
    return out

def parallel_hash_join(df1, df2, df3, num_threads=4):
    # 1) record mem before anything
    mem_before = mem_mb()
    t0 = time.perf_counter()

    # 2) build hash tables
    ht2 = parallel_build_ht(df2, 'a', num_workers=4)
    ht3 = parallel_build_ht(df3, 'b', num_workers=4)
    t1 = time.perf_counter()
    mem_after_build = mem_mb()

    # 3) split and dispatch
    chunks = np.array_split(df1, num_threads)
    results = []
    with ThreadPoolExecutor(max_workers=num_threads) as exe:
        futures = [exe.submit(join_chunk, chunk, ht2, ht3) for chunk in chunks]
        for f in as_completed(futures):
            results.extend(f.result())

    t2 = time.perf_counter()
    mem_after_join = mem_mb()

    # 4) stats
    total_time = t2 - t0
    build_time = t1 - t0
    join_time  = t2 - t1
    n_tuples   = len(results)
    ht_memory  = mem_after_build - mem_before
    join_memory = mem_after_join - mem_after_build
    total_memory = mem_after_join - mem_before

    # 5) report
    print(f"=== Timing ===")
    print(f" Total elapsed       : {total_time:.3f} s")
    print(f"  - Hash-build phase : {build_time :.3f} s")
    print(f"  - Join phase       : {join_time  :.3f} s")
    print()
    print(f"=== Output ===")
    print(f" Total tuples produced: {n_tuples}")
    print()
    print(f"=== Memory (RSS) ===")
    print(f" Before              : {mem_before   :.1f} MB")
    print(f" After build         : {mem_after_build:.1f} MB")
    print(f" After join          : {mem_after_join :.1f} MB")
    print(f" -------------------------")
    print(f" Memory for ht build : {ht_memory  :.1f} MB")
    print(f" Memory for join     : {join_memory:.1f} MB")
    print(f" Total bump          : {total_memory:.1f} MB")

    return pd.DataFrame(results,
                        columns=['R1_row','R2_row','R3_row'])

# Run it
joined_df = parallel_hash_join(df1, df2, df3, num_threads=4)


  return bound(*args, **kwds)


=== Timing ===
 Total elapsed       : 60.011 s
  - Hash-build phase : 0.044 s
  - Join phase       : 59.966 s

=== Output ===
 Total tuples produced: 100221691

=== Memory (RSS) ===
 Before              : 126.1 MB
 After build         : 128.6 MB
 After join          : 7801.6 MB
 -------------------------
 Memory for ht build : 2.5 MB
 Memory for join     : 7673.0 MB
 Total bump          : 7675.5 MB


In [3]:
import pandas as pd

# Sample DataFrames (replace with your actual data)
R1 = pd.DataFrame({'a': [1, 2, 3]})
R2 = pd.DataFrame({'a': [1, 2, 4], 'b': ['x', 'y', 'z']})
R3 = pd.DataFrame({'b': ['x', 'y', 'w']})

def hash_join(df1, df2, key1, key2=None, how='inner'):
    """Performs a hash join between df1 and df2 on given keys."""
    if key2 is None:
        key2 = key1
    hash_table = {}
    # Build phase on df2
    for _, row in df2.iterrows():
        key = row[key2]
        if key not in hash_table:
            hash_table[key] = []
        hash_table[key].append(row)
    
    result_rows = []
    # Probe phase on df1
    for _, row1 in df1.iterrows():
        key = row1[key1]
        if key in hash_table:
            for row2 in hash_table[key]:
                combined = pd.concat([row1, row2.drop(labels=key2)], axis=0)
                result_rows.append(combined)
    
    return pd.DataFrame(result_rows)

# Step 1: Join R2 and R3 on 'b'
R2_R3_joined = hash_join(R2, R3, key1='b')

# Step 2: Join the result with R1 on 'a'
final_result = hash_join(R1, R2_R3_joined, key1='a')

print(final_result)


   a  b
0  1  x
1  2  y


In [1]:
import pandas as pd
import numpy as np
import psutil
import os
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

# Sample data
R1 = pd.DataFrame({'a': np.random.randint(0, 100, size=1000)})
R2 = pd.DataFrame({
    'a': np.random.randint(0, 100, size=1000),
    'b': np.random.randint(0, 100, size=1000)
})
R3 = pd.DataFrame({
    'b': np.random.randint(0, 100, size=1000),
    'c': np.random.randn(1000)
})

proc = psutil.Process(os.getpid())

def build_hash_table(df, key):
    """Builds hash table on key column from df."""
    hash_table = {}
    for _, row in df.iterrows():
        k = row[key]
        if k not in hash_table:
            hash_table[k] = []
        hash_table[k].append(row)
    return hash_table

def mem_mb():
    return proc.memory_info().rss / 1024**2

def probe_partition(df_partition, hash_table, key1, key2):
    """Probe a partition of df against a hash table."""
    result_rows = []
    for _, row1 in df_partition.iterrows():
        key = row1[key1]
        if key in hash_table:
            for row2 in hash_table[key]:
                combined = pd.concat([row1, row2.drop(labels=key2)], axis=0)
                result_rows.append(combined)
    return pd.DataFrame(result_rows)

def parallel_hash_join(df1, df2, key1, key2=None, threads=4):
    if key2 is None:
        key2 = key1

    mem_before = mem_mb()
    t0 = time.perf_counter()

    # Build phase on df2
    hash_table = build_hash_table(df2, key2)
    t1 = time.perf_counter()
    mem_after_build = mem_mb()

    # Partition df1 into chunks
    partitions = [df1.iloc[i::threads] for i in range(threads)]

    # Parallel probe phase
    results = []
    with ThreadPoolExecutor(max_workers=threads) as executor:
        futures = [executor.submit(probe_partition, part, hash_table, key1, key2)
                   for part in partitions]
        for future in as_completed(futures):
            results.append(future.result())

    result = pd.concat(results, ignore_index=True)
    
    t2 = time.perf_counter()
    mem_after_join = mem_mb()

    # 4) stats
    total_time = t2 - t0
    build_time = t1 - t0
    join_time  = t2 - t1
    n_tuples   = len(results)
    ht_memory  = mem_after_build - mem_before
    join_memory = mem_after_join - mem_after_build
    total_memory = mem_after_join - mem_before

    # 5) report
    print(f"=== Timing ===")
    print(f" Total elapsed       : {total_time:.3f} s")
    print(f"  - Hash-build phase : {build_time :.3f} s")
    print(f"  - Join phase       : {join_time  :.3f} s")
    print()
    print(f"=== Output ===")
    print(f" Total tuples produced: {n_tuples}")
    print()
    print(f"=== Memory (RSS) ===")
    print(f" Before              : {mem_before   :.1f} MB")
    print(f" After build         : {mem_after_build:.1f} MB")
    print(f" After join          : {mem_after_join :.1f} MB")
    print(f" -------------------------")
    print(f" Memory for ht build : {ht_memory  :.1f} MB")
    print(f" Memory for join     : {join_memory:.1f} MB")
    print(f" Total bump          : {total_memory:.1f} MB")

    return result

# Join R2 and R3 on 'b'
R2_R3_joined = parallel_hash_join(R2, R3, key1='b', threads=4)

# Join result with R1 on 'a'
final_result = parallel_hash_join(R1, R2_R3_joined, key1='a', threads=4)
final_result

=== Timing ===
 Total elapsed       : 4.913 s
  - Hash-build phase : 0.034 s
  - Join phase       : 4.879 s

=== Output ===
 Total tuples produced: 4

=== Memory (RSS) ===
 Before              : 125.3 MB
 After build         : 126.8 MB
 After join          : 153.2 MB
 -------------------------
 Memory for ht build : 1.5 MB
 Memory for join     : 26.4 MB
 Total bump          : 27.9 MB
=== Timing ===
 Total elapsed       : 46.411 s
  - Hash-build phase : 0.365 s
  - Join phase       : 46.046 s

=== Output ===
 Total tuples produced: 4

=== Memory (RSS) ===
 Before              : 148.2 MB
 After build         : 148.2 MB
 After join          : 308.2 MB
 -------------------------
 Memory for ht build : 0.0 MB
 Memory for join     : 160.0 MB
 Total bump          : 160.0 MB


Unnamed: 0,a,b,c
0,23.0,5.0,-0.436417
1,23.0,5.0,-0.238719
2,23.0,5.0,-0.595987
3,23.0,5.0,0.476777
4,23.0,5.0,-0.165180
...,...,...,...
99718,28.0,33.0,-0.444299
99719,28.0,33.0,-1.149624
99720,28.0,33.0,-1.261386
99721,28.0,33.0,1.303021


In [1]:
import re

def parse_bindable_aggregate(line, input_cols):
    """
    Parse a single BindableAggregate spec and return
    (group_cols, agg_dict) where
      - group_cols is a list of column names to group by,
      - agg_dict is the dict you can pass to pandas .agg().
    
    Parameters
    ----------
    line : str
        e.g. "BindableAggregate(group=[{COMPANY}], FROM_COMPANY=[MIN($0)], LINK_TYPE=[MIN($1)], SEQUEL_MOVIE=[MIN($2)])"
    input_cols : List[str]
        e.g. ['from_company', 'link_type', 'sequel_movie']
    
    Returns
    -------
    group_cols : List[str]
    agg_dict : Dict[str, Tuple[str,str]]
    """
    # 1) pull out group list
    grp_match = re.search(r'group=\[\{(.*?)\}\]', line)
    if grp_match and grp_match.group(1).strip():
        group_cols = [col.strip() for col in grp_match.group(1).split(',')]
    else:
        group_cols = []
    
    # 2) pull every output=[FUNC($i)] chunk
    pairs = re.findall(r'(\w+)=\[\s*(\w+)\s*\(\s*\$(\d+)\s*\)\s*\]', line)
    
    # 3) build the agg dict
    agg_dict = {}
    for out_col, func, idx in pairs:
        i = int(idx)
        try:
            in_col = input_cols[i]
        except IndexError:
            raise ValueError(f"No input column at position ${i}")
        agg_dict[out_col] = (in_col, func.lower())
    
    return group_cols, agg_dict

# Example usage:

line = (
    "BindableAggregate(group=[{}], "
    "UNCREDITED_VOICED_CHARACTER=[MIN($0)], "
    "RUSSIAN_MOVIE=[MIN($1)])"
)
input_cols = ['uncited_voiced_character', 'russian_movie']

group_cols, mapping = parse_bindable_aggregate(line, input_cols)

# Now you can do:
#    df.groupby(group_cols).agg(mapping)
#
# For this example, since group_cols==[],
# it becomes:
#    df.agg(mapping)

print("group by:", group_cols)
print("agg map:", mapping)


group by: []
agg map: {'UNCREDITED_VOICED_CHARACTER': ('uncited_voiced_character', 'min'), 'RUSSIAN_MOVIE': ('russian_movie', 'min')}


In [29]:
import pandas as pd

# Define the parser function
import re
def parse_bindable_aggregate(line, input_cols):
    grp_match = re.search(r'group=\[\{(.*?)\}\]', line)
    group_cols = [c.strip() for c in grp_match.group(1).split(',')] if grp_match and grp_match.group(1).strip() else []
    pairs = re.findall(r'(\w+)=\[\s*(\w+)\s*\(\s*\$(\d+)\)\s*\]', line)
    return group_cols, {out_col: (input_cols[int(idx)], func.lower()) for out_col, func, idx in pairs}

# Create example DataFrame
df = pd.DataFrame({
    'category': ['A', 'A', 'B', 'B', 'C'],
    'id': [1, 2, 3, 4, 5],
    'value': [10, 20, 30, 40, 50],
    'score': [5, 3, 6, 2, 4]
})

# Define BindableAggregate line and input_cols
line = "group=[{}], SUM_VALUE=[SUM($2)], MIN_SCORE=[MIN($3)]"
input_cols = ['category', 'id', 'value', 'score']

# Parse to get group columns and aggregation mapping
group_cols, agg_map = parse_bindable_aggregate(line, input_cols)

print(group_cols)
print(agg_map)

if group_cols:
    # regular GROUP BY
    result = (
        df
        .groupby(group_cols)
        .agg(**agg_map)
        .reset_index()
    )
else:
    # no GROUP BY → aggregate entire frame
    result = df.agg(**agg_map)

result


[]
{'SUM_VALUE': ('value', 'sum'), 'MIN_SCORE': ('score', 'min')}


Unnamed: 0,value,score
SUM_VALUE,150.0,
MIN_SCORE,,2.0
