In [1]:
from mimesis.locales import Locale
from mimesis.enums import Gender
import string
from mimesis.keys import maybe
import base64
from mimesis.schema import Field, Schema
import random
from datetime import datetime
import psycopg2
import randomtimestamp
import pandas as pd

In [11]:
field = Field(locale=Locale.EN)

def rand_int(random, a=None, b=None): # create field handler returning random integer
    return random.randint(a, b)

field.register_handler("random_integer",rand_int) # register rand_int handler under the name "random_integer"

In [12]:
def rand_string(random,max_length=None): # create field handler returning random integer of random length
    N=random.randint(1,max_length) 
    res = ''.join(random.choices(string.ascii_letters + string.digits, k=N))
    encoded_string = base64.b64encode(res.encode("utf-8")).decode("utf-8") # base64 encoding
    return encoded_string

field.register_handler("string_encoded",rand_string)

In [5]:
field = Field(locale=Locale.EN)

date_format = "%Y-%m-%d %H:%M:%S" # format compatible with postgres timestamp
start_year = 2013
end_year = 2023

In [19]:
# 80M actions generated by first 20K clients, 20M actions were generated by the rest of clients
schema = Schema(
    schema=lambda: {
        "client_id": field("random_integer", a = 0, b = 20000),
        "create_timestamp": field("datetime",key = lambda s: datetime.strftime(s, date_format),start=start_year,end=end_year),
        "action_body": field("string_encoded",max_length=1000)
    },
    iterations=80000000
)
action = schema.create()

In [12]:
# generate 100K clients
schema = Schema(
    schema=lambda: {
        "name": field("full_name", key=random.choice([Gender.FEMALE,Gender.MALE])),
        "registration_timestamp": field("datetime",key = lambda s: datetime.strftime(s, "%Y-%m-%d %H:%M:%S"),start=start_year,end=end_year),
        "delete_timestamp": field("datetime",key = maybe(None, probability = 0.8),start=start_year,end=end_year)
    },
    iterations=100000
)
client = schema.create()

In [147]:
# establishing the connection
conn = psycopg2.connect(
    database="postgres",
    user='postgres',
    password='postgres',
    host='localhost',
    port='5432'
)

# creating a cursor object
cursor = conn.cursor()

In [141]:
# creating table of clients
cursor.execute("""CREATE TABLE IF NOT EXISTS client(
client_id SERIAL PRIMARY KEY,
name VARCHAR(60) not null,
registration_timestamp TIMESTAMP not null,
action_count INTEGER,
delete_timestamp TIMESTAMP
);""")

# creating table for client's actions
cursor.execute("""CREATE TABLE IF NOT EXISTS action(
action_id SERIAL PRIMARY KEY,
client_id INTEGER not null,
create_timestamp TIMESTAMP not null,
action_body TEXT
);""")

# creating table for statistics
cursor.execute("""CREATE TABLE IF NOT EXISTS statistics(
id serial primary key, 
execution_time numeric(12,5), 
rows integer, 
size text, 
cache_usage numeric(6,5)
);""")

# create f_get_data function which decrypt action_body
cursor.execute(""" CREATE OR REPLACE FUNCTION public.f_get_data(p_client_id integer, ts_from timestamp, ts_to timestamp)
RETURNS TABLE(action_id integer, client_id integer, create_timestamp timestamp, action_body text)
LANGUAGE sql
AS $function$
select 
action_id, 
client_id, 
create_timestamp, 
convert_from(decode(action_body, 'base64'),'UTF-8')
from action
where client_id = p_client_id and create_timestamp between ts_from and ts_to;
$function$
;""")

# inserting record into employee table
for step in range(800):

    # wrap into try-except to avoid abruption when the list of actions is out of range
    try:
        action_data = action[100000*step:100000*(step+1)]
        for d in action_data:
            cursor.execute("INSERT into action(client_id,create_timestamp,action_body) VALUES (%(client_id)s, %(create_timestamp)s, %(action_body)s)", d)
    except:
        print('list of actions out of range')
    
    # wrap into try-except to avoid abruption when the list of clients is out of range
    try:
        client_data = client[100000*step:100000*(step+1)]
        for d in client_data:
        	cursor.execute("INSERT into client(name,registration_timestamp,delete_timestamp) VALUES (%(name)s, %(registration_timestamp)s, %(delete_timestamp)s)", d)
    except:
        print('list of clients out of range')
        
    # Commit your changes in the database after 100000 inserts
    conn.commit() 

list of actions out of range
list of actions out of range
list of actions out of range
list of actions out of range
list of actions out of range
list of actions out of range
list of actions out of range
list of actions out of range
list of actions out of range
list of actions out of range
list of actions out of range
list of actions out of range
list of actions out of range
list of actions out of range
list of actions out of range
list of actions out of range
list of actions out of range
list of actions out of range
list of actions out of range
list of actions out of range
list of actions out of range
list of actions out of range
list of actions out of range
list of actions out of range
list of actions out of range
list of actions out of range
list of actions out of range
list of actions out of range
list of actions out of range
list of actions out of range
list of actions out of range
list of actions out of range
list of actions out of range
list of actions out of range
list of action

In [132]:
def create_statistics_procedure(N: int):
    for iteration in range(N):
            # establishing the connection
        conn = psycopg2.connect(
        database="postgres",
        user='postgres',
        password='postgres',
        host='localhost',
        port='5432'
        )

        # creating a cursor object
        cursor = conn.cursor()
        # generate random trader
        random_trader = random.randint(a = 0, b = 100000)
        # generate two random dates
        a = randomtimestamp.randomtimestamp(start_year = start_year, end_year = end_year, pattern = date_format)
        b = randomtimestamp.randomtimestamp(start_year = start_year, end_year = end_year, pattern = date_format)
        # define lower and upper bounds of the interval
        ts_from = min(a,b)
        ts_to = max(a,b)

        # temporary table is used for measuring the size of query. The difference in time execution is neglected 
        sql = f"EXPLAIN(analyze, buffers,timing, format json) CREATE TEMPORARY TABLE IF NOT EXISTS query_result AS SELECT * FROM f_get_data({random_trader}, '{ts_from}', '{ts_to}');"

        cursor.execute(sql)

        # unformatted output in json format
        unformatted_output = cursor.fetchall()

        if unformatted_output is not None:
            # get total execution time
            total_exec_time = round(unformatted_output[0][0][0]['Plan'].get('Actual Total Time'),5)

            # get total rows
            rows = unformatted_output[0][0][0]['Plan'].get('Actual Rows')

            # calculate size of data
            get_data_size_sql = "SELECT pg_size_pretty(pg_relation_size('query_result'))"
            cursor.execute(get_data_size_sql)
            data_size = cursor.fetchall()[0][0]

            # calculate cache hit ratio
            hit_blocks = float(unformatted_output[0][0][0]['Plan'].get('Shared Hit Blocks'))
            read_blocks = float(unformatted_output[0][0][0]['Plan'].get('Shared Read Blocks'))
            hit_cache_ratio = round(100 * hit_blocks / (hit_blocks + read_blocks),5)

            # insert statistics into table
            dml = f"INSERT into statistics(execution_time,rows,size,cache_usage) VALUES ({total_exec_time}, {rows}, '{data_size}', {hit_cache_ratio})"

            cursor.execute(dml)

            # commit changes
            cursor.execute("DROP TABLE IF EXISTS public.query_result;")
            conn.commit()
        else:
            print(f'No data found for trader {random_trader}')
        conn.close()

In [134]:
create_statistics_procedure(100)