In [1]:
import pandas as pd
from sql_metadata import Parser
from collections import defaultdict

pd.set_option('display.max_colwidth', None)

In [13]:
!ls /usr/local/var/postgres/log

postgresql-2022-02-10_211004.csv postgresql-2022-02-10_211004.log


In [30]:
LOG_DIRECTORY = 'epinions_workload.csv'
df = pd.read_csv(LOG_DIRECTORY, header=None, usecols=[7, 13], names=["query_type", "query_text"])
df.iloc[70:80]

Unnamed: 0,query_type,query_text
70,COMMIT,execute S_1: COMMIT
71,BEGIN,execute <unnamed>: BEGIN
72,UPDATE,execute <unnamed>: UPDATE review SET rating = $1 WHERE i_id=$2 AND u_id=$3
73,COMMIT,execute S_1: COMMIT
74,BEGIN,execute <unnamed>: BEGIN
75,UPDATE,execute <unnamed>: UPDATE trust SET trust = $1 WHERE source_u_id=$2 AND target_u_id=$3
76,COMMIT,execute S_1: COMMIT
77,BEGIN,execute <unnamed>: BEGIN
78,UPDATE,execute <unnamed>: UPDATE item SET title = $1 WHERE i_id=$2
79,COMMIT,execute S_1: COMMIT


In [16]:
table_to_columns_epinions = {
  "item": set(["i_id", "title"]),
  "useracct": set(["u_id", "name"]),
  "review": set(["a_id", "u_id", "i_id", "rating", "rank"]),
  "trust": set(["source_u_id", "target_u_id", "trust", "creation_date"]),
  "review_rating": set(["u_id", "a_id", "rating", "status", "creation_date", "last_mod_date", "type", "vertical_id"])
}

In [28]:
qs = set()
def parse_query(query_str : str, ttc_mapping : dict):
  
  if "pg_" in query_str:
    return None

  query_str = query_str[query_str.find(':')+1:] # Remove "execute <unamed>:"

  # Ignore non-relevant queries
  if "BEGIN" in query_str or "COMMIT" in query_str:
    return None

  try:
    p = Parser(query_str)
    tables = p.tables
    columns = p.columns
  except:
    return None

  # Skip queries that dont have a where clause
  if len(tables) == 0 or len(columns) == 0:
    return None

  # Build string "table.column_name" for each column
  added = False
  for column in columns:
    if "." in column:
      added = True
    else:
      # Find which table this column corresponds to
      for table in tables:
        if column in ttc_mapping[table]:
          added = True
          break
  
  if not added:
    # print("Invalid Query:", query_str)
    return None
  return query_str

In [29]:
queries = []
for index, row in df.iterrows():
  if pd.isna(row["query_type"]):
    continue
  q = parse_query(row["query_text"], table_to_columns_epinions)
  if q is not None:
    queries.append(q)

set(queries)


Not supported query type:  SET extra_float_digits = 3
Not supported query type:  SET application_name = 'PostgreSQL JDBC Driver'
Not supported query type:  SET extra_float_digits = 3
Not supported query type:  SET application_name = 'PostgreSQL JDBC Driver'
Not supported query type:  SET extra_float_digits = 3
Not supported query type:  SET application_name = 'PostgreSQL JDBC Driver'
Not supported query type:  SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL SERIALIZABLE
Not supported query type:  SET extra_float_digits = 3
Not supported query type:  SET application_name = 'PostgreSQL JDBC Driver'
Not supported query type:  SHOW ALL
Not supported query type:  ALTER SYSTEM SET log_destination='stderr'
Not supported query type:  ALTER SYSTEM SET logging_collector='off'
Not supported query type:  ALTER SYSTEM SET log_statement='none'


{' SELECT * FROM review r WHERE r.i_id=$1 ORDER BY creation_date DESC',
 ' SELECT * FROM review r, item i WHERE i.i_id = r.i_id and r.i_id=$1 ORDER BY rating DESC, r.creation_date DESC LIMIT 10',
 ' SELECT * FROM review r, useracct u WHERE u.u_id = r.u_id AND r.u_id=$1 ORDER BY rating DESC, r.creation_date DESC LIMIT 10',
 ' SELECT * FROM trust t WHERE t.source_u_id=$1',
 ' SELECT avg(rating) FROM review r WHERE r.i_id=$1',
 ' SELECT avg(rating) FROM review r, trust t WHERE r.u_id=t.target_u_id AND r.i_id=$1 AND t.source_u_id=$2',
 ' SELECT i_id FROM item',
 ' SELECT u_id FROM useracct',
 ' UPDATE item SET title = $1 WHERE i_id=$2',
 ' UPDATE review SET rating = $1 WHERE i_id=$2 AND u_id=$3',
 ' UPDATE trust SET trust = $1 WHERE source_u_id=$2 AND target_u_id=$3',
 ' UPDATE useracct SET name = $1 WHERE u_id=$2'}

In [22]:
def build_actions_str(indexes : list, filename : str) -> list:
  cmds = [f'echo "Generating sql commands for {filename}"']
  for i, index in enumerate(indexes):
    table, column = index.split(".")
    if i == 0:
      cmd = f'echo "CREATE INDEX idx_{table}_{column} ON {table}({column});" > {filename}'
    else:
      cmd = f'echo "CREATE INDEX idx_{table}_{column} ON {table}({column});" >> {filename}'
    cmds.append(cmd)

  return cmds


def build_drop_idx_cmd(indexes : list) -> str:
  drop_cmd = "DROP INDEX"
  for i, index in enumerate(indexes):
    table, column = index.split(".")
    if i != len(indexes)-1:
      drop_cmd += f" idx_{table}_{column},"
    else:
      drop_cmd += f" idx_{table}_{column};"

  return drop_cmd



  
  
      


In [21]:
import csv
import glob
import re
import time
from pathlib import Path
from typing import List

import pandas as pd
import pglast
from pandarallel import pandarallel
from plumbum import cli
from tqdm.contrib.concurrent import process_map

# Enable parallel pandas operations.
# pandarallel is a little buggy. For example, progress_bar=True does not work,
# and if you are using PyCharm you will want to enable "Emulate terminal in
# output console" instead of using the PyCharm Python Console.
# The reason we're using this library anyway is that:
# - The parallelization is dead simple: change .blah() to .parallel_blah().
# - swifter has poor string perf; we're mainly performing string ops.
# - That said, Wan welcomes any switch that works.
pandarallel.initialize(verbose=1)


class Preprocessor:
    _PG_LOG_COLUMNS: List[str] = [
        "log_time",
        "user_name",
        "database_name",
        "process_id",
        "connection_from",
        "session_id",
        "session_line_num",
        "command_tag",
        "session_start_time",
        "virtual_transaction_id",
        "transaction_id",
        "error_severity",
        "sql_state_code",
        "message",
        "detail",
        "hint",
        "internal_query",
        "internal_query_pos",
        "context",
        "query",
        "query_pos",
        "location",
        "application_name",
        "backend_type",
    ]

    """
    Convert PostgreSQL query logs into pandas DataFrame objects.
    """

    def get_dataframe(self):
        """
        Get a raw dataframe of query log data.

        Returns
        -------
        df : pd.DataFrame
            Dataframe containing the query log data.
            Note that irrelevant query log entries are still included.
        """
        return self._df

    def get_grouped_dataframe_interval(self, interval=None):
        """
        Get the pre-grouped version of query log data.

        Parameters
        ----------
        interval : pd.TimeDelta or None
            time interval to group and count the query templates
            if None, pd is only aggregated by template

        Returns
        -------
        grouped_df : pd.DataFrame
            Dataframe containing the pre-grouped query log data.
            Grouped on query template and optionally log time.
        """
        gb = None
        if interval is None:
            gb = self._df.groupby("query_template").size()
            gb.drop("", axis=0, inplace=True)
        else:
            gb = self._df.groupby("query_template").resample(interval).size()
            gb.drop("", axis=0, level=0, inplace=True)
        grouped_df = pd.DataFrame(gb, columns=["count"])
        return grouped_df

    def get_grouped_dataframe_params(self):
        """
        Get the pre-grouped version of query log data.

        Returns
        -------
        grouped_df : pd.DataFrame
            Dataframe containing the pre-grouped query log data.
            Grouped on query template and query parameters.
        """
        return self._grouped_df_params

    def get_params(self, query):
        """
        Find the parameters associated with a particular query.

        Parameters
        ----------
        query : str
            The query template to look up parameters for.

        Returns
        -------
        params : pd.Series
            The counts of parameters associated with a particular query.
            Unfortunately, due to quirks of the PostgreSQL CSVLOG format,
            the types of parameters are unreliable and may be stringly typed.
        """
        params = self._grouped_df_params.query("query_template == @query")
        return params.droplevel(0).squeeze(axis=1)

    def sample_params(self, query, n, replace=True, weights=True):
        """
        Find a sampling of parameters associated with a particular query.

        Parameters
        ----------
        query : str
            The query template to look up parameters for.
        n : int
            The number of parameter vectors to sample.
        replace : bool
            True if the sampling should be done with replacement.
        weights : bool
            True if the sampling should use the counts as weights.
            False if the sampling should be equal probability weighting.

        Returns
        -------
        params : np.ndarray
            Sample of the parameters associated with a particular query.
        """
        params = self.get_params(query)
        weight_vec = params if weights else None
        sample = params.sample(n, replace=replace, weights=weight_vec)
        return sample.index.to_numpy()

    @staticmethod
    def substitute_params(query_template, params):
        assert type(query_template) == str
        query = query_template
        keys = [f"${i}" for i in range(1, len(params) + 1)]
        for k, v in reversed(list(zip(keys, params))):
            # The reversing is crucial! Note that $1 is a prefix of $10.
            query = query.replace(k, v)
        return query

    @staticmethod
    def _read_csv(csvlog, log_columns):
        """
        Read a PostgreSQL CSVLOG file into a pandas DataFrame.

        Parameters
        ----------
        csvlog : str
            Path to a CSVLOG file generated by PostgreSQL.
        log_columns : List[str]
            List of columns in the csv log.

        Returns
        -------
        df : pd.DataFrame
            DataFrame containing the relevant columns for query forecasting.
        """
        # This function must have a separate non-local binding from _read_df
        # so that it can be pickled for multiprocessing purposes.
        return pd.read_csv(
            csvlog,
            names=log_columns,
            parse_dates=["log_time", "session_start_time"],
            usecols=[
                "log_time",
                "session_start_time",
                "command_tag",
                "message",
                "detail",
            ],
            header=None,
            index_col=False,
            skiprows=100,
            nrows=1000
        )

    @staticmethod
    def _extract_query(message_series):
        """
        Extract SQL queries from the CSVLOG's message column.

        Parameters
        ----------
        message_series : pd.Series
            A series corresponding to the message column of a CSVLOG file.

        Returns
        -------
        query : pd.Series
            A str-typed series containing the queries from the log.
        """
        simple = r"statement: ((?:DELETE|INSERT|SELECT|UPDATE).*)"
        extended = r"execute .+: ((?:DELETE|INSERT|SELECT|UPDATE).*)"
        regex = f"(?:{simple})|(?:{extended})"
        query = message_series.str.extract(regex, flags=re.IGNORECASE)
        # Combine the capture groups for simple and extended query protocol.
        query = query[0].fillna(query[1])
        print("TODO(WAN): Disabled SQL format for being too slow.")
        # Prettify each SQL query for standardized formatting.
        # query = query.parallel_map(pglast.prettify, na_action='ignore')
        # Replace NA values (irrelevant log messages) with empty strings.
        query.fillna("", inplace=True)
        return query.astype(str)

    @staticmethod
    def _extract_params(detail_series):
        """
        Extract SQL parameters from the CSVLOG's detail column.
        If there are no such parameters, an empty {} is returned.

        Parameters
        ----------
        detail_series : pd.Series
            A series corresponding to the detail column of a CSVLOG file.

        Returns
        -------
        params : pd.Series
            A dict-typed series containing the parameters from the log.
        """

        def extract(detail):
            detail = str(detail)
            prefix = "parameters: "
            idx = detail.find(prefix)
            if idx == -1:
                return {}
            parameter_list = detail[idx + len(prefix):]
            params = {}
            for pstr in parameter_list.split(", "):
                pnum, pval = pstr.split(" = ")
                assert pnum.startswith("$")
                assert pnum[1:].isdigit()
                params[pnum] = pval
            return params

        return detail_series.parallel_apply(extract)

    @staticmethod
    def _substitute_params(df, query_col, params_col):
        """
        Substitute parameters into the query, wherever possible.

        Parameters
        ----------
        df : pd.DataFrame
            The dataframe of query log data.
        query_col : str
            Name of the query column produced by _extract_query.
        params_col : str
            Name of the parameter column produced by _extract_params.
        Returns
        -------
        query_subst : pd.Series
            A str-typed series containing the query with parameters inlined.
        """

        def substitute(query, params):
            # Consider '$2' -> "abc'def'ghi".
            # This necessitates the use of a SQL-aware substitution,
            # even if this is much slower than naive string substitution.
            new_sql, last_end = [], 0
            for token in pglast.parser.scan(query):
                token_str = str(query[token.start: token.end + 1])
                if token.start > last_end:
                    new_sql.append(" ")
                if token.name == "PARAM":
                    assert token_str.startswith("$")
                    assert token_str[1:].isdigit()
                    new_sql.append(params[token_str])
                else:
                    new_sql.append(token_str)
                last_end = token.end + 1
            new_sql = "".join(new_sql)
            return new_sql

        def subst(row):
            return substitute(row[query_col], row[params_col])

        return df.parallel_apply(subst, axis=1)

    @staticmethod
    def _parse(query_series):
        """
        Parse the SQL query to extract (prepared queries, parameters).

        Parameters
        ----------
        query_series : pd.Series
            SQL queries with the parameters inlined.

        Returns
        -------
        queries_and_params : pd.Series
            A series containing tuples of (prepared SQL query, parameters).
        """
        simple = r"statement: ((?:DELETE|INSERT|SELECT|UPDATE).*)"
        extended = r"execute .+: ((?:DELETE|INSERT|SELECT|UPDATE).*)"
        regex = f"(?:{simple})|(?:{extended})"
        def parse(sql):
            query = re.search(regex, sql)
            if not query:
                return np.nan, ()
            query = query.group(0)

            new_sql, params, last_end = [], [], 0
            for token in pglast.parser.scan(sql):
                token_str = str(sql[token.start: token.end + 1])
                if token.start > last_end:
                    new_sql.append(" ")
                if token.name in ["ICONST", "FCONST", "SCONST"]:
                    # Integer, float, or string constant.
                    new_sql.append("$" + str(len(params) + 1))
                    params.append(token_str)
                else:
                    new_sql.append(token_str)
                last_end = token.end + 1
            new_sql = "".join(new_sql)
            return new_sql, tuple(params)

        return query_series.parallel_apply(parse)

    def _from_csvlogs(self, workload_csv_path, log_columns, store_query_subst=False):
        """
        Glue code for initializing the Preprocessor from CSVLOGs.

        Parameters
        ----------
        csvlogs : List[str]
            List of PostgreSQL CSVLOG files.
        log_columns : List[str]
            List of columns in the csv log.
        store_query_subst: bool
            True if the "query_subst" column should be stored.

        Returns
        -------
        df : pd.DataFrame
            A dataframe representing the query log.
        """
        time_end, time_start = None, time.perf_counter()

        def clock(label):
            nonlocal time_end, time_start
            time_end = time.perf_counter()
            print("\r{}: {:.2f} s".format(label, time_end - time_start))
            time_start = time_end

        df = self._read_csv(workload_csv_path, log_columns)
        clock("Read dataframe")

        # print("Extract queries: ", end="", flush=True)
        # df["query_raw"] = self._extract_query(df["message"])
        # df.drop(columns=["message"], inplace=True)
        # clock("Extract queries")

        # print("Extract parameters: ", end="", flush=True)
        # df["params"] = self._extract_params(df["detail"])
        # df.drop(columns=["detail"], inplace=True)
        # clock("Extract parameters")

        # print("Substitute parameters into query: ", end="", flush=True)
        # df["query_subst"] = self._substitute_params(df, "query_raw", "params")
        # df.drop(columns=["query_raw", "params"], inplace=True)
        # clock("Substitute parameters into query")
        # print(df["query_subst"], "\n=====================")

        print("Parse query: ", end="", flush=True)
        parsed = self._parse(df["message"])
        df[["query_template", "query_params"]] = pd.DataFrame(
            parsed.tolist(), index=df.index)
        df.drop(columns=["message"], inplace=True)
        clock("Parse query")

        # Only keep the relevant columns to optimize for storage, unless otherwise specified.
        stored_columns = ["log_time", "query_template", "query_params"]
        if store_query_subst:
            stored_columns.append("query_subst")
        return df[stored_columns]

    def __init__(self, workload_csv_path, store_query_subst=False):
        log_columns = self._PG_LOG_COLUMNS
        print(f"Preprocessing CSV logs in: {workload_csv_path}")

        df = self._from_csvlogs(
            workload_csv_path, log_columns, store_query_subst=store_query_subst)
        
        df.set_index("log_time", inplace=True)

        # Grouping queries by template-parameters count.
        gbp = df.groupby(["query_template", "query_params"]).size()
        grouped_by_params = pd.DataFrame(gbp, columns=["count"])
        # Remove unrelated queries and empty queries
        grouped_by_params = grouped_by_params[~grouped_by_params.index.isin([
                                                                            ("", ())])]
        grouped_by_params = grouped_by_params[~grouped_by_params.index.get_level_values(0).str.contains('pg_', case=False)]
        
        self._df = df
        self._grouped_df_params = grouped_by_params
        # self._df = self._df[~(self._df["query_template"].isin(["", ()]))]

        # Optionally write out the query templates and queries out to a file.
        # if self.output_query_templates is not None:
        #     templates = preprocessor.get_dataframe()["query_template"]
        #     templates = pd.Series(templates[templates != ""].unique())
        #     templates.to_csv(self.output_query_templates,
        #                      header=False, index=False, quoting=csv.QUOTE_ALL)
        # if self.output_queries is not None:
        #     queries = preprocessor.get_dataframe()["query_subst"]
        #     queries = queries[queries != ""]
        #     queries.to_csv(self.output_queries, header=False,
        #                    index=False, quoting=csv.QUOTE_ALL)

In [22]:
p = Preprocessor("./workload.csv")
p._grouped_df_params[:100]

Preprocessing CSV logs in: ./workload.csv
Read dataframe: 0.08 s
Parse query: 

AttributeError: 'NoneType' object has no attribute 'group'

In [12]:
# most_common_params = (p._grouped_df_params.reset_index().sort_values('count')).drop_duplicates(subset=["query_template"],keep='last')
params = p.sample_params("SELECT * FROM review r, item i WHERE i.i_id = r.i_id and r.i_id=$1 ORDER BY rating DESC, r.creation_date DESC LIMIT $2", n=5, replace=True, weights=True)
p.substitute_params("SELECT * FROM review r, item i WHERE i.i_id = r.i_id and r.i_id=$1 ORDER BY rating DESC, r.creation_date DESC LIMIT $2", params[0])

'SELECT * FROM review r, item i WHERE i.i_id = r.i_id and r.i_id=960 ORDER BY rating DESC, r.creation_date DESC LIMIT 10'

In [18]:
import numpy as np

# Count of each query
p_count = p._grouped_df_params.groupby("query_template")["count"].sum()
scaled_p_count = (p_count * 100/sum(p_count)).apply(np.ceil).astype(int)
scaled_p_count


query_template
SELECT * FROM review r WHERE r.i_id=$1 ORDER BY creation_date DESC                                                            10
SELECT * FROM review r, item i WHERE i.i_id = r.i_id and r.i_id=$1 ORDER BY rating DESC, r.creation_date DESC LIMIT $2        10
SELECT * FROM review r, useracct u WHERE u.u_id = r.u_id AND r.u_id=$1 ORDER BY rating DESC, r.creation_date DESC LIMIT $2    10
SELECT * FROM trust t WHERE t.source_u_id=$1                                                                                  10
SELECT avg(rating) FROM review r WHERE r.i_id=$1                                                                              10
SELECT avg(rating) FROM review r, trust t WHERE r.u_id=t.target_u_id AND r.i_id=$1 AND t.source_u_id=$2                       10
SELECT i_id FROM item                                                                                                          1
SELECT u_id FROM useracct                                                         

In [90]:
for index, (query_template, count) in enumerate(p_count.iteritems()):
  print(index, query_template, count)
  break

0 SELECT * FROM review r WHERE r.i_id=$1 ORDER BY creation_date DESC 13734


In [19]:
# How many unique parameters each query has?
p_unique = p._grouped_df_params.groupby("query_template").size()
p_unique

query_template
SELECT * FROM review r WHERE r.i_id=$1 ORDER BY creation_date DESC                                                             1000
SELECT * FROM review r, item i WHERE i.i_id = r.i_id and r.i_id=$1 ORDER BY rating DESC, r.creation_date DESC LIMIT $2         1000
SELECT * FROM review r, useracct u WHERE u.u_id = r.u_id AND r.u_id=$1 ORDER BY rating DESC, r.creation_date DESC LIMIT $2     2000
SELECT * FROM trust t WHERE t.source_u_id=$1                                                                                   2000
SELECT avg(rating) FROM review r WHERE r.i_id=$1                                                                               1000
SELECT avg(rating) FROM review r, trust t WHERE r.u_id=t.target_u_id AND r.i_id=$1 AND t.source_u_id=$2                       27775
SELECT i_id FROM item                                                                                                             1
SELECT u_id FROM useracct                                    

In [71]:
print(len(p3))

14


In [42]:
len(p._df)

153281

In [35]:
for index, row in p._df.iterrows():
  print(row["query_template"] == "")

True
True
False
True
True
False
True
True
False
True
True
False
False
True
True
False
True
True
False
True
True
False
True
True
False
True
True
False
True
True
False
True
True
False
True
True
False
True
True
False
True
True
False
True
True
False
True
True
False
True
True
False
True
True
False
True
True
False
True
True
False
True
True
False
True
True
False
True
True
False
True
True
False
True
True
False
True
True
False
True
True
False
True
True
False
True
True
False
True
True
False
True
True
False
True
True
False
True
True
False
