Merge pull request #9 from JohnVinyard/python3
Python3
JohnVinyard committed Mar 2, 2019
2 parents a44314f + e803204 commit 8f95553
Showing 33 changed files with 402 additions and 314 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
@@ -1,7 +1,7 @@
 language: python
 python:
   # We don't actually use the Travis Python, but this keeps it organized.
-  - "2.7"
+  - "3.6"
 before_install:
   - chmod +x setup.sh
   - sudo ./setup.sh
1 change: 1 addition & 0 deletions README.md
@@ -1,5 +1,6 @@
 [![Build Status](https://travis-ci.org/JohnVinyard/featureflow.svg?branch=master)](https://travis-ci.org/JohnVinyard/featureflow)
 [![Coverage Status](https://coveralls.io/repos/github/JohnVinyard/featureflow/badge.svg?branch=master)](https://coveralls.io/github/JohnVinyard/featureflow?branch=master)
+![Python 3](https://img.shields.io/pypi/pyversions/featureflow.svg)
 [![PyPI](https://img.shields.io/pypi/v/featureflow.svg)](https://pypi.python.org/pypi/featureflow)
 [![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)

4 changes: 2 additions & 2 deletions examples/wordcount.py
@@ -1,4 +1,4 @@
 from __future__ import print_function

 import featureflow as ff
 import argparse
 from collections import Counter
@@ -28,7 +28,7 @@ def _dequeue(self):
         return matches

     def _process(self, data):
-        yield map(lambda x: x.groupdict()['word'].lower(), data)
+        yield [x.groupdict()['word'].lower() for x in data]


 class WordCount(ff.Aggregator, ff.Node):
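A note on the change above: in Python 3, map() returns a lazy iterator rather than a list, so a downstream node expecting a realized sequence would receive a map object instead. The list comprehension restores the eager Python 2 behavior. A minimal sketch of the difference (the regex and input are hypothetical, simplified from the example's named-group pattern):

import re

matches = list(re.finditer(r'\w+', 'The quick brown Fox'))

# Python 2: map() returned a list. Python 3: it returns a lazy iterator.
lazy = map(lambda m: m.group().lower(), matches)
# The list comprehension yields a concrete list, as the old code did.
eager = [m.group().lower() for m in matches]

assert list(lazy) == eager == ['the', 'quick', 'brown', 'fox']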
32 changes: 16 additions & 16 deletions featureflow/__init__.py
@@ -1,41 +1,41 @@
 __version__ = '2.12.1'

-from model import BaseModel, ModelExistsError
+from .model import BaseModel, ModelExistsError

-from feature import Feature, JSONFeature, TextFeature, CompressedFeature, \
+from .feature import Feature, JSONFeature, TextFeature, CompressedFeature, \
     PickleFeature, ClobberPickleFeature, ClobberJSONFeature

-from extractor import Node, Graph, Aggregator, NotEnoughData
+from .extractor import Node, Graph, Aggregator, NotEnoughData

-from bytestream import ByteStream, ByteStreamFeature, ZipWrapper, iter_zip
+from .bytestream import ByteStream, ByteStreamFeature, ZipWrapper, iter_zip

-from data import \
+from .data import \
     IdProvider, UuidProvider, UserSpecifiedIdProvider, StaticIdProvider, \
     KeyBuilder, StringDelimitedKeyBuilder, Database, FileSystemDatabase, \
     InMemoryDatabase

-from datawriter import DataWriter
+from .datawriter import DataWriter

-from database_iterator import DatabaseIterator
+from .database_iterator import DatabaseIterator

-from encoder import IdentityEncoder, PickleEncoder
+from .encoder import IdentityEncoder, PickleEncoder

-from decoder import Decoder, PickleDecoder
+from .decoder import Decoder, PickleDecoder

-from lmdbstore import LmdbDatabase
+from .lmdbstore import LmdbDatabase

-from objectstore import ObjectStoreDatabase
+from .objectstore import ObjectStoreDatabase

-from persistence import PersistenceSettings, simple_in_memory_settings
+from .persistence import PersistenceSettings, simple_in_memory_settings

-from iteratornode import IteratorNode
+from .iteratornode import IteratorNode

-from eventlog import EventLog, RedisChannel, InMemoryChannel
+from .eventlog import EventLog, RedisChannel, InMemoryChannel

-from var import Var
+from .var import Var

 try:
-    from nmpy import NumpyEncoder, PackedNumpyEncoder, StreamingNumpyDecoder, \
+    from .nmpy import NumpyEncoder, PackedNumpyEncoder, StreamingNumpyDecoder, \
         BaseNumpyDecoder, NumpyMetaData, NumpyFeature
 except ImportError:
     pass
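These import changes are the heart of the port: Python 3 (per PEP 328) removed implicit relative imports, so a bare "from model import ..." inside the featureflow package no longer resolves to featureflow/model.py and instead raises ModuleNotFoundError. A sketch of the rule as it would appear inside a package module (illustrative, not runnable as a standalone script):

# Python 2 only: implicit relative import, resolved against the package.
# from model import BaseModel, ModelExistsError

# Python 2 and 3: explicit relative import; note the leading dot.
from .model import BaseModel, ModelExistsError

# Also valid in both, at the cost of hard-coding the package name:
from featureflow.model import BaseModel, ModelExistsError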
32 changes: 16 additions & 16 deletions featureflow/bytestream.py
@@ -1,9 +1,9 @@
-from extractor import Node
-from decoder import Decoder
-from feature import Feature
-from util import chunked
+from .extractor import Node
+from .decoder import Decoder
+from .feature import Feature
+from .util import chunked
 import requests
-from urlparse import urlparse
+from urllib.parse import urlparse
 import os
 import struct
 import zipfile
@@ -18,7 +18,7 @@ def _generator(self, stream, content_length):
         if not content_length:
             raise ValueError('content_length should be greater than zero')
         for chunk in chunked(stream, chunksize=self._chunksize):
-            yield StringWithTotalLength(chunk, content_length)
+            yield BytesWithTotalLength(chunk, content_length)

     def _from_http_response(self, resp):
         resp.raise_for_status()
@@ -97,21 +97,21 @@ def filename(self):
         return self.zipinfo.filename


-class StringWithTotalLength(str):
+class BytesWithTotalLength(bytes):
     def __new__(cls, s, total_length):
-        o = str.__new__(cls, s)
+        o = bytes.__new__(cls, s)
         o.total_length = int(total_length)
         return o

     def __radd__(self, other):
-        return StringWithTotalLength(other + str(self), self.total_length)
+        return BytesWithTotalLength(other + bytes(self), self.total_length)


-class StringWithTotalLengthEncoder(Node):
+class BytesWithTotalLengthEncoder(Node):
     content_type = 'application/octet-stream'

     def __init__(self, needs=None):
-        super(StringWithTotalLengthEncoder, self).__init__(needs=needs)
+        super(BytesWithTotalLengthEncoder, self).__init__(needs=needs)
         self._metadata_written = False

     def _process(self, data):
@@ -121,9 +121,9 @@ def _process(self, data):
         yield data


-class StringWithTotalLengthDecoder(Decoder):
+class BytesWithTotalLengthDecoder(Decoder):
     def __init__(self, chunksize=4096):
-        super(StringWithTotalLengthDecoder, self).__init__()
+        super(BytesWithTotalLengthDecoder, self).__init__()
         self._chunksize = chunksize
         self._total_length = None
@@ -133,7 +133,7 @@ def __call__(self, flo):
     def __iter__(self, flo):
         self._total_length = struct.unpack('I', flo.read(4))[0]
         for chunk in chunked(flo, self._chunksize):
-            yield StringWithTotalLength(chunk, self._total_length)
+            yield BytesWithTotalLength(chunk, self._total_length)


 class ByteStreamFeature(Feature):
@@ -148,8 +148,8 @@ def __init__(
             extractor,
             needs=needs,
             store=store,
-            encoder=StringWithTotalLengthEncoder,
-            decoder=StringWithTotalLengthDecoder(
+            encoder=BytesWithTotalLengthEncoder,
+            decoder=BytesWithTotalLengthDecoder(
                 chunksize=extractor_args['chunksize']),
             key=key,
             **extractor_args)
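The StringWithTotalLength to BytesWithTotalLength rename follows from Python 3 splitting the old str into text (str) and binary (bytes) types; raw chunks read from a stream are bytes. Because bytes is immutable, the extra total_length attribute must be attached in __new__, not __init__. A standalone sketch of that pattern (class name hypothetical):

class LengthAwareBytes(bytes):
    def __new__(cls, data, total_length):
        # Immutable types are constructed in __new__; __init__ is too late.
        o = bytes.__new__(cls, data)
        o.total_length = int(total_length)  # extra attribute rides along
        return o

chunk = LengthAwareBytes(b'abc', 1024)
assert chunk == b'abc'
assert chunk.total_length == 1024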
30 changes: 17 additions & 13 deletions featureflow/data.py
@@ -1,4 +1,4 @@
-from StringIO import StringIO
+from io import StringIO, BytesIO
 from uuid import uuid4
 import os

@@ -115,9 +115,9 @@ def __delitem__(self, key):
         raise NotImplementedError()


-class IOWithLength(StringIO):
+class IOWithLength(BytesIO):
     def __init__(self, content):
-        StringIO.__init__(self, content)
+        super().__init__(content)
         self._length = len(content)

     def __len__(self):
@@ -130,17 +130,17 @@ def __init__(self, key_builder=None):
         self._dict = dict()

     def write_stream(self, key, content_type):
-        sio = StringIO()
-        self._dict[key] = sio
+        bio = BytesIO()
+        self._dict[key] = bio

         def hijacked_close():
-            sio.seek(0)
-            self._dict[key] = sio.read()
-            sio._old_close()
+            bio.seek(0)
+            self._dict[key] = bio.read()
+            bio._old_close()

-        sio._old_close = sio.close
-        sio.close = hijacked_close
-        return sio
+        bio._old_close = bio.close
+        bio.close = hijacked_close
+        return bio

     def read_stream(self, key):
         return IOWithLength(self._dict[key])
@@ -150,7 +150,7 @@ def size(self, key):

     def iter_ids(self):
         seen = set()
-        for key in self._dict.iterkeys():
+        for key in list(self._dict.keys()):
            _id, _, _ = self.key_builder.decompose(key)
            if _id in seen:
                continue
@@ -168,6 +168,7 @@ class LazyFile(object):
     """
     A file wrapper that won't create a file until some bytes have been written
     """
+
     def __init__(self, path):
         super(LazyFile, self).__init__()
         self.path = path
@@ -192,7 +193,10 @@ def write(self, data):

         if self._file is None:
             self._file = open(self.path, 'wb')
-        self._file.write(data)
+        try:
+            self._file.write(data.encode())
+        except AttributeError:
+            self._file.write(data)


 class FileSystemDatabase(Database):
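Two Python 3 idioms recur in this file. Binary payloads move from StringIO to io.BytesIO, since io.StringIO accepts only str; and dict.iterkeys() is gone, with list(d.keys()) taking a snapshot that stays valid even if the dict is mutated during iteration. (LazyFile.write uses the mirror-image fallback: bytes has no .encode(), so the AttributeError branch handles data that is already binary.) A small sketch of both idioms (keys and payloads hypothetical):

from io import BytesIO

store = {'id1/feature': b'abc', 'id2/feature': b'xyz'}
for key in list(store.keys()):      # snapshot; replaces iterkeys()
    stream = BytesIO(store[key])    # bytes payloads need BytesIO, not StringIO
    assert stream.read() == store[key]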
2 changes: 1 addition & 1 deletion featureflow/database_iterator.py
@@ -1,4 +1,4 @@
-from extractor import Node
+from .extractor import Node


 class DatabaseIterator(Node):
26 changes: 18 additions & 8 deletions featureflow/datawriter.py
@@ -1,5 +1,5 @@
-from cStringIO import StringIO
-from extractor import Node
+from io import StringIO, BytesIO
+from .extractor import Node
 import json


@@ -72,6 +72,10 @@ def __exit__(self, exc_type, exc_val, exc_tb):
         self._log_events()

     def _process(self, data):
+        try:
+            data = data.encode()
+        except AttributeError:
+            pass
         yield self._stream.write(data)


@@ -102,12 +106,15 @@ def _cleanup_after_error(self):

     def _process(self, data):
         self._stream = self.database.write_stream(self.key, self.content_type)
-        x = self._stream.write(data)
+        try:
+            x = self._stream.write(data)
+        except TypeError:
+            x = self._stream.write(data.encode())
         self._stream.close()
         yield x


-class StringIODataWriter(BaseDataWriter):
+class BytesIODataWriter(BaseDataWriter):
     def __init__(
             self,
             needs=None,
@@ -119,7 +126,7 @@ def __init__(
             event_log=None,
             buffer_size_limit=None):

-        super(StringIODataWriter, self).__init__(
+        super(BytesIODataWriter, self).__init__(
             needs=needs,
             key_builder=key_builder,
             database=database)
@@ -132,17 +139,20 @@ def __init__(
         self._id = _id
         self.feature_name = feature_name
         self.content_type = needs.content_type
-        self._stream = StringIO()
+        self._stream = BytesIO()

     def __exit__(self, exc_type, exc_val, exc_tb):
         del self._stream

     def _process(self, data):
         if len(data) < self.buffer_size_limit:
-            yield self._stream.write(data)
+            try:
+                yield self._stream.write(data)
+            except TypeError:
+                yield self._stream.write(data.encode())
         else:
             chunksize = self.buffer_size_limit // 2
-            for i in xrange(0, len(data), chunksize):
+            for i in range(0, len(data), chunksize):
                 yield self._stream.write(data[i: i + chunksize])
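The recurring try/except blocks added in this file let the writers accept either str or bytes: BytesIO.write rejects str with a TypeError, while bytes has no .encode() method, so the AttributeError variant catches data that is already binary. The xrange loop also becomes range, which is lazy in Python 3. A minimal sketch of the fallback (stream and inputs hypothetical):

from io import BytesIO

def write_either(stream, data):
    # Try the bytes path first; fall back to encoding str as utf-8.
    try:
        return stream.write(data)
    except TypeError:
        return stream.write(data.encode())

bio = BytesIO()
write_either(bio, b'raw ')
write_either(bio, 'text')           # str is encoded before writing
assert bio.getvalue() == b'raw text'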


30 changes: 27 additions & 3 deletions featureflow/decoder.py
@@ -1,8 +1,9 @@
 import json
-from util import chunked
-from extractor import Node
+from .util import chunked
+from .extractor import Node
 import bz2
 from dill import loads
+from io import StringIO


 class Decoder(object):
@@ -21,6 +22,18 @@ def __iter__(self, flo):
             yield chunk


+class TextDecoder(Decoder):
+    def __init__(self):
+        super().__init__()
+
+    def __call__(self, flo):
+        return StringIO(flo.read().decode())
+
+    def __iter__(self):
+        for chunk in super().__iter__(self.flo):
+            yield chunk.decode()
+
+
 class GreedyDecoder(Decoder):
     """
     A decoder that reads the entire file contents into memory
@@ -36,7 +49,18 @@ def __iter__(self, flo):
         yield self(flo)


-class JSONDecoder(GreedyDecoder):
+class GreedyTextDecoder(TextDecoder):
+    def __init__(self):
+        super().__init__()
+
+    def __call__(self, flo):
+        return flo.read().decode()
+
+    def __iter__(self, flo):
+        yield self(flo)
+
+
+class JSONDecoder(GreedyTextDecoder):
     """
     A decoder that interprets the data as JSON
     """
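The new TextDecoder and GreedyTextDecoder exist because persisted features now come back as binary file-like objects, while JSON decoding wants text, so the bytes must be decoded explicitly. A sketch of the same round trip (payload hypothetical; utf-8 is assumed, matching bytes.decode()'s default):

import json
from io import BytesIO, StringIO

flo = BytesIO(b'{"word_count": 42}')   # what a stored feature looks like
text = StringIO(flo.read().decode())   # bytes -> str, rewrapped as a stream
assert json.load(text) == {'word_count': 42}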
8 changes: 4 additions & 4 deletions featureflow/dummyserver.py
@@ -1,22 +1,22 @@
-import BaseHTTPServer
+import http.server
 import sys


 def handler_class(static_content):
-    class DummyHandler(BaseHTTPServer.BaseHTTPRequestHandler):
+    class DummyHandler(http.server.BaseHTTPRequestHandler):
         def do_GET(self):
             self.send_response(200)
             self.send_header('Content-Length', len(static_content))
             self.send_header('Content-Type', 'text/plain')
             self.end_headers()
-            self.wfile.write(content)
+            self.wfile.write(content.encode())

     return DummyHandler

 if __name__ == '__main__':
     port = int(sys.argv[1])
     content = sys.argv[2]
-    server = BaseHTTPServer.HTTPServer(
+    server = http.server.HTTPServer(
         ('localhost', port),
         handler_class(content))
     server.serve_forever()
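Besides the BaseHTTPServer to http.server rename, the key Python 3 detail here is that wfile is a binary stream, so the body must be encoded before writing. Note too that measuring Content-Length on the encoded bytes is the safe choice when the text may not be pure ASCII; the committed code measures the unencoded string, which is fine for its ASCII test content. A minimal standalone sketch (handler name and port are hypothetical):

import http.server

class EchoHandler(http.server.BaseHTTPRequestHandler):
    def do_GET(self):
        body = 'hello'.encode()                        # wfile needs bytes
        self.send_response(200)
        self.send_header('Content-Length', str(len(body)))
        self.send_header('Content-Type', 'text/plain')
        self.end_headers()
        self.wfile.write(body)

# Blocking; run manually:
# http.server.HTTPServer(('localhost', 8080), EchoHandler).serve_forever()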
