### Data Preprocessing using Spark
The data used here is the MovieLens 100k dataset, which can be downloaded from:
http://files.grouplens.org/datasets/movielens/ml-100k.zip

#### Load the user dataset
This file contains information about the users; later we will load the datasets that describe the movies and the ratings.

In [3]:
## Read the file (change the path to wherever u.user was uploaded)
user_data = sc.textFile("/FileStore/tables/kv4939va1496946928579/u.user")
user_data.first()

In [4]:
# u.user is pipe-delimited: user ID | age | gender | occupation | ZIP code
user_fields = user_data.map(lambda line: line.split("|"))
user_fields.first()

#### Get the information about users
We compute the total number of users, the number of distinct genders, the number of distinct occupations (occupation is a categorical variable, so we count its distinct classes) and the number of distinct ZIP codes.

In [6]:
## Counting the Number of Users 
num_users = user_fields.map(lambda fields: fields[0]).count()

num_occupations = user_fields.map(lambda fields: fields[3]).distinct().count()

num_genders = user_fields.map(lambda fields:fields[2]).distinct().count()

num_zipcodes = user_fields.map(lambda fields:fields[4]).distinct().count()

print("Users: %d, genders: %d, occupations: %d, ZIP codes: %d" % (num_users, num_genders, num_occupations, num_zipcodes))
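For readers without a Spark cluster, the same counts can be sketched in plain Python. The records below are made-up examples in the pipe-delimited `u.user` layout, and `count_user_stats` is a hypothetical helper that mirrors the RDD pipeline (`count()` for users, `distinct().count()` for the other fields) on a single machine.

```python
def count_user_stats(lines):
    """Split each record and count users plus distinct genders, occupations and ZIP codes."""
    fields = [line.split("|") for line in lines]
    num_users = len(fields)                        # like .count()
    num_genders = len({f[2] for f in fields})      # like .map(...).distinct().count()
    num_occupations = len({f[3] for f in fields})
    num_zipcodes = len({f[4] for f in fields})
    return num_users, num_genders, num_occupations, num_zipcodes

# Made-up records in the u.user format: user ID | age | gender | occupation | ZIP code
sample = [
    "1|24|M|technician|85711",
    "2|53|F|other|94043",
    "3|23|M|writer|32067",
]
print(count_user_stats(sample))  # (3, 2, 3, 3)
```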

In [7]:
import matplotlib.pyplot as plt
ages = user_fields.map(lambda x: int(x[1])).collect()
plt.hist(ages, bins=20, color='red', density=True)  # 'normed' was removed in matplotlib 3.1
fig = plt.gcf()
fig.set_size_inches(16, 10)
display(fig)

In [9]:
import numpy as np 
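count_by_occupation = user_fields.map(lambda fields: (fields[3], 1)).reduceByKey(lambda x, y: x + y).collect()
x_axis1 = np.array([c[0] for c in count_by_occupation])
y_axis1 = np.array([c[1] for c in count_by_occupation])
# Sort occupations by count so the bars are ordered; x_axis and y_axis are used in the next cell
order = np.argsort(y_axis1)
x_axis = x_axis1[order]
y_axis = y_axis1[order]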

In [10]:
pos = np.arange(len(x_axis)) 
width = 1.0
ax = plt.axes()
ax.set_xticks(pos)  # bars are centered on pos (default align='center' in matplotlib >= 2.0)
ax.set_xticklabels(x_axis)
plt.bar(pos, y_axis, width, color='lightblue')
plt.xticks(rotation=90)
fig = plt.gcf()
fig.set_size_inches(16, 10)
display(fig)

In [11]:
count_by_occupation2 = user_fields.map(lambda fields: fields[3]).countByValue()
count_by_occupation2
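`reduceByKey` combines the values of each key pairwise with the supplied function, while `countByValue` returns the per-value counts to the driver as a dictionary. A single-machine sketch of both on a made-up list of occupations; `reduce_by_key` is a hypothetical helper, not a Spark API.

```python
from collections import Counter

def reduce_by_key(pairs, func):
    """Combine the values of each key with func, like RDD.reduceByKey on one machine."""
    result = {}
    for key, value in pairs:
        result[key] = func(result[key], value) if key in result else value
    return result

occupations = ["student", "engineer", "student", "writer", "student", "engineer"]
by_reduce = reduce_by_key(((o, 1) for o in occupations), lambda x, y: x + y)
by_counter = Counter(occupations)  # analogous to countByValue()

# Sort by count, as the bar chart does with np.argsort.
ordered = sorted(by_reduce.items(), key=lambda kv: kv[1])
print(ordered)  # [('writer', 1), ('engineer', 2), ('student', 3)]
```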

In [12]:
movie_data = sc.textFile("/FileStore/tables/djtncxwv1496949426397/u.item")
print(movie_data.first())
num_movies = movie_data.count()
print("Movies: %d" % num_movies)
# movie_data.take(1)[0][2]

In [13]:
Movie_feat = movie_data.map(lambda line: line.split("|"))
Movie_feat.take(3)

In [14]:
def convert_year(x):
  try:
    return int(x[-4:])
  except ValueError:
    return 1900 # there is a 'bad' data point with a blank year, which we set to 1900 and will filter out later

In [15]:
movie_fields = movie_data.map(lambda lines: lines.split("|"))
years = movie_fields.map(lambda fields: fields[2]).map(lambda x: convert_year(x))

In [16]:
years_filtered = years.filter(lambda x: x != 1900)
years_filtered.take(1)
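Release dates in `u.item` look like `01-Jan-1995`, so the last four characters hold the year. The sketch below runs the parse-then-filter step on made-up dates without Spark, catching only `ValueError` (what `int()` raises for a blank or malformed string); the 1900 sentinel follows the notebook.

```python
SENTINEL_YEAR = 1900  # placeholder for unparseable dates, as in the notebook

def convert_year(date_str):
    """Return the 4-digit year at the end of a date string, or the sentinel if it is missing."""
    try:
        return int(date_str[-4:])
    except ValueError:
        return SENTINEL_YEAR

raw_dates = ["01-Jan-1995", "", "01-Jan-1977"]  # made-up sample, one blank date
years = [convert_year(d) for d in raw_dates]
years_filtered = [y for y in years if y != SENTINEL_YEAR]
print(years, years_filtered)  # [1995, 1900, 1977] [1995, 1977]
```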

In [17]:
import matplotlib.pyplot as plt
# Age of each movie in years, relative to 2017; plot the raw ages, not their counts
movie_ages = years_filtered.map(lambda yr: 2017 - yr).collect()
plt.hist(movie_ages, bins=50, color='lightblue', density=True)
fig1 = plt.gcf()
fig1.set_size_inches(16,10)
display(fig1)

In [18]:
rating_data_raw = sc.textFile("/FileStore/tables/y3dmjo4l1497452649183/u.data")
print(rating_data_raw.first())
num_ratings = rating_data_raw.count()
print("Ratings: %d" % num_ratings)
rating_data = rating_data_raw.map(lambda line: line.split("\t"))
rating_data.take(2)

#### Calculating Stats of ratings

In [20]:
import numpy as np 
ratings = rating_data.map(lambda fields: int(fields[2]))
max_rating = ratings.reduce(lambda x, y: max(x, y))
min_rating = ratings.reduce(lambda x, y: min(x,y))
mean_rating = ratings.reduce(lambda x, y: x + y) / float(num_ratings)  # float avoids integer division in Python 2
median_rating = np.median(ratings.collect())

### Display the basic rating statistics

In [22]:
print("Min rating: %d" % min_rating)
print("Max rating: %d" % max_rating)
print("Average rating: %2.4f" % mean_rating)
print("Median rating: %d" % median_rating)
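The `reduce`-based statistics can be checked on a small made-up list with Python's `functools.reduce`. The explicit `float` before dividing matters under Python 2, where dividing two integers truncates and the mean would be rounded down to a whole number.

```python
from functools import reduce
import statistics

ratings = [5, 3, 4, 4, 1, 5, 2]  # made-up 1-5 star ratings

max_rating = reduce(lambda x, y: max(x, y), ratings)
min_rating = reduce(lambda x, y: min(x, y), ratings)
mean_rating = reduce(lambda x, y: x + y, ratings) / float(len(ratings))
median_rating = statistics.median(ratings)
print(min_rating, max_rating, round(mean_rating, 4), median_rating)  # 1 5 3.4286 4
```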

#### Average number of ratings per user and per movie
These are the total number of ratings divided by the number of users and by the number of movies, respectively. On average, each user has given about 106 ratings and each movie has received about 59 ratings.

In [24]:
ratings_per_user = num_ratings / float(num_users)
ratings_per_movie = num_ratings / float(num_movies)
print("Average # of ratings per user: %2.2f" % ratings_per_user)
print("Average # of ratings per movie: %2.2f" % ratings_per_movie)
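As a quick arithmetic check, using the documented sizes of the MovieLens 100k dataset (100,000 ratings, 943 users and 1,682 movies):

```python
# Documented sizes of the MovieLens 100k dataset.
num_ratings, num_users, num_movies = 100000, 943, 1682

ratings_per_user = num_ratings / float(num_users)
ratings_per_movie = num_ratings / float(num_movies)
print("%.2f ratings per user, %.2f ratings per movie" % (ratings_per_user, ratings_per_movie))
# 106.04 ratings per user, 59.45 ratings per movie
```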

#### Statistics from Spark's built-in stats() function

In [26]:
ratings.stats()
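`stats()` returns a `StatCounter` with the count, mean, standard deviation, max and min, computed in one pass over the RDD; its `stdev()` is the population standard deviation (there is a separate `sampleStdev()`). The sketch below shows one way such a single-pass, mergeable summary can work (Welford's online update plus a merge of partial results from two "partitions"). It illustrates the technique under that assumption and is not Spark's source code; `RunningStats` is a hypothetical class and the ratings are made up.

```python
import math

class RunningStats:
    """Single-pass count/mean/variance/min/max that can be merged across partitions."""

    def __init__(self):
        self.n = 0
        self.mean = 0.0
        self.m2 = 0.0  # sum of squared deviations from the mean
        self.min = math.inf
        self.max = -math.inf

    def add(self, x):
        # Welford's online update for one value.
        self.n += 1
        delta = x - self.mean
        self.mean += delta / self.n
        self.m2 += delta * (x - self.mean)
        self.min = min(self.min, x)
        self.max = max(self.max, x)
        return self

    def merge(self, other):
        # Combine two partial summaries, e.g. from two partitions.
        if other.n == 0:
            return self
        if self.n == 0:
            self.n, self.mean, self.m2 = other.n, other.mean, other.m2
            self.min, self.max = other.min, other.max
            return self
        n = self.n + other.n
        delta = other.mean - self.mean
        self.mean += delta * other.n / n
        self.m2 += other.m2 + delta * delta * self.n * other.n / n
        self.n = n
        self.min = min(self.min, other.min)
        self.max = max(self.max, other.max)
        return self

    def stdev(self):
        # Population standard deviation (divides by n).
        return math.sqrt(self.m2 / self.n) if self.n else float("nan")

# Two made-up "partitions" summarised separately, then merged.
left = RunningStats()
for r in [5, 3, 4]:
    left.add(r)
right = RunningStats()
for r in [4, 1, 5, 2]:
    right.add(r)
total = left.merge(right)
print(total.n, round(total.mean, 4), round(total.stdev(), 4), total.min, total.max)
```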

To compute the distribution of ratings per user, we first extract the user ID as the key and the rating as the value from the rating_data RDD, and then group the ratings by user ID using Spark's groupByKey function:

In [28]:
# Look at a few records to identify the fields: user ID, movie ID, rating, timestamp
rating_data.take(5)

In [29]:
user_ratings_grouped = rating_data.map(lambda fields: (int(fields[0]),int(fields[2]))).groupByKey()

In [30]:
## For each key (user ID), the size of the group of ratings is the number of ratings given by that user
user_ratings_byuser = user_ratings_grouped.mapValues(len)
user_ratings_byuser.take(3)
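Grouping every rating per user only to take the group's length sends all the values through the shuffle. Spark's RDD programming guide notes that, when grouping in order to aggregate, `reduceByKey` or `aggregateByKey` performs much better, e.g. `rating_data.map(lambda f: (int(f[0]), 1)).reduceByKey(lambda a, b: a + b)`. The plain-Python sketch below contrasts the two shapes on made-up (user ID, rating) pairs; both function names are hypothetical.

```python
from collections import defaultdict

pairs = [(1, 5), (2, 3), (1, 4), (3, 2), (1, 1), (2, 4)]  # made-up (user_id, rating)

def count_via_group(pairs):
    """groupByKey-style: materialise every value per key, then take the length."""
    groups = defaultdict(list)
    for user, rating in pairs:
        groups[user].append(rating)
    return {user: len(ratings) for user, ratings in groups.items()}

def count_via_reduce(pairs):
    """reduceByKey-style: map each record to 1 and add the counts per key."""
    counts = defaultdict(int)
    for user, _ in pairs:
        counts[user] += 1
    return dict(counts)

print(count_via_group(pairs))  # {1: 3, 2: 2, 3: 1}
```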

### Plot the histogram of the number of ratings per user

In [32]:
user_ratings_byuser_local = user_ratings_byuser.values().collect()
fig3 = plt.figure(figsize=(16, 10))  # new figure so earlier plots are not drawn over
plt.hist(user_ratings_byuser_local, bins=200, color='lightblue', density=True)
display(fig3)

### Plotting number of ratings given to each movie

In [34]:
movie_ratings_grouped = rating_data.map(lambda fields: (int(fields[1]),int(fields[2]))).groupByKey()
## For each key (movie ID), the size of the group of ratings is the number of ratings that movie received
ratings_bymovie = movie_ratings_grouped.mapValues(len)
ratings_bymovie.take(3)

In [35]:
ratings_bymovie_local = ratings_bymovie.values().collect()
fig4 = plt.figure(figsize=(16, 10))  # new figure so earlier plots are not drawn over
plt.hist(ratings_bymovie_local, bins=200, color='cyan', density=True)
display(fig4)

In [36]:
def extract_datetime(ts):
    import datetime
    return datetime.datetime.fromtimestamp(ts)

In [37]:
timestamps = rating_data.map(lambda fields: int(fields[3]))
hour_of_day = timestamps.map(lambda ts: extract_datetime(ts).hour)
hour_of_day.take(5)
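`datetime.datetime.fromtimestamp(ts)` converts to the local time zone of whichever machine runs it, so the hour of day can differ between a laptop and a cluster set to another zone. The timestamps in `u.data` are Unix seconds, so passing an explicit time zone makes the result reproducible. A Python 3 sketch, assuming UTC is the desired reference; `extract_hour_utc` is a hypothetical name.

```python
import datetime

def extract_hour_utc(ts):
    """Hour of day (0-23) for a Unix timestamp in seconds, interpreted in UTC."""
    return datetime.datetime.fromtimestamp(ts, tz=datetime.timezone.utc).hour

# 0 is midnight UTC on 1 Jan 1970; 3600 * 13 + 59 is 13:00:59 on the same day.
print(extract_hour_utc(0), extract_hour_utc(3600 * 13 + 59))  # 0 13
```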

### Converting Time Stamp to Categorical Features

In [39]:
def assign_tod(hr):
  # Each hour 0-23 falls into exactly one bucket; 'night' wraps around midnight (23:00-06:59)
  times_of_day = {
    'morning' : range(7, 12),
    'lunch' : range(12, 14),
    'afternoon' : range(14, 18),
    'evening' : range(18, 23),
    'night' : [23] + list(range(0, 7))
  }
  for k, v in times_of_day.items():
    if hr in v:
      return k

In [40]:
time_of_day = hour_of_day.map(lambda hr: assign_tod(hr))
time_of_day.take(5)
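A bucket that wraps past midnight cannot be written as a single `range`: `range(23, 7)` is empty, so any hour meant for it would get no label. A sketch that lists the start hour of each bucket and wraps around, assuming the boundaries morning 7-11, lunch 12-13, afternoon 14-17, evening 18-22 and a single night bucket from 23:00 to 06:59; `TOD_STARTS`, `TOD_LABELS` and `time_of_day_label` are hypothetical names.

```python
import bisect

# Start hour of each bucket in increasing order; hours before 7 fall into
# the last bucket ("night"), which wraps around midnight.
TOD_STARTS = [7, 12, 14, 18, 23]
TOD_LABELS = ["morning", "lunch", "afternoon", "evening", "night"]

def time_of_day_label(hr):
    """Map an hour 0-23 to a time-of-day label."""
    if not 0 <= hr <= 23:
        raise ValueError("hour must be in 0..23, got %r" % hr)
    i = bisect.bisect_right(TOD_STARTS, hr) - 1
    return TOD_LABELS[i]  # i == -1 (hours 0-6) selects the wrap-around "night" bucket

print([time_of_day_label(h) for h in (0, 7, 12, 14, 18, 23)])
```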

### Missing Data Treatment

In [42]:
years_pre_processed = movie_fields.map(lambda fields: fields[2]).map(lambda x: convert_year(x)).collect()
years_pre_processed_array = np.array(years_pre_processed)

In [43]:
years_pre_processed_array

#### Compute the mean and median year, excluding the placeholder year 1900

In [45]:
mean_year = np.mean(years_pre_processed_array[years_pre_processed_array!=1900])
median_year = np.median(years_pre_processed_array[years_pre_processed_array!=1900])

In [46]:
# Let's look at the mean year
mean_year

In [47]:
# Let's look at the median year
median_year

In [48]:
index_bad_data = np.where(years_pre_processed_array==1900)[0][0]
years_pre_processed_array[index_bad_data] = median_year
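Locating a single index with `np.where(...)[0][0]` is enough here because only one record has a blank date. If several values were missing, a boolean mask would replace all of them at once. A sketch on a made-up array, using the same 1900 sentinel and median imputation:

```python
import numpy as np

SENTINEL_YEAR = 1900
years = np.array([1995, 1900, 1977, 1995, 1900, 1998])  # made-up, two missing values

missing = years == SENTINEL_YEAR
median_year = np.median(years[~missing])  # median of the valid years only
years_imputed = years.copy()
years_imputed[missing] = median_year      # fills every missing position
print(years_imputed)  # [1995 1995 1977 1995 1995 1998]
```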

### Extracting features from data
Categorical Features

In [50]:
## occupation variable
all_occupations = user_fields.map(lambda fields: fields[3]).distinct().collect()
all_occupations.sort()

In [51]:
all_occupations

In [52]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer
df = spark.createDataFrame(list(zip(range(len(all_occupations)), all_occupations)), ["id", "category"])
stringIndexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
model = stringIndexer.fit(df)
df.show(5)

In [53]:
indexed = model.transform(df)
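encoder = OneHotEncoder(inputCol="categoryIndex", outputCol="categoryVec")
# Since Spark 3.0, OneHotEncoder is an estimator and must be fitted first
# (in Spark 2.x it was a plain transformer: encoder.transform(indexed))
encoded = encoder.fit(indexed).transform(indexed)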
encoded.show()
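Conceptually, `StringIndexer` maps each category string to an integer index and `OneHotEncoder` turns that index into a sparse 0/1 vector. Spark's `OneHotEncoder` drops the last category by default (`dropLast=True`), so k categories become vectors of length k - 1, and `StringIndexer` orders categories by frequency by default. The plain-Python sketch below instead indexes categories alphabetically (as `all_occupations.sort()` does), which is an assumption for illustration; `index_categories` and `one_hot` are hypothetical helpers.

```python
def index_categories(categories):
    """Assign each distinct category an integer index in sorted (alphabetical) order."""
    return {cat: i for i, cat in enumerate(sorted(set(categories)))}

def one_hot(category, index, drop_last=False):
    """Dense 0/1 list for one category; drop_last mimics Spark's dropLast option."""
    size = len(index) - 1 if drop_last else len(index)
    vec = [0] * size
    i = index[category]
    if i < size:  # with drop_last, the last category maps to all zeros
        vec[i] = 1
    return vec

index = index_categories(["doctor", "artist", "writer", "artist"])
print(index)                                     # {'artist': 0, 'doctor': 1, 'writer': 2}
print(one_hot("doctor", index))                  # [0, 1, 0]
print(one_hot("writer", index, drop_last=True))  # [0, 0]
```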