Skip to content

Commit

Permalink
Finalized Scaladoc.
Browse files Browse the repository at this point in the history
  • Loading branch information
wzorgdrager committed Apr 17, 2019
1 parent d435458 commit 3498d17
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,10 @@ package org.codefeedr.plugins.pypi.stages

import java.util.concurrent.TimeUnit

import org.apache.flink.api.common.accumulators.LongCounter
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.streaming.api.datastream.{
AsyncDataStream => JavaAsyncDataStream
}
import org.apache.flink.streaming.api.functions.async.{
AsyncFunction => JavaAsyncFunction
}
import org.codefeedr.plugins.pypi.protocol.Protocol.{
PyPiRelease,
PyPiReleaseExt
Expand All @@ -19,11 +14,22 @@ import org.codefeedr.stages.TransformStage
import org.apache.flink.api.scala._
import org.codefeedr.plugins.pypi.operators.RetrieveProjectAsync

/** Transform a [[PyPiRelease]] to [[PyPiReleaseExt]].
*
* @param stageId the name of this stage.
*/
class PyPiReleaseExtStage(stageId: String = "pypi_releases")
extends TransformStage[PyPiRelease, PyPiReleaseExt](Some(stageId)) {

/** Transform a [[PyPiRelease]] to [[PyPiReleaseExt]].
*
* @param source The input source with type [[PyPiRelease]].
* @return The transformed stream with type [[PyPiReleaseExt]].
*/
override def transform(
source: DataStream[PyPiRelease]): DataStream[PyPiReleaseExt] = {

/** Retrieve project from release asynchronously. */
val async = JavaAsyncDataStream.orderedWait(source.javaStream,
new RetrieveProjectAsync,
5,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,17 @@ import org.json4s.jackson.JsonMethods.parse
import org.json4s.Extraction._
import org.json4s.JsonAST._

/** Services to retrieve a project from the PyPi APi. */
object PyPiService extends Serializable {

/** Extraction formats. */
lazy implicit val formats: Formats = new DefaultFormats {
override def dateFormatter: SimpleDateFormat =
new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss")

} ++ JavaTimeSerializers.all

/** Retrieve the API url. */
private val url = "https://pypi.org/pypi/"

/** Retrieves a PyPi project.
Expand All @@ -28,8 +32,8 @@ object PyPiService extends Serializable {
def getProject(projectName: String): Option[PyPiProject] = {
val projectEndPoint = projectName + "/json"

/** Retrieve the project. */
val rawProject = getProjectRaw(projectEndPoint)

if (rawProject.isEmpty) return None

val json = parse(rawProject.get)
Expand All @@ -39,9 +43,11 @@ object PyPiService extends Serializable {
println(json)
}

/** Extract into an optional if it can't be parsed. */
extractOpt[PyPiProject](transformProject(json))
}

/** Transform the JSON AST to be more suitable with a case class.*/
def transformProject(json: JValue): JValue =
json transformField {
case JField("releases", JObject(x)) => {
Expand Down

0 comments on commit 3498d17

Please sign in to comment.