# Chapter 3 Programming with RDDs

## Initializing the Spark context

In [25]:
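from pyspark import SparkConf, SparkContext
# "local" runs Spark in a single worker thread on this machine; "local[*]" would use all cores
conf = SparkConf().setAppName("app1").setMaster("local")
sc = SparkContext(conf=conf)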

## Calculating Pi

In [17]:
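import random
num_samples = 100000000  # 10^8 random points
def inside(p):
    # The element p is ignored; draw a random point in the unit square
    x, y = random.random(), random.random()
    # True if the point lands inside the quarter circle of radius 1
    return x * x + y * y < 1
# Count, in parallel, how many points land inside the quarter circle
count = sc.parallelize(range(num_samples)).filter(inside).count()
# The area ratio (quarter circle / unit square) is pi/4, so scale by 4
pi = 4 * count / num_samples
print(pi)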

3.14155684
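The estimate rests on an area argument. A point (x, y) drawn uniformly from the unit square lands inside the quarter circle with probability equal to that quarter circle's area, so the fraction of hits approximates π/4. The standard-error term is a back-of-the-envelope binomial estimate added for context, assuming the N samples are independent:

$$
\Pr\left[x^2 + y^2 < 1\right] = \frac{\pi}{4}
\quad\Longrightarrow\quad
\hat{\pi} = 4 \cdot \frac{\text{count}}{N},
\qquad
\operatorname{SE}(\hat{\pi}) \approx 4\sqrt{\frac{p(1-p)}{N}}, \quad p = \frac{\pi}{4}
$$

With N = 10^8, this gives SE ≈ 4 · sqrt(0.7854 · 0.2146 / 10^8) ≈ 1.6 × 10⁻⁴. The printed value 3.14155684 differs from π by about 3.6 × 10⁻⁵, comfortably within one standard error.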


## Playing with text data

In [26]:
lines = sc.textFile('/Users/pgupta10/ALL_DATA.csv')

In [27]:
lines.count()

716388

In [28]:
lines.first()

'order_id,external_row_id,cust_order_id,sku,quantity,adjusted_price_cents,customer_id,contact_id,create_date,pf_id,name,last_update_date'

## RDD Basics

An RDD in Spark is simply an immutable distributed collection of objects. Each RDD is split into multiple partitions, which may be computed on different nodes of the cluster. RDDs can contain any type of Python, Java, or Scala objects, including user-defined classes.

In [32]:
# Keep only lines that mention "order_id", such as the CSV header
headerLines = lines.filter(lambda line: "order_id" in line)
headerLines.first()

'order_id,external_row_id,cust_order_id,sku,quantity,adjusted_price_cents,customer_id,contact_id,create_date,pf_id,name,last_update_date'

Once created, RDDs offer two types of operations: transformations and actions. Transformations construct a new RDD from a previous one. For example, one common transformation is filtering data that matches a predicate.

Actions, on the other hand, compute a result based on an RDD. One example of an action we called earlier is first(), which returns the first element of an RDD.

Although you can define new RDDs at any time, Spark computes them only lazily, that is, the first time they are used in an action. For example, for the first() action above, Spark scans the file only until it finds the first matching line; it doesn’t read the whole file.

Finally, Spark’s RDDs are by default recomputed each time you run an action on them. If you would like to reuse an RDD in multiple actions, you can ask Spark to persist it using RDD.persist(). In practice, you will often use persist() to load a subset of your data into memory and query it repeatedly. 

To summarize, every Spark program and shell session will work as follows:
1. Create some input RDDs from external data.
2. Transform them to define new RDDs using transformations like filter().
3. Ask Spark to persist() any intermediate RDDs that will need to be reused.
4. Launch actions such as count() and first() to kick off a parallel computation, which is then optimized and executed by Spark.

# RDD Operations
As we’ve discussed, RDDs support two types of operations: transformations and actions. Transformations, such as map() and filter(), are operations on RDDs that return a new RDD. Actions, such as count() and first(), return a result to the driver program or write it to storage, and they kick off a computation.

## Transformations
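Transformations such as filter() do not modify their input RDD; because RDDs are immutable, they return a new RDD instead. Transformations can actually operate on any number of input RDDs: union(), for example, combines two RDDs into one.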

In [34]:
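inputRDD = sc.textFile('log.txt')
# Keep lines mentioning "error" and, separately, lines mentioning "warn"
errorRDD = inputRDD.filter(lambda line: "error" in line)
warnRDD = inputRDD.filter(lambda line: "warn" in line)
# union() combines the two filtered RDDs into one
errorAndWarnRDD = errorRDD.union(warnRDD)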

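A simpler way to get nearly the same result would be to filter inputRDD just once, looking for lines that contain either "error" or "warn". The two approaches differ only in duplicates: union() keeps them, so a line containing both words would appear twice in the union but only once in the single-filter result.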

Finally, as you derive new RDDs from each other using transformations, Spark keeps track of the set of dependencies between different RDDs, called the lineage graph. It uses this information to compute each RDD on demand and to recover lost data if part of a persisted RDD is lost. Figure 3-1 shows a lineage graph.

![Lineage Graph](img/lineageGraph.png)

## Actions
Actions are the second type of RDD operation. They are the operations that return a final value to the driver program or write data to an external storage system. Actions force the evaluation of the transformations required for the RDD they were called on, since they need to actually produce output.

In [22]:
sc.stop()