# Module 2 - Lab 1: Feature Extraction
The purpose of this lab is to practice preparing data for use with Spark. Here we will learn how to obtain, process, and prepare the data. This matters because a dataset with an inconsistent format is difficult to query and manage. Upon completing this lab, you should be able to load different datasets, query them, transform the data (if needed), and extract features that are useful for your study of the data. 

You can think of these preparation steps as a Spark-specific implementation of some of the data cleaning concepts that you learned in the Intro class. 

In [1]:
# Running any cell starts PySpark (the SparkContext and HiveContext are created first).

# Make Random String

import os, random, string

length = 32

chars = string.ascii_letters + string.digits + '_-'

# Seed the generator with 1024 random bytes from the OS (call seed(); don't assign to it)
random.seed(os.urandom(1024))

rndfolder = ''.join(random.choice(chars) for i in range(length))

dirpath = '/home/hadoop/work_dir/' + rndfolder + '/'

# Create the folder with permissions "0770": the owner and the group can read, write,
# and enter the folder; all other users get no access
os.mkdir(dirpath, 0770)
os.chdir(dirpath)

print "done!"

Creating SparkContext as 'sc'


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
231,,pyspark,idle,,,✔


Creating HiveContext as 'sqlContext'
SparkContext and HiveContext created. Executing user code ...
done!

Here, I will explain what happened in the code block above. 
First, we import the `os`, `random`, and `string` modules so that we can use the functions they provide. 
Without these imports, those functions would not be available.

We assign the value 32 to the variable 'length'.
This will be the length of our random folder name.

We include all of the ASCII letters, digits, and the symbols '_' and '-' in the variable 'chars'.
We will use this to select characters randomly for our random folder name.

The `random` module implements pseudo-random number generators for a variety of uses.
Its `seed()` function initializes the random number generator with a given value.
The `os.urandom()` function returns a string of random bytes from the operating system, which is suitable for seeding a generator.
Here we request 1024 bytes.

The line `rndfolder = ''.join(random.choice(chars) for i in range(length))` calls the `join()` method on the empty string `''`.
`join()` returns the concatenation of the strings in the iterable passed to it, with the string it is called on (here, nothing) placed between them. 
In the argument: 
* `random.choice(chars)` returns one random character from `chars`,
* `for i in range(length)` repeats that `length` (32) times,
* so the generator expression yields 32 random characters, and `join()` glues them together into the folder name.
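For comparison, here is a sketch of the same name generator using `random.SystemRandom`. This is a swapped-in alternative, not what the notebook uses: it draws its randomness from the operating system, so there is no seed to set. The names `allowed_chars`, `name_length`, `system_rng`, and `folder_name` are hypothetical, chosen so they don't overwrite the notebook's `rndfolder`.

```python
import random
import string

# Same character set as the notebook: ASCII letters, digits, '_' and '-'
allowed_chars = string.ascii_letters + string.digits + '_-'
name_length = 32

# SystemRandom reads from the OS randomness source (os.urandom), so no seeding is needed
system_rng = random.SystemRandom()

# The generator expression picks one random character per iteration;
# ''.join() concatenates the 32 one-character strings into a single name
folder_name = ''.join(system_rng.choice(allowed_chars) for _ in range(name_length))
print(folder_name)
```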

The variable 'dirpath' holds the full path of our random folder.
We need the folder because our figures are saved there, and the URLs printed later in the notebook include the folder name so that we can view the images. This is a simplified explanation of a fairly complex setup, so I will spare you the details.

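This line `os.mkdir(dirpath, 0770)` creates a directory at the path stored in 'dirpath', with the permissions given in the second argument. The octal value `0770` gives the owner and the group full access (read, write, and enter) and gives everyone else no access; the process umask can remove further bits.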
This line `os.chdir(dirpath)` changes the current working directory to that of our new directory (folder).
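To see what `0770` means, this sketch compares the octal mode with the named permission constants in Python's standard `stat` module. It only does bit arithmetic and does not create a directory. It writes the mode as `0o770`, the octal-literal spelling accepted by Python 2.6+ and Python 3 (plain `0770` works only in Python 2).

```python
import stat

# Octal digits, left to right: owner, group, others (4 = read, 2 = write, 1 = execute)
mode = 0o770

# Owner: rwx (7), group: rwx (7)
owner_and_group_rwx = stat.S_IRWXU | stat.S_IRWXG
# Others: keep only the "other users" bits of the mode (all zero here)
others_bits = mode & stat.S_IRWXO

print(mode == owner_and_group_rwx)
print(others_bits == 0)
```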

Below is a function that saves a figure and prints a URL to it. We are using Amazon's Elastic MapReduce (EMR) infrastructure to run Spark. This infrastructure has some limits; among them, we cannot display graphics directly. What we can do is write visualizations to a file, then provide the URL where you can click to see the results. 

The `process_figure` function created below will be used to print URLs for output throughout this notebook. 

In [3]:
# This definition creates a function (or method) called 'process_figure'.
# The function takes two parameters: 'fig' (a matplotlib figure) and 'name' (the file name to save it under).
def process_figure(fig, name):
    # Save the figure as 'name' in the current working directory (our random folder).
    fig.savefig(name)
    # Print the URL where the saved image can be viewed in a browser.
    print 'http://ec2-54-153-99-19.us-west-1.compute.amazonaws.com:8810/' + rndfolder + '/' + name

# Import the plotting, numeric, date, regular-expression, sparse-matrix, and MLlib modules,
# then print 'Hello World' to confirm that the cell ran.
import matplotlib
matplotlib.use('agg') # 'Agg' is a non-interactive backend that renders to image files such as PNG
import matplotlib.pyplot as plt

import numpy as np

import datetime

import re

from scipy import sparse as sp

from pyspark.mllib.feature import Normalizer

print "Hello World"

Hello World

# Exploring the user dataset

Load in the dataset here, and observe the data structure by looking at the first line of data. The first() function is similar to the collect() function, except that it returns only the first element of the RDD to the driver. The take(k) function could have been used to obtain the first k elements of the RDD.

There are five fields in this data set:
 - user id
 - age
 - gender
 - occupation
 - zip code
 
Below, we create a new RDD from the file "ml-100k/u.user" with the function textFile(), which turns a text file into an RDD with one element per line. We then grab the first line of the dataset and display it.

In [4]:
user_data = sc.textFile("ml-100k/u.user")
user_data.first()

u'1|24|M|technician|85711'

Below, we load the same file again. We then take the first ten lines of the dataset and display them.

In [5]:
user_data = sc.textFile("ml-100k/u.user")
user_data.take(10)

[u'1|24|M|technician|85711', u'2|53|F|other|94043', u'3|23|M|writer|32067', u'4|24|M|technician|43537', u'5|33|F|other|15213', u'6|42|M|executive|98101', u'7|57|M|administrator|91344', u'8|36|M|administrator|05201', u'9|29|M|student|01002', u'10|53|M|lawyer|90703']

In the next section of code, we obtain the count for each of the fields within each line where that makes sense. Knowing the number of users, genders, occupations, and zip codes makes logical sense. Knowing the number of distinct ages makes less sense, and we will work with age data later. 

This descriptive summary will give us an idea of how many values for each field there are. We are exploring the data. 

To do this, we first transform the data by splitting each line around the `|` character. This gives us an RDD ([Resilient Distributed Dataset](http://spark.apache.org/examples.html)) in which each record is a Python list containing the user ID, age, gender, occupation, and ZIP code fields. Next, we count the number of users, genders, occupations, and ZIP codes. Note: we are NOT caching the data here because the RDD is small. 

This is where the variables that the code below uses to access the data are created.
The map() function applies the function passed as its argument to every element of the RDD it is called on.
A lambda function is an anonymous function written inline, typically because it is needed only in that one place.

The line `user_fields = user_data.map(lambda line: line.split("|"))` splits every element `line` of user_data on the `|` character.

The line `num_users = user_fields.map(lambda fields: fields[0]).count()` extracts the first field (the user ID) from every record and counts the records. Because each user appears on exactly one line, this is the number of users.

The line `num_genders = user_fields.map(lambda fields: fields[2]).distinct().count()` counts the distinct values of the third field (gender). 
(Python lists are indexed from 0, so '2' refers to the third value: 0, 1, 2, ....)

The line `num_occupations = user_fields.map(lambda fields: fields[3]).distinct().count()` counts the distinct values of the fourth field (occupation).

The line `num_zipcodes = user_fields.map(lambda fields: fields[4]).distinct().count()` also counts distinct values, this time of the fifth field (ZIP code).

We then print several values. 

In [6]:
user_fields = user_data.map(lambda line: line.split("|"))

num_users = user_fields.map(lambda fields: fields[0]).count()

num_genders = user_fields.map(lambda fields: fields[2]).distinct().count()

num_occupations = user_fields.map(lambda fields: fields[3]).distinct().count()

num_zipcodes = user_fields.map(lambda fields: fields[4]).distinct().count()

print "Users: %d, genders: %d, occupations: %d, ZIP codes: %d" % (num_users, num_genders, num_occupations, num_zipcodes)

Users: 943, genders: 2, occupations: 21, ZIP codes: 795
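The same counts can be reproduced in plain Python on a small sample, which makes it easier to see what each Spark step does. This sketch uses the ten lines shown by `take(10)` above; `set()` plays the role of `distinct()`, and `len()` plays the role of `count()`. The `sample_*` and `n_*` names are hypothetical, chosen so they don't overwrite the notebook's `user_fields` and `num_users`, which are used again later.

```python
# First ten lines of u.user, copied from the take(10) output above
sample_lines = [
    '1|24|M|technician|85711', '2|53|F|other|94043', '3|23|M|writer|32067',
    '4|24|M|technician|43537', '5|33|F|other|15213', '6|42|M|executive|98101',
    '7|57|M|administrator|91344', '8|36|M|administrator|05201',
    '9|29|M|student|01002', '10|53|M|lawyer|90703',
]

# Like user_data.map(lambda line: line.split("|"))
sample_fields = [line.split('|') for line in sample_lines]

# Like .map(lambda fields: fields[0]).count(): one user ID per record
n_users = len([fields[0] for fields in sample_fields])
# Like .map(...).distinct().count(): a set keeps only the distinct values
n_genders = len(set(fields[2] for fields in sample_fields))
n_occupations = len(set(fields[3] for fields in sample_fields))
n_zipcodes = len(set(fields[4] for fields in sample_fields))

print("Users: %d, genders: %d, occupations: %d, ZIP codes: %d"
      % (n_users, n_genders, n_occupations, n_zipcodes))
```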

Here we gather the values for the ages present within the dataset. 

We use the map function to cast every value at `x[1]` in the user_fields dataset as an `int`. 
We use `collect()` to return a list of all of the objects within the RDD. 

In [7]:
ages = user_fields.map(lambda x: int(x[1])).collect()

Next we plot the values in a histogram (hence the `hist()` function). The `normed=True` argument asks for a normalized histogram: the bar heights are scaled so that the total area of all the bars equals 1, which makes the shape of the distribution comparable regardless of how many users there are. We can see that the ages of the MovieLens users are slightly skewed towards younger viewers. 

`bins` sets how many bars (equal-width age intervals) the histogram uses. Try changing the number of bins and see how it affects the appearance of the graph. 

In [8]:
plt.hist(ages, bins=20, color='lightblue', normed=True)
fig = plt.gcf()
fig.set_size_inches(16, 10)
# remember when we defined the process_figure function earlier?
# This is where we print out a URL 
process_figure(fig, 'ages.png')
plt.clf() # clear the plot after you're done, or it will interfere with others

http://ec2-54-153-99-19.us-west-1.compute.amazonaws.com:8810/7iRcf7wFXmtkEPgNnqVlFd8Dr-P3L4Ok/ages.png
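To check what normalization does to the bar heights, this sketch uses NumPy's `np.histogram` with `density=True` on the ages of the first ten users. The assumption here is that matplotlib's `normed=True` computes the same density normalization (newer matplotlib releases name this option `density` instead of `normed`). The `sample_ages`, `counts`, `density`, and `widths` names are hypothetical.

```python
import numpy as np

# Ages of the first ten users, from the take(10) output earlier
sample_ages = [24, 53, 23, 24, 33, 42, 57, 36, 29, 53]

# Raw counts per bin, and the normalized ("density") version of the same histogram
counts, edges = np.histogram(sample_ages, bins=4)
density, _ = np.histogram(sample_ages, bins=4, density=True)

widths = np.diff(edges)
# Each density bar equals count / (total number of values * bin width) ...
print(np.allclose(density, counts / (float(len(sample_ages)) * widths)))
# ... so the bar areas (height * width), not the heights, add up to 1
print(np.isclose((density * widths).sum(), 1.0))
```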

The query below selects the occupation information from the dataset and plots the information in a bar graph, which allows us to see the distribution of data. 

Here, we look at the relative frequencies of the various occupations of the users. 

We use the MapReduce approach to count the occurrences of each occupation in the dataset. [Hadoop MapReduce](http://www.datavard.com/blog/hadoop-in-a-nutshell-technical-overview/) is, in short, a way of distributing processing over a number of different machines in a cluster. Spark uses the same map/reduce style of programming and, as in this lab, commonly runs on a Hadoop cluster. 
The key difference between Hadoop MapReduce and Spark is that Spark keeps intermediate data in memory where it can, whereas Hadoop MapReduce writes intermediate results to disk between steps.

NOTE: how `reduceByKey` works.
When called on a dataset of (K, V) pairs, `reduceByKey` returns a dataset of (K, V) pairs in which the values for each key are aggregated 
using the given reduce function, which must be of type (V, V) => V. 
In the example below, every value is a 1, so adding up the values for each key counts the people in each occupation.
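To make those semantics concrete, here is a plain-Python sketch of what `reduceByKey` computes, using a hypothetical helper `reduce_by_key` on the (occupation, 1) pairs for the first ten users. It is not how Spark implements it: Spark also merges values within each partition before shuffling and then merges across partitions, which is why the function should be associative and commutative.

```python
from functools import reduce

# (occupation, 1) pairs for the first ten users, as produced by
# user_fields.map(lambda fields: (fields[3], 1))
pairs = [('technician', 1), ('other', 1), ('writer', 1), ('technician', 1),
         ('other', 1), ('executive', 1), ('administrator', 1),
         ('administrator', 1), ('student', 1), ('lawyer', 1)]

def reduce_by_key(kv_pairs, func):
    """Hypothetical local stand-in for RDD.reduceByKey: group values by key, then fold each group with func."""
    groups = {}
    for key, value in kv_pairs:
        groups.setdefault(key, []).append(value)
    return dict((key, reduce(func, values)) for key, values in groups.items())

# Adding the 1s for each key counts the users in each occupation
occupation_counts = reduce_by_key(pairs, lambda x, y: x + y)
print(sorted(occupation_counts.items()))
```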

* `count_by_occupation` holds the total number of people in each occupation
* `user_fields` is the RDD of split records defined above
* `map()` here is the RDD method (not Python's built-in `map()`); it applies a function to every record
* `lambda` is a way of defining an anonymous Python function
* `fields` is the lambda's parameter: one record, i.e. the list of one user's fields

For more on lambda functions, see this link: 
* https://pythonconquerstheuniverse.wordpress.com/2011/08/29/lambda_tutorial/

The line of code `count_by_occupation = user_fields.map(lambda fields: (fields[3], 1)).reduceByKey(lambda x, y: x + y).collect()` uses the `map()` function to apply the lambda expression to every record in `user_fields`, turning each record into a pair `(occupation, 1)`, where the occupation is `fields[3]`. We end up with a dataset of (K, V) pairs, which is what the `reduceByKey()` function needs.
The `reduceByKey()` function is then called on this dataset of tuples.  
The lambda inside `reduceByKey()`, `lambda x, y: x + y`, adds two values that share the same key; applied repeatedly, it sums all of the 1s for each occupation, giving the number of users per occupation. 
The `collect()` function returns a list that contains all of the elements of the resulting RDD.

The `x_axis1` variable is assigned an array of the occupations from the dataset. The list comprehension inside the parentheses means this: for every pair `c` in `count_by_occupation`, take the element at index 0 (the occupation name); `np.array()` then turns that list into a NumPy array. (To learn more about the NumPy library, visit this link: http://cs231n.github.io/python-numpy-tutorial/.) A NumPy array is a grid of values (all of which are the same type) and is indexed by a tuple of nonnegative integers. NumPy arrays are high-performance, multidimensional arrays, which makes them useful for graphs, visualization, and scientific computing in general. 

The `y_axis1` variable is assigned the number of individuals in each occupation in a similar fashion, using the element at index 1 (the count) of each pair.

We then print the values of both arrays. We can see the occupation list and the number of users per occupation. This comes from the calculations above. We got our answer from the first line of code in the following code block. We were able to obtain the occupations from each line in the RDD via the `map()` function/lambda statement combination and count the number of users per occupation via the `reduceByKey()` function. 

In [9]:
count_by_occupation = user_fields.map(lambda fields: (fields[3], 1)).reduceByKey(lambda x, y: x + y).collect()
x_axis1 = np.array([c[0] for c in count_by_occupation])
y_axis1 = np.array([c[1] for c in count_by_occupation])

# The print statements below will help you see that the reduceByKey function called in count_by_occupation
# is then put into variables we will next graph
print(x_axis1)
print(y_axis1)

[u'administrator' u'writer' u'retired' u'student' u'doctor'
 u'entertainment' u'marketing' u'executive' u'none' u'scientist'
 u'educator' u'lawyer' u'healthcare' u'technician' u'librarian'
 u'programmer' u'artist' u'salesman' u'other' u'homemaker' u'engineer']
[ 79  45  14 196   7  18  26  32   9  31  95  12  16  27  51  66  28  12
 105   7  67]

In [10]:
# You can also see this if we simply print count_by_occupation
print(count_by_occupation)

[(u'administrator', 79), (u'writer', 45), (u'retired', 14), (u'student', 196), (u'doctor', 7), (u'entertainment', 18), (u'marketing', 26), (u'executive', 32), (u'none', 9), (u'scientist', 31), (u'educator', 95), (u'lawyer', 12), (u'healthcare', 16), (u'technician', 27), (u'librarian', 51), (u'programmer', 66), (u'artist', 28), (u'salesman', 12), (u'other', 105), (u'homemaker', 7), (u'engineer', 67)]

Once we have collected the counts per occupation, we convert them into two arrays: one for the x axis (the occupation) and one for the y axis (the count). The bar() function plots a bar graph. We sort the count data so that the chart is ordered from the lowest to the highest count. This is purely a visual step, but it makes the chart much easier to read.

The collect() function does not return the elements in any guaranteed order. 

We sort the data by first creating two numpy arrays, and then using argsort() to select the elements from each array ordered by the count data in an ascending fashion. 

The image shows that the most prevalent occupations are student, other, educator, administrator, engineer, and programmer.

In [11]:
x_axis = x_axis1[np.argsort(y_axis1)]
y_axis = y_axis1[np.argsort(y_axis1)]

pos = np.arange(len(x_axis))
width = 1.0

ax = plt.axes()
ax.set_xticks(pos + (width / 2))
ax.set_xticklabels(x_axis)

plt.bar(pos, y_axis, width, color='lightblue')
plt.xticks(rotation=30)
fig = plt.gcf()
fig.set_size_inches(16, 10)
process_figure(fig, 'occupation.png')
plt.clf()

http://ec2-54-153-99-19.us-west-1.compute.amazonaws.com:8810/7iRcf7wFXmtkEPgNnqVlFd8Dr-P3L4Ok/occupation.png

The above code block does the following:
* In the line `x_axis = x_axis1[np.argsort(y_axis1)]`, `np.argsort(y_axis1)` does not return the sorted counts; it returns the *indices* that would sort `y_axis1` in ascending order. Indexing `x_axis1` with those indices reorders the occupations by their counts.
* `y_axis` is built with the same indices, so each count stays lined up with its occupation (and the counts themselves come out sorted).
* We then set up our figure, which is a bar graph. We should see the occupations listed on the x axis, and the number of users in each occupation listed on the y axis. 
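Here is a small sketch of that argsort step with three hypothetical occupations, showing that `np.argsort` returns positions rather than values, and that reusing the same positions keeps labels and counts aligned.

```python
import numpy as np

sample_labels = np.array(['writer', 'student', 'doctor'])
sample_counts = np.array([45, 196, 7])

# argsort returns the positions of the smallest, next-smallest, ... count
order = np.argsort(sample_counts)
print(order)

# Indexing both arrays with the same positions keeps each label next to its count
print(sample_labels[order])
print(sample_counts[order])
```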

The query below counts the values for each occupation and displays them, comparing two different counting approaches. `countByValue()` counts the occurrences of each unique value in the RDD and returns the counts to the driver as a Python dictionary. Notice that the counts are the same for both approaches (dictionaries are unordered, so the entries print in different orders).

In [12]:
count_by_occupation2 = user_fields.map(lambda fields: fields[3]).countByValue()
print "countByValue approach:"
print dict(count_by_occupation2)
print ""
print "Map-reduce approach:"
print dict(count_by_occupation)

countByValue approach:
{u'administrator': 79, u'retired': 14, u'lawyer': 12, u'healthcare': 16, u'marketing': 26, u'executive': 32, u'scientist': 31, u'student': 196, u'technician': 27, u'librarian': 51, u'programmer': 66, u'salesman': 12, u'homemaker': 7, u'engineer': 67, u'none': 9, u'doctor': 7, u'writer': 45, u'entertainment': 18, u'other': 105, u'educator': 95, u'artist': 28}

Map-reduce approach:
{u'administrator': 79, u'executive': 32, u'retired': 14, u'doctor': 7, u'entertainment': 18, u'marketing': 26, u'writer': 45, u'none': 9, u'healthcare': 16, u'scientist': 31, u'homemaker': 7, u'student': 196, u'educator': 95, u'technician': 27, u'librarian': 51, u'programmer': 66, u'artist': 28, u'salesman': 12, u'other': 105, u'lawyer': 12, u'engineer': 67}
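In plain Python, `collections.Counter` plays a role similar to `countByValue()`. This sketch checks that it agrees with the map-reduce style of counting on the first ten users' occupations; both approaches here are local stand-ins, not Spark code, and `sample_occupations`, `by_value`, and `by_key` are hypothetical names.

```python
from collections import Counter

sample_occupations = ['technician', 'other', 'writer', 'technician', 'other',
                      'executive', 'administrator', 'administrator', 'student', 'lawyer']

# countByValue style: count each distinct value directly
by_value = Counter(sample_occupations)

# map-reduce style: emit (occupation, 1) and add up the 1s per key
by_key = {}
for occupation in sample_occupations:
    by_key[occupation] = by_key.get(occupation, 0) + 1

print(dict(by_value) == by_key)
```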

# Exploring the [MovieLens](https://movielens.org) dataset

MovieLens is not just *any* dataset. It is the original recommender system dataset; it exists as an ongoing project and is an important artifact in the introduction of the recommendation systems that are now part of our everyday life. The conference where these systems originated is Dr. Goggins' academic "home conference." I did my master's degree at Minnesota, taking classes from each of the MovieLens founders in the [GroupLens](https://en.wikipedia.org/wiki/GroupLens_Research) research group. This is some fun stuff!

Here, we repeat the above process, except with a different dataset - one about movies. :) You can see the structure of the first line of data within the dataset.

We are looking at the **u.item** file from the [MovieLens 100k Database](http://files.grouplens.org/datasets/movielens/ml-100k-README.txt)

In [13]:
movie_data = sc.textFile('ml-100k/u.item')
print movie_data.first()
num_movies = movie_data.count()
print 'Movies: %d' % num_movies

1|Toy Story (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Toy%20Story%20(1995)|0|0|0|1|1|1|0|0|0|0|0|0|0|0|0|0|0|0|0
Movies: 1682

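The function below handles errors in the parsing of the "release date" field: it returns the last four characters of the date as an integer year, and falls back to 1900 when that is not possible (for example, when the date is missing). 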

In [14]:
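def convert_year(x):
    # Release dates look like '01-Jan-1995', so the last four characters are the year
    try:
        return int(x[-4:])
    except ValueError:
        # e.g. an empty date string: return 1900 as a placeholder to filter out later
        return 1900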

We can use our utility function:

    convert_year(x) 

to parse the years of the release using the map transformation and collect the results. Once we have assigned the year 1900 to any errors when parsing the data, we can filter the data using the filter() function. 
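Here is a local sketch of that map-then-filter pattern, using a list comprehension in place of `map()` and a conditional comprehension in place of `filter()`. The three release-date strings are hypothetical (the first matches the Toy Story record shown above), and this version of `convert_year` catches only `ValueError`, the error `int()` raises for an empty or non-numeric string.

```python
def convert_year(x):
    """Return the last four characters of a release date as an int, or 1900 if that fails."""
    try:
        return int(x[-4:])
    except ValueError:
        return 1900

# Hypothetical release dates: two valid dates and one missing date
release_dates = ['01-Jan-1995', '', '16-Aug-1996']

# "map": parse every date; "filter": drop the 1900 placeholder
sample_years = [convert_year(d) for d in release_dates]
sample_years_clean = [y for y in sample_years if y != 1900]
print(sample_years)
print(sample_years_clean)
```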

The structure of the movie data is:
 - Movie ID :: **NOTE** The movie IDs are the ones used in the u.data dataset.
 - Movie title & release year
 - Release date
 - Video release date (empty in the first record shown above)
 - IMDb URL
 - Genre information fields (19): the last 19 fields are the genres; a 1 indicates the movie is of that genre, a 0 indicates it is not. Movies can be in several genres at once.  
   - unknown
   - Action
   - Adventure 
   - Animation 
   - Children's 
   - Comedy 
   - Crime 
   - Documentary 
   - Drama 
   - Fantasy
   - Film-Noir 
   - Horror 
   - Musical
   - Mystery
   - Romance 
   - Sci-Fi
   - Thriller 
   - War 
   - Western

In [15]:
movie_fields = movie_data.map(lambda lines: lines.split("|"))
years = movie_fields.map(lambda fields: fields[2]).map(lambda x: convert_year(x))
years_filtered = years.filter(lambda x: x!= 1900)

Real-world datasets often require in-depth approaches to parsing the data. This example shows why data exploration is so important: many of these data integrity and quality issues are noticed during this phase.

After filtering out the bad data, we can transform the list of movie release years into "movie ages" by subtracting each year from 1998, the year the dataset was released. We use the countByValue() function to compute the counts for each movie age, and then plot the histogram.

In [16]:
movie_ages = years_filtered.map(lambda yr: 1998-yr).countByValue()
values = movie_ages.values()
bins = movie_ages.keys()
plt.hist(values, bins=bins, color='lightblue', normed=True)
plt.xlabel("Age")
plt.ylabel("Frequency")
fig = plt.gcf()
fig.set_size_inches(16, 10)
process_figure(fig, 'movie_ages.png')
plt.clf()


http://ec2-54-153-99-19.us-west-1.compute.amazonaws.com:8810/7iRcf7wFXmtkEPgNnqVlFd8Dr-P3L4Ok/movie_ages.png

If you look closely at the information about this dataset, you will see that it was released in 1998! And if you look at the age information, you will notice that each movie's age is measured from 1998, around the time the ratings were made. So, how does the graph look different if we measure age from today?

In [17]:
# We change the 1998 in the first line to 2016
movie_ages_now = years_filtered.map(lambda yr: 2016-yr).countByValue()
values1 = movie_ages_now.values()
bins1 = movie_ages_now.keys()
plt.hist(values1, bins=bins1, color='lightblue', normed=True, range=(min(bins1), max(bins1)))
fig1 = plt.gcf()
fig1.set_size_inches(16, 10)
plt.xlabel("Age")
plt.ylabel("Frequency")
process_figure(fig1, 'movie_ages_now.png')
plt.clf()

## BONUS: Figure out why the plot is WRONG!
## DOUBLE BONUS : Fix it!


http://ec2-54-153-99-19.us-west-1.compute.amazonaws.com:8810/7iRcf7wFXmtkEPgNnqVlFd8Dr-P3L4Ok/movie_ages_now.png

# Exploring the rating dataset

This dataset has 100,000 ratings. Unlike the previous two datasets, its fields are separated by a tab character ("\t") rather than `|`. Each line holds a user ID, a movie (item) ID, a rating, and a timestamp. We will run some basic queries on the dataset. 

In [18]:
rating_data_raw = sc.textFile('ml-100k/u.data')
print rating_data_raw.first()
num_ratings = rating_data_raw.count()
print 'Ratings : %d' % num_ratings

196	242	3	881250949
Ratings : 100000
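As a quick check of that layout, this sketch parses the first line shown above with plain Python. The field order (user ID, item ID, rating, Unix timestamp in seconds since 1970-01-01 UTC) follows the MovieLens 100k README linked earlier; the `sample_*` and `rated_at` names are hypothetical.

```python
import datetime

# The first line of u.data, as printed above (fields are tab-separated)
sample_line = '196\t242\t3\t881250949'

# Fields: user id, item id, rating, timestamp
sample_user, sample_item, sample_rating, sample_ts = [int(f) for f in sample_line.split('\t')]

# Convert the Unix timestamp to a naive datetime in UTC
rated_at = datetime.datetime(1970, 1, 1) + datetime.timedelta(seconds=sample_ts)
print("user %d rated item %d with %d stars on %s"
      % (sample_user, sample_item, sample_rating, rated_at))
```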

Above, we load the dataset with the `textFile()` method, as we have done previously in this module. 
We then print the first line from the dataset.
Then, using the `count()` method, we count the number of lines in the dataset.
Finally, we print the result.

Below we perform some calculations to describe the data set. Notice the difference between what I calculate for the average, and the alternate calculation using the method from numpy. 

Why do you think there is a difference? 

In [19]:
rating_data = rating_data_raw.map(lambda line: line.split('\t'))
ratings = rating_data.map(lambda fields: int(fields[2]))
max_rating = ratings.reduce(lambda x, y: max(x,y))
min_rating = ratings.reduce(lambda x, y: min(x,y))
mean_rating = ratings.reduce(lambda x, y: x + y) / num_ratings
mean_rating2 = np.mean(ratings.collect())
median_rating = np.median(ratings.collect())

ratings_per_user = num_ratings / num_users
ratings_per_movie = num_ratings / num_movies

print "Min rating: %d" % min_rating
print "Max rating: %d" % max_rating
print "Average rating: %2.2f" % mean_rating
print "Alternate Average Rating: %2.2f" % mean_rating2
print "Median rating: %d" % median_rating
print "Average # of ratings per user: %2.2f" % ratings_per_user
print "Average # of ratings per movie: %2.2f" % ratings_per_movie

Min rating: 1
Max rating: 5
Average rating: 3.00
Alternate Average Rating: 3.53
Median rating: 4
Average # of ratings per user: 106.00
Average # of ratings per movie: 59.00

The above code block does the following:
* `rating_data`: We use the `map()` method to apply the lambda expression to every line in the dataset
    * The lambda expression uses the `split()` method to retrieve a list of the fields in a single line of the dataset
* `ratings`: Here, we again use `map()`, but this lambda casts the third field (the rating) of each list in `rating_data` to an `int`. The result is a new RDD of integer ratings, stored in the `ratings` variable.
* `max_rating`: Here we use the `reduce()` method, which combines the elements of the RDD two at a time with the given function until a single value remains, and returns that value to the driver. 
    * Here the function keeps the larger of each pair, so the result is the maximum rating in the dataset.
* `min_rating`: Something similar happens here. Except now, it's the minimum.
* `mean_rating`: Here, we sum all of the ratings in the dataset and divide by the number of ratings, which gives us the mean value (or average).
* `mean_rating2`: Here, we use the handy `mean()` method from the numpy module that we imported in the beginning.
    * Hint: The reason we get different answers has to do with types... I'll let you look into that.
* `median_rating`: Here we use the `median()` method from the numpy module to obtain the median value in the dataset.
* Then, we print all of the values out for observation.
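Here is a local sketch of how `reduce()` folds values pairwise, using Python's `functools.reduce` on a hypothetical handful of ratings. Spark's `RDD.reduce` applies the same kind of fold within each partition and then combines the partial results, which is why the function you pass should be commutative and associative (as `max`, `min`, and `+` are).

```python
from functools import reduce

sample_ratings = [3, 1, 5, 4, 2]

# reduce folds left to right: max(max(max(max(3, 1), 5), 4), 2)
highest = reduce(lambda x, y: max(x, y), sample_ratings)
lowest = reduce(lambda x, y: min(x, y), sample_ratings)
total = reduce(lambda x, y: x + y, sample_ratings)
print("%d %d %d" % (highest, lowest, total))
```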

Spark provides the `stats()` function for RDDs of numbers. It computes the count, mean, standard deviation, maximum, and minimum in a single call. This is a super handy function, so remember it for the future! :)

In [20]:
ratings.stats()

(count: 100000, mean: 3.52986, stdev: 1.12566797076, max: 5.0, min: 1.0)
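As a rough local equivalent, this sketch computes the same five numbers by hand on a tiny hypothetical list. It assumes, as PySpark's `StatCounter` does, that `stdev` is the population standard deviation (dividing by n); `StatCounter` also offers `sampleStdev()` for the n - 1 version. `local_stats` is a hypothetical helper, not a Spark function.

```python
import math

def local_stats(values):
    """Hypothetical sketch of the summary RDD.stats() reports: count, mean, stdev, max, min."""
    n = len(values)
    mean = sum(values) / float(n)
    # Population variance: average squared distance from the mean (divide by n, not n - 1)
    variance = sum((v - mean) ** 2 for v in values) / n
    return {'count': n, 'mean': mean, 'stdev': math.sqrt(variance),
            'max': max(values), 'min': min(values)}

summary = local_stats([1, 2, 3, 4, 5])
print(summary)
```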

Upon observing the results, we can see that the average rating given by a user to a movie is around 3.5 and median rating is 4. This leads us to expect that the distribution of ratings will be skewed towards slightly higher ratings. 

In [21]:
count_by_ratings = ratings.countByValue()
x_axis = np.array(count_by_ratings.keys())
y_axis = np.array([float(c) for c in count_by_ratings.values()])
y_axis_normed = y_axis / y_axis.sum()
pos = np.arange(len(x_axis))
width = 1.0

ax = plt.axes()
ax.set_xticks(pos + (width / 2))
ax.set_xticklabels(x_axis)

plt.bar(pos, y_axis_normed, width, color='lightblue')
fig = plt.gcf()
fig.set_size_inches(16, 10)
process_figure(fig, 'ratings_distribution.png')
plt.clf()

http://ec2-54-153-99-19.us-west-1.compute.amazonaws.com:8810/7iRcf7wFXmtkEPgNnqVlFd8Dr-P3L4Ok/ratings_distribution.png

 - After seeing some summary statistics, it is clear that the distribution of ratings is skewed towards average to high ratings. We can also look at the distribution of the number of ratings made by each user. 

 - Recall that we previously computed the rating_data RDD by splitting each line of the ratings file on the tab character. We will now use the rating_data variable again to compute the distribution of ratings per user. First we extract the key (the user ID) and the rating value from each record of the rating_data RDD. 

 - Then, we group the ratings by user ID using the groupByKey() function.

In [23]:
user_ratings_grouped = rating_data.map(lambda fields: (int(fields[0]), int(fields[2]))).groupByKey()

user_ratings_by_user = user_ratings_grouped.map(lambda (k,v): (k, len(v)))
user_ratings_by_user.take(5)

[(1, 272), (2, 62), (3, 54), (4, 24), (5, 175)]

In the code block above, we did the following:
* `user_ratings_grouped`: Here we cast the first and third values of each list in the `rating_data` dataset (the user ID and the rating) to `int`, and return the two values as a tuple for each list.
    * Then, we apply the `groupByKey()` method to these (user ID, rating) pairs, which gathers all of the ratings that share the same user ID into one iterable per user. 
* `user_ratings_by_user`: Here we apply the lambda expression to each (user ID, ratings) pair produced by the previous line of code. 
    * It returns a tuple of the user ID `k` and the length of `v`, i.e. the number of ratings that user made. 
* We then take the first 5 tuples and present them in the output. 
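Here is a plain-Python sketch of the group-then-count steps on a few hypothetical (user ID, rating) pairs: a dictionary of lists stands in for `groupByKey()`, and `len()` of each list stands in for the second `map()`.

```python
# Hypothetical (user id, rating) pairs for illustration
sample_pairs = [(1, 5), (2, 3), (1, 4), (3, 1), (1, 2), (2, 5)]

# groupByKey style: gather every rating for a user into one list
grouped = {}
for user, rating in sample_pairs:
    grouped.setdefault(user, []).append(rating)

# Replace each list by its length: the number of ratings per user
sample_counts_per_user = dict((user, len(r)) for user, r in grouped.items())
print(sorted(sample_counts_per_user.items()))
```

When only the counts are needed, Spark can produce the same pairs without first gathering every rating per user, for example with `rating_data.map(lambda fields: (int(fields[0]), 1)).reduceByKey(lambda x, y: x + y)`.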

Now we plot a histogram of the number of ratings per user. You should see that most users give fewer than 100 ratings. Surprisingly, though, the distribution also shows that many users give hundreds of ratings. Though not in the 1998 dataset, Dr. Goggins is one of the users who has provided hundreds of ratings; and Johnny Depp would not be happy with how Dr. Goggins rates his films. 

In [24]:
user_ratings_by_user_local = user_ratings_by_user.map(lambda (k,v): v).collect()
plt.hist(user_ratings_by_user_local, bins=200, color='lightblue', normed=True)
fig = plt.gcf()
fig.set_size_inches(16,10)
process_figure(fig, 'ratings_per_user_distrib.png')
plt.clf()

http://ec2-54-153-99-19.us-west-1.compute.amazonaws.com:8810/7iRcf7wFXmtkEPgNnqVlFd8Dr-P3L4Ok/ratings_per_user_distrib.png

 - At this point we have 
    - Explored the dataset and 
    - Graphed Descriptive Statistics
 - Next up:
    - cleaning the data
    - Preparing analysis for sharing

# Processing and transforming your data

Before we can extract useful features for any [machine learning algorithms](http://machinelearningmastery.com/a-tour-of-machine-learning-algorithms/), we will first need to clean up the data. We may also need to transform it in different ways to extract any useful features. 

Data transformation and feature extraction are two steps that are closely linked. In general, real-world datasets contain bad data, missing data points, and outliers. To deal with such events, we utilize the following options:

* Filter out or remove records with bad or missing values.
* Fill in bad or missing data.
* Apply robust techniques to outliers.
* Apply transformations to potential outliers.

Below is an example of filling in bad or missing data, which is extremely common in big data science. Here, we replace the bad data point with the median year of release. First, we grab all of the year-of-release data and compute the mean and median year of release, leaving out the bad data point. We then use the where() function to find the index of the bad value in years_pre_processed_array. Finally, we use this index to assign the median release year to the bad value. 

You should see that the median release year is quite a bit higher than the mean, because the distribution of years is skewed: most movies are recent, with a long tail of older ones. While it's not always straightforward to decide which fill-in value to use in a given situation, here the median makes sense because, unlike the mean, it is not pulled down by the relatively few very old movies. 

In [25]:
years_pre_processed = movie_fields.map(lambda fields: fields[2]).map(lambda x: convert_year(x)).collect()
years_pre_processed_array = np.array(years_pre_processed)

mean_year = np.mean(years_pre_processed_array[years_pre_processed_array != 1900])
median_year = np.median(years_pre_processed_array[years_pre_processed_array != 1900])
index_bad_data = np.where(years_pre_processed_array == 1900)[0][0]
years_pre_processed_array[index_bad_data] = median_year

print 'Mean year of release: %d' % mean_year
print 'Median year of release: %d' % median_year
print "Index of '1900' after reassigning median: %s" % np.where(years_pre_processed_array == 1900)[0]

Mean year of release: 1989
Median year of release: 1995
Index of '1900' after reassigning median: []
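The cell above replaces only the first placeholder it finds (that is what `[0][0]` selects), which is enough here because one bad value remains. As a variation, this sketch uses a boolean mask to replace *every* 1900 placeholder with the median of the valid years. The years are hypothetical, chosen so that a single old movie pulls the mean below the median.

```python
import numpy as np

# Hypothetical release years; 1900 marks a date convert_year() could not parse
years_arr = np.array([1995, 1900, 1994, 1996, 1977, 1995, 1900])

valid = years_arr != 1900
mean_year_s = np.mean(years_arr[valid])
median_year_s = np.median(years_arr[valid])

# Boolean-mask assignment fills all placeholders at once, not just the first one
filled = years_arr.copy()
filled[~valid] = int(median_year_s)

print("mean %.1f, median %.1f" % (mean_year_s, median_year_s))
print(filled)
```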

In the above block of code, you should see some familiar methods, and the all too familiar lambda expression. 
See if you can understand what each line of code is doing. 
You already know `map()`, and you've seen how lambdas work. 
You may need to look up a couple of the functions used, such as array() and where().
In the `index_bad_data` line of code, `np.where()` returns a tuple with one array of matching indices per dimension; the first `[0]` selects that array (our array has only one dimension), and the second `[0]` takes the first matching index.
See if you follow the rest. :)
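Here is a small sketch of the structure `np.where()` returns when it is given only a condition, using a hypothetical one-dimensional array with two placeholder values.

```python
import numpy as np

arr = np.array([1995, 1900, 1994, 1900])

# With only a condition, np.where returns a tuple of index arrays, one per dimension
result = np.where(arr == 1900)
print(len(result))

# result[0] holds the matching positions along the first (and only) dimension ...
print(result[0])
# ... and result[0][0] is the first position where the condition holds
print(result[0][0])
```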

# Extracting useful features from your data

Now we are ready to extract actual features from the data with which our machine learning model can be trained. 

Features refer to the variables that we use to [train our model](http://docs.aws.amazon.com/machine-learning/latest/dg/training-ml-models.html). Almost all machine learning models ultimately work on numerical data in the form of vectors, so we need to convert raw data into numbers.

Here are some categories of features:
* Numerical features: typically real or integer numbers
* Categorical features: refer to variables that can take one from a set of possible states at any given time
* Text features: derived from the text content in the data
* Other features: most other types of features are ultimately represented numerically

## Categorical features

These features cannot be used as input in their raw form, as they are not numbers. They are members of a set of possible values that the variable can take. From the previous example, user occupation is a categorical variable that can take a handful of values. 

Categorical variables whose values have no concept of order are also known as nominal variables. If there is such an order between the values, we refer to them as ordinal variables. 

To transform categorical variables into a numerical representation, we can use a common appraoch known as 1-of-k encoding. This is necessary to represent nominal variables in a way that makes sense for a machine leanring task. 

 - Assume there are k possible values that the variable can take. If we assign each possible value an index from the set of 1 to k, then we can represent a given state of the variable using a binary vector of length k. Here, all entries are zero, except the entry at the index that corresponds to the given state of the varialbe. 

 - Here, we collect all the possible states of the occupation variable. We then, assign index values to each possible occupation in turn.

Basically, we are creating a map of occupations to numbers. 
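The 1-of-k scheme just described can be sketched in plain Python, without Spark or NumPy. The occupation values below are a small hypothetical sample; the lab itself collects the real values from `user_fields`. The helper names `build_index` and `one_hot` are illustrative only.

```python
def build_index(values):
    """Map each distinct value to an index 0..k-1, in sorted order."""
    return dict((v, i) for i, v in enumerate(sorted(set(values))))

def one_hot(value, index):
    """Return a length-k list of 0s with a single 1 at the value's index."""
    vec = [0] * len(index)
    vec[index[value]] = 1
    return vec

# Hypothetical occupation column (one entry per user).
occupations = ['technician', 'other', 'writer', 'technician', 'other']
index = build_index(occupations)   # k = 3 distinct values

print(one_hot('writer', index))    # [0, 0, 1]
print(one_hot('other', index))     # [1, 0, 0]
```

Sorting the distinct values first makes the index assignment deterministic, which is also why the lab calls `sort()` before building its dictionary.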

In [26]:
all_occupations = user_fields.map(lambda fields: fields[3]).distinct().collect()
all_occupations.sort()

idx = 0
all_occupations_dict = {}
for o in all_occupations:
    all_occupations_dict[o] = idx
    idx += 1

print "Encoding of 'doctor': %d" % all_occupations_dict['doctor']
print "Encoding of 'programmer': %d" % all_occupations_dict['programmer']

Encoding of 'doctor': 2
Encoding of 'programmer': 14

In the above block of code, the first two lines contain pieces that look familiar: they collect the distinct occupations and sort them.
The middle chunk creates a dictionary. 
A Python dictionary is a commonly used data structure in the language.
It is, in essence, a collection of key-value pairs that maps each key to its value.
In a dictionary literal, keys are separated from their values by a colon, items are separated by commas, and the whole thing is enclosed in curly braces `{}`.
The keys must be unique, but the values do not have to be.
The keys must be of an immutable (more precisely, hashable) data type, such as a string, number, or tuple.

`all_occupations_dict` starts out as an empty dictionary.
The `for` loop steps through all of the occupations in the sorted `all_occupations` list and assigns each one the next index, starting from 0. 

Two examples are printed at the bottom.
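A few lines make the dictionary rules above concrete. Assigning to an existing key overwrites its value instead of adding a second entry, values may repeat, and a mutable type such as a list cannot be used as a key. The names and numbers here are illustrative only.

```python
codes = {'doctor': 2, 'programmer': 14}

# Keys are unique: assigning to an existing key replaces its value.
codes['doctor'] = 99
print(len(codes))   # 2 (still two entries)

# Values need not be unique.
codes['writer'] = 99

# Keys must be hashable (immutable types such as str, int, tuple);
# using a list as a key raises TypeError.
try:
    codes[['a', 'list']] = 0
    list_key_allowed = True
except TypeError:
    list_key_allowed = False

print(list_key_allowed)   # False
```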

We can then encode the value `programmer`. We start by using NumPy's `zeros()` function to create an array whose length equals the number of possible occupations (k), filled with zeros. Then, we look up the index of the word `programmer` and assign a value of 1 to the array element at that index:

In [27]:
print(all_occupations_dict)

{u'administrator': 0, u'executive': 6, u'retired': 15, u'student': 18, u'doctor': 2, u'marketing': 11, u'entertainment': 5, u'none': 12, u'scientist': 17, u'writer': 20, u'healthcare': 7, u'other': 13, u'lawyer': 9, u'educator': 3, u'technician': 19, u'librarian': 10, u'programmer': 14, u'artist': 1, u'salesman': 16, u'homemaker': 8, u'engineer': 4}

In [28]:
K = len(all_occupations_dict)
binary_x = np.zeros(K)
k_programmer = all_occupations_dict['programmer']
binary_x[k_programmer] = 1
print "Binary feature vector: %s" % binary_x
print "Length of binary vector: %d" % K

Binary feature vector: [ 0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  1.  0.  0.  0.
  0.  0.  0.]
Length of binary vector: 21

## Derived features

It is often useful to compute a derived feature from one or more available variables. For example, we can compute the average rating given by each user to all the movies they rated. Such a feature can provide a user-specific intercept (a per-user baseline) in our model. By taking the raw rating data and creating a new feature, we may be able to learn a better model. Often the idea behind these transformations is to summarize the numerical data in a way that makes it easier for a model to learn. 
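As a minimal sketch of the per-user average rating described above, the following plain-Python code aggregates hypothetical `(user_id, movie_id, rating)` records. In Spark, the same aggregation could be written with `reduceByKey()` over `(user_id, (rating, 1))` pairs; the sketch avoids Spark so it runs anywhere.

```python
from collections import defaultdict

# Hypothetical rating records: (user_id, movie_id, rating).
ratings = [
    ('u1', 'm1', 3),
    ('u2', 'm2', 3),
    ('u1', 'm3', 4),
    ('u3', 'm4', 1),
    ('u2', 'm5', 1),
]

# Accumulate a running [sum, count] per user.
totals = defaultdict(lambda: [0, 0])
for user, _movie, rating in ratings:
    totals[user][0] += rating
    totals[user][1] += 1

# Derived feature: the mean rating each user gives.
avg_rating = dict((u, float(s) / n) for u, (s, n) in totals.items())
print(avg_rating['u1'])  # 3.5
```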

To illustrate this concept, we will use the times at which users rated movies. These are stored as Unix timestamps (the number of seconds since 1970-01-01 00:00:00 UTC). We will use Python's `datetime` module to convert each timestamp into a date and time and, in turn, extract the hour of the day.

In [29]:
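import datetime

def extract_datetime(ts):
    # convert a Unix timestamp (seconds) to a datetime in the machine's local time zone
    return datetime.datetime.fromtimestamp(ts)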

First, we use the `map()` transformation to extract the timestamp field and convert it to a Python `int`. We then apply the `extract_datetime()` function to each timestamp and take the `hour` attribute of the resulting `datetime` object.

In [30]:
timestamps = rating_data.map(lambda fields: int(fields[3]))
hour_of_day = timestamps.map(lambda ts: extract_datetime(ts).hour)
hour_of_day.take(5)

[15, 19, 7, 5, 5]
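One caveat: `datetime.datetime.fromtimestamp()` converts to the local time zone of the machine that runs it, so the hours extracted on the cluster depend on how its executors are configured. If you want hours in UTC regardless of machine settings, one option is the standard-library `time.gmtime()`, sketched below with a made-up timestamp. The function name `extract_hour_utc` is hypothetical.

```python
import time

def extract_hour_utc(ts):
    """Hour of day (0-23) in UTC for a Unix timestamp given in seconds."""
    return time.gmtime(ts).tm_hour

# Made-up timestamp: 15 hours and 30 minutes after the Unix epoch.
ts = 15 * 3600 + 30 * 60
print(extract_hour_utc(ts))   # 15
```

In the lab, such a function could be used in place of the lambda, e.g. `timestamps.map(extract_hour_utc)`.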

We have transformed the raw time data into a categorical feature that represents the hour of the day in which the rating was given. We now assign each hour-of-the-day value to a defined "bucket" that represents a time of day. 

In [31]:
def assign_tod(hr):
    times_of_day = {
        'morning': range(7, 12),
        'lunch': range(12, 14),
        'afternoon': range(14, 18),
        'evening': range(18, 23),
        'night': range(0, 7) + [23]
    }
    for k, v in times_of_day.iteritems():
        if hr in v:
            return k

The above function definition does the following:
* First, it takes an hour as a parameter.
* Then, it builds a dictionary, `times_of_day`, that maps each time-of-day name to a range of hours.
    * `range(start, stop)` includes `start` but excludes `stop`, so `range(7, 12)` covers the hours 7 through 11.
    * The start value must be less than the stop value; otherwise the range is empty.
    * In Python 2, `range()` returns a list, which is why `range(0, 7) + [23]` works.
* Once the `times_of_day` dictionary has been created, a `for` loop steps through all of its items.
* If `hr` is found in one of the ranges, the function returns that range's key.

Here we apply the `assign_tod()` function to the hour of each rating event contained in the `hour_of_day` RDD.

In [32]:
time_of_day = hour_of_day.map(lambda hr: assign_tod(hr))
time_of_day.take(5)

['afternoon', 'evening', 'morning', 'night', 'night']
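Because the buckets are contiguous, the same mapping can also be written with chained comparisons instead of building a dictionary of ranges on every call. This variant, hypothetically named `assign_tod_compact`, is a sketch that should agree with `assign_tod()` for every hour from 0 to 23. It also avoids `range(...) + [23]` and `iteritems()`, which work only in Python 2.

```python
def assign_tod_compact(hr):
    """Map an hour 0-23 to the same time-of-day buckets as assign_tod()."""
    if 7 <= hr < 12:
        return 'morning'
    elif 12 <= hr < 14:
        return 'lunch'
    elif 14 <= hr < 18:
        return 'afternoon'
    elif 18 <= hr < 23:
        return 'evening'
    else:
        # Hours 0-6 and 23 fall into the night bucket.
        return 'night'

print([assign_tod_compact(h) for h in [15, 19, 7, 5, 5]])
# ['afternoon', 'evening', 'morning', 'night', 'night']
```

One difference: for an out-of-range input such as 24, `assign_tod()` returns `None`, while this sketch returns `'night'`.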

## Text Features

Text features can be considered forms of categorical and derived features. The goal is to turn raw text into a form that is more amenable to machine learning. The field of natural language processing is dedicated to processing, representing, and modeling textual content. A full treatment is beyond the scope of this module. However, here we will use the bag-of-words representation, which is a simple and standard approach for text-feature extraction.

This approach treats a piece of text content as a set (a "bag") of words, and possibly numbers, ignoring their order. The process is as follows:
* Tokenization: First, the text is split into a set of tokens (i.e., words, numbers, etc.).
* Stop word removal: Next, we remove very common words such as "the", "and", and "but" (a.k.a. stop words).
* Stemming: Next, you can include stemming, which refers to taking a term and reducing it to its base form or stem. (Ex: Plural terms become singular.) 
* Vectorization: The final step is turning the processed terms into a vector representation. The simplest form is a binary vector representation where we assign a value of one if a term exists in the text and zero if it does not. (This is identical to the categorical 1-of-k encoding we encountered earlier.)
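The four steps can be sketched end to end in plain Python. The stop-word list and the "stemmer" (which only strips a trailing "s") are crude stand-ins chosen for illustration; real pipelines use curated stop-word lists and proper stemmers such as the Porter stemmer. The documents are made up.

```python
STOP_WORDS = set(['the', 'and', 'but', 'of', 'a'])   # tiny illustrative list

def tokenize(text):
    # Tokenization: lowercase and split on whitespace.
    return text.lower().split()

def crude_stem(token):
    # Illustrative "stemming": drop a trailing 's' from longer words ("cats" -> "cat").
    return token[:-1] if len(token) > 3 and token.endswith('s') else token

def process(text):
    # Stop word removal followed by stemming.
    return [crude_stem(t) for t in tokenize(text) if t not in STOP_WORDS]

docs = ['The Cats and the Dogs', 'A Dog of the Night']
processed = [process(d) for d in docs]

# Vocabulary (term -> index) over all processed documents.
all_terms = sorted(set(t for p in processed for t in p))
vocab = dict((t, i) for i, t in enumerate(all_terms))

# Vectorization: 1 if the term appears in the document, else 0.
vectors = []
for terms in processed:
    vec = [0] * len(vocab)
    for t in terms:
        vec[vocab[t]] = 1
    vectors.append(vec)

print(processed)  # [['cat', 'dog'], ['dog', 'night']]
print(vectors)    # [[1, 1, 0], [0, 1, 1]]
```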

The following is an example of extracting textual features in the binary vector representation. We will use the movie titles that are available. 

First, we create a function that strips away the year of release from each movie title. We use Python's regular expression module, `re`, to search for the year between parentheses in the movie titles. If a match is found, we keep only the title up to the index where the match starts. 

In [33]:
import re

def extract_title(raw):
    # a run of word characters (e.g. the year) between parentheses
    grps = re.search(r'\((\w+)\)', raw)
    if grps:
        # take only the title, strip remaining whitespace
        return raw[:grps.start()].strip()
    else:
        return raw

Now, we extract the raw movie titles from the movie_fields RDD. Then, we test out our extract_title() function.

In [34]:
raw_titles = movie_fields.map(lambda fields: fields[1])
for raw_title in raw_titles.take(5):
    print extract_title(raw_title)

Toy Story
GoldenEye
Four Rooms
Get Shorty
Copycat
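The pattern in `extract_title()` matches any run of word characters in parentheses, not only years, and it cuts at the first such match. If you want to strip only a trailing four-digit year, a stricter pattern is one option. The function name `extract_title_year_only` is hypothetical and the titles are made-up test strings.

```python
import re

# Optional whitespace, then "(dddd)", then optional whitespace, at the end of the string.
YEAR_SUFFIX = re.compile(r'\s*\((\d{4})\)\s*$')

def extract_title_year_only(raw):
    """Strip a trailing four-digit year in parentheses; otherwise return raw unchanged."""
    return YEAR_SUFFIX.sub('', raw)

print(extract_title_year_only('Toy Story (1995)'))           # Toy Story
print(extract_title_year_only('Some Film (Redux) (1999)'))   # Some Film (Redux)
```

For the second title, the original `extract_title()` would return `Some Film`, because `(Redux)` is the first parenthesized word it finds.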

We would then like to apply our function to the raw titles and apply a tokenization scheme to the extracted titles to convert them to terms.

In [35]:
movie_titles = raw_titles.map(lambda m: extract_title(m))
# tokenize on whitespace
title_terms = movie_titles.map(lambda t: t.split(' '))
print title_terms.take(5)

[[u'Toy', u'Story'], [u'GoldenEye'], [u'Four', u'Rooms'], [u'Get', u'Shorty'], [u'Copycat']]

Next, we use Spark's `flatMap()` function to expand the list of strings in each record of the `title_terms` RDD into a new RDD in which each record is a single term. We then keep the `distinct()` terms, collect them into the Python list `all_terms`, and assign each term an index, just as we did for occupations. 

We then print the total number of unique terms and test our term mapping on a few different terms.

In [36]:
all_terms = title_terms.flatMap(lambda x: x).distinct().collect()
idx = 0
all_terms_dict = {}
for term in all_terms:
    all_terms_dict[term] = idx
    idx += 1

print "Total number of terms: %d" % len(all_terms_dict)
print "Index of term 'Dead': %d" % all_terms_dict['Dead']
print "Index of term 'Rooms': %d" % all_terms_dict['Rooms']

Total number of terms: 2645
Index of term 'Dead': 723
Index of term 'Rooms': 321

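The `flatMap()` function is similar to `map()`, but each input record may produce zero or more output records, and these are flattened into a single RDD rather than kept as nested lists.
(Search the web for more about flattening results, if you are curious!)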
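To see what "flattening" means, here is a plain-Python analogy of `map()` versus `flatMap()` on the first few tokenized titles shown above, using `itertools.chain` to do the flattening. Python lists stand in for RDDs here.

```python
from itertools import chain

title_terms = [['Toy', 'Story'], ['GoldenEye'], ['Four', 'Rooms']]

# map(lambda x: x): one output record per input record, so the nesting is kept.
mapped = list(map(lambda x: x, title_terms))

# flatMap(lambda x: x): each input record may yield several output records,
# which are concatenated into one flat sequence.
flat = list(chain.from_iterable(title_terms))

print(mapped)  # [['Toy', 'Story'], ['GoldenEye'], ['Four', 'Rooms']]
print(flat)    # ['Toy', 'Story', 'GoldenEye', 'Four', 'Rooms']
```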

Spark's `zipWithIndex()` function offers a more concise way to get this result, and the indexing itself runs on the cluster rather than in a Python loop on the driver. It takes an RDD of values and pairs each value with its index, creating a new RDD of key-value pairs. 

(The key is the term and the value is its index in the term dictionary.) 

The `collectAsMap()` function collects the key-value RDD to the driver as a Python `dict`. 

In [37]:
all_terms_dict2 = title_terms.flatMap(lambda x: x).distinct().zipWithIndex().collectAsMap()
print "Index of term 'Dead': %d" % all_terms_dict2['Dead']
print "Index of term 'Rooms': %d" % all_terms_dict2['Rooms']

Index of term 'Dead': 723
Index of term 'Rooms': 321

The last step is to create a function that converts a set of terms into a sparse vector representation. To do this:
 - We create an empty sparse matrix with one row and a number of columns equal to the total number of terms in our dictionary. 
 - We then step through each term in the input list of terms and check whether this term is in our term dictionary. 
  - If it is, we assign a value of 1 to the vector at the index that corresponds to the term in our dictionary mapping. 

Once we have the function, we apply it to each record in our RDD of extracted terms. 
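Conceptually, a sparse vector stores only its non-zero entries. The sketch below mirrors the logic of `create_vector()` without SciPy, representing the 1 x k row as a dictionary from column index to value. The name `create_sparse_dict` and the tiny `term_dict` are hypothetical; they are not the lab's 2645-term dictionary.

```python
def create_sparse_dict(terms, term_dict):
    """Return {column_index: 1.0} for each known term; unknown terms are skipped."""
    x = {}
    for t in terms:
        if t in term_dict:
            x[term_dict[t]] = 1.0
    return x

term_dict = {'Toy': 0, 'Story': 1, 'GoldenEye': 2, 'Rooms': 3}
print(create_sparse_dict(['Toy', 'Story'], term_dict))  # {0: 1.0, 1: 1.0}
print(create_sparse_dict(['Unknown'], term_dict))        # {}
```

As with the binary encoding, a repeated term still produces a single entry of 1.0.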

In [38]:
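import scipy.sparse as sp

def create_vector(terms, term_dict):
    num_terms = len(term_dict)
    # 1 x num_terms sparse matrix, initially all zeros
    x = sp.csc_matrix((1, num_terms))
    for t in terms:
        if t in term_dict:
            # set the column for this term to 1
            # (SciPy may warn that changing a CSC matrix's sparsity is inefficient)
            idx = term_dict[t]
            x[0, idx] = 1
    return x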

In [39]:
all_terms_bcast = sc.broadcast(all_terms_dict)
term_vectors = title_terms.map(lambda terms: create_vector(terms, all_terms_bcast.value))
term_vectors.take(5)

[<1x2645 sparse matrix of type '<type 'numpy.float64'>'
	with 2 stored elements in Compressed Sparse Column format>, <1x2645 sparse matrix of type '<type 'numpy.float64'>'
	with 1 stored elements in Compressed Sparse Column format>, <1x2645 sparse matrix of type '<type 'numpy.float64'>'
	with 2 stored elements in Compressed Sparse Column format>, <1x2645 sparse matrix of type '<type 'numpy.float64'>'
	with 2 stored elements in Compressed Sparse Column format>, <1x2645 sparse matrix of type '<type 'numpy.float64'>'
	with 1 stored elements in Compressed Sparse Column format>]

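We can see that each movie title has been transformed into a sparse vector. Titles from which we extracted two terms have two non-zero entries in the vector, and titles from which we extracted only one term have one non-zero entry. (Wrapping `all_terms_dict` with `sc.broadcast()` sends the dictionary to each worker once, instead of shipping it with every task.)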

## Normalizing features

Once the features have been extracted into the form of a vector, a common preprocessing step is to normalize the numerical data. The general idea is to transform each numerical feature in a way that scales it to a standard size. 

There are different kinds of normalization, which are listed below:
* Normalize a feature: This is usually a transformation applied to an individual feature across the dataset.
* Normalize a feature vector: This is usually a transformation applied to all features in a given row of the dataset such that the resulting feature vector has a normalized length. This ensures that each feature in the vector is scaled such that the vector has a norm of 1.
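The difference between the two kinds can be seen on a tiny made-up dataset of three rows and two features. The sketch standardizes each feature column (subtract the column mean, divide by the column's population standard deviation; standardization is one common choice of per-feature normalization, assumed here) and, separately, scales each row to unit L2 norm. Plain Python is used so the arithmetic is easy to follow.

```python
import math

rows = [[1.0, 200.0],
        [2.0, 400.0],
        [3.0, 600.0]]

# Normalize a feature: standardize each column across all rows.
cols = list(zip(*rows))
means = [sum(c) / len(c) for c in cols]
stds = [math.sqrt(sum((v - m) ** 2 for v in c) / len(c)) for c, m in zip(cols, means)]
standardized = [[(v - m) / s for v, m, s in zip(row, means, stds)] for row in rows]

# Normalize a feature vector: divide each row by its own L2 norm.
unit_rows = []
for row in rows:
    norm = math.sqrt(sum(v * v for v in row))
    unit_rows.append([v / norm for v in row])

print([round(v, 4) for v in standardized[0]])                  # [-1.2247, -1.2247]
print(round(math.sqrt(sum(v * v for v in unit_rows[0])), 4))   # 1.0
```

After standardization both columns are on the same scale even though the second was 200 times larger, whereas row normalization leaves the columns' relative sizes untouched and only fixes each row's length.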

Below, we use NumPy's `np.linalg.norm()` function to normalize a vector: we first compute the L2 norm of a random vector and then divide each element of the vector by this norm to create our normalized vector.

Why normalize?
 - Features can be measured on very different scales. Normalizing puts them on a comparable scale, so that features with large values do not dominate distance computations, and gradient-based training algorithms tend to converge more reliably.
 - Normalization is not the same as regularization, which is the technique for tuning model complexity so that models neither overfit (too complex) nor underfit (too simple).

In [40]:
np.random.seed(42)
x = np.random.randn(10)
norm_x_2 = np.linalg.norm(x)
normalized_x = x / norm_x_2

print "X:\n%s" % x
print "2-Norm of x: %2.4f" %norm_x_2
print "Normalized x:\n%s" % normalized_x
print "2-Norm of normalized_x: %2.4f" % np.linalg.norm(normalized_x)

X:
[ 0.49671415 -0.1382643   0.64768854  1.52302986 -0.23415337 -0.23413696
  1.57921282  0.76743473 -0.46947439  0.54256004]
2-Norm of x: 2.5908
Normalized x:
[ 0.19172213 -0.05336737  0.24999534  0.58786029 -0.09037871 -0.09037237
  0.60954584  0.29621508 -0.1812081   0.20941776]
2-Norm of normalized_x: 1.0000

In [41]:
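from pyspark.mllib.feature import Normalizer

# default p=2.0, i.e. scale each vector to unit L2 norm
normalizer = Normalizer()
vector = sc.parallelize([x])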

normalized_x_mllib = normalizer.transform(vector).first().toArray()

print "x:\n%s" % x
print "2-Norm of x: %2.4f" %norm_x_2
print "Normalized x:\n%s" % normalized_x_mllib
print "2-Norm of normalized_x: %2.4f" % np.linalg.norm(normalized_x_mllib)

x:
[ 0.49671415 -0.1382643   0.64768854  1.52302986 -0.23415337 -0.23413696
  1.57921282  0.76743473 -0.46947439  0.54256004]
2-Norm of x: 2.5908
Normalized x:
[ 0.19172213 -0.05336737  0.24999534  0.58786029 -0.09037871 -0.09037237
  0.60954584  0.29621508 -0.1812081   0.20941776]
2-Norm of normalized_x: 1.0000