In [32]:
import findspark
import pyspark
findspark.init()
sc = pyspark.SparkContext.getOrCreate()

In [33]:
logFile = sc.textFile("notebook.log")

In [39]:
#filter out the lines that contains INFO
linesWithINFO = logFile.filter(lambda line: "INFO" in line)

In [37]:
#Count the lines with INFO
logFile.filter(lambda line: "INFO" in line).count()

13438

In [40]:
#Count the lines with INFO
linesWithINFO.count()

13438

In [41]:
# Count no of lines with spark within the lines of INFO
linesWithINFO.filter(lambda line: "spark" in line).count()

156

In [42]:
# Count no of lines with spark in logFile
logFile.filter(lambda line: "spark" in line).count()

2238

In [43]:
#Fetch the lines(INFO + spark) as an array of Strings
linesWithINFO.filter(lambda line: "spark" in line).collect()

['15/10/14 14:29:23 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@172.17.0.22:53333]',
 "15/10/14 14:29:23 INFO Utils: Successfully started service 'sparkDriver' on port 53333.",
 '15/10/14 14:29:23 INFO DiskBlockManager: Created local directory at /tmp/spark-fe150378-7bad-42b6-876b-d14e2c193eb6/blockmgr-c142f2f1-ebb6-4612-945b-0a67d156230a',
 '15/10/14 14:29:23 INFO HttpFileServer: HTTP File server directory is /tmp/spark-fe150378-7bad-42b6-876b-d14e2c193eb6/httpd-ed3f4ab0-7218-48bc-9d8a-3981b1cfe574',
 "15/10/14 14:29:24 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 35726.",
 '15/10/15 15:33:42 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@172.17.0.22:47412]',
 "15/10/15 15:33:42 INFO Utils: Successfully started service 'sparkDriver' on port 47412.",
 '15/10/15 15:33:42 INFO DiskBlockManager: Created local directory at /tmp/spark-fc035223-3b43-43d1-8d7d-

In [44]:
#View the graph of an RDD
print(linesWithINFO.toDebugString())

b'(2) PythonRDD[127] at RDD at PythonRDD.scala:53 []\n |  notebook.log MapPartitionsRDD[120] at textFile at NativeMethodAccessorImpl.java:0 []\n |  notebook.log HadoopRDD[119] at textFile at NativeMethodAccessorImpl.java:0 []'


#### Joining RDDs

In [45]:
#create RDDs for the same README and the POM files
readmeFile = sc.textFile("README.md")
pomFile = sc.textFile("pom.xml")

In [49]:
#Count Spark keywords are in each file
print(readmeFile.filter(lambda line: "Spark" in line).count())
print(pomFile.filter(lambda line: "Spark" in line).count())

18
2


In [50]:
#WordCount on each RDD so that the results are (K,V) pairs of (word,count)
readmeCount = readmeFile.                    \
    flatMap(lambda line: line.split("   ")).   \
    map(lambda word: (word, 1)).             \
    reduceByKey(lambda a, b: a + b)
    
pomCount = pomFile.                          \
    flatMap(lambda line: line.split("   ")).   \
    map(lambda word: (word, 1)).            \
    reduceByKey(lambda a, b: a + b)

In [51]:
print("Readme Count\n")
print(readmeCount.collect())

Readme Count

[('', 43), ('Spark is a fast and general cluster computing system for Big Data. It provides', 1), ('and Spark Streaming for stream processing.', 1), ('guide, on the [project web page](http://spark.apache.org/documentation.html)', 1), ('## Building Spark', 1), ('Spark is built using [Apache Maven](http://maven.apache.org/).', 1), (' build/mvn -DskipTests clean package', 1), ('Try the following command, which should return 1000:', 1), (' scala> sc.parallelize(1 to 1000).count()', 1), ('## Interactive Python Shell', 1), (' ./bin/pyspark', 1), ('And run the following command, which should also return 1000:', 1), ('Spark also comes with several sample programs in the `examples` directory.', 1), ('To run one of them, use `./bin/run-example <class> [params]`. For example:', 1), (' ./bin/run-example SparkPi', 1), ('will run the Pi example locally.', 1), ('You can set the MASTER environment variable when running examples to submit', 1), ('examples to a cluster. This can be a mesos

In [52]:
print("Pom Count\n")
print(pomCount.collect())

Pom Count

[('<?xml version="1.0" encoding="UTF-8"?>', 1), ('  ~ Licensed to the Apache Software Foundation (ASF) under one or more', 1), ('  ~ contributor license agreements.  See the NOTICE file distributed with', 1), ('  ~ The ASF licenses this file to You under the Apache License, Version 2.0', 1), (' http://www.apache.org/licenses/LICENSE-2.0', 1), ('  ~ distributed under the License is distributed on an "AS IS" BASIS,', 1), ('  ~ limitations under the License.', 1), ('  -->', 1), ('', 841), ('<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">', 1), ('  <modelVersion>4.0.0</modelVersion>', 1), ('  <parent>', 1), (' <groupId>org.apache.spark</groupId>', 2), (' <artifactId>spark-parent_2.10</artifactId>', 1), (' <version>1.6.0-SNAPSHOT</version>', 1), ('  <properties>', 1), (' <sbt.project.name>examples</sbt.project.name>', 1), (' 

In [53]:
#join function combines the two datasets (K,V) and (K,W) together and get (K, (V,W))
joined = readmeCount.join(pomCount)

In [54]:
#Print the value to the console
joined.collect()

[('', (43, 841))]

In [55]:
#combine the values together to get the total count
joinedSum = joined.map(lambda k: (k[0], (k[1][0]+k[1][1])))

In [57]:
joinedSum.collect()

[('', 884)]

In [60]:
#print the first five elements from the joined and the joinedSum RDD
print("Joined Individial\n")
print(joined.take(5))

print("\n\nJoined Sum\n")
print(joinedSum.take(5))

Joined Individial

[('', (43, 841))]


Joined Sum

[('', 884)]


#### Broadcast variables

In [61]:
broadcastVar = sc.broadcast([1,2,3])

In [62]:
broadcastVar.value

[1, 2, 3]

In [63]:
accum = sc.accumulator(0)

In [64]:
accum.value

0

In [65]:
#parallelize an array of four integers and run it through a loop to add each integer value to the accumulator variable
rdd = sc.parallelize([1,2,3,4])
def f(x):
    global accum
    accum += x

In [66]:
#iterate through each element of the rdd and apply the function f
rdd.foreach(f)

In [67]:
accum.value

10

#### Key-value pairs

In [68]:
pair = ('a', 'b')

In [69]:
print(pair[0])
print(pair[1])

a
b
