#Learning Python and Apache Spark

This tutorial teaches the Python you need in order to use Apache Spark, a large-scale data processing framework.

It assumes basic knowledge of Python.

##Part 1: Learning the ipynb

As you work through a notebook it is important that you run all of the code cells. The notebook is stateful, which means that variables and their values are retained until the notebook is detached (in Databricks Cloud) or the kernel is restarted (in IPython notebooks). If you do not run all of the code cells as you proceed through the notebook, your variables will not be properly initialized and later code might fail. You will also need to rerun any cells that you have modified in order for the changes to be available to other cells.

In [None]:
# This is a Python cell. You can run normal Python code here...
print ('Hello World!')

In [None]:
# Here is another Python cell, this time with a variable (x) declaration and an if statement:
# Notice the lack of semi-colons and the strict indentation
x = 'Hello World!'
if len(x) > 5:
    print (x)

Note that once you run the cell above, the variable `x` exists for the rest of the notebook session.

Now it is your turn! Write a function that takes a string and returns it repeated twice.

For example, `repeatTwice('Hello World!')` returns `'Hello World!Hello World!'`.

In [None]:
def repeatTwice(x):
    return <FILL IN>

Now it is time to test your function!

In [None]:
assert repeatTwice(x)=='Hello World!Hello World!', 'output does not match!'
assert repeatTwice('Clarence Ngoh Peng Yu')=='Clarence Ngoh Peng YuClarence Ngoh Peng Yu', 'output does not match!'
assert repeatTwice('1243owroewhf8erhjsp8239uroijs')=='1243owroewhf8erhjsp8239uroijs1243owroewhf8erhjsp8239uroijs', 'output does not match!'
print ('All passed!')

##Part 2: An introduction to using Apache Spark with the Python PySpark API running in the browser

To use Spark and its API, we need a SparkContext. You start a new Spark application by creating a SparkContext. When the SparkContext is created, it asks the master for some cores to use to do work. The master sets these cores aside just for you; they won't be used for other applications.

To start using Spark, initialize the SparkContext as shown below:

In [None]:
from pyspark import SparkContext, SparkConf

conf = SparkConf().setAppName('appName')
sc = SparkContext(conf=conf)

In this lab we will make a simple word count application and do simple data analytics on it!

We'll start with a simple Resilient Distributed Dataset (RDD). In Spark, we first create a base RDD. We can then apply one or more transformations to that base RDD. An RDD is immutable, so once it is created, it cannot be changed. As a result, each transformation creates a new RDD. Finally, we can apply one or more actions to the RDDs. Note that Spark uses lazy evaluation, so transformations are not actually executed until an action occurs.

Let's start by creating a word RDD from a Python list!
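As a rough analogy for lazy evaluation, the plain-Python sketch below uses the built-in `map`, which also defers work until its result is consumed. No Spark is involved here; the names `shout`, `call_log` and `demo_words` are made up for illustration.

```python
# Plain-Python analogy only: this does not use Spark.
call_log = []  # records every time the mapped function actually runs

def shout(word):
    call_log.append(word)
    return word.upper()

demo_words = ['cat', 'elephant', 'rat']

# Like a transformation: building the lazy map object runs nothing yet.
lazy_result = map(shout, demo_words)
calls_before = len(call_log)

# Like an action: consuming the result forces the work to happen.
shouted = list(lazy_result)
calls_after = len(call_log)

print(calls_before, calls_after, shouted)
```

The function is not called when the `map` object is built, only when `list()` asks for the values. Spark transformations and actions relate to each other in a similar way.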

In [None]:
wordsList = ['cat', 'elephant', 'rat', 'rat', 'cat']
wordsRDD = sc.parallelize(wordsList, 4)
# Print out the type of wordsRDD
print (type(wordsRDD))

Let's transform our RDD a bit. In Spark, we do this with `map` operations, which take in a function.

We first have to write the function to transform our dataset. Write a function to append `'.com'` to the end of a string.


In [None]:
def webify(x):
    return <FILL IN>

print (webify('cat'))

In [None]:
assert webify('lol') == 'lol.com',  'output does not match!'
assert webify('sdufhoadfhasdpfup923') == 'sdufhoadfhasdpfup923.com',  'output does not match!'
assert webify('youtube') == 'youtube.com',  'output does not match!'
assert webify('x.x.x.x.x') == 'x.x.x.x.x.com',  'output does not match!'
print ('All tests passed!')

Now, we can webify our words RDD and see what it produces!

In [None]:
webifiedRDD = wordsRDD.map(<FILL IN>)
print (webifiedRDD.collect())

In [None]:
assert webifiedRDD.collect() == ['cat.com', 'elephant.com', 'rat.com', 'rat.com', 'cat.com'],  'output does not match!'
print ('All tests passed!')

There is a more concise way to write this code.

We introduce lambda functions (anonymous functions), which are of the form

`lambda <args> : <method body>`

For instance, if we wanted to map a function that pluralizes all the words in the RDD, we would write the code below:

`wordsRDD.map(lambda word : word + 's')`
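To see that a lambda is just a function without a name, here is a small plain-Python sketch that applies the same pluralizing logic with a named function and with a lambda. It uses Python's built-in `map` on a list rather than an RDD, and the names `pluralize` and `demo_words` are only illustrative.

```python
def pluralize(word):
    """Named function: append 's' to a word."""
    return word + 's'

# The same logic as an anonymous function bound to a name
pluralize_lambda = lambda word: word + 's'

demo_words = ['cat', 'elephant', 'rat']

# Both forms can be passed wherever a function is expected
named_result = list(map(pluralize, demo_words))
anonymous_result = list(map(lambda word: word + 's', demo_words))

print(named_result == anonymous_result)
```

A lambda body is limited to a single expression, so anything longer is usually clearer as a named `def`.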

Now it's your turn! Write a lambda function that does exactly the same thing as `webify`.

In [None]:
webifiedRDD = wordsRDD.map(<FILL IN>)
print (webifiedRDD.collect())

In [None]:
assert webifiedRDD.collect() == ['cat.com', 'elephant.com', 'rat.com', 'rat.com', 'cat.com'],  'output does not match!'
print ('All tests passed!')

#### ** Pair RDDs **

The next step in writing our program is to create a new type of RDD, called a pair RDD. A pair RDD is an RDD where each element is a pair tuple (k, v) where k is the key and v is the value. In this example, we will create a pair of the form `('<word>', 1)` for each word element in the RDD.

We can create the pair RDD by using the `map()` transformation with a lambda function.
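To make the (k, v) shape concrete, the plain-Python sketch below builds pairs whose value is the length of each word. This is a different pairing from the one the exercise asks for, and it runs on an ordinary list rather than an RDD.

```python
demo_words = ['cat', 'elephant', 'rat']

# Each string becomes a (key, value) tuple, so the element type changes
length_pairs = list(map(lambda word: (word, len(word)), demo_words))

# A pair unpacks into its key and its value
pair_key, pair_value = length_pairs[1]

print(length_pairs)
```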

In [None]:
# Note that map operations may return values of a different type!
wordPairs = wordsRDD.map(<FILL IN>)
print (wordPairs.collect())

In [None]:
assert wordPairs.collect() == [('cat', 1), ('elephant', 1), ('rat', 1), ('rat', 1), ('cat', 1)],  'output does not match!'
print ('All tests passed!')

#### ** `groupByKey()` approach **

An approach you might first consider (we'll see shortly that there are better ways) is based on using the [groupByKey()](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.groupByKey) transformation. As the name implies, the `groupByKey()` transformation groups all the elements of the RDD with the same key into a single list in one of the partitions. There are two problems with using `groupByKey()`:
  + The operation requires a lot of data movement to move all the values into the appropriate partitions.
  + The lists can be very large. Consider a word count of English Wikipedia: the lists for common words (e.g., the, a, etc.) would be huge and could exhaust the available memory in a worker.
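
The memory concern can be illustrated with a plain-Python sketch of grouping. It is not how Spark implements `groupByKey()`; the helper `group_by_key_sketch` and the sample pairs are hypothetical. It only shows that every value for a key ends up in one list.

```python
from collections import defaultdict

def group_by_key_sketch(pairs):
    """Collect every value that shares a key into one list."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return dict(groups)

demo_pairs = [('a', 1), ('b', 1), ('a', 1), ('a', 1)]
demo_groups = group_by_key_sketch(demo_pairs)

# The list for a key holds one entry per occurrence of that key
print(demo_groups)
```

The list for a frequent key grows with the number of occurrences, which is why very common keys can become a problem.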
 
Use `groupByKey()` to generate a pair RDD of type `('word', iterator)`.

In [None]:
# Note that groupByKey requires no parameters
wordsGrouped = wordPairs.<FILL IN>
for key, value in wordsGrouped.collect():
    print ('{0}: {1}'.format(key, list(value)))

In [None]:
# TEST groupByKey() approach 
assert sorted(wordsGrouped.mapValues(lambda x: list(x)).collect()) == [('cat', [1, 1]), ('elephant', [1]), ('rat', [1, 1])], 'incorrect value for wordsGrouped'
print ('All tests passed!')

#### ** (2b) Use `groupByKey()` to obtain the counts **
#### Using the `groupByKey()` transformation creates an RDD containing 3 elements, each of which is a pair of a word and a Python iterator.
#### Now sum the iterator using a `map()` transformation.  The result should be a pair RDD consisting of (word, count) pairs.

In [None]:
# TODO: Replace <FILL IN> with appropriate code
wordCountsGrouped = wordsGrouped.<FILL IN>
print (wordCountsGrouped.collect())

In [None]:
# TEST Use groupByKey() to obtain the counts (2b)
assert sorted(wordCountsGrouped.collect()) == [('cat', 2), ('elephant', 1), ('rat', 2)], 'incorrect value for wordCountsGrouped'
print('All tests passed!')

#### ** (2c) Counting using `reduceByKey` **
#### A better approach is to start from the pair RDD and then use the [reduceByKey()](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.reduceByKey) transformation to create a new pair RDD. The `reduceByKey()` transformation gathers together pairs that have the same key and applies the function provided to two values at a time, iteratively reducing all of the values to a single value. `reduceByKey()` operates by applying the function first within each partition on a per-key basis and then across the partitions, allowing it to scale efficiently to large datasets.
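
The two-stage idea (combine within each partition, then across partitions) can be sketched in plain Python. This is a simplified model rather than Spark's implementation; the helper names and the sensor readings are hypothetical, and `max` stands in for whatever two-argument function you pass. Spark's documentation asks for that function to be associative and commutative, which is what makes the two-stage order safe.

```python
def combine_pairs(pairs, func):
    """Fold (key, value) pairs into one value per key using func."""
    combined = {}
    for key, value in pairs:
        combined[key] = func(combined[key], value) if key in combined else value
    return combined

def reduce_by_key_sketch(partitions, func):
    """Reduce inside each partition first, then merge the partial results."""
    partials = [combine_pairs(part, func) for part in partitions]
    partial_pairs = [pair for partial in partials for pair in partial.items()]
    return combine_pairs(partial_pairs, func)

# Hypothetical (sensor, reading) pairs spread over two partitions
demo_partitions = [
    [('s1', 4), ('s2', 9), ('s1', 7)],
    [('s2', 3), ('s1', 5)],
]
highest_reading = reduce_by_key_sketch(demo_partitions, max)
print(highest_reading)
```

Only one partial value per key leaves each partition, so far less data has to move than when every value is shipped to a single list.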

In [None]:
# TODO: Replace <FILL IN> with appropriate code
# Note that reduceByKey takes in a function that accepts two values and returns a single value
wordCounts = wordPairs.reduceByKey(<FILL IN>)
print (wordCounts.collect())

In [None]:
# TEST Counting using reduceByKey (2c)
assert sorted(wordCounts.collect()) == [('cat', 2), ('elephant', 1), ('rat', 2)], 'incorrect value for wordCounts'
print('All tests passed!')

#### ** (2d) All together **
#### The expert version of the code performs the `map()` to a pair RDD, the `reduceByKey()` transformation, and the `collect()` action in one statement.

In [None]:
# TODO: Replace <FILL IN> with appropriate code
wordCountsCollected = (wordsRDD
                       .map(<FILL IN>)
                       .reduceByKey(<FILL IN>)
                       .collect())
print (wordCountsCollected)

In [None]:
# TEST All together (2d)
assert sorted(wordCountsCollected) == [('cat', 2), ('elephant', 1), ('rat', 2)], 'incorrect value for wordCountsCollected'
print('All tests passed!')

### ** Part 3: Finding unique words and a mean value **

#### ** (3a) Unique words **
#### Calculate the number of words that are unique (i.e. occur exactly once) in `wordsRDD`.  You can use other RDDs that you have already created to make this easier.
Hint: use the `filter` operation with the `wordCountsGrouped` RDD.

In [None]:
# TODO: Replace <FILL IN> with appropriate code
uniqueWords = wordCountsGrouped.<FILL IN>
print (uniqueWords)

In [None]:
# TEST Unique words (3a)
assert uniqueWords == 1, 'incorrect count of uniqueWords'
print('All tests passed!')

#### ** (3b) Mean using `reduce` **
#### Find the mean number of occurrences per distinct word in `wordCounts`.
#### Use a `reduce()` action to sum the counts in `wordCounts` and then divide by the number of distinct words.  First `map()` the pair RDD `wordCounts`, which consists of (key, value) pairs, to an RDD of values.
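
As a plain-Python analogy, `functools.reduce` folds a list into one value in much the same way. The sketch below uses made-up (key, count) pairs and ordinary lists, so it is not the PySpark answer to the exercise.

```python
from functools import reduce
from operator import add

demo_counts = [('a', 4), ('b', 1), ('c', 1)]  # hypothetical (key, count) pairs

# Keep only the value of each pair
count_values = [count for _, count in demo_counts]

# reduce applies add to two values at a time until one value remains
count_total = reduce(add, count_values)

# In Python 3, / is true division, so the result is a float
count_mean = count_total / len(count_values)

print(count_total, count_mean)
```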

In [None]:
# TODO: Replace <FILL IN> with appropriate code
from operator import add
totalCount = (wordCounts
              .map(<FILL IN>)
              .reduce(<FILL IN>))
numWords = wordCounts.<FILL IN>
average = totalCount / float(numWords)
print (totalCount)
print (round(average, 2))

In [None]:
# TEST Mean using reduce (3b)
assert round(average, 2) == 1.67, 'incorrect value of average'
print('All tests passed!')

### ** Part 4: Analysing real datasets **

#### In this section we will make use of real data from a csv file, load in our data source, and do some analytics with Apache Spark on the new dataset.

#### ** (4a) Load a text file **
#### For the next part of this lab, we will use the [Real Estate Transactions](http://samplecsvs.s3.amazonaws.com/Sacramentorealestatetransactions.csv) from [SpatialKey Sample CSV Data](https://support.spatialkey.com/spatialkey-sample-csv-data/). The Sacramento real estate transactions file is a list of 985 real estate transactions in the Sacramento area reported over a five-day period, as reported by the Sacramento Bee. Note that this file has address level information that you can choose to geocode, or you can use the existing latitude/longitude in the file.

In [None]:
# Just run this code
import os.path
from pyspark.sql import Row
#import pyspark_csv as pycsv

baseDir = os.path.join('/tmp')
inputPath = os.path.join('data', 'Sacramentorealestatetransactions.csv')
csvFileName = os.path.join(baseDir, inputPath)

# Field names follow the column order of the file's first (header) row

sacramentoWithHeaderRDD = (sc.textFile(csvFileName)
                .map(lambda line : line.split(','))
                .map(lambda x : Row(
            street = x[0],
            city = x[1],
            zipcode = x[2],
            state = x[3],
            beds = x[4],
            baths = x[5],
            sqfeet = x[6],
            typeOfHouse = x[7],
            saleDate = x[8],
            price = x[9],
            latitude = x[10],
            longitude = x[11]
        )))
# The first element is the header row; filter it out so only data rows remain
header = sacramentoWithHeaderRDD.first()
sacramentoRDD = sacramentoWithHeaderRDD.filter(lambda row: row != header)
print (sacramentoRDD.top(10))
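
The cell above splits each line on commas, which works only as long as no field contains a comma of its own. The sketch below uses a small made-up sample (not rows from the Sacramento file, and no claim is made that the file has quoted fields) to show how Python's `csv` module treats a quoted comma differently from `str.split`.

```python
import csv
import io

# Hypothetical sample with one quoted field that contains a comma
sample_text = (
    'street,city,price\n'
    '"12 MAIN ST, UNIT 4",SACRAMENTO,100000\n'
    '34 OAK AVE,LINCOLN,250000\n'
)

# Naive approach: split every data line on commas
sample_lines = sample_text.splitlines()
naive_rows = [line.split(',') for line in sample_lines[1:]]

# csv.reader understands quoting and keeps the quoted field whole
reader = csv.reader(io.StringIO(sample_text))
csv_header = next(reader)  # the first row holds the column names
parsed_rows = list(reader)

print(len(naive_rows[0]), len(parsed_rows[0]))
```

If a file does contain quoted commas, the naive split shifts every later column by one, so values such as the price would land in the wrong field.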

#### ** (4b) Find the mean and standard deviation of the prices **
#### As a real estate company, you may be interested in the mean and standard deviation of the prices of the houses in the area. In this section, we will refer to the Apache PySpark API to calculate these values.

[Apache PySpark API](http://spark.apache.org/docs/latest/api/python/pyspark.html)
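
For reference, the plain-Python sketch below computes both statistics by hand on a few made-up prices. It divides by N (the population form), which is also what `RDD.stdev()` computes; `RDD.sampleStdev()` divides by N - 1 instead. The variable names are illustrative and chosen not to clash with the notebook's own `mean` and `stDev`.

```python
import math
import statistics

demo_prices = [100000.0, 150000.0, 200000.0, 350000.0]  # hypothetical prices

demo_mean = sum(demo_prices) / len(demo_prices)

# Population standard deviation: average the squared deviations over N
squared_deviations = [(p - demo_mean) ** 2 for p in demo_prices]
population_sd = math.sqrt(sum(squared_deviations) / len(demo_prices))

# Sample standard deviation divides by N - 1, so it is slightly larger
sample_sd = statistics.stdev(demo_prices)

print(demo_mean, round(population_sd, 2), round(sample_sd, 2))
```

Remember that the prices in `sacramentoRDD` are strings, so they need to be converted to numbers before any of these statistics can be computed.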

In [None]:
#Map the RDD to prices
sacramentoPricesRDD = sacramentoRDD.map(<FILL IN>)

#Find the mean
mean = sacramentoPricesRDD.mean()
print (mean)

#Find the standard deviation
stDev = sacramentoPricesRDD.stdev()
print (stDev)

In [None]:
# TEST Mean, StDev (4b)
assert mean == 234144.26395939104, "invalid mean"
assert round(stDev,2) == 138295.58, "invalid stDev"
print("All passed!")

#### ** (4c) Find the most expensive house **
#### Now let's find the most expensive house in the region using `takeOrdered()`. It takes an integer `n`, the number of elements to return, and an optional `key` function that is applied to each element before the elements are sorted in ascending order. Negating the price in the key therefore puts the most expensive house first.

[Apache PySpark API](http://spark.apache.org/docs/latest/api/python/pyspark.html)
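
The same "n smallest by key" idea exists in plain Python as `heapq.nsmallest`, which makes a convenient stand-in for experimenting without Spark. The `House` record and its values below are hypothetical. The sketch also shows why the price has to be converted with `float`: the field is stored as a string.

```python
import heapq
from collections import namedtuple

House = namedtuple('House', ['street', 'price'])  # hypothetical record type

demo_houses = [
    House('1 A ST', '95000'),
    House('2 B ST', '120000'),
    House('3 C ST', '8000'),
]

# Smallest key first: negating the price makes the highest price the smallest key
priciest = heapq.nsmallest(1, demo_houses, key=lambda h: -float(h.price))

# Comparing the raw strings ranks '95000' above '120000' because '9' > '1'
string_max = max(demo_houses, key=lambda h: h.price)

print(priciest[0].street, string_max.street)
```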

In [None]:
mostExpensiveHouse = sacramentoRDD.takeOrdered(1, lambda x: -float(x.price))
print (mostExpensiveHouse[0])

In [None]:
assert mostExpensiveHouse[0].price == str(884790), "invalid most expensive house"
print ('All tests passed')

#### ** (4d) Find the top-k cheapest houses **
#### Now let's find the top-k cheapest houses in the region, that is, the `k` houses with the lowest prices. You will use `takeOrdered()` for this as well. It takes an integer `n`, the number of elements to return, and an optional `key` function that is applied to each element before the elements are sorted in ascending order.

[Apache PySpark API](http://spark.apache.org/docs/latest/api/python/pyspark.html)

In [None]:
#topKCheapestHouses takes in an integer k and returns a list of k entries of houses
def topKCheapestHouses (k):
    kCheapestHouses = sacramentoRDD.takeOrdered(k, lambda x: float(x.price))
    return kCheapestHouses

print (topKCheapestHouses(10))

In [None]:
assert topKCheapestHouses(10) == [Row(baths='3', beds='3', city='LINCOLN', latitude='38.851645', longitude='-121.231742', price='1551', saleDate='Fri May 16 00:00:00 EDT 2008', sqfeet='0', state='CA', street='3720 VISTA DE MADERA', typeOfHouse='Residential', zipcode='95648'), Row(baths='4', beds='3', city='SLOUGHHOUSE', latitude='38.490447', longitude='-121.129337', price='2000', saleDate='Fri May 16 00:00:00 EDT 2008', sqfeet='5822', state='CA', street='14151 INDIO DR', typeOfHouse='Residential', zipcode='95683'), Row(baths='0', beds='0', city='LINCOLN', latitude='38.885327', longitude='-121.289412', price='4897', saleDate='Mon May 19 00:00:00 EDT 2008', sqfeet='0', state='CA', street='20 CRYSTALWOOD CIR', typeOfHouse='Residential', zipcode='95648'), Row(baths='0', beds='0', city='LINCOLN', latitude='38.885132', longitude='-121.289405', price='4897', saleDate='Mon May 19 00:00:00 EDT 2008', sqfeet='0', state='CA', street='24 CRYSTALWOOD CIR', typeOfHouse='Residential', zipcode='95648'), Row(baths='0', beds='0', city='LINCOLN', latitude='38.884936', longitude='-121.289397', price='4897', saleDate='Mon May 19 00:00:00 EDT 2008', sqfeet='0', state='CA', street='28 CRYSTALWOOD CIR', typeOfHouse='Residential', zipcode='95648'), Row(baths='0', beds='0', city='LINCOLN', latitude='38.884741', longitude='-121.28939', price='4897', saleDate='Mon May 19 00:00:00 EDT 2008', sqfeet='0', state='CA', street='32 CRYSTALWOOD CIR', typeOfHouse='Residential', zipcode='95648'), Row(baths='0', beds='0', city='LINCOLN', latitude='38.884599', longitude='-121.289406', price='4897', saleDate='Mon May 19 00:00:00 EDT 2008', sqfeet='0', state='CA', street='36 CRYSTALWOOD CIR', typeOfHouse='Residential', zipcode='95648'), Row(baths='0', beds='0', city='LINCOLN', latitude='38.884535', longitude='-121.289619', price='4897', saleDate='Mon May 19 00:00:00 EDT 2008', sqfeet='0', state='CA', street='40 CRYSTALWOOD CIR', typeOfHouse='Residential', zipcode='95648'), Row(baths='0', beds='0', 
city='LINCOLN', latitude='38.88459', longitude='-121.289835', price='4897', saleDate='Mon May 19 00:00:00 EDT 2008', sqfeet='0', state='CA', street='44 CRYSTALWOOD CIR', typeOfHouse='Residential', zipcode='95648'), Row(baths='0', beds='0', city='LINCOLN', latitude='38.884667', longitude='-121.289896', price='4897', saleDate='Mon May 19 00:00:00 EDT 2008', sqfeet='0', state='CA', street='48 CRYSTALWOOD CIR', typeOfHouse='Residential', zipcode='95648')], "incorrect top-k cheapest function"
print("All tests passed")

#### ** (4e) Data visualization with matplotlib **
#### Now let's try to visualize the price data in a box plot. This functionality is provided by matplotlib. We have provided the code here for you, but feel free to explore the API and see what other data you can gather from this dataset!

[Matplotlib API](http://matplotlib.org/contents.html)
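
To get a feel for what a box plot summarizes, the sketch below computes quartiles and the 1.5 x IQR fences by hand on made-up values. It is a rough illustration; matplotlib's own quartile computation may differ in detail, and `statistics.quantiles` requires Python 3.8 or later.

```python
import statistics

demo_values = [100, 120, 130, 150, 160, 170, 200, 900]  # hypothetical values

# method='inclusive' interpolates linearly between neighbouring data points
q1, q2, q3 = statistics.quantiles(demo_values, n=4, method='inclusive')
iqr = q3 - q1

# With whis=1.5, points beyond these fences are drawn as individual outliers
lower_fence = q1 - 1.5 * iqr
upper_fence = q3 + 1.5 * iqr
outliers = [v for v in demo_values if v < lower_fence or v > upper_fence]

print(q1, q2, q3, outliers)
```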

In [None]:
#Just run the code!
#required to print inline
%matplotlib inline  

#import the libraries
import matplotlib
import matplotlib.pyplot as plt

#map the data to prices and plot
data = sacramentoRDD.map(lambda house : float(house.price)).collect()
plt.boxplot(data, notch=0, sym='+', vert=1, whis=1.5)