# Working with Data using Spark and Python
## Creating and Working with RDDs (Resilient Distributed Datasets)
In this section of the Notebook, we will first use Spark to interactively explore __Wikipedia__ data.  
__Note:__ For the purposes of reproducibility, the data can be downloaded from [here](http://d2xijq8wc2iink.cloudfront.net/ampcamp6/ampcamp6-rc3.tar.gz).

To start, we create an RDD named `pagecounts` from the input files in the `data` directory. Since the `SparkContext` is already created as the variable `sc`, the first task is to view it to confirm that it exists before creating the RDD.

In [None]:
sc

In [None]:
# Create pagecounts
pagecounts = sc.textFile("data/pagecounts") #stored on shared /vagrant

To view the data, we use the `take` operation of an RDD to get the first __K__ records, where K = 10.

In [None]:
pagecounts.take(10)

As we can see from the output above, the data is not easy to understand. This is because `take()` returns a list and Python simply prints the list with each element separated by a `,`. To alleviate this, we can traverse the list and print each record on its own line.

In [None]:
for i in pagecounts.take(10):
    print i

Although not much better, the above output eliminates some of the list formatting to give us a better view of the actual data within context. To further explore the data, we can subset or filter the data based on specific criteria. In the following example, we keep only the English pages (en). This is done by applying the `filter()` function to `pagecounts`. For each record, we split it by the field delimiter (i.e. a space), take the second field (the project code), and compare it with the string “en”.

__TIP:__ To avoid reading from disk each time we perform an operation on the RDD, we also cache the RDD in memory. This is where Spark really starts to shine.

In [None]:
enPages = pagecounts.filter(lambda x: x.split(" ")[1] == "en").cache()

`filter()` is a lazily evaluated transformation, so no computation is done yet. The next time an action is invoked on `enPages`, Spark will cache the data set in memory across the workers in your cluster. For example, we can view the total number of records using the `count()` function and then compare that to the total number of records for English pages.

In [None]:
print "Total number of pages:",pagecounts.count()
print "Total number of English pages:", enPages.count()
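
The lazy evaluation and caching behaviour described above can be imitated in plain Python, without a cluster. The following is only an analogy, not how Spark is implemented: a generator stands in for the lazy `filter()`, and materialising it into a list stands in for an action on a cached RDD. The sample lines are made up and assume the space-delimited layout used in this section (the fifth field is not used here).

```python
# Made-up sample records in the assumed layout:
# timestamp, project code, page name, page views, (unused fifth field)
sample_lines = [
    "20090505-000000 en Main_Page 42 1000",
    "20090505-000000 fr Accueil 7 500",
    "20090506-000000 en Spark 3 250",
]

evaluated = []  # records every line the predicate actually inspects


def is_english(line):
    """Same test as the lambda passed to filter(): second field == 'en'."""
    evaluated.append(line)
    return line.split(" ")[1] == "en"


# Building the generator is like calling filter(): no work happens yet.
lazy_en_pages = (line for line in sample_lines if is_english(line))
checked_before_action = len(evaluated)

# Materialising the results plays the role of an action such as count().
cached_en_pages = list(lazy_en_pages)
checked_after_action = len(evaluated)

# Later "actions" reuse the materialised list instead of re-reading the input.
english_count = len(cached_en_pages)
print("inspected before action: {0}, after action: {1}, English records: {2}".format(
    checked_before_action, checked_after_action, english_count))
```

The predicate is not called at all until the list is built, which mirrors why `enPages` costs nothing until `count()` is invoked.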

Next we can generate a histogram of total page views on __Wikipedia__ English pages for the specific date range represented in our dataset (i.e. May 5 to May 7, 2009). The high-level idea is as follows:
1. Generate a key value pair for each line, the key is the date (the first eight characters of the first field), and the value is the number of pageviews for that date (the fourth field). 
2. Shuffle the data and group all values of the same key together. 
3. Sum up the values for each key. There is a convenient method called reduceByKey in Spark for exactly this pattern. By default, Spark assumes that the reduce function is commutative and associative and applies combiners on the mapper side. 

The `collect` method at the end converts the result from an RDD to a Python list.

In [None]:
# Split the Data
enTuples = enPages.map(lambda x: x.split(" "))

# Generate <Key><Value> pairs
enKeyValuePairs = enTuples.map(lambda x: (x[0][:8], int(x[3])))

# Shuffle the data and group all values of the same key together
enKeyValuePairs.reduceByKey(lambda x, y: x + y, 1).collect()

__Tip:__ We can combine the previous three commands into one:

In [None]:
enPages.map(lambda x: x.split(" ")).map(lambda x: (x[0][:8], int(x[3]))).reduceByKey(lambda x, y: x + y, 1).collect()
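
To see what the three steps compute, here is a minimal plain-Python sketch of the same map-then-reduce-by-key pattern. It runs on a handful of made-up records (assumed to follow the layout described above) and does not use Spark; the dictionary loop only imitates the semantics of `reduceByKey`, not its distributed execution.

```python
# Made-up English records in the assumed layout:
# timestamp, project code, page name, page views, (unused fifth field)
sample_lines = [
    "20090505-000000 en Main_Page 42 1000",
    "20090505-010000 en Spark 3 250",
    "20090506-000000 en Main_Page 10 1000",
    "20090507-000000 en Spark 5 250",
]

# Step 1: split each line and build (date, views) key-value pairs
pairs = [(fields[0][:8], int(fields[3]))
         for fields in (line.split(" ") for line in sample_lines)]

# Steps 2 and 3: bring values with the same key together and fold them
# with the same reduce function that is passed to reduceByKey
reduce_fn = lambda x, y: x + y
totals = {}
for key, value in pairs:
    totals[key] = reduce_fn(totals[key], value) if key in totals else value

daily_views = sorted(totals.items())
print(daily_views)
```

Because addition is commutative and associative, the values for a key can be combined in any order, which is what allows Spark to pre-aggregate on the mapper side.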

Suppose we want to find pages that were viewed more than 200,000 times during the three days covered by our dataset. Conceptually, this task is similar to the previous query. But, given the large number of pages (23 million distinct page names), the new task is very expensive. We are doing an expensive group-by with a lot of network shuffling of data. To recap: 
1. We split each line of data into its respective fields. 
2. We extract the fields for page name and number of page views. We reduce by key again, this time with 40 reducers. 
3. We keep only pages with more than 200,000 total views over the time window covered by our dataset, then swap each pair to (views, page name).

In [None]:
enPages.map(lambda x: x.split(" ")).map(lambda x: (x[2], int(x[3]))).reduceByKey(lambda x, y: x + y, 40).filter(lambda x: x[1] > 200000).map(lambda x: (x[1], x[0])).collect()
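
The second argument to `reduceByKey` sets the number of reducers (partitions). Conceptually, each key is routed to exactly one reducer by hashing it, so that all values for a page end up in the same place. The sketch below illustrates that idea in plain Python. It is an assumption-laden toy: `zlib.crc32` is a stand-in hash chosen for determinism (Spark uses its own hash function), and the data and the small threshold are made up.

```python
import zlib

NUM_REDUCERS = 40  # same value as the second argument to reduceByKey above


def reducer_for(key, num_reducers=NUM_REDUCERS):
    """Assign a key to a reducer by hashing; crc32 is a stand-in hash here."""
    return zlib.crc32(key.encode("utf-8")) % num_reducers


# Made-up (page name, views) pairs
pairs = [("Main_Page", 42), ("Spark", 3), ("Main_Page", 10), ("Python", 8)]

# Each reducer sums only the keys that were routed to it
partitions = {}
for page, views in pairs:
    bucket = partitions.setdefault(reducer_for(page), {})
    bucket[page] = bucket.get(page, 0) + views

# Merge the per-reducer results, keep pages above an illustrative threshold,
# and swap each pair to (views, page) as in the query above
THRESHOLD = 20
popular = [(views, page)
           for bucket in partitions.values()
           for page, views in bucket.items()
           if views > THRESHOLD]
print(popular)
```

With millions of distinct keys, this routing step is the network shuffle that makes the query expensive.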

## Data Exploration using Spark SQL
Spark SQL is the newest component of Spark and provides a SQL-like interface. Spark SQL is tightly integrated with the various Spark programming languages.

In [None]:
# Verify the sqlContext
sqlContext

Note how the `sqlContext` is like the `HiveContext`, meaning that the use of SQL in this context is very similar to using SQL against a Hive Metastore in Hadoop. Just as SQL commands are translated to MapReduce jobs in that case, our SQL commands are translated into operations on RDDs.

To start, we load a DataFrame that is stored in the __Parquet__ format. Parquet is a self-describing columnar file format. Since it is self-describing, Spark SQL can automatically infer all of the column names and their datatypes. For this part of the Notebook, we will use a set of data that contains all of the pages on Wikipedia that contain the word “berkeley”. We will load this data using the input methods provided by SQLContext.

In [None]:
# Load the data from the /vagrant shared file system
wikiData = sqlContext.read.parquet("data/wiki.parquet")

The result of the above command is a DataFrame, which was called a SchemaRDD before Spark 1.3. A DataFrame supports many of the operations of a normal RDD. For example, let's figure out how many records are in the data set.

In [None]:
wikiData.count()

In addition to standard RDD operations, DataFrames also have extra information about the names and types of the columns in the dataset, just like the schema of an RDBMS table. So basically, DataFrame = RDD [Row] + Schema. This extra schema information makes it possible to run SQL queries against the data after it has been registered as a table. Below is an example of counting the number of records using a SQL query.

In [None]:
# Create a temporary table for SQL queries
wikiData.registerTempTable("wikiData")

# SQL Query to count
result = sqlContext.sql("SELECT COUNT(*) AS pageCount FROM wikiData").collect()
print result

The result of SQL queries is always a collection of Row objects. From a row object we can access the individual columns of the result.

In [None]:
result[0].pageCount

SQL can be a powerful tool for performing complex aggregations. For example, the following query returns the top 10 usernames by the number of pages they created.

In [None]:
sqlContext.sql("SELECT username, COUNT(*) AS cnt FROM wikiData WHERE username <> '' GROUP BY username ORDER BY cnt DESC LIMIT 10").collect()
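
The shape of this aggregation can be tried without a cluster. The sketch below runs the same SQL text against an in-memory SQLite table using Python's standard `sqlite3` module. The rows are made up, and the `title` column is a hypothetical stand-in, since the real schema of `wikiData` is not shown here; only `username` is taken from the query above.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Hypothetical two-column table; only `username` appears in the original query
conn.execute("CREATE TABLE wikiData (username TEXT, title TEXT)")
# Made-up rows; the empty username stands for a page with no recorded author
conn.executemany(
    "INSERT INTO wikiData VALUES (?, ?)",
    [("alice", "A"), ("bob", "B"), ("alice", "C"), ("", "D"),
     ("alice", "E"), ("bob", "F"), ("carol", "G")],
)

# Same query text as above: drop empty usernames, count per user, keep the top 10
top_users = conn.execute(
    "SELECT username, COUNT(*) AS cnt FROM wikiData "
    "WHERE username <> '' GROUP BY username ORDER BY cnt DESC LIMIT 10"
).fetchall()
conn.close()
print(top_users)
```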

## Data Exploration using Spark DataFrames
Now that we have a very basic idea of working with RDDs and using SQL queries against them, we next look at __Spark DataFrames__. The DataFrames API is inspired by data frames in `R` and `Python` (Pandas) and is used for storing tabular data, along with labeled axes for both rows and columns. Since the concept was introduced in `R` before it appeared in `Python` Pandas, the methodology of working with data frames in either language can be easily applied to Spark. DataFrames can be constructed from a wide array of sources such as:
- structured data files
- tables in Hive
- external databases
- existing RDDs.

It is important, however, to be cognizant of some of the differences between data frames in `R` and `Python`. In `R`, a `data.frame()` is a list of vector variables of the same number of elements (rows) with unique row names. So, each column is a vector with an associated name, and each row is a series of vector elements that correspond to the same position in each of the column-vectors.

In `Python` Pandas, a DataFrame can be thought of as a dict-like container for Series objects, where a Series is a one-dimensional NumPy `ndarray` with axis labels (including time series). By default, each Series corresponds to a column in the resulting DataFrame. To elaborate, we will show some examples of using DataFrames with some basic statistical and mathematical functions.

### Random Number Generation  
Random data generation is useful for testing existing algorithms and for implementing randomized algorithms, such as random projection. The following methods in `sql.functions` generate columns that contain independent and identically distributed (i.i.d.) values drawn from a distribution, e.g., uniform (`rand`) and standard normal (`randn`).

In [None]:
from pyspark.sql.functions import rand, randn
# Create a DataFrame with one int column and 10 rows.
df = sqlContext.range(0, 10)
df.show()

In [None]:
# Generate two other columns using uniform distribution and normal distribution.
df.select("id", rand(seed=10).alias("uniform"), randn(seed=27).alias("normal")).show()

### Descriptive and Summary Statistics  
Before performing any type of exploratory data analysis, typically the first thing to do is to get an idea of what the data looks like. Usually, for numerical columns, knowing the descriptive summary statistics can help a lot in understanding the distribution of the data. In `R`, this is typically done using the `summary()` and `str()` functions. The following `Python` code returns a DataFrame containing information such as the number of non-null entries (count), mean, standard deviation, and minimum and maximum value for each numerical column.

In [None]:
# A slightly different way to generate the two random columns
df = sqlContext.range(0, 10).withColumn('uniform', rand(seed=10)).withColumn('normal', randn(seed=27))
df.describe().show()

Should the DataFrame be large, we can also run describe on a subset of the columns:

In [None]:
df.describe('uniform', 'normal').show()

While the `describe()` function works well for exploratory data analysis, we can also generate a list of descriptive statistics and the columns they apply to using the normal select on a DataFrame:

In [None]:
from pyspark.sql.functions import mean, min, max
df.select([mean('uniform'), min('uniform'), max('uniform')]).show()
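
For reference, the statistics reported above are simple to compute by hand. The following plain-Python sketch computes them for the values 0 to 9, i.e. the `id` column produced by `range(0, 10)`. It assumes the standard deviation is the sample standard deviation (n - 1 denominator); whether `describe()` uses that definition may depend on the Spark version, so treat the `stddev` line as an assumption.

```python
import math


def describe(values):
    """Return count, mean, sample standard deviation, min and max of a list."""
    n = len(values)
    mean = sum(values) / float(n)
    # Sample variance: squared deviations divided by n - 1 (an assumption here)
    variance = sum((v - mean) ** 2 for v in values) / (n - 1)
    return {
        "count": n,
        "mean": mean,
        "stddev": math.sqrt(variance),
        "min": min(values),
        "max": max(values),
    }


id_summary = describe(list(range(10)))
for name in ["count", "mean", "stddev", "min", "max"]:
    print("{0}: {1}".format(name, id_summary[name]))
```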

### Covariance and Correlation  
[Covariance](https://en.wikipedia.org/wiki/Covariance) is a measure of how two variables change with respect to each other. A *positive* number would mean that there is a tendency that as one variable __increases__, the other __increases__ as well. A *negative* number would mean that as one variable __increases__, the other variable has a tendency to __decrease__. The sample covariance of two columns of a DataFrame can be calculated as follows:

In [None]:
df = sqlContext.range(0, 10).withColumn('rand1', rand(seed=10)).withColumn('rand2', rand(seed=27))
df.show()
print "Covariance of the two columns is:", df.stat.cov('rand1', 'rand2')
print "Covariance of the `id` column with itself is:", df.stat.cov('id', 'id')

As can be seen from the above, the covariance of the two randomly generated columns, __rand1__ and __rand2__, is close to zero, while the covariance of the __id__ column with itself is very high. But what does the value of __9.17__ mean? That's where [Correlation](https://en.wikipedia.org/wiki/Correlation_and_dependence) comes into play. Correlation is a normalized measure of covariance that is easier to understand, as it provides a quantitative measurement of the statistical dependence between two random variables.

In [None]:
print "Correlation of the two columns is:", df.stat.corr('rand1', 'rand2')
print "Correlation of the `id` column with itself is:", df.stat.corr('id', 'id')

So, as can be seen, the __id__ column correlates perfectly with itself, while the two randomly generated columns have a low correlation value.
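
The relationship between the two measures can be checked by hand. The sketch below is a plain-Python illustration using the textbook formulas for sample covariance and Pearson correlation, not Spark's implementation. It reproduces the covariance of the `id` column (the values 0 to 9) with itself and shows that rescaling a column changes the covariance but not the correlation.

```python
import math


def sample_cov(xs, ys):
    """Sample covariance: sum of products of deviations divided by n - 1."""
    n = len(xs)
    mean_x = sum(xs) / float(n)
    mean_y = sum(ys) / float(n)
    return sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys)) / (n - 1)


def pearson_corr(xs, ys):
    """Covariance normalized by the two standard deviations."""
    return sample_cov(xs, ys) / math.sqrt(sample_cov(xs, xs) * sample_cov(ys, ys))


ids = list(range(10))            # same values as the `id` column
scaled = [10 * i for i in ids]   # the same column in different units

cov_id = sample_cov(ids, ids)
cov_scaled = sample_cov(ids, scaled)
corr_id = pearson_corr(ids, ids)
corr_scaled = pearson_corr(ids, scaled)

print("cov(id, id) = {0:.2f}, cov(id, 10*id) = {1:.2f}".format(cov_id, cov_scaled))
print("corr(id, id) = {0:.2f}, corr(id, 10*id) = {1:.2f}".format(corr_id, corr_scaled))
```

Covariance depends on the units of the columns, which is why its magnitude is hard to interpret on its own; correlation always falls between -1 and 1.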

### Cross Tabulation  
A cross tabulation, or [contingency table](https://en.wikipedia.org/wiki/Contingency_table), is a table showing the frequency distribution for a set of variables, and is therefore a powerful statistical tool for observing the statistical significance (or independence) of variables. Using Spark DataFrames, we can obtain the frequency counts of the different pairs observed in two columns.

__Note:__ The cardinality of the columns we run crosstab on cannot be too big. That is to say, the number of distinct “name” and “item” values cannot be too large. For example, if “item” contains 1 billion distinct entries, there would be no way to do exploratory data analysis, as the resultant table would never fit on the screen.

In [None]:
# Create a DataFrame with two columns (name, item)
names = ["Alice", "Bob", "Mike"]
items = ["milk", "bread", "butter", "apples", "oranges"]
df = sqlContext.createDataFrame([(names[i % 3], items[i % 5]) for i in range(100)], ["name", "item"])

# Display the first 10 records
df.show(10)

In [None]:
# Generate the Contingency Table
df.stat.crosstab("name", "item").show()
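
A contingency table is just a count of each (row value, column value) pair. As a cross-check, the same counts can be produced in plain Python with `collections.Counter`, using the same `names` and `items` lists as the cell above. This is only a sketch of what the table contains, not of how Spark computes it.

```python
from collections import Counter

names = ["Alice", "Bob", "Mike"]
items = ["milk", "bread", "butter", "apples", "oranges"]
rows = [(names[i % 3], items[i % 5]) for i in range(100)]

# Count how often each (name, item) pair occurs
pair_counts = Counter(rows)

# Print one row per name and one column per item
print("name_item " + " ".join(items))
for name in names:
    print(name + " " + " ".join(str(pair_counts[(name, item)]) for item in items))
```

Every one of the 15 possible pairs occurs, which is also why the table grows quickly as the number of distinct values in either column increases.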

### Element Frequency  
Knowing the frequency of occurrence of data can be useful in understanding the data. Using DataFrames, we can find the frequent items for a set of columns. Spark uses a [one-pass algorithm](http://dl.acm.org/citation.cfm?doid=762471.762473) proposed by Karp et al. This is a fast, approximate algorithm that always returns all the frequent items that appear in at least a user-specified minimum proportion of rows.

__Note:__ The result might contain false positives, i.e. items that are not frequent. 

In [None]:
df = sqlContext.createDataFrame([(1, 2, 3) if i % 2 == 0 else (i, 2 * i, i % 4) for i in range(100)], ["a", "b", "c"])
df.show(10)

In [None]:
# Given the above DataFrame, the following code finds the frequent items that show up 40% of the time for each column
freq = df.stat.freqItems(["a", "b", "c"], 0.4)
freq.collect()[0]

As can be seen from the output above, __11__ and __1__ are the frequent values for column __a__. We can also find frequent items for column combinations by creating a composite column using the `struct()` function:

In [None]:
from pyspark.sql.functions import struct
freq = df.withColumn('ab', struct('a', 'b')).stat.freqItems(['ab'], 0.4)
freq.collect()[0]

From the above output, the combinations __a=11__ and __b=22__, and __a=1__ and __b=2__, appear frequently in this dataset.  

__Note__ that “a=11 and b=22” is a false positive.
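
To see where false positives come from, here is a minimal plain-Python sketch of a one-pass counter-based frequent-items algorithm in the spirit of Karp et al. It is not Spark's implementation, and the choice of `ceil(1 / support) - 1` counters is an assumption of this sketch. It keeps a small, fixed number of counters, so every item that is truly frequent survives, but an infrequent item seen near the end of the data may survive too.

```python
import math


def frequent_items(values, support):
    """One-pass, approximate search for items occurring in more than a
    `support` fraction of `values`. May return false positives."""
    k = int(math.ceil(1.0 / support))
    counters = {}
    for value in values:
        if value in counters:
            counters[value] += 1
        elif len(counters) < k - 1:
            counters[value] = 1
        else:
            # No free counter: decrement all, dropping those that reach zero
            for key in list(counters):
                counters[key] -= 1
                if counters[key] == 0:
                    del counters[key]
    return set(counters)


# Same values as column `a` in the DataFrame above
column_a = [1 if i % 2 == 0 else i for i in range(100)]
frequent_a = frequent_items(column_a, 0.4)
print(frequent_a)
```

The value 1 fills half of the column, so it is guaranteed to be reported; anything else in the result is a candidate that a second, exact pass over the data would be needed to confirm or reject.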

### Mathematical Functions  
A number of mathematical functions can also be applied to columns in a DataFrame. Functions that take a single argument, such as __cos__, __sin__, __floor__ and __ceil__, take a column as input. For functions that take two arguments, such as __pow__ and __hypot__, either two columns or a combination of a double and a column can be supplied, for example:

In [None]:
from pyspark.sql.functions import *
df = sqlContext.range(0, 10).withColumn('uniform', rand(seed=10) * 3.14)

# Reference a column or supply the column name
df.select('uniform',
          toDegrees('uniform'),
          (pow(cos(df['uniform']), 2) + pow(sin(df.uniform), 2)). \
          alias("cos^2 + sin^2")).show()
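
As a sanity check on what the last column should contain, the same expression can be evaluated in plain Python with the standard `math` module. The angles below are arbitrary illustrative values in radians, not the random values generated above.

```python
import math

# Arbitrary sample angles in radians, roughly spanning 0 to pi
angles = [0.0, 0.5, 1.0, 2.0, 3.14]

# For each angle: the angle, its value in degrees, and cos^2 + sin^2
rows = [(a, math.degrees(a), math.cos(a) ** 2 + math.sin(a) ** 2) for a in angles]
for row in rows:
    print(row)
```

Up to floating-point rounding, the third value is 1.0 for every row, so any other result in the Spark output would indicate a mistake in the column expression.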

## Conclusion  
Now that we have a small overview of how to do some basic data exploration using Spark, we can apply these principles and do some exploratory data analysis.