In [0]:
pip install zingg

[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m


In [0]:
pip show zingg

Name: zingg
Version: 0.4.0
Summary: Zingg Entity Resolution, Data Mastering and Deduplication
Home-page: https://github.com/zinggAI/zingg
Author: Zingg.AI
Author-email: sonalgoyal4@gmail.com
License: https://github.com/zinggAI/zingg/blob/main/LICENSE
Location: /local_disk0/.ephemeral_nfs/envs/pythonEnv-8d40cd2d-dd77-439c-984f-ba45209715a7/lib/python3.11/site-packages
Requires: py4j
Required-by: 


In [0]:
pip install tabulate

[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m


In [0]:
pip show tabulate

Name: tabulate
Version: 0.9.0
Summary: Pretty-print tabular data
Home-page: 
Author: 
Author-email: Sergey Astanin <s.astanin@gmail.com>
License: MIT
Location: /local_disk0/.ephemeral_nfs/envs/pythonEnv-8d40cd2d-dd77-439c-984f-ba45209715a7/lib/python3.11/site-packages
Requires: 
Required-by: 


In [0]:
##you can change these to the locations of your choice
##these are the only two settings that need to change
zinggDir = "/models"
modelId = "zinggtrial4"

In [0]:

##please leave the following unchanged
MARKED_DIR = zinggDir + "/" + modelId + "/trainingData/marked/"
UNMARKED_DIR = zinggDir + "/" + modelId + "/trainingData/unmarked/"

MARKED_DIR_DBFS = "/dbfs" + MARKED_DIR
UNMARKED_DIR_DBFS = "/dbfs" + UNMARKED_DIR  


import pandas as pd
import numpy as np
 
import time
import uuid
 
from tabulate import tabulate
from ipywidgets import widgets, interact, GridspecLayout
import base64

import pyspark.sql.functions as fn

##this code sets up the Zingg Python interface
from zingg.client import *
from zingg.pipes import *

def cleanModel():
    dbutils.fs.rm(MARKED_DIR, recurse=True)
    # drop unmarked data
    dbutils.fs.rm(UNMARKED_DIR, recurse=True)
    return

# assign label to candidate pair
def assign_label(candidate_pairs_pd, z_cluster, label):
  '''
  The purpose of this function is to assign a label to a candidate pair
  identified by its z_cluster value.  Valid labels include:
     0 - not matched
     1 - matched
     2 - uncertain
  '''
  
  # assign label
  candidate_pairs_pd.loc[ candidate_pairs_pd['z_cluster']==z_cluster, 'z_isMatch'] = label
  
  return
 
def count_labeled_pairs(marked_pd):
  '''
  The purpose of this function is to count the labeled pairs in the marked folder.
  '''

  n_total = len(np.unique(marked_pd['z_cluster']))
  n_positive = len(np.unique(marked_pd[marked_pd['z_isMatch']==1]['z_cluster']))
  n_negative = len(np.unique(marked_pd[marked_pd['z_isMatch']==0]['z_cluster']))
  
  return n_positive, n_negative, n_total

# setup widget 
available_labels = {
    'No Match':0,
    'Match':1,
    'Uncertain':2
    }
#dbutils.widgets.dropdown('label', 'Uncertain', available_labels.keys(), 'Is this pair a match?')




In [0]:

#build the arguments for zingg
args = Arguments()
# Set the modelid and the zingg dir. You can use this as is
args.setModelId(modelId)
args.setZinggDir(zinggDir)


In [0]:
# # Import pandas
# import pandas as pd

# # Define the schema (optional for validation)
# schema = ["id", "fname", "lname", "stNo", "add1", "add2", "city", "state", "dob", "ssn"]

# # Load the CSV file
# data = pd.read_csv("/dbfs/FileStore/tables/data.csv")

# # Ensure column names match the schema
# data.columns = schema  # Adjust only if the file's column names differ

# # Display the data
# data.head()


In [0]:
schema = "rec_id string, fname string, lname string, stNo string, add1 string, add2 string, city string, state string, dob string, ssn string"
inputPipe = CsvPipe("testFebrl", "/FileStore/tables/data.csv", schema)

args.setData(inputPipe)

set schema 


In [0]:
#setting outputpipe in 'args'
outputPipe = CsvPipe("resultOutput", "/tmp/output26")
args.setOutput(outputPipe)

In [0]:
# Set field definitions
rec_id = FieldDefinition("rec_id", "string", MatchType.EXACT)        # ID should use exact match
fname = FieldDefinition("fname", "string", MatchType.FUZZY)  # First Name
lname = FieldDefinition("lname", "string", MatchType.FUZZY)  # Last Name
stNo = FieldDefinition("stNo", "string", MatchType.FUZZY)    # Street Number
add1 = FieldDefinition("add1", "string", MatchType.FUZZY)    # Address Line 1
add2 = FieldDefinition("add2", "string", MatchType.FUZZY)    # Address Line 2
city = FieldDefinition("city", "string", MatchType.FUZZY)    # City
state = FieldDefinition("state", "string", MatchType.FUZZY)  # State
dob = FieldDefinition("dob", "string", MatchType.EXACT)      # Date of Birth (prefer exact match)
ssn = FieldDefinition("ssn", "string", MatchType.EXACT)      # SSN (should use exact match)

# Create the field definitions list
fieldDefs = [rec_id, fname, lname, stNo, add1, add2, city, state, dob, ssn]

# Set field definitions in args
args.setFieldDefinition(fieldDefs)

In [0]:
# The numPartitions define how data is split across the cluster. 
# Please change the fllowing as per your data and cluster size by referring to the docs.

args.setNumPartitions(4)
args.setLabelDataSampleSize(0.5)

In [0]:
options = ClientOptions([ClientOptions.PHASE,"findTrainingData"])

#Zingg execution for the given phase
zingg = ZinggWithSpark(args, options)
zingg.initAndExecute()

['--phase', 'findTrainingData']
arguments for client options are  ['--phase', 'findTrainingData', '--license', 'zinggLic.txt', '--email', 'zingg@zingg.ai', '--conf', 'dummyConf.json']


In [0]:
options = ClientOptions([ClientOptions.PHASE,"label"])

#Zingg execution for the given phase
zingg = ZinggWithSpark(args, options)
zingg.init()

['--phase', 'label']
arguments for client options are  ['--phase', 'label', '--license', 'zinggLic.txt', '--email', 'zingg@zingg.ai', '--conf', 'dummyConf.json']


In [0]:
# get candidate pairs
candidate_pairs_pd = getPandasDfFromDs(zingg.getUnmarkedRecords())
 
# if no candidate pairs, run job and wait
if candidate_pairs_pd.shape[0] == 0:
  print('No unlabeled candidate pairs found.  Run findTraining job ...')

else:
    # get list of pairs (as identified by z_cluster) to label 
    z_clusters = list(np.unique(candidate_pairs_pd['z_cluster'])) 

    # identify last reviewed cluster
    last_z_cluster = '' # none yet

    # print candidate pair stats
    print('{0} candidate pairs found for labeling'.format(len(z_clusters)))
  



12 candidate pairs found for labeling


In [0]:
# Label Training Set

# define variable to avoid duplicate saves
ready_for_save = False
print(candidate_pairs_pd)

# user-friendly labels and corresponding zingg numerical value
# (the order in the dictionary affects how displayed below)
LABELS = {
  'Uncertain':2,
  'Match':1,
  'No Match':0  
  }

# GET CANDIDATE PAIRS
# ========================================================
#candidate_pairs_pd = get_candidate_pairs()
n_pairs = int(candidate_pairs_pd.shape[0]/2)
# ========================================================

# DEFINE IPYWIDGET DISPLAY
# ========================================================
display_pd = candidate_pairs_pd.drop(
  labels=[
    'z_zid', 'z_prediction', 'z_score', 'z_isMatch', 'z_zsource'
    ], 
  axis=1)

# define header to be used with each displayed pair
html_prefix = "<p><span style='font-family:Courier New,Courier,monospace'>"
html_suffix = "</p></span>"
header = widgets.HTML(value=f"{html_prefix}<b>" + "<br />".join([str(i)+"&nbsp;&nbsp;" for i in display_pd.columns.to_list()]) + f"</b>{html_suffix}")

# initialize display
vContainers = []
vContainers.append(widgets.HTML(value=f'<h2>Indicate if each of the {n_pairs} record pairs is a match or not</h2></p>'))

# for each set of pairs
for n in range(n_pairs):

  # get candidate records
  candidate_left = display_pd.loc[2*n].to_list()
  print(candidate_left)
  candidate_right = display_pd.loc[(2*n)+1].to_list()
  print(candidate_right)

  # define grid to hold values
  html = ''

  for i in range(display_pd.shape[1]):

    # get column name
    column_name = display_pd.columns[i]

    # if field is image
    if column_name == 'image_path':

      # define row header
      html += '<tr>'
      html += '<td><b>image</b></td>'

      # read left image to encoded string
      l_endcode = ''
      if candidate_left[i] != '':
        with open(candidate_left[i], "rb") as l_file:
          l_encode = base64.b64encode( l_file.read() ).decode()

      # read right image to encoded string
      r_encode = ''
      if candidate_right[i] != '':
        with open(candidate_right[i], "rb") as r_file:
          r_encode = base64.b64encode( r_file.read() ).decode()      

      # present images
      html += f'<td><img src="data:image/png;base64,{l_encode}"></td>'
      html += f'<td><img src="data:image/png;base64,{r_encode}"></td>'
      html += '</tr>'

    elif column_name != 'image_path':  # display text values

      if column_name == 'z_cluster': z_cluster = candidate_left[i]

      html += '<tr>'
      html += f'<td style="width:10%"><b>{column_name}</b></td>'
      html += f'<td style="width:45%">{str(candidate_left[i])}</td>'
      html += f'<td style="width:45%">{str(candidate_right[i])}</td>'
      html += '</tr>'

  # insert data table
  table = widgets.HTML(value=f'<table data-title="{z_cluster}" style="width:100%;border-collapse:collapse" border="1">'+html+'</table>')
  z_cluster = None

  # assign label options to pair
  label = widgets.ToggleButtons(
    options=LABELS.keys(), 
    button_style='info'
    )

  # define blank line between displayed pair and next
  blankLine=widgets.HTML(value='<br>')

  # append pair, label and blank line to widget structure
  vContainers.append(widgets.VBox(children=[table, label, blankLine]))

# present widget
display(widgets.VBox(children=vContainers))
# ========================================================

# mark flag to allow save 
ready_for_save = True


    z_zid         z_cluster  z_prediction  ...        dob       ssn  z_zsource
0      44   1732623711258:0          -1.0  ...   19180205   1909717  testFebrl
1      14   1732623711258:0          -1.0  ...   19180205   1909717  testFebrl
2      51   1732623711258:1          -1.0  ...   19210210   3808808  testFebrl
3      21   1732623711258:1          -1.0  ...   19210210   3808808  testFebrl
4      38  1732623711258:11          -1.0  ...   19410111   2540080  testFebrl
5       8  1732623711258:11          -1.0  ...   19410111   2540080  testFebrl
6      37  1732623711258:15          -1.0  ...   19830807   2932837  testFebrl
7      20  1732623711258:15          -1.0  ...   19461101   4783085  testFebrl
8      50  1732623711258:19          -1.0  ...   19461101   4783085  testFebrl
9      20  1732623711258:19          -1.0  ...   19461101   4783085  testFebrl
10     36   1732623711258:2          -1.0  ...   19830807   2932837  testFebrl
11     19   1732623711258:2          -1.0  ...   194

VBox(children=(HTML(value='<h2>Indicate if each of the 12 record pairs is a match or not</h2></p>'), VBox(chil…

In [0]:
if not ready_for_save:
  print('No labels have been assigned. Run the previous cell to create candidate pairs and assign labels to them before re-running this cell.')

else:

  # ASSIGN LABEL VALUE TO CANDIDATE PAIRS IN DATAFRAME
  # ========================================================
  # for each pair in displayed widget
  for pair in vContainers[1:]:

    # get pair and assigned label
    html_content = pair.children[1].get_interact_value() # the displayed pair as html
    user_assigned_label = pair.children[1].get_interact_value() # the assigned label

    # extract candidate pair id from html pair content
    start = pair.children[0].value.find('data-title="')
    if start > 0: 
      start += len('data-title="') 
      end = pair.children[0].value.find('"', start+2)
    pair_id = pair.children[0].value[start:end]



    # assign label to candidate pair entry in dataframe
    candidate_pairs_pd.loc[candidate_pairs_pd['z_cluster']==pair_id, 'z_isMatch'] = LABELS.get(user_assigned_label)
  # ========================================================

  # SAVE LABELED DATA TO ZINGG FOLDER
  # ========================================================
  # make target directory if needed
  dbutils.fs.mkdirs(MARKED_DIR)
  
  # save label assignments
  # save labels
  zingg.writeLabelledOutputFromPandas(candidate_pairs_pd,args)

  # count labels accumulated
  marked_pd_df = getPandasDfFromDs(zingg.getMarkedRecords())
  n_pos, n_neg, n_tot = count_labeled_pairs(marked_pd_df)
  print(f'You have accumulated {n_pos} pairs labeled as positive matches.')
  print(f'You have accumulated {n_neg} pairs labeled as not matches.')
  print("If you need more pairs to label, re-run the cell for 'findTrainingData'")
  # ========================================================  

  # save completed
  ready_for_save = False

You have accumulated 7 pairs labeled as positive matches.
You have accumulated 5 pairs labeled as not matches.
If you need more pairs to label, re-run the cell for 'findTrainingData'




In [0]:
options = ClientOptions([ClientOptions.PHASE,"trainMatch"])

#Zingg execution for the given phase
zingg = ZinggWithSpark(args, options)
zingg.initAndExecute()

['--phase', 'trainMatch']
arguments for client options are  ['--phase', 'trainMatch', '--license', 'zinggLic.txt', '--email', 'zingg@zingg.ai', '--conf', 'dummyConf.json']


In [0]:
outputDF = spark.read.csv("/tmp/output26Nov")

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-94396997902751>, line 1[0m
[0;32m----> 1[0m outputDF [38;5;241m=[39m spark[38;5;241m.[39mread[38;5;241m.[39mcsv([38;5;124m"[39m[38;5;124m/tmp/output26Nov[39m[38;5;124m"[39m)

File [0;32m/databricks/spark/python/pyspark/instrumentation_utils.py:47[0m, in [0;36m_wrap_function.<locals>.wrapper[0;34m(*args, **kwargs)[0m
[1;32m     45[0m start [38;5;241m=[39m time[38;5;241m.[39mperf_counter()
[1;32m     46[0m [38;5;28;01mtry[39;00m:
[0;32m---> 47[0m     res [38;5;241m=[39m func([38;5;241m*[39margs, [38;5;241m*[39m[38;5;241m*[39mkwargs)
[1;32m     48[0m     logger[38;5;241m.[39mlog_success(
[1;32m     49[0m         module_name, class_name, function_name, time[38;5;241m.[39mperf_counter() [38;5;241m-[39m start, signature
[1;32m     50[0m     )
[1;32

In [0]:
dbutils.fs.ls("/tmp")

[FileInfo(path='dbfs:/tmp/Output/', name='Output/', size=0, modificationTime=1732601650000),
 FileInfo(path='dbfs:/tmp/checkpoint/', name='checkpoint/', size=0, modificationTime=1732010367000),
 FileInfo(path='dbfs:/tmp/febrlOutput/', name='febrlOutput/', size=0, modificationTime=1732601622000)]

In [0]:
colNames = ["z_minScore", "z_maxScore", "z_cluster", "rec_id", "fname", "lname", "stNo", "add1", "add2", "city", "state", "dob", "ssn"]
outputDF.toDF(*colNames).show(100)

[0;31m---------------------------------------------------------------------------[0m
[0;31mIllegalArgumentException[0m                  Traceback (most recent call last)
File [0;32m<command-94396997902752>, line 2[0m
[1;32m      1[0m colNames [38;5;241m=[39m [[38;5;124m"[39m[38;5;124mz_minScore[39m[38;5;124m"[39m, [38;5;124m"[39m[38;5;124mz_maxScore[39m[38;5;124m"[39m, [38;5;124m"[39m[38;5;124mz_cluster[39m[38;5;124m"[39m,[38;5;124m"[39m[38;5;124mid[39m[38;5;124m"[39m, [38;5;124m"[39m[38;5;124mfname[39m[38;5;124m"[39m, [38;5;124m"[39m[38;5;124mlname[39m[38;5;124m"[39m, [38;5;124m"[39m[38;5;124mstNo[39m[38;5;124m"[39m, [38;5;124m"[39m[38;5;124madd1[39m[38;5;124m"[39m, [38;5;124m"[39m[38;5;124madd2[39m[38;5;124m"[39m, [38;5;124m"[39m[38;5;124mcity[39m[38;5;124m"[39m, [38;5;124m"[39m[38;5;124mstate[39m[38;5;124m"[39m, [38;5;124m"[39m[38;5;124mdob[39m[38;5;124m"[39m, [38;5;124m"[39m[38;5;124mssn[39m[38;5

In [0]:
options = ClientOptions([ClientOptions.PHASE,"generateDocs"])

#Zingg execution for the given phase
zingg = ZinggWithSpark(args, options)
zingg.initAndExecute()

com.databricks.backend.common.rpc.CommandSkippedException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:138)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3$adapted(SequenceExecutionState.scala:133)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:133)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:728)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:446)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:446)
	at com.databricks.spark.chauffeur.ExecutionContextManagerV1.cancelExecution(ExecutionContextManagerV1.scala:464)
	at com.databricks.spark.chauffeur.ChauffeurState.$anonfun$process$1(ChauffeurState.scala:571)
	at com.data

In [0]:
DOCS_DIR = zinggDir + "/" + modelId + "/docs/"

com.databricks.backend.common.rpc.CommandSkippedException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:138)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3$adapted(SequenceExecutionState.scala:133)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:133)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:728)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:446)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:446)
	at com.databricks.spark.chauffeur.ExecutionContextManagerV1.cancelExecution(ExecutionContextManagerV1.scala:464)
	at com.databricks.spark.chauffeur.ChauffeurState.$anonfun$process$1(ChauffeurState.scala:571)
	at com.data

In [0]:
dbutils.fs.ls('file:'+DOCS_DIR)

com.databricks.backend.common.rpc.CommandSkippedException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:138)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3$adapted(SequenceExecutionState.scala:133)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:133)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:728)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:446)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:446)
	at com.databricks.spark.chauffeur.ExecutionContextManagerV1.cancelExecution(ExecutionContextManagerV1.scala:464)
	at com.databricks.spark.chauffeur.ChauffeurState.$anonfun$process$1(ChauffeurState.scala:571)
	at com.data

In [0]:
displayHTML(open(DOCS_DIR+"model.html", 'r').read())

com.databricks.backend.common.rpc.CommandSkippedException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:138)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3$adapted(SequenceExecutionState.scala:133)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:133)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:728)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:446)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:446)
	at com.databricks.spark.chauffeur.ExecutionContextManagerV1.cancelExecution(ExecutionContextManagerV1.scala:464)
	at com.databricks.spark.chauffeur.ChauffeurState.$anonfun$process$1(ChauffeurState.scala:571)
	at com.data

In [0]:
displayHTML(open(DOCS_DIR+"data.html", 'r').read())

com.databricks.backend.common.rpc.CommandSkippedException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:138)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3$adapted(SequenceExecutionState.scala:133)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:133)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:728)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:446)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:446)
	at com.databricks.spark.chauffeur.ExecutionContextManagerV1.cancelExecution(ExecutionContextManagerV1.scala:464)
	at com.databricks.spark.chauffeur.ChauffeurState.$anonfun$process$1(ChauffeurState.scala:571)
	at com.data