Skip to content

Commit

Permalink
Merge f4772cd into 3a6d888
Browse files Browse the repository at this point in the history
  • Loading branch information
wzorgdrager committed Apr 17, 2019
2 parents 3a6d888 + f4772cd commit 38d8188
Show file tree
Hide file tree
Showing 7 changed files with 451 additions and 4 deletions.
18 changes: 14 additions & 4 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,15 @@ lazy val pluginGHTorrent = (project in file("codefeedr-plugins/codefeedr-ghtorre
)
).dependsOn(core)

// sbt sub-project for the PyPI plugin, located under codefeedr-plugins/codefeedr-pypi.
// It reuses the shared `settings` / `assemblySettings` and depends on the `core` project.
lazy val pluginPypi = (project in file("codefeedr-plugins/codefeedr-pypi"))
  .settings(
    name := pluginPrefix + "pypi",
    settings,
    assemblySettings,
    // No plugin-specific libraries are declared; only the common set is used.
    libraryDependencies ++= commonDependencies
  )
  .dependsOn(core)

lazy val dependencies =
new {
val flinkVersion = "1.7.0"
Expand All @@ -164,10 +173,10 @@ lazy val dependencies =
val loggingCore = "org.apache.logging.log4j" % "log4j-core" % log4jVersion % Runtime
val loggingScala = "org.apache.logging.log4j" %% "log4j-api-scala" % log4jScalaVersion

val flink = "org.apache.flink" %% "flink-scala" % flinkVersion % Provided
val flinkStreaming = "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % Provided
val flink = "org.apache.flink" %% "flink-scala" % flinkVersion // % Provided
val flinkStreaming = "org.apache.flink" %% "flink-streaming-scala" % flinkVersion // % Provided
val flinkKafka = "org.apache.flink" %% "flink-connector-kafka" % flinkVersion
val flinkRuntimeWeb = "org.apache.flink" %% "flink-runtime-web" % flinkVersion % Provided
val flinkRuntimeWeb = "org.apache.flink" %% "flink-runtime-web" % flinkVersion //% Provided
val flinkElasticSearch = "org.apache.flink" %% "flink-connector-elasticsearch6" % flinkVersion
val flinkRabbitMQ = "org.apache.flink" %% "flink-connector-rabbitmq" % flinkVersion

Expand Down Expand Up @@ -222,7 +231,8 @@ lazy val commonSettings = Seq(
"confluent" at "http://packages.confluent.io/maven/",
"Apache Development Snapshot Repository" at "https://repository.apache.org/content/repositories/snapshots/",
"Artima Maven Repository" at "http://repo.artima.com/releases",
Resolver.mavenLocal
Resolver.mavenLocal,
Resolver.jcenterRepo
),
publishMavenStyle in ThisBuild := true,
publishTo in ThisBuild := Some(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package org.codefeedr.plugins.pypi

import org.codefeedr.pipeline.PipelineBuilder
import org.codefeedr.plugins.pypi.protocol.Protocol.PyPiReleaseExt
import org.codefeedr.plugins.pypi.stages.{
PyPiReleaseExtStage,
PyPiReleasesStage
}
import org.codefeedr.stages.utilities.PrinterOutput

/** Runnable entry point that wires the PyPI stages into a single pipeline. */
object Main {

  /** Builds a pipeline of [[PyPiReleasesStage]] -> [[PyPiReleaseExtStage]] ->
    * [[PrinterOutput]] and starts it through `startMock()`.
    *
    * @param args command-line arguments; not read by this method
    */
  def main(args: Array[String]): Unit = {
    val pipeline = new PipelineBuilder()
      .append(new PyPiReleasesStage)
      .append(new PyPiReleaseExtStage)
      .append(new PrinterOutput[PyPiReleaseExt]())
      .build()

    pipeline.startMock()
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package org.codefeedr.plugins.pypi.protocol

import java.util.Date

/** Data model for the PyPI plugin.
  *
  * All types are immutable case classes and are declared `final` so they cannot
  * be subclassed (which would break the generated `equals`/`hashCode`/`copy`).
  *
  * NOTE(review): many field names are snake_case, presumably to mirror the keys of
  * the PyPI JSON payload so they can be extracted by name -- confirm against
  * `PyPiService` before renaming any of them.
  */
object Protocol {

  /** A release entry as emitted by the releases input stage.
    *
    * @param title       release title; the first space-separated token is used as the
    *                    project name by the enrichment stage
    * @param link        link associated with the release
    * @param description release description
    * @param pubDate     publication date of the release
    */
  final case class PyPiRelease(title: String,
                               link: String,
                               description: String,
                               pubDate: Date)

  /** A [[PyPiRelease]] extended with the project it belongs to.
    *
    * The first four fields carry the same values as the originating [[PyPiRelease]].
    *
    * @param project the project data retrieved for this release
    */
  final case class PyPiReleaseExt(title: String,
                                  link: String,
                                  description: String,
                                  pubDate: Date,
                                  project: PyPiProject)

  /** Project-level data: general info, per-version releases and release files. */
  final case class PyPiProject(info: Info,
                               last_serial: Long,
                               releases: List[ReleaseVersion],
                               urls: List[Release])

  /** Descriptive metadata of a project. Fields typed `Option` may be absent. */
  final case class Info(author: String,
                        author_email: String,
                        bugtrack_url: Option[String],
                        classifiers: List[String],
                        description: String,
                        description_content_type: String,
                        docs_url: Option[String],
                        download_url: String,
                        downloads: Downloads,
                        home_page: String,
                        keywords: String,
                        license: String,
                        maintainer: String,
                        maintainer_email: String,
                        name: String,
                        package_url: String,
                        platform: String,
                        project_url: String,
                        project_urls: Option[ProjectUrl],
                        release_url: String,
                        requires_dist: List[String],
                        requires_python: Option[String],
                        summary: String,
                        version: String)

  /** Download counters for the last day, month and week. */
  final case class Downloads(last_day: Int, last_month: Int, last_week: Int)

  /** Project URLs; only the homepage is modelled. */
  final case class ProjectUrl(Homepage: String)

  /** All release files that belong to a single version string. */
  final case class ReleaseVersion(version: String, releases: List[Release])

  /** A single release file of a project. */
  final case class Release(comment_text: String,
                           digests: Digest,
                           downloads: Double,
                           filename: String,
                           has_sig: Boolean,
                           md5_digest: String,
                           packagetype: String,
                           python_version: String,
                           requires_python: Option[String],
                           size: Double,
                           upload_time: Date,
                           url: String)

  /** Checksums of a release file. */
  final case class Digest(md5: String, sha256: String)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package org.codefeedr.plugins.pypi.stages

import java.util.concurrent.TimeUnit

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.streaming.api.scala.{AsyncDataStream, DataStream}
import org.codefeedr.plugins.pypi.protocol.Protocol.{
PyPiProject,
PyPiRelease,
PyPiReleaseExt
}
import org.codefeedr.stages.TransformStage
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.async.{AsyncFunction, ResultFuture}
import org.codefeedr.plugins.pypi.util.PyPiService

import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
/** Transform stage that turns a stream of [[PyPiRelease]] into a stream of
  * [[PyPiReleaseExt]] by running [[MapReleaseToProject]] as an asynchronous operator.
  *
  * @param stageId identifier handed to [[TransformStage]]; defaults to "pypi_releases"
  */
class PyPiReleaseExtStage(stageId: String = "pypi_releases")
    extends TransformStage[PyPiRelease, PyPiReleaseExt](Some(stageId)) {

  /** Applies the asynchronous enrichment to every element of `source`.
    *
    * @param source the incoming releases
    * @return the releases extended with their project data
    */
  override def transform(
      source: DataStream[PyPiRelease]): DataStream[PyPiReleaseExt] =
    AsyncDataStream.orderedWait(
      source,
      new MapReleaseToProject,
      5, // timeout value, expressed in the unit on the next line
      TimeUnit.SECONDS,
      100 // capacity argument of the async operator
    )
}

/** Asynchronously enriches a [[PyPiRelease]] with its [[PyPiProject]], producing a
  * [[PyPiReleaseExt]].
  *
  * The project name is taken to be the first space-separated token of the release
  * title. Every code path completes the supplied `ResultFuture` exactly once: with
  * one element when the project was found, and with an empty collection when the
  * project is unknown, the lookup failed, the title holds no usable token, or the
  * request timed out. Leaving the future uncompleted would keep the element pending
  * in the async operator.
  */
class MapReleaseToProject extends AsyncFunction[PyPiRelease, PyPiReleaseExt] {

  // Lazy and transient so the execution context is resolved where the function runs
  // and is never part of the serialized function instance.
  @transient implicit lazy val executor: ExecutionContext =
    ExecutionContext.global

  /** Looks up the project of `input` in a `Future` and completes `resultFuture`
    * with the enriched release.
    *
    * @param input        the release to enrich
    * @param resultFuture receives zero or one [[PyPiReleaseExt]]
    */
  override def asyncInvoke(input: PyPiRelease,
                           resultFuture: ResultFuture[PyPiReleaseExt]): Unit = {
    // `split` drops trailing empty strings, so a title consisting only of spaces
    // yields an empty array; `headOption` avoids the out-of-bounds access.
    input.title.split(" ").headOption match {
      case None =>
        resultFuture.complete(Iterable.empty)

      case Some(projectName) =>
        val requestProject: Future[Option[PyPiProject]] =
          Future(PyPiService.getProject(projectName))

        requestProject.onComplete {
          case Success(Some(project)) =>
            resultFuture.complete(
              Iterable(
                PyPiReleaseExt(input.title,
                               input.link,
                               input.description,
                               input.pubDate,
                               project)))

          // Unknown project: emit nothing, but still complete the future.
          case Success(None) =>
            resultFuture.complete(Iterable.empty)

          case Failure(e) =>
            resultFuture.complete(Iterable.empty)
            e.printStackTrace()
        }
    }
  }

  /** Called when the lookup for `input` did not finish in time. The release is
    * dropped by completing `resultFuture` with an empty collection, matching the
    * handling of failed lookups.
    *
    * @param input        the release whose lookup timed out
    * @param resultFuture the pending result to complete
    */
  override def timeout(input: PyPiRelease,
                       resultFuture: ResultFuture[PyPiReleaseExt]): Unit = {
    println(
      s"Timed out while retrieving the PyPI project for release '${input.title}'; dropping it.")
    resultFuture.complete(Iterable.empty)
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package org.codefeedr.plugins.pypi.stages

import org.apache.flink.streaming.api.scala.DataStream
import org.codefeedr.pipeline.Context
import org.codefeedr.plugins.pypi.protocol.Protocol.PyPiRelease
import org.codefeedr.plugins.pypi.util.PyPiReleasesSource
import org.codefeedr.stages.InputStage
import org.apache.flink.api.scala._

/** Input stage that produces a stream of [[PyPiRelease]] elements read from a
  * [[PyPiReleasesSource]].
  *
  * @param stageId identifier handed to [[InputStage]]; defaults to "pypi_releases_min"
  */
class PyPiReleasesStage(stageId: String = "pypi_releases_min")
    extends InputStage[PyPiRelease](Some(stageId)) {

  /** Registers a new [[PyPiReleasesSource]] on the environment of `context`.
    *
    * @param context pipeline context providing the stream environment
    * @return the stream created by the source
    */
  override def main(context: Context): DataStream[PyPiRelease] =
    context.env.addSource(new PyPiReleasesSource())
}

0 comments on commit 38d8188

Please sign in to comment.