# Operations on RDDs

In [2]:
/** Shared helpers that later cells can reuse. */
object Utils {
  /**
   * Regular expression for splitting a CSV line: it matches a comma only when
   * that comma is not enclosed in a pair of double quotes.
   */
  val COMMA_DELIMITER: String = """,(?=([^"]*"[^"]*")*[^"]*$)"""
}

defined object Utils


In [6]:
// Data inputs

In [28]:
val path="scala-spark-tutorial/in"

path: String = scala-spark-tutorial/in


In [9]:
val inputWords = List("spark", "hadoop", "spark", "hive", "pig", "cassandra", "hadoop")
val wordRdd = sc.parallelize(inputWords)

inputWords: List[String] = List(spark, hadoop, spark, hive, pig, cassandra, hadoop)
wordRdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:26


In [10]:
wordRdd.collect()

res3: Array[String] = Array(spark, hadoop, spark, hive, pig, cassandra, hadoop)


In [11]:
// Print every word of the RDD.
// The printed order can differ from the order of the input list.
wordRdd.foreach(println)

hadoop
spark
hive
spark
pig
cassandra
hadoop


In [12]:
// A for/yield over an RDD is just a map, so the result is again an RDD
// that can be kept in a value.
val ward = wordRdd.map(w => w)

ward: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at map at <console>:27


In [19]:
ward.collect()

res9: Array[String] = Array(spark, hadoop, spark, hive, pig, cassandra, hadoop)


In [20]:
// we can count rows
wordRdd.count()

res10: Long = 7


In [23]:
// now countByValue: count how many times each distinct word occurs
val wordRddCoutValue= wordRdd.countByValue()

wordRddCoutValue: scala.collection.Map[String,Long] = Map(cassandra -> 1, hadoop -> 2, spark -> 2, hive -> 1, pig -> 1)


In [22]:
// countByValue returns a local Map collection (not an RDD),
// so it should only be used when the number of distinct values is small.
// Now print the entries of the resulting Map

In [24]:
// Print each (word, count) entry of the local Map.
// The tuple is built explicitly: println(k, v) only compiles through
// argument adaptation (auto-tupling), which is deprecated.
wordRddCoutValue.foreach { case (k, v) => println((k, v)) }

(cassandra,1)
(hadoop,2)
(spark,2)
(hive,1)
(pig,1)


## Intersection of two RDDs

In [63]:
path

res41: String = scala-spark-tutorial/in


In [38]:
// Read the two NASA access-log TSV files, one RDD element per line.
// Both reads go through the same SparkContext handle (`sc`) and build the
// file location from `path`, so the input directory is defined in one place.
val julyFirstLogs = sc.textFile(s"$path/nasa_19950701.tsv")

val augustFirstLogs = sc.textFile(s"$path/nasa_19950801.tsv")

julyFirstLogs: org.apache.spark.rdd.RDD[String] = scala-spark-tutorial/in/nasa_19950701.tsv MapPartitionsRDD[16] at textFile at <console>:26
augustFirstLogs: org.apache.spark.rdd.RDD[String] = scala-spark-tutorial/in/nasa_19950801.tsv MapPartitionsRDD[18] at textFile at <console>:28


In [37]:
julyFirstLogs.take(2)

res20: Array[String] = Array(host	logname	time	method	url	response	bytes, "199.72.81.55	-	804571201	GET	/history/apollo/	200	6245		")


In [49]:
augustFirstLogs.take(2)
julyFirstLogs.map(line => line.split("\t")(0)).take(2)

res30: Array[String] = Array(host, 199.72.81.55)


In [50]:

// Keep only the first tab-separated column (the host) of every log line.
// The header line is still present at this point, so "host" is one of the values.
val julyFirstHosts = julyFirstLogs.map(_.split("\t")(0))
val augustFirstHosts = augustFirstLogs.map(_.split("\t")(0))

julyFirstHosts: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[25] at map at <console>:27
augustFirstHosts: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[26] at map at <console>:28


In [51]:
val intersection = julyFirstHosts.intersection(augustFirstHosts)

intersection: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[32] at intersection at <console>:27


In [54]:
julyFirstHosts.collect()

res33: Array[String] = Array(host, 199.72.81.55, unicomp6.unicomp.net, 199.120.110.21, burger.letters.com, 199.120.110.21, burger.letters.com, burger.letters.com, 205.212.115.106, d104.aa.net, 129.94.144.152, unicomp6.unicomp.net, unicomp6.unicomp.net, unicomp6.unicomp.net, d104.aa.net, d104.aa.net, d104.aa.net, 129.94.144.152, 199.120.110.21, ppptky391.asahi-net.or.jp, net-1-141.eden.com, ppptky391.asahi-net.or.jp, 205.189.154.54, waters-gw.starway.net.au, ppp-mia-30.shadow.net, 205.189.154.54, alyssa.prodigy.com, ppp-mia-30.shadow.net, dial22.lloyd.com, smyth-pc.moorecap.com, 205.189.154.54, ix-orl2-01.ix.netcom.com, ppp-mia-30.shadow.net, ppp-mia-30.shadow.net, 205.189.154.54, ppp-mia-30.shadow.net, ppp-mia-30.shadow.net, ix-orl2-01.ix.netcom.com, gayle-gaston.tenet.edu, piweba3y.pro...


In [53]:
intersection.collect()

res32: Array[String] = Array(alyssa.prodigy.com, www-d1.proxy.aol.com, piweba4y.prodigy.com, piweba2y.prodigy.com, www-b3.proxy.aol.com, columbia.acc.brad.ac.uk, host, spectrum.xerox.com, beglinger.dial-up.bdt.com, www-d3.proxy.aol.com, freenet.edmonton.ab.ca, dd08-021.compuserve.com, netcom3.netcom.com, www-b5.proxy.aol.com, disarray.demon.co.uk, ottgate2.bnr.ca, www-a2.proxy.aol.com, pm206-52.smartlink.net, vagrant.vf.mmc.com, www-a1.proxy.aol.com, alpha2.csd.uwm.edu, piweba1y.prodigy.com, srv1.freenet.calgary.ab.ca, hitiij.hitachi.co.jp, ccn.cs.dal.ca, wwwproxy.info.au, www-d2.proxy.aol.com, server.elysian.net, hella.stm.it, piweba3y.prodigy.com, ntigate.nt.com, www-b2.proxy.aol.com, palona1.cns.hp.com, www-d4.proxy.aol.com, bettong.client.uq.oz.au, koala.melbpc.org.au, magicall.daco...


In [60]:
//intersection.filter(host => host !="host").collect()

In [61]:
val cleanedHostIntersection = intersection.filter(host => host != "host")

cleanedHostIntersection: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[38] at filter at <console>:26


In [62]:
// Now We can save it !!!
//cleanedHostIntersection.saveAsTextFile("out/nasa_logs_same_hosts.csv")

## Union RDDs

In [64]:
// working on the same RDDs
val aggregatedLogLines = julyFirstLogs.union(augustFirstLogs)

aggregatedLogLines: org.apache.spark.rdd.RDD[String] = UnionRDD[39] at union at <console>:28


In [65]:
aggregatedLogLines.count()

res42: Long = 20000


In [66]:
julyFirstLogs.count()

res43: Long = 10000


In [67]:
augustFirstLogs.count()

res44: Long = 10000


In [68]:
// union behaves like an append: the result has 10000 + 10000 = 20000 lines, nothing is de-duplicated

In [69]:
aggregatedLogLines.collect()

res46: Array[String] = Array(host	logname	time	method	url	response	bytes, "199.72.81.55	-	804571201	GET	/history/apollo/	200	6245		", "unicomp6.unicomp.net	-	804571206	GET	/shuttle/countdown/	200	3985		", "199.120.110.21	-	804571209	GET	/shuttle/missions/sts-73/mission-sts-73.html	200	4085		", "burger.letters.com	-	804571211	GET	/shuttle/countdown/liftoff.html	304	0		", "199.120.110.21	-	804571211	GET	/shuttle/missions/sts-73/sts-73-patch-small.gif	200	4179		", "burger.letters.com	-	804571212	GET	/images/NASA-logosmall.gif	304	0		", "burger.letters.com	-	804571212	GET	/shuttle/countdown/video/livevideo.gif	200	0		", "205.212.115.106	-	804571212	GET	/shuttle/countdown/countdown.html	200	3985		", "d104.aa.net	-	804571213	GET	/shuttle/countdown/	200	3985		", "129.94.144.152	-	804571213	GET...


In [71]:
/**
 * Tells whether a log line is a data row rather than the TSV header.
 *
 * A line counts as the header when its first tab-separated column is exactly
 * "host" and one of its columns is exactly "bytes". Comparing whole columns
 * (instead of using startsWith/contains on the raw line) keeps data rows such
 * as a host named "hostmaster.example.org" requesting "/docs/bytes.html" from
 * being mistaken for the header.
 *
 * @param line one raw line of the log file
 * @return false for the header line, true for every other line
 */
def isNotHeader(line: String): Boolean = {
  val columns = line.split("\t").map(_.trim)
  !(columns.headOption.contains("host") && columns.contains("bytes"))
}

isNotHeader: (line: String)Boolean


In [72]:
// Drop the header lines (one per input file) from the combined logs.
val cleanLogLines = aggregatedLogLines.filter(isNotHeader _)

cleanLogLines: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[40] at filter at <console>:28


In [74]:
cleanLogLines.count()

res48: Long = 19998


In [75]:
cleanLogLines.first()

res49: String = "199.72.81.55	-	804571201	GET	/history/apollo/	200	6245		"


In [76]:
val sample = cleanLogLines.sample(withReplacement = true, fraction = 0.1)

sample: org.apache.spark.rdd.RDD[String] = PartitionwiseSampledRDD[41] at sample at <console>:26


In [77]:
sample.count()

res50: Long = 1976


In [86]:
// Share of the cleaned log lines that ended up in the sample;
// it should be close to the requested fraction of 0.1.
sample.count().toDouble / cleanLogLines.count()

res59: Double = 0.09880988098809881


In [87]:
// Now I can save it
// sample.saveAsTextFile("out/sample_nasa_logs.csv")

# PERSIST RDD

In [89]:
val inputIntegers = List(1, 2, 3, 4, 5)
val integerRdd = sc.parallelize(inputIntegers)

integerRdd.persist()
// persist() without arguments uses the default storage level (StorageLevel.MEMORY_ONLY)


inputIntegers: List[Int] = List(1, 2, 3, 4, 5)
integerRdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[42] at parallelize at <console>:26
res62: integerRdd.type = ParallelCollectionRDD[42] at parallelize at <console>:26


In [90]:
integerRdd.collect()

res63: Array[Int] = Array(1, 2, 3, 4, 5)


In [93]:
// Reduce: multiply all elements together. Despite its name, the result is a plain Int, not an RDD.
val reduceRdd = integerRdd.reduce((x, y) => x * y)
integerRdd.count()

reduceRdd: Int = 120
res66: Long = 5


In [95]:
reduceRdd

res68: Int = 120


# Sum of Numbers

In [96]:
val lines = sc.textFile("scala-spark-tutorial/in/prime_nums.text")

lines: org.apache.spark.rdd.RDD[String] = scala-spark-tutorial/in/prime_nums.text MapPartitionsRDD[44] at textFile at <console>:25


In [98]:
lines.collect()

res70: Array[String] = Array("  2	  3	  5	  7	 11	 13	 17	 19	 23	 29", " 31	 37	 41	 43	 47	 53	 59	 61	 67	 71", " 73	 79	 83	 89	 97	101	103	107	109	113", 127	131	137	139	149	151	157	163	167	173, 179	181	191	193	197	199	211	223	227	229, 233	239	241	251	257	263	269	271	277	281, 283	293	307	311	313	317	331	337	347	349, 353	359	367	373	379	383	389	397	401	409, 419	421	431	433	439	443	449	457	461	463, 467	479	487	491	499	503	509	521	523	541)


In [99]:
lines.count()

res71: Long = 10


In [100]:
// Split every line on runs of whitespace and flatten the pieces into one RDD.
// Lines that start with whitespace contribute a leading empty string.
val numbers = lines.flatMap(_.split("\\s+"))

numbers: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[45] at flatMap at <console>:26


In [101]:
numbers.collect()

res72: Array[String] = Array("", 2, 3, 5, 7, 11, 13, 17, 19, 23, 29, "", 31, 37, 41, 43, 47, 53, 59, 61, 67, 71, "", 73, 79, 83, 89, 97, 101, 103, 107, 109, 113, 127, 131, 137, 139, 149, 151, 157, 163, 167, 173, 179, 181, 191, 193, 197, 199, 211, 223, 227, 229, 233, 239, 241, 251, 257, 263, 269, 271, 277, 281, 283, 293, 307, 311, 313, 317, 331, 337, 347, 349, 353, 359, 367, 373, 379, 383, 389, 397, 401, 409, 419, 421, 431, 433, 439, 443, 449, 457, 461, 463, 467, 479, 487, 491, 499, 503, 509, 521, 523, 541)


In [102]:
// Discard the empty strings left over from splitting.
val validNumbers = numbers.filter(_.nonEmpty)

validNumbers: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[46] at filter at <console>:26


In [103]:
validNumbers.collect()

res73: Array[String] = Array(2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67, 71, 73, 79, 83, 89, 97, 101, 103, 107, 109, 113, 127, 131, 137, 139, 149, 151, 157, 163, 167, 173, 179, 181, 191, 193, 197, 199, 211, 223, 227, 229, 233, 239, 241, 251, 257, 263, 269, 271, 277, 281, 283, 293, 307, 311, 313, 317, 331, 337, 347, 349, 353, 359, 367, 373, 379, 383, 389, 397, 401, 409, 419, 421, 431, 433, 439, 443, 449, 457, 461, 463, 467, 479, 487, 491, 499, 503, 509, 521, 523, 541)


In [104]:
// Parse each remaining token as an Int (toInt throws on a non-numeric token).
val intNumbers = validNumbers.map(_.toInt)

intNumbers: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[47] at map at <console>:26


In [105]:
intNumbers.collect()

res74: Array[Int] = Array(2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67, 71, 73, 79, 83, 89, 97, 101, 103, 107, 109, 113, 127, 131, 137, 139, 149, 151, 157, 163, 167, 173, 179, 181, 191, 193, 197, 199, 211, 223, 227, 229, 233, 239, 241, 251, 257, 263, 269, 271, 277, 281, 283, 293, 307, 311, 313, 317, 331, 337, 347, 349, 353, 359, 367, 373, 379, 383, 389, 397, 401, 409, 419, 421, 431, 433, 439, 443, 449, 457, 461, 463, 467, 479, 487, 491, 499, 503, 509, 521, 523, 541)


In [106]:
// Add up all numbers and print the total.
// fold starts from the zero element 0, so an empty RDD yields 0,
// whereas reduce would throw on an empty collection.
println(s"Sum is: ${intNumbers.fold(0)(_ + _)}")

Sum is: 24133


In [108]:
intNumbers.count()

res77: Long = 100


# Word Count

In [109]:
// Load the article and break every line into tokens separated by a single space.
// Blank lines and repeated spaces produce empty-string tokens, which are kept here.
val lines = sc.textFile("scala-spark-tutorial/in/word_count.text")
val words = lines.flatMap(_.split(" "))

lines: org.apache.spark.rdd.RDD[String] = scala-spark-tutorial/in/word_count.text MapPartitionsRDD[49] at textFile at <console>:27
words: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[50] at flatMap at <console>:28


In [111]:
lines.collect()

res79: Array[String] = Array(The history of New York begins around 10,000 BC, when the first Native Americans arrived. By 1100 AD, New York's main native cultures, the Iroquoian and Algonquian, had developed. European discovery of New York was led by the French in 1524 and the first land claim came in 1609 by the Dutch. As part of New Netherland, the colony was important in the fur trade and eventually became an agricultural resource thanks to the patroon system. In 1626 the Dutch bought the island of Manhattan from Native Americans.[1] In 1664, England renamed the colony New York, after the Duke of York (later James II & VII.) New York City gained prominence in the 18th century as a major trading port in the Thirteen Colonies., "", New York played a pivotal role during the American Rev...


In [110]:
words.collect()

res78: Array[String] = Array(The, history, of, New, York, begins, around, 10,000, BC,, when, the, first, Native, Americans, arrived., By, 1100, AD,, New, York's, main, native, cultures,, the, Iroquoian, and, Algonquian,, had, developed., European, discovery, of, New, York, was, led, by, the, French, in, 1524, and, the, first, land, claim, came, in, 1609, by, the, Dutch., As, part, of, New, Netherland,, the, colony, was, important, in, the, fur, trade, and, eventually, became, an, agricultural, resource, thanks, to, the, patroon, system., In, 1626, the, Dutch, bought, the, island, of, Manhattan, from, Native, Americans.[1], In, 1664,, England, renamed, the, colony, New, York,, after, the, Duke, of, York, (later, James, II, &, VII.), New, York, City, gained, prominence, in, the, 18th, cen...


In [115]:
val wordCounts = words.countByValue()

wordCounts: scala.collection.Map[String,Long] = Map(Twenties, -> 1, II -> 2, industries. -> 1, economy -> 1, "" -> 7, ties -> 2, buildings -> 1, for -> 3, eleventh -> 1, ultimately -> 1, support -> 1, channels -> 1, Thereafter, -> 1, subsequent -> 1, defense -> 1, series -> 1, proposed -> 1, any -> 1, 1790, -> 1, city -> 1, war. -> 2, southern -> 2, across -> 1, operations -> 1, 18th -> 1, challenge -> 1, in -> 21, Park -> 1, expressed -> 1, Civil -> 1, point -> 2, cultural -> 1, 1777, -> 1, claim -> 1, labor -> 1, British -> 3, influenced -> 1, War -> 2, representatives -> 1, patroon -> 1, system -> 1, Iroquoian -> 1, Battery -> 1, nationally -> 1, 1664, -> 1, history -> 1, killing -> 1, late -> 1, renewed -> 1, City's -> 1, shrank. -> 1, After -> 1, Wall -> 1, In -> 3, state -> 5, 11 ...


In [114]:
//for ((word, count) <- wordCounts) println(word + " : " + count)