Commit bc4a969

CsvPipe python api issue zinggAI#401

RavirajBaraiya committed Jul 17, 2022
1 parent dca5a9b commit bc4a969
Showing 7 changed files with 122 additions and 32 deletions.
10 changes: 4 additions & 6 deletions examples/amazon-google/AmazonGoogle.py
@@ -23,15 +23,13 @@
#in that case, replace df with input df
dfAmazon = spark.read.format("csv").schema("id string, title string, description string, manufacturer string, price double ").load("examples/amazon-google/Amazon.csv")
dfSchemaAmazon = str(dfAmazon.schema.json())
-inputPipeAmazon = CsvPipe("testAmazon")
-inputPipeAmazon.setLocation("examples/amazon-google/Amazon.csv")
-inputPipeAmazon.setSchema(dfSchemaAmazon)

+inputPipeAmazon = CsvPipe("testAmazon", dfSchemaAmazon, "examples/amazon-google/Amazon.csv")

dfGoogle = spark.read.format("csv").schema("id string, title string, description string, manufacturer string, price double ").load("examples/amazon-google/GoogleProducts.csv")
dfSchemaGoogle = str(dfGoogle.schema.json())
-inputPipeGoogle = CsvPipe("testGoogle")
-inputPipeGoogle.setLocation("examples/amazon-google/GoogleProducts.csv")
-inputPipeGoogle.setSchema(dfSchemaGoogle)

+inputPipeGoogle = CsvPipe("testGoogle", dfSchemaGoogle, "examples/amazon-google/GoogleProducts.csv")

args.setData(inputPipeAmazon,inputPipeGoogle)

7 changes: 2 additions & 5 deletions examples/febrl/FebrlExample.py
@@ -28,12 +28,9 @@
#below line should not be required if you are reading from in memory dataset
#in that case, replace df with input df
df = spark.read.format("csv").schema("id string, fname string, lname string, stNo string, add1 string, add2 string, city string, state string, areacode string, dob string, ssn string").load("examples/febrl/test.csv")

-inputPipe = CsvPipe("test")
-inputPipe.setLocation("examples/febrl/test.csv")

dfSchema = str(df.schema.json())
-inputPipe.setSchema(dfSchema)

+inputPipe = CsvPipe("test", dfSchema, "examples/febrl/test.csv")

args.setData(inputPipe)

10 changes: 3 additions & 7 deletions examples/iTunes-amazon/iTunesAmazon.py
@@ -28,16 +28,12 @@
#below line should not be required if you are reading from in memory dataset
#in that case, replace df with input df
dfiTunes = spark.read.format("csv").schema("id string, Song_Name string, Artist_Name string, Album_Name string, Genre string, Price double, CopyRight string, Time string, Released string").load("examples/iTunes-amazon/iTunesMusic.csv")
-dfSchema = str(dfiTunes.schema.json())
-inputPipeiTunes = CsvPipe("testiTunes")
-inputPipeiTunes.setLocation("examples/iTunes-amazon/iTunesMusic.csv")
-inputPipeiTunes.setSchema(dfSchema)
+dfSchemaiTunes = str(dfiTunes.schema.json())
+inputPipeiTunes = CsvPipe("testiTunes", dfSchemaiTunes, "examples/iTunes-amazon/iTunesMusic.csv")

dfAmazon = spark.read.format("csv").schema("id string, Song_Name string, Artist_Name string, Album_Name string, Genre string, Price double, CopyRight string, Time string, Released string").load("examples/iTunes-amazon/AmazonMusic.csv")
dfSchemaAmazon = str(dfAmazon.schema.json())
-inputPipeAmazon = CsvPipe("testAmazon")
-inputPipeAmazon.setLocation("examples/iTunes-amazon/AmazonMusic.csv")
-inputPipeAmazon.setSchema(dfSchemaAmazon)
+inputPipeAmazon = CsvPipe("testAmazon", dfSchemaAmazon, "examples/iTunes-amazon/AmazonMusic.csv")

args.setData(inputPipeiTunes,inputPipeAmazon)

5 changes: 1 addition & 4 deletions examples/ncVoters5M/ncVoters.py
@@ -24,10 +24,7 @@
df = spark.read.format("csv").schema("recid string, givenname string, surname string, suburb string, postcode double ").load("examples/ncVoters5M/5Party-ocp20/")
dfSchemaA = str(df.schema.json())

-inputPipe = CsvPipe("test")
-inputPipe.setLocation("examples/ncVoters5M/5Party-ocp20/")
-inputPipe.setSchema(dfSchemaA)
-args.setData(inputPipe)
+inputPipe = CsvPipe("test", dfSchemaA, "examples/ncVoters5M/5Party-ocp20/")

#setting outputpipe in 'args'
outputPipe = CsvPipe("ncVotersResult")
14 changes: 9 additions & 5 deletions python/zingg/pipes/pipes.py
@@ -19,9 +19,17 @@ class CsvPipe(Pipe):
    :param name: name of the pipe.
    :type name: String
+    :param schema: (optional) json schema for the pipe
+    :type schema: Schema or None
+    :param location: (optional) location from where we read data
+    :type location: String or None
    """
-    def __init__(self, name):
+    def __init__(self, name, schema = None, location = None):
        Pipe.__init__(self, name, Format.CSV.type())
+        if(schema != None):
+            Pipe.setSchema(self, schema)
+        if(location != None):
+            Pipe.addProperty(self, FilePipe.LOCATION, location)

    def setDelimiter(self, delimiter):
        """ This method is used to define delimiter of CsvPipe
@@ -42,10 +50,6 @@ def setLocation(self, location):
    def setHeader(self, header):
        Pipe.addProperty(self, FilePipe.HEADER, header)

-class BigQueryPipe(Pipe):
-
-=======

class BigQueryPipe(Pipe):
    """ Pipe Class for working with BigQuery pipeline
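With this change an input pipe can be built in a single call instead of a name-only constructor followed by setSchema and setLocation. A minimal sketch of the updated usage, assuming a SparkSession named spark and the febrl example CSV shipped with the repository (the schema string is abbreviated here for illustration):

# sketch: building an input CsvPipe with the new constructor
df = spark.read.format("csv").schema("id string, fname string, lname string").load("examples/febrl/test.csv")
dfSchema = str(df.schema.json())
inputPipe = CsvPipe("test", dfSchema, "examples/febrl/test.csv")

# schema and location stay optional; a name-only pipe can still be configured with setters, e.g. for output
outputPipe = CsvPipe("result")
outputPipe.setLocation("/tmp/pythonTest")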
44 changes: 39 additions & 5 deletions python/zingg/zingg.py
@@ -37,6 +37,7 @@ class Zingg:
    def __init__(self, args, options):
        self.client = jvm.zingg.client.Client(args.getArgs(), options.getClientOptions())

+
    def init(self):
        """ Method to initialize zingg client by reading internal configurations and functions """
        self.client.init()
@@ -158,6 +159,21 @@ def getPandasDfFromDs(self, data):
        return pd.DataFrame(df.collect(), columns=df.columns)


+class ZinggWithSpark(Zingg):
+
+    """ This class is the main point of interface with the Zingg matching product. Construct a client to Zingg using provided arguments and spark master. If running locally, set the master to local.
+    :param args: arguments for training and matching
+    :type args: Arguments
+    :param options: client option for this class object
+    :type options: ClientOptions
+    """
+
+    def __init__(self, args, options):
+        self.client = jvm.zingg.client.Client(args.getArgs(), options.getClientOptions(), spark._jsparkSession)
+

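A brief sketch of how the new ZinggWithSpark client might be driven, assuming args and options are built as in the example scripts and the spark object is the SparkSession those scripts already use:

# sketch only: ZinggWithSpark reuses the Zingg API but hands the live SparkSession to the JVM client
client = ZinggWithSpark(args, options)
client.initAndExecute()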
class Arguments:
    """ This class helps supply match arguments to Zingg. There are 3 basic steps in any match process.
@@ -261,6 +277,15 @@ def writeArgumentsToJSON(self, fileName):
        """
        jvm.zingg.client.Arguments.writeArgumentsToJSON(fileName, self.args)

+    def setStopWordsCutoff(self, stopWordsCutoff):
+        """ Method to set the stopWordsCutoff parameter value
+        By default, Zingg extracts 10% of the high frequency unique words from a dataset. If a user wants a different selection, they should set the stopWordsCutoff property
+        :param stopWordsCutoff: value of the stop words cutoff parameter
+        :type stopWordsCutoff: float
+        """
+        self.args.setStopWordsCutoff(stopWordsCutoff)

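For example, to widen the stop-word selection from the default 10% to 20% of the high-frequency words, the cutoff could be set on the Arguments object (a small sketch, not part of the commit):

args = Arguments()
# treat the top 20% of high-frequency words as stop words instead of the default 10%
args.setStopWordsCutoff(0.2)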
    @staticmethod
    def createArgumentsFromJSON(fileName, phase):
        """ Method to create an object of this class from the JSON file and phase parameter value.
@@ -295,10 +320,14 @@ class ClientOptions:
    """:LOCATION: location parameter for this class"""

    def __init__(self, args = None):
-        if(args!=None):
-            self.co = jvm.zingg.client.ClientOptions(args)
-        else:
-            self.co = jvm.zingg.client.ClientOptions(["--phase", "trainMatch", "--conf", "dummy", "--license", "dummy", "--email", "xxx@yyy.com"])
+        if(args == None):
+            args = []
+        args.append(self.LICENSE)
+        args.append("zinggLic.txt")
+        args.append(self.EMAIL)
+        args.append("zingg@zingg.ai")
+        print("arguments for client options are ", args)
+        self.co = jvm.zingg.client.ClientOptions(args)

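With this constructor, caller-supplied options are kept and the license and email defaults are appended to them. A hedged sketch of passing a phase explicitly, reusing the flag strings from the removed default list:

# sketch: phase supplied by the caller; license/email are appended by the constructor
options = ClientOptions(["--phase", "trainMatch", "--conf", "dummy"])
# the phase can also be switched later, as the test below does
options.setPhase("trainMatch")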
    def getClientOptions(self):
        """ Method to get pointer address of this class
@@ -383,13 +412,18 @@ class FieldDefinition:
    :type dataType: String
    :param matchType: match type of this field e.g. FUZZY, EXACT, etc.
    :type matchType: MatchType
+    :param stopWords: location of the CSV file containing the stop words for this field
+    :type stopWords: String or None
    """

-    def __init__(self, name, dataType, *matchType):
+    def __init__(self, name, dataType, *matchType, stopWords = None):
        self.fd = jvm.zingg.client.FieldDefinition()
        self.fd.setFieldName(name)
        self.fd.setDataType(self.stringify(dataType))
        self.fd.setMatchType(matchType)
        self.fd.setFields(name)
+        if(stopWords!= None):
+            self.fd.setStopWords(stopWords)

    def getFieldDefinition(self):
        """ Method to get pointer address of this class
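A stop-words file can now be attached per field at construction time. A short sketch, where the stop-words CSV path is purely illustrative and not part of the commit:

# FUZZY match on fname, ignoring terms listed in a hypothetical stop-words CSV
fname = FieldDefinition("fname", "string", MatchType.FUZZY, stopWords="examples/febrl/fnameStopWords.csv")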
64 changes: 64 additions & 0 deletions test/testFebrl/testFebrl.py
@@ -0,0 +1,64 @@
import unittest
from unittest.case import TestCase
import unittest
from io import StringIO


from zingg import *
from zingg.pipes import *

args = Arguments()
fname = FieldDefinition("fname", "string", MatchType.FUZZY)
lname = FieldDefinition("lname", "string", MatchType.FUZZY)
stNo = FieldDefinition("stNo", "string", MatchType.FUZZY)
add1 = FieldDefinition("add1","string", MatchType.FUZZY)
add2 = FieldDefinition("add2", "string", MatchType.FUZZY)
city = FieldDefinition("city", "string", MatchType.FUZZY)
areacode = FieldDefinition("areacode", "string", MatchType.FUZZY)
state = FieldDefinition("state", "string", MatchType.FUZZY)
dob = FieldDefinition("dob", "string", MatchType.FUZZY)
ssn = FieldDefinition("ssn", "string", MatchType.FUZZY)

fieldDefs = [fname, lname, stNo, add1, add2, city, areacode, state, dob, ssn]

args.setFieldDefinition(fieldDefs)
args.setModelId("100")
args.setZinggDir("models")
args.setNumPartitions(4)
args.setLabelDataSampleSize(0.5)

df = spark.read.format("csv").schema("id string, fname string, lname string, stNo string, add1 string, add2 string, city string, state string, areacode string, dob string, ssn string").load("examples/febrl/test.csv")
dfSchema = str(df.schema.json())

inputPipe = CsvPipe("test", dfSchema, "examples/febrl/test.csv")

outputPipe = CsvPipe("result")
outputPipe.setLocation("/tmp/pythonTest")

args.setData(inputPipe)
args.setOutput(outputPipe)

options = ClientOptions()
# options.setPhase("trainMatch")
options.setPhase("trainMatch")

#testing

class Accuracy_recordCount(TestCase):
    def test_recordCount(self):
        client = Zingg(args, options)
        client.initAndExecute()
        pMarkedDF = client.getPandasDfFromDs(client.getMarkedRecords())
        labelledData = spark.createDataFrame(pMarkedDF)

        total_marked = pMarkedDF.shape[0]

        # marked record count test
        self.assertEqual(total_marked, 76)

        pMarkedDF.drop(pMarkedDF[pMarkedDF[ColName.PREDICTION_COL] == -1].index, inplace=True)
        acc = (pMarkedDF[ColName.MATCH_FLAG_COL]== pMarkedDF[ColName.PREDICTION_COL]).mean()

        # accuracy test
        self.assertGreater(acc, 0.9)

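The test module ships without a runner of its own. One hedged way to make it directly executable, assuming the Zingg and Spark runtime that provides the spark session is already set up, is the standard unittest entry point:

# sketch, not part of the commit: allow running the file directly
if __name__ == "__main__":
    unittest.main()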