Skip to content

Commit

Permalink
remove unused import in python
Browse files Browse the repository at this point in the history
  • Loading branch information
Ken Takagiwa authored and giwa committed Aug 18, 2014
1 parent 856d98e commit 4ce4058
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 38 deletions.
9 changes: 0 additions & 9 deletions python/pyspark/streaming/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,6 @@
# limitations under the License.
#

import os
import shutil
import sys
from threading import Lock
from tempfile import NamedTemporaryFile

from pyspark import accumulators
from pyspark.accumulators import Accumulator
from pyspark.broadcast import Broadcast
from pyspark.conf import SparkConf
from pyspark.files import SparkFiles
from pyspark.java_gateway import launch_gateway
Expand Down
30 changes: 4 additions & 26 deletions python/pyspark/streaming/dstream.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,8 @@
from base64 import standard_b64encode as b64enc
import copy
from collections import defaultdict
from collections import namedtuple
from itertools import chain, ifilter, imap
import operator
import os
import sys
import shlex
import traceback
from subprocess import Popen, PIPE
from tempfile import NamedTemporaryFile
from threading import Thread
import warnings
import heapq
from random import Random

from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \
BatchedSerializer, CloudPickleSerializer, PairDeserializer, pack_long
from pyspark.join import python_join, python_left_outer_join, \
python_right_outer_join, python_cogroup
from pyspark.statcounter import StatCounter
from pyspark.rddsampler import RDDSampler
from pyspark.storagelevel import StorageLevel
#from pyspark.resultiterable import ResultIterable

from pyspark.serializers import NoOpSerializer,\
BatchedSerializer, CloudPickleSerializer, pack_long
from pyspark.rdd import _JavaStackTrace

from py4j.java_collections import ListConverter, MapConverter
Expand All @@ -47,15 +27,14 @@ def generatedRDDs(self):
def print_(self):
"""
"""
# print is a resrved name of Python. We cannot give print to function name
# print is a reserved name of Python. We cannot give print to function name
getattr(self._jdstream, "print")()

def pyprint(self):
"""
"""
self._jdstream.pyprint()


def filter(self, f):
"""
"""
Expand Down Expand Up @@ -140,7 +119,6 @@ def add_shuffle_key(split, iterator):
keyed._bypass_serializer = True
with _JavaStackTrace(self.ctx) as st:
#JavaDStream
#pairRDD = self.ctx._jvm.PairwiseDStream(keyed._jdstream.dstream()).asJavaPairRDD()
pairDStream = self.ctx._jvm.PairwiseDStream(keyed._jdstream.dstream()).asJavaPairDStream()
partitioner = self.ctx._jvm.PythonPartitioner(numPartitions,
id(partitionFunc))
Expand Down
17 changes: 16 additions & 1 deletion python/pyspark/streaming/duration.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,19 @@
__author__ = 'ktakagiw'
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from pyspark.streaming import utils

Expand Down
24 changes: 23 additions & 1 deletion python/pyspark/streaming/jtime.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,30 @@
__author__ = 'ktakagiw'
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from pyspark.streaming import utils
from pyspark.streaming.duration import Duration

"""
The name of this file, time is not good naming for python
because if we do import time when we want to use native python time package, it does
not import python time package.
"""


class Time(object):
"""
Time for Spark Streaming application. Used to set Time
Expand Down
19 changes: 19 additions & 0 deletions python/pyspark/streaming/pyprint.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,24 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#


import sys
from itertools import chain

from pyspark.serializers import PickleSerializer

def collect(binary_file_path):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,6 @@ class PythonDStream[T: ClassTag](
case None => None
}
}

val asJavaDStream = JavaDStream.fromDStream(this)
}

0 comments on commit 4ce4058

Please sign in to comment.