# Introduction to Spark
Using Spark, we are going to read in this data and calculate the average age. First, we need to initialize a `SparkSession`:

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Spark Example") \
    .getOrCreate()

Let’s create a Spark Dataset from our Lord of the Rings age data. The Spark directory for this chapter includes a file called `ages.json`, which holds the age data in JSON Lines format (one JSON object per line). It looks like:

```
{"Name": "Bilbo", "Age": 28}
{"Name": "Frodo", "Age": 26}
{"Name": "Gandalf", "Age": 62}
{"Name": "Samwise", "Age": 30}
{"Name": "Sauron", "Age": 72}
{"Name": "Aragorn", "Age": 31}
```
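To make the JSON Lines layout concrete, here is a small plain-Python sketch (not Spark) that parses the same six records one line at a time. The variable names `raw` and `records` are illustrative and not part of the chapter's files.

```python
import json

# The same six records shown above, held in a string for illustration.
# JSON Lines means one complete JSON object per line, with no enclosing array.
raw = """\
{"Name": "Bilbo", "Age": 28}
{"Name": "Frodo", "Age": 26}
{"Name": "Gandalf", "Age": 62}
{"Name": "Samwise", "Age": 30}
{"Name": "Sauron", "Age": 72}
{"Name": "Aragorn", "Age": 31}
"""

# Every non-empty line can be parsed on its own, independently of the others.
records = [json.loads(line) for line in raw.splitlines() if line.strip()]

print(len(records))   # 6
print(records[0])     # {'Name': 'Bilbo', 'Age': 28}
```

Because each line is a self-contained record, a file in this format can be split at line boundaries and the pieces parsed separately, which suits a distributed reader.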

Now, we can read in `ages.json` as a Spark Dataset:

In [None]:
df = spark.read.json('ages.json').repartition(10).cache()

Now we have a Dataset representing our data (in PySpark the object is a `DataFrame`, a name that echoes pandas). We can leverage the Spark SQL API to calculate an aggregation over the dataset, which in our case is an average:

In [None]:
df.agg({"Age": "avg"}).collect()
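As a sanity check on the aggregation, the same average can be worked out by hand in plain Python. This is only a cross-check under the assumption that `ages.json` contains exactly the six records listed earlier; it does not use Spark.

```python
# Ages copied from the six records in ages.json shown earlier.
ages = {
    "Bilbo": 28,
    "Frodo": 26,
    "Gandalf": 62,
    "Samwise": 30,
    "Sauron": 72,
    "Aragorn": 31,
}

# Average = sum of ages / number of characters = 249 / 6.
expected_avg = sum(ages.values()) / len(ages)
print(expected_avg)  # 41.5
```

If the Spark aggregation reports a different value, the input file probably differs from the listing above.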

We can also execute calculations at the row level. For example, let’s calculate each character’s age in dog years (age times 7):

In [None]:
df.withColumn('dog_years', df.Age*7).collect()

Best of all, this calculation would have scaled automatically across our computing cluster if we had more than one node. Notice something at the end of each of the commands above? If you are wondering what `.collect()` does, you’re onto something.

Spark executes code lazily. This means that *transformations*, such as calculating the characters’ ages in dog years, are only executed once an *action* is called. The `.withColumn()` command is a *transformation*, while `.collect()` is the *action* that causes the *transformation* to be executed. Often, the *action* that triggers execution of our *transformations* is writing the job’s output to disk, HDFS, or S3.
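The idea of deferring work until a result is requested can be illustrated with a plain-Python generator. This is only an analogy and not how Spark is implemented; the `dog_years` helper and the `calls` list are hypothetical names used to observe when the work actually happens.

```python
calls = []  # records every time dog_years actually runs

def dog_years(age):
    calls.append(age)
    return age * 7

ages = [28, 26, 62, 30, 72, 31]

# Analogous to a transformation: building the generator describes the work
# but does not run dog_years yet.
lazy = (dog_years(a) for a in ages)
calls_before = len(calls)

# Analogous to an action: asking for the results forces the computation.
result = list(lazy)
calls_after = len(calls)

print(calls_before, calls_after)  # 0 6
print(result)                     # [196, 182, 434, 210, 504, 217]
```

Nothing is computed when `lazy` is defined; all six calls happen only when `list(lazy)` asks for the values, much as `.collect()` triggers the pending transformations.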

Let’s try to create a new Dataset which includes the characters’ ages in dog years, then let’s write this out to disk:

In [None]:
df_new = df.withColumn('dog_years', df.Age*7)

Now we have a new Dataset called `df_new`. Note that nothing has been calculated yet; Spark has only recorded the transformation we want. The transformation will be executed when we call an action on `df_new`, such as `.collect()`, or when we write the output to disk.

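We can write `df_new` to disk with the following. Note that Spark creates a directory named `dog_years.json` containing one or more part files rather than a single file, and that `mode('append')` adds to any output already in that directory: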

In [None]:
df_new.write.mode('append').json("dog_years.json")

We can even execute a filter, for example to keep only Bilbo’s row:

In [None]:
filtered = df.filter("Name = 'Bilbo'")

In [None]:
filtered.collect()

## Deprecated examples

If we want to go back to RDD-based examples, we can use this code:

In [None]:
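import pyspark
# Reuse the SparkContext already started by the SparkSession above;
# creating a second context in the same process raises an error.
sc = pyspark.SparkContext.getOrCreate()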

In [None]:
objects = []
for i in range(1000):
    msg = {'id': i, 'payload': 'Here is an example payload for id {}'.format(i)}
    objects.append(msg)

Now, let's parallelize the list of messages and convert it into an RDD:

In [None]:
rdd = sc.parallelize(objects)

Now we can take a random sample of the `rdd` we have just created.

In [None]:
# we can take a sample of 5 messages without replacement
# this just means that once selected, a message is not available to be selected again
rdd.takeSample(False, 5)
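The difference between sampling with and without replacement can be seen with Python's standard `random` module. This is a local sketch, not Spark; the seed value and variable names are arbitrary choices for illustration.

```python
import random

population = list(range(1000))   # stands in for the 1000 message ids
rng = random.Random(42)          # seeded only so the sketch is repeatable

# Without replacement: each element can be picked at most once,
# so the five values are always distinct.
without_replacement = rng.sample(population, 5)

# With replacement: each draw is independent, so repeats are possible.
with_replacement = rng.choices(population, k=5)

print(len(set(without_replacement)))  # 5
```

`takeSample` also accepts an optional third `seed` argument, which serves the same purpose as the seeded generator here.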

When we operate on an RDD, we use `map`, which takes a function and applies it to every element of the `rdd`. For example, we can map a function which grabs the message `id`. This new RDD, `rdd_ids`, will only contain the `id` values.

In [None]:
rdd_ids = rdd.map(lambda x: x['id'])

We can take the first 5 items in `rdd_ids` to see what just happened.

In [None]:
rdd_ids.take(5)

Now we can apply a `reduce`-style function, such as a sum, to `rdd_ids`.

In [None]:
rdd_ids.sum()
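The same map-then-reduce pattern can be reproduced locally with Python's built-in `map` and `functools.reduce`, which gives a value to compare the Spark result against. This sketch runs on a single machine and does not use Spark.

```python
from functools import reduce
from operator import add

# Rebuild the same 1000 messages used above.
objects = [
    {'id': i, 'payload': 'Here is an example payload for id {}'.format(i)}
    for i in range(1000)
]

# map: extract the id from each message.
ids = map(lambda x: x['id'], objects)

# reduce: fold the ids together with addition.
total = reduce(add, ids)

print(total)  # 499500
```

The ids run from 0 to 999, so the sum is 999 * 1000 / 2 = 499500, which is what `rdd_ids.sum()` should also return.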

We can even define more complicated functions which insert or manipulate data and `map` them to the `rdd`.

In [None]:
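import random

def randomize_id(msg):
    # Multiply the id by a random integer from 1 to 100 (inclusive)
    # and store the result in a new field
    msg['rand_id'] = msg['id'] * random.randint(1, 100)
    return msg

# The function can be passed directly; no lambda wrapper is needed
rdd_rand_ids = rdd.map(randomize_id)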

Just to check what happened, let's take a look at the first 5 items in `rdd_rand_ids`. Notice that a new field, called `rand_id`, has been added to each object. It holds the original `id` multiplied by a random integer between 1 and 100.

In [None]:
rdd_rand_ids.take(5)
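The `randomize_id` function modifies the dictionary it receives. One possible variation, which is not part of the original notebook, is to return a new dictionary instead so that the function has no side effects and is easy to test outside Spark. The name `with_rand_id` and its `rng` parameter are hypothetical.

```python
import random

def with_rand_id(msg, rng=random):
    """Return a copy of msg with a rand_id field added; msg itself is unchanged."""
    return {**msg, 'rand_id': msg['id'] * rng.randint(1, 100)}

original = {'id': 7, 'payload': 'Here is an example payload for id 7'}

# A seeded generator is passed in only to make this local check repeatable.
updated = with_rand_id(original, random.Random(0))

print('rand_id' in original)  # False
print('rand_id' in updated)   # True
```

Under this assumption, the function could be used in the same way as before, for example `rdd.map(with_rand_id)`.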