In [2]:
# SparkContext is already defined as sc
# Base HDFS location of the dataset; the cells below append a file name to it
# (e.g. HDFS + 'u.user') before calling sc.textFile.
HDFS = 'hdfs://ScutAmazon:9000/ml-100k/'

## Explore Files

### user file

In [3]:
# user file
f = 'u.user'
user_data = sc.textFile(HDFS+f)
print user_data.first()
print 'Lines Count:%s'%user_data.count()
user_fields = user_data.map(lambda line: line.split('|'))
num_users = user_fields.map(lambda fields: fields[0]).distinct().count()
num_genders = user_fields.map(lambda fields: fields[2]).distinct().count()
num_occupations = user_fields.map(lambda fields: fields[3]).distinct().count()
num_zipcodes = user_fields.map(lambda fields: fields[4]).distinct().count()
print 'users: %s, genders: %s, occupations: %s, zipcodes: %s'%(num_users, num_genders, num_occupations, num_zipcodes)

# Plotting code parked in a string (never executed) because figures cannot be
# drawn from the pyspark shell. The snippet only imports `pyplot`, so the figure
# handle is fetched through `pyplot.gcf()`; the bare `matplotlib` name was never bound.
comment = """cannot draw in pyspark
from matplotlib import pyplot
ages = user_fields.map(lambda fields: int(fields[1])).collect()
pyplot.hist(ages, bins = 20, normed = True)
fig = pyplot.gcf()
fig.set_size_inches(16, 10)
"""

# Tally users per occupation: emit an (occupation, 1) pair per user, sum the
# ones per key, and collect the (occupation, count) pairs on the driver.
occupation_ones = user_fields.map(lambda fields: (fields[3], 1))
count_by_occupation = occupation_ones.reduceByKey(lambda left, right: left + right).collect()
# A shorter alternative (it yields a dict of counts rather than a list of pairs):
# count_by_occupation = user_fields.map(lambda x : x[3]).countByValue()

# cannot draw
import numpy as np
x_axis = np.array([c[0] for c in count_by_occupation])
y_axis = np.array([c[1] for c in count_by_occupation])

print np.argsort(y_axis) # return indices of elements in the list after being sorted
x_axis = x_axis[np.argsort(y_axis)]
y_axis = x_axis[np.argsort(y_axis)]

1|24|M|technician|85711
Lines Count:943
users: 943, genders: 2, occupations: 21, zipcodes: 795
[10  8  3  2  7  1 16 11 12  5 18 14  9 13 17  6 20  0 15 19  4]


### movie file

In [4]:
# movie file: load the raw lines, show the first record and the number of records
f = 'u.item'
movie_data = sc.textFile(HDFS+f)
print movie_data.first()
num_movies = movie_data.count()
print num_movies

def convert_year(x):
    """Parse the year from the last four characters of a release-date string.

    x: date text such as '01-Jan-1995'.
    Returns the trailing four characters as an int, or the placeholder 1990
    when they cannot be parsed (empty or malformed text, or a value that
    cannot be sliced such as None).
    """
    try:
        return int(x[-4:])
    except (TypeError, ValueError):
        # Catch only parse failures; a bare except would also hide
        # KeyboardInterrupt / SystemExit.
        return 1990
movie_fields = movie_data.map(lambda x:x.split('|'))
years = movie_fields.map(lambda fields:fields[2]).map(lambda x:convert_year(x))
count_by_year = years.countByValue()
print count_by_year.keys()

# Plotting code parked in a string (never executed). The names inside it now
# match what the cell defines (`count_by_year`) and what pyplot provides (`gcf`).
comment = """draw the histogrm
from matplotlib import pyplot
pyplot.hist(count_by_year, bins = count_by_year.keys(), normed = True)
fig = pyplot.gcf()
fig.set_size_inches(16, 10)
"""

1|Toy Story (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Toy%20Story%20(1995)|0|0|0|1|1|1|0|0|0|0|0|0|0|0|0|0|0|0|0
1682
[1922, 1926, 1930, 1931, 1932, 1933, 1934, 1935, 1936, 1937, 1938, 1939, 1940, 1941, 1942, 1943, 1944, 1945, 1946, 1947, 1948, 1949, 1950, 1951, 1952, 1953, 1954, 1955, 1956, 1957, 1958, 1959, 1960, 1961, 1962, 1963, 1964, 1965, 1966, 1967, 1968, 1969, 1970, 1971, 1972, 1973, 1974, 1975, 1976, 1977, 1978, 1979, 1980, 1981, 1982, 1983, 1984, 1985, 1986, 1987, 1988, 1989, 1990, 1991, 1992, 1993, 1994, 1995, 1996, 1997, 1998]


### rating file

In [5]:
f = 'u.data'
rating_data = sc.textFile(HDFS+f)
print rating_data.first()
num_ratings = rating_data.count()
print "Rating count:%s"%num_ratings

rating_fields = rating_data.map(lambda x:x.split('\t'))
rating = rating_fields.map(lambda x:int(x[2]))
max_rating = rating.reduce(lambda a,b: max(a,b))
print 'max rating: %s'%max_rating
min_rating = rating.reduce(lambda a,b: min(a,b))
print 'min rating: %s'%min_rating
mean_rating = rating.reduce(lambda a,b: a+b)*1.0/num_ratings
print 'average rating: %2.2f'%mean_rating
median_rating = np.median(rating.collect())
print 'median rating: %s'%median_rating
average_rating = num_ratings/num_users
print 'average rating: %s'%average_rating
rating_per_movie = num_ratings/num_movies
print 'rating per movie: %s'%rating_per_movie

# build-in function
print rating.stats()

user_ratings = rating_fields.map(lambda fields: (fields[0], fields[2])).groupByKey()
user_ratings_by_user = user_ratings.map(lambda (k, v): (k, len(v)))
print user_ratings_by_user.take(10)

# Plotting code parked in a string; nothing inside it is executed.
# NOTE(review): as written the snippet hands an RDD straight to pyplot.hist and
# does not import pyplot itself - presumably it needs a .collect() and an import
# before it can be re-enabled; confirm before use.
comment ="""draw the histgram
user_ratings_by_user_count = user_ratings_by_user.map(lambda (k,v) : v)
pyplot.hist(user_ratings_by_user_count, bins = user_ratings_by_user_count.count(), normed = True)
fig = pyplot.gcf()
fig.set_size_inches(16, 10)
"""

196	242	3	881250949
Rating count:100000
max rating: 5
min rating: 1
average rating: 3.53
median rating: 4.0
average rating: 106
rating per movie: 59
(count: 100000, mean: 3.52986, stdev: 1.12566797076, max: 5.0, min: 1.0)
[(u'344', 190), (u'346', 193), (u'340', 44), (u'342', 201), (u'810', 26), (u'812', 20), (u'814', 35), (u'816', 25), (u'719', 67), (u'717', 93)]


In [6]:
print years.mean() # don't need collect() to return all the data to driven program
years_array = np.array(years.collect())
mean_year = np.mean(years_array[years_array != 1990])
median_year = np.median(years_array[years_array != 1990])
print 'mean of the years: %s'%mean_year
print 'median of the years: %s'%median_year

bad_indcices = np.where(years_array[years_array == 1990])[0][0]
years_array[bad_indcices] = median_year
print 'all missing years are filled with median year'

1989.38644471
mean of the years: 1989.37718769
median of the years: 1995.0
all missing years are filled with median year


## Process and Transform

### category feature

In [7]:
# one-hot encoding
import numpy as np
all_occupation = user_fields.map(lambda fields:fields[3]).distinct().collect()
all_occupation.sort()
mapping = {}
for i in xrange(len(all_occupation)):
    mapping[all_occupation[i]] = i
k = len(all_occupation)
occupation = 'programmer'
one_hot_vec = np.zeros(k)
one_hot_vec[mapping[occupation]] = 1
print 'one hot encoding for %s is \n %s'%(occupation, one_hot_vec)

one hot encoding for programmer is 
 [ 0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  1.  0.  0.  0.
  0.  0.  0.]


### derived features

In [8]:
import datetime
def extract_hour(timestamp):
    """Return the hour of day for a Unix timestamp.

    The timestamp is converted with datetime.fromtimestamp, i.e. in the local
    timezone of the machine running the code. The same datetime object also
    exposes year, month, day and minute.
    """
    moment = datetime.datetime.fromtimestamp(timestamp)
    return moment.hour

# Field 3 of each rating record is parsed as an integer timestamp and then
# reduced to its hour with extract_hour.
timestamps = rating_fields.map(lambda fields:int(fields[3]))
hours = timestamps.map(extract_hour)
print hours.take(5)

def assign_tag(hour):
    """Name the part of the day that an hour falls in.

    Returns 'night' for 0-5, 'morning' for 6-11, 'lunch' for 12-13,
    'afternoon' for 14-17 and 'evening' for 18-23; None for any other value.
    """
    day_parts = (
        ('night', range(0, 6)),
        ('morning', range(6, 12)),
        ('lunch', range(12, 14)),
        ('afternoon', range(14, 18)),
        ('evening', range(18, 24)),
    )
    # The spans do not overlap, so at most one label can match.
    for label, hour_span in day_parts:
        if hour in hour_span:
            return label
    return None

# Label every rating hour with its part of the day and show a small sample.
tags = hours.map(assign_tag)
print tags.take(5)

[23, 3, 15, 13, 13]
['evening', 'night', 'afternoon', 'lunch', 'lunch']


### text feature

In [12]:
import re
def extract_title(raw):
    """Strip a parenthesised suffix such as '(1995)' from a raw movie title.

    Looks for the first run of word characters wrapped in parentheses and
    returns the text before it with surrounding whitespace removed. When no
    such group exists, the input is returned unchanged.
    """
    match = re.search("\((\w+)\)", raw)
    if not match:
        return raw
    # keep only the part that precedes the opening parenthesis
    return raw[:match.start()].strip()

# step1: extract text
raw_titles = movie_fields.map(lambda fields: fields[1])
titles = raw_titles.map(extract_title)
print titles.take(5)

# step2: tokenizer
title_terms = titles.map(lambda t: t.split())
print title_terms.take(5)

# step3: build dictionary to map word to integer
all_terms = title_terms.flatMap(lambda x: x).distinct().collect()
idx = 0
all_terms_dict = {}
for term in all_terms:
    all_terms_dict[term] = idx
    idx += 1

print "Total number of terms: %d" % len(all_terms_dict)
print "Index of term 'Dead': %d" % all_terms_dict['Dead']
print "Index of term 'Rooms': %d" % all_terms_dict['Rooms']

# the above can also be implemented in paralle 
all_terms_dict1 = title_terms.flatMap(lambda x: x).distinct().zipWithIndex().collectAsMap()
# print all_terms_dict1

# step4: convert a list of terms to a vector
from scipy import sparse as sp
def create_vector(terms, term_dict):
    """Encode a list of terms as a 1 x len(term_dict) sparse indicator row.

    terms: iterable of tokens for one document (a title here).
    term_dict: mapping from term to column index.
    Returns a CSC matrix of shape (1, len(term_dict)) holding 1 in the column
    of every term found in term_dict; unknown terms are ignored.
    """
    n = len(term_dict)
    # Fill a LIL matrix and convert once at the end: assigning entries one by
    # one into a CSC matrix changes its sparsity structure on every write,
    # which is slow and raises SparseEfficiencyWarning.
    vector = sp.lil_matrix((1, n))
    for term in terms:
        if term in term_dict:
            vector[0, term_dict[term]] = 1
    return vector.tocsc()

# broadcast variable: wrap the term dictionary with sc.broadcast and read it
# through .value inside the map, instead of capturing the dict in the closure
all_terms_broad = sc.broadcast(all_terms_dict)
vectors = title_terms.map(lambda terms: create_vector(terms, all_terms_broad.value))
vectors.take(5)

[u'Toy Story', u'GoldenEye', u'Four Rooms', u'Get Shorty', u'Copycat']
[[u'Toy', u'Story'], [u'GoldenEye'], [u'Four', u'Rooms'], [u'Get', u'Shorty'], [u'Copycat']]
Total number of terms: 2645
Index of term 'Dead': 146
Index of term 'Rooms': 1963


[<1x2645 sparse matrix of type '<type 'numpy.float64'>'
	with 2 stored elements in Compressed Sparse Column format>, <1x2645 sparse matrix of type '<type 'numpy.float64'>'
	with 1 stored elements in Compressed Sparse Column format>, <1x2645 sparse matrix of type '<type 'numpy.float64'>'
	with 2 stored elements in Compressed Sparse Column format>, <1x2645 sparse matrix of type '<type 'numpy.float64'>'
	with 2 stored elements in Compressed Sparse Column format>, <1x2645 sparse matrix of type '<type 'numpy.float64'>'
	with 1 stored elements in Compressed Sparse Column format>]

### normalize features

In [25]:
# normalize with numpy
print '===================normalize with numpy======================='
import numpy as np
# fixed seed so the random vector, and therefore the printed output, is repeatable
np.random.seed(42)
x = np.random.randn(10)
# np.linalg.norm with default arguments gives the 2-norm of a 1-D vector
norm_2 = np.linalg.norm(x)
# dividing by the norm scales x to unit length (checked by the last print below)
normalized_x = x/norm_2
print "x:\n%s" % x
print "2-Norm of x: %2.4f" % norm_2
print "Normalized x:\n%s" % normalized_x
print "2-Norm of normalized_x: %2.4f" %np.linalg.norm(normalized_x)


print '===================normalize with spark======================='
from pyspark.mllib.feature import Normalizer
# Normalizer is built with its default settings; the prints below show that the
# result has the same values and unit 2-norm as the numpy version.
normalizer = Normalizer()
# wrap the single vector in an RDD so the transformer can be applied to it
vector = sc.parallelize([x])
normalized_x_mllib = normalizer.transform(vector).first().toArray()
print "x:\n%s" % x
print "Normalized x MLLib:\n%s" % normalized_x_mllib
print "2-Norm of normalized_x_mllib: %2.4f" %np.linalg.norm(normalized_x_mllib)

x:
[ 0.49671415 -0.1382643   0.64768854  1.52302986 -0.23415337 -0.23413696
  1.57921282  0.76743473 -0.46947439  0.54256004]
2-Norm of x: 2.5908
Normalized x:
[ 0.19172213 -0.05336737  0.24999534  0.58786029 -0.09037871 -0.09037237
  0.60954584  0.29621508 -0.1812081   0.20941776]
2-Norm of normalized_x: 1.0000
x:
[ 0.49671415 -0.1382643   0.64768854  1.52302986 -0.23415337 -0.23413696
  1.57921282  0.76743473 -0.46947439  0.54256004]
Normalized x MLLib:
[ 0.19172213 -0.05336737  0.24999534  0.58786029 -0.09037871 -0.09037237
  0.60954584  0.29621508 -0.1812081   0.20941776]
2-Norm of normalized_x_mllib: 1.0000
