CsvPipe python api issue zinggAI#401
RavirajBaraiya committed Jul 17, 2022
1 parent dca5a9b commit 0bffcbc
Showing 6 changed files with 84 additions and 35 deletions.
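
Taken together, the example changes below all make the same substitution: a CsvPipe is now built with optional schema and location constructor arguments instead of separate setter calls. A minimal before/after sketch, reusing the pipe name, schema variable, and path from the febrl example in this commit:

# before this commit: configure the pipe through setters
inputPipe = CsvPipe("test")
inputPipe.setLocation("examples/febrl/test.csv")
inputPipe.setSchema(dfSchema)

# after this commit: pass schema and location to the constructor;
# both arguments are optional and default to None
inputPipe = CsvPipe("test", dfSchema, "examples/febrl/test.csv")
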
12 changes: 4 additions & 8 deletions examples/amazon-google/AmazonGoogle.py
@@ -23,21 +23,17 @@
#in that case, replace df with input df
dfAmazon = spark.read.format("csv").schema("id string, title string, description string, manufacturer string, price double ").load("examples/amazon-google/Amazon.csv")
dfSchemaAmazon = str(dfAmazon.schema.json())
inputPipeAmazon = CsvPipe("testAmazon")
inputPipeAmazon.setLocation("examples/amazon-google/Amazon.csv")
inputPipeAmazon.setSchema(dfSchemaAmazon)
inputPipeAmazon = CsvPipe("testAmazon", dfSchemaAmazon, "examples/amazon-google/Amazon.csv")

dfGoogle = spark.read.format("csv").schema("id string, title string, description string, manufacturer string, price double ").load("examples/amazon-google/GoogleProducts.csv")
dfSchemaGoogle = str(dfGoogle.schema.json())
inputPipeGoogle = CsvPipe("testGoogle")
inputPipeGoogle.setLocation("examples/amazon-google/GoogleProducts.csv")
inputPipeGoogle.setSchema(dfSchemaGoogle)
inputPipeGoogle = CsvPipe("testGoogle", dfSchemaGoogle, "examples/amazon-google/GoogleProducts.csv")

args.setData(inputPipeAmazon,inputPipeGoogle)

#setting outputpipe in 'args'
outputPipe = CsvPipe("resultAmazonGoogle")
outputPipe.setLocation("/tmp/AwsGoogleOutput")
outputPipe = CsvPipe("resultAmazonGoogle", None, "/tmp/AwsGoogleOutput")

args.setOutput(outputPipe)

options = ClientOptions()
9 changes: 2 additions & 7 deletions examples/febrl/FebrlExample.py
@@ -28,18 +28,13 @@
#below line should not be required if you are reading from in memory dataset
#in that case, replace df with input df
df = spark.read.format("csv").schema("id string, fname string, lname string, stNo string, add1 string, add2 string, city string, state string, areacode string, dob string, ssn string").load("examples/febrl/test.csv")

inputPipe = CsvPipe("test")
inputPipe.setLocation("examples/febrl/test.csv")

dfSchema = str(df.schema.json())
inputPipe.setSchema(dfSchema)
inputPipe = CsvPipe("test", dfSchema, "examples/febrl/test.csv")

args.setData(inputPipe)

#setting outputpipe in 'args'
outputPipe = CsvPipe("csv")
outputPipe.setLocation("/tmp")
outputPipe = CsvPipe("csv", None, "/tmp")

args.setOutput(outputPipe)

13 changes: 4 additions & 9 deletions examples/iTunes-amazon/iTunesAmazon.py
@@ -28,22 +28,17 @@
#below line should not be required if you are reading from in memory dataset
#in that case, replace df with input df
dfiTunes = spark.read.format("csv").schema("id string, Song_Name string, Artist_Name string, Album_Name string, Genre string, Price double, CopyRight string, Time string, Released string").load("examples/iTunes-amazon/iTunesMusic.csv")
dfSchema = str(dfiTunes.schema.json())
inputPipeiTunes = CsvPipe("testiTunes")
inputPipeiTunes.setLocation("examples/iTunes-amazon/iTunesMusic.csv")
inputPipeiTunes.setSchema(dfSchema)
dfSchemaiTunes = str(dfiTunes.schema.json())
inputPipeiTunes = CsvPipe("testiTunes", dfSchemaiTunes, "examples/iTunes-amazon/iTunesMusic.csv")

dfAmazon = spark.read.format("csv").schema("id string, Song_Name string, Artist_Name string, Album_Name string, Genre string, Price double, CopyRight string, Time string, Released string").load("examples/iTunes-amazon/AmazonMusic.csv")
dfSchemaAmazon = str(dfAmazon.schema.json())
inputPipeAmazon = CsvPipe("testAmazon")
inputPipeAmazon.setLocation("examples/iTunes-amazon/AmazonMusic.csv")
inputPipeAmazon.setSchema(dfSchemaAmazon)
inputPipeAmazon = CsvPipe("testAmazon", dfSchemaAmazon, "examples/iTunes-amazon/AmazonMusic.csv")

args.setData(inputPipeiTunes,inputPipeAmazon)

#setting outputpipe in 'args'
outputPipe = CsvPipe("iTunesAmazonresult")
outputPipe.setLocation("/tmp/iTunesAmazonOutput")
outputPipe = CsvPipe("iTunesAmazonresult", None, "/tmp/iTunesAmazonOutput")

args.setOutput(outputPipe)

10 changes: 4 additions & 6 deletions examples/ncVoters5M/ncVoters.py
@@ -22,16 +22,14 @@
#below line should not be required if you are reading from in memory dataset
#in that case, replace df with input df
df = spark.read.format("csv").schema("recid string, givenname string, surname string, suburb string, postcode double ").load("examples/ncVoters5M/5Party-ocp20/")
dfSchemaA = str(df.schema.json())
dfSchema = str(df.schema.json())
inputPipe = CsvPipe("test", dfSchema, "examples/ncVoters5M/5Party-ocp20/")

inputPipe = CsvPipe("test")
inputPipe.setLocation("examples/ncVoters5M/5Party-ocp20/")
inputPipe.setSchema(dfSchemaA)
args.setData(inputPipe)

#setting outputpipe in 'args'
outputPipe = CsvPipe("ncVotersResult")
outputPipe.setLocation("/tmp/ncVotersOutput")
outputPipe = CsvPipe("ncVotersResult", None, "/tmp/ncVotersOutput")

args.setOutput(outputPipe)

options = ClientOptions()
19 changes: 14 additions & 5 deletions python/zingg/pipes/pipes.py
@@ -19,9 +19,17 @@ class CsvPipe(Pipe):
    :param name: name of the pipe.
    :type name: String
    :param schema: (optional) JSON schema for the pipe
    :type schema: String or None
    :param location: (optional) location from where we read data
    :type location: String or None
    """
    def __init__(self, name):
    def __init__(self, name, schema = None, location = None):
        Pipe.__init__(self, name, Format.CSV.type())
        if(schema != None):
            Pipe.setSchema(self, schema)
        if(location != None):
            Pipe.addProperty(self, FilePipe.LOCATION, location)

    def setDelimiter(self, delimiter):
        """ This method is used to define delimiter of CsvPipe
@@ -40,11 +48,12 @@ def setLocation(self, location):
        Pipe.addProperty(self, FilePipe.LOCATION, location)

    def setHeader(self, header):
        """ Method to set header property of pipe

        :param header: true if data contains header otherwise false
        :type header: Bool
        """
        Pipe.addProperty(self, FilePipe.HEADER, header)

class BigQueryPipe(Pipe):
    """ Pipe Class for working with BigQuery pipeline
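
With the change above, a pipe can be fully specified at construction time while the existing setters remain available for other CSV options. A short usage sketch, assuming zingg.pipes re-exports CsvPipe as the test below does; the pipe name, path, delimiter, and header value are illustrative:

from zingg.pipes import *

# schema left as None here; location passed through the new constructor argument
pipe = CsvPipe("customers", None, "examples/febrl/test.csv")

# setters from the pre-existing API still work for other properties
pipe.setDelimiter(",")
pipe.setHeader("true")  # per the docstring above: true if the data contains a header row
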
56 changes: 56 additions & 0 deletions test/testFebrl/testFebrl.py
@@ -0,0 +1,56 @@
import unittest
from unittest.case import TestCase
from io import StringIO


from zingg import *
from zingg.pipes import *

args = Arguments()
fname = FieldDefinition("fname", "string", MatchType.FUZZY)
lname = FieldDefinition("lname", "string", MatchType.FUZZY)
stNo = FieldDefinition("stNo", "string", MatchType.FUZZY)
add1 = FieldDefinition("add1","string", MatchType.FUZZY)
add2 = FieldDefinition("add2", "string", MatchType.FUZZY)
city = FieldDefinition("city", "string", MatchType.FUZZY)
areacode = FieldDefinition("areacode", "string", MatchType.FUZZY)
state = FieldDefinition("state", "string", MatchType.FUZZY)
dob = FieldDefinition("dob", "string", MatchType.FUZZY)
ssn = FieldDefinition("ssn", "string", MatchType.FUZZY)

fieldDefs = [fname, lname, stNo, add1, add2, city, areacode, state, dob, ssn]

args.setFieldDefinition(fieldDefs)
args.setModelId("100")
args.setZinggDir("models")
args.setNumPartitions(4)
args.setLabelDataSampleSize(0.5)

df = spark.read.format("csv").schema("id string, fname string, lname string, stNo string, add1 string, add2 string, city string, state string, areacode string, dob string, ssn string").load("examples/febrl/test.csv")
dfSchema = str(df.schema.json())
inputPipe = CsvPipe("test", dfSchema, "examples/febrl/test.csv")
outputPipe = CsvPipe("result", None, "/tmp/febrlTest")
args.setData(inputPipe)
args.setOutput(outputPipe)
options = ClientOptions()
options.setPhase("trainMatch")

#testing
class Accuracy_recordCount(TestCase):
    def test_recordCount(self):
        client = Zingg(args, options)
        client.initAndExecute()
        pMarkedDF = client.getPandasDfFromDs(client.getMarkedRecords())
        labelledData = spark.createDataFrame(pMarkedDF)

        total_marked = pMarkedDF.shape[0]

        # marked record count test
        self.assertEqual(total_marked, 76)

        pMarkedDF.drop(pMarkedDF[pMarkedDF[ColName.PREDICTION_COL] == -1].index, inplace=True)
        acc = (pMarkedDF[ColName.MATCH_FLAG_COL] == pMarkedDF[ColName.PREDICTION_COL]).mean()

        # accuracy test
        self.assertGreater(acc, 0.9)
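
The committed module defines the test case but no entry point; if it is meant to run standalone rather than through an external test runner, a conventional unittest guard (an assumption here, not part of this commit) could be appended:

if __name__ == "__main__":
    unittest.main()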
