Add Serializable APIs for DataFrames (#389)
- Add keepValidPagesDF
- Add HTTP status code column to all()
- Add test for keepValidPagesDF
- Addresses #223
Gursimran Singh authored and ruebot committed Dec 17, 2019
1 parent 9e32284 commit ca928d8
Showing 2 changed files with 86 additions and 3 deletions.
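For context, a minimal sketch of how the new fluent API reads once this commit is applied (the archive path is a placeholder and sc is an existing SparkContext):

import io.archivesunleashed._

// Load a web archive, materialize it with the new HttpStatus column,
// and keep only successfully fetched HTML pages.
val pages = RecordLoader.loadArchives("/path/to/example.arc.gz", sc)
  .all()
  .keepValidPagesDF()
pages.select("url", "mime_type_web_server", "HttpStatus").show(5)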
33 changes: 30 additions & 3 deletions src/main/scala/io/archivesunleashed/package.scala
@@ -31,10 +31,10 @@ import org.apache.hadoop.fs.{FileSystem, Path}
import io.archivesunleashed.matchbox.ExtractDateRDD.DateComponent.DateComponent
import java.net.URI
import java.net.URL
-import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.sql.types.{BinaryType, IntegerType, StringType, StructField, StructType}
import org.apache.hadoop.io.LongWritable
-import org.apache.spark.{SerializableWritable, SparkContext}
+import org.apache.spark.{RangePartitioner, SerializableWritable, SparkContext}
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag
import scala.util.matching.Regex
@@ -83,6 +83,32 @@ package object archivesunleashed {
}
}

/**
* A Wrapper class around DataFrame to allow DataFrames of type ARCRecord and WARCRecord to be queried via a fluent API.
*
* To load such a DataFrame, use [[RecordLoader]] and call .all() on it.
*/
implicit class WARecordDF(df: DataFrame) extends java.io.Serializable {

def keepValidPagesDF(): DataFrame = {

  // Reuse the active session; hardcoding a local master here would
  // override the caller's own Spark configuration.
  val spark = SparkSession.builder().getOrCreate()
  // scalastyle:off
  import spark.implicits._
  // scalastyle:on

  df.filter($"crawl_date".isNotNull)
    .filter(!($"url".rlike(".*robots\\.txt$")) &&
      ($"mime_type_web_server".rlike("text/html") ||
        $"mime_type_web_server".rlike("application/xhtml\\+xml") ||
        $"url".rlike("(?i).*htm$") ||
        $"url".rlike("(?i).*html$")))
    // HttpStatus is stored as a string column (see the schema in all()).
    .filter($"HttpStatus" === "200")
}
}
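// Illustration (hypothetical values): a row such as
//   Row("20191217", "http://example.com/index.html", "text/html",
//       "text/html", "<html>...</html>", bytes, "200")
// passes every filter above, while robots.txt fetches, non-HTML
// MIME types, and non-200 responses are dropped.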

/**
* A Wrapper class around RDD to allow RDDs of type ARCRecord and WARCRecord to be queried via a fluent API.
*
@@ -94,7 +120,7 @@ package object archivesunleashed {
Call keepImages or keepValidPages on the RDD, as required, before calling this method. */
def all(): DataFrame = {
val records = rdd.map(r => Row(r.getCrawlDate, r.getUrl, r.getMimeType,
-  DetectMimeTypeTika(r.getBinaryBytes), r.getContentString, r.getBinaryBytes))
+  DetectMimeTypeTika(r.getBinaryBytes), r.getContentString, r.getBinaryBytes, r.getHttpStatus))

val schema = new StructType()
.add(StructField("crawl_date", StringType, true))
@@ -103,6 +129,7 @@
.add(StructField("mime_type_tika", StringType, true))
.add(StructField("content", StringType, true))
.add(StructField("bytes", BinaryType, true))
.add(StructField("HttpStatus", StringType, true))

val sqlContext = SparkSession.builder()
sqlContext.getOrCreate().createDataFrame(records, schema)
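With the HttpStatus field appended, a DataFrame returned by all() should report a schema along these lines via printSchema() (the url and mime_type_web_server fields are elided from the hunk above but follow from the Row construction):

root
 |-- crawl_date: string (nullable = true)
 |-- url: string (nullable = true)
 |-- mime_type_web_server: string (nullable = true)
 |-- mime_type_tika: string (nullable = true)
 |-- content: string (nullable = true)
 |-- bytes: binary (nullable = true)
 |-- HttpStatus: string (nullable = true)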
56 changes: 56 additions & 0 deletions src/test/scala/io/archivesunleashed/RecordDFTest.scala
@@ -0,0 +1,56 @@
/*
* Copyright © 2017 The Archives Unleashed Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.archivesunleashed

import com.google.common.io.Resources
import org.apache.spark.{SparkConf, SparkContext}
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import org.scalatest.{BeforeAndAfter, FunSuite}

@RunWith(classOf[JUnitRunner])
class RecordDFTest extends FunSuite with BeforeAndAfter {
private val arcPath = Resources.getResource("arc/example.arc.gz").getPath
private val master = "local[4]"
private val appName = "example-spark"
private var sc: SparkContext = _
private val archive = "http://www.archive.org/"

before {
val conf = new SparkConf()
.setMaster(master)
.setAppName(appName)
conf.set("spark.driver.allowMultipleContexts", "true") // tests may create more than one context
sc = new SparkContext(conf)
}

test("keep Valid Pages") {
val expected = "http://www.archive.org/"
val base = RecordLoader.loadArchives(arcPath, sc).all()
.keepValidPagesDF().take(1)(0)(1)
assert (base.toString == expected)
}

after {
if (sc != null) {
sc.stop()
}
}
}
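Beyond this single-row check, the HttpStatus column added by this commit also makes quick status-code audits possible; a sketch using the same fixtures (not part of this commit):

// Tally records per HTTP status code via the column added to all().
val statusCounts = RecordLoader.loadArchives(arcPath, sc).all()
  .groupBy("HttpStatus").count()
statusCounts.show()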
