From ca928d8a3d5350807e47ede9f910593d668ff48b Mon Sep 17 00:00:00 2001
From: Gursimran Singh
Date: Tue, 17 Dec 2019 14:07:56 -0500
Subject: [PATCH] Add Serializable APIs for DataFrames (#389)

- Add keepValidPagesDF
- Add HTTP status code column to all()
- Add test for keepValidPagesDF
- Addresses #223
---
 .../scala/io/archivesunleashed/package.scala  | 33 ++++++++++-
 .../io/archivesunleashed/RecordDFTest.scala   | 56 +++++++++++++++++++
 2 files changed, 86 insertions(+), 3 deletions(-)
 create mode 100644 src/test/scala/io/archivesunleashed/RecordDFTest.scala

diff --git a/src/main/scala/io/archivesunleashed/package.scala b/src/main/scala/io/archivesunleashed/package.scala
index 11542e0e..69282724 100644
--- a/src/main/scala/io/archivesunleashed/package.scala
+++ b/src/main/scala/io/archivesunleashed/package.scala
@@ -31,10 +31,10 @@ import org.apache.hadoop.fs.{FileSystem, Path}
 import io.archivesunleashed.matchbox.ExtractDateRDD.DateComponent.DateComponent
 import java.net.URI
 import java.net.URL
-import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
 import org.apache.spark.sql.types.{BinaryType, IntegerType, StringType, StructField, StructType}
 import org.apache.hadoop.io.LongWritable
-import org.apache.spark.{SerializableWritable, SparkContext}
+import org.apache.spark.{RangePartitioner, SerializableWritable, SparkContext}
 import org.apache.spark.rdd.RDD
 import scala.reflect.ClassTag
 import scala.util.matching.Regex
@@ -83,6 +83,32 @@ package object archivesunleashed {
     }
   }
 
+  /**
+   * A wrapper class around DataFrame to allow DataFrames of ARC and WARC records to be queried via a fluent API.
+   *
+   * To load such a DataFrame, use [[RecordLoader]] and call .all() on it.
+   */
+  implicit class WARecordDF(df: DataFrame) extends java.io.Serializable {
+
+    def keepValidPagesDF(): DataFrame = {
+
+      val spark = df.sparkSession
+      // scalastyle:off
+      import spark.implicits._
+      // scalastyle:on
+
+      df.filter($"crawl_date".isNotNull)
+        .filter(!($"url".rlike(".*robots\\.txt$")) &&
+          ($"mime_type_web_server".rlike("text/html") ||
+            $"mime_type_web_server".rlike("application/xhtml\\+xml") ||
+            $"url".rlike("(?i).*htm$") ||
+            $"url".rlike("(?i).*html$")
+          )
+        )
+        .filter($"HttpStatus" === "200")
+    }
+  }
+
   /**
    * A Wrapper class around RDD to allow RDDs of type ARCRecord and WARCRecord to be queried via a fluent API.
    *
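
Note: keepValidPagesDF() above is meant to compose with the loader's fluent API.
A minimal usage sketch, assuming aut is on the classpath, an existing
SparkContext named sc, and an illustrative archive path (not part of this diff):

    import io.archivesunleashed._

    // Load an archive, turn it into a DataFrame via the all() method below,
    // then keep only valid pages: non-robots.txt URLs with an HTML MIME type
    // or an .htm/.html extension, served with HTTP status 200.
    val validPages = RecordLoader
      .loadArchives("/path/to/example.arc.gz", sc)
      .all()
      .keepValidPagesDF()

    validPages.select("crawl_date", "url").show(10, false)
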
@@ -94,7 +120,7 @@ package object archivesunleashed {
      Call KeepImages OR KeepValidPages on RDD depending upon the requirement before calling this method */
     def all(): DataFrame = {
       val records = rdd.map(r => Row(r.getCrawlDate, r.getUrl, r.getMimeType,
-        DetectMimeTypeTika(r.getBinaryBytes), r.getContentString, r.getBinaryBytes))
+        DetectMimeTypeTika(r.getBinaryBytes), r.getContentString, r.getBinaryBytes, r.getHttpStatus))
 
       val schema = new StructType()
         .add(StructField("crawl_date", StringType, true))
@@ -103,6 +129,7 @@
         .add(StructField("mime_type_tika", StringType, true))
         .add(StructField("content", StringType, true))
         .add(StructField("bytes", BinaryType, true))
+        .add(StructField("HttpStatus", StringType, true))
 
       val sqlContext = SparkSession.builder()
       sqlContext.getOrCreate().createDataFrame(records, schema)
diff --git a/src/test/scala/io/archivesunleashed/RecordDFTest.scala b/src/test/scala/io/archivesunleashed/RecordDFTest.scala
new file mode 100644
index 00000000..0232c538
--- /dev/null
+++ b/src/test/scala/io/archivesunleashed/RecordDFTest.scala
@@ -0,0 +1,56 @@
+/*
+ * Copyright © 2017 The Archives Unleashed Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.archivesunleashed
+
+import com.google.common.io.Resources
+import org.apache.spark.{SparkConf, SparkContext}
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfter, FunSuite}
+
+@RunWith(classOf[JUnitRunner])
+class RecordDFTest extends FunSuite with BeforeAndAfter {
+  private val arcPath = Resources.getResource("arc/example.arc.gz").getPath
+  private val badPath = Resources.getResource("arc/badexample.arc.gz").getPath
+  private val master = "local[4]"
+  private val appName = "example-spark"
+  private var sc: SparkContext = _
+  private val archive = "http://www.archive.org/"
+  private val sloan = "http://www.sloan.org"
+  private val regex = raw"Please visit our website at".r
+
+  before {
+    val conf = new SparkConf()
+      .setMaster(master)
+      .setAppName(appName)
+    conf.set("spark.driver.allowMultipleContexts", "true")
+    sc = new SparkContext(conf)
+  }
+
+  test("keep Valid Pages") {
+    val expected = "http://www.archive.org/"
+    val base = RecordLoader.loadArchives(arcPath, sc).all()
+      .keepValidPagesDF().take(1)(0)(1)
+    assert(base.toString == expected)
+  }
+
+  after {
+    if (sc != null) {
+      sc.stop()
+    }
+  }
+}
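
Note: all() now carries the HTTP status of each response in the HttpStatus
string column. A short sketch of inspecting that column directly, reusing the
arcPath fixture and SparkContext setup from the test above (output depends on
the archive, so nothing here is asserted):

    import io.archivesunleashed._
    import org.apache.spark.sql.functions.col

    val df = RecordLoader.loadArchives(arcPath, sc).all()

    // Distribution of status codes across the archive.
    df.groupBy("HttpStatus").count().show()

    // The same 200-only filter that keepValidPagesDF() applies internally.
    val ok = df.filter(col("HttpStatus") === "200")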