Skip to content

Commit

Permalink
Merge 81b3165 into 3a6d888
Browse files Browse the repository at this point in the history
  • Loading branch information
wzorgdrager committed Apr 17, 2019
2 parents 3a6d888 + 81b3165 commit f88703c
Show file tree
Hide file tree
Showing 7 changed files with 486 additions and 4 deletions.
18 changes: 14 additions & 4 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,15 @@ lazy val pluginGHTorrent = (project in file("codefeedr-plugins/codefeedr-ghtorre
)
).dependsOn(core)

lazy val pluginPypi = (project in file("codefeedr-plugins/codefeedr-pypi"))
.settings(
name := pluginPrefix + "pypi",
settings,
assemblySettings,
libraryDependencies ++= commonDependencies ++ Seq(
)
).dependsOn(core)

lazy val dependencies =
new {
val flinkVersion = "1.7.0"
Expand All @@ -164,10 +173,10 @@ lazy val dependencies =
val loggingCore = "org.apache.logging.log4j" % "log4j-core" % log4jVersion % Runtime
val loggingScala = "org.apache.logging.log4j" %% "log4j-api-scala" % log4jScalaVersion

val flink = "org.apache.flink" %% "flink-scala" % flinkVersion % Provided
val flinkStreaming = "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % Provided
val flink = "org.apache.flink" %% "flink-scala" % flinkVersion // % Provided
val flinkStreaming = "org.apache.flink" %% "flink-streaming-scala" % flinkVersion // % Provided
val flinkKafka = "org.apache.flink" %% "flink-connector-kafka" % flinkVersion
val flinkRuntimeWeb = "org.apache.flink" %% "flink-runtime-web" % flinkVersion % Provided
val flinkRuntimeWeb = "org.apache.flink" %% "flink-runtime-web" % flinkVersion //% Provided
val flinkElasticSearch = "org.apache.flink" %% "flink-connector-elasticsearch6" % flinkVersion
val flinkRabbitMQ = "org.apache.flink" %% "flink-connector-rabbitmq" % flinkVersion

Expand Down Expand Up @@ -222,7 +231,8 @@ lazy val commonSettings = Seq(
"confluent" at "http://packages.confluent.io/maven/",
"Apache Development Snapshot Repository" at "https://repository.apache.org/content/repositories/snapshots/",
"Artima Maven Repository" at "http://repo.artima.com/releases",
Resolver.mavenLocal
Resolver.mavenLocal,
Resolver.jcenterRepo
),
publishMavenStyle in ThisBuild := true,
publishTo in ThisBuild := Some(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package org.codefeedr.plugins.pypi

import org.codefeedr.pipeline.PipelineBuilder
import org.codefeedr.plugins.pypi.protocol.Protocol.PyPiReleaseExt
import org.codefeedr.plugins.pypi.stages.{
PyPiReleaseExtStage,
PyPiReleasesStage
}
import org.codefeedr.stages.utilities.PrinterOutput

object Main {
def main(args: Array[String]): Unit = {
new PipelineBuilder()
.append(new PyPiReleasesStage)
.append(new PyPiReleaseExtStage)
.build()
.startMock()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
package org.codefeedr.plugins.pypi.operators

import java.text.SimpleDateFormat

import org.apache.flink.api.common.accumulators.LongCounter
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.state.{
FunctionInitializationContext,
FunctionSnapshotContext
}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.source.{
RichSourceFunction,
SourceFunction
}
import org.codefeedr.plugins.pypi.protocol.Protocol.PyPiRelease
import org.codefeedr.stages.utilities.{HttpRequester, RequestException}
import scalaj.http.Http

import scala.collection.JavaConverters._
import scala.xml.XML

class PyPiReleasesSource(pollingInterval: Int = 1000, maxNumberOfRuns: Int = -1)
extends RichSourceFunction[PyPiRelease]
with CheckpointedFunction {

/** Format and URL of RSS Feed. */
val dateFormat = "EEE, dd MMM yyyy HH:mm:ss ZZ"
val url = "https://pypi.org/rss/updates.xml"

/** Some track variables of this source. */
private var isRunning = false
private var runsLeft = 0
private var lastItem: Option[PyPiRelease] = None
@transient
private var checkpointedState: ListState[PyPiRelease] = _

def getIsRunning: Boolean = isRunning

/** Accumulator for the amount of processed releases. */
val releasesProcessed = new LongCounter()

/** Opens this source. */
override def open(parameters: Configuration): Unit = {
isRunning = true
runsLeft = maxNumberOfRuns
}

/** Close the source. */
override def cancel(): Unit = {
isRunning = false

}

/** Runs the source.
*
* @param ctx the source the context.
*/
override def run(ctx: SourceFunction.SourceContext[PyPiRelease]): Unit = {
val lock = ctx.getCheckpointLock

/** While is running or #runs left. */
while (isRunning && runsLeft != 0) {
lock.synchronized { // Synchronize to the checkpoint lock.
try {
// Polls the RSS feed
val rssAsString = getRSSAsString
// Parses the received rss items
val items: Seq[PyPiRelease] = parseRSSString(rssAsString)

// Decrease the amount of runs left.
decreaseRunsLeft()

// Collect right items and update last item
val validSortedItems = sortAndDropDuplicates(items)
validSortedItems.foreach(x =>
ctx.collectWithTimestamp(x, x.pubDate.getTime))
releasesProcessed.add(validSortedItems.size)
if (validSortedItems.nonEmpty) {
lastItem = Some(validSortedItems.last)
}

// Wait until the next poll
waitPollingInterval()
} catch {
case _: Throwable =>
}
}
}
}

/**
* Drops items that already have been collected and sorts them based on times
* @param items Potential items to be collected
* @return Valid sorted items
*/
def sortAndDropDuplicates(items: Seq[PyPiRelease]): Seq[PyPiRelease] = {
items
.filter((x: PyPiRelease) => {
if (lastItem.isDefined)
lastItem.get.pubDate.before(x.pubDate) && lastItem.get.link != x.link
else
true
})
.sortWith((x: PyPiRelease, y: PyPiRelease) => x.pubDate.before(y.pubDate))
}

/**
* Requests the RSS feed and returns its body as a string.
* Will keep trying with increasing intervals if it doesn't succeed
* @return Body of requested RSS feed
*/
@throws[RequestException]
def getRSSAsString: String = {
new HttpRequester().retrieveResponse(Http(url)).body
}

/**
* Parses a string that contains xml with RSS items
* @param rssString XML string with RSS items
* @return Sequence of RSS items
*/
def parseRSSString(rssString: String): Seq[PyPiRelease] = {
try {
val xml = XML.loadString(rssString)
val nodes = xml \\ "item"
for (t <- nodes) yield xmlToPyPiRelease(t)
} catch {
// If the string cannot be parsed return an empty list
case _: Throwable => Nil
}
}

/**
* Parses a xml node to a RSS item
* @param node XML node
* @return RSS item
*/
def xmlToPyPiRelease(node: scala.xml.Node): PyPiRelease = {
val title = (node \ "title").text
val description = (node \ "description").text
val link = (node \ "link").text

val formatter = new SimpleDateFormat(dateFormat)
val pubDate = formatter.parse((node \ "pubDate").text)

PyPiRelease(title, description, link, pubDate)
}

/**
* If there is a limit to the amount of runs decrease by 1
*/
def decreaseRunsLeft(): Unit = {
if (runsLeft > 0) {
runsLeft -= 1
}
}

/**
* Wait a certain amount of times the polling interval
* @param times Times the polling interval should be waited
*/
def waitPollingInterval(times: Int = 1): Unit = {
Thread.sleep(times * pollingInterval)
}

/** Make a snapshot of the current state. */
override def snapshotState(context: FunctionSnapshotContext): Unit = {
if (lastItem.isDefined) {
checkpointedState.clear()
checkpointedState.add(lastItem.get)
}
}

/** Initializes state by reading from a checkpoint or creating an empty one. */
override def initializeState(context: FunctionInitializationContext): Unit = {
val descriptor =
new ListStateDescriptor[PyPiRelease]("last_element", classOf[PyPiRelease])

checkpointedState = context.getOperatorStateStore.getListState(descriptor)

if (context.isRestored) {
checkpointedState.get().asScala.foreach { x =>
lastItem = Some(x)
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package org.codefeedr.plugins.pypi.protocol

import java.util.Date

object Protocol {

case class PyPiRelease(title: String,
link: String,
description: String,
pubDate: Date)

case class PyPiReleaseExt(title: String,
link: String,
description: String,
pubDate: Date,
project: PyPiProject)

case class PyPiProject(info: Info,
last_serial: Long,
releases: List[ReleaseVersion],
urls: List[Release])

case class Info(author: String,
author_email: String,
bugtrack_url: Option[String],
classifiers: List[String],
description: String,
description_content_type: String,
docs_url: Option[String],
download_url: String,
downloads: Downloads,
home_page: String,
keywords: String,
license: String,
maintainer: String,
maintainer_email: String,
name: String,
package_url: String,
platform: String,
project_url: String,
project_urls: Option[ProjectUrl],
release_url: String,
requires_dist: List[String],
requires_python: Option[String],
summary: String,
version: String)

case class Downloads(last_day: Int, last_month: Int, last_week: Int)
case class ProjectUrl(Homepage: String)

case class ReleaseVersion(version: String, releases: List[Release])

case class Release(comment_text: String,
digests: Digest,
downloads: Double,
filename: String,
has_sig: Boolean,
md5_digest: String,
packagetype: String,
python_version: String,
requires_python: Option[String],
size: Double,
upload_time: Date,
url: String)

case class Digest(md5: String, sha256: String)
}

0 comments on commit f88703c

Please sign in to comment.