
Commit

modify dstream.py to fix indent error
Ken Takagiwa authored and giwa committed Sep 20, 2014
1 parent 41886c2 commit 66fcfff
Showing 12 changed files with 0 additions and 655 deletions.
11 changes: 0 additions & 11 deletions bin/spark-submit
@@ -48,7 +48,6 @@ export SPARK_SUBMIT_PROPERTIES_FILE=${SPARK_SUBMIT_PROPERTIES_FILE:-"$DEFAULT_PR
# paths, library paths, java options and memory early on. Otherwise, it will
# be too late by the time the driver JVM has started.

<<<<<<< HEAD
if [[ "$SPARK_SUBMIT_DEPLOY_MODE" == "client" && -f "$SPARK_SUBMIT_PROPERTIES_FILE" ]]; then
# Parse the properties file only if the special configs exist
contains_special_configs=$(
@@ -58,16 +57,6 @@ if [[ "$SPARK_SUBMIT_DEPLOY_MODE" == "client" && -f "$SPARK_SUBMIT_PROPERTIES_FI
if [ -n "$contains_special_configs" ]; then
export SPARK_SUBMIT_BOOTSTRAP_DRIVER=1
fi
=======
# Figure out which Python executable to use
if [[ -z "$PYSPARK_PYTHON" ]]; then
PYSPARK_PYTHON="python"
fi
export PYSPARK_PYTHON

if [ -n "$DRIVER_MEMORY" ] && [ $DEPLOY_MODE == "client" ]; then
export SPARK_DRIVER_MEMORY=$DRIVER_MEMORY
>>>>>>> initial commit for pySparkStreaming
fi

exec "$SPARK_HOME"/bin/spark-class org.apache.spark.deploy.SparkSubmit "${ORIG_ARGS[@]}"
4 changes: 0 additions & 4 deletions core/pom.xml
@@ -21,11 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent</artifactId>
<<<<<<< HEAD
<version>1.2.0-SNAPSHOT</version>
=======
<version>1.0.0</version>
>>>>>>> initial commit for pySparkStreaming
<relativePath>../pom.xml</relativePath>
</parent>

16 changes: 0 additions & 16 deletions examples/src/main/python/streaming/wordcount.py
@@ -1,18 +1,14 @@
import sys
from operator import add

<<<<<<< HEAD
from pyspark.conf import SparkConf
=======
>>>>>>> initial commit for pySparkStreaming
from pyspark.streaming.context import StreamingContext
from pyspark.streaming.duration import *

if __name__ == "__main__":
if len(sys.argv) != 2:
print >> sys.stderr, "Usage: wordcount <directory>"
exit(-1)
<<<<<<< HEAD
conf = SparkConf()
conf.setAppName("PythonStreamingWordCount")

@@ -24,17 +20,5 @@
count = mapped_words.reduceByKey(add)

count.pyprint()
=======
ssc = StreamingContext(appName="PythonStreamingWordCount", duration=Seconds(1))

lines = ssc.textFileStream(sys.argv[1])
fm_lines = lines.flatMap(lambda x: x.split(" "))
filtered_lines = fm_lines.filter(lambda line: "Spark" in line)
mapped_lines = fm_lines.map(lambda x: (x, 1))

fm_lines.pyprint()
filtered_lines.pyprint()
mapped_lines.pyprint()
>>>>>>> initial commit for pySparkStreaming
ssc.start()
ssc.awaitTermination()
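
For reference, a minimal sketch of how the resolved wordcount.py reads once the conflict is settled in favor of the HEAD side. The StreamingContext construction and the word-splitting lines sit in the collapsed hunk above, so those lines are assumptions inferred from the kept code around them and from the conflicting branch, not copied from the repository.

    import sys
    from operator import add

    from pyspark.conf import SparkConf
    from pyspark.streaming.context import StreamingContext
    from pyspark.streaming.duration import Seconds

    if __name__ == "__main__":
        if len(sys.argv) != 2:
            print >> sys.stderr, "Usage: wordcount <directory>"
            exit(-1)
        conf = SparkConf()
        conf.setAppName("PythonStreamingWordCount")

        # Assumed shape of the collapsed hunk: build the context from the
        # conf, read the directory, split lines into words, and pair them.
        ssc = StreamingContext(conf=conf, duration=Seconds(1))
        lines = ssc.textFileStream(sys.argv[1])
        words = lines.flatMap(lambda line: line.split(" "))
        mapped_words = words.map(lambda word: (word, 1))

        count = mapped_words.reduceByKey(add)

        count.pyprint()
        ssc.start()
        ssc.awaitTermination()
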
6 changes: 0 additions & 6 deletions python/pyspark/java_gateway.py
@@ -108,16 +108,10 @@ def run(self):
java_import(gateway.jvm, "org.apache.spark.SparkConf")
java_import(gateway.jvm, "org.apache.spark.api.java.*")
java_import(gateway.jvm, "org.apache.spark.api.python.*")
<<<<<<< HEAD
java_import(gateway.jvm, "org.apache.spark.streaming.*") # do we need this?
java_import(gateway.jvm, "org.apache.spark.streaming.api.java.*")
java_import(gateway.jvm, "org.apache.spark.streaming.api.python.*")
java_import(gateway.jvm, "org.apache.spark.streaming.dstream.*") # do we need this?
=======
java_import(gateway.jvm, "org.apache.spark.streaming.*")
java_import(gateway.jvm, "org.apache.spark.streaming.api.java.*")
java_import(gateway.jvm, "org.apache.spark.streaming.api.python.*")
>>>>>>> initial commit for pySparkStreaming
java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*")
java_import(gateway.jvm, "org.apache.spark.sql.SQLContext")
java_import(gateway.jvm, "org.apache.spark.sql.hive.HiveContext")
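
The streaming java_import calls kept above are what make the JVM-side streaming classes visible to Python by their short names. A minimal sketch of that dependency, assuming a StreamingContext built the same way as in the conflicting wordcount example earlier (the master URL and app name here are illustrative):

    from pyspark.streaming.context import StreamingContext
    from pyspark.streaming.duration import Seconds

    # Building a StreamingContext starts a SparkContext, which launches the
    # py4j gateway; java_gateway.py then runs the java_import calls above.
    ssc = StreamingContext(master="local[2]", appName="GatewayImportSketch",
                           duration=Seconds(1))

    # context.py relies on those imports when it creates the JVM-side context:
    #     self._jvm.JavaStreamingContext(jspark_context, jduration)
    # Without org.apache.spark.streaming.api.java.* the short class name would
    # not resolve on the gateway's JVM view.
    print(ssc._jssc)
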
99 changes: 0 additions & 99 deletions python/pyspark/streaming/context.py
@@ -1,9 +1,3 @@
<<<<<<< HEAD
=======
__author__ = 'ktakagiw'


>>>>>>> initial commit for pySparkStreaming
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
@@ -21,7 +15,6 @@
# limitations under the License.
#

<<<<<<< HEAD
import sys
from signal import signal, SIGTERM, SIGINT

@@ -36,43 +29,12 @@ class StreamingContext(object):
"""
Main entry point for Spark Streaming functionality. A StreamingContext represents the
connection to a Spark cluster, and can be used to create L{DStream}s and
=======
import os
import shutil
import sys
from threading import Lock
from tempfile import NamedTemporaryFile

from pyspark import accumulators
from pyspark.accumulators import Accumulator
from pyspark.broadcast import Broadcast
from pyspark.conf import SparkConf
from pyspark.files import SparkFiles
from pyspark.java_gateway import launch_gateway
from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer
from pyspark.storagelevel import StorageLevel
from pyspark.rdd import RDD
from pyspark.context import SparkContext

from py4j.java_collections import ListConverter

from pyspark.streaming.dstream import DStream

class StreamingContext(object):
"""
Main entry point for Spark functionality. A StreamingContext represents the
connection to a Spark cluster, and can be used to create L{RDD}s and
>>>>>>> initial commit for pySparkStreaming
broadcast variables on that cluster.
"""

def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
environment=None, batchSize=1024, serializer=PickleSerializer(), conf=None,
<<<<<<< HEAD
gateway=None, sparkContext=None, duration=None):
=======
gateway=None, duration=None):
>>>>>>> initial commit for pySparkStreaming
"""
Create a new StreamingContext. At least the master and app name and duration
should be set, either through the named parameters here or through C{conf}.
@@ -93,7 +55,6 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
@param conf: A L{SparkConf} object setting Spark properties.
@param gateway: Use an existing gateway and JVM, otherwise a new JVM
will be instantiated.
<<<<<<< HEAD
@param sparkContext: L{SparkContext} object.
@param duration: A L{Duration} object for SparkStreaming.
@@ -112,23 +73,13 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
# is started in StreamingContext.
SparkContext._gateway.restart_callback_server()
self._clean_up_trigger()
=======
@param duration: A L{Duration} Duration for SparkStreaming

"""
# Create the Python Sparkcontext
self._sc = SparkContext(master=master, appName=appName, sparkHome=sparkHome,
pyFiles=pyFiles, environment=environment, batchSize=batchSize,
serializer=serializer, conf=conf, gateway=gateway)
>>>>>>> initial commit for pySparkStreaming
self._jvm = self._sc._jvm
self._jssc = self._initialize_context(self._sc._jsc, duration._jduration)

# Initialize StreamingContext in function to allow subclass specific initialization
def _initialize_context(self, jspark_context, jduration):
return self._jvm.JavaStreamingContext(jspark_context, jduration)

<<<<<<< HEAD
def _clean_up_trigger(self):
"""Kill py4j callback server properly using signal lib"""

@@ -205,53 +156,3 @@ def _testInputStream(self, test_inputs, numSlices=None):
jinput_stream = self._jvm.PythonTestInputStream(self._jssc, jtest_rdds).asJavaDStream()

return DStream(jinput_stream, self, test_rdd_deserializers[0])
=======
def actorStream(self, props, name, storageLevel, supervisorStrategy):
raise NotImplementedError

def addStreamingListener(self, streamingListener):
raise NotImplementedError

def awaitTermination(self, timeout=None):
if timeout:
self._jssc.awaitTermination(timeout)
else:
self._jssc.awaitTermination()

def checkpoint(self, directory):
raise NotImplementedError

def fileStream(self, directory, filter=None, newFilesOnly=None):
raise NotImplementedError

def networkStream(self, receiver):
raise NotImplementedError

def queueStream(self, queue, oneAtATime=True, defaultRDD=None):
raise NotImplementedError

def rawSocketStream(self, hostname, port, storagelevel):
raise NotImplementedError

def remember(self, duration):
raise NotImplementedError

def socketStream(hostname, port, converter,storageLevel):
raise NotImplementedError

def start(self):
self._jssc.start()

def stop(self, stopSparkContext=True):
raise NotImplementedError

def textFileStream(self, directory):
return DStream(self._jssc.textFileStream(directory), self, UTF8Deserializer())

def transform(self, seq):
raise NotImplementedError

def union(self, seq):
raise NotImplementedError

>>>>>>> initial commit for pySparkStreaming
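
The kept HEAD signature above also documents a sparkContext parameter, so a streaming context can wrap an existing SparkContext instead of creating its own. A rough sketch of that variant, assuming the hidden part of __init__ honours that parameter and that textFileStream survives on the HEAD side as the kept wordcount example implies (master URL, app name, and input path are illustrative):

    from pyspark.context import SparkContext
    from pyspark.streaming.context import StreamingContext
    from pyspark.streaming.duration import Seconds

    # Reuse an existing SparkContext via the HEAD-only sparkContext parameter.
    sc = SparkContext(master="local[2]", appName="ReuseContextSketch")
    ssc = StreamingContext(sparkContext=sc, duration=Seconds(1))

    lines = ssc.textFileStream("/tmp/streaming-input")
    lines.pyprint()

    ssc.start()
    ssc.awaitTermination()
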
