Skip to content
This repository has been archived by the owner on Sep 9, 2020. It is now read-only.

Commit

Permalink
Merge 7679fcd into 13da21e
Browse files Browse the repository at this point in the history
  • Loading branch information
falkerson committed Jul 15, 2019
2 parents 13da21e + 7679fcd commit 1e9ba4b
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 24 deletions.
60 changes: 38 additions & 22 deletions datarobot_batch_scoring/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
REPORT_INTERVAL,
ProgressQueueMsg)
from datarobot_batch_scoring.detect import Detector
from datarobot_batch_scoring.utils import get_rusage, SerializableDialect
from datarobot_batch_scoring.utils import (get_rusage,
gzip_with_encoding,
SerializableDialect)


if six.PY2:
Expand Down Expand Up @@ -383,7 +385,7 @@ def __exit__(self, exc_type, exc_val, exc_tb):
self.p.terminate()


def sniff_dialect(sample, encoding, sep, skip_dialect, ui):
def sniff_dialect(sample, sep, skip_dialect, ui):
t1 = time()
try:
if skip_dialect:
Expand All @@ -396,11 +398,11 @@ def sniff_dialect(sample, encoding, sep, skip_dialect, ui):
dialect = csv.get_dialect('dataset_dialect')
else:
sniffer = csv.Sniffer()
dialect = sniffer.sniff(sample.decode(encoding), delimiters=sep)
dialect = sniffer.sniff(sample, delimiters=sep)
ui.debug('investigate_encoding_and_dialect - seconds to detect '
'csv dialect: {}'.format(time() - t1))
except csv.Error:
decoded_one = sample.decode(encoding)
decoded_one = sample
t2 = time()
detector = Detector()
delimiter, resampled = detector.detect(decoded_one)
Expand Down Expand Up @@ -432,6 +434,25 @@ def sniff_dialect(sample, encoding, sep, skip_dialect, ui):
return dialect


def get_opener_and_mode(is_gz, text=False):
    """Select the open-callable and mode string for reading a dataset file.

    Parameters
    ----------
    is_gz : bool
        True when the dataset path ends in ``.gz`` and must be read
        through gzip.
    text : bool
        True to open in text mode (callers then pass ``encoding=``);
        False for raw binary reads.

    Returns
    -------
    tuple
        ``(opener, mode)`` where ``opener(path, mode, ...)`` is usable as
        a context manager.
    """
    if six.PY2:
        if is_gz:
            # gzip_with_encoding wraps gzip.open so PY2 callers may pass
            # an ``encoding=`` keyword, which PY2 gzip.open does not accept.
            return gzip_with_encoding, ('r' if text else 'rb')
        if text:
            # io.open supports ``encoding=`` on PY2, unlike the builtin open.
            from io import open as io_open
            return io_open, 'r'
        # 'U' enables universal-newline handling on PY2.
        return open, 'rU'
    # PY3: builtin open and gzip.open both take ``encoding=`` in 'rt' mode.
    return (gzip.open if is_gz else open), ('rt' if text else 'rb')


def investigate_encoding_and_dialect(dataset, sep, ui, fast=False,
encoding=None, skip_dialect=False,
output_delimiter=None):
Expand All @@ -443,12 +464,8 @@ def investigate_encoding_and_dialect(dataset, sep, ui, fast=False,
sample_size = DETECT_SAMPLE_SIZE_FAST
else:
sample_size = DETECT_SAMPLE_SIZE_SLOW

is_gz = dataset.endswith('.gz')
opener, mode = (
(gzip.open, 'rb') if is_gz
else (open, ('rU' if six.PY2 else 'rb'))
)
opener, mode = get_opener_and_mode(is_gz)
with opener(dataset, mode) as dfile:
sample = dfile.read(sample_size)

Expand All @@ -462,8 +479,12 @@ def investigate_encoding_and_dialect(dataset, sep, ui, fast=False,
encoding = encoding.lower()
sample[:1000].decode(encoding) # Fail here if the encoding is invalid

opener, mode = get_opener_and_mode(is_gz, text=True)
with opener(dataset, mode, encoding=encoding) as dfile:
sample = dfile.read(sample_size)

try:
dialect = sniff_dialect(sample, encoding, sep, skip_dialect, ui)
dialect = sniff_dialect(sample, sep, skip_dialect, ui)
except csv.Error as ex:
ui.fatal(ex)
if len(sample) < 10:
Expand Down Expand Up @@ -518,15 +539,11 @@ def auto_sampler(dataset, encoding, ui):

sample_size = AUTO_SAMPLE_SIZE
is_gz = dataset.endswith('.gz')
opener, mode = (gzip.open, 'rb') if is_gz else (open, 'rU')
with opener(dataset, mode) as dfile:
opener, mode = get_opener_and_mode(is_gz, text=True)
with opener(dataset, mode, encoding=encoding) as dfile:
sample = dfile.read(sample_size)
if six.PY3 and not is_gz:
sample = sample.encode(encoding or 'utf-8')

ingestable_sample = sample.decode(encoding)
size_bytes = sys.getsizeof(ingestable_sample.encode('utf-8'))

size_bytes = sys.getsizeof(sample.encode('utf-8'))
if size_bytes < (sample_size * 0.75):
# if dataset is tiny, don't bother auto sampling.
ui.info('auto_sampler: total time seconds - {}'.format(time() - t0))
Expand All @@ -536,15 +553,15 @@ def auto_sampler(dataset, encoding, ui):

if six.PY3:
buf = io.StringIO()
buf.write(ingestable_sample)
buf.write(sample)
else:
buf = StringIO.StringIO()
buf.write(sample)
buf.write(sample.encode('utf-8'))
buf.seek(0)

file_lines, csv_lines = 0, 0
dialect = csv.get_dialect('dataset_dialect')
fd = Recoder(buf, encoding)
reader = csv.reader(fd, dialect=dialect, delimiter=dialect.delimiter)
reader = csv.reader(buf, dialect=dialect, delimiter=dialect.delimiter)
line_pos = []
for _ in buf:
file_lines += 1
Expand All @@ -559,7 +576,6 @@ def auto_sampler(dataset, encoding, ui):
# PRED-1240 there's no guarantee that we got _any_ fully formed lines.
# If so, the dataset is super wide, so we only send 10 rows at a time
return AUTO_SAMPLE_FALLBACK

try:
for _ in reader:
csv_lines += 1
Expand Down
16 changes: 16 additions & 0 deletions datarobot_batch_scoring/utils.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import codecs
import csv
import getpass
import gzip
import io
import logging
import os
import sys
from contextlib import contextmanager
from collections import namedtuple
from functools import partial
from gzip import GzipFile
Expand Down Expand Up @@ -518,3 +521,16 @@ def state(self, status):

def state_name(self, s=None):
return self.state_names[s or self.state]


@contextmanager
def gzip_with_encoding(data, mode, encoding=None):
    """Context manager: open *data* with gzip, optionally decoding text.

    PY2's ``gzip.open`` has no ``encoding=`` parameter, so when an
    encoding is given the raw gzip stream is wrapped in a
    ``codecs`` stream reader that decodes it on the fly; otherwise the
    gzip file object is yielded unchanged.
    """
    with gzip.open(data, mode) as raw:
        if encoding is None:
            yield raw
        else:
            yield codecs.getreader(encoding)(raw)
4 changes: 2 additions & 2 deletions tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,9 +210,9 @@ def test_investigate_encoding_and_dialect_substitute_delimiter():
data = 'tests/fixtures/windows_encoded.csv'
encoding = investigate_encoding_and_dialect(data, '|', ui,
fast=False,
encoding='utf-8',
encoding='',
skip_dialect=True)
assert encoding == 'utf-8' # Intentionally wrong
assert encoding == 'windows-1252'
assert not sn.called
dialect = csv.get_dialect('dataset_dialect')
assert dialect.delimiter == '|'
Expand Down

0 comments on commit 1e9ba4b

Please sign in to comment.