MINOR: Fix typos in code comments #662

Closed
@@ -326,9 +326,9 @@
*
* <p>
* One of such cases is stream processing, where processor fetches from two topics and performs the join on these two streams.
- * When one of the topic is long lagging behind the other, the processor would like to pause fetching from the ahead topic
+ * When one of the topics is long lagging behind the other, the processor would like to pause fetching from the ahead topic
* in order to get the lagging stream to catch up. Another example is bootstraping upon consumer starting up where there are
- * a lot of history data to catch up, the applciations usually wants to get the latest data on some of the topics before consider
+ * a lot of history data to catch up, the applications usually want to get the latest data on some of the topics before consider
* fetching other topics.
*
* <p>
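The pause/resume pattern this javadoc describes can be sketched as follows — a minimal illustration assuming an already-subscribed consumer, a caller-supplied "ahead" topic name, and a fixed number of catch-up polls standing in for a real lag check (note the pause/resume signatures changed from varargs to Collection across client versions):

import java.util.HashSet;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class PauseAheadTopic {
    // Pause fetching from the topic that is running ahead so the lagging
    // stream can catch up, then resume fetching from both.
    static void letLaggingCatchUp(KafkaConsumer<String, String> consumer,
                                  String aheadTopic, int catchUpPolls) {
        Set<TopicPartition> ahead = new HashSet<>();
        for (TopicPartition tp : consumer.assignment())
            if (tp.topic().equals(aheadTopic))
                ahead.add(tp);
        consumer.pause(ahead);                    // poll() now skips these partitions
        for (int i = 0; i < catchUpPolls; i++) {  // stand-in for a real lag check
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
                System.out.println(record.topic() + ": " + record.value());
        }
        consumer.resume(ahead);                   // fetch from the ahead topic again
    }
}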
@@ -213,7 +213,7 @@ private void poll(long timeout, long now, boolean executeDelayedTasks) {
clientPoll(timeout, now);
now = time.milliseconds();

- // handle any disconnects by failing the active requests. note that disconects must
+ // handle any disconnects by failing the active requests. note that disconnects must
// be checked immediately following poll since any subsequent call to client.ready()
// will reset the disconnect status
checkDisconnects(now);
@@ -18,7 +18,7 @@
package org.apache.kafka.common.cache;

/**
- * Interface for caches, semi-peristent maps which store key-value mappings until either an eviction criteria is met
+ * Interface for caches, semi-persistent maps which store key-value mappings until either an eviction criteria is met
* or the entries are manually invalidated. Caches are not required to be thread-safe, but some implementations may be.
*/
public interface Cache<K, V> {
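A minimal non-thread-safe implementation of this interface might look like the sketch below, assuming the interface declares get/put/remove/size (the methods its implementations in this package expose); the eviction criterion here is a simple LRU bound:

import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.kafka.common.cache.Cache;

// Sketch: an LRU-evicting Cache. Not thread-safe, which the interface permits.
public class SimpleLruCache<K, V> implements Cache<K, V> {
    private final LinkedHashMap<K, V> map;

    public SimpleLruCache(final int maxEntries) {
        // accessOrder=true keeps the least-recently-used entry first
        this.map = new LinkedHashMap<K, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                return size() > maxEntries;   // the eviction criterion
            }
        };
    }

    @Override public V get(K key) { return map.get(key); }
    @Override public void put(K key, V value) { map.put(key, value); }
    @Override public boolean remove(K key) { return map.remove(key) != null; }
    @Override public long size() { return map.size(); }
}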
@@ -14,7 +14,7 @@
package org.apache.kafka.common.errors;

/**
- * The broker returns this error code for any coordiantor request if it is still loading the metadata (after a leader change
+ * The broker returns this error code for any coordinator request if it is still loading the metadata (after a leader change
* for that offsets topic partition) for this group.
*/
public class GroupLoadInProgressException extends RetriableException {
@@ -17,7 +17,7 @@
package org.apache.kafka.common.metrics;

/**
- * A Stat is a quanity such as average, max, etc that is computed off the stream of updates to a sensor
+ * A Stat is a quantity such as average, max, etc that is computed off the stream of updates to a sensor
*/
public interface Stat {

@@ -20,8 +20,8 @@

/**
* A SampledStat records a single scalar value measured over one or more samples. Each sample is recorded over a
- * configurable window. The window can be defined by number of events or ellapsed time (or both, if both are given the
- * window is complete when <i>either</i> the event count or ellapsed time criterion is met).
+ * configurable window. The window can be defined by number of events or elapsed time (or both, if both are given the
+ * window is complete when <i>either</i> the event count or elapsed time criterion is met).
* <p>
* All the samples are combined to produce the measurement. When a window is complete the oldest sample is cleared and
* recycled to begin recording the next sample.
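To make the window semantics concrete: the sketch below configures an Avg over two samples of thirty seconds each, so a measurement always combines the current and previous sample. The MetricConfig and Sensor calls shown match this era of the client library, but treat the exact signatures as assumptions:

import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;

public class SampledStatExample {
    public static void main(String[] args) {
        // Two samples, each covering a 30-second window; when a window is
        // complete the oldest sample is cleared and recycled.
        MetricConfig config = new MetricConfig()
                .samples(2)
                .timeWindow(30, TimeUnit.SECONDS);
        Metrics metrics = new Metrics(config);
        Sensor sensor = metrics.sensor("request-latency");
        sensor.add(new MetricName("latency-avg", "requests"), new Avg());
        sensor.record(42.0);   // recorded into the current sample
    }
}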
@@ -228,7 +228,7 @@ public void send(Send send) {
*
* In the "Plaintext" setting, we are using socketChannel to read & write to the network. But for the "SSL" setting,
* we encrypt the data before we use socketChannel to write data to the network, and decrypt before we return the responses.
- * This requires additional buffers to be maintained as we are reading from network, since the data on the wire is encrpyted
+ * This requires additional buffers to be maintained as we are reading from network, since the data on the wire is encrypted
* we won't be able to read exact no.of bytes as kafka protocol requires. We read as many bytes as we can, up to SSLEngine's
* application buffer size. This means we might be reading additional bytes than the requested size.
* If there is no further data to read from socketChannel selector won't invoke that channel and we've have additional bytes
@@ -510,7 +510,7 @@ private boolean hasStagedReceives() {


/**
- * adds a receive to staged receieves
+ * adds a receive to staged receives
*/
private void addToStagedReceives(KafkaChannel channel, NetworkReceive receive) {
if (!stagedReceives.containsKey(channel))
@@ -19,7 +19,7 @@

/*
* Transport layer for underlying communication.
- * At very basic level it is wrapper around SocketChannel and can be used as substitue for SocketChannel
+ * At very basic level it is wrapper around SocketChannel and can be used as substitute for SocketChannel
* and other network Channel implementations.
* As NetworkClient replaces BlockingChannel and other implementations we will be using KafkaChannel as
* a network I/O channel.
@@ -129,7 +129,7 @@ public static int readUnsignedIntLE(InputStream in) throws IOException {

/**
* Get the little-endian value of an integer as a byte array.
- * @param val The value to convert to a litte-endian array
+ * @param val The value to convert to a little-endian array
* @return The little-endian encoded array of bytes for the value
*/
public static byte[] toArrayLE(int val) {
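The body of toArrayLE is collapsed in this diff; a straightforward sketch of the documented contract (not necessarily the code in the file) is:

import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class LittleEndian {
    // Encode an int as four bytes, least-significant byte first.
    public static byte[] toArrayLE(int val) {
        return ByteBuffer.allocate(4)
                         .order(ByteOrder.LITTLE_ENDIAN)
                         .putInt(val)
                         .array();
    }

    public static void main(String[] args) {
        // 0x0A0B0C0D encodes as 0x0D 0x0C 0x0B 0x0A
        for (byte b : toArrayLE(0x0A0B0C0D))
            System.out.printf("0x%02X ", b);
    }
}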
@@ -172,7 +172,7 @@ public void testClientAuthenticationRequiredUntrustedProvided() throws Exception
}

/**
- * Tests that server does not accept connections from clients which dont
+ * Tests that server does not accept connections from clients which don't
* provide a certificate when client authentication is required.
*/
@Test
@@ -65,7 +65,7 @@ public class TestSslUtils {
* @param days how many days from now the Certificate is valid for
* @param algorithm the signing algorithm, eg "SHA1withRSA"
* @return the self-signed certificate
- * @throws CertificateException thrown if a security error or an IO error ocurred.
+ * @throws CertificateException thrown if a security error or an IO error occurred.
*/
public static X509Certificate generateCertificate(String dn, KeyPair pair,
int days, String algorithm)
@@ -129,7 +129,7 @@ public void close() {
}

/**
- * Preforms initial join process for consumer group, ensures we have an assignment, and initializes + starts the
+ * Performs initial join process for consumer group, ensures we have an assignment, and initializes + starts the
* SinkTask.
*
* @returns true if successful, false if joining the consumer group was interrupted
@@ -86,7 +86,7 @@ public Map<String, String> taskConfig(ConnectorTaskId task) {
}

/**
- * Get the number of tasks assigned for the given conncetor.
+ * Get the number of tasks assigned for the given connector.
* @param connectorName name of the connector to look up tasks for
* @return the number of tasks
*/
@@ -545,7 +545,7 @@ private boolean handleRebalanceCompleted() {
// even attempting to. If we can't we should drop out of the group because we will block everyone from making
// progress. We can backoff and try rejoining later.
// 1b. We are not the leader. We might need to catch up. If we're already caught up we can rejoin immediately,
- // otherwise, we just want to wait indefinitely to catch up and rejoin whenver we're finally ready.
+ // otherwise, we just want to wait indefinitely to catch up and rejoin whenever we're finally ready.
// 2. Assignment succeeded.
// 2a. We are caught up on configs. Awesome! We can proceed to run our assigned work.
// 2b. We need to try to catch up. We can do this potentially indefinitely because if it takes to long, we'll
@@ -301,7 +301,7 @@ public void putConnectorConfig(String connector, Map<String, String> properties)
*/
public void putTaskConfigs(Map<ConnectorTaskId, Map<String, String>> configs) {
// Make sure we're at the end of the log. We should be the only writer, but we want to make sure we don't have
- // any outstanding lagging data to consume.
+ // any outstanding logging data to consume.
Review comment (Contributor): lagging is actually the correct word here. We are consuming messages that are outstanding because the consumer is lagging behind the latest data in the broker.

try {
configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
@@ -135,7 +135,7 @@ public void testPollRedelivery() throws Exception {
consumer.pause(TOPIC_PARTITION2);
PowerMock.expectLastCall();

- // Retry delivery should suceed
+ // Retry delivery should succeed
expectConsumerPoll(0);
sinkTask.put(EasyMock.capture(records));
EasyMock.expectLastCall();
@@ -17,7 +17,7 @@
package kafka.common

/**
- * Indicates a createMessageStreams can't be called more thane once
+ * Indicates a createMessageStreams can't be called more than once
*/
class MessageStreamsExistException(message: String, t: Throwable) extends RuntimeException(message, t) {
}
core/src/main/scala/kafka/log/OffsetMap.scala
@@ -42,7 +42,7 @@ trait OffsetMap {
class SkimpyOffsetMap(val memory: Int, val hashAlgorithm: String = "MD5") extends OffsetMap {
private val bytes = ByteBuffer.allocate(memory)

- /* the hash algorithm instance to use, defualt is MD5 */
+ /* the hash algorithm instance to use, default is MD5 */
private val digest = MessageDigest.getInstance(hashAlgorithm)

/* the number of bytes for this hash algorithm */
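For context, the digest above is used like any java.security.MessageDigest; a minimal sketch of hashing a key, independent of SkimpyOffsetMap's internal layout (which this diff does not show):

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class KeyHashing {
    public static void main(String[] args) throws NoSuchAlgorithmException {
        // "MD5" is the default algorithm named in the comment above.
        MessageDigest digest = MessageDigest.getInstance("MD5");
        byte[] hash = digest.digest("some-key".getBytes(StandardCharsets.UTF_8));
        System.out.println("hash bytes: " + hash.length); // 16 for MD5
    }
}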
@@ -18,7 +18,7 @@
package kafka.message

/**
- * Indicates the presense of a message that exceeds the maximum acceptable
+ * Indicates the presence of a message that exceeds the maximum acceptable
* length (whatever that happens to be)
*/
class MessageLengthException(message: String) extends RuntimeException(message)
core/src/main/scala/kafka/tools/ZooKeeperMainWrapper.scala
@@ -28,7 +28,7 @@ class ZooKeeperMainWrapper(args: Array[String]) extends ZooKeeperMain(args) {

/**
* ZooKeeper 3.4.6 broke being able to pass commands on command line.
- * See ZOOKEEPER-1897. This class is a hack to restore this faclity.
+ * See ZOOKEEPER-1897. This class is a hack to restore this facility.
*/
object ZooKeeperMainWrapper {

core/src/main/scala/kafka/utils/CoreUtils.scala
@@ -63,7 +63,7 @@ object CoreUtils extends Logging {
/**
* Create a daemon thread
* @param name The name of the thread
- * @param fun The runction to execute in the thread
+ * @param fun The function to execute in the thread
* @return The unstarted thread
*/
def daemonThread(name: String, fun: => Unit): Thread =
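The collapsed body of this helper is the standard JVM idiom; an equivalent sketch in Java, assuming it just constructs the thread, marks it daemon, and returns it unstarted:

public class Threads {
    // Create an unstarted daemon thread that runs the given task.
    public static Thread daemonThread(String name, Runnable task) {
        Thread t = new Thread(task, name);
        t.setDaemon(true);   // a daemon thread will not keep the JVM alive
        return t;            // the caller decides when to start it
    }

    public static void main(String[] args) {
        daemonThread("heartbeat", () -> System.out.println("beat")).start();
    }
}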
@@ -162,7 +162,7 @@ object CoreUtils extends Logging {
def crc32(bytes: Array[Byte]): Long = crc32(bytes, 0, bytes.length)

/**
- * Compute the CRC32 of the segment of the byte array given by the specificed size and offset
+ * Compute the CRC32 of the segment of the byte array given by the specified size and offset
* @param bytes The bytes to checksum
* @param offset the offset at which to begin checksumming
* @param size the number of bytes to checksum
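The checksum helper documented above maps directly onto java.util.zip.CRC32; a sketch of checksumming a sub-range, with parameter names taken from the javadoc:

import java.util.zip.CRC32;

public class Checksums {
    // CRC32 of bytes[offset .. offset + size - 1].
    public static long crc32(byte[] bytes, int offset, int size) {
        CRC32 crc = new CRC32();
        crc.update(bytes, offset, size);
        return crc.getValue();
    }

    public static void main(String[] args) {
        byte[] data = {1, 2, 3, 4, 5};
        System.out.println(crc32(data, 1, 3)); // checksum of bytes 2, 3, 4
    }
}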
core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -403,7 +403,7 @@ class ZkUtils(val zkClient: ZkClient,

/**
* Update the value of a persistent node with the given path and data.
- * create parrent directory if necessary. Never throw NodeExistException.
+ * create parent directory if necessary. Never throw NodeExistException.
* Return the updated path zkVersion
*/
def updatePersistentPath(path: String, data: String, acls: java.util.List[ACL] = DefaultAcls) = {
@@ -476,7 +476,7 @@ class ZkUtils(val zkClient: ZkClient,

/**
* Update the value of a persistent node with the given path and data.
- * create parrent directory if necessary. Never throw NodeExistException.
+ * create parent directory if necessary. Never throw NodeExistException.
*/
def updateEphemeralPath(path: String, data: String, acls: java.util.List[ACL] = DefaultAcls): Unit = {
try {
@@ -615,12 +615,12 @@ class PlaintextConsumerTest extends BaseConsumerTest {
* pollers for these consumers. Wait for partition re-assignment and validate.
*
* Currently, assignment validation requires that total number of partitions is greater or equal to
- * number of consumers, so subscriptions.size must be greate or equal the resulting number of consumers in the group
+ * number of consumers, so subscriptions.size must be greater or equal the resulting number of consumers in the group
*
* @param numOfConsumersToAdd number of consumers to create and add to the consumer group
* @param consumerGroup current consumer group
* @param consumerPollers current consumer pollers
- * @param topicsToSubscribe topics to which new consumers will subsribe to
+ * @param topicsToSubscribe topics to which new consumers will subscribe to
* @param subscriptions set of all topic partitions
*/
def addConsumersToGroupAndWaitForGroupAssignment(numOfConsumersToAdd: Int,
@@ -664,7 +664,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
for (poller <- consumerPollers)
poller.subscribe(topicsToSubscribe)

- // since subscribe call to poller does not actually call consumer subsribe right away, wait
+ // since subscribe call to poller does not actually call consumer subscribe right away, wait
// until subscribe is called on all consumers
TestUtils.waitUntilTrue(() => {
consumerPollers forall (poller => poller.isSubscribeRequestProcessed())
core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -260,7 +260,7 @@ class LogSegmentTest {
val oldFileSize = seg.log.file.length
assertEquals(512*1024*1024, oldFileSize)
seg.close()
- //After close, file should be trimed
+ //After close, file should be trimmed
assertEquals(oldSize, seg.log.file.length)

val segReopen = new LogSegment(tempDir, 40, 10, 1000, 0, SystemTime, true, 512*1024*1024, true)
@@ -61,7 +61,7 @@ trait BaseMessageSetTestCases extends JUnitSuite {
}

def testWriteToWithMessageSet(set: MessageSet) {
- // do the write twice to ensure the message set is restored to its orginal state
+ // do the write twice to ensure the message set is restored to its original state
for(i <- List(0,1)) {
val file = tempFile()
val channel = new RandomAccessFile(file, "rw").getChannel()
core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -401,12 +401,12 @@ object TestUtils extends Logging {
}

/**
- * Create a hexidecimal string for the given bytes
+ * Create a hexadecimal string for the given bytes
*/
def hexString(bytes: Array[Byte]): String = hexString(ByteBuffer.wrap(bytes))

/**
- * Create a hexidecimal string for the given bytes
+ * Create a hexadecimal string for the given bytes
*/
def hexString(buffer: ByteBuffer): String = {
val builder = new StringBuilder("0x")
@@ -712,7 +712,7 @@

/**
* Execute the given block. If it throws an assert error, retry. Repeat
- * until no error is thrown or the time limit ellapses
+ * until no error is thrown or the time limit elapses
*/
def retry(maxWaitMs: Long)(block: => Unit) {
var wait = 1L
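The rest of retry is collapsed here; a sketch of the documented contract with exponential backoff (the doubling wait is an assumption suggested by the `var wait = 1L` seed, not something this diff shows):

public class RetryUtil {
    // Re-run the block until it stops throwing AssertionError
    // or the time limit elapses, backing off between attempts.
    public static void retry(long maxWaitMs, Runnable block) {
        long wait = 1L;
        long deadline = System.currentTimeMillis() + maxWaitMs;
        while (true) {
            try {
                block.run();
                return;                               // no assertion error: done
            } catch (AssertionError e) {
                if (System.currentTimeMillis() > deadline)
                    throw e;                          // time limit elapsed
                try {
                    Thread.sleep(wait);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw e;
                }
                wait = Math.min(wait * 2, maxWaitMs); // exponential backoff
            }
        }
    }
}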