Skip to content

Commit

Permalink
Debug time.
Browse files Browse the repository at this point in the history
  • Loading branch information
wzorgdrager committed Apr 17, 2019
1 parent c802f7b commit f4772cd
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 24 deletions.
Original file line number Diff line number Diff line change
@@ -1,12 +1,19 @@
package org.codefeedr.plugins.pypi

import org.codefeedr.pipeline.PipelineBuilder
import org.codefeedr.plugins.pypi.stages.PyPiReleasesStage
import org.codefeedr.plugins.pypi.protocol.Protocol.PyPiReleaseExt
import org.codefeedr.plugins.pypi.stages.{
PyPiReleaseExtStage,
PyPiReleasesStage
}
import org.codefeedr.stages.utilities.PrinterOutput

object Main {
def main(args: Array[String]): Unit = {
new PipelineBuilder()
.append(new PyPiReleasesStage)
.append(new PyPiReleaseExtStage)
.append(new PrinterOutput[PyPiReleaseExt]())
.build()
.startMock()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ object Protocol {
classifiers: List[String],
description: String,
description_content_type: String,
docs_url: String,
docs_url: Option[String],
download_url: String,
downloads: Downloads,
home_page: String,
Expand All @@ -38,10 +38,10 @@ object Protocol {
package_url: String,
platform: String,
project_url: String,
project_urls: ProjectUrl,
project_urls: Option[ProjectUrl],
release_url: String,
requires_dist: String,
requires_python: String,
requires_dist: List[String],
requires_python: Option[String],
summary: String,
version: String)

Expand All @@ -58,7 +58,7 @@ object Protocol {
md5_digest: String,
packagetype: String,
python_version: String,
requires_python: String,
requires_python: Option[String],
size: Double,
upload_time: Date,
url: String)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@ import java.util.concurrent.TimeUnit

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.streaming.api.scala.{AsyncDataStream, DataStream}
import org.codefeedr.plugins.pypi.protocol.Protocol.{PyPiProject, PyPiRelease}
import org.codefeedr.plugins.pypi.protocol.Protocol.{
PyPiProject,
PyPiRelease,
PyPiReleaseExt
}
import org.codefeedr.stages.TransformStage
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.async.{AsyncFunction, ResultFuture}
Expand All @@ -13,33 +17,44 @@ import org.codefeedr.plugins.pypi.util.PyPiService
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
class PyPiReleaseExtStage(stageId: String = "pypi_releases")
extends TransformStage[PyPiRelease, PyPiProject](Some(stageId)) {
extends TransformStage[PyPiRelease, PyPiReleaseExt](Some(stageId)) {
override def transform(
source: DataStream[PyPiRelease]): DataStream[PyPiProject] = {
source: DataStream[PyPiRelease]): DataStream[PyPiReleaseExt] = {

val async = AsyncDataStream.orderedWait(source,
new MapReleaseToProject,
5,
TimeUnit.SECONDS,
100)

AsyncDataStream.orderedWait(source,
new MapReleaseToProject,
5,
TimeUnit.SECONDS,
100)
//async.print()

async
}
}

/** Maps a [[PyPiRelease]] to a [[PyPiProject]]. */
class MapReleaseToProject extends AsyncFunction[PyPiRelease, PyPiProject] {
class MapReleaseToProject extends AsyncFunction[PyPiRelease, PyPiReleaseExt] {

implicit val executor: ExecutionContext = ExecutionContext.global
implicit lazy val executor: ExecutionContext = ExecutionContext.global

override def asyncInvoke(input: PyPiRelease,
resultFuture: ResultFuture[PyPiProject]): Unit = {
resultFuture: ResultFuture[PyPiReleaseExt]): Unit = {
val projectName = input.title.split(" ")(0)

val requestProject: Future[Option[PyPiProject]] = Future(
PyPiService.getProject(projectName))

requestProject.onComplete {
case Success(result: Option[PyPiRelease]) => {
if (result.isDefined) resultFuture.complete(Iterable(result.get))
case Success(result: Option[PyPiProject]) => {
if (result.isDefined)
resultFuture.complete(
Iterable(
PyPiReleaseExt(input.title,
input.link,
input.description,
input.pubDate,
result.get)))
}
case Failure(e) =>
resultFuture.complete(Iterable())
Expand All @@ -49,5 +64,7 @@ class MapReleaseToProject extends AsyncFunction[PyPiRelease, PyPiProject] {
}

override def timeout(input: PyPiRelease,
resultFuture: ResultFuture[PyPiProject]): Unit = {}
resultFuture: ResultFuture[PyPiReleaseExt]): Unit = {
println("HEREEE")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ class PyPiReleasesStage(stageId: String = "pypi_releases_min")
val str = context.env
.addSource(new PyPiReleasesSource())

str.print()

str
}
}
Original file line number Diff line number Diff line change
@@ -1,16 +1,23 @@
package org.codefeedr.plugins.pypi.util

import java.text.SimpleDateFormat

import org.codefeedr.plugins.pypi.protocol.Protocol.PyPiProject
import org.codefeedr.stages.utilities.HttpRequester
import org.json4s.{DefaultFormats, Formats}
import org.json4s.ext.JavaTimeSerializers
import scalaj.http.{Http, HttpRequest}
import org.json4s.jackson.JsonMethods.parse
import org.json4s.Extraction._
import org.json4s.JsonAST._

object PyPiService extends Serializable {

lazy implicit val formats: Formats = DefaultFormats ++ JavaTimeSerializers.all
lazy implicit val formats: Formats = new DefaultFormats {
override def dateFormatter: SimpleDateFormat =
new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss")

} ++ JavaTimeSerializers.all
private val url = "https://pypi.org/pypi/"

/** Retrieves a PyPi project.
Expand All @@ -26,9 +33,23 @@ object PyPiService extends Serializable {
if (rawProject.isEmpty) return None

val json = parse(rawProject.get)
extractOpt[PyPiProject](json)
// println(json)
//println(Some(extract[PyPiProject](transformProject(json))))
Some(extract[PyPiProject](transformProject(json)))
}

def transformProject(json: JValue): JValue =
json transformField {
case JField("releases", JObject(x)) => {
val newList = x.map { y =>
new JObject(
List(JField("version", JString(y._1)), JField("releases", y._2)))
}

JField("releases", JArray(newList))
}
}

/** Returns a project as a raw string.
*
* @param endpoint the end_point to do the request.
Expand Down

0 comments on commit f4772cd

Please sign in to comment.