In [15]:
%%bash
# pip install datasketch[redis]
# pip install elasticsearch==7.10.0
# pip install neo4j

In [1]:
%%spark -o df_1

# Read the CSV using its first line as column names (no schema inference: all columns are strings).
df_1 = spark.read.csv("/grades.csv", header=True)

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
14,application_1668674056008_0002,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [2]:
%%spark -o df_2

# Second copy of the same grades file, exported locally as df_2.
df_2 = spark.read.csv("/grades.csv", header=True)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [3]:
%%spark -o df_3

# Third copy of the same grades file, exported locally as df_3.
df_3 = spark.read.csv("/grades.csv", header=True)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
%%spark 
# Preview each of the three loaded Spark DataFrames.
for spark_df in (df_1, df_2, df_3):
    spark_df.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------+----------+-----------+-----+-----+-----+-----+-----+-----+
|Last name|First name|        SSN|Test1|Test2|Test3|Test4|Final|Grade|
+---------+----------+-----------+-----+-----+-----+-----+-----+-----+
|  Alfalfa|  Aloysius|123-45-6789|   40|   90|  100|   83|   49|   D-|
|   Alfred|University|123-12-1234|   41|   97|   96|   97|   48|   D+|
|    Gerty|    Gramma|567-89-0123|   41|   80|   60|   40|   44|    C|
|  Android|  Electric|087-65-4321|   42|   23|   36|   45|   47|   B-|
|  Bumpkin|      Fred|456-78-9012|   43|   78|   88|   77|   45|   A-|
|   Rubble|     Betty|234-56-7890|   44|   90|   80|   90|   46|   C-|
|   Noshow|     Cecil|345-67-8901|   45|   11|   -1|    4|   43|    F|
|     Buff|       Bif|632-79-9939|   46|   20|   30|   40|   50|   B+|
|  Airpump|    Andrew|223-45-6789|   49|    1|   90|  100|   83|    A|
|   Backus|       Jim|143-12-1234|   48|    1|   97|   96|   97|   A+|
|Carnivore|       Art|565-89-0123|   44|    1|   80|   60|   40|   D+|
|    D

In [5]:
%%local

from datetime import datetime
from datasketch import MinHash
from datasketch import MinHashLSH
from datasketch import LeanMinHash
from elasticsearch import Elasticsearch
from redis import StrictRedis
import json
import pickle
import base64
import numpy as np


# Service clients, created with the libraries' default connection settings
# (no host/port is passed here).
cache = StrictRedis()
es = Elasticsearch()

# Elasticsearch index holding one profile document per dataset column.
index_name = 'test-havel-index'
# Maximum number of hits fetched by a single search. Kept as an int (it is passed
# as the numeric `size` parameter); 10000 equals Elasticsearch's default
# index.max_result_window, so larger values need that setting raised.
max_size = 10000

def calc_idx(data):
    """Return the key of a column profile: "<dataset>-<name>", spaces in name -> '_'."""
    dataset = data['dataset']
    column = data['name'].replace(' ', '_')
    return str(dataset) + "-" + str(column)

def calc_profile(df, dataset_name, num_perm=128):
    """Profile every column of a DataFrame and compute its MinHash signature.

    :param df: DataFrame-like object; for each name in ``df.columns`` the column
        ``df[name]`` must expose ``dtype``, ``nunique()``, ``size`` and be iterable.
    :param dataset_name: str, stored in every profile and used in its key.
    :param num_perm: int, number of MinHash permutations. Must match the
        ``num_perm`` of the LSH index the signatures are inserted into.
    :return: ``(doc, min_hashes)`` where ``doc`` is a list of profile dicts (one
        per column) and ``min_hashes`` a list of ``(calc_idx(profile), MinHash)``.
    """
    doc = []
    min_hashes = []

    for c_n in df.columns:
        s = df[c_n]
        dtype = str(s.dtype)
        crd = s.nunique()

        m = MinHash(num_perm=num_perm)
        for v in s:
            # MinHash.update needs bytes: stringify every non-str cell (ints,
            # floats, NaN, bools ...), not only int64 values as before, which
            # made any other non-string dtype crash on .encode().
            if not isinstance(v, str):
                v = str(v)
            m.update(v.encode('utf8'))

        data = {
            'dataset': dataset_name,
            'name': c_n,
            'data_type': dtype,
            # Previously written as `data['timestamp']: datetime.now()`, which is a
            # variable annotation, not an assignment, so the field was never set.
            # Formatted to match the "yyyy-MM-dd HH:mm:ss.SSSSSS" date format
            # declared in the ES schema (always 6 fractional digits).
            'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f'),
            'size': s.size,
            'cardinality': crd,
            # Guard against an empty column (size 0).
            'uniqueness': crd / s.size if s.size else 0.0,
            'min_hash': m.hashvalues,
            'min_hash_seed': m.seed,
        }

        min_hashes.append((calc_idx(data), m))
        doc.append(data)

    return doc, min_hashes


def insert_min_hashes(min_hashes, lsh, verbose=True):
    """Insert (or replace) MinHash signatures in an LSH index.

    :param min_hashes: iterable of ``(key, MinHash)`` pairs, e.g. the second
        value returned by ``calc_profile``.
    :param lsh: the MinHashLSH index to write to (inserts are batched through
        ``lsh.insertion_session()``).
    :param verbose: print each key and MinHash as it is processed (the previous,
        unconditional behaviour).
    """
    with lsh.insertion_session() as session:
        for key, minhash in min_hashes:
            if verbose:
                print(key)
                print(minhash)
            # Drop an existing entry so re-profiling a dataset replaces its
            # signature. MinHashLSH.remove raises ValueError for unknown keys,
            # which made the very first run on an empty store crash.
            if key in lsh:
                lsh.remove(key)
            session.insert(key, minhash)


def upsert(index_name, doc, v=True):
    """Index (create or overwrite) every profile in ``doc`` into ``index_name``.

    Document ids come from ``calc_idx``, so profiling the same dataset/column
    again overwrites the previous document instead of duplicating it.

    :param index_name: str, target Elasticsearch index.
    :param doc: list of profile dicts, as returned by ``calc_profile``.
    :param v: when ``True``, print the total number of documents in the index.
    """
    for data in doc:
        es.index(index=index_name, id=calc_idx(data), body=data)

    # Make the new documents visible to the count below and to later searches.
    es.indices.refresh(index=index_name)

    if v is True:
        # count() returns the exact total without transferring documents; the
        # previous match_all search fetched up to max_size hits only to read
        # hits.total (which is itself capped at 10000 by default).
        total = es.count(index=index_name)['count']
        print("Got %d Hits." % total)

        
def clear_cache_ns(ns, client=None, batch_size=1000):
    """
    Delete every Redis key matching a glob pattern.
    This may be very time consuming on large databases.
    :param ns: str, Redis glob pattern, e.g. 'your:prefix*'. Without a wildcard
        only the key with exactly that name matches.
    :param client: Redis client to use; defaults to the module-level ``cache``.
    :param batch_size: int >= 1, number of DEL commands buffered before the
        pipeline is flushed, so memory stays bounded for large namespaces.
    :return: int, number of keys deleted
    :raises ValueError: if batch_size < 1
    """
    if batch_size < 1:
        raise ValueError("batch_size must be >= 1")
    if client is None:
        client = cache

    count = 0
    pipe = client.pipeline()
    for key in client.scan_iter(ns):
        pipe.delete(key)
        count += 1
        if count % batch_size == 0:
            # Flush periodically instead of buffering every DEL in one pipeline.
            pipe.execute()
    pipe.execute()
    return count


def clear_es(index_name):
    """Delete the Elasticsearch index ``index_name`` together with all its documents.

    A missing index (404) or a bad request (400) is ignored, so this is safe to
    call before the index has ever been created.
    """
    # Deleting the index removes its documents as well. The former
    # delete_by_query(match_all) step was redundant, and it raised NotFoundError
    # on a missing index before the tolerant delete below could run.
    es.indices.delete(index=[index_name], ignore=[400, 404])


def create_es_schema(index_name):
    """Create the index ``index_name`` with an explicit mapping for column profiles.

    Fields not listed here (e.g. ``dataset``) are mapped dynamically on first
    insert. The create call is made without ``ignore``, so an error from
    Elasticsearch (e.g. the index already exists) propagates to the caller.
    Prints the create-index response.
    """
    schema = {
        "mappings": {
            "properties": {
                "name": {
                    "type": "text"  # formerly "string"
                },
                "data_type": {
                    "type": "text"
                },
                "timestamp": {
                    "type": "date",
                    # Matches datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f').
                    "format": "yyyy-MM-dd HH:mm:ss.SSSSSS"
                },
                "size": {
                    "type": "integer"
                },
                "cardinality": {
                    "type": "float"
                },
                "uniqueness": {
                    "type": "float"
                },
                "min_hash": {
                    # MinHash hash values range over 0 .. 2**32 - 1 (an empty
                    # column keeps the maximum everywhere), which overflows ES
                    # "integer" (signed 32-bit); "long" holds the full range.
                    "type": "long"
                },
                "min_hash_seed": {
                    "type": "integer"
                }
            }
        }
    }

    resp = es.indices.create(
        index=index_name,
        body=schema
    )

    print("_mapping response:", json.dumps(resp, indent=4), "\n")

    
def create_profiles(df, dataset_name, es_index_name, lsh):
    """Profile ``df`` and store the results.

    Column MinHashes go into the LSH index ``lsh``; column profile documents go
    into the Elasticsearch index ``es_index_name``.
    """
    doc, min_hashes = calc_profile(df, dataset_name)
    insert_min_hashes(min_hashes, lsh)
    # Honour the caller's index; this previously ignored es_index_name and
    # always wrote to the module-level index_name.
    upsert(es_index_name, doc)


In [111]:
%%local

from neo4j import GraphDatabase

class GraphStorage:
    """Small wrapper around a Neo4j driver for storing tables, columns and similarities.

    Graph written by this class:
      (:Table {name})
      (:Column {name, table_name})-[:IS_ATTRIBUTE_OF]->(:Table {name})
      (:Column)-[:IS_SIMILAR_TO]-(:Column)
    Merged nodes get ``createdAt`` when created and ``updatedAt`` when matched again.

    Usable as a context manager so the driver is always closed::

        with GraphStorage(uri, user, password) as graph:
            graph.insert_table_node("dataset_1")
    """

    def __init__(self, uri, user, password):
        self.driver = GraphDatabase.driver(uri, auth=(user, password))

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()
        # Do not swallow exceptions raised inside the with-block.
        return False

    def close(self):
        """Close the underlying driver."""
        self.driver.close()

    def insert_column_node(self, column_name, table_name):
        """Merge a Column node and link it to its Table with IS_ATTRIBUTE_OF.

        The relation query MATCHes the Table node, so if the table was not
        inserted first no relation is created and ``[]`` is printed.
        """
        with self.driver.session() as session:
            res1 = session.execute_write(self._create_column_node, column_name, table_name)
            res2 = session.execute_write(self._create_column_table_relation, column_name, table_name)
            print(res1)
            print(res2)

    def insert_table_node(self, table_name):
        """Merge a Table node named ``table_name`` and print the query result."""
        with self.driver.session() as session:
            res = session.execute_write(self._create_table_node, table_name)
            print(res)

    def add_column_column_relation(self, column_name_a, column_name_b):
        """Merge an IS_SIMILAR_TO relation between two Column nodes matched by name.

        A column is trivially similar to itself; such pairs are skipped instead of
        creating a self-loop that would later have to be deleted.
        """
        if column_name_a == column_name_b:
            return
        with self.driver.session() as session:
            res = session.execute_write(self._create_column_column_relation, column_name_a, column_name_b)
            print(res)

    def remove_column_self_relations(self):
        """Delete IS_SIMILAR_TO relations that start and end on the same Column."""
        with self.driver.session() as session:
            res = session.execute_write(self._delete_column_self_relations)
            print(res)

    def delete_all(self):
        """Delete every node and relationship in the database (irreversible)."""
        with self.driver.session() as session:
            res = session.execute_write(self._delete_all)
            print(res)

    @staticmethod
    def _create_column_node(tx, column_name, table_name):
        # Column identity is the (name, table_name) pair.
        result = tx.run("MERGE (a:Column {name: $column_name, table_name: $table_name}) "
                        "ON CREATE SET a.createdAt = timestamp() "
                        "ON MATCH SET a.updatedAt = timestamp() "
                        "RETURN a.name + ' created as node ' + id(a) ",
                        column_name=column_name, table_name=table_name)
        return result.single()[0]

    @staticmethod
    def _create_table_node(tx, name):
        result = tx.run("MERGE (a:Table {name: $name}) "
                        "ON CREATE SET a.createdAt = timestamp() "
                        "ON MATCH SET a.updatedAt = timestamp() "
                        "RETURN a.name + ' created as node ' + id(a) ", name=name)
        return result.single()[0]

    @staticmethod
    def _create_column_table_relation(tx, column_name, table_name):
        # Match the column on the same (name, table_name) key it was merged with,
        # so an equally named column of another table is not linked here.
        result = tx.run("MATCH (c:Column {name: $column_name, table_name: $table_name}), "
                        "(t:Table {name: $table_name}) "
                        "MERGE (c)-[r:IS_ATTRIBUTE_OF]->(t) "
                        "RETURN 'relation ' + type(r) + ' created between ' + c.name + ' and ' + t.name",
                        column_name=column_name, table_name=table_name)
        return [record[0] for record in result]

    @staticmethod
    def _create_column_column_relation(tx, column_name_a, column_name_b):
        # Undirected MERGE: an existing relation in either direction is reused.
        result = tx.run("MATCH (a:Column {name: $column_name_a}), (b:Column {name: $column_name_b}) "
                        "MERGE (a)-[r:IS_SIMILAR_TO]-(b) "
                        "RETURN 'relation ' + type(r) + ' created between ' + a.name + ' and ' + b.name",
                        column_name_a=column_name_a, column_name_b=column_name_b)
        return [record[0] for record in result]

    @staticmethod
    def _delete_column_self_relations(tx):
        # The query has no RETURN clause, so the result list is empty.
        result = tx.run("MATCH (a:Column)-[rel:IS_SIMILAR_TO]->(a) DELETE rel")
        return [record[0] for record in result]

    @staticmethod
    def _delete_all(tx):
        result = tx.run("MATCH (n) DETACH DELETE n")
        return [record[0] for record in result]

In [112]:
%%local

# NOTE(review): Redis SCAN MATCH takes a glob pattern; 'db0' has no wildcard, so
# this only deletes a key literally named "db0" and likely clears nothing.
# Presumably the intent was to wipe the LSH data stored in Redis db 0 — confirm
# (e.g. pass a pattern such as index_name + '*').
clear_cache_ns('db0')

# LSH index at Jaccard threshold 0.5, stored in Redis db 0 under basename index_name.
lsh_05 = MinHashLSH(
    threshold=0.5, num_perm=128, storage_config={
        'type': 'redis',
        'basename': bytearray(index_name, 'utf-8'),
        'redis': {'host': 'localhost', 'port': 6379, 'db': 0},
    }
)

# Same setup at threshold 0.7, stored in Redis db 1.
# NOTE(review): lsh_07 is not used by any visible cell.
lsh_07 = MinHashLSH(
    threshold=0.7, num_perm=128, storage_config={
        'type': 'redis',
        'basename': bytearray(index_name, 'utf-8'),
        'redis': {'host': 'localhost', 'port': 6379, 'db': 1},
    }
)

# Profile the three datasets: profiles go to Elasticsearch, MinHashes to lsh_05.
create_profiles(df_1, 'dataset_1', index_name, lsh_05)
create_profiles(df_2, 'dataset_2', index_name, lsh_05)
create_profiles(df_3, 'dataset_3', index_name, lsh_05)


dataset_1-Last_name
<datasketch.minhash.MinHash object at 0x000001ABBF6B9B80>
dataset_1-First_name
<datasketch.minhash.MinHash object at 0x000001ABBC9A2700>
dataset_1-SSN
<datasketch.minhash.MinHash object at 0x000001ABBF6C4700>
dataset_1-Test1
<datasketch.minhash.MinHash object at 0x000001ABBF6C4670>
dataset_1-Test2
<datasketch.minhash.MinHash object at 0x000001ABBF6C4340>
dataset_1-Test3
<datasketch.minhash.MinHash object at 0x000001ABBF715100>
dataset_1-Test4
<datasketch.minhash.MinHash object at 0x000001ABBF7151C0>
dataset_1-Final
<datasketch.minhash.MinHash object at 0x000001ABBF7159D0>
dataset_1-Grade
<datasketch.minhash.MinHash object at 0x000001ABBF715760>
Got 36 Hits.
dataset_2-Last_name
<datasketch.minhash.MinHash object at 0x000001ABBF6660A0>
dataset_2-First_name
<datasketch.minhash.MinHash object at 0x000001ABBF6661F0>
dataset_2-SSN
<datasketch.minhash.MinHash object at 0x000001ABBF6B99A0>
dataset_2-Test1
<datasketch.minhash.MinHash object at 0x000001ABBF55BA90>
dataset_2-T

In [114]:
%local 

import os

# For every column profile stored in Elasticsearch, find its LSH bucket of
# similar columns, then mirror tables, columns and similarities into Neo4j.
res = es.search(size=max_size, index=index_name, body={"query": {"match_all": {}}})
print("Got %d Hits." % res['hits']['total']['value'])

relations = []

for hit in res['hits']['hits']:
    source = hit['_source']
    # Rebuild the MinHash from the stored signature. Passing seed/hashvalues to
    # the constructor keeps seed and permutations consistent and stores the
    # values with MinHash's own dtype (instead of patching attributes afterwards).
    mt = MinHash(seed=source["min_hash_seed"], hashvalues=source["min_hash"])

    bucket = lsh_05.query(mt)
    relations.append((hit['_id'], bucket))

# Connection settings can be overridden via environment variables; the defaults
# keep the previous local setup working without hard-coding secrets elsewhere.
graph = GraphStorage(
    os.environ.get("NEO4J_URI", "bolt://localhost:7687"),
    os.environ.get("NEO4J_USER", "neo4j"),
    os.environ.get("NEO4J_PASSWORD", "password"),
)
try:
    for table_name in ("dataset_1", "dataset_2", "dataset_3"):
        graph.insert_table_node(table_name)

    for hit in res['hits']['hits']:
        graph.insert_column_node(hit['_id'], hit['_source']["dataset"])

    for k, bucket in relations:
        for v in bucket:
            # Print the pair being linked (previously printed the whole bucket
            # once per member).
            print(k, v)
            graph.add_column_column_relation(k, v)

    graph.remove_column_self_relations()
finally:
    # Always release the driver, even if one of the Cypher calls fails.
    graph.close()

Got 36 Hits.
dataset_1 created as node 78
dataset_2 created as node 79
dataset_3 created as node 80
test-dataset_01-Last_name created as node 81
[]
test-dataset_01-First_name created as node 82
[]
test-dataset_01-SSN created as node 83
[]
test-dataset_01-Test1 created as node 84
[]
test-dataset_01-Test2 created as node 85
[]
test-dataset_01-Test3 created as node 86
[]
test-dataset_01-Test4 created as node 87
[]
test-dataset_01-Final created as node 88
[]
test-dataset_01-Grade created as node 89
[]
dataset_1-Last_name created as node 90
['relation IS_ATTRIBUTE_OF created between dataset_1-Last_name and dataset_1']
dataset_1-First_name created as node 91
['relation IS_ATTRIBUTE_OF created between dataset_1-First_name and dataset_1']
dataset_1-SSN created as node 92
['relation IS_ATTRIBUTE_OF created between dataset_1-SSN and dataset_1']
dataset_1-Test1 created as node 93
['relation IS_ATTRIBUTE_OF created between dataset_1-Test1 and dataset_1']
dataset_1-Test2 created as node 94
['relati

In [88]:
%%local 

# Sanity check: query lsh_05 with a hard-coded 128-value MinHash signature (seed 1).
# NOTE(review): the values look copied from a stored profile; the recorded output
# matched the three '*-Final' columns. Not reproducible if profiles change.
arr = np.array([ 42114592,  127266168,  215678469,   17266034,   34121180,  942441018,
  227629861,  772424727,  258055101,  289724735,  117062477,  555008702,
   29604049,  840321879,  378698042, 1209184744,  243903852,   64306427,
  418573613,   50885453,  443551390,  238376316,  629454598,  646390910,
  258707738,  287206176,   86790830,  230629753,  353312205,   72417695,
  113209494,  247807691,  494356438,  598711389,  163905412,   92705481,
  167455200,  120333262,  690658470,  503937613,   13774441,  138757320,
  429215096,  166965839,  604024314,  151611451,  482383227,   80185077,
  132702729,  637863885,  809374076,   15601261,  231429825,  508796949,
   93190304,  246310319,  252884464,   76719615,   27566880,  600163519,
    9727857,   11786542,  156050497,  303860790,  228751560,  912519144,
    9156249,  369895830,  727826405,  416426394, 1081480011,   66120612,
  135376155,  210143265, 1031356013,   90284798, 1016601632,   13573364,
   68970483,  161433822,  542646600,   96345609,  685539749,   84541516,
   86327512,  187842457,  132296212,  857191589,   20737788,  590826688,
  444073826,  192175731,  212992765,  114574843,  171782812,   70248969,
  202706043,  702713690,  338155350,  342468131,  542793283,  261167155,
  378396244,  211528111,  231333263,  103754312,  129317962,   55448546,
 1041271092,  594880347,  931295081,  225781925,   83509603,  776091994,
  298124272,  467369673,    8909661,   60688953,  115820188,   54821235,
  168100490,   33183095,  712469771,  323850472,  414195156,  550484318,
  388634331,   42270916], dtype=np.int64)

# Overwrite a fresh MinHash's signature with the literal above.
mt = MinHash(num_perm=128)
mt.hashvalues = arr
mt.seed = 1

# Keys of indexed columns whose signatures collide with mt in at least one band.
lsh_05.query(mt)


['dataset_3-Final', 'dataset_1-Final', 'dataset_2-Final']

In [126]:
%%local 

# import the parser object from tika
from tika import parser  

import tika

# Tika can also parse remote documents such as a PDF, e.g.
#   'https://aclanthology.org/2020.acl-main.577.pdf'
# Here it parses the grades CSV fetched through the WebHDFS REST API (op=OPEN).
url = 'http://127.0.0.1:9864/webhdfs/v1/grades.csv?op=OPEN&namenoderpcaddress=master:9000&offset=0'
# NOTE: despite its name, pdfFile is Tika's parse result for `url`
# (a dict read below through its 'content' and 'metadata' keys).
pdfFile = parser.from_file(url)

print(pdfFile['content'])

print("Tika metadata:", json.dumps(pdfFile['metadata'], indent=4), "\n")


2022-11-17 12:49:16,515 [MainThread  ] [INFO ]  Retrieving http://127.0.0.1:9864/webhdfs/v1/grades.csv?op=OPEN&namenoderpcaddress=master:9000&offset=0 to C:\Users\DEL-MO~1\AppData\Local\Temp/webhdfs-v1-grades.csv.













	Last name	First name	SSN	Test1	Test2	Test3	Test4	Final	Grade
	Alfalfa	Aloysius	123-45-6789	40	90	100	83	49	D-
	Alfred	University	123-12-1234	41	97	96	97	48	D+
	Gerty	Gramma	567-89-0123	41	80	60	40	44	C
	Android	Electric	087-65-4321	42	23	36	45	47	B-
	Bumpkin	Fred	456-78-9012	43	78	88	77	45	A-
	Rubble	Betty	234-56-7890	44	90	80	90	46	C-
	Noshow	Cecil	345-67-8901	45	11	-1	4	43	F
	Buff	Bif	632-79-9939	46	20	30	40	50	B+
	Airpump	Andrew	223-45-6789	49	1	90	100	83	A
	Backus	Jim	143-12-1234	48	1	97	96	97	A+
	Carnivore	Art	565-89-0123	44	1	80	60	40	D+
	Dandy	Jim	087-75-4321	47	1	23	36	45	C+
	Elephant	Ima	456-71-9012	45	1	78	88	77	B-
	Franklin	Benny	234-56-2890	50	1	90	80	90	B-
	George	Boy	345-67-3901	40	1	11	-1	4	B
	Heffalump	Harvey	632-79-9439	30	1	20	30	40	C


_mapping response: {
    "X-TIKA:Parsed-By": [
        "org.apache.tika.parser.DefaultParser",
        "org.apache.tika.parser.csv.TextAndCSVParser"
    ],
    "X-TIKA:Parsed-By-Full-Set": [
        "org.apache.tika.parser.