-
Notifications
You must be signed in to change notification settings - Fork 6
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge branch 'develop' into improvement/kafka
- Loading branch information
Showing
19 changed files
with
833 additions
and
13 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,7 +2,7 @@ language: scala | |
scala: | ||
- 2.12.8 | ||
sudo: required | ||
dist: trusty | ||
dist: xenial | ||
jdk: openjdk11 | ||
|
||
before_cache: | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
192 changes: 192 additions & 0 deletions
192
...defeedr-pypi/src/main/scala/org/codefeedr/plugins/pypi/operators/PyPiReleasesSource.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,192 @@ | ||
package org.codefeedr.plugins.pypi.operators | ||
|
||
import java.text.SimpleDateFormat | ||
|
||
import org.apache.flink.api.common.accumulators.LongCounter | ||
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor} | ||
import org.apache.flink.configuration.Configuration | ||
import org.apache.flink.runtime.state.{ | ||
FunctionInitializationContext, | ||
FunctionSnapshotContext | ||
} | ||
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction | ||
import org.apache.flink.streaming.api.functions.source.{ | ||
RichSourceFunction, | ||
SourceFunction | ||
} | ||
import org.codefeedr.plugins.pypi.protocol.Protocol.PyPiRelease | ||
import org.codefeedr.stages.utilities.{HttpRequester, RequestException} | ||
import scalaj.http.Http | ||
|
||
import scala.collection.JavaConverters._ | ||
import scala.xml.XML | ||
|
||
case class PyPiSourceConfig(pollingInterval: Int = 1000, | ||
maxNumberOfRuns: Int = -1) | ||
|
||
class PyPiReleasesSource(config: PyPiSourceConfig = PyPiSourceConfig()) | ||
extends RichSourceFunction[PyPiRelease] | ||
with CheckpointedFunction { | ||
|
||
/** Format and URL of RSS Feed. */ | ||
val dateFormat = "EEE, dd MMM yyyy HH:mm:ss ZZ" | ||
val url = "https://pypi.org/rss/updates.xml" | ||
|
||
/** Some track variables of this source. */ | ||
private var isRunning = false | ||
private var runsLeft = 0 | ||
private var lastItem: Option[PyPiRelease] = None | ||
@transient | ||
private var checkpointedState: ListState[PyPiRelease] = _ | ||
|
||
def getIsRunning: Boolean = isRunning | ||
|
||
/** Accumulator for the amount of processed releases. */ | ||
val releasesProcessed = new LongCounter() | ||
|
||
/** Opens this source. */ | ||
override def open(parameters: Configuration): Unit = { | ||
isRunning = true | ||
runsLeft = config.maxNumberOfRuns | ||
} | ||
|
||
/** Close the source. */ | ||
override def cancel(): Unit = { | ||
isRunning = false | ||
|
||
} | ||
|
||
/** Runs the source. | ||
* | ||
* @param ctx the source the context. | ||
*/ | ||
override def run(ctx: SourceFunction.SourceContext[PyPiRelease]): Unit = { | ||
val lock = ctx.getCheckpointLock | ||
|
||
/** While is running or #runs left. */ | ||
while (isRunning && runsLeft != 0) { | ||
lock.synchronized { // Synchronize to the checkpoint lock. | ||
try { | ||
// Polls the RSS feed | ||
val rssAsString = getRSSAsString | ||
// Parses the received rss items | ||
val items: Seq[PyPiRelease] = parseRSSString(rssAsString) | ||
|
||
// Decrease the amount of runs left. | ||
decreaseRunsLeft() | ||
|
||
// Collect right items and update last item | ||
val validSortedItems = sortAndDropDuplicates(items) | ||
validSortedItems.foreach(x => | ||
ctx.collectWithTimestamp(x, x.pubDate.getTime)) | ||
releasesProcessed.add(validSortedItems.size) | ||
if (validSortedItems.nonEmpty) { | ||
lastItem = Some(validSortedItems.last) | ||
} | ||
|
||
// Wait until the next poll | ||
waitPollingInterval() | ||
} catch { | ||
case _: Throwable => | ||
} | ||
} | ||
} | ||
} | ||
|
||
/** | ||
* Drops items that already have been collected and sorts them based on times | ||
* @param items Potential items to be collected | ||
* @return Valid sorted items | ||
*/ | ||
def sortAndDropDuplicates(items: Seq[PyPiRelease]): Seq[PyPiRelease] = { | ||
items | ||
.filter((x: PyPiRelease) => { | ||
if (lastItem.isDefined) | ||
lastItem.get.pubDate.before(x.pubDate) && lastItem.get.link != x.link | ||
else | ||
true | ||
}) | ||
.sortWith((x: PyPiRelease, y: PyPiRelease) => x.pubDate.before(y.pubDate)) | ||
} | ||
|
||
/** | ||
* Requests the RSS feed and returns its body as a string. | ||
* Will keep trying with increasing intervals if it doesn't succeed | ||
* @return Body of requested RSS feed | ||
*/ | ||
@throws[RequestException] | ||
def getRSSAsString: String = { | ||
new HttpRequester().retrieveResponse(Http(url)).body | ||
} | ||
|
||
/** | ||
* Parses a string that contains xml with RSS items | ||
* @param rssString XML string with RSS items | ||
* @return Sequence of RSS items | ||
*/ | ||
def parseRSSString(rssString: String): Seq[PyPiRelease] = { | ||
try { | ||
val xml = XML.loadString(rssString) | ||
val nodes = xml \\ "item" | ||
for (t <- nodes) yield xmlToPyPiRelease(t) | ||
} catch { | ||
// If the string cannot be parsed return an empty list | ||
case _: Throwable => Nil | ||
} | ||
} | ||
|
||
/** | ||
* Parses a xml node to a RSS item | ||
* @param node XML node | ||
* @return RSS item | ||
*/ | ||
def xmlToPyPiRelease(node: scala.xml.Node): PyPiRelease = { | ||
val title = (node \ "title").text | ||
val description = (node \ "description").text | ||
val link = (node \ "link").text | ||
|
||
val formatter = new SimpleDateFormat(dateFormat) | ||
val pubDate = formatter.parse((node \ "pubDate").text) | ||
|
||
PyPiRelease(title, description, link, pubDate) | ||
} | ||
|
||
/** | ||
* If there is a limit to the amount of runs decrease by 1 | ||
*/ | ||
def decreaseRunsLeft(): Unit = { | ||
if (runsLeft > 0) { | ||
runsLeft -= 1 | ||
} | ||
} | ||
|
||
/** | ||
* Wait a certain amount of times the polling interval | ||
* @param times Times the polling interval should be waited | ||
*/ | ||
def waitPollingInterval(times: Int = 1): Unit = { | ||
Thread.sleep(times * config.pollingInterval) | ||
} | ||
|
||
/** Make a snapshot of the current state. */ | ||
override def snapshotState(context: FunctionSnapshotContext): Unit = { | ||
if (lastItem.isDefined) { | ||
checkpointedState.clear() | ||
checkpointedState.add(lastItem.get) | ||
} | ||
} | ||
|
||
/** Initializes state by reading from a checkpoint or creating an empty one. */ | ||
override def initializeState(context: FunctionInitializationContext): Unit = { | ||
val descriptor = | ||
new ListStateDescriptor[PyPiRelease]("last_element", classOf[PyPiRelease]) | ||
|
||
checkpointedState = context.getOperatorStateStore.getListState(descriptor) | ||
|
||
if (context.isRestored) { | ||
checkpointedState.get().asScala.foreach { x => | ||
lastItem = Some(x) | ||
} | ||
} | ||
} | ||
} |
66 changes: 66 additions & 0 deletions
66
...feedr-pypi/src/main/scala/org/codefeedr/plugins/pypi/operators/RetrieveProjectAsync.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,66 @@ | ||
package org.codefeedr.plugins.pypi.operators | ||
|
||
import org.apache.flink.streaming.api.functions.async.{ | ||
ResultFuture, | ||
RichAsyncFunction | ||
} | ||
import org.codefeedr.plugins.pypi.protocol.Protocol.{ | ||
PyPiProject, | ||
PyPiRelease, | ||
PyPiReleaseExt | ||
} | ||
import org.codefeedr.plugins.pypi.util.PyPiService | ||
|
||
import scala.concurrent.{ExecutionContext, Future} | ||
import scala.util.{Failure, Success} | ||
import collection.JavaConverters._ | ||
|
||
/** Retrieves a project related to a release asynchronously. */ | ||
class RetrieveProjectAsync | ||
extends RichAsyncFunction[PyPiRelease, PyPiReleaseExt] { | ||
|
||
/** Retrieve the execution context lazily. */ | ||
implicit lazy val executor: ExecutionContext = ExecutionContext.global | ||
|
||
/** Async retrieves the project belonging to the release. | ||
* | ||
* @param input the release. | ||
* @param resultFuture the future to add the project to. | ||
*/ | ||
override def asyncInvoke(input: PyPiRelease, | ||
resultFuture: ResultFuture[PyPiReleaseExt]): Unit = { | ||
|
||
/** The project name combined with its release version */ | ||
val projectName = input.title.replace(" ", "/") | ||
|
||
/** Retrieve the project in a Future. */ | ||
val requestProject: Future[Option[PyPiProject]] = Future( | ||
PyPiService.getProject(projectName)) | ||
|
||
/** Collects the result. */ | ||
requestProject.onComplete { | ||
case Success(result: Option[PyPiProject]) => { | ||
if (result.isDefined) { //If we get None, we return nothing. | ||
resultFuture.complete( | ||
List( | ||
PyPiReleaseExt(input.title, | ||
input.link, | ||
input.description, | ||
input.pubDate, | ||
result.get)).asJava) | ||
} else { | ||
resultFuture.complete(List().asJava) | ||
} | ||
} | ||
case Failure(e) => | ||
resultFuture.complete(List().asJava) | ||
e.printStackTrace() | ||
} | ||
|
||
} | ||
|
||
/** If we retrieve a time-out, then we just complete the future with an empty list.*/ | ||
override def timeout(input: PyPiRelease, | ||
resultFuture: ResultFuture[PyPiReleaseExt]): Unit = | ||
resultFuture.complete(List().asJava) | ||
} |
68 changes: 68 additions & 0 deletions
68
...-plugins/codefeedr-pypi/src/main/scala/org/codefeedr/plugins/pypi/protocol/Protocol.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,68 @@ | ||
package org.codefeedr.plugins.pypi.protocol | ||
|
||
import java.util.Date | ||
|
||
object Protocol { | ||
|
||
case class PyPiRelease(title: String, | ||
link: String, | ||
description: String, | ||
pubDate: Date) | ||
|
||
case class PyPiReleaseExt(title: String, | ||
link: String, | ||
description: String, | ||
pubDate: Date, | ||
project: PyPiProject) | ||
|
||
case class PyPiProject(info: Info, | ||
last_serial: Long, | ||
releases: List[ReleaseVersion], | ||
urls: List[Release]) | ||
|
||
case class Info(author: String, | ||
author_email: String, | ||
bugtrack_url: Option[String], | ||
classifiers: List[String], | ||
description: String, | ||
description_content_type: String, | ||
docs_url: Option[String], | ||
download_url: String, | ||
downloads: Downloads, | ||
home_page: String, | ||
keywords: String, | ||
license: String, | ||
maintainer: String, | ||
maintainer_email: String, | ||
name: String, | ||
package_url: String, | ||
platform: String, | ||
project_url: String, | ||
project_urls: Option[ProjectUrl], | ||
release_url: String, | ||
requires_dist: List[String], | ||
requires_python: Option[String], | ||
summary: String, | ||
version: String) | ||
|
||
case class Downloads(last_day: Int, last_month: Int, last_week: Int) | ||
|
||
case class ProjectUrl(Homepage: String) | ||
|
||
case class ReleaseVersion(version: String, releases: List[Release]) | ||
|
||
case class Release(comment_text: String, | ||
digests: Digest, | ||
downloads: Double, | ||
filename: String, | ||
has_sig: Boolean, | ||
md5_digest: String, | ||
packagetype: String, | ||
python_version: String, | ||
requires_python: Option[String], | ||
size: Double, | ||
upload_time: Date, | ||
url: String) | ||
|
||
case class Digest(md5: String, sha256: String) | ||
} |
Oops, something went wrong.