In [None]:
from __future__ import division
import pymssql
import pyodbc
import sys
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import logging
import sqlite3

In [None]:
# Notebook-wide logging: root handler configured for INFO and above,
# plus a module-level logger used by the helper functions below.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(name=__name__)

By default, the metadata is stored in a SQLite database.
Tables:
* `summary`: Stores the server, catalog, schema, table and column names, plus each column's ordinal position and data type. Each row is one column of a table.
* `summary_v2`: A summary built from `summary`. Stores the number of columns and the number of rows of each table. Each row is one table in a database.
* `summary_v3`: Per column, it adds the data type of the column, the number of `NULL` values and the number of distinct data values.
* `summary_v4`: Stores each distinct data value of each column, together with its frequency, up to a threshold of distinct values (5000 by default). It does not store values of `date`-type columns.
* `summary_v5`: Same as `summary_v4`, but it only stores date-type columns, which makes it simpler to group and visualise the data over time.

`Level 1` analysis: exhaustive search of every data value in the database.
`Level 2` analysis: only table-level frequencies.

In [None]:
def close_db_sqlite(db):
    """Close the SQLite connection ``db``; returns None."""
    db.close()

def get_db_sqlite(path, db_name):
    """
    Open (creating it if needed) the SQLite database file ``<path>/<db_name>``.

    Returns the ``sqlite3.Connection``; the caller is responsible for closing it.
    """
    db_file = '/'.join((path, db_name))
    return sqlite3.connect(db_file)

def create_metadata_db(path, db_name):
    """
    Create the tables that store metadata about the source databases.

    Opens (or creates) the SQLite database ``<path>/<db_name>`` through
    ``get_db_sqlite`` and creates the ``summary``, ``summary_v2``,
    ``summary_v3``, ``summary_v4`` and ``summary_v5`` tables. Tables that
    already exist are left untouched, so the function can safely be re-run.
    The connection is always closed before returning, even on error.
    """

    sql_summary = '''CREATE TABLE IF NOT EXISTS summary (SERVER_NAME TEXT
            , TABLE_CATALOG TEXT
            , TABLE_SCHEMA TEXT
            , TABLE_NAME TEXT
            , COLUMN_NAME TEXT
            , ORDINAL_POSITION INTEGER
            , DATA_TYPE TEXT)'''

    sql_summary_v2 = '''CREATE TABLE IF NOT EXISTS summary_v2 (SERVER_NAME TEXT
            , TABLE_CATALOG TEXT
            , TABLE_SCHEMA TEXT
            , TABLE_NAME TEXT
            , N_COLUMNS INTEGER
            , N_ROWS INTEGER)'''

    sql_summary_v3 = '''CREATE TABLE IF NOT EXISTS summary_v3 (SERVER_NAME TEXT
            , TABLE_CATALOG TEXT
            , TABLE_SCHEMA TEXT
            , TABLE_NAME TEXT
            , COLUMN_NAME TEXT
            , ORDINAL_POSITION INTEGER
            , DATA_TYPE TEXT
            , DISTINCT_VALUES INTEGER
            , NULL_VALUES INTEGER)'''

    sql_summary_v4 = '''CREATE TABLE IF NOT EXISTS summary_v4 (SERVER_NAME TEXT
            , TABLE_CATALOG TEXT
            , TABLE_SCHEMA TEXT
            , TABLE_NAME TEXT
            , COLUMN_NAME TEXT
            , DATA_VALUE TEXT
            , FREQUENCY_NUMBER INTEGER)'''

    sql_summary_v5 = '''CREATE TABLE IF NOT EXISTS summary_v5 (SERVER_NAME TEXT
            , TABLE_CATALOG TEXT
            , TABLE_SCHEMA TEXT
            , TABLE_NAME TEXT
            , COLUMN_NAME TEXT
            , DATA_VALUE TEXT
            , FREQUENCY_NUMBER INTEGER)'''

    db = get_db_sqlite(path, db_name)
    try:
        cursor = db.cursor()
        for ddl in (sql_summary, sql_summary_v2, sql_summary_v3,
                    sql_summary_v4, sql_summary_v5):
            cursor.execute(ddl)
        db.commit()
    finally:
        # The original never closed this connection; release it on every path.
        db.close()
    return

Functions to connect to databases

In [None]:
def get_db_connection(string_connection, verbose = False):
    """
    Open a pyodbc connection from a connection string stored in a text file.

    Parameters
    ----------
    string_connection : str
        Path of the file holding the ODBC connection string. Line breaks
        (LF and CR) are removed, so the string may be wrapped over lines.
    verbose : bool
        If True, log the file name and the connection string with the values
        of any ``PWD`` / ``Password`` keys masked.

    Returns
    -------
    The connection object returned by ``pyodbc.connect``.
    """
    import re

    with open(string_connection, 'r') as cs:
        # Also drop '\r' so files saved with Windows line endings work.
        connection_string = cs.read().replace('\r', '').replace('\n', '')

    connection = pyodbc.connect(connection_string)
    if verbose:
        # The connection string may carry credentials: never write the
        # password to the log. ODBC values may be braced ({...}) and contain ';'.
        redacted = re.sub(
            r'(?i)\b(pwd|password)(\s*=\s*)(\{(?:[^}]|\}\})*\}|[^;]*)',
            r'\1\2***',
            connection_string,
        )
        logger.info('Connection established to {}'.format(string_connection.split('/')[-1]))
        logger.info('Connection string: {}'.format(redacted))
    return connection

def get_db_cursor(connection):
    """Return a new cursor obtained from ``connection.cursor()``."""
    cursor = connection.cursor()
    return cursor

def close_db_connection(connection):
    """Close ``connection`` by calling its ``close()`` method; returns None."""
    connection.close()

def close_db_cursor(cursor):
    """Close ``cursor`` by calling its ``close()`` method; returns None."""
    cursor.close()

In [None]:
# File holding the connection string of the source database to catalogue.
# Alternative source, kept for reference:
# connection_params = 'string_connections/ODSQUERY.CorporateDW'
connection_params = 'string_connections/aggrekoanalytics.database.windows.net.AGK_IOT_POC'

conn_source = get_db_connection(connection_params)
cursor_source = conn_source.cursor()

# To release the source connection when finished:
# cursor_source.close()
# conn_source.close()

In [None]:
# Local SQLite database (file data/metadata) that receives the summaries.
conn_metadata = get_db_sqlite(path='data', db_name='metadata')
cursor_metadata = conn_metadata.cursor()

In [None]:
def insertOrUpdateSummary(server_name, table_catalog, table_schema, table_name, column_name, ordinal_position, data_type, verbose= False):
    """
    Replace the ``summary`` row describing one column of one source table.

    Any existing row with the same (server, catalog, schema, table, column)
    key is deleted and the new values are inserted. Both statements run in a
    single transaction on the module-level ``conn_metadata`` connection, so a
    failure leaves the previous row intact instead of losing it.

    Parameters
    ----------
    server_name, table_catalog, table_schema, table_name, column_name : str
        Key identifying the column.
    ordinal_position : int
        Position of the column within its table.
    data_type : str
        Name of the column's data type.
    verbose : bool
        If True, log a message after the row is written.
    """
    key = (server_name, table_catalog, table_schema, table_name, column_name)

    sql_delete = """delete from summary
            WHERE SERVER_NAME = ?
             AND TABLE_CATALOG = ?
             AND TABLE_SCHEMA = ?
             AND TABLE_NAME = ?
             AND COLUMN_NAME = ?;"""
    sql_insert = """insert into summary(SERVER_NAME, TABLE_CATALOG, TABLE_SCHEMA, TABLE_NAME, COLUMN_NAME, ORDINAL_POSITION, DATA_TYPE)
             values (?, ?, ?, ?, ?, ?, ?)"""

    # The sqlite3 connection context manager commits on success and rolls back
    # on error, so the delete and the insert are applied together or not at all.
    with conn_metadata:
        # Deleting a key that is not present is a no-op, so no prior
        # existence check is needed.
        cursor_metadata.execute(sql_delete, key)
        cursor_metadata.execute(sql_insert, key + (ordinal_position, data_type))

    if verbose:
        logger.info('{}.{}.{}.{}.{} has been updated into summary...'.format(server_name, table_catalog, table_schema, table_name, column_name))
    return

In [None]:
# Source server whose INFORMATION_SCHEMA is catalogued. The server name is not
# a column of INFORMATION_SCHEMA.COLUMNS, so it is bound as the query's only
# parameter and returned as SERVER_NAME on every row.
SERVER = 'aggrekoanalytics.database.windows.net'
sql = """SELECT ? AS SERVER_NAME
        , TABLE_CATALOG
        , TABLE_SCHEMA
        , TABLE_NAME
        , COLUMN_NAME
        , ORDINAL_POSITION
        , DATA_TYPE
    FROM INFORMATION_SCHEMA.COLUMNS;"""

cursor_source.execute(sql, (SERVER,))
rows = cursor_source.fetchall()

# Record every column of every table in the local `summary` table.
for row in rows:
    (server_name, table_catalog, table_schema, table_name,
     column_name, ordinal_position, data_type) = row
    insertOrUpdateSummary(
        server_name=server_name,
        table_catalog=table_catalog,
        table_schema=table_schema,
        table_name=table_name,
        column_name=column_name,
        ordinal_position=ordinal_position,
        data_type=data_type,
    )

In [None]:
# Quick look at the current content of summary_v2 (one row per table).
res = cursor_metadata.execute("select * from summary_v2;").fetchall()
for r in res:
    print(r)

In [None]:
def _quote_sql_server_identifier(name):
    """Return ``name`` as a bracket-delimited SQL Server identifier (``]`` doubled)."""
    return '[' + str(name).replace(']', ']]') + ']'


def insertOrUpdateSummaryV2(server_name, table_catalog, table_schema, table_name, verbose = False, ignore_views = True):
    """
    Store the number of columns and the number of rows of one source table.

    Each row of ``summary_v2`` describes one table. The column count comes
    from the source's INFORMATION_SCHEMA.COLUMNS and the row count from a
    ``count(*)`` on the table itself, both through the module-level
    ``cursor_source``. The row for (server, catalog, schema, table) is then
    replaced in ``summary_v2`` in a single transaction on ``conn_metadata``.
    If INFORMATION_SCHEMA.COLUMNS has no entry for the table, no row is
    written (any previous row is still removed).

    Parameters
    ----------
    server_name : str
        Value stored in SERVER_NAME.
    table_catalog, table_schema, table_name : str
        Three-part name of the source table.
    verbose : bool
        If True, log a message once the row has been written.
    ignore_views : bool
        NOTE(review): accepted for backward compatibility but currently
        unused; views are counted like tables.
    """
    sql_columns = """SELECT ? AS SERVER_NAME
        , TABLE_CATALOG
        , TABLE_SCHEMA
        , TABLE_NAME
        , COUNT(*) AS N_COLUMNS
        , CAST(NULL as INTEGER) AS N_ROWS
        FROM INFORMATION_SCHEMA.COLUMNS
        WHERE TABLE_CATALOG = ?
         AND TABLE_SCHEMA = ?
         AND TABLE_NAME = ?
        GROUP BY TABLE_CATALOG
            , TABLE_SCHEMA
            , TABLE_NAME
        ORDER BY 1,2,3,4;"""
    cursor_source.execute(sql_columns, (server_name, table_catalog, table_schema, table_name))
    column_rows = cursor_source.fetchall()

    # Identifiers cannot be bound as parameters; quote them so names with
    # spaces, reserved words or ']' do not break the statement.
    sql_count = """select count(*) as n from {}.{}.{}""".format(
        _quote_sql_server_identifier(table_catalog),
        _quote_sql_server_identifier(table_schema),
        _quote_sql_server_identifier(table_name),
    )
    cursor_source.execute(sql_count)
    n_rows = cursor_source.fetchone()[0]

    sql_delete = """delete from summary_v2
            WHERE SERVER_NAME = ?
             AND TABLE_CATALOG = ?
             AND TABLE_SCHEMA = ?
             AND TABLE_NAME = ?;"""
    sql_insert = """insert into summary_v2 (SERVER_NAME, TABLE_CATALOG, TABLE_SCHEMA, TABLE_NAME, N_COLUMNS, N_ROWS)
                    values (?, ?, ?, ?, ?, ?);"""

    # One transaction: the previous row is replaced atomically. The original
    # guarded the delete with `if checkIfTableExistInSummaryV2:`, which tested
    # the function object (always true). Deleting a missing key is a no-op
    # anyway, so no check is needed.
    with conn_metadata:
        cursor_metadata.execute(sql_delete, (server_name, table_catalog, table_schema, table_name))
        for row in column_rows:
            cursor_metadata.execute(sql_insert, (row[0], row[1], row[2], row[3], row[4], n_rows))

    if verbose:
        logger.info('{}.{}.{}.{} updated into summary_v2...'.format(server_name, table_catalog, table_schema, table_name))

    return

In [None]:
# Refresh summary_v2 for every distinct table recorded in `summary` for SERVER.
sql = """select distinct SERVER_NAME 
            , TABLE_CATALOG 
            , TABLE_SCHEMA 
            , TABLE_NAME
            from summary
            where SERVER_NAME = ?;"""
rows = cursor_metadata.execute(sql, (SERVER,)).fetchall()
for row in rows:
    # row is (SERVER_NAME, TABLE_CATALOG, TABLE_SCHEMA, TABLE_NAME)
    insertOrUpdateSummaryV2(*row, verbose=True)