In [40]:
import pandas as pd
from pyspark import SparkContext
from pyspark.sql import SQLContext
import os

# User



In [3]:
userData = pd.read_csv('Data/u.user', sep = "|", encoding = "iso-8859-1", names =['UserId','Age','Gender','Occupation','Zip'])
userData.head(5)

Unnamed: 0,UserId,Age,Gender,Occupation,Zip
0,1,24,M,technician,85711
1,2,53,F,other,94043
2,3,23,M,writer,32067
3,4,24,M,technician,43537
4,5,33,F,other,15213


# Movie


In [4]:
genre = pd.read_csv('Data/u.genre', sep = "|")
genre.head(20)

genre_list = list(pd.Series(genre['unknown']))

In [11]:
movieInfo = pd.read_csv('Data/u.item', sep = "|", encoding = "iso-8859-1", names=['MovieId','Title','Date','RealeseDate', 'VideoRDate','IMDB']+genre_list)
requiredInfo = movieInfo.drop(['Date', 'RealeseDate','VideoRDate','IMDB'], axis =1)
movieInfo.head(5)

Unnamed: 0,MovieId,Title,Date,RealeseDate,VideoRDate,IMDB,Action,Adventure,Animation,Children's,...,Fantasy,Film-Noir,Horror,Musical,Mystery,Romance,Sci-Fi,Thriller,War,Western
0,1,Toy Story (1995),01-Jan-1995,,http://us.imdb.com/M/title-exact?Toy%20Story%2...,0,0,0,1,1,...,0,0,0,0,0,0,0,0,0,0
1,2,GoldenEye (1995),01-Jan-1995,,http://us.imdb.com/M/title-exact?GoldenEye%20(...,0,1,1,0,0,...,0,0,0,0,0,0,0,1,0,0
2,3,Four Rooms (1995),01-Jan-1995,,http://us.imdb.com/M/title-exact?Four%20Rooms%...,0,0,0,0,0,...,0,0,0,0,0,0,0,1,0,0
3,4,Get Shorty (1995),01-Jan-1995,,http://us.imdb.com/M/title-exact?Get%20Shorty%...,0,1,0,0,0,...,0,0,0,0,0,0,0,0,0,0
4,5,Copycat (1995),01-Jan-1995,,http://us.imdb.com/M/title-exact?Copycat%20(1995),0,0,0,0,0,...,0,0,0,0,0,0,0,1,0,0


In [99]:
indexedMovies = requiredInfo.drop(genre_list, axis=1)
indexedMovies.head(5)

Unnamed: 0,MovieId,Title
0,1,Toy Story (1995)
1,2,GoldenEye (1995)
2,3,Four Rooms (1995)
3,4,Get Shorty (1995)
4,5,Copycat (1995)


In [21]:
columnPurpose = requiredInfo.drop(['MovieId','Title'],axis = 1)

In [43]:
requiredInfo.to_csv("ProcessedData/requiredInfo.csv", index=False)
columnPurpose.to_csv("ProcessedData/movie_genre.csv", index=False)
indexedMovies.to_csv("ProcessedData/indexedMovies.csv", index=False)

# User-Ratings


In [24]:
def mapper1(row):
    user, movie, rating, date = row.split("\t")
    return((user,["%s-%s" %(movie,rating)]))

In [95]:
def sortMoviesByRatings(row):
    movies = row[1]
    movies.sort(key = lambda x: -int(x[-1]))
    return (row[0]," ".join(movies[:3]))

In [96]:
sc = SparkContext.getOrCreate()

ratings = sc.textFile('Data/u.data').map(mapper1)
ratings = ratings.reduceByKey(lambda x,y: x+y)




In [97]:
ratings = ratings.map(sortMoviesByRatings)
ratings.collect()

[('22', '128-5 258-5 510-5'),
 ('244', '154-5 89-5 652-5'),
 ('115', '8-5 127-5 234-5'),
 ('305', '427-5 483-5 50-5'),
 ('286', '1014-5 379-5 288-5'),
 ('303', '69-5 134-5 161-5'),
 ('122', '387-5 715-5 708-5'),
 ('234', '705-5 134-5 519-5'),
 ('119', '1153-5 237-5 222-5'),
 ('167', '1306-5 1126-5 133-5'),
 ('299', '127-5 216-5 462-5'),
 ('102', '195-4 307-4 89-4'),
 ('63', '100-5 1007-5 301-5'),
 ('160', '234-5 174-5 160-5'),
 ('50', '253-5 475-5 1084-5'),
 ('301', '79-5 202-5 174-5'),
 ('290', '143-5 50-5 71-5'),
 ('157', '150-5 127-5 273-5'),
 ('278', '603-5 525-5 22-5'),
 ('10', '611-5 100-5 488-5'),
 ('284', '301-5 347-5 272-5'),
 ('246', '201-5 425-5 68-5'),
 ('249', '241-5 746-5 11-5'),
 ('20', '87-5 148-5 496-5'),
 ('138', '26-5 523-5 483-5'),
 ('60', '427-5 60-5 430-5'),
 ('57', '304-5 79-5 744-5'),
 ('223', '969-5 237-5 216-5'),
 ('189', '520-5 1060-5 56-5'),
 ('243', '221-5 582-5 511-5'),
 ('241', '750-5 880-5 288-5'),
 ('222', '750-5 173-5 53-5'),
 ('8', '22-5 50-5 182-5'),

In [98]:
sqlContext = SQLContext(sc)
combinedratings = sqlContext.createDataFrame(ratings)
combinedratings.createOrReplaceTempView("ratings")

combinedratings.coalesce(1).write.csv('ProcessedData/combinedRatings/')
os.system('mv ./ProcessedData/combinedRatings/*.csv ./ProcessedData/combinedRatings/combinedRatings.csv')


0

# Users

In [126]:
def user_mapper_one(rows):
    row = rows.strip().split("|")
    return (row[0],row[1],row[2],row[3],row[4],str(row[1])+"_"+row[3])

In [127]:
users = sc.textFile('Data/u.user')
users = users.map(user_mapper_one)

In [128]:
combinedUsers = sqlContext.createDataFrame(users)
combinedUsers.createOrReplaceTempView("combinedUsers")

combinedUsers.coalesce(1).write.csv('ProcessedData/combinedUsers/')
os.system('mv ./ProcessedData/combinedUsers/*.csv ./ProcessedData/combinedUsers/combinedUsers.csv')


0

In [129]:
users.collect()

[('1', '24', 'M', 'technician', '85711', '24_technician'),
 ('2', '53', 'F', 'other', '94043', '53_other'),
 ('3', '23', 'M', 'writer', '32067', '23_writer'),
 ('4', '24', 'M', 'technician', '43537', '24_technician'),
 ('5', '33', 'F', 'other', '15213', '33_other'),
 ('6', '42', 'M', 'executive', '98101', '42_executive'),
 ('7', '57', 'M', 'administrator', '91344', '57_administrator'),
 ('8', '36', 'M', 'administrator', '05201', '36_administrator'),
 ('9', '29', 'M', 'student', '01002', '29_student'),
 ('10', '53', 'M', 'lawyer', '90703', '53_lawyer'),
 ('11', '39', 'F', 'other', '30329', '39_other'),
 ('12', '28', 'F', 'other', '06405', '28_other'),
 ('13', '47', 'M', 'educator', '29206', '47_educator'),
 ('14', '45', 'M', 'scientist', '55106', '45_scientist'),
 ('15', '49', 'F', 'educator', '97301', '49_educator'),
 ('16', '21', 'M', 'entertainment', '10309', '21_entertainment'),
 ('17', '30', 'M', 'programmer', '06355', '30_programmer'),
 ('18', '35', 'F', 'other', '37212', '35_othe