# First things first...

Is Spark set up in this notebook? And is it configured for awesome?

I recommend `--master local[*]`, which starts one worker thread for each logical core on your machine.

In [None]:
sc.master

In [None]:
# We can still use IPython
%ls data

## Create some RDDs the easy way

Spark was conceived as an extension to Map-Reduce, which initially targeted the Hadoop Distributed File System (HDFS). Spark is built around Resilient Distributed Datasets (RDDs), which can be cached in memory and can abstract over a large variety of storage back-ends.

Here, we see how we can work with small data (by some definition) locally without resorting to a cluster.

We can easily replace these RDDs with, e.g., an HDFS-backed datastore and cluster, and execute the identical logic. The plan is to introduce this next week.

*Note - you will need to copy these data files into the repo or adjust the paths*

In [None]:
permits = sc.textFile('data/Building_Permits.csv')
violations = sc.textFile('data/Building_Violations.csv')

Note that those calls returned far faster than it could possibly take to load the files. Spark is [lazy](http://en.wikipedia.org/wiki/Lazy_evaluation): nothing is actually read until an *action* such as `.take()` or `.count()` needs the data.
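As a rough analogy only (this is plain Python, not Spark), a generator pipeline behaves the same way: building it does no work, and work happens only when something pulls results out, much as `.take()` triggers a Spark job. The `load_line` function and its `calls` log are hypothetical, for illustration.

```python
calls = []

def load_line(line):
    # Record each call so we can see when work actually happens
    calls.append(line)
    return line.upper()

lines = ["a,b", "c,d", "e,f"]

# Analogous to sc.textFile(...).map(...): this only builds a recipe
pipeline = (load_line(l) for l in lines)
print(len(calls))  # 0: no element has been processed yet

# Analogous to .take(1): pulling one result runs just enough work to produce it
first = next(pipeline)
print(first, len(calls))  # A,B 1
```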

## Classic Spark

Don't worry about efficiency; that's what the other cores are for.

In [None]:
# Let's do functional programming the way Guido likes it

# var, = ... unpacks a sequence of length 1. It is similar to:
# assert len(...) == 1
# var = ...[0]
permit_header_str, = permits.take(1)

# Here, we pass a generator expression to enumerate within a dict comprehension
# This is very similar to how Spark works, and is *very* memory efficient
permit_header_locs = {name: loc for loc, name in
                      enumerate(val.strip() for val in permit_header_str.split(','))}
permit_header_locs

## Learning some real Spark

The core of Spark is functional programming over RDDs. Most of you have probably heard of map-reduce. `map` and `reduce` are actually two separate functional primitives, and Spark offers a small set of such primitives, each of which maps readily onto parallel computation over RDD "partitions."

In general, these functions take other functions as arguments. PySpark lets you pass ordinary Python functions, and any expression over one variable can be turned into a function with a `lambda` expression.
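A small plain-Python illustration of that equivalence: the named function and the inline `lambda` below define the same predicate, and Spark's `.filter()` and `.map()` accept either form. The header and data lines are made up for the example.

```python
# Hypothetical header and data lines, for illustration only
header = "ID,NAME,LATITUDE"
lines = [header, "1,foo,41.9", "2,bar,41.8"]

# Named-function version of the predicate
def is_data_line(x):
    return x != header

# The same predicate written inline as a lambda
with_def = list(filter(is_data_line, lines))
with_lambda = list(filter(lambda x: x != header, lines))
print(with_lambda)  # ['1,foo,41.9', '2,bar,41.8']
```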

Let's explore some of these now.

### Filter

`filter` iterates over each item in the RDD and returns a new RDD limited to the items where the predicate evaluates to `True`.

In [None]:
# This is dumb. In real life, pre-process your data.
# Or parse the files before handing them to Spark.
# But, it's a nice intro to Spark...
permit_lines = permits.filter(lambda x: x != permit_header_str)

### Map

`map` returns a new RDD containing the result of applying a function to each item. Here, each string gets mapped to a list of strings.

In [None]:
# Note that the data is still unparsed (which is efficient, actually)
# We are now getting into functional programming with Spark...
permit_data = permit_lines.map(lambda x: [val.strip() for val in x.split(',')])

In [None]:
permit_data.take(2)

## First task

Let's convert the above process to a reusable function. 

### Go!

In [None]:
# Assemble the functional statements from above into a re-usable function
def simple_csv(rdd):
    return None, rdd

# And apply it to our violation data
violation_header_locs, violation_data = simple_csv(violations)
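If you want to check your work, here is one possible solution: a sketch that reuses the naive comma-splitting from above (so it shares its weakness with quoted commas). The helper names `parse_line` and `header_locations` are hypothetical, not part of Spark; `simple_csv` only relies on the RDD methods `.take()`, `.filter()` and `.map()` used earlier in this notebook.

```python
def parse_line(line):
    # Split one CSV line naively on commas and strip whitespace
    return [val.strip() for val in line.split(',')]


def header_locations(header_line):
    # Map each column name to its position, e.g. {'ID': 0, ...}
    return {name: loc for loc, name in enumerate(parse_line(header_line))}


def simple_csv(rdd):
    # Pull the header (first line) back to the driver
    header_line, = rdd.take(1)
    # Drop the header line, then split every remaining line
    data = rdd.filter(lambda x: x != header_line).map(parse_line)
    return header_locations(header_line), data
```

With a working `simple_csv`, the call `violation_header_locs, violation_data = simple_csv(violations)` returns a real column-name dictionary, which the cells further down rely on.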

## Other approaches to reading CSV

@seahboonsiew wrote some [native PySpark](https://github.com/seahboonsiew/pyspark-csv/blob/master/pyspark_csv.py) to do this with more bells and whistles (but note that it is more expensive, since it parses every field).

You can also feed Pandas DataFrames to Spark SQL DataFrames (and you can use chunking if need be!) [using standard Spark](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.SQLContext.createDataFrame). And similarly, [you can convert back to Pandas](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.toPandas).

*NB: there is almost 0 chance you will write a better CSV parser than the one in Pandas.* It will also allow you to avoid parsing columns you don't want to parse (so you don't lose the efficiency we talked about above). But, the parser in Pandas wasn't architected to handle distributed parsing. Don't try to be too clever on this point... just solve *your* problem (not a more general one).
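To see why a real parser matters, here is the naive `split(',')` from above next to Python's standard-library `csv` module (used here in place of Pandas to keep the example dependency-free) on a made-up line whose quoted field contains a comma:

```python
import csv

# A hypothetical line with a quoted field that contains a comma
line = '123,"1600 W MAIN ST, UNIT 2",41.88'

naive = [val.strip() for val in line.split(',')]
print(len(naive))  # 4: the address was split in two

# csv.reader accepts any iterable of lines; here, a one-element list
parsed, = csv.reader([line])
print(parsed)  # ['123', '1600 W MAIN ST, UNIT 2', '41.88']
```

Because `csv.reader` accepts any iterator of lines, something like `rdd.mapPartitions(csv.reader)` should work on a text RDD, with one caveat: `sc.textFile` splits on newlines, so quoted fields that themselves contain newlines will still break.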

I would probably use @mrocklin's [Blaze](http://matthewrocklin.com/slides/sfpython-blaze.html#/2/3), but note that this is alpha/beta code. He's eager to help with social science test-cases though!

## Other important functional primitives

### Flat Map

`.flatMap()` applies a function that returns a sequence of 0 or more elements for each item, and concatenates those sequences into a single RDD (return an empty list, not `None`, to drop an item). I won't cover this today.
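For reference, a plain-Python sketch of what `.flatMap()` computes, compared with `map`. The `flat_map` helper is hypothetical; on an RDD of lines, `lines_rdd.flatMap(str.split)` would produce the same words.

```python
from itertools import chain

def flat_map(f, items):
    # Apply f to each item, then concatenate the resulting sequences
    return list(chain.from_iterable(f(item) for item in items))

lines = ["a b", "", "c d e"]

nested = list(map(str.split, lines))  # map keeps one output per input
words = flat_map(str.split, lines)    # flat map concatenates them
print(nested)  # [['a', 'b'], [], ['c', 'd', 'e']]
print(words)   # ['a', 'b', 'c', 'd', 'e']
```

Note how the empty line contributes an empty list and simply disappears from `words`.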

### Reduce

`.reduce()` is also available: it combines elements pairwise with an accumulating function, which should be commutative and associative because Spark reduces each partition separately before combining the partial results. There are also many simple reductions, such as `.count()`, `.mean()`, `.min()`, and so on. Most of these could easily be implemented with `.reduce()`.
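A plain-Python sketch of that last claim, using `functools.reduce`, which takes the same kind of pairwise function as `rdd.reduce()`. The values are made up; on an RDD, the count would read `rdd.map(lambda _: 1).reduce(add)`.

```python
from functools import reduce
from operator import add

values = [3.0, 1.5, 4.0, 2.5]

# min via a pairwise function
smallest = reduce(lambda a, b: a if a <= b else b, values)

# count: map every element to 1, then add the 1s pairwise
count = reduce(add, map(lambda _: 1, values))

# mean needs two quantities, so reduce (sum, count) pairs together
total, n = reduce(lambda a, b: (a[0] + b[0], a[1] + b[1]),
                  map(lambda v: (v, 1), values))
mean = total / n
print(smallest, count, mean)  # 1.5 4 2.75
```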

In [None]:
lat_col = violation_header_locs['LATITUDE']
lat_col

In [None]:
x, = violation_data.take(1)
float(x[lat_col])

In [None]:
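def safe_convert(x):
    # Convert a field to float, or return None if it isn't a number (e.g., empty)
    try:
        return float(x)
    except (TypeError, ValueError):
        return None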

In [None]:
violation_lats = violation_data.map(lambda x: x[lat_col]).map(safe_convert)
violation_lats.count()

In [None]:
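# Calling .mean() on violation_lats directly fails: safe_convert returns None
# for unparseable values, and Spark can't do arithmetic with None.
# Drop the Nones first.
violation_lats.filter(lambda x: x is not None).mean()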

Note that operations like the above should be done in one pass for efficiency! You can also create a Python object that extracts and represents the quantities you're interested in (getting behavior somewhat like an ORM).
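One way to get several statistics in a single pass is to map each value to a small partial summary and merge summaries with one associative, commutative function. Below is a plain-Python sketch with made-up latitudes; `to_stats` and `merge_stats` are hypothetical helpers.

```python
from functools import reduce

def to_stats(v):
    # Turn one value into a partial summary: (count, sum, min, max)
    return (1, v, v, v)

def merge_stats(a, b):
    # Combine two partial summaries; associative and commutative,
    # so it is safe to use with a distributed reduce
    return (a[0] + b[0], a[1] + b[1], min(a[2], b[2]), max(a[3], b[3]))

lats = [41.9, None, 41.7, 42.0]  # hypothetical values; None = unparseable
clean = [v for v in lats if v is not None]
n, total, lo, hi = reduce(merge_stats, map(to_stats, clean))
print(n, total / n, lo, hi)
```

On the RDD this would be `violation_lats.filter(lambda v: v is not None).map(to_stats).reduce(merge_stats)`. PySpark also ships `RDD.stats()`, which returns a `StatCounter` (count, mean, min, max, stdev) computed in one pass.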

# SQL-like operations

PySpark will do SQL-like operations over regular RDDs, but you should check out [Spark SQL](http://spark.apache.org/docs/latest/sql-programming-guide.html) if you want the best solution.

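The key operations here are `.keyBy()`, which turns each item into a `(key, item)` pair, and `.join()`, which pairs up items from two keyed RDDs that share a key. There are other operations too, like `.groupBy()`. You can map most SQL concepts onto Spark.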
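To make the semantics concrete, here is a plain-Python sketch of what `.keyBy()` followed by `.join()` computes (an inner join). The `key_by` and `inner_join` helpers and the permit/violation rows are hypothetical; on RDDs the equivalent would look like `permit_data.keyBy(lambda r: r[addr_col]).join(violation_data.keyBy(lambda r: r[addr_col]))`, where `addr_col` is an assumed column index, and Spark makes no promise about output order.

```python
from collections import defaultdict

def key_by(f, rows):
    # Like rdd.keyBy(f): pair each row with a key computed from it
    return [(f(row), row) for row in rows]

def inner_join(left, right):
    # Like rdd.join(other): for every key present on both sides,
    # emit (key, (left_value, right_value)) for each matching pair
    right_by_key = defaultdict(list)
    for k, v in right:
        right_by_key[k].append(v)
    return [(k, (lv, rv)) for k, lv in left for rv in right_by_key.get(k, [])]

# Hypothetical rows: [id, address]
permits = [["P1", "100 MAIN ST"], ["P2", "200 OAK AVE"]]
violations = [["V1", "100 MAIN ST"], ["V2", "100 MAIN ST"], ["V3", "300 ELM ST"]]

joined = inner_join(key_by(lambda r: r[1], permits),
                    key_by(lambda r: r[1], violations))
for k, (p, v) in joined:
    print(k, p[0], v[0])
```

Only `100 MAIN ST` appears on both sides, so it yields one output row per matching violation, while `200 OAK AVE` and `300 ELM ST` drop out, just as in a SQL inner join.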

# MLlib

Always refer to [the docs](http://spark.apache.org/docs/latest/mllib-guide.html).