In [16]:
from multiprocessing import Process,freeze_support, Pool
import psycopg2
import time
import os
import pandas as pd
import datetime,random
import concurrent.futures

In [17]:
USER = "user_1"
DB = "test_1"
PASS = "postgres"
HOST = "127.0.0.1"
PORT = 5432
df = None

In [18]:
def serial_read():
    start_time = time.time()
    conn = psycopg2.connect(database = DB, user = USER, password = PASS, host = HOST, port= PORT)
    cursor = conn.cursor()
    cursor.execute("SELECT * from employees.employee")
    records = cursor.fetchall()
    conn.close()
    print(f"Execution time for serial read:{round(time.time() - start_time,3)} s")
    df = pd.DataFrame(records,columns=["ID","Birth Date", "First Name", "Last Name", "Gender", "Hire Date"])
    return df

In [19]:
df = serial_read()
df

Execution time for serial read:0.384 s


Unnamed: 0,ID,Birth Date,First Name,Last Name,Gender,Hire Date
0,10001,1953-09-02,Georgi,Facello,M,1986-06-26
1,10002,1964-06-02,Bezalel,Simmel,F,1985-11-21
2,10003,1959-12-03,Parto,Bamford,M,1986-08-28
3,10004,1954-05-01,Chirstian,Koblick,M,1986-12-01
4,10005,1955-01-21,Kyoichi,Maliniak,M,1989-09-12
...,...,...,...,...,...,...
300019,499995,1958-09-24,Dekang,Lichtner,F,1993-01-12
300020,499996,1953-03-07,Zito,Baaz,M,1990-09-27
300021,499997,1961-08-03,Berhard,Lenart,M,1986-04-21
300022,499998,1956-09-05,Patricia,Breugel,M,1993-10-13


In [20]:
def execute_select():
    conn = psycopg2.connect(database = DB, user = USER, password = PASS, host = HOST, port= PORT)
    cursor = conn.cursor()
    cursor.execute("SELECT * from employees.employee")
    records = cursor.fetchall()
    cursor.close()
    return records

def parallel_read():
    start_time = time.time()
    records = []
    with concurrent.futures.ProcessPoolExecutor() as executor:
        proc = [executor.submit(execute_select)]
        for f in concurrent.futures.as_completed(proc):
            records.extend(f.result())
    
    print(f"Execution time for parallel read:{round(time.time() - start_time,3)} s")
#     print(records[])
    df = pd.DataFrame(records,columns=["ID","Birth Date", "First Name", "Last Name", "Gender", "Hire Date"])
    return df
    
    
    

In [21]:
df = parallel_read()

Execution time for parallel read:1.461 s


In [22]:
def random_date():
    d = random.randint(1, int(time.time()))
    return datetime.date.fromtimestamp(d).strftime('%Y-%m-%d')

def generate_name():
    length = random.randint(1,6)
    name = ""
    for i in range(length):
        j = random.randint(0,26)
        name += chr(97+j)
    return name
def create_record(id):
    id = int(id)
    seed = random.randint(0,1)
    gender = ""
    if seed == 0:
        gender = 'M'
    else:
        gender = 'F'
    query = """INSERT INTO employees.employee (id, birth_date, first_name, last_name, gender, hire_date) VALUES (%s,%s,%s,%s,%s,%s)"""
    values = (id,random_date(),generate_name(),generate_name(),gender,random_date())
    return query,values
def generate_records(n):
    records = []
    for i in range(n):
        records.append(create_record(df.iloc[df.shape[0] - 1,0] + i + 1))
    return records

In [23]:
def insert_serially(n):
    records = generate_records(n)
    orignal_size = df.shape[0]
    start_time = time.time()
    conn = psycopg2.connect(database = DB, user = USER, password = PASS, host = HOST, port= PORT)
    cursor = conn.cursor()
    for record in records:
        query, values = record
        cursor.execute(query,values)
        conn.commit()
    print(f"Execution time for sequential insert:{round(time.time() - start_time,3)} s")
    cursor.execute('select count(*) from employees.employee')
    rows = cursor.fetchall()
    if rows[0][0] - orignal_size == n:
        print(f"{n} records inserted successfully")
    
    conn.close()

In [24]:
insert_serially(10000)
df  = serial_read()
df

Execution time for sequential insert:20.49 s
10000 records inserted successfully
Execution time for serial read:0.325 s


Unnamed: 0,ID,Birth Date,First Name,Last Name,Gender,Hire Date
0,10001,1953-09-02,Georgi,Facello,M,1986-06-26
1,10002,1964-06-02,Bezalel,Simmel,F,1985-11-21
2,10003,1959-12-03,Parto,Bamford,M,1986-08-28
3,10004,1954-05-01,Chirstian,Koblick,M,1986-12-01
4,10005,1955-01-21,Kyoichi,Maliniak,M,1989-09-12
...,...,...,...,...,...,...
310019,509995,2014-02-13,cnksqw,zh{,F,1975-02-19
310020,509996,2003-10-18,optm,cpj{u,M,1997-10-11
310021,509997,1980-07-23,u,san,M,2008-05-16
310022,509998,1974-08-27,jxko,nulhk,M,2012-11-13


In [25]:
def insert(query,values):
    conn = psycopg2.connect(database = DB, user = USER, password = PASS, host = HOST, port= PORT)
    cursor = conn.cursor()
    cursor.execute(query,values)
    conn.commit()
    conn.close()
    
    
    

def parallel_write(n):
    records = generate_records(n)
    orignal_size = df.shape[0]
    start_time = time.time()
    with concurrent.futures.ProcessPoolExecutor() as executor:
        proc = [executor.submit(insert,query=query,values=values) for query,values in records]
    
    print(f"Execution time for parallel insert:{round(time.time() - start_time,3)} s")
    conn = psycopg2.connect(database = DB, user = USER, password = PASS, host = HOST, port= PORT)
    cursor = conn.cursor()
    cursor.execute('select count(*) from employees.employee')
    rows = cursor.fetchall()
    if rows[0][0] - orignal_size == n:
        print(f"{n} records inserted successfully")
    
    conn.close()

In [26]:
parallel_write(10000)
df = parallel_read()
df

Execution time for parallel insert:12.917 s
10000 records inserted successfully
Execution time for parallel read:1.708 s


Unnamed: 0,ID,Birth Date,First Name,Last Name,Gender,Hire Date
0,10001,1953-09-02,Georgi,Facello,M,1986-06-26
1,10002,1964-06-02,Bezalel,Simmel,F,1985-11-21
2,10003,1959-12-03,Parto,Bamford,M,1986-08-28
3,10004,1954-05-01,Chirstian,Koblick,M,1986-12-01
4,10005,1955-01-21,Kyoichi,Maliniak,M,1989-09-12
...,...,...,...,...,...,...
320019,519924,1994-09-26,fxg,{nx,F,1975-09-25
320020,519943,1988-02-10,orpl,who,F,1974-05-28
320021,519962,2018-03-28,mep,cinyc,M,1988-12-05
320022,519982,2004-09-06,uuwhyo,h,M,1988-07-23
