In [2]:
import mmh3
import math

class HyperLogLog:
    """HyperLogLog sketch for approximate distinct counting over a 32-bit hash.

    The low ``precision`` bits of each hash select one of ``2 ** precision``
    registers; the remaining high bits supply the rank (number of leading
    zero bits plus one). Each register keeps the largest rank it has seen.

    Args:
        precision: Number of hash bits used as the register index.
        hash_func: Optional callable mapping a value to an int; only its low
            32 bits are used. Defaults to ``mmh3.hash`` (looked up lazily on
            the first ``add`` call).
    """

    _HASH_BITS = 32

    def __init__(self, precision, hash_func=None):
        self.precision = precision
        self.hash_func = hash_func
        self.buckets = [0] * (2 ** precision)

    def add(self, value):
        """Record ``value`` in the sketch."""
        hasher = self.hash_func if self.hash_func is not None else mmh3.hash
        # mmh3.hash is signed by default; mask so the shift below operates on
        # an unsigned 32-bit pattern rather than a negative Python int.
        hash_value = hasher(value) & 0xFFFFFFFF
        bucket = hash_value & ((1 << self.precision) - 1)
        remaining = hash_value >> self.precision
        remaining_bits = self._HASH_BITS - self.precision
        # Rank is the 1-based position of the leftmost 1-bit within the
        # remaining bits, i.e. leading zeros + 1. An all-zero remainder
        # yields the maximum rank, remaining_bits + 1.
        rank = remaining_bits - remaining.bit_length() + 1
        self.buckets[bucket] = max(self.buckets[bucket], rank)

    def estimate_cardinality(self):
        """Return the estimated number of distinct values added, as an int."""
        # Bias-correction constants keyed by precision (16, 32, 64 registers).
        alpha = {
            4: 0.673,
            5: 0.697,
            6: 0.709
        }
        m = len(self.buckets)
        z = 1 / sum(2.0 ** -v for v in self.buckets)
        if self.precision in alpha:
            alpha_m = alpha[self.precision] * m ** 2
        else:
            alpha_m = (0.7213 / (1 + 1.079 / m)) * m ** 2
        estimate = alpha_m * z
        if estimate <= 2.5 * m:
            # Small-range correction: fall back to linear counting while
            # some registers are still empty.
            zeros = self.buckets.count(0)
            if zeros != 0:
                estimate = m * math.log(m / zeros)
        elif estimate > 1 / 30 * 2 ** 32:
            # Large-range correction for collisions in the 32-bit hash space.
            estimate = -2 ** 32 * math.log(1 - estimate / 2 ** 32)
        return round(estimate)

def estimate_cardinality(sql_query):
    """Estimate the distinct count of a query's result set with HyperLogLog.

    Placeholder implementation: ``sql_query`` is not executed. A fixed
    result set of the integers 1 through 10 stands in for the query output.
    """
    # The real implementation depends on the specific database and SQL query.
    # Assume the query result set has already been obtained.
    sample_rows = list(range(1, 11))

    sketch = HyperLogLog(precision=14)
    for row in sample_rows:
        sketch.add(str(row))
    return sketch.estimate_cardinality()

# Demo run. The query text is illustrative only: estimate_cardinality, as
# defined in this cell, does not execute or inspect it.
sql_query = "SELECT COUNT(*) FROM table WHERE column = 'value'"
cardinality_estimate = estimate_cardinality(sql_query)
print("Cardinality estimate:", cardinality_estimate)

Cardinality estimate: 10


In [2]:
import re
from collections import defaultdict

# Mock table statistics.
# Maps table name -> column names, total row count, and, per column, the
# number of unique values (used below to derive equality selectivity).
table_data = {
    "users": {
        "columns": ["id", "name", "age"],
        "rows": 1000,
        "column_data": {
            "id": {"unique_values": 1000},
            "name": {"unique_values": 900},
            "age": {"unique_values": 80},
        },
    },
}

def parse_query(query):
    """Parse a simple ``SELECT ... FROM <table> [WHERE ...]`` statement.

    Only equality predicates joined by ``AND`` are recognised; any WHERE
    fragment that is not of the form ``column = value`` is skipped.

    Args:
        query: SQL text starting with ``SELECT`` (keywords are matched
            case-insensitively).

    Returns:
        dict with keys ``"columns"`` (list of stripped column expressions),
        ``"table"`` (table name) and ``"conditions"`` (list of
        ``{"column", "value"}`` dicts, value kept as raw text).

    Raises:
        ValueError: If the text does not match the SELECT/FROM pattern.
    """
    select_re = r"SELECT\s+(?P<columns>.*?)\s+FROM\s+(?P<table>\w+)(?:\s+WHERE\s+(?P<conditions>.*))?"
    match = re.match(select_re, query, re.IGNORECASE)
    if not match:
        raise ValueError("Invalid query")

    columns_str = match.group("columns")
    table = match.group("table")
    conditions_str = match.group("conditions")

    columns = [col.strip() for col in columns_str.split(",")]
    conditions = []

    if conditions_str:
        condition_re = r"(?P<column>\w+)\s*=\s*(?P<value>.+)"
        # Split on AND as a whitespace-delimited keyword, case-insensitively,
        # so lowercase "and" works and text such as 'ANDY' or a column named
        # BRAND is not cut in half. An " AND " inside a quoted literal is
        # still treated as a separator.
        for cond_str in re.split(r"\s+AND\s+", conditions_str, flags=re.IGNORECASE):
            cond_match = re.match(condition_re, cond_str.strip(), re.IGNORECASE)
            if cond_match:
                conditions.append({
                    "column": cond_match.group("column"),
                    "value": cond_match.group("value"),
                })

    return {
        "columns": columns,
        "table": table,
        "conditions": conditions,
    }

def estimate_cardinality(query):
    """Estimate how many rows a simple SELECT returns from table statistics.

    Starts from the table's row count and, for every equality condition
    parsed from the query, multiplies by ``1 / unique_values`` of the
    referenced column. The result is truncated to an int.

    Raises:
        ValueError: If the query is invalid, the table is unknown, or a
            condition references a column without statistics.
    """
    parsed = parse_query(query)
    table = parsed["table"]
    predicates = parsed["conditions"]

    if table not in table_data:
        raise ValueError(f"Table '{table}' not found")

    stats = table_data[table]
    column_stats = stats["column_data"]
    estimate = stats["rows"]

    for column in (predicate["column"] for predicate in predicates):
        if column not in column_stats:
            raise ValueError(f"Column '{column}' not found in table '{table}'")
        # Each equality predicate scales the estimate by 1 / unique_values.
        estimate *= 1 / column_stats[column]["unique_values"]

    return int(estimate)

# Usage example: 1000 rows * (1 / 80 unique ages) = 12.5, truncated to 12.
query = "SELECT id, name, age FROM users WHERE age = 25"
estimated_cardinality = estimate_cardinality(query)
print(f"Estimated cardinality: {estimated_cardinality}")

Estimated cardinality: 12
