## table_statistics

In [1]:
import csv
import os

import psycopg2

# Connect to the PostgreSQL database.
# The password can be overridden through the PGPASSWORD environment variable so
# that real credentials do not have to be committed with the notebook; the
# fallback keeps the original local-development default.
conn = psycopg2.connect(
    database="tpcds",
    user="postgres",
    password=os.environ.get("PGPASSWORD", "postgres"),
    host="127.0.0.1",
    port="5432"
)

# Run the query and always release the cursor/connection, even if it fails.
try:
    with conn.cursor() as cur:
        # Per-column planner statistics for every non-system schema.
        # `_` is a single-character wildcard in LIKE, so it is escaped here to
        # exclude only schemas that literally start with "pg_".
        cur.execute("""
    SELECT
        schemaname,
        tablename,
        attname AS column_name,
        null_frac AS null_fraction,
        n_distinct AS distinct_values,
        correlation
    FROM
        pg_stats
    WHERE
        schemaname NOT LIKE 'pg!_%' ESCAPE '!' AND
        schemaname != 'information_schema'
    ORDER BY
        schemaname,
        tablename,
        column_name
""")

        # Materialize the result before the connection is closed.
        query_result = cur.fetchall()
finally:
    conn.close()

# Write the result to a CSV file, creating the target directory if needed.
csv_file_path = "./information/table_statistics.csv"
os.makedirs(os.path.dirname(csv_file_path), exist_ok=True)
with open(csv_file_path, mode='w', newline='', encoding='utf-8') as file:
    writer = csv.writer(file)

    # Header row.
    writer.writerow(["Schema", "Table", "Column", "Null Fraction", "Distinct Values", "Correlation"])

    # One row per (schema, table, column).
    writer.writerows(query_result)

print(f"CSV file saved to: {csv_file_path}")


CSV file saved to: ./information/table_statistics.csv


In [3]:
import csv
import os

import psycopg2

# Connect to the PostgreSQL database.
# The password can be overridden through the PGPASSWORD environment variable;
# the fallback keeps the original local-development default.
conn = psycopg2.connect(
    database="tpcds",
    user="postgres",
    password=os.environ.get("PGPASSWORD", "postgres"),
    host="127.0.0.1",
    port="5432"
)

# Fetch the row count of every user table and always release the
# cursor/connection, even if the query fails.
try:
    with conn.cursor() as cur:
        # NOTE: n_live_tup is the statistics collector's estimate of live rows,
        # not an exact COUNT(*).
        cur.execute("SELECT relname, n_live_tup FROM pg_stat_user_tables;")
        rows = cur.fetchall()
finally:
    conn.close()

# Print the row count of each table.
for row in rows:
    print("Table:", row[0], "  Rows:", row[1])

# Target CSV file; create its directory if it does not exist yet.
csv_file = "./information/table_row_counts.csv"
os.makedirs(os.path.dirname(csv_file), exist_ok=True)

# Write the header followed by one (table, rows) record per line.
with open(csv_file, mode='w', newline='', encoding='utf-8') as file:
    writer = csv.writer(file)
    writer.writerow(['Table', 'Rows'])
    writer.writerows(rows)

# print (rather than display) so the message renders as plain text and the cell
# does not depend on the IPython-injected `display` name.
print("CSV文件已创建:", csv_file)

Table: customer   Rows: 100000
Table: store   Rows: 12
Table: web_sales   Rows: 719384
Table: inventory   Rows: 11745095
Table: store_returns   Rows: 287514
Table: store_sales   Rows: 2880189
Table: catalog_sales   Rows: 1441539
Table: promotion   Rows: 300
Table: call_center   Rows: 6
Table: customer_demographics   Rows: 1920800
Table: item   Rows: 18000
Table: time_dim   Rows: 86400
Table: reason   Rows: 35
Table: warehouse   Rows: 5
Table: web_returns   Rows: 71763
Table: catalog_returns   Rows: 144067
Table: web_site   Rows: 30
Table: catalog_page   Rows: 11718
Table: ship_mode   Rows: 20
Table: web_page   Rows: 60
Table: date_dim   Rows: 73049
Table: dbgen_version   Rows: 0
Table: household_demographics   Rows: 7200
Table: customer_address   Rows: 50000
Table: income_band   Rows: 20


'CSV文件已创建:'

'./information/table_row_counts.csv'

## Cond

In [2]:
import pandas as pd

# Load the filtered query workload and keep the SQL text column as an array.
queries_df = pd.read_csv("./information/queries_filter.csv")
queries = queries_df.loc[:, 'query'].values

# Number of queries loaded.
len(queries)

14400

In [3]:
from PGUtils import pgrunner

# Ask pgrunner for the cost plan JSON of every query, preserving query order.
plan_jsons = [pgrunner.getCostPlanJson(sql) for sql in queries]

# Number of plans collected (one per query).
len(plan_jsons)

14400

In [4]:
# Display the plan at index 8 to inspect the structure of a collected plan.
plan_jsons[8]

{'Plan': {'Node Type': 'Limit',
  'Parallel Aware': False,
  'Startup Cost': 69852.91,
  'Total Cost': 69852.91,
  'Plan Rows': 1,
  'Plan Width': 246,
  'Plans': [{'Node Type': 'Sort',
    'Parent Relationship': 'Outer',
    'Parallel Aware': False,
    'Startup Cost': 69852.91,
    'Total Cost': 69852.91,
    'Plan Rows': 1,
    'Plan Width': 246,
    'Sort Key': ['store.s_store_name',
     'store.s_store_id',
     "(sum(CASE WHEN (date_dim.d_day_name = 'Sunday'::bpchar) THEN store_sales.ss_sales_price ELSE NULL::numeric END))",
     "(sum(CASE WHEN (date_dim.d_day_name = 'Monday'::bpchar) THEN store_sales.ss_sales_price ELSE NULL::numeric END))",
     "(sum(CASE WHEN (date_dim.d_day_name = 'Tuesday'::bpchar) THEN store_sales.ss_sales_price ELSE NULL::numeric END))",
     "(sum(CASE WHEN (date_dim.d_day_name = 'Wednesday'::bpchar) THEN store_sales.ss_sales_price ELSE NULL::numeric END))",
     "(sum(CASE WHEN (date_dim.d_day_name = 'Thursday'::bpchar) THEN store_sales.ss_sales_price 

In [7]:
import json
from IPython.display import display

def find_operators_with_cond(plan_node):
    """Collect the node types in a plan tree that carry a condition or filter.

    A node qualifies when at least one of its keys contains the substring
    'Cond' or 'Filter'. Children listed under the node's 'Plans' key are
    visited recursively.

    Args:
        plan_node: dict describing one plan node. It is expected to have a
            'Node Type' entry and may have a 'Plans' list of child node dicts.

    Returns:
        set: the 'Node Type' values of all qualifying nodes in the subtree
        rooted at ``plan_node`` (empty if none qualify).
    """
    operators_with_cond = set()

    if 'Node Type' in plan_node:
        # Test each key on its own. Joining all keys into one string first
        # could match 'Cond'/'Filter' across the boundary of two adjacent
        # keys and report a node that has no such property.
        has_cond_key = any(
            'Cond' in str(key) or 'Filter' in str(key) for key in plan_node
        )
        if has_cond_key:
            operators_with_cond.add(plan_node['Node Type'])

    # Recurse into child nodes; leaf nodes have no 'Plans' entry.
    for sub_plan in plan_node.get('Plans', ()):
        operators_with_cond.update(find_operators_with_cond(sub_plan))

    return operators_with_cond

# plan_jsons is a list holding multiple execution plans.
# plan_jsons[4]['Plan'] is used here to get the root node of one of those plans.
operators_with_cond_types = find_operators_with_cond(plan_jsons[4]['Plan'])

# Show the result.
operators_with_cond_types

{'Aggregate', 'Index Scan', 'Merge Join', 'Seq Scan', 'Subquery Scan'}

In [8]:
# Union of the condition/filter-bearing node types found across all plans.
cond_filter_operater_type = set()
for plan in plan_jsons:
    cond_filter_operater_type.update(find_operators_with_cond(plan['Plan']))
cond_filter_operater_type

{'Aggregate',
 'Bitmap Heap Scan',
 'Bitmap Index Scan',
 'CTE Scan',
 'Hash Join',
 'Index Only Scan',
 'Index Scan',
 'Merge Join',
 'Nested Loop',
 'Seq Scan',
 'Subquery Scan'}

## psqlparse

In [None]:
import os

import psycopg2
from psqlparse import parse_dict

# Replace the values below with your own PostgreSQL connection settings.
# The password can be supplied through the PGPASSWORD environment variable so
# that real credentials do not have to be stored in the notebook.
dbname = 'indexselection_tpch___10'
user = 'postgres'
password = os.environ.get('PGPASSWORD', 'password')
host = '127.0.0.1'
port = '5432'

# Pre-bind both handles so the cleanup in `finally` is safe even when
# connect() or cursor() raises before they are assigned.
connection = None
cursor = None

try:
    # Try to connect to the PostgreSQL database.
    connection = psycopg2.connect(
        dbname=dbname,
        user=user,
        password=password,
        host=host,
        port=port
    )

    # Create a cursor.
    cursor = connection.cursor()

    # SQL query to parse and explain.
    sql_query = "SELECT * FROM Orders JOIN Customer ON Orders.o_custkey = Customer.c_custkey WHERE o_orderdate BETWEEN '1995-01-01' AND '1995-01-31' AND o_totalprice > 10000;"

    # Parse the query and keep the SelectStmt of the first statement.
    parse_result = parse_dict(sql_query)[0]["SelectStmt"]

    # Print the parse result.
    print(parse_result)

    # Ask the planner for the cost-annotated plan in JSON form.
    cursor.execute("explain (COSTS, FORMAT JSON) "+sql_query)
    rows = cursor.fetchall()
    print('rows:',rows)
    # First row, first column, first element of the returned JSON array.
    plan_json = rows[0][0][0]
    print(plan_json)

    # End the transaction opened by the EXPLAIN statement.
    connection.commit()

except Exception as e:
    # Best-effort cell: report the failure instead of raising.
    print(f"Error: {e}")

finally:
    # Close the cursor and the connection if they were created.
    if cursor is not None:
        cursor.close()
    if connection is not None:
        connection.close()