Skip to content

Commit

Permalink
fied input of socketTextDStream
Browse files Browse the repository at this point in the history
  • Loading branch information
Ken Takagiwa authored and giwa committed Sep 20, 2014
1 parent f746109 commit 0d1b954
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 22 deletions.
2 changes: 1 addition & 1 deletion examples/src/main/python/streaming/nerwork_wordcount.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
exit(-1)
ssc = StreamingContext(appName="PythonStreamingNetworkWordCount", duration=Seconds(1))

lines = ssc.socketTextStream(sys.argv[1], sys.argv[2])
lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
fm_lines = lines.flatMap(lambda x: x.split(" "))
filtered_lines = fm_lines.filter(lambda line: "Spark" in line)
mapped_lines = fm_lines.map(lambda x: (x, 1))
Expand Down
1 change: 1 addition & 0 deletions python/pyspark/java_gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ def run(self):
java_import(gateway.jvm, "org.apache.spark.streaming.*")
java_import(gateway.jvm, "org.apache.spark.streaming.api.java.*")
java_import(gateway.jvm, "org.apache.spark.streaming.api.python.*")
java_import(gateway.jvm, "org.apache.spark.streaming.dstream.*")
java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*")
java_import(gateway.jvm, "org.apache.spark.sql.SQLContext")
java_import(gateway.jvm, "org.apache.spark.sql.hive.HiveContext")
Expand Down
25 changes: 4 additions & 21 deletions python/pyspark/streaming/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from pyspark.files import SparkFiles
from pyspark.java_gateway import launch_gateway
from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer
from pyspark.storagelevel import StorageLevel
from pyspark.storagelevel import *
from pyspark.rdd import RDD
from pyspark.context import SparkContext

Expand Down Expand Up @@ -83,26 +83,9 @@ def awaitTermination(self, timeout=None):
else:
self._jssc.awaitTermination()

def checkpoint(self, directory):
raise NotImplementedError

def fileStream(self, directory, filter=None, newFilesOnly=None):
raise NotImplementedError

def networkStream(self, receiver):
raise NotImplementedError

def queueStream(self, queue, oneAtATime=True, defaultRDD=None):
raise NotImplementedError

def rawSocketStream(self, hostname, port, storagelevel):
raise NotImplementedError

def remember(self, duration):
raise NotImplementedError

def socketStream(hostname, port, converter,storageLevel):
raise NotImplementedError
# Start with the simplest variant; storageLevel is not passed for now.
def socketTextStream(self, hostname, port):
return DStream(self._jssc.socketTextStream(hostname, port), self, UTF8Deserializer())

def start(self):
self._jssc.start()
Expand Down

0 comments on commit 0d1b954

Please sign in to comment.