In [1]:
import os
import psycopg2
from psycopg2.extras import wait_select
from time import time
import select

from dag_execution.task_dag_executor import *
from dag_execution.execution_graph import run_query_py

from configparser import SafeConfigParser
parser = SafeConfigParser()

host : Baptistes-MacBook-Pro.local
user : admin


  # This is added back by InteractiveShellApp.init_path()


In [2]:
# http://initd.org/psycopg/docs/advanced.html
def wait(conn):
    while 1:
        state = conn.poll()
        if state == psycopg2.extensions.POLL_OK:
            break
        elif state == psycopg2.extensions.POLL_WRITE:
            select.select([], [conn.fileno()], [])
        elif state == psycopg2.extensions.POLL_READ:
            select.select([conn.fileno()], [], [])
        else:
            raise psycopg2.OperationalError("poll() returned %s" % state)


def open_connection(dbname, user, host, port, password):
    connect_text = "dbname='{}' user='{}' host={} port={} password='{}'".format(
        dbname, user, host, port, password)
    connector = psycopg2.connect(connect_text)
    cursor = connector.cursor()
    return connector, cursor


def open_connection_async(dbname, user, host, port, password):
    connect_text = "dbname='{}' user='{}' host={} port={} password='{}'".format(
        dbname, user, host, port, password)
    connector = psycopg2.connect(connect_text, async=1)
    # wait(connector)
    wait_select(connector)
    cursor = connector.cursor()
    return connector, cursor


def close_connection(connector, cursor):
    cursor.close()
    connector.close()
    return None


In [3]:
parser.read(PARAM_FILE)
DBNAME   = parser.get("redshift", "dbname")
HOST     = parser.get("redshift", "host")
PASSWORD = parser.get("redshift", "password")
PORT     = parser.get("redshift", "port")
USER     = parser.get("redshift", "user")
connector, cursor = open_connection_async(DBNAME, USER, HOST, PORT, PASSWORD)

# Test

In [None]:
query = """
DROP TABLE if EXISTS req1;
CREATE temp TABLE req1(
  tname CHARACTER VARYING (32)
) diststyle ALL;

INSERT INTO req1 SELECT 'req1' ;
"""

cursor.execute(query)
# for record in cursor:
#     print(record)

connector.commit()

In [None]:
query = """
DROP TABLE if EXISTS req2;
CREATE temp TABLE req2(
  tname CHARACTER VARYING (32)
) diststyle ALL;

INSERT INTO req2 SELECT 'req2' ;
"""

cursor.execute(query)
connector.commit()

In [None]:
query = "SELECT * from req1 ;"

cursor.execute(query)
for record in cursor:
    print(record)

In [None]:
query = "SELECT * from req2 ;"

cursor.execute(query)
for record in cursor:
    print(record)

In [None]:
query = "SELECT * from bidon ;"

# https://bbengfort.github.io/observations/2017/12/06/psycopg2-transactions.html
try:
    cursor.execute(query)
    connector.commit()
except Exception as e:
    connector.rollback()
    pass
    #raise e

In [None]:
query = "SELECT * from bidon ;"

try :
    cursor.execute(query)
    for record in cursor:
        print(record)
except Exception as e:
    print("exception error :", e)

# Trial

In [4]:
def run_query_py(query: str, connector, cursor, prompt: bool=False, asynchronous=False):
    # http://initd.org/psycopg/docs/usage.html
    try:
        cursor.execute(query)
        if prompt:
            for record in cursor:
                print(record)

        connector.commit()
    except Exception as e:
        if not asynchronous:
            connector.rollback()
        raise e
        
def run_query_py_async(query: str, connector, cursor, prompt: bool=False):
    # http://initd.org/psycopg/docs/usage.html
    try:
        cursor.execute(query)
        wait_select(cursor.connection)
        connector.commit()
    except Exception as e:
        raise e

In [5]:
sql_dir = DAG_TST_SQL_DIR
    
task_dag = TaskDag()
task_dag.build(DAG_TST_SQL_DIR + "dependencies.txt")
for node in nx.topological_sort(task_dag.G):
    query = open(sql_dir + node, 'r').read()
    print(node)
    print(query)
    task_dag.add_task(
        node,
        lambda: run_query_py_async(query, connector, cursor), task_dag.G.predecessors(node)
    )
print(task_dag.tasks)

req4.sql
-- SELECT * FROM apps LIMIT 5 ;

DROP TABLE if EXISTS req4;
CREATE temp TABLE req4(
  tname CHARACTER VARYING (32)
) diststyle ALL;

INSERT INTO req4 SELECT 'req4' ;

req1.sql
-- SELECT * FROM apps LIMIT 5 ;

DROP TABLE if EXISTS req1;
CREATE temp TABLE req1(
  tname CHARACTER VARYING (32)
) diststyle ALL;

INSERT INTO req1 SELECT 'req1' ;

req2a.sql
-- SELECT * FROM apps LIMIT 5 ;

DROP TABLE if EXISTS req2a;
CREATE temp TABLE req2a(
  tname CHARACTER VARYING (32)
) diststyle ALL;

INSERT INTO req2a SELECT 'req2a' ;

req2b.sql
-- SELECT * FROM apps LIMIT 5 ;

DROP TABLE if EXISTS req2b;
CREATE temp TABLE req2b(
  tname CHARACTER VARYING (32)
) diststyle ALL;

INSERT INTO req2b SELECT 'req2b' ;

req3.sql
-- SELECT * FROM apps LIMIT 5 ;

DROP TABLE if EXISTS req3;
CREATE temp TABLE req3(
  tname CHARACTER VARYING (32)
) diststyle ALL;

INSERT INTO req3 SELECT 'req3' ;

{'req4.sql': 
**********
name          : 'req4.sql'
parents       : []
children      : ['req2b.sql']
nb of par

In [6]:
task_dag_executor = TaskDagExecutor(10)
task_dag_executor.execute(task_dag)

req4.sql submitted at 1528458766.235134
req1.sql submitted at 1528458766.2360861
req1.sql completed at 1528458766.236819
req2a.sql submitted at 1528458766.236929
req2a.sql completed at 1528458766.237727
req1.sql completed at 1528458766.327271


In [7]:
# check created tables
# time.sleep(10) # wait for queries to finish
# run_query_py(open(sql_dir + 'req5.sql', 'r').read(), connector)
run_query_py_async('SELECT * FROM req1 ;', connector, cursor, prompt=True)

ProgrammingError: relation "req1" does not exist


In [None]:
close_connection(connector, cursor) 

In [None]:
# http://www.dougalmatthews.com/notes/postgres-the-cool-stuff/
import psycopg2
from psycopg2.extras import wait_select

connect_text = "dbname='{}' user='{}' host={} port={} password='{}'".format(
        DBNAME, USER, HOST, PORT, PASSWORD)

aconn = psycopg2.connect(connect_text, async=1)
wait_select(aconn)
acurs = aconn.cursor()
acurs.execute("SELECT 1;")
wait_select(acurs.connection)
acurs.fetchone()[0]