## Lesen der Konfigurationsdateien und Generierung einer entsprechenden Datenstruktur zur Steuerung der Data Ingestion

In [1]:
# pip install datasketch[redis]
# pip install elasticsearch==7.10.0
# pip install neo4j
# pip install pyyaml
# pip install deepdiff

from datetime import datetime
from datasketch import MinHash
from datasketch import MinHashLSH
from datasketch import LeanMinHash
from elasticsearch import Elasticsearch
from redis import StrictRedis
import redis
import json
import pickle
import base64
import numpy as np
import pandas as pd
import logging
import yaml
from yaml.loader import SafeLoader
from dataclasses import dataclass
from deepdiff import DeepDiff
from pprint import pprint
from typing import Dict, Any
import hashlib
from pyspark.sql import SparkSession


# Global debug switch; later cells print diagnostic output when it is True.
debug = False

# Never truncate cell contents when a DataFrame is displayed.
pd.set_option('display.max_colwidth', None)

# Root logger: timestamped format, only ERROR and above are emitted.
logging.basicConfig(
    format='%(asctime)s - %(message)s',
    datefmt='%d-%b-%y %H:%M:%S',
)
logger = logging.getLogger()
logger.setLevel(logging.ERROR)

def calc_idx_resource(source, schema, dataset):
    """Return the resource key ``<source>-<schema>-<dataset>``.

    Spaces inside each component are replaced by underscores.
    """
    components = (source, schema, dataset)
    return '-'.join(part.replace(' ', '_') for part in components)
hadoop_uri = 'hdfs://fonduta.fritz.box:9000'
abs_path = '/sample-data'
date = '2022-11-20'

# config: parsed YAML; config_data: one entry per dataset driving the ingestion;
# hdfs_resources: (idx, uri) pairs of the corresponding CSV files in HDFS.
config = []
config_data = []
hdfs_resources = []

try:
    with open('havel-config.yaml', 'r') as f:
        config = yaml.load(f, Loader=SafeLoader)

    # Expected layout: sources -> <source> -> <schema> -> {url, datasets: [{name, ...}]}
    for source, schemas in config['sources'].items():
        for schema, schema_cfg in schemas.items():
            url = schema_cfg['url']
            for dataset in schema_cfg['datasets']:
                dataset_name = dataset['name']

                # Both lists are optional per dataset; a missing or null entry means "no restriction".
                # They are read fresh for every dataset so values never carry over from a previous one.
                keywords_whitelist = dataset.get('keywords_whitelist') or []
                columns_blacklist = dataset.get('columns_blacklist') or []

                # abs_path already starts with '/', so no extra leading slash here.
                hdfs_uri = f"{abs_path}/{source}/{schema}/{date}/{dataset_name}.csv"

                config_data.append({
                    'source': source,
                    'schema': schema,
                    'dataset': dataset_name,
                    'url': url,
                    'dataframe': '',  # filled in once the Spark DataFrame is loaded
                    'keywords_whitelist': keywords_whitelist,
                    'columns_blacklist': columns_blacklist,
                    'hdfs_uri': hdfs_uri
                })
                hdfs_resources.append({
                    'idx': calc_idx_resource(source, schema, dataset_name),
                    'uri': hdfs_uri
                })

except yaml.YAMLError as e:
    print("Error in configuration file", e)


Intitializing PySpark ...

Spark Web UI available at http://fonduta.fritz.box:4040
SparkContext available as 'sc' (version = 3.3.1, master = local[*], app id = local-1675607611909)
SparkSession available as 'spark'


In [2]:
!hadoop fs -copyFromLocal ./sample-data/ /

copyFromLocal: `/sample-data/adventureworks-for-postgres/person/2022-11-23/address.csv': File exists


copyFromLocal: `/sample-data/adventureworks-for-postgres/person/2022-11-20/vadditionalcontactinfo.csv': File exists


copyFromLocal: `/sample-data/adventureworks-for-postgres/person/2022-11-20/stateprovince.csv': File exists


copyFromLocal: `/sample-data/adventureworks-for-postgres/person/2022-11-20/phonenumbertype.csv': File exists


copyFromLocal: `/sample-data/adventureworks-for-postgres/person/2022-11-20/personphone.csv': File exists


copyFromLocal: `/sample-data/adventureworks-for-postgres/person/2022-11-20/person.csv': File exists


copyFromLocal: `/sample-data/adventureworks-for-postgres/person/2022-11-20/password.csv': File exists


copyFromLocal: `/sample-data/adventureworks-for-postgres/person/2022-11-20/emailaddress.csv': File exists


copyFromLocal: `/sample-data/adventureworks-for-postgres/person/2022-11-20/countryregion.csv': File exists


copyFromLocal: `/sample-data/

In [6]:
def read_data(i, config_data):
    """Load dataset number ``i`` of ``config_data`` from HDFS into the global ``df_<i>``.

    The file ``<hadoop_uri><abs_path>/<source>/<schema>/<date>/<dataset>.csv`` is read as CSV
    with a header row and Spark schema inference.
    """
    entry = config_data[i]
    csv_path = (f"{hadoop_uri}{abs_path}/{entry['source']}/{entry['schema']}"
                f"/{date}/{entry['dataset']}.csv")
    csv_reader = spark.read.format("csv").option("header", "true").option("inferSchema", "true")
    globals()[f"df_{i}"] = csv_reader.load(csv_path)


# Load every configured dataset as df_0, df_1, ...
for i in range(len(config_data)):
    read_data(i, config_data)

if debug:
    for i in range(len(config_data)):
        globals()[f"df_{i}"].show()

In [7]:
# NOTE(review): this reads address.csv from the hard-coded 2022-11-23 partition (not the configured
# `date`) without schema inference; if config_data has 17+ entries it replaces the df_16 produced
# by read_data. Confirm this override is intended.
df_16 = (
    spark.read
    .format("csv")
    .option("header", "true")
    .load("hdfs://fonduta.fritz.box:9000/sample-data/adventureworks-for-postgres/person/2022-11-23/address.csv")
)

In [8]:
# Attach each Spark DataFrame loaded by read_data (globals df_0, df_1, ...) to its config entry.
for i in range(0, len(config_data)):
    config_data[i]['dataframe'] = globals()[f"df_{i}"]

# Config entries are dicts, so inspect the first one as a whole. An expression inside `if` is not
# auto-displayed by Jupyter, hence the explicit pprint.
if debug and config_data:
    pprint(config_data[0])

## Data Maintenance

In [9]:
def calc_idx_content(data):
    """Return the content-profile key ``<source>-<schema>-<dataset>-<attribute>``.

    Spaces inside each component are replaced by underscores.
    """
    fields = ('source', 'schema', 'dataset', 'attribute')
    return '-'.join(data[field].replace(' ', '_') for field in fields)

def calc_idx_signature(data):
    """Return the signature-profile key ``<source>-<schema>-<dataset>``.

    Spaces inside each component are replaced by underscores; other keys of ``data`` are ignored.
    """
    fields = ('source', 'schema', 'dataset')
    return '-'.join(data[field].replace(' ', '_') for field in fields)

def get_df_data_types_as_dict(df):
    """Map every column of ``df`` to the name of its pandas dtype (e.g. ``'int64'``).

    ``df`` is converted with ``toPandas()`` first, which collects the whole DataFrame to the driver.
    """
    pandas_dtypes = df.toPandas().dtypes
    return {column: dtype.name for column, dtype in pandas_dtypes.items()}

def serialize_dict_data_types(data_types):
    """Serialise a column -> dtype mapping to JSON with sorted keys, so equal mappings give equal text."""
    return json.dumps(obj=data_types, sort_keys=True)

def deserialize_dict_data_types(ser_df_data_types):
    """Parse JSON text produced by ``serialize_dict_data_types`` back into a dict."""
    decoded = json.loads(ser_df_data_types)
    return decoded

def compare_dicts(dict_a, dict_b):
    """Return the DeepDiff of ``dict_a`` against ``dict_b`` (empty when they are equal)."""
    difference = DeepDiff(dict_a, dict_b)
    return difference


def dict_hash(dictionary: Dict[str, Any]) -> str:
    """Return the hex MD5 digest of ``dictionary`` serialised as key-sorted JSON.

    Sorting the keys makes the digest independent of insertion order.
    """
    payload = json.dumps(dictionary, sort_keys=True).encode()
    return hashlib.md5(payload).hexdigest()


def calc_profile_content(source, schema, dataset, df, columns_blacklist, keywords_whitelist):
    """Build a content profile (document + MinHash) for every non-blacklisted column of ``df``.

    For each column the values are collected to the driver and fed into a 128-permutation
    MinHash. How a value is turned into bytes depends on the Spark type string from
    ``df.dtypes``. Size, cardinality and uniqueness are recorded. For columns listed in
    ``keywords_whitelist`` the non-null values are joined into a space-separated
    ``keywords`` string.

    :param source: data source name (from the config file)
    :param schema: schema name within the source
    :param dataset: dataset (table) name
    :param df: Spark DataFrame of the dataset
    :param columns_blacklist: columns to skip entirely
    :param keywords_whitelist: columns whose values become search keywords
    :return: ``(doc, min_hashes)``. ``doc`` is a list of profile dicts, one per column.
             ``min_hashes`` is a list of ``(calc_idx_content(profile), MinHash)`` pairs.
    """
    doc = []
    min_hashes = []
    # df.dtypes is a list of (column, spark_type) pairs; build the lookup once, not per column.
    column_types = dict(df.dtypes)

    for c_n in df.columns:
        if c_n in columns_blacklist:
            continue

        m = MinHash(num_perm=128)
        df_s = df.select(c_n)
        dtype = column_types[c_n]
        crd = df_s.distinct().count()
        size = df_s.count()
        keywords = []

        count_boolean = 0
        for row in df_s.collect():
            v = row[0]

            try:
                if dtype == 'int':
                    v = str(v).strip()
                    m.update(v.encode('utf8'))
                elif dtype == 'bool':
                    # Boolean values are deliberately not hashed; only the first one is printed.
                    # NOTE(review): Spark usually reports boolean columns as 'boolean', in which case
                    # this branch never matches and the values are hashed by the generic branch
                    # below. Confirm which behaviour is intended.
                    count_boolean += 1
                    if count_boolean == 1: print("%s: %s" % (dtype, v))
                elif dtype == 'date':
                    v = v.strftime("%Y-%m-%d %H:%M:%S.%f")
                    m.update(v.encode('utf8'))
                elif dtype == 'double':
                    m.update(str(v).encode('utf8'))
                elif v is not None:
                    m.update(str(v).encode('utf8'))

                if c_n in keywords_whitelist and v is not None and v != '[Null]':
                    keywords.append(v)
            except Exception as e:
                # Best effort: report the offending value and continue with the next row.
                pprint(e)
                print("%s: %s" % (dtype, v))

        data = {}
        data['source'] = source
        data['schema'] = schema
        data['dataset'] = dataset
        data['attribute'] = c_n
        data['data_type'] = dtype
        data['timestamp'] = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
        data['size'] = size
        data['cardinality'] = crd
        # An empty column has no distinct values; avoid ZeroDivisionError.
        data['uniqueness'] = crd / size if size else 0.0
        data['min_hash'] = m.hashvalues
        data['min_hash_seed'] = m.seed
        # Whitelisted values need not be strings (e.g. bigint columns), so stringify before joining.
        data['keywords'] = ' '.join(str(k) for k in keywords)

        min_hashes.append((calc_idx_content(data), m))
        doc.append(data)

    return doc, min_hashes


def calc_profile_signature(source, schema, dataset, df):
    """Build the signature profile of a dataset from its column names.

    All column names are fed into a 128-permutation MinHash and joined into ``keywords``.
    The pandas dtype map of ``df`` is stored as JSON and drives the schema version
    (see ``calc_version``).

    :return: ``(doc, min_hashes)``. ``doc`` is a one-element list holding the profile dict.
             ``min_hashes`` is a one-element list holding ``(calc_idx_signature(profile), MinHash)``.
    """
    doc = []
    min_hashes = []
    keywords = []

    m = MinHash(num_perm=128)
    for c in df.columns:
        m.update(c.encode('utf8'))
        keywords.append(c)

    data = {}
    data['source'] = source
    data['schema'] = schema
    data['dataset'] = dataset
    data['timestamp'] = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
    data['min_hash'] = m.hashvalues
    data['min_hash_seed'] = m.seed
    # get_df_data_types_as_dict() runs toPandas() on the whole DataFrame, so compute it only once
    # and reuse it for both the stored dtype map and the version calculation.
    data_types = get_df_data_types_as_dict(df)
    data['data_types'] = serialize_dict_data_types(data_types)
    data['keywords'] = ' '.join(keywords)

    idx = calc_idx_signature(data)
    min_hashes.append((idx, m))

    data['version'] = calc_version(idx, data_types)

    doc.append(data)

    return doc, min_hashes


def calc_version(k, d):
    """Return the schema version for key ``k`` and record it in ``version_cache``.

    ``d`` (a column -> dtype mapping) is fingerprinted with ``dict_hash``. The cache stores
    ``{"hash": ..., "version": ...}`` as JSON under ``k``:

    - no entry yet         -> version 0
    - entry with same hash -> previous version
    - entry with new hash  -> previous version + 1

    The resulting hash/version pair is written back in every case.
    """
    h = dict_hash(d)
    cached = version_cache.get(k)
    version = 0

    if debug: print(cached)
    if cached:
        ser = json.loads(cached)
        if debug: print("%s: %s" % (ser, k))
        version = int(ser['version'])
        if ser['hash'] != h:
            version += 1
            if debug: print('Version incremented for key: %s' % k)

    nd = {
        'hash': h,
        'version': version
    }

    version_cache.set(k, json.dumps(nd))

    return version


def insert_min_hashes(min_hashes, lsh_list):
    """(Re-)insert every ``(key, MinHash)`` pair into each LSH index in ``lsh_list``.

    An existing entry under the same key is removed first, so the index holds the latest signature.
    datasketch's ``MinHashLSH.remove`` raises ValueError for an unknown key. That is expected the
    first time a key is inserted and is ignored. Any other error (e.g. a storage failure) propagates.

    :param min_hashes: iterable of ``(key, MinHash)`` pairs
    :param lsh_list: dict mapping a threshold label to a MinHashLSH instance
    """
    for lsh in lsh_list.values():
        with lsh.insertion_session() as session:
            for key, minhash in min_hashes:
                if debug: print(key)
                if debug: print(minhash)
                try:
                    lsh.remove(key)
                except ValueError:
                    if debug: print("Key %s doesn't exist." % key)
                session.insert(key, minhash)


                
def upsert(env, index, func_calc_idx, doc, v=True):
    """Index (insert or overwrite) every document of ``doc`` into ``index``.

    Document ids come from ``func_calc_idx(data)``, so re-profiling the same entity overwrites its
    previous document. The index is refreshed afterwards so the documents are immediately searchable.

    :param env: environment name (unused here, kept for interface compatibility)
    :param v: when True and ``debug`` is set, print the total hit count of ``index``
    """
    for data in doc:
        search_engine.index(index=index, id=func_calc_idx(data), body=data)

    search_engine.indices.refresh(index=index)

    # The match_all search only feeds a debug message. Skip it otherwise: it would fetch up to
    # es_res_max_size full documents on every call.
    if v is True and debug:
        res = search_engine.search(size=es_res_max_size, index=index, body={"query": {"match_all": {}}})
        print("Got %d Hits." % res['hits']['total']['value'])

        
def clear_cache_ns(ns, client=None):
    """
    Delete all Redis keys matching the glob pattern ``ns``.

    Keys are found with SCAN and deleted in a single pipeline; this may be slow on large databases.

    :param ns: str, key pattern, e.g. ``your:prefix*``
    :param client: redis client to operate on; defaults to ``version_cache``
    :return: int, number of deleted keys
    """
    if client is None:
        client = version_cache

    count = 0
    pipe = client.pipeline()
    for key in client.scan_iter(ns):
        pipe.delete(key)
        count += 1
    pipe.execute()
    return count


def clear_es(env):
    """Delete the content and signature indices of ``env``.

    Each index is deleted on its own with 400/404 ignored. A missing index (e.g. on the first run)
    therefore cannot prevent the other one from being deleted. Deleting an index also removes its
    documents, so no separate delete-by-query is needed.
    """
    indices = [index_content[env], index_signature[env]]

    for index in indices:
        try:
            search_engine.indices.delete(index=index, ignore=[400, 404])
        except Exception as e:
            if debug: print("Delete Index: %s" % e)
        
def reset_all():
    """Delete every Elasticsearch index whose name contains no '.'.

    Names containing a dot (system indices such as `.kibana`) are left alone. Failures are only
    reported in debug mode.
    """
    all_indices = search_engine.indices.get_alias().keys()
    print("\nAttempting to delete", len(all_indices), "indexes.")

    for _index in all_indices:
        if "." in _index:  # avoid deleting indexes like `.kibana`
            continue
        try:
            search_engine.indices.delete(index=_index)
            if debug: print("Successfully deleted:", _index)
        except Exception as error:
            if debug: print('indices.delete error:', error, 'for index:', _index)

def create_es_schema_signature(env):
    """Create the signature index of ``env`` with an explicit field mapping.

    One document per dataset: text fields for names, data types and keywords; a date field whose
    format matches ``datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")``; integer fields for the
    MinHash values/seed and the schema version.
    """
    properties = {
        "source": {"type": "text"},   # formerly "string"
        "schema": {"type": "text"},   # formerly "string"
        "dataset": {"type": "text"},  # formerly "string"
        # data format for Python's datetime.now() method
        "timestamp": {"type": "date", "format": "yyyy-MM-dd HH:mm:ss.SSSSSS"},
        # NOTE(review): ES "integer" is signed 32-bit; MinHash values above 2**31-1 would be dropped
        # from the index by ignore_malformed (the _source keeps them). Consider "long" — confirm.
        "min_hash": {"ignore_malformed": "true", "type": "integer"},
        "min_hash_seed": {"type": "integer"},
        "data_types": {"type": "text"},
        "version": {"type": "integer"},
        "keywords": {"type": "text"},
    }
    schema = {"mappings": {"properties": properties}}

    resp = search_engine.indices.create(index=index_signature[env], body=schema)

    if debug: print("_mapping response:", json.dumps(resp, indent=4), "\n")

    
def create_es_schema_content(env):
    """Create the content index of ``env`` with an explicit field mapping.

    One document per column: text fields for names, data type and keywords; a date field whose
    format matches ``datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")``; numeric fields for size,
    cardinality, uniqueness and the MinHash values/seed.
    """
    properties = {
        "source": {"type": "text"},  # formerly "string"
        "schema": {"type": "text"},
        "dataset": {"type": "text"},
        "attribute": {"type": "text"},
        "data_type": {"type": "text"},
        # data format for Python's datetime.now() method
        "timestamp": {"type": "date", "format": "yyyy-MM-dd HH:mm:ss.SSSSSS"},
        "size": {"type": "integer"},
        "cardinality": {"type": "float"},
        "uniqueness": {"type": "float"},
        # NOTE(review): ES "integer" is signed 32-bit; MinHash values above 2**31-1 would be dropped
        # from the index by ignore_malformed (the _source keeps them). Consider "long" — confirm.
        "min_hash": {"ignore_malformed": "true", "type": "integer"},
        "min_hash_seed": {"type": "integer"},
        "keywords": {"type": "text"},
    }
    schema = {"mappings": {"properties": properties}}

    resp = search_engine.indices.create(index=index_content[env], body=schema)

    if debug: print("_mapping response:", json.dumps(resp, indent=4), "\n")


def create_profiles_content(source, schema, dataset, df, columns_blacklist, keywords_whitelist, env, lsh_list):
    """Profile every column of one dataset, register the MinHashes in ``lsh_list`` and index the profiles.

    Indexing errors are printed (together with the documents) instead of raised.
    """
    profiles, signatures = calc_profile_content(source, schema, dataset, df, columns_blacklist, keywords_whitelist)
    insert_min_hashes(signatures, lsh_list)
    target_index = index_content[env]

    try:
        upsert(env, target_index, calc_idx_content, profiles)
    except Exception as e:
        print("something didn't work", profiles, "\n", str(e))
        

def create_profiles_signature(source, schema, dataset, df, env, lsh_list):
    """Profile the column-name signature of one dataset, register it in ``lsh_list`` and index it.

    Indexing errors are printed (together with the document) instead of raised.
    """
    profiles, signatures = calc_profile_signature(source, schema, dataset, df)
    insert_min_hashes(signatures, lsh_list)
    target_index = index_signature[env]

    try:
        upsert(env, target_index, calc_idx_signature, profiles)
    except Exception as e:
        print("something didn't work", profiles, "\n", str(e))


def get_nodes_and_edges(env, index, lsh_list):
    """Fetch the profile documents of ``index`` and query each against every LSH index.

    Only the first ``es_res_max_size`` documents are considered.

    :return: ``(hits, relations)``. ``hits`` are the raw Elasticsearch hits. ``relations`` holds one
             ``(doc_id, bucket, threshold_str)`` tuple per hit and LSH index, where ``bucket`` is the
             result of ``lsh.query`` (iterated by callers as related keys).
    """
    res = search_engine.search(size=es_res_max_size, index=index, body={"query": {"match_all": {}}})
    if debug: print("Got %d Hits." % res['hits']['total']['value'])

    hits = res['hits']['hits']
    relations = []
    for hit in hits:
        stored = hit['_source']
        # Rebuild a queryable MinHash from the stored hash values and seed.
        probe = MinHash(num_perm=128)
        probe.hashvalues = np.array(stored["min_hash"], dtype=np.int64)
        probe.seed = stored["min_hash_seed"]

        relations.extend(
            (hit['_id'], lsh.query(probe), str(threshold))
            for threshold, lsh in lsh_list.items()
        )
    return (hits, relations)


def search_dataset_for_topic(index, topic):
    """Full-text search ``topic`` in the ``keywords`` field of ``index``.

    No ``size`` is passed, so only Elasticsearch's default number of hits is returned.

    :return: list of dicts with ``id``, ``score``, ``source``, ``schema``, ``dataset`` and ``attribute``
    """
    query = {"query": {"match": {"keywords": topic}}}
    resp = search_engine.search(index=index, body=query)

    return [
        {
            'id': hit['_id'],
            'score': hit['_score'],
            'source': hit['_source']['source'],
            'schema': hit['_source']['schema'],
            'dataset': hit['_source']['dataset'],
            'attribute': hit['_source']['attribute'],
        }
        for hit in resp['hits']['hits']
    ]


In [10]:
from neo4j import GraphDatabase
from textwrap import dedent

class GraphStorage:
    """Neo4j access layer for the profile graph of one environment.

    Nodes are ``Table`` (dataset signatures) and ``Column`` (attribute profiles). Every node and
    query is scoped by an ``env`` property. Relationships are ``IS_ATTRIBUTE_OF`` (column -> table)
    and ``IS_SIMILAR_TO`` (table-table / column-column) with a ``weight`` property. Each public
    method opens its own session and runs one managed transaction per query.

    Timestamps are stored in the (misspelled) ``updatetAt`` property, which the fetch queries filter
    on; the name is kept for compatibility with existing graph data.
    """

    def __init__(self, uri, user, password, env, debug):
        self.driver = GraphDatabase.driver(uri, auth=(user, password))
        self.env = env
        self.debug = debug

    def close(self):
        """Close the underlying driver; the instance cannot be used afterwards."""
        self.driver.close()

    def insert_column_node(self, idx_col, idx_tab, data):
        """Merge a Column node and link it to the latest version of its Table node ``idx_tab``."""
        with self.driver.session() as session:
            res1 = session.execute_write(self._create_column_node, idx_col, data, self.env)
            res2 = session.execute_write(self._create_column_table_relation, idx_col, idx_tab, self.env)
            if self.debug: print(res1)
            if self.debug: print(res2)

    def insert_table_node(self, idx, data):
        """Merge a Table node for the signature profile ``data``."""
        with self.driver.session() as session:
            res = session.execute_write(self._create_table_node, idx, data, self.env)
            if self.debug: print(res)

    def add_column_column_relation(self, idx_a, idx_b, threshold):
        """Merge an IS_SIMILAR_TO relation (weight ``threshold``) between two columns of different datasets."""
        with self.driver.session() as session:
            res = session.execute_write(self._create_column_column_relation, idx_a, idx_b, threshold, self.env)
            if self.debug: print(res)

    def add_table_table_relation(self, idx_a, idx_b, threshold):
        """Merge an IS_SIMILAR_TO relation (weight ``threshold``) between two distinct Table nodes."""
        with self.driver.session() as session:
            res = session.execute_write(self._create_table_table_relation, idx_a, idx_b, threshold, self.env)
            if self.debug: print(res)

    def remove_self_relations(self):
        """Delete IS_SIMILAR_TO relations that start and end at the same node."""
        with self.driver.session() as session:
            res = session.execute_write(self._delete_self_relations, self.env)
            if self.debug: print(res)

    def delete_all(self):
        """Detach-delete every node of this environment."""
        with self.driver.session() as session:
            res = session.execute_write(self._delete_all, self.env)
            if self.debug: print(res)

    def find_related_tables_by_signature_to_2_grade(self, dataset, weight, dastart, daend):
        """Return ``(a, b, c)`` tuples: tables similar to ``dataset`` (b) and tables similar to those (c).

        ``c`` may be None (OPTIONAL MATCH). Only relations whose ``updatetAt`` lies in
        ``[dastart, daend]`` and whose weight equals ``weight`` are followed.
        """
        with self.driver.session() as session:
            res = session.execute_read(self._fetch_related_tables_by_signature_to_2_grade, dataset, weight, dastart, daend, self.env)
            if self.debug: print(res)

            return res

    def find_related_tables_by_content_to_1_grad(self, attribute, weight, dastart, daend):
        """Return ``(b, c, d, e)`` tuples linking the table of ``attribute`` to other tables via similar columns.

        Only relations updated within ``[dastart, daend]`` are considered, and the column similarity
        must have weight ``weight``.
        """
        with self.driver.session() as session:
            res = session.execute_read(self._fetch_related_tables_by_content_to_1_grade, attribute, weight, dastart, daend, self.env)
            if self.debug: print(res)

            return res

    @staticmethod
    def _first_column(result):
        """Return the first field of every record in ``result`` as a list."""
        return [record[0] for record in result]

    @staticmethod
    def _create_column_node(tx, idx, data, env):
        # Column nodes carry no version in their profile, so version is fixed to 1.
        # The node has no `name` property, so the message is built from `idx`.
        result = tx.run("MERGE (a:Column {idx: $idx, source: $source, schema: $schema, dataset: $dataset, version: $version, env: $env, "
                        "attribute: $attribute, data_type: $data_type, uniqueness: $uniqueness}) "
                        "ON CREATE SET a.createdAt = timestamp(), a.updatetAt = timestamp() "
                        "ON MATCH SET a.updatetAt = timestamp() "
                        "RETURN a.idx + ' created as node ' + toString(id(a)) ",
                        source=data['source'], schema=data['schema'], dataset=data['dataset'], version=1, timestamp=data['timestamp'],
                        attribute=data['attribute'], data_type=data['data_type'], cardinality=data['cardinality'], uniqueness=data['uniqueness'],
                        env=env, idx=idx)

        return result.single()[0]

    @staticmethod
    def _create_table_node(tx, idx, data, env):
        # A changed schema yields a new `version`, so MERGE creates a new node for it.
        result = tx.run("MERGE (a:Table {idx: $idx, source: $source, schema: $schema, dataset: $dataset, version: $version, env: $env}) "
                        "ON CREATE SET a.createdAt = timestamp(), a.updatetAt = timestamp() "
                        "ON MATCH SET a.updatetAt = timestamp() "
                        "RETURN a.idx + ' created as node ' + toString(id(a)) ",
                        source=data['source'], schema=data['schema'], dataset=data['dataset'], version=data['version'], timestamp=data['timestamp'],
                        env=env, idx=idx)
        return result.single()[0]

    @staticmethod
    def _create_table_table_relation(tx, idx_a, idx_b, weight, env):
        result = tx.run("MATCH (a:Table {idx: $idx_a, env: $env}), (b:Table {idx: $idx_b, env: $env}) "
                        "WHERE a <> b "
                        "MERGE (a)-[r:IS_SIMILAR_TO {weight: $weight}]-(b) "
                        "ON CREATE SET r.createdAt = timestamp(), r.updatetAt = timestamp() "
                        "ON MATCH SET r.updatetAt = timestamp() "
                        "RETURN 'relation ' + type(r) + ' created between ' + a.dataset + ' and ' + b.dataset",
                        idx_a=idx_a, idx_b=idx_b, weight=weight, env=env)

        return GraphStorage._first_column(result)

    @staticmethod
    def _create_column_table_relation(tx, idx_col, idx_tab, env):
        # Attach the column only to the Table node with the highest version for idx_tab.
        result = tx.run("MATCH (t:Table {idx: $idx_tab, env: $env}) "
                        "WITH max(t.version) AS maximum "
                        "MATCH (c:Column {idx: $idx_col, env: $env}), (t:Table {idx: $idx_tab, env: $env}) "
                        "WHERE t.version = maximum "
                        "MERGE (c)-[r:IS_ATTRIBUTE_OF {weight: '1.0'}]->(t) "
                        "ON CREATE SET r.createdAt = timestamp(), r.updatetAt = timestamp() "
                        "ON MATCH SET r.updatetAt = timestamp() "
                        "RETURN 'relation ' + type(r) + ' created between ' + c.attribute + ' and ' + t.dataset",
                        idx_tab=idx_tab, idx_col=idx_col, env=env)

        return GraphStorage._first_column(result)

    @staticmethod
    def _create_column_column_relation(tx, idx_a, idx_b, weight, env):
        result = tx.run("MATCH (a:Column {idx: $idx_a, env: $env}), (b:Column {idx: $idx_b, env: $env}) "
                        "WHERE a.dataset <> b.dataset AND a <> b "
                        "MERGE (a)-[r:IS_SIMILAR_TO {weight: $weight}]-(b) "
                        "ON CREATE SET r.createdAt = timestamp(), r.updatetAt = timestamp() "
                        "ON MATCH SET r.updatetAt = timestamp() "
                        "RETURN 'relation ' + type(r) + ' created between ' + a.attribute + ' and ' + b.attribute",
                        idx_a=idx_a, idx_b=idx_b, env=env, weight=weight)

        return GraphStorage._first_column(result)

    @staticmethod
    def _delete_self_relations(tx, env):
        result = tx.run("MATCH (a {env: $env})-[rel:IS_SIMILAR_TO]->(a) DELETE rel", env=env)
        return GraphStorage._first_column(result)

    @staticmethod
    def _delete_all(tx, env):
        result = tx.run("MATCH (n {env: $env}) DETACH DELETE n", env=env)
        return GraphStorage._first_column(result)

    @staticmethod
    def _fetch_related_tables_by_signature_to_2_grade(tx, dataset, weight, dastart, daend, env):
        q = '''
            MATCH (a:Table)-[r1:IS_SIMILAR_TO]-(b:Table) 
            WHERE a.dataset = $dataset AND a.env = $env AND b.env = $env AND r1.weight = $weight
                AND $dastart <= r1.updatetAt AND r1.updatetAt <= $daend
            OPTIONAL MATCH (b:Table)-[r2:IS_SIMILAR_TO]-(c:Table) 
            WHERE c.env = $env AND c <> a AND r2.weight = $weight 
                AND $dastart <= r2.updatetAt AND r2.updatetAt <= $daend
            RETURN a, b, c
        '''

        result = tx.run(dedent(q), env=env, dataset=dataset, weight=weight, dastart=dastart, daend=daend)
        return [(record[0], record[1], record[2]) for record in result]

    @staticmethod
    def _fetch_related_tables_by_content_to_1_grade(tx, attribute, weight, dastart, daend, env):
        q = '''
            MATCH (a:Column)-[r1:IS_ATTRIBUTE_OF]-(b:Table)<-[r2:IS_ATTRIBUTE_OF]-(c:Column)-[r3:IS_SIMILAR_TO]-(d:Column)-[r4:IS_ATTRIBUTE_OF]->(e:Table)
            WHERE a.attribute = $attribute AND a.env=$env AND b.env=$env AND c.env=$env AND d.env=$env AND e.env=$env AND b <> e AND r3.weight=$weight
                AND $dastart <= r1.updatetAt AND r1.updatetAt <= $daend
                AND $dastart <= r2.updatetAt AND r2.updatetAt <= $daend
                AND $dastart <= r3.updatetAt AND r3.updatetAt <= $daend
                AND $dastart <= r4.updatetAt AND r4.updatetAt <= $daend
            RETURN b, c, d, e
        '''

        result = tx.run(dedent(q), env=env, attribute=attribute, weight=weight, dastart=dastart, daend=daend)
        return [(record[0], record[1], record[2], record[3]) for record in result]


In [16]:
import os

env = 'dev'
reset_on_restart = True
debug = False

index_content = {
    'dev': 'dev-havel-index-content',
    'test': 'test-havel-index-content',
    'prod': 'prod-havel-index-content'
}

index_signature = {
    'dev': 'dev-havel-index-signature',
    'test': 'test-havel-index-signature',
    'prod': 'prod-havel-index-signature'
}

es_res_max_size = '10000'

# Connection settings. Override them via environment variables instead of editing the notebook;
# the defaults are the previous hard-coded values.
SERVICE_HOST = os.environ.get('HAVEL_SERVICE_HOST', '192.168.178.52')
NEO4J_USER = os.environ.get('NEO4J_USER', 'neo4j')
NEO4J_PASSWORD = os.environ.get('NEO4J_PASSWORD', 'password')

search_engine = Elasticsearch(f"http://{SERVICE_HOST}:9200")
graph = GraphStorage(f"bolt://{SERVICE_HOST}:7687", NEO4J_USER, NEO4J_PASSWORD, env, debug)

# Schema-version cache (see calc_version) lives in Redis DB 4.
pool = redis.ConnectionPool(host=SERVICE_HOST, port=6379, db=4)
version_cache = redis.Redis(connection_pool=pool)


def _redis_lsh(threshold, basename, db):
    """Create a 128-permutation MinHashLSH whose storage lives in Redis database ``db``."""
    return MinHashLSH(
        threshold=threshold, num_perm=128, storage_config={
            'type': 'redis',
            'basename': bytearray(basename, 'utf-8'),
            'redis': {'host': SERVICE_HOST, 'port': 6379, 'db': db}
        })


# Content (column-value) LSH indices, one per similarity threshold, in Redis DBs 0-2.
lsh = {
    '0.5': _redis_lsh(0.5, index_content[env], 0),
    '0.7': _redis_lsh(0.7, index_content[env], 1),
    '0.9': _redis_lsh(0.9, index_content[env], 2),
}

# Signature (column-name) LSH index in Redis DB 3.
lsh_sig = {
    '0.8': _redis_lsh(0.8, index_signature[env], 3),
}

def reset_storage_system(verbose=True):
    """Reset the storage of ``env``: Redis key patterns, Elasticsearch indices and the Neo4j graph.

    The Elasticsearch indices are deleted and recreated with their mappings, and all graph nodes of
    ``env`` are removed.

    NOTE(review): clear_cache_ns scans ``version_cache`` (Redis DB 4) for keys matching the literal
    patterns 'db0'..'db3'. That does not touch the LSH storage in Redis DBs 0-3. Confirm what was
    meant to be cleared here.
    """
    for namespace in ('db0', 'db1', 'db2', 'db3'):
        clear_cache_ns(namespace)

    clear_es(env)
    create_es_schema_content(env)
    create_es_schema_signature(env)

    graph.delete_all()

    if verbose:
        print("system storage reset completed")

def create_profiles(verbose=True):
    """Create and index the signature profiles of all configured datasets, then their content profiles."""
    for entry in config_data:
        src, sch, name, frame = (entry[k] for k in ('source', 'schema', 'dataset', 'dataframe'))
        create_profiles_signature(src, sch, name, frame, env, lsh_sig)
    if verbose:
        print("created signature profiles")

    for entry in config_data:
        src, sch, name, frame = (entry[k] for k in ('source', 'schema', 'dataset', 'dataframe'))
        create_profiles_content(src, sch, name, frame,
                                entry['columns_blacklist'], entry['keywords_whitelist'], env, lsh)
    if verbose:
        print("created content profiles")

    
def store_profiles(verbose=True):
    """Write the indexed profiles into the Neo4j graph and close the graph driver.

    1. Table nodes from the signature index, plus IS_SIMILAR_TO relations for every key that shares
       an LSH bucket in ``lsh_sig``.
    2. Column nodes from the content index (each linked to its table), plus IS_SIMILAR_TO relations
       for every key sharing a bucket in ``lsh``.
    3. Self-relations are removed and ``graph`` is closed.
    A node whose merged properties changed (e.g. a new version) is added as a new node.
    """
    def _add_relations(edges, add_relation):
        for key, bucket, threshold in edges:
            for other in bucket:
                if debug: print("key: %s - bucket: %s - threshold: %s" % (key, bucket, threshold))
                add_relation(key, other, threshold)

    # Table nodes and their relations
    table_hits, table_edges = get_nodes_and_edges(env, index_signature[env], lsh_sig)
    for hit in table_hits:
        graph.insert_table_node(hit['_id'], hit['_source'])
    _add_relations(table_edges, graph.add_table_table_relation)

    if verbose: print("created dataset nodes and their relationships")

    # Column nodes and their relations
    column_hits, column_edges = get_nodes_and_edges(env, index_content[env], lsh)
    for hit in column_hits:
        graph.insert_column_node(hit['_id'], calc_idx_signature(hit['_source']), hit['_source'])
    _add_relations(column_edges, graph.add_column_column_relation)

    if verbose: print("created attribute nodes and their relationships")

    # Cleanup
    graph.remove_self_relations()
    graph.close()

    if verbose: print("cleaned up")



In [14]:
# Optionally wipe all stores, then profile every dataset and persist the results in the graph.
if reset_on_restart:
    reset_storage_system()

create_profiles()
store_profiles()


23/02/05 15:39:27 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
created signature profiles
created content profiles


## Data Discovery

In [30]:
import time
from datetime import datetime
import pandas as pd
from IPython.display import display

import os

# Re-open the connections (store_profiles() closes the Neo4j driver). Connection settings can be
# overridden via environment variables; the defaults are the previous hard-coded values.
_service_host = os.environ.get('HAVEL_SERVICE_HOST', '192.168.178.52')
graph = GraphStorage(f"bolt://{_service_host}:7687",
                     os.environ.get('NEO4J_USER', 'neo4j'),
                     os.environ.get('NEO4J_PASSWORD', 'password'),
                     env, False)
search_engine = Elasticsearch(f"http://{_service_host}:9200")
hdfs_url = "hdfs://fonduta.fritz.box:9000"
debug = False

def ts_to_str(ts):
    """Format a millisecond epoch timestamp ``ts`` as local time ``MM/DD/YYYY, HH:MM:SS``."""
    seconds = int(ts / 1000)  # drop the millisecond part (truncates toward zero)
    return datetime.fromtimestamp(seconds).strftime("%m/%d/%Y, %H:%M:%S")


def related_nodes_by_content_to_str(node):
    """Render a ``(table, column, column, table)`` content match as one line ending in a newline."""
    src_table, src_col, dst_col, dst_table = node[0], node[1], node[2], node[3]
    return (f"{src_table['dataset']!s} version: {src_table['version']!s} -> "
            f"({src_col['attribute']!s} - {dst_col['attribute']!s}) <- "
            f"{dst_table['dataset']!s} version: {dst_table['version']!s}\n")
            

def _dataset_node_tuple(n):
    """Project a dataset graph node onto (idx, source, schema, dataset, version)."""
    return (n['idx'], n['source'], n['schema'], n['dataset'], n['version'])


def _attribute_node_tuple(n):
    """Project an attribute graph node onto (idx, source, schema, dataset, attribute, version)."""
    return (n['idx'], n['source'], n['schema'], n['dataset'], n['attribute'], n['version'])


def discover_datasets(topic, dastart, daend, treshold_signature='0.9', treshold_content='0.7', verbose=True):
    """Find datasets for a topic and the datasets related to them in the graph.

    Keyword hits come from search_dataset_for_topic (the Elasticsearch content
    index of the current env). For every hit, the graph is queried for
      * datasets related by signature up to the 2nd degree (starting from the
        hit's dataset), and
      * datasets related by content at the 1st degree (starting from the hit's
        attribute).
    Both graph queries receive the thresholds and the dastart/daend window.

    Args:
        topic: keyword to search for.
        dastart: start of the time window, epoch milliseconds.
        daend: end of the time window, epoch milliseconds.
        treshold_signature: threshold passed to the signature query.
        treshold_content: threshold passed to the content query.
        verbose: print a header line describing the search.

    Returns:
        A list with one dict per hit, containing:
          'hit': the search hit itself;
          'related_nodes_by_signature': dicts with 'source_dataset',
              'f_grad_dataset' and 's_grad_dataset' tuples;
          'related_nodes_by_content': dicts with 'source_dataset',
              'source_attribute', 'target_attribute' and 'target_dataset' tuples.
    """
    if (verbose): print('Dataset Discovery for topic %s, from date %s to date %s\n\n' % (topic, ts_to_str(dastart), ts_to_str(daend)))

    # find datasets for topic over keywords with elasticsearch
    hits_for_topic = search_dataset_for_topic(index_content[env], topic)

    # find related datasets with neo4j
    results = []
    for hit in hits_for_topic:
        related_nodes_by_signature = [
            {
                'source_dataset': _dataset_node_tuple(node[0]),
                'f_grad_dataset': _dataset_node_tuple(node[1]),
                's_grad_dataset': _dataset_node_tuple(node[2]),
            }
            for node in graph.find_related_tables_by_signature_to_2_grade(hit['dataset'], treshold_signature, dastart, daend)
        ]

        related_nodes_by_content = []
        for node in graph.find_related_tables_by_content_to_1_grad(hit['attribute'], treshold_content, dastart, daend):
            related_nodes_by_content.append({
                'source_dataset': _dataset_node_tuple(node[0]),
                # Every field, including the schema, comes from the attribute node
                # itself. Previously the schema was taken from node[0] by mistake.
                'source_attribute': _attribute_node_tuple(node[1]),
                'target_attribute': _attribute_node_tuple(node[2]),
                'target_dataset': _dataset_node_tuple(node[3]),
            })
            # The rendered line used to be computed and discarded; show it when debugging.
            if debug:
                print(related_nodes_by_content_to_str(node), end='')

        results.append({'hit': hit,
                        'related_nodes_by_signature': related_nodes_by_signature,
                        'related_nodes_by_content': related_nodes_by_content
                       })

    return results


def create_ds_attribute_node(d):
    """Turn an attribute tuple into a dict.

    ``d`` is positional: (idx, source, schema, dataset, attribute, version).
    Any items after the sixth are ignored.
    """
    field_names = ('idx', 'source', 'schema', 'dataset', 'attribute', 'version')
    return {name: d[pos] for pos, name in enumerate(field_names)}


def create_ds_dataset_node(d):
    """Turn a dataset tuple into a dict.

    ``d`` is positional: (idx, source, schema, dataset, version). Any items
    after the fifth are ignored. 'uri' starts out empty;
    generate_queries_for_discovered_datasets fills it in afterwards.
    """
    field_names = ('idx', 'source', 'schema', 'dataset', 'version')
    record = {name: d[pos] for pos, name in enumerate(field_names)}
    record['uri'] = ''
    return record


def _resolve_hdfs_uri(node):
    """Return the full HDFS URI (hdfs_url + stored path) of a dataset node dict.

    The node's index is computed with calc_idx_resource from its 'source',
    'schema' and 'dataset' values. The first matching entry of hdfs_resources
    is used.

    Raises:
        LookupError: if no entry in hdfs_resources has that index. Previously a
            bare next(...) leaked an uninformative StopIteration here.
    """
    idx = calc_idx_resource(node['source'], node['schema'], node['dataset'])
    for item in hdfs_resources:
        if item['idx'] == idx:
            return hdfs_url + item['uri']
    raise LookupError("no HDFS resource registered for dataset index '%s'" % idx)


def generate_queries_for_discovered_datasets(discovered_datasets):
    """Build one Spark SQL join query per content relation found by discovery.

    Args:
        discovered_datasets: the list returned by discover_datasets; only each
            hit's 'related_nodes_by_content' entries are used.

    Returns:
        A dict mapping query text to its execution context. The context has the
        keys 'source_name', 'source_uri', 'source_attribute_name', 'target_name',
        'target_uri' and 'target_attribute_name'. Because the query text is the
        key, identical joins reached through several hits collapse into one entry.

    Raises:
        LookupError: if a source or target dataset has no HDFS resource.
    """
    queries = {}
    for hit in discovered_datasets:
        for related in hit['related_nodes_by_content']:
            source = create_ds_dataset_node(related['source_dataset'])
            join_a_s = create_ds_attribute_node(related['source_attribute'])
            join_a_t = create_ds_attribute_node(related['target_attribute'])
            target = create_ds_dataset_node(related['target_dataset'])

            source['uri'] = _resolve_hdfs_uri(source)
            target['uri'] = _resolve_hdfs_uri(target)

            # The table names refer to the temp views that exec_query registers
            # under the dataset names.
            q = (f"SELECT * FROM {source['dataset']} a JOIN {target['dataset']} b "
                 f"ON (a.{join_a_s['attribute']} = b.{join_a_t['attribute']})")

            queries[q] = {
                'source_name': source['dataset'].strip(),
                'source_uri': source['uri'].strip(),
                'source_attribute_name': join_a_s['attribute'].strip(),
                'target_name': target['dataset'].strip(),
                'target_uri': target['uri'].strip(),
                'target_attribute_name': join_a_t['attribute'].strip()
            }

    return queries

def exec_query(query, context):
    """Register both datasets of a generated query as temp views and run it.

    Args:
        query: Spark SQL text that references the views named
            context['source_name'] and context['target_name'].
        context: dict with 'source_name', 'source_uri', 'target_name' and
            'target_uri', as produced by generate_queries_for_discovered_datasets.

    Returns:
        The DataFrame returned by spark.sql, or None if executing the query
        raised. In that case the error is printed (best effort, so a notebook
        loop over many queries keeps going).
    """
    if debug:
        print(f"{context['source_name']} {context['source_uri']} {context['target_name']} {context['target_uri']}")

    # Both inputs are read as CSV files with a header row.
    df_source = spark.read.format('csv').option('header', 'true').load(context['source_uri'])
    df_target = spark.read.format('csv').option('header', 'true').load(context['target_uri'])

    # The views are named after the datasets, matching the FROM/JOIN clauses of the query.
    df_source.createOrReplaceTempView(context['source_name'])
    df_target.createOrReplaceTempView(context['target_name'])

    if debug:
        df_source.show()
        df_target.show()

    pprint(query)
    try:
        return spark.sql(query)
    except Exception as err:
        # Previously a bare `except:` hid the cause and also trapped KeyboardInterrupt.
        print("Something didn't work executing this query: %s (%s: %s)" % (query, type(err).__name__, err))
        return None


def exec_query_by_number(n, query_map=None):
    """Execute the n-th generated query, counting in dict insertion order.

    Args:
        n: position of the query; negative values count from the end.
        query_map: query text -> context mapping, as returned by
            generate_queries_for_discovered_datasets. Defaults to the
            notebook-global ``queries`` for backward compatibility. Pass it
            explicitly so you do not depend on whichever cell last assigned
            that global.

    Returns:
        Whatever exec_query returns: the query result, or None if it failed.

    Raises:
        IndexError: if n is out of range.
    """
    if query_map is None:
        query_map = queries
    query = list(query_map)[n]
    return exec_query(query, query_map[query])


def get_pd_dataframe_of_results(queries):
    """Tabulate generated queries for display.

    Returns a pair of DataFrames with one row per query, in dict order:
      * columns Query, Dataset a, Attribute a, Attribute b, Dataset b;
      * the same table without the Query column.
    """
    rows = [
        (query_text,
         ctx['source_name'],
         ctx['source_attribute_name'],
         ctx['target_attribute_name'],
         ctx['target_name'])
        for query_text, ctx in queries.items()
    ]
    summary_columns = ["Dataset a", "Attribute a", "Attribute b", "Dataset b"]

    with_query = pd.DataFrame(rows, columns=["Query"] + summary_columns)
    without_query = pd.DataFrame([row[1:] for row in rows], columns=summary_columns)
    return with_query, without_query


In [31]:
def exec_experiment(topic, dastart, daend, treshold_signature, treshold_content):
    """Run discovery for one topic and time window and show the resulting joins.

    Two tables are displayed: the join queries with their query text, and the
    same rows without it. The table that includes the query text is returned.
    """
    found = discover_datasets(topic=topic, dastart=dastart, daend=daend,
                              treshold_signature=treshold_signature,
                              treshold_content=treshold_content)
    generated = generate_queries_for_discovered_datasets(found)
    with_query, without_query = get_pd_dataframe_of_results(generated)
    for table in (with_query, without_query):
        display(table)
    return with_query

In [34]:
# Experiment 1: topic "Shoop", signature threshold 0.9, content threshold 0.5.
# Other candidate topics noted here: "Eminhizer", "Firestone Drive".
topic = "Shoop"
treshold_signature = '0.9'
treshold_content = '0.5'

# Time window in epoch milliseconds: from 2022-01-29 12:12:12 (naive datetime,
# i.e. local time) until now.
daend = int(datetime.now().timestamp() * 1000)
a = datetime(2022, 1, 29, 12, 12, 12)
dastart = int(a.timestamp() * 1000)

res_exp_01 = exec_experiment(topic, dastart, daend, treshold_signature, treshold_content)

Dataset Discovery for topic Shoop, from date 01/29/2022, 12:12:12 to date 02/05/2023, 17:43:32




Unnamed: 0,Query,Dataset a,Attribute a,Attribute b,Dataset b
0,SELECT * FROM person a JOIN personphone b ON (a.emailpromotion = b.phonenumbertypeid),person,emailpromotion,phonenumbertypeid,personphone
1,SELECT * FROM person a JOIN phonenumbertype b ON (a.emailpromotion = b.phonenumbertypeid),person,emailpromotion,phonenumbertypeid,phonenumbertype
2,SELECT * FROM person a JOIN HRDataset_generated b ON (a.emailpromotion = b.DaysLateLast30),person,emailpromotion,DaysLateLast30,HRDataset_generated
3,SELECT * FROM person a JOIN HRDataset_generated b ON (a.emailpromotion = b.Termd),person,emailpromotion,Termd,HRDataset_generated
4,SELECT * FROM person a JOIN HRDataset_generated b ON (a.emailpromotion = b.FromDiversityJobFairID),person,emailpromotion,FromDiversityJobFairID,HRDataset_generated
5,SELECT * FROM person a JOIN HRDataset_generated b ON (a.emailpromotion = b.GenderID),person,emailpromotion,GenderID,HRDataset_generated
6,SELECT * FROM person a JOIN HRDataset_generated b ON (a.emailpromotion = b.MaritalStatusID),person,emailpromotion,MaritalStatusID,HRDataset_generated
7,SELECT * FROM person a JOIN HRDataset_generated b ON (a.emailpromotion = b.MarriedID),person,emailpromotion,MarriedID,HRDataset_generated
8,SELECT * FROM person a JOIN HRDataset_v14 b ON (a.emailpromotion = b.Termd),person,emailpromotion,Termd,HRDataset_v14
9,SELECT * FROM person a JOIN HRDataset_v14 b ON (a.emailpromotion = b.FromDiversityJobFairID),person,emailpromotion,FromDiversityJobFairID,HRDataset_v14


Unnamed: 0,Dataset a,Attribute a,Attribute b,Dataset b
0,person,emailpromotion,phonenumbertypeid,personphone
1,person,emailpromotion,phonenumbertypeid,phonenumbertype
2,person,emailpromotion,DaysLateLast30,HRDataset_generated
3,person,emailpromotion,Termd,HRDataset_generated
4,person,emailpromotion,FromDiversityJobFairID,HRDataset_generated
5,person,emailpromotion,GenderID,HRDataset_generated
6,person,emailpromotion,MaritalStatusID,HRDataset_generated
7,person,emailpromotion,MarriedID,HRDataset_generated
8,person,emailpromotion,Termd,HRDataset_v14
9,person,emailpromotion,FromDiversityJobFairID,HRDataset_v14


In [35]:
# Experiment 2: topic "Shoop", signature threshold 0.9, content threshold 0.7.
# Other candidate topics noted here: "Eminhizer", "Firestone Drive".
topic = "Shoop"

# Time window in epoch milliseconds: from 2022-01-29 12:12:12 (naive datetime,
# i.e. local time) until now.
daend = int(datetime.now().timestamp() * 1000)
a = datetime(2022, 1, 29, 12, 12, 12)
dastart = int(a.timestamp() * 1000)
treshold_signature = '0.9'
treshold_content = '0.7'

res_exp_02 = exec_experiment(topic, dastart, daend, treshold_signature, treshold_content)

Dataset Discovery for topic Shoop, from date 01/29/2022, 12:12:12 to date 02/05/2023, 17:43:43




Unnamed: 0,Query,Dataset a,Attribute a,Attribute b,Dataset b
0,SELECT * FROM person a JOIN vadditionalcontactinfo b ON (a.middlename = b.middlename),person,middlename,middlename,vadditionalcontactinfo
1,SELECT * FROM person a JOIN vadditionalcontactinfo b ON (a.firstname = b.firstname),person,firstname,firstname,vadditionalcontactinfo
2,SELECT * FROM person a JOIN vadditionalcontactinfo b ON (a.businessentityid = b.businessentityid),person,businessentityid,businessentityid,vadditionalcontactinfo
3,SELECT * FROM person a JOIN personphone b ON (a.businessentityid = b.businessentityid),person,businessentityid,businessentityid,personphone
4,SELECT * FROM person a JOIN password b ON (a.businessentityid = b.businessentityid),person,businessentityid,businessentityid,password
5,SELECT * FROM person a JOIN emailaddress b ON (a.businessentityid = b.emailaddressid),person,businessentityid,emailaddressid,emailaddress
6,SELECT * FROM person a JOIN emailaddress b ON (a.businessentityid = b.businessentityid),person,businessentityid,businessentityid,emailaddress
7,SELECT * FROM person a JOIN businessentity b ON (a.businessentityid = b.businessentityid),person,businessentityid,businessentityid,businessentity
8,SELECT * FROM person a JOIN businessentityaddress b ON (a.businessentityid = b.businessentityid),person,businessentityid,businessentityid,businessentityaddress
9,SELECT * FROM vadditionalcontactinfo a JOIN person b ON (a.middlename = b.middlename),vadditionalcontactinfo,middlename,middlename,person


Unnamed: 0,Dataset a,Attribute a,Attribute b,Dataset b
0,person,middlename,middlename,vadditionalcontactinfo
1,person,firstname,firstname,vadditionalcontactinfo
2,person,businessentityid,businessentityid,vadditionalcontactinfo
3,person,businessentityid,businessentityid,personphone
4,person,businessentityid,businessentityid,password
5,person,businessentityid,emailaddressid,emailaddress
6,person,businessentityid,businessentityid,emailaddress
7,person,businessentityid,businessentityid,businessentity
8,person,businessentityid,businessentityid,businessentityaddress
9,vadditionalcontactinfo,middlename,middlename,person


In [None]:
import matplotlib.pyplot as plt

# Standalone matplotlib bar-chart example; it does not use any discovery results.
data = {'C': 20, 'C++': 15, 'Java': 30, 'Python': 35}
courses = list(data)
values = list(data.values())

# Explicit figure/axes interface instead of the pyplot state machine.
fig, ax = plt.subplots(figsize=(10, 5))
ax.bar(courses, values, color='maroon', width=0.4)
ax.set_xlabel("Courses offered")
ax.set_ylabel("No. of students enrolled")
ax.set_title("Students enrolled in different courses")
plt.show()

In [51]:
# Run the generated query at index 1 and preview its first rows.
# NOTE: exec_query_by_number reads the notebook-global `queries`, i.e. whatever
# the cell that last assigned it produced. exec_experiment does not update it.
df = exec_query_by_number(1)
# exec_query returns None when the query fails; stop here with a clear message
# instead of an AttributeError on None.
assert df is not None, "query execution failed - see the message printed above"
df.limit(10).toPandas().head()

('SELECT * FROM person a JOIN vadditionalcontactinfo b ON (a.firstname = '
 'b.firstname)')


Unnamed: 0,businessentityid,persontype,namestyle,title,firstname,middlename,lastname,suffix,emailpromotion,additionalcontactinfo,demographics,rowguid,modifieddate,businessentityid.1,firstname.1,middlename.1,lastname.1,telephonenumber,telephonespecialinstructions,street,city,stateprovince,postalcode,countryregion,homeaddressspecialinstructions,emailaddress,emailspecialinstructions,emailtelephonenumber,rowguid.1,modifieddate.1
0,1,EM,False,,Ken,J,Sánchez,,0,[NULL],"""<IndividualSurvey xmlns=""""http://schemas.microsoft.com/sqlserver/2004/07/adventure-works/IndividualSurvey""""><TotalPurchaseYTD>0</TotalPurchaseYTD></IndividualSurvey>""",92c4279f-1207-48a3-8448-4636514eb7e2,2009-01-07 00:00:00.000,2300,Ken,,Kwok,[NULL],,[NULL],[NULL],[NULL],[NULL],[NULL],[NULL],[NULL],,[NULL],79d54447-9708-4cb7-8094-806f50d75c84,2008-12-22 00:00:00.000
1,1,EM,False,,Ken,J,Sánchez,,0,[NULL],"""<IndividualSurvey xmlns=""""http://schemas.microsoft.com/sqlserver/2004/07/adventure-works/IndividualSurvey""""><TotalPurchaseYTD>0</TotalPurchaseYTD></IndividualSurvey>""",92c4279f-1207-48a3-8448-4636514eb7e2,2009-01-07 00:00:00.000,2140,Ken,,Meyer,[NULL],,[NULL],[NULL],[NULL],[NULL],[NULL],[NULL],[NULL],,[NULL],2c842c92-9626-456c-836b-2df9be36721a,2009-01-09 00:00:00.000
2,1,EM,False,,Ken,J,Sánchez,,0,[NULL],"""<IndividualSurvey xmlns=""""http://schemas.microsoft.com/sqlserver/2004/07/adventure-works/IndividualSurvey""""><TotalPurchaseYTD>0</TotalPurchaseYTD></IndividualSurvey>""",92c4279f-1207-48a3-8448-4636514eb7e2,2009-01-07 00:00:00.000,1726,Ken,,Sánchez,[NULL],,[NULL],[NULL],[NULL],[NULL],[NULL],[NULL],[NULL],,[NULL],029fb95e-f6b6-410a-a889-03fa0e4e7b45,2014-01-06 00:00:00.000
3,1,EM,False,,Ken,J,Sánchez,,0,[NULL],"""<IndividualSurvey xmlns=""""http://schemas.microsoft.com/sqlserver/2004/07/adventure-works/IndividualSurvey""""><TotalPurchaseYTD>0</TotalPurchaseYTD></IndividualSurvey>""",92c4279f-1207-48a3-8448-4636514eb7e2,2009-01-07 00:00:00.000,1525,Ken,,Myer,[NULL],,[NULL],[NULL],[NULL],[NULL],[NULL],[NULL],[NULL],,[NULL],0d7e8d2a-ce67-431d-b66d-4d5a643544fd,2012-01-25 00:00:00.000
4,1,EM,False,,Ken,J,Sánchez,,0,[NULL],"""<IndividualSurvey xmlns=""""http://schemas.microsoft.com/sqlserver/2004/07/adventure-works/IndividualSurvey""""><TotalPurchaseYTD>0</TotalPurchaseYTD></IndividualSurvey>""",92c4279f-1207-48a3-8448-4636514eb7e2,2009-01-07 00:00:00.000,203,Ken,L,Myer,[NULL],,[NULL],[NULL],[NULL],[NULL],[NULL],[NULL],[NULL],,[NULL],981fcdb7-9792-463d-bf1f-f1b4ced52473,2009-02-17 00:00:00.000


In [20]:
# Topic "Eminhizer" (another candidate noted here: "Firestone Drive").
topic = "Eminhizer"

# Time window in epoch milliseconds: from 2022-01-29 12:12:12 (local time) until now.
daend = int(datetime.timestamp(datetime.now())*1000)
a = datetime(2022, 1, 29, 12, 12, 12)
dastart = int(datetime.timestamp(a)*1000)

discovered_datasets = discover_datasets(topic=topic, dastart=dastart, daend=daend, treshold_signature='0.9', treshold_content='0.5')
# Assigned at notebook level on purpose: exec_query_by_number() reads the global `queries`.
queries = generate_queries_for_discovered_datasets(discovered_datasets)

# get_pd_dataframe_of_results returns (table with query text, table without it).
# Unpack it so the table is displayed rather than the repr of a tuple.
results_dataframe, results_dataframe_oq = get_pd_dataframe_of_results(queries)
display(results_dataframe)

Dataset Discovery for topic Eminhizer, from date 01/29/2022, 12:12:12 to date 02/05/2023, 16:33:09




Unnamed: 0,Query,Dataset a,Attribute a,Attribute b,Dataset b
0,SELECT * FROM person a JOIN personphone b ON (a.emailpromotion = b.phonenumbertypeid),person,emailpromotion,phonenumbertypeid,personphone
1,SELECT * FROM person a JOIN phonenumbertype b ON (a.emailpromotion = b.phonenumbertypeid),person,emailpromotion,phonenumbertypeid,phonenumbertype
2,SELECT * FROM person a JOIN HRDataset_generated b ON (a.emailpromotion = b.DaysLateLast30),person,emailpromotion,DaysLateLast30,HRDataset_generated
3,SELECT * FROM person a JOIN HRDataset_generated b ON (a.emailpromotion = b.Termd),person,emailpromotion,Termd,HRDataset_generated
4,SELECT * FROM person a JOIN HRDataset_generated b ON (a.emailpromotion = b.FromDiversityJobFairID),person,emailpromotion,FromDiversityJobFairID,HRDataset_generated
5,SELECT * FROM person a JOIN HRDataset_generated b ON (a.emailpromotion = b.GenderID),person,emailpromotion,GenderID,HRDataset_generated
6,SELECT * FROM person a JOIN HRDataset_generated b ON (a.emailpromotion = b.MaritalStatusID),person,emailpromotion,MaritalStatusID,HRDataset_generated
7,SELECT * FROM person a JOIN HRDataset_generated b ON (a.emailpromotion = b.MarriedID),person,emailpromotion,MarriedID,HRDataset_generated
8,SELECT * FROM person a JOIN HRDataset_v14 b ON (a.emailpromotion = b.Termd),person,emailpromotion,Termd,HRDataset_v14
9,SELECT * FROM person a JOIN HRDataset_v14 b ON (a.emailpromotion = b.FromDiversityJobFairID),person,emailpromotion,FromDiversityJobFairID,HRDataset_v14
