In [4]:
import os
import sys
from optparse import OptionParser, OptionGroup

import psycopg2
import psycopg2.extras

In [5]:
# Connect to the source database (conn1) and the ETL warehouse database (conn2).
# The user and password are read from the PGUSER / PGPASSWORD environment
# variables when they are set, so real credentials do not have to be stored in
# the notebook; the fallbacks are the original local-development values.
DB_USER = os.environ.get("PGUSER", "postgres")
DB_PASSWORD = os.environ.get("PGPASSWORD", "admin")
conn1 = psycopg2.connect(dbname="DB-project", user=DB_USER, password=DB_PASSWORD)
conn2 = psycopg2.connect(dbname="DB-project-ETL", user=DB_USER, password=DB_PASSWORD)

In [6]:
# Long-lived cursors on the source (cur1) and warehouse (cur2) connections.
# NOTE(review): no other cell visible in this notebook reads cur1/cur2; the
# pipeline opens its own cursors — confirm whether these are still needed.
cur1 = conn1.cursor()
cur2 = conn2.cursor()

# Switch both connections to autocommit so each statement takes effect
# immediately instead of waiting for an explicit commit().
conn1.autocommit = True
conn2.autocommit = True

# Pipeline

In [97]:
def extract_pk(cursor, table):
    """Run a query for the primary-key column(s) of ``table``.

    The query is executed on ``cursor`` and nothing is returned; the caller
    reads the result from the cursor, one ``(column_name, data_type)`` row per
    primary-key column.

    Args:
        cursor: Database cursor on the database that holds ``table``.
        table: Table name, matched as a value against ``information_schema``.
    """
    # The table name is compared as a string value, so it is sent as a bound
    # parameter instead of being formatted into the SQL text: a name that
    # contains quotes or braces can no longer break (or inject into) the query.
    cursor.execute("""
    SELECT c.column_name, c.data_type
    FROM information_schema.table_constraints tc
    JOIN information_schema.constraint_column_usage AS ccu USING (constraint_schema, constraint_name)
    JOIN information_schema.columns AS c ON c.table_schema = tc.constraint_schema
      AND tc.table_name = c.table_name AND ccu.column_name = c.column_name
    WHERE constraint_type = 'PRIMARY KEY' and tc.table_name = %s;
    """, (table,))

def extract_pk_values(cursor, table, pk):
    """Select the ``pk`` column of every row in ``table``.

    The query is executed on ``cursor`` and nothing is returned; the caller
    fetches the rows. ``table`` and ``pk`` are substituted into the SQL text
    unchanged, so they must be trusted identifiers.
    """
    query_template = """
    SELECT {pk}
    FROM {table}
    """
    query = query_template.format(pk=pk, table=table)
    cursor.execute(query)
    
def table_allrows(cursor, table):
    """Select every column of every row in ``table``.

    The query is executed on ``cursor`` and nothing is returned; the caller
    fetches the rows. ``table`` is substituted into the SQL text unchanged, so
    it must be a trusted identifier.
    """
    query_template = """
    SELECT *
    FROM {table};
    """
    query = query_template.format(table=table)
    cursor.execute(query)

def insert_warehouse(cursor, value, table, pk, pk_value):
    """Replace one row of ``table``: delete by key, then insert ``value``.

    Both statements are sent to ``cursor`` in a single ``execute`` call.

    Args:
        cursor: Database cursor on the warehouse.
        value: Pre-rendered ``VALUES`` tuple text, e.g. ``(1, 'a', NULL)``.
        table: Target table name (substituted into the SQL unchanged).
        pk: Name of the key column used to find the existing row.
        pk_value: Pre-rendered SQL literal for the key of the row.

    Note:
        The existing row is deleted before the new one is inserted, so the
        call fails with a foreign-key violation when another table still
        references that row (see the error output of the final cell of this
        notebook).
    """
    statement_template = """
    DELETE FROM {table}
    WHERE {pk}={pk_value};
    INSERT INTO {table}
    VALUES {value}
    """
    statement = statement_template.format(
        pk_value=pk_value,
        pk=pk,
        value=value,
        table=table,
    )
    cursor.execute(statement)
    
def delete_warehouse(cursor, table, pk, pk_values):
    """Delete the rows of ``table`` whose ``pk`` is not listed in ``pk_values``.

    Args:
        cursor: Database cursor on the warehouse.
        table: Target table name (substituted into the SQL unchanged).
        pk: Name of the key column.
        pk_values: Pre-rendered parenthesised list of keys to keep, e.g.
            ``(1, 2, 3)``.
    """
    statement_template = """
    DELETE FROM {table}
    WHERE {pk} NOT IN {pk_values}
    """
    statement = statement_template.format(pk_values=pk_values, pk=pk, table=table)
    cursor.execute(statement)
    
def record_value(records):
    """Render one fetched row as the text of a SQL ``VALUES`` tuple.

    Args:
        records: Non-empty sequence of column values for a single row; the
            first element is treated as the primary-key value.

    Returns:
        A pair ``(pk_value, value)``. ``pk_value`` is the first element,
        rendered as a single-quoted literal when it is a ``str`` and returned
        unchanged otherwise. ``value`` is the text ``(v1, v2, ...)`` in which
        ``None`` becomes ``NULL``, ``int`` values are written bare and every
        other value is written as a single-quoted literal.

    Raises:
        IndexError: If ``records`` is empty.
    """
    def quote(item):
        # Double embedded single quotes so a value such as O'Brien stays
        # inside the literal instead of terminating it early.
        return "'" + "{}".format(item).replace("'", "''") + "'"

    pk_value = records[0]
    if type(pk_value) == str:
        pk_value = quote(pk_value)

    parts = [str(pk_value)]
    for record in records[1:]:
        if record is None:
            parts.append('NULL')
        elif type(record) != int:
            # Exact type check on purpose: bool and float values keep being
            # written as quoted literals.
            parts.append(quote(record))
        else:
            parts.append(str(record))
    return pk_value, '(' + ', '.join(parts) + ')'

def key_pk_values(keys):
    """Render key rows as a parenthesised SQL list, e.g. for ``NOT IN (...)``.

    Args:
        keys: Iterable of rows (such as the result of ``fetchall()``); only
            the first element of each row is used.

    Returns:
        Text such as ``(1, 2, 3)`` or ``('a', 'b')``. ``str`` keys are
        single-quoted with embedded single quotes doubled; other keys are
        rendered with ``str.format``. Empty ``keys`` yields ``()``; callers
        should skip the query in that case rather than send an empty list.
    """
    literals = []
    for key in keys:
        if type(key[0]) == str:
            literals.append("'{}'".format(key[0].replace("'", "''")))
        else:
            literals.append('{}'.format(key[0]))
    # join() only puts separators between items, so there is no trailing ", "
    # to trim and an empty input gives a balanced "()" instead of a lone ")".
    return '(' + ', '.join(literals) + ')'

In [98]:
def pipeline(table):
    """Synchronise one table from the source database into the warehouse.

    Phase 1 (insert and update): every row of ``table`` read through the
    global ``conn1`` is written to the same table through the global
    ``conn2``, replacing any warehouse row with the same key.
    Phase 2 (delete): warehouse rows whose key no longer exists in the source
    are removed. When the source table is empty this phase is skipped and the
    warehouse rows are left untouched.

    Only the first primary-key column reported for the warehouse table is
    used as the key.

    Args:
        table: Name of the table; it must exist in both databases.

    Raises:
        ValueError: If no primary key is found for ``table`` in the warehouse.
        Exception: Any error raised while writing is re-raised after
            ``conn2.rollback()``.
    """
    # insert and update
    with conn1.cursor() as cursor1:
        table_allrows(cursor1, table)
        with conn2.cursor() as cursor2:
            extract_pk(cursor2, table)
            pk_row = cursor2.fetchone()
        if pk_row is None:
            # Fail with a clear message instead of "'NoneType' object is not
            # subscriptable".
            raise ValueError(
                "no primary key found for table {!r}".format(table))
        pk = pk_row[0]
        # fetchone() returns None once the result set is exhausted.
        for records in iter(cursor1.fetchone, None):
            pk_value, value = record_value(records)
            with conn2.cursor() as cursor2:
                try:
                    insert_warehouse(cursor2, value, table, pk, pk_value)
                    conn2.commit()
                except Exception:
                    conn2.rollback()
                    raise
    # delete
    with conn1.cursor() as cursor1:
        extract_pk_values(cursor1, table, pk)
        keys = cursor1.fetchall()
    # The previous guard compared the rendered key list with ' ) ', a value
    # key_pk_values() never returns, so this phase never ran. An empty key
    # list cannot be used in NOT IN, so the delete is issued only when the
    # source has at least one row.
    if keys:
        pk_values = key_pk_values(keys)
        with conn2.cursor() as cursor2:
            try:
                delete_warehouse(cursor2, table, pk, pk_values)
                conn2.commit()
            except Exception:
                conn2.rollback()
                raise

In [99]:
# Try the pipeline on a single table first.
pipeline('languageb')

In [79]:
# type(2)

int

# Dependencies

In [39]:
def writedeps(cursor, tbl, dependency):
    """Print and collect the foreign-key dependencies of table ``tbl``.

    For every foreign-key constraint found on ``tbl`` one Graphviz DOT edge
    ``tbl -> referenced_table [label=constraint];`` is printed and the pair
    ``[tbl, referenced_table]`` is appended to ``dependency``.

    Args:
        cursor: Database cursor; its current result set is replaced.
        tbl: Name of the table whose foreign keys are looked up.
        dependency: List that is extended in place.

    Returns:
        The same ``dependency`` list that was passed in.
    """
    # The table name is sent as a bound parameter rather than spliced into the
    # SQL text, so unusual names cannot break or alter the query.
    sql = """SELECT
        tc.constraint_name, tc.table_name, kcu.column_name,
        ccu.table_name AS foreign_table_name,
        ccu.column_name AS foreign_column_name
    FROM
        information_schema.table_constraints AS tc
    JOIN information_schema.key_column_usage AS kcu ON
        tc.constraint_name = kcu.constraint_name
    JOIN information_schema.constraint_column_usage AS ccu ON
        ccu.constraint_name = tc.constraint_name
    WHERE constraint_type = 'FOREIGN KEY' AND tc.table_name = %s"""
    cursor.execute(sql, (tbl,))
    for row in cursor.fetchall():
        # Row layout: constraint, table, column, foreign table, foreign column.
        constraint, foreign_table = row[0], row[3]
        print('{} -> {} [label={}];'.format(tbl, foreign_table, constraint))
        dependency.append([tbl, foreign_table])
    return dependency


def get_tables(cursor):
    """Yield the name of every table in the ``public`` schema.

    This is a generator: the query runs when iteration starts. All rows are
    fetched before the first name is yielded, so the caller may reuse
    ``cursor`` for other queries while iterating.
    """
    cursor.execute("SELECT tablename FROM pg_tables WHERE schemaname='public'")
    fetched = cursor.fetchall()
    yield from (entry[0] for entry in fetched)

    

In [40]:
# Print the foreign-key graph in Graphviz DOT syntax and collect the same
# edges in `dependency` as [table, referenced_table] pairs.
print("Digraph F {\n")
print('ranksep=1.0; size="18.5, 15.5"; rankdir=LR;')
dependency = []
with conn1.cursor() as cursor1:
    # get_tables() fetches all table names before yielding the first one, so
    # writedeps() can run its own query on the same cursor inside the loop.
    for i in get_tables(cursor1):
        dependency = writedeps(cursor1, i, dependency)
    print("}")

Digraph F {

ranksep=1.0; size="18.5, 15.5"; rankdir=LR;
book -> person [label=book_main_author_fkey];
book -> languageb [label=book_main_language_fkey];
book -> genre [label=book_main_genre_fkey];
user_library -> person [label=user_library_person_id_fkey];
written_by -> person [label=written_by_person_id_fkey];
written_by -> book [label=written_by_book_id_fkey];
translated_by -> person [label=translated_by_person_id_fkey];
translated_by -> book [label=translated_by_book_id_fkey];
language_book -> languageb [label=language_book_languageb_fkey];
language_book -> book [label=language_book_book_id_fkey];
genre_book -> genre [label=genre_book_genre_fkey];
genre_book -> book [label=genre_book_book_id_fkey];
borrowed -> user_library [label=borrowed_user_id_fkey];
borrowed -> book [label=borrowed_book_id_fkey];
}


In [43]:
# Show the collected [table, referenced_table] pairs.
print(dependency)

[['book', 'person'], ['book', 'languageb'], ['book', 'genre'], ['user_library', 'person'], ['written_by', 'person'], ['written_by', 'book'], ['translated_by', 'person'], ['translated_by', 'book'], ['language_book', 'languageb'], ['language_book', 'book'], ['genre_book', 'genre'], ['genre_book', 'book'], ['borrowed', 'user_library'], ['borrowed', 'book']]


# Topological sort

In [88]:
def list_tables(cursor):
    """Return the names of all tables in the ``public`` schema as a list.

    Args:
        cursor: Database cursor; its current result set is replaced.
    """
    cursor.execute("SELECT tablename FROM pg_tables WHERE schemaname='public'")
    return [row[0] for row in cursor.fetchall()]

# The helper has its own name so that binding the result to `tables` does not
# overwrite the function; the cell can be re-run without a
# "'list' object is not callable" error.
with conn1.cursor() as cursor1:
    tables = list_tables(cursor1)
    print('list of tables:\n', tables)

list of tables:
 ['person', 'book', 'languageb', 'genre', 'user_library', 'written_by', 'translated_by', 'language_book', 'genre_book', 'borrowed']


In [89]:
# Position of 'book' in `tables`; these positions are the vertex ids used for
# the graph below.
print(tables.index('book'))

1


In [92]:
#Python program to print topological sorting of a DAG
from collections import defaultdict

#Class to represent a graph
class Graph:
    """Directed graph over integer vertices ``0 .. V-1`` (adjacency lists)."""

    def __init__(self,vertices):
        """Create a graph with ``vertices`` vertices and no edges."""
        self.V = vertices  # number of vertices
        self.graph = defaultdict(list)  # vertex -> list of successor vertices

    def addEdge(self,u,v):
        """Add the directed edge ``u -> v``."""
        self.graph[u].append(v)

    def topologicalSortUtil(self,v,visited,stack):
        """Depth-first visit from ``v``, used by ``topologicalSort``.

        Marks ``v`` in ``visited``, recurses into every unvisited successor
        and finally inserts ``v`` at the front of ``stack``.
        """
        visited[v] = True

        for successor in self.graph[v]:
            if not visited[successor]:
                self.topologicalSortUtil(successor, visited, stack)

        # All successors are already in the list; v goes in front of them.
        stack.insert(0, v)

    def topologicalSort(self):
        """Print and return the vertices with every vertex after its successors.

        For an edge ``u -> v`` in an acyclic graph, ``v`` appears before ``u``
        in the result. Cycles are not detected.
        """
        visited = [False] * self.V
        stack = []

        # Start a depth-first visit from each vertex not reached so far.
        for start in range(self.V):
            if visited[start]:
                continue
            self.topologicalSortUtil(start, visited, stack)

        # The helper builds the list successors-last; flip it so that
        # successors come first.
        ordered = list(reversed(stack))
        print(ordered)
        return ordered

# One vertex per table, identified by its position in `tables`; every
# [table, referenced_table] pair becomes the edge table -> referenced_table.
g= Graph(len(tables))
for dep in dependency:
    g.addEdge(tables.index(dep[0]), tables.index(dep[1]))

print("Following is a Topological Sort of the given graph")
# `stack` holds table positions, referenced tables before the tables that
# reference them.
stack = g.topologicalSort()
#This code is contributed by Neelam Yadav


Following is a Topological Sort of the given graph
[0, 2, 3, 1, 4, 5, 6, 7, 8, 9]


In [93]:
def stack_to_sort_tables(stack, tables):
    """Map a list of positions back to table names.

    Args:
        stack: Sequence of integer positions into ``tables``.
        tables: Sequence of table names.

    Returns:
        A new list with ``tables[position]`` for each position, in the order
        given by ``stack``.
    """
    return [tables[position] for position in stack]

In [94]:
# Table names in load order: referenced tables before the tables that
# reference them.
sort_table = stack_to_sort_tables(stack, tables)
print(sort_table)

['person', 'languageb', 'genre', 'book', 'user_library', 'written_by', 'translated_by', 'language_book', 'genre_book', 'borrowed']


# ETL

In [101]:
# Run the pipeline for every table in dependency order.
# NOTE(review): the saved output of this cell is a ForeignKeyViolation on
# "person": the warehouse row is deleted before being re-inserted while "book"
# still references it — presumably on a re-run against a populated warehouse.
for table in sort_table:
    pipeline(table)

ForeignKeyViolation: update or delete on table "person" violates foreign key constraint "book_main_author_fkey" on table "book"
DETAIL:  Key (person_id)=(1) is still referenced from table "book".
