Comparing changes

  • 3 commits
  • 5 files changed
  • 0 commit comments
  • 1 contributor
Showing with 274 additions and 180 deletions.
  1. +19 −22 app.py
  2. +253 −0 dedupe_utils.py
  3. +0 −151 deduper.py
  4. +1 −3 queue.py
  5. +1 −4 requirements.txt
41 app.py
@@ -11,7 +11,8 @@
from dedupe import AsciiDammit
from dedupe.serializer import _to_json, dedupe_decoder
import dedupe
-from deduper import dedupeit, static_dedupeit
+from dedupe_utils import dedupeit, static_dedupeit, DedupeFileIO,\
+ DedupeFileError
from cStringIO import StringIO
import csv
from queue import DelayedResult
@@ -22,7 +23,7 @@
redis = Redis()
UPLOAD_FOLDER = os.path.join(os.path.dirname(__file__), 'upload_data')
-ALLOWED_EXTENSIONS = set(['csv', 'json'])
+ALLOWED_EXTENSIONS = set(['csv', 'xls', 'xlsx'])
app = Flask(__name__)
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
@@ -44,15 +45,11 @@ def index():
f = request.files['input_file']
if f and allowed_file(f.filename):
deduper_id = str(uuid4())
- inp_file = f.read()
- row_count = inp_file.count('\n')
- if row_count > 10000:
- error = "Your spreadsheet must have less than 10,000 rows. Email info@datamade.us to dedupe larger files."
- status_code = 500
- else:
+ filename = secure_filename(str(time.time()) + "_" + f.filename)
+ try:
+ inp_file = DedupeFileIO(f, filename)
dedupers[deduper_id] = {
'csv': inp_file,
- 'filename': secure_filename(str(time.time()) + "_" + f.filename),
'last_interaction': datetime.now()
}
for key, deduper in dedupers.items():
@@ -61,9 +58,12 @@ def index():
if last_interaction < old:
del dedupers[key]
flask_session['session_id'] = deduper_id
- flask_session['filename'] = secure_filename(str(time.time()) + "_" + f.filename)
- flask_session['row_count'] = dedupers[deduper_id]['csv'].count('\n')
+ flask_session['filename'] = filename
+ flask_session['row_count'] = inp_file.line_count
return redirect(url_for('select_fields'))
+ except DedupeFileError as e:
+ error = e.message
+ status_code = 500
else:
error = 'Error uploading file. Did you forget to select one?'
status_code = 500
@@ -76,9 +76,9 @@ def preProcess(column):
column = column.strip().strip('"').strip("'").lower().strip()
return column
-def readData(f):
+def readData(inp):
data = {}
- reader = csv.DictReader(f)
+ reader = csv.DictReader(StringIO(inp))
for i, row in enumerate(reader):
clean_row = [(k, preProcess(v)) for (k,v) in row.items()]
row_id = i
@@ -93,12 +93,12 @@ def select_fields():
return redirect(url_for('index'))
else:
deduper_id = flask_session['session_id']
- inp = StringIO(dedupers[deduper_id]['csv'])
+ inp = dedupers[deduper_id]['csv'].converted
filename = flask_session['filename']
dedupers[deduper_id]['last_interaction'] = datetime.now()
- reader = csv.reader(inp)
+ reader = csv.reader(StringIO(inp))
fields = reader.next()
- inp = StringIO(dedupers[deduper_id]['csv'])
+ del reader
if request.method == 'POST':
field_list = [r for r in request.form]
if field_list:
@@ -178,11 +178,8 @@ def mark_pair():
counter['no'] += 1
resp = {'counter': counter}
elif action == 'finish':
- filename = flask_session['filename']
- file_path = os.path.join(UPLOAD_FOLDER, filename)
- with open(file_path, 'wb') as f:
- f.write(dedupers[deduper_id]['csv'])
- training_file_path = os.path.join(UPLOAD_FOLDER, '%s-training.json' % filename)
+ file_io = dedupers[deduper_id]['csv']
+ training_file_path = os.path.join(UPLOAD_FOLDER, '%s-training.json' % file_io.file_path)
training_data = dedupers[deduper_id]['training_data']
with open(training_file_path, 'wb') as f:
f.write(json.dumps(training_data, default=_to_json))
@@ -191,7 +188,7 @@ def mark_pair():
args = {
'field_defs': field_defs,
'training_data': training_file_path,
- 'file_path': file_path,
+ 'file_io': file_io,
'data_sample': sample,
}
rv = dedupeit.delay(**args)
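
Note: the upload handler now hands validation and conversion off to DedupeFileIO, and failures surface as DedupeFileError instead of an inline row-count check. A minimal sketch of the new flow, assuming the Flask upload object f and the names imported at the top of app.py:

    filename = secure_filename(str(time.time()) + "_" + f.filename)
    try:
        # DedupeFileIO guesses the format, converts it to CSV and raises
        # DedupeFileError for unsupported types or files over 10,000 rows
        inp_file = DedupeFileIO(f, filename)
        flask_session['row_count'] = inp_file.line_count
    except DedupeFileError as e:
        error = e.message
        status_code = 500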
253 dedupe_utils.py
@@ -0,0 +1,253 @@
+import csv
+import re
+import os
+import json
+from dedupe import AsciiDammit
+import dedupe
+from cStringIO import StringIO
+from collections import defaultdict, OrderedDict
+import logging
+from datetime import datetime
+from queue import queuefunc
+import pdb
+from operator import itemgetter
+from csvkit import convert
+import xlwt
+from openpyxl import Workbook
+from openpyxl.cell import get_column_letter
+
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
+UPLOAD_FOLDER = os.path.join(os.path.dirname(__file__), 'upload_data')
+
+class DedupeFileError(Exception):
+ def __init__(self, message):
+ Exception.__init__(self, message)
+ self.message = message
+
+class DedupeFileIO(object):
+ """
+ Take an uploaded file, figure out what type it is, convert it to csv
+ then save it back as the same format.
+ """
+ def __init__(self, raw, filename):
+ self.file_type = convert.guess_format(raw.filename)
+ if self.file_type not in ['xls', 'csv', 'xlsx']:
+ raise DedupeFileError('%s is not a supported format' % self.file_type)
+ self.converted = convert.convert(raw, self.file_type)
+ self.line_count = self.converted.count('\n')
+ if self.line_count > 10000:
+ raise DedupeFileError('Your file has %s rows and we can only currently handle 10,000.' % self.line_count)
+ self.file_path = os.path.abspath(os.path.join(UPLOAD_FOLDER, filename))
+ with open(self.file_path, 'wb') as f:
+ f.write(self.converted)
+
+ def prepare(self, clustered_dupes):
+ self.clustered_dupes = clustered_dupes
+ self.cluster_count = self._prepareResults()
+ self._prepareUniqueResults()
+
+ def _prepareResults(self):
+ """
+ Prepare deduplicated file for writing to various formats with
+ duplicates clustered.
+ """
+ cluster_membership = {}
+ for cluster_id, cluster in enumerate(self.clustered_dupes):
+ for record_id in cluster:
+ cluster_membership[record_id] = cluster_id
+
+ unique_record_id = cluster_id + 1
+
+ f = open(self.file_path, 'rU')
+ reader = csv.reader(f)
+
+ heading_row = reader.next()
+ heading_row.insert(0, 'Group ID')
+
+ rows = []
+
+ for row_id, row in enumerate(reader):
+ if row_id in cluster_membership:
+ cluster_id = cluster_membership[row_id]
+ else:
+ cluster_id = unique_record_id
+ unique_record_id += 1
+ row.insert(0, cluster_id)
+ rows.append(row)
+ rows = sorted(rows, key=itemgetter(0))
+ rows.insert(0, heading_row)
+ self.clustered_rows = []
+ for row in rows:
+ d = OrderedDict()
+ for k,v in zip(heading_row, row):
+ d[k] = v
+ self.clustered_rows.append(d)
+ f.close()
+ return unique_record_id
+
+ def _prepareUniqueResults(self):
+ """ """
+ cluster_membership = {}
+ for (cluster_id, cluster) in enumerate(self.clustered_dupes):
+ for record_id in cluster:
+ cluster_membership[record_id] = cluster_id
+
+ f = open(self.file_path, 'rU')
+ reader = csv.reader(f)
+
+ rows = [reader.next()]
+ seen_clusters = set()
+ for row_id, row in enumerate(reader):
+ if row_id in cluster_membership:
+ cluster_id = cluster_membership[row_id]
+ if cluster_id not in seen_clusters:
+ rows.append(row)
+ seen_clusters.add(cluster_id)
+ else:
+ rows.append(row)
+ self.unique_rows = []
+ for row in rows:
+ d = OrderedDict()
+ for k,v in zip(rows[0], row):
+ d[k] = v
+ self.unique_rows.append(d)
+ f.close()
+ return self.unique_rows
+
+ def writeCSV(self):
+ u_path = '%s-deduped_unique.csv' % self.file_path
+ d_path = '%s-deduped.csv' % self.file_path
+ unique = open(u_path, 'wb')
+ writer = csv.DictWriter(unique, self.unique_rows[0].keys())
+ writer.writeheader()
+ writer.writerows(self.unique_rows)
+ unique.close()
+ clusters = open(d_path, 'wb')
+ writer = csv.DictWriter(clusters, self.clustered_rows[0].keys())
+ writer.writeheader()
+ writer.writerows(self.clustered_rows)
+ clusters.close()
+ return d_path, u_path, self.cluster_count, self.line_count
+
+ def _iterExcel(self, outp_type):
+ rows = getattr(self,outp_type)
+ header = rows[0].keys()
+ for r, row in enumerate(rows):
+ for c, key in enumerate(header):
+ value = row[key]
+ yield r,c,value
+
+ def writeXLS(self):
+ u_path = '%s-deduped_unique.xls' % self.file_path
+ d_path = '%s-deduped.xls' % self.file_path
+ clustered_book = xlwt.Workbook(encoding='utf-8')
+ clustered_sheet = clustered_book.add_sheet('Clustered Results')
+ for r,c,value in self._iterExcel('clustered_rows'):
+ clustered_sheet.write(r,c,label=value)
+ clustered_book.save(d_path)
+ unique_book = xlwt.Workbook(encoding='utf-8')
+ unique_sheet = unique_book.add_sheet('Unique Results')
+ for r,c,value in self._iterExcel('unique_rows'):
+ unique_sheet.write(r,c,label=value)
+ unique_book.save(u_path)
+ return d_path, u_path, self.cluster_count, self.line_count
+
+ def writeXLSX(self):
+ u_path = '%s-deduped_unique.xlsx' % self.file_path
+ d_path = '%s-deduped.xlsx' % self.file_path
+ d_book = Workbook()
+ d_ws = d_book.active
+ d_ws.title = 'Clustered Results'
+ for r,c,value in self._iterExcel('clustered_rows'):
+ col = get_column_letter(c + 1)
+ d_ws.cell('%s%s' % (col, r + 1)).value = value
+ d_book.save(filename=d_path)
+ u_book = Workbook()
+ u_ws = u_book.active
+ u_ws.title = 'Unique Results'
+ for r,c,value in self._iterExcel('unique_rows'):
+ col = get_column_letter(c + 1)
+ u_ws.cell('%s%s' % (col, r + 1)).value = value
+ u_book.save(filename=u_path)
+ return d_path, u_path, self.cluster_count, self.line_count
+
+class WebDeduper(object):
+
+ def __init__(self, deduper,
+ file_io=None,
+ training_data=None,
+ recall_weight=2):
+ self.file_io = file_io
+ self.data_d = self.readData()
+ self.deduper = deduper
+ self.recall_weight = float(recall_weight)
+ self.training_data = training_data
+ if training_data:
+ self.deduper.readTraining(self.training_data)
+ self.deduper.train()
+ self.settings_path = '%s-settings.dedupe' % self.file_io.file_path
+ self.deduper.writeTraining(self.training_data)
+ self.deduper.writeSettings(self.settings_path)
+
+ def dedupe(self):
+ threshold = self.deduper.threshold(self.data_d, recall_weight=self.recall_weight)
+ clustered_dupes = self.deduper.match(self.data_d, threshold)
+ self.file_io.prepare(clustered_dupes)
+ if self.file_io.file_type == 'csv':
+ deduped, deduped_unique, cluster_count, line_count = self.file_io.writeCSV()
+ if self.file_io.file_type == 'xls':
+ deduped, deduped_unique, cluster_count, line_count = self.file_io.writeXLS()
+ if self.file_io.file_type == 'xlsx':
+ deduped, deduped_unique, cluster_count, line_count = self.file_io.writeXLSX()
+ files = {
+ 'deduped': deduped,
+ 'deduped_unique': deduped_unique,
+ 'cluster_count': cluster_count,
+ 'line_count': line_count,
+ }
+ if self.training_data:
+ files['training'] = os.path.relpath(self.training_data, __file__)
+ files['settings'] = os.path.relpath(self.settings_path, __file__)
+ logger.info(files)
+ return files
+
+ def preProcess(self, column):
+ column = AsciiDammit.asciiDammit(column)
+ column = re.sub(' +', ' ', column)
+ column = re.sub('\n', ' ', column)
+ column = column.strip().strip('"').strip("'").lower().strip()
+ return column
+
+ def readData(self):
+ data = {}
+ f = open(self.file_io.file_path, 'rU')
+ reader = csv.DictReader(f)
+ for i, row in enumerate(reader):
+ clean_row = [(k, self.preProcess(v)) for (k,v) in row.items()]
+ row_id = i
+ data[row_id] = dedupe.core.frozendict(clean_row)
+ return data
+
+@queuefunc
+def dedupeit(**kwargs):
+ d = dedupe.Dedupe(kwargs['field_defs'], kwargs['data_sample'])
+ deduper = WebDeduper(d,
+ file_io=kwargs['file_io'],
+ training_data=kwargs['training_data'])
+ files = deduper.dedupe()
+ d.pool.terminate()
+ del d
+ return files
+
+@queuefunc
+def static_dedupeit(**kwargs):
+ d = dedupe.StaticDedupe(kwargs['settings_path'])
+ deduper = WebDeduper(d,
+ file_io=kwargs['file_io'],
+ recall_weight=kwargs['recall_weight'])
+ files = deduper.dedupe()
+ d.pool.terminate()
+ del d
+ return files
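
Note: DedupeFileIO carries the uploaded spreadsheet through the whole pipeline: it converts csv, xls or xlsx input to CSV text with csvkit, enforces the 10,000-row limit, and writes results back out in the original format once WebDeduper.dedupe() calls prepare(). A rough usage sketch, assuming an uploaded file object f and a clustered_dupes list returned by dedupe's match(); the filename is illustrative:

    file_io = DedupeFileIO(f, 'example.csv')   # raises DedupeFileError on bad input
    file_io.prepare(clustered_dupes)           # builds clustered_rows and unique_rows
    if file_io.file_type == 'csv':
        paths = file_io.writeCSV()
    elif file_io.file_type == 'xls':
        paths = file_io.writeXLS()
    else:
        paths = file_io.writeXLSX()
    deduped_path, unique_path, cluster_count, line_count = paths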
151 deduper.py
@@ -1,151 +0,0 @@
-import csv
-import re
-import os
-import json
-from dedupe import AsciiDammit
-import dedupe
-from cStringIO import StringIO
-from collections import defaultdict
-import logging
-from datetime import datetime
-from queue import queuefunc
-import pdb
-from operator import itemgetter
-
-logging.basicConfig(level=logging.INFO)
-logger = logging.getLogger(__name__)
-
-class WebDeduper(object):
-
- def __init__(self, deduper,
- file_path=None,
- training_data=None,
- recall_weight=2):
- self.file_path = file_path
- self.data_d = self.readData()
- self.deduper = deduper
- self.recall_weight = float(recall_weight)
- self.training_data = training_data
- if training_data:
- self.deduper.readTraining(self.training_data)
- self.deduper.train()
- self.settings_path = '%s-settings.dedupe' % file_path
- self.deduper.writeTraining(self.training_data)
- self.deduper.writeSettings(self.settings_path)
-
- def dedupe(self):
- logger.info('### Dedupe started')
- threshold = self.deduper.threshold(self.data_d, recall_weight=self.recall_weight)
- logger.info('clustering done')
- clustered_dupes = self.deduper.match(self.data_d, threshold)
- self.deduped_file_path = '%s-deduped.csv' % self.file_path
- self.deduped_unique_file_path = '%s-deduped_unique.csv' % self.file_path
- self.writeUniqueResults(clustered_dupes)
- cluster_count, line_count = self.writeResults(clustered_dupes)
- files = {
- 'deduped': os.path.relpath(self.deduped_file_path, __file__),
- 'deduped_unique': os.path.relpath(self.deduped_unique_file_path, __file__),
- 'cluster_count': cluster_count,
- 'line_count': line_count,
- }
- if self.training_data:
- files['training'] = os.path.relpath(self.training_data, __file__)
- files['settings'] = os.path.relpath(self.settings_path, __file__)
- logger.info(files)
- return files
-
- def writeResults(self, clustered_dupes):
-
- cluster_membership = {}
- for cluster_id, cluster in enumerate(clustered_dupes):
- for record_id in cluster:
- cluster_membership[record_id] = cluster_id
-
- unique_record_id = cluster_id + 1
-
- writer = csv.writer(open(self.deduped_file_path, 'wb'))
-
- reader = csv.reader(open(self.file_path, 'rb'))
-
- heading_row = reader.next()
- heading_row.insert(0, 'Group ID')
- writer.writerow(heading_row)
-
- rows = []
-
- for row_id, row in enumerate(reader):
- if row_id in cluster_membership:
- cluster_id = cluster_membership[row_id]
- else:
- cluster_id = unique_record_id
- unique_record_id += 1
- row.insert(0, cluster_id)
- rows.append(row)
- rows = sorted(rows, key=itemgetter(0))
- writer.writerows(rows)
- return unique_record_id, len(rows)
-
- def writeUniqueResults(self, clustered_dupes):
-
- cluster_membership = {}
- for (cluster_id, cluster) in enumerate(clustered_dupes):
- logging.info(cluster)
- for record_id in cluster:
- cluster_membership[record_id] = cluster_id
-
- writer = csv.writer(open(self.deduped_unique_file_path, 'wb'))
-
- reader = csv.reader(open(self.file_path, 'rb'))
-
- heading_row = reader.next()
- writer.writerow(heading_row)
-
- seen_clusters = set()
- for i, row in enumerate(reader):
- row_id = i
- if row_id in cluster_membership:
- cluster_id = cluster_membership[row_id]
- if cluster_id not in seen_clusters:
- writer.writerow(row)
- seen_clusters.add(cluster_id)
- else:
- writer.writerow(row)
-
- def preProcess(self, column):
- column = AsciiDammit.asciiDammit(column)
- column = re.sub(' +', ' ', column)
- column = re.sub('\n', ' ', column)
- column = column.strip().strip('"').strip("'").lower().strip()
- return column
-
- def readData(self):
- data = {}
- f = open(self.file_path, 'rU')
- reader = csv.DictReader(f)
- for i, row in enumerate(reader):
- clean_row = [(k, self.preProcess(v)) for (k,v) in row.items()]
- row_id = i
- data[row_id] = dedupe.core.frozendict(clean_row)
- return data
-
-@queuefunc
-def dedupeit(**kwargs):
- d = dedupe.Dedupe(kwargs['field_defs'], kwargs['data_sample'])
- deduper = WebDeduper(d,
- file_path=kwargs['file_path'],
- training_data=kwargs['training_data'])
- files = deduper.dedupe()
- d.pool.terminate()
- del d
- return files
-
-@queuefunc
-def static_dedupeit(**kwargs):
- d = dedupe.StaticDedupe(kwargs['settings_path'])
- deduper = WebDeduper(d,
- file_path=kwargs['file_path'],
- recall_weight=kwargs['recall_weight'])
- files = deduper.dedupe()
- d.pool.terminate()
- del d
- return files
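
Note: deduper.py is removed outright; its writeResults/writeUniqueResults logic lives on in DedupeFileIO._prepareResults and _prepareUniqueResults above. The Group ID assignment is unchanged; a condensed sketch of the idea, assuming clustered_dupes is a list of record-id clusters and reader iterates the source rows:

    cluster_membership = {}
    for cluster_id, cluster in enumerate(clustered_dupes):
        for record_id in cluster:
            cluster_membership[record_id] = cluster_id

    # records outside every cluster each get a fresh Group ID of their own
    unique_record_id = cluster_id + 1
    for row_id, row in enumerate(reader):
        group_id = cluster_membership.get(row_id)
        if group_id is None:
            group_id = unique_record_id
            unique_record_id += 1
        row.insert(0, group_id)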
4 queue.py
@@ -37,9 +37,7 @@ def queue_daemon(app, rv_ttl=500):
try:
rv = func(*args, **kwargs)
except Exception, e:
- exc_type, exc_obj, exc_tb = sys.exc_info()
- fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
- rv = 'Exc: %s, file: %s, line:%s' % (exc_type, fname, exc_tb.tb_lineno)
+ rv = 'Exc: %s' % (e.message)
if rv is not None:
redis.set(key, dumps(rv))
redis.expire(key, rv_ttl)
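
Note: the worker no longer reconstructs the failing file and line number from sys.exc_info(); it just records the exception message, which is what the front end later reads back through DelayedResult. A sketch of the handler around this change, in the Python 2 syntax the codebase uses, assuming the surrounding queue_daemon loop is otherwise untouched:

    try:
        rv = func(*args, **kwargs)
    except Exception, e:
        # DedupeFileError sets .message explicitly; traceback details
        # (file name, line number) are no longer included
        rv = 'Exc: %s' % e.message
    if rv is not None:
        redis.set(key, dumps(rv))    # result (or error string), expires after rv_ttl
        redis.expire(key, rv_ttl)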
5 requirements.txt
@@ -1,11 +1,8 @@
Flask
gunicorn
requests
-# xlrd==0.9.2
-
-argparse
-simplejson
redis
+csvkit
# ====================
# Dedupe
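
Note: csvkit is the dependency behind the new format handling in dedupe_utils.py; argparse, simplejson and the commented-out xlrd pin are dropped. xlwt and openpyxl are imported by dedupe_utils.py but are not added here. A minimal sketch of the two csvkit calls the new code relies on, with f standing in for an uploaded file object:

    from csvkit import convert

    file_type = convert.guess_format(f.filename)    # e.g. 'csv', 'xls' or 'xlsx'
    if file_type in ('csv', 'xls', 'xlsx'):
        csv_string = convert.convert(f, file_type)  # normalized CSV text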
