In [1]:
# Analysis run in Databricks using PySpark (Python).
# Data source (MovieLens):
# https://grouplens.org/datasets/movielens/

# movies.dat layout: movieId@title@genres ('@' separates the fields,
# '|' separates individual genres inside the genres field).
# `sc` (SparkContext) is predefined in Databricks notebooks, so no import is needed.

# Input path kept in one named constant so it can be changed in a single place.
MOVIES_PATH = "/FileStore/tables/movies.dat"

rawMovies = sc.textFile(MOVIES_PATH)
rawMovies.take(2)
# ['1@Toy Story (1995)@Adventure|Animation|Children|Comedy|Fantasy',
#  '2@Jumanji (1995)@Adventure|Children|Fantasy']

In [2]:
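# rawMovies.flatMap(lambda x: x.split("@")).take(3)
# flatMap flattens the fields of every line into one RDD of strings, so the
# first three elements are the three fields of the first movie:
# ['1', 'Toy Story (1995)', 'Adventure|Animation|Children|Comedy|Fantasy']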

In [3]:
# Split every line on the '@' delimiter into [movieId, title, genres].
# `map` keeps one list per movie; `flatMap` would instead flatten the
# fields of all movies into a single stream of strings.
moviesData = rawMovies.map(lambda line: line.split("@"))

In [4]:
moviesData.take(num=3)  # peek at the first three parsed movie records

In [5]:
moviesData.take(num=3)[-1]  # last of the first three parsed movie records

In [6]:
# count movie records in which at least one field is an empty string
(moviesData
    .filter(lambda fields: any(field == "" for field in fields))
    .count())

In [7]:
# number of movie records (rows); RDD.count() needs no preceding identity map
moviesData.count()
# 10681

In [8]:
# distinct column indices that hold an empty string in any movie record
# (an empty result means no record has an empty field)
moviesData.flatMap(lambda row: [i for i, x in enumerate(row) if x == ''])\
.distinct().collect()

In [9]:
# first parsed movie record as [movieId, title, genres]
moviesData.first()
# ['1', 'Toy Story (1995)', 'Adventure|Animation|Children|Comedy|Fantasy']

In [10]:
# number of distinct genre combinations, i.e. distinct values of the raw
# genres field (index 2), e.g. 'Adventure|Children|Fantasy'
(moviesData
    .map(lambda fields: fields[2])
    .distinct()
    .count())
# 797

In [11]:
# ==========================================================

In [12]:
# load ratings data (0.2 GB)
# Input path kept in one named constant so it can be changed in a single place.
RATINGS_PATH = "/FileStore/tables/ratings.dat"
rawRatings = sc.textFile(RATINGS_PATH)
rawRatings.take(1)
# ['1@122@5@838985046']

In [13]:
# ratings.dat file structure:
# userId, movieId, rating, timestamp
# indices: 0, 1, 2, 3

In [14]:
# split each line on '@' into [userId, movieId, rating, timestamp]
(rawRatings
    .map(lambda line: line.split("@"))
    .take(1))
# [['1', '122', '5', '838985046']]

In [15]:
# parsed ratings; columns: userId, movieId, rating, timestamp (indices 0-3)
ratingsData = rawRatings.map(lambda x: x.split("@"))
# NOTE: only a cell's last expression is displayed, so this sample is
# computed but not shown when the cell runs
ratingsData.take(1)
# [['1', '122', '5', '838985046']]

# count rows; RDD.count() needs no preceding identity map
ratingsData.count()
# 10000054

In [16]:
# number of distinct users (userId is index 0)
(ratingsData
    .map(lambda rec: rec[0])
    .distinct()
    .count())
# 69878
# equivalent: count the keys produced by
# ratingsData.map(lambda x: (x[0], 1)).reduceByKey(lambda x, y: x + y)

In [17]:
# number of distinct movies that appear in the ratings (movieId is index 1)
movieIds = None  # placeholder removed below; kept local-free for clarity

In [18]:
# number of ratings per user (userId is index 0); shows three (userId, count) pairs
(ratingsData
    .map(lambda rec: (rec[0], 1))
    .reduceByKey(lambda a, b: a + b)
    .take(3))

In [19]:
# how many distinct rating values exist (rating is index 2)
(ratingsData
    .map(lambda rec: rec[2])
    .distinct()
    .count())
# 10

In [20]:
# number of ratings given for each rating value (index 2); values start
# at '0.5' and there are 10 distinct levels
(ratingsData
    .map(lambda rec: (rec[2], 1))
    .reduceByKey(lambda a, b: a + b)
    .collect())

In [21]:
# rating counts sorted by rating value (tuple index 0), ascending
# record layout: 0 - userId, 1 - movieId, 2 - rating, 3 - timestamp
# Ratings are strings, so sort on float(...) to get numeric order; plain
# string order only happens to work while every value is below 10.
(ratingsData
    .map(lambda rec: (rec[2], 1))
    .reduceByKey(lambda a, b: a + b)
    .sortBy(lambda kv: float(kv[0]))
    .collect())
# [('0.5', 94988), ...

In [22]:
# number of rating levels, counted as distinct rating values (index 2)
ratingsData.map(lambda rec: rec[2]).distinct().count()
# 10

In [23]:
# rating counts sorted by number of ratings (tuple index 1), ascending
(ratingsData
    .map(lambda rec: (rec[2], 1))
    .reduceByKey(lambda a, b: a + b)
    .sortBy(lambda kv: kv[1], ascending=True)
    .collect())
# [('0.5', 94988), ('1.5', 118278) ...

In [24]:
# rating counts as (rating, count) tuples, sorted by count (index 1), descending
(ratingsData
    .map(lambda rec: (rec[2], 1))
    .reduceByKey(lambda a, b: a + b)
    .sortBy(lambda kv: kv[1], ascending=False)
    .collect())
# [('4', 2875850), ('3', 2356676), ...

In [25]:
# columns: userId, movieId, rating, timestamp
# most frequently given rating: the (rating, count) pair with the highest count
(ratingsData
    .map(lambda rec: (rec[2], 1))
    .reduceByKey(lambda a, b: a + b)
    .sortBy(lambda kv: kv[1], ascending=False)
    .take(1))
# [('4', 2875850)]
# rating '4' is the most frequently given rating

In [27]:
# =======================================================

In [28]:
# users file layout: id | age | gender | occupation | zip ('|'-separated)
# Input path kept in one named constant so it can be changed in a single place.
USERS_PATH = "/FileStore/tables/users.csv"
rawUsers = sc.textFile(USERS_PATH)
rawUsers.take(2)
# ['1|24|M|technician|85711',
#  '2|53|F|other|94043']

In [29]:
# columns: id | age | gender | occupation | zip
# split each line on '|', key it by gender (index 2, i.e. the third column)
# and count users per gender
usersData = (rawUsers
    .map(lambda line: (line.split('|')[2], 1))
    .reduceByKey(lambda a, b: a + b))
usersData.collect()
# [('M', 670), ('F', 273)]

In [30]:
# parse each user line into [id, age, gender, occupation, zip]
userData = rawUsers.map(lambda line: line.split("|"))
userData.take(1)
# [['1', '24', 'M', 'technician', '85711']]

In [31]:
# total number of user records (rows); mapping to (gender, 1) first changed
# nothing, so count the parsed RDD directly
userData.count()
# 943 (= 670 'M' + 273 'F')

In [32]:
# number of distinct gender values (index 2)
userData.map(lambda rec: rec[2]).distinct().count()
# 2

In [33]:
# users per gender (index 2)
(userData
    .map(lambda rec: (rec[2], 1))
    .reduceByKey(lambda a, b: a + b)
    .collect())
# [('M', 670), ('F', 273)]

In [34]:
# min and max age (age is index 1).
# Ages are parsed to int so the comparison is numeric; comparing the raw
# strings is lexicographic (e.g. '9' > '10', '100' < '20').
# Only a cell's last expression is displayed, so compute both in one list.
ages = userData.map(lambda fields: int(fields[1]))
[ages.min(), ages.max()]
# [10, 73]
# ages.stats() returns count, mean, stdev, max and min in a single pass
# (it needs numeric values, so it only works on the int-parsed RDD)

In [35]:
# id | age | gender | occupation | zip
# age (index 1):

In [36]:
# five youngest ages with the number of users of each age (age is index 1);
# order on int(age) so the sort is numeric rather than lexicographic
(userData
    .map(lambda rec: (rec[1], 1))
    .reduceByKey(lambda a, b: a + b)
    .takeOrdered(5, key=lambda kv: int(kv[0])))

In [37]:
# columns: id | age | gender | occupation | zip
# three most common ages (age is index 1), ranked by number of users
(userData
    .map(lambda rec: (rec[1], 1))
    .reduceByKey(lambda a, b: a + b)
    .sortBy(lambda kv: kv[1], ascending=False)
    .take(3))
# [('30', 39), ('25', 38), ('22', 37)]
# people aged 30 are the most common

In [38]:
# a least common age (lowest user count first)
(userData
    .map(lambda rec: (rec[1], 1))
    .reduceByKey(lambda a, b: a + b)
    .sortBy(lambda kv: kv[1], ascending=True)
    .take(1))
# [('66', 1)]
# several ages share the minimum count of 1 (e.g. '66' and '10'), so which
# of them is returned here is arbitrary

In [39]:
# ten least common ages, fewest users first
(userData
    .map(lambda rec: (rec[1], 1))
    .reduceByKey(lambda a, b: a + b)
    .sortBy(lambda kv: kv[1], ascending=True)
    .take(10))
# [('66', 1), ('10', 1) ...

In [40]:
# ten most common ages, most users first
(userData
    .map(lambda rec: (rec[1], 1))
    .reduceByKey(lambda a, b: a + b)
    .sortBy(lambda kv: kv[1], ascending=False)
    .take(10))
# most common users are in their twenties

In [41]:
# group users by occupation (index 3): number of distinct occupations
userData.map(lambda rec: rec[3]).distinct().count()
# 21

In [42]:
# users per occupation (index 3); three (occupation, count) pairs shown -
# swap .take(3) for .collect() to see every occupation
(userData
    .map(lambda rec: (rec[3], 1))
    .reduceByKey(lambda a, b: a + b)
    .take(3))

In [43]:
# ten most common occupations, ranked by user count (descending)
(userData
    .map(lambda rec: (rec[3], 1))
    .reduceByKey(lambda a, b: a + b)
    .sortBy(lambda kv: kv[1], ascending=False)
    .take(10))
# 'student' is the most common occupation - 196 users

In [44]:
# five least common occupations, fewest users first
(userData
    .map(lambda rec: (rec[3], 1))
    .reduceByKey(lambda a, b: a + b)
    .sortBy(lambda kv: kv[1], ascending=True)
    .take(5))
# [('homemaker', 7), ('doctor', 7) ...

In [45]:
# most common occupations, excluding the value 'other'.
# The filter on occupation (index 3) runs before the counting; its comment
# lives here because a comment after a '\' continuation is a SyntaxError.
(userData
    .filter(lambda rec: rec[3] != 'other')
    .map(lambda rec: (rec[3], 1))
    .reduceByKey(lambda a, b: a + b)
    .sortBy(lambda kv: kv[1], ascending=False)
    .take(8))
# [('student', 196), ...

In [46]:
# id | age | gender | occupation | zip
# number of male students: occupation (index 3) 'student', gender (index 2) 'M'
(userData
    .filter(lambda rec: rec[3] == 'student' and rec[2] == 'M')
    .count())
# 136

In [47]:
# number of female students: occupation (index 3) 'student', gender (index 2) 'F'
(userData
    .filter(lambda rec: rec[3] == 'student' and rec[2] == 'F')
    .count())
# 60

In [48]:
# id | age | gender | occupation | zip
# programmers by gender as [male, female].
# One Spark job instead of two full scans: keep programmers, then let
# countByValue() return a {gender: count} dict to the driver.
programmerGenders = (userData
    .filter(lambda rec: rec[3] == 'programmer')
    .map(lambda rec: rec[2])
    .countByValue())
[programmerGenders.get('M', 0), programmerGenders.get('F', 0)]
# [60, 6]

In [49]:
# executives by gender as [male, female].
# One Spark job instead of two full scans: keep executives, then let
# countByValue() return a {gender: count} dict to the driver.
executiveGenders = (userData
    .filter(lambda rec: rec[3] == 'executive')
    .map(lambda rec: rec[2])
    .countByValue())
[executiveGenders.get('M', 0), executiveGenders.get('F', 0)]
# [29, 3]
# id | age | gender | occupation | zip

In [50]:
# ten most common ages among executives, ranked by user count
(userData
    .filter(lambda rec: rec[3] == 'executive')
    .map(lambda rec: (rec[1], 1))
    .reduceByKey(lambda a, b: a + b)
    .sortBy(lambda kv: kv[1], ascending=False)
    .take(10))
# [('26', 4), ('36', 3), ('31', 3) ...

In [51]:
# ten most common ages among students, ranked by user count
(userData
    .filter(lambda rec: rec[3] == 'student')
    .map(lambda rec: (rec[1], 1))
    .reduceByKey(lambda a, b: a + b)
    .sortBy(lambda kv: kv[1], ascending=False)
    .take(10))
# [('20', 25), ('19', 21) ...

In [52]:
# number of distinct ages among students (age is index 1);
# counting distinct values needs no per-age tally and no sort
(userData
    .filter(lambda rec: rec[3] == 'student')
    .map(lambda rec: rec[1])
    .distinct()
    .count())

In [53]:
# ten most common ages among scientists, ranked by user count
(userData
    .filter(lambda rec: rec[3] == 'scientist')
    .map(lambda rec: (rec[1], 1))
    .reduceByKey(lambda a, b: a + b)
    .sortBy(lambda kv: kv[1], ascending=False)
    .take(10))
# [('39', 4), ('33', 3), ('45', 2), ('40', 2) ...

In [54]:
# 