This repository has been archived by the owner on Mar 15, 2021. It is now read-only.

Commit

pulling out the file streaming as a module
brifordwylie committed Aug 4, 2014
1 parent 6495d7f commit ecf4276
Showing 2 changed files with 50 additions and 26 deletions.
workbench_cli/workbench_cli/file_streamer.py — 42 additions, 0 deletions

@@ -0,0 +1,42 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+"""File Streaming for Workbench CLI"""
+import os, sys
+import lz4
+
+class FileStreamer(object):
+    """File Streaming for Workbench CLI"""
+
+    def __init__(self, workbench, progress):
+        ''' FileStreamer Initialization '''
+
+        # Setup workbench and progress handles
+        self.workbench = workbench
+        self.progress = progress
+
+        # Setup compression
+        self.compressor = lz4.dumps
+        self.decompressor = lz4.loads
+        self.compress_ident = 'lz4'
+
+        # Some defaults and counters
+        self.chunk_size = 1024*1024 # 1 MB
+
+    def _file_chunks(self, data, chunk_size):
+        """ Yield compressed chunks from a data array"""
+        for i in xrange(0, len(data), chunk_size):
+            yield self.compressor(data[i:i+chunk_size])
+
+    def stream_to_workbench(self, raw_bytes, filename, type_tag):
+        """Split up a large file into chunks and send to Workbench"""
+        md5_list = []
+        sent_bytes = 0
+        total_bytes = len(raw_bytes)
+        for chunk in self._file_chunks(raw_bytes, self.chunk_size):
+            md5_list.append(self.workbench.store_sample(chunk, filename, self.compress_ident))
+            sent_bytes += self.chunk_size
+            self.progress(sent_bytes, total_bytes)
+
+        # Now we just ask Workbench to combine these
+        return self.workbench.combine_samples(md5_list, filename, type_tag)
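
For context, here is a minimal sketch of how the new module would be driven. It follows the same conventions as the commit (Python 2 print statements, old python-lz4 module-level lz4.dumps/lz4.loads API); the StubWorkbench class and progress callback are hypothetical stand-ins for the real zerorpc client and the shell's progress_print, not part of the repository.

    # Hypothetical driver for FileStreamer -- StubWorkbench and progress are
    # illustrative stand-ins, not part of this commit (Python 2, old python-lz4).
    import hashlib
    import lz4
    from file_streamer import FileStreamer

    class StubWorkbench(object):
        """Keeps compressed chunks in memory, like the real server would."""
        def __init__(self):
            self.store = {}
        def store_sample(self, chunk, filename, type_tag):
            md5 = hashlib.md5(chunk).hexdigest()
            self.store[md5] = chunk
            return md5
        def combine_samples(self, md5_list, filename, type_tag):
            # The real server reassembles and re-registers the sample; hashing
            # the decompressed bytes is enough for this sketch.
            raw = ''.join(lz4.loads(self.store[md5]) for md5 in md5_list)
            return hashlib.md5(raw).hexdigest()

    def progress(sent, total):
        print '%d/%d bytes' % (min(sent, total), total)  # clamp the overshoot

    streamer = FileStreamer(StubWorkbench(), progress)
    md5 = streamer.stream_to_workbench('x' * (3*1024*1024 + 17), 'big.bin', 'unknown')
    print 'combined md5:', md5

Note the min() clamp: stream_to_workbench advances sent_bytes by a full chunk_size even on the final partial chunk, so the callback can be handed sent > total on the last call.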
workbench_cli/workbench_cli/workbench_shell.py — 8 additions, 26 deletions

@@ -3,14 +3,13 @@

 """Workbench Interactive Shell using IPython"""
 import os, sys
+import re
 import hashlib
 import zerorpc
 import IPython
 from IPython.core.prefilter import PrefilterTransformer
 import functools
 from colorama import Fore as F
-import re
-import lz4

 try:
     import pandas as pd
@@ -21,12 +20,14 @@
 try:
     from . import client_helper
     from . import version
+    from . import file_streamer

 # Okay this happens when you're running in a debugger so having this is
 # super handy and we'll keep it even though it hurts coverage score.
 except (ImportError,ValueError):
     import client_helper
     import version
+    import file_streamer

 # These little helpers get around IPython wanting to take the
 # __repr__ of string output instead of __str__.
@@ -111,7 +112,7 @@ def __init__(self):
         # Grab server arguments
         self.server_info = client_helper.grab_server_args()

-        # Spin up workbench server
+        # Spin up workbench server (this sets the self.workbench object)
         self.connect(self.server_info)

         # Create a user session
@@ -124,6 +125,9 @@ def __init__(self):
         # Our Interactive IPython shell
         self.ipshell = None

+        # Our File Streamer
+        self.streamer = file_streamer.FileStreamer(self.workbench, self.progress_print)
+
         # What OS/Version do we have?
         self.beer = '\360\237\215\272' if sys.platform == 'darwin' else ' '

@@ -166,28 +170,6 @@ def progress_print(self, sent, total):
                format(F.GREEN, '#'*(percent/2), ' '*(50-percent/2), F.YELLOW, percent, F.RESET))
         sys.stdout.flush()

-    @staticmethod
-    def chunks(data, chunk_size):
-        """ Yield chunk_size chunks from data."""
-        for i in xrange(0, len(data), chunk_size):
-            yield lz4.dumps(data[i:i+chunk_size])
-
-    def file_chunker(self, raw_bytes, filename, type_tag):
-        """Split up a large file into chunks and send to Workbench"""
-        md5_list = []
-        sent_bytes = 0
-        mb = 1024*1024
-        chunk_size = 1*mb # 1 MB
-        total_bytes = len(raw_bytes)
-        for chunk in self.chunks(raw_bytes, chunk_size):
-            md5_list.append(self.workbench.store_sample(chunk, filename, 'chunk'))
-            sent_bytes += chunk_size
-            self.progress_print(sent_bytes, total_bytes)
-            # print '\t%s- Sending %.1f MB (%.1f MB)...%s' % (F.YELLOW, sent_bytes/mb, total_bytes/mb, F.RESET)
-
-        # Now we just ask Workbench to combine these
-        return self.workbench.combine_samples(md5_list, filename, type_tag)
-
     def load_sample(self, file_path):
         """Load a sample (or samples) into workbench
         load_sample </path/to/file_or_dir> """
@@ -206,7 +188,7 @@ def load_sample(self, file_path):
             if not self.workbench.has_sample(md5):
                 print '%sStreaming Sample...%s' % (F.MAGENTA, F.RESET)
                 basename = os.path.basename(path)
-                md5 = self.file_chunker(raw_bytes, basename, 'unknown')
+                md5 = self.streamer.stream_to_workbench(raw_bytes, basename, 'unknown')

         print '\n%s %s%s %sLocked and Loaded...%s\n' % \
               (self.beer, F.MAGENTA, md5[:6], F.YELLOW, F.RESET)
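
A design note on the extraction: the shell hands its workbench proxy and progress_print to the constructor, and FileStreamer keeps the compression as a plain attribute trio (compressor, decompressor, compress_ident) rather than hard-coding lz4 in _file_chunks. Chunks are also now stored with the 'lz4' identifier instead of the generic 'chunk' tag, so the reassembly side can pick the matching decompressor. A hypothetical illustration of the swappability, reusing StubWorkbench and progress from the earlier sketch (nothing in this commit actually does this):

    # Hypothetical algorithm swap -- not in this commit. zlib replaces the
    # default lz4 trio; chunks are then tagged 'zlib' so whoever reassembles
    # them knows which decompressor to apply.
    import zlib

    streamer = FileStreamer(StubWorkbench(), progress)
    streamer.compressor = zlib.compress
    streamer.decompressor = zlib.decompress
    streamer.compress_ident = 'zlib'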
Expand Down
