Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
Nguyen Thanh Tung committed Nov 11, 2017
1 parent bc5f051 commit f252444
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 42 deletions.
2 changes: 1 addition & 1 deletion src/main/scala/com/septech/snowflake4s/Generator.scala
Expand Up @@ -19,6 +19,6 @@ trait Generator {

def generate(): String

def generateByBatch(batch: Int): List[String]
def bulkGenerate(batch: Int): List[String]

}
5 changes: 2 additions & 3 deletions src/main/scala/com/septech/snowflake4s/GeneratorModule.scala
Expand Up @@ -16,16 +16,15 @@
package com.septech.snowflake4s

import com.google.inject.AbstractModule
import com.septech.snowflake4s.algorithms.TwitterSnowflake
import com.septech.snowflake4s.algorithms.Snowflake
import com.septech.snowflake4s.node.MachineMACAddress
import com.septech.snowflake4s.node.NodeMachine
import net.codingwell.scalaguice.ScalaModule

class GeneratorModule extends AbstractModule with ScalaModule {

override def configure() = {
bind[NodeMachine].to[MachineMACAddress]
bind[Generator].to[TwitterSnowflake]
bind[Generator].to[Snowflake]
}

}
Expand Up @@ -13,7 +13,7 @@
*
*
*/
package com.septech.snowflake4s.node
package com.septech.snowflake4s

trait NodeMachine {

Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/com/septech/snowflake4s/Snowflake4s.scala
Expand Up @@ -25,6 +25,6 @@ object Snowflake4s {
def getGenerator: Generator = defaultInstance

def main(args: Array[String]): Unit = {
Snowflake4s.getGenerator.generateByBatch(10).foreach(println)
Snowflake4s.getGenerator.bulkGenerate(10).foreach(println)
}
}
Expand Up @@ -15,6 +15,7 @@
*/
package com.septech.snowflake4s.algorithms

import java.lang.management.ManagementFactory
import java.time.Clock
import java.time.LocalDateTime
import java.time.Month
Expand All @@ -26,17 +27,17 @@ import com.google.inject.Inject
import com.google.inject.Singleton
import com.septech.snowflake4s.Generator
import com.septech.snowflake4s.IdEntity
import com.septech.snowflake4s.NodeMachine
import com.septech.snowflake4s.exception.GenerateException
import com.septech.snowflake4s.exception.InvalidSystemClock
import com.septech.snowflake4s.node.NodeMachine

import scala.collection.mutable.ListBuffer
import scala.util.Failure
import scala.util.Success
import scala.util.Try

@Singleton
private[snowflake4s] class TwitterSnowflake @Inject()(nodeIdentifier: NodeMachine) extends Generator {
private[snowflake4s] class Snowflake @Inject()(nodeIdentifier: NodeMachine) extends Generator {
/**
* This is a Custom Epoch, mean for reference time: October 18, 1989, 16:53:40 UTC -
* The date of Galileo Spacecraft was launched to explored Jupiter and its moon from Kennedy Space Center, Florida, US.
Expand All @@ -46,11 +47,10 @@ private[snowflake4s] class TwitterSnowflake @Inject()(nodeIdentifier: NodeMachin
*/
final val GALILEO_LAUNCHED_DATETIME: ZonedDateTime =
LocalDateTime.of(1989, Month.OCTOBER, 18, 16, 53, 40).atZone(ZoneId.of("US/Eastern"))
final val CUSTOM_EPOCH: Long = GALILEO_LAUNCHED_DATETIME.toInstant.toEpochMilli
final val EPOCH: Long = GALILEO_LAUNCHED_DATETIME.toInstant.toEpochMilli

private final val STARTING_SEQUENCE_NUMBERS: Int = 1
private final val lock = new ReentrantLock()
private var lastTimestamp: Long = -1L
private var sequence: Long = 0L

/**
Expand All @@ -63,21 +63,21 @@ private[snowflake4s] class TwitterSnowflake @Inject()(nodeIdentifier: NodeMachin
* - 12 bits that represent an auto-incrementing sequence, modulus 4096.
* This means we can generate 4096 IDs, per Node, per millisecond
*/
final private val TOTAL_BITS: Int = 64
final private val TIMESTAMP_BITS: Int = 41
final private val WORKER_BITS: Int = 10 // up to 1024 machine
final private val SEQUENCE_BITS: Int = 12 // up to 4096 ids per machine
final private val MAX_SEQUENCE_NUMBERS: Int = Math.pow(2, SEQUENCE_BITS).toInt
final private val workerIdBits = 5L
final private val datacenterIdBits = 5L
final private val sequenceBits = 12L
final private val workerIdShift = sequenceBits
final private val machineIdShift = sequenceBits + workerIdBits
final private val timestampLeftShift = sequenceBits + workerIdBits + datacenterIdBits
final private val sequenceMask = -1L ^ (-1L << sequenceBits)
final private var lastTimestamp = -1L

final private val TIMESTAMP_BITS_SHIFT: Int = TOTAL_BITS - TIMESTAMP_BITS
final private val WORKER_ID_BITS_SHIFT: Int = TOTAL_BITS - TIMESTAMP_BITS - SEQUENCE_BITS
final private val SEQUENCE_MASK = -1L ^ (-1L << MAX_SEQUENCE_NUMBERS)
final private val MACHINE_ID: Long = nodeIdentifier.getId().toLong
final private val WORKER_ID: Long = getProcessId

final private val WORKER_ID: Long = (nodeIdentifier.getId().toLong % MAX_SEQUENCE_NUMBERS) << WORKER_ID_BITS_SHIFT
override def generate(): String = bulkGenerate(1).headOption.fold[String](throw new GenerateException)(id => id)

override def generate(): String = generateByBatch(1).headOption.fold[String](throw new GenerateException)(_ => _)

override def generateByBatch(batch: Int): List[String] = {
override def bulkGenerate(batch: Int): List[String] = {
lock.lock()

val ids = generateIds(batch).map(_.getString)
Expand All @@ -88,14 +88,9 @@ private[snowflake4s] class TwitterSnowflake @Inject()(nodeIdentifier: NodeMachin

private def generateIds(batch: Int): List[IdEntity] = synchronized {
Try {
require(batch <= MAX_SEQUENCE_NUMBERS)

val currentTimestamp: Long = Clock.systemUTC().millis()
val shiftedTimestamp: Long = (currentTimestamp - CUSTOM_EPOCH) << TIMESTAMP_BITS_SHIFT

val ids = new ListBuffer[IdEntity]()

(STARTING_SEQUENCE_NUMBERS to batch).map(_ => ids += nextId(currentTimestamp, shiftedTimestamp, WORKER_ID))
(STARTING_SEQUENCE_NUMBERS to batch).map(_ => ids += nextId())

ids.toList
} match {
Expand All @@ -104,22 +99,33 @@ private[snowflake4s] class TwitterSnowflake @Inject()(nodeIdentifier: NodeMachin
}
}

private def nextId(currentTimestamp: Long, shiftedTimestamp: Long, workerId: Long): IdEntity = {
var timestamp = Clock.systemUTC().millis()
private def nextId(): IdEntity = {
var currentTimestamp: Long = Clock.systemUTC().millis()

if (timestamp < lastTimestamp) {
val timeDiff = lastTimestamp - timestamp
if (currentTimestamp < lastTimestamp) {
val timeDiff = lastTimestamp - currentTimestamp
throw new InvalidSystemClock("Clock moved backwards. Refusing to generate id for %d milliseconds".format(timeDiff))
}

if (lastTimestamp == timestamp) {
sequence = (sequence + 1) & SEQUENCE_MASK
if (sequence == 0) timestamp = nextMillis(lastTimestamp)
} else sequence = 0
if (lastTimestamp == currentTimestamp) {
sequence = (sequence + 1) & sequenceMask

lastTimestamp = timestamp
if (sequence == 0) {
currentTimestamp = nextMillis(lastTimestamp)
}
} else{
sequence = 0
}

IdEntity(shiftedTimestamp | workerId | sequence)
lastTimestamp = currentTimestamp

val id =
((currentTimestamp - EPOCH) << timestampLeftShift) |
(MACHINE_ID << machineIdShift) |
(WORKER_ID << workerIdShift) |
sequence

IdEntity(Math.abs(id))
}

private def nextMillis(lastTimestamp: Long): Long = {
Expand All @@ -130,4 +136,8 @@ private[snowflake4s] class TwitterSnowflake @Inject()(nodeIdentifier: NodeMachin
timestamp
}

protected[snowflake4s] def getProcessId: Long = {
ManagementFactory.getRuntimeMXBean.getName.split("@").headOption.map(_.toLong).fold(throw new Exception)(pid => pid)
}

}
Expand Up @@ -17,6 +17,7 @@ package com.septech.snowflake4s.node

import java.net._

import com.septech.snowflake4s.NodeMachine
import com.septech.snowflake4s.exception.MacAddressException

import scala.util.Failure
Expand All @@ -30,10 +31,9 @@ private[snowflake4s] class MachineMACAddress extends NodeMachine {

localNetworkInterface
.getHardwareAddress.toList
.map(b => String.format("%02x", b.asInstanceOf[Object]))
.map(Integer.parseInt(_, 16)).foldLeft(0L) {
case (acc, item) => acc * 256 + item
}.toString
.map(byte => Integer.parseInt(String.format("%02x", byte.asInstanceOf[Object]), 16))
.foldLeft(0L) { case (acc, item) => acc * 256 + item }
.toString
} match {
case Failure(_) => throw new MacAddressException()
case Success(address) => address
Expand Down

0 comments on commit f252444

Please sign in to comment.