In [4]:
import sqlglot

def extract_ctes_and_tables(sql):
    # Parse the SQL query into an AST
    expressions = sqlglot.parse_one(sql)
    
    cte_dict = {}

    # Check if the query contains CTEs
    if expressions.ctes:
        for cte in expressions.ctes:
            cte_name = cte.alias_or_name
            tables = []

            # Extract tables used in the CTE
            for table in cte.find_all(sqlglot.exp.Table):
                tables.append(table.name)
            
            cte_dict[cte_name] = tables

    return cte_dict

# Sample SQL with CTEs
sql_query = """
WITH cte1 AS (
    SELECT a.id, b.name
    FROM hellow a
    JOIN world3 b ON a.id = b.id
),
cte2 AS (
    SELECT id, COUNT(*)
    FROM cte1
    GROUP BY id
)
SELECT * FROM cte2;
"""

# Extract CTEs and their associated tables
ctes_and_tables = extract_ctes_and_tables(sql_query)
print(ctes_and_tables)


{'cte1': ['hellow', 'world3'], 'cte2': ['cte1']}


In [1]:
import sqlglot

def extract_tables_by_subquery(sql):
    # Parse the SQL query into an AST
    expressions = sqlglot.parse_one(sql)
    
    subquery_tables = []

    # Recursive function to extract tables from subqueries
    def extract_tables(node, level=0):
        if isinstance(node, sqlglot.expressions.Subquery):
            tables = [table.name for table in node.find_all(sqlglot.exp.Table)]
            subquery_tables.append({"subquery_level": level, "tables": tables})
        for child in node.iter_expressions():
            extract_tables(child, level + 1)

    # Start extracting tables from the root expression
    extract_tables(expressions)
    return subquery_tables

# Sample SQL query with subqueries
sql_query = """
WITH cte1 AS (
    SELECT a.id, b.name
    FROM table1 a
    JOIN table2 b ON a.id = b.id
),
cte2 AS (
    SELECT id, COUNT(*)
    FROM cte1
    GROUP BY id
)
SELECT * 
FROM (
    SELECT id
    FROM cte2
    WHERE id IN (SELECT DISTINCT id FROM table3)
) subquery_alias;
"""

# Extract tables grouped by subquery
tables_by_subquery = extract_tables_by_subquery(sql_query)

# Display results
for entry in tables_by_subquery:
    print(f"Subquery Level: {entry['subquery_level']}, Tables: {entry['tables']}")


Subquery Level: 2, Tables: ['cte2', 'table3']
Subquery Level: 6, Tables: ['table3']


In [2]:
import sqlglot

def extract_tables_by_subquery(sql):
    # Parse the SQL query into an AST
    expressions = sqlglot.parse_one(sql)
    
    subquery_tables = []

    # Recursive function to extract tables from subqueries
    def extract_tables(node, current_level=0):
        if isinstance(node, sqlglot.expressions.Subquery) or isinstance(node, sqlglot.expressions.CTE):
            # Find all table references in this subquery
            tables = [table.name for table in node.find_all(sqlglot.exp.Table)]
            subquery_tables.append({"subquery_level": current_level, "tables": tables})
            
            # Recurse into the subquery itself
            for child in node.iter_expressions():
                extract_tables(child, current_level + 1)
        else:
            # Recurse for non-subquery nodes
            for child in node.iter_expressions():
                extract_tables(child, current_level)

    # Start extracting tables from the root expression
    extract_tables(expressions)
    
    # Deduplicate results for clarity
    return subquery_tables

# Sample SQL query with subqueries
sql_query = """
WITH cte1 AS (
    SELECT a.id, b.name
    FROM table1 a
    JOIN table2 b ON a.id = b.id
),
cte2 AS (
    SELECT id, COUNT(*)
    FROM cte1
    GROUP BY id
)
SELECT * 
FROM (
    SELECT id
    FROM cte2
    WHERE id IN (SELECT DISTINCT id FROM table3)
) subquery_alias;
"""

# Extract tables grouped by subquery
tables_by_subquery = extract_tables_by_subquery(sql_query)

# Display results
for entry in tables_by_subquery:
    print(f"Subquery Level: {entry['subquery_level']}, Tables: {entry['tables']}")


Subquery Level: 0, Tables: ['cte2', 'table3']
Subquery Level: 1, Tables: ['table3']
Subquery Level: 0, Tables: ['table1', 'table2']
Subquery Level: 0, Tables: ['cte1']


In [3]:
import sqlglot

def extract_tables_by_subquery(sql):
    # Parse the SQL query into an AST
    expressions = sqlglot.parse_one(sql)
    
    subquery_tables = []

    # Recursive function to extract tables from subqueries
    def extract_tables(node, current_level=0):
        if isinstance(node, sqlglot.expressions.CTE):
            # Handle CTEs explicitly
            cte_name = node.alias_or_name
            tables = [table.name for table in node.find_all(sqlglot.exp.Table)]
            subquery_tables.append({"subquery_type": "CTE", "name": cte_name, "level": current_level, "tables": tables})
        
        elif isinstance(node, sqlglot.expressions.Subquery):
            # Handle subqueries explicitly
            tables = [table.name for table in node.find_all(sqlglot.exp.Table)]
            subquery_tables.append({"subquery_type": "Subquery", "name": None, "level": current_level, "tables": tables})
            
            # Recurse into the subquery for nested structures
            for child in node.iter_expressions():
                extract_tables(child, current_level + 1)
        else:
            # Recurse through other nodes without changing the level
            for child in node.iter_expressions():
                extract_tables(child, current_level)

    # Start extracting tables from the root expression
    extract_tables(expressions)

    # Deduplicate results for clarity
    return subquery_tables

# Sample SQL query with subqueries and CTEs
sql_query = """
WITH cte1 AS (
    SELECT a.id, b.name
    FROM table1 a
    JOIN table2 b ON a.id = b.id
),
cte2 AS (
    SELECT id, COUNT(*)
    FROM cte1
    GROUP BY id
)
SELECT * 
FROM (
    SELECT id
    FROM cte2
    WHERE id IN (SELECT DISTINCT id FROM table3)
) subquery_alias;
"""

# Extract tables grouped by subquery
tables_by_subquery = extract_tables_by_subquery(sql_query)

# Display results
for entry in tables_by_subquery:
    subquery_type = entry["subquery_type"]
    name = entry.get("name", "N/A")
    level = entry["level"]
    tables = entry["tables"]
    print(f"{subquery_type} (Level {level}): Name = {name}, Tables = {tables}")


Subquery (Level 0): Name = None, Tables = ['cte2', 'table3']
Subquery (Level 1): Name = None, Tables = ['table3']
CTE (Level 0): Name = cte1, Tables = ['table1', 'table2']
CTE (Level 0): Name = cte2, Tables = ['cte1']


In [4]:
import sqlglot

def extract_tables_by_subquery(sql):
    # Parse the SQL query into an AST
    expressions = sqlglot.parse_one(sql)
    
    subquery_tables = []

    # Recursive function to extract tables from subqueries
    def extract_tables(node, current_level=0):
        if isinstance(node, sqlglot.expressions.CTE):
            # Handle CTEs explicitly
            cte_name = node.alias_or_name
            tables = [table.name for table in node.find_all(sqlglot.exp.Table)]
            subquery_tables.append({"subquery_type": "CTE", "name": cte_name, "level": current_level, "tables": tables})
        
        elif isinstance(node, sqlglot.expressions.Subquery):
            # Handle subqueries explicitly
            tables = [table.name for table in node.find_all(sqlglot.exp.Table)]
            subquery_tables.append({"subquery_type": "Subquery", "name": None, "level": current_level, "tables": tables})
            
            # Recurse into the subquery for nested structures
            for child in node.iter_expressions():
                extract_tables(child, current_level + 1)
        else:
            # Recurse through other nodes without changing the level
            for child in node.iter_expressions():
                extract_tables(child, current_level)

    # Start extracting tables from the root expression
    extract_tables(expressions)

    # Sort results by level and type (CTE first, then Subquery)
    subquery_tables.sort(key=lambda x: (x['level'], x['subquery_type']))
    return subquery_tables

# Sample SQL query with subqueries and CTEs
sql_query = """
WITH cte1 AS (
    SELECT a.id, b.name
    FROM table1 a
    JOIN table2 b ON a.id = b.id
),
cte2 AS (
    SELECT id, COUNT(*)
    FROM cte1
    GROUP BY id
)
SELECT * 
FROM (
    SELECT id
    FROM cte2
    WHERE id IN (SELECT DISTINCT id FROM table3)
) subquery_alias;
"""

# Extract tables grouped by subquery
tables_by_subquery = extract_tables_by_subquery(sql_query)

# Display results
for entry in tables_by_subquery:
    subquery_type = entry["subquery_type"]
    name = entry.get("name", "N/A")
    level = entry["level"]
    tables = entry["tables"]
    print(f"{subquery_type} (Level {level}): Name = {name}, Tables = {tables}")


CTE (Level 0): Name = cte1, Tables = ['table1', 'table2']
CTE (Level 0): Name = cte2, Tables = ['cte1']
Subquery (Level 0): Name = None, Tables = ['cte2', 'table3']
Subquery (Level 1): Name = None, Tables = ['table3']


In [None]:
# Adding support for table aliases in the output

import sqlglot

def extract_tables_with_aliases(sql):
    # Parse the SQL query into an AST
    expressions = sqlglot.parse_one(sql)
    
    subquery_tables = []

    # Recursive function to extract tables and their aliases
    def extract_tables(node, current_level=0):
        if isinstance(node, sqlglot.expressions.CTE):
            # Handle CTEs explicitly
            cte_name = node.alias_or_name
            tables_with_aliases = {
                table.alias_or_name: table.name for table in node.find_all(sqlglot.exp.Table)
            }
            subquery_tables.append({
                "subquery_type": "CTE",
                "name": cte_name,
                "level": current_level,
                "tables_with_aliases": tables_with_aliases
            })
        
        elif isinstance(node, sqlglot.expressions.Subquery):
            # Handle subqueries explicitly
            tables_with_aliases = {
                table.alias_or_name: table.name for table in node.find_all(sqlglot.exp.Table)
            }
            subquery_tables.append({
                "subquery_type": "Subquery",
                "name": None,
                "level": current_level,
                "tables_with_aliases": tables_with_aliases
            })
            
            # Recurse into the subquery for nested structures
            for child in node.iter_expressions():
                extract_tables(child, current_level + 1)
        else:
            # Recurse through other nodes without changing the level
            for child in node.iter_expressions():
                extract_tables(child, current_level)

    # Start extracting tables from the root expression
    extract_tables(expressions)

    # Sort results by level and type (CTE first, then Subquery)
    subquery_tables.sort(key=lambda x: (x['level'], x['subquery_type']))
    return subquery_tables

# Sample SQL query with subqueries and CTEs
sql_query = """
WITH cte1 AS (
    SELECT a.id, b.name
    FROM table1 a
    JOIN table2 b ON a.id = b.id
),
cte2 AS (
    SELECT id, COUNT(*)
    FROM cte1
    GROUP BY id
)
SELECT * 
FROM (
    SELECT id
    FROM cte2
    WHERE id IN (SELECT DISTINCT id FROM table3)
) subquery_alias;
"""

# Extract tables with aliases grouped by subquery
tables_by_subquery = extract_tables_with_aliases(sql_query)

# Display results
for entry in tables_by_subquery:
    subquery_type = entry["subquery_type"]
    name = entry.get("name", "N/A")
    level = entry["level"]
    tables_with_aliases = entry["tables_with_aliases"]
    print(f"{subquery_type} (Level {level}): Name = {name}")
    for alias, table in tables_with_aliases.items():
        print(f"  Alias: {alias}, Table: {table}")


CTE (Level 0): Name = cte1
  Alias: a, Table: table1
  Alias: b, Table: table2
CTE (Level 0): Name = cte2
  Alias: cte1, Table: cte1
Subquery (Level 0): Name = None
  Alias: cte2, Table: cte2
  Alias: table3, Table: table3
Subquery (Level 1): Name = None
  Alias: table3, Table: table3
