In [1]:
from py4j.java_gateway import JavaGateway, CallbackServerParameters
import cProfile

In [2]:
# connect to the JVM
gateway = JavaGateway(callback_server_parameters=CallbackServerParameters())  

In [3]:
# LambdaWrapper: a wrapper for lambda expression in python. This wrapper will be converted to the Function class in the Java side
class LambdaWrapper(object):

    def __init__(self, f):
        self._f = f

    def apply(self, arg):
        return self._f(arg)

    class Java:
        implements = ['java.util.function.Function']

In [4]:
class Pipeline(object):
    
    memPs = dict()
    count = 0
    
    def __init__(self):
        global gateway
        self.gateway = gateway
        # map the python-side Pipeline wrapper to a index
        Pipeline.memPs[self] = Pipeline.count
        Pipeline.count += 1
        # call the java-side wrapper to create a real Pipeline identified by the index
        #gateway.entry_point.create_MemPipeline()
        gateway.jvm.crunch_in_python.JavaWrapper.Pipeline_.getMemPipelineInstance_()
        
    def readTextFile(self, inputPath):
        # retrieve the index of the Pipeline wrapper
        index = Pipeline.memPs[self]
        # call the java-side wrapper to execute the readTextFile method for the real Pipeline identified by the index
        gateway.jvm.crunch_in_python.JavaWrapper.Pipeline_.readTextFile_(index, inputPath)
        # return a PCollection wrapper to represent the result of the readTextFile
        return PCollection()
    
    def writeTextFile(self, data, outputPath):
        # retrieve the index of the Pipeline wrapper
        index_pipeline = Pipeline.memPs[self]
        
        if isinstance(data, PTable):
            # retrieve the index of the wrapper
            index_data = PTable.pTables[data]
            # call the java-side wrapper to execute the writeTextFile method for the real Pipeline identified by the index
            gateway.jvm.crunch_in_python.JavaWrapper.Pipeline_.writeTextFile_(index_pipeline, index_data, outputPath, "PTable")
        elif isinstance(data, PCollection):
            # retrieve the index of the wrapper
            index_data = PCollection.pCols[data]
            # call the java-side wrapper to execute the writeTextFile method for the real Pipeline identified by the index
            gateway.jvm.crunch_in_python.JavaWrapper.Pipeline_.writeTextFile_(index_pipeline, index_data, outputPath, "PCollection")
        
    def done(self):
        # retrieve the index of the Pipeline wrapper
        index = Pipeline.memPs[self]
        # call the java-side wrapper to execute the writeTextFile method for the real Pipeline identified by the index
        gateway.jvm.crunch_in_python.JavaWrapper.Pipeline_.done_(index)

In [5]:
class PCollection(object):
    
    pCols = dict()
    count = 0
    
    def __init__(self):
        global gateway
        self.gateway = gateway
        # map the python-side PCollection wrapper to a index
        PCollection.pCols[self] = PCollection.count
        PCollection.count += 1
        
    def parallelDo(self, lambda_exp):
        # wrap the lambda expression
        lambdaWrapper = LambdaWrapper(lambda_exp)
        # retrieve the index of the PCollection wrapper
        index = PCollection.pCols[self]
        # ask the java-side wrapper to execute the parallelDo method for the real PCollection identified by the index
        gateway.jvm.crunch_in_python.JavaWrapper.PCollection_.parallelDo_(index, lambdaWrapper)
        # return a PCollection wrapper to represent the result of the paralleDo
        return PCollection()
    
    def aggregate_count(self):
        # retrieve the index of the PCollection wrapper
        index = PCollection.pCols[self]
        # ask the java-side wrapper to execute the count method for the real PCollection identified by the index
        gateway.jvm.crunch_in_python.JavaWrapper.PCollection_.count_(index)
        # return a PTable wrapper to represent the result of the paralleDo
        return PTable()
    
    def max(self):
        # retrieve the index of the PCollection wrapper
        index = PCollection.pCols[self]
        # ask the java-side wrapper to execute the max method for the real PCollection identified by the index
        gateway.jvm.crunch_in_python.JavaWrapper.PCollection_.max_(index)
        # return a PObject wrapper to represent the result of the paralleDo
        return PObject()
    
    def tokenize(self):
        # retrieve the index of the PCollection wrapper
        index = PCollection.pCols[self]
        # ask the java-side wrapper to execute the tokenize method for the real PCollection identified by the index
        gateway.jvm.crunch_in_python.JavaWrapper.PCollection_.tokenize_(index)
        # return a PCollection wrapper to represent the result of the paralleDo
        return PCollection()
    
    def toDouble(self):
        # retrieve the index of the PCollection wrapper
        index = PCollection.pCols[self]
        # ask the java-side wrapper to execute the toDouble method for the real PCollection identified by the index
        gateway.jvm.crunch_in_python.JavaWrapper.PCollection_.toDouble_(index)
        # return a PCollection wrapper to represent the result of the paralleDo
        return PCollection()

In [6]:
class PTable(object):
    
    pTables = dict()
    count = 0
    
    def __init__(self):
        global gateway
        self.gateway = gateway
        # map the python-side PTable wrapper to a index
        PTable.pTables[self] = PTable.count
        PTable.count += 1  
    

In [7]:
class PObject(object):
    
    pObjects = dict()
    count = 0
    
    def __init__(self):
        global gateway
        self.gateway = gateway
        # map the python-side PTable wrapper to a index
        PObject.pObjects[self] = PObject.count
        PObject.count += 1  
        
    def getValue(self):
        # retrieve the index of the PCollection wrapper
        index = PObject.pObjects[self]
        # ask the java-side wrapper to execute the toDouble method for the real PCollection identified by the index
        value = gateway.jvm.crunch_in_python.JavaWrapper.PObject.getValue_(index)
        # return a PCollection wrapper to represent the result of the paralleDo
        return value

In [8]:
# test 1: word count example
def word_count_test(inputPath, outputPath):
    pipeline = Pipeline()
    lines = pipeline.readTextFile(inputPath)
    words = lines.tokenize()
    counts = words.aggregate_count()
    pipeline.writeTextFile(counts, outputPath)
    pipeline.done()


In [9]:
# test 2: Celsius to Fahrenheit example
def Celsius_to_Fahrenheit_test():
    pipeline = Pipeline()
    inputPath = "C:\\Users\\Tim\\Documents\\cs_239_big_data_system\\crunch_in_python\\input.txt"
    outputPath = "C:\\Users\\Tim\\Documents\\cs_239_big_data_system\\crunch_in_python\\output"
    lines = pipeline.readTextFile(inputPath)
    temperature_Celsius = lines.tokenize().toDouble()
    temperature_Fahrenheit = temperature_Celsius.parallelDo(lambda t : 1.8 * t + 32)
    pipeline.writeTextFile(temperature_Fahrenheit, outputPath)
    pipeline.done()

In [10]:
# benchmark of word count example
def word_count_profile():
    # profiling execution time
    prefix = "C:\\Users\\Tim\\Documents\\cs_239_big_data_system\\crunch_in_python\\"
    inputPaths = [prefix + "benchmark" + str(i) + ".txt" for i in range(1,3)]
    outputPaths = [prefix + "output" + str(i) for i in range(1,3)]
    for i in range(2):
        cProfile.runctx('f(x, y)', {'f': word_count_test, 'x': inputPaths[i], 'y': outputPaths[i]}, {})
    # profiling memory usage

In [11]:
word_count_profile()

         1091 function calls in 0.987 seconds

   Ordered by: standard name

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        1    0.000    0.000    0.359    0.359 <ipython-input-4-533dd06aca18>:16(readTextFile)
        1    0.000    0.000    0.133    0.133 <ipython-input-4-533dd06aca18>:24(writeTextFile)
        1    0.000    0.000    0.004    0.004 <ipython-input-4-533dd06aca18>:39(done)
        1    0.000    0.000    0.302    0.302 <ipython-input-4-533dd06aca18>:6(__init__)
        1    0.000    0.000    0.088    0.088 <ipython-input-5-a10c56d43c5b>:23(aggregate_count)
        1    0.000    0.000    0.102    0.102 <ipython-input-5-a10c56d43c5b>:39(tokenize)
        2    0.000    0.000    0.000    0.000 <ipython-input-5-a10c56d43c5b>:6(__init__)
        1    0.000    0.000    0.000    0.000 <ipython-input-6-f0d58a3f6fb9>:6(__init__)
        1    0.000    0.000    0.987    0.987 <ipython-input-8-a94a75f577ee>:2(word_count_test)
        1    0.000    0.00

In [19]:
# test: check if the java wrapper works
Celsius_to_Fahrenheit_test()

In [12]:
gateway.close_callback_server()