In [1]:
from pyspark import SparkContext

# Initialize SparkContext
sc = SparkContext(master="local", appName="RDD")

In [2]:

# Create the list of strings
data = [
    'Warm summer sun,',
    'Shine kindly here,',
    'Warm southern wind,',
    'Blow softly here.',
    'Green sod above,',
    'Lie light, lie light.',
    'Good night, dear heart,',
    'Good night, good night.'
]

# Create the RDD
text_rdd = sc.parallelize(data)

In [3]:
text_rdd.collect()

['Warm summer sun,',
 'Shine kindly here,',
 'Warm southern wind,',
 'Blow softly here.',
 'Green sod above,',
 'Lie light, lie light.',
 'Good night, dear heart,',
 'Good night, good night.']

In [4]:
text_rdd.count()

8

In [7]:
text_rdd.flatMap(lambda line: str.lower(line).split(r" "))\
.map(lambda word: (word, 1))\
.groupByKey()\
.mapValues(lambda value: sum(value))\
.collect()

[('warm', 2),
 ('summer', 1),
 ('sun,', 1),
 ('shine', 1),
 ('kindly', 1),
 ('here,', 1),
 ('southern', 1),
 ('wind,', 1),
 ('blow', 1),
 ('softly', 1),
 ('here.', 1),
 ('green', 1),
 ('sod', 1),
 ('above,', 1),
 ('lie', 2),
 ('light,', 1),
 ('light.', 1),
 ('good', 3),
 ('night,', 2),
 ('dear', 1),
 ('heart,', 1),
 ('night.', 1)]

### MovieLens Join of Data Using RDD

In [8]:
ls data/movieslens/*

data/movieslens/links.csv    data/movieslens/ratings.csv
data/movieslens/movies.csv   data/movieslens/ratings.json
data/movieslens/movies.json  data/movieslens/tags.csv


In [9]:
movies = sc\
.textFile("data/movieslens/movies.csv")\
.filter(lambda line: not line.startswith("movieId"))\
.map(lambda line: line.split(","))\
.map(lambda tokens: (tokens[0], tokens[1]))

for line in movies.take(10):
    print(line)

('1', 'Toy Story (1995)')
('2', 'Jumanji (1995)')
('3', 'Grumpier Old Men (1995)')
('4', 'Waiting to Exhale (1995)')
('5', 'Father of the Bride Part II (1995)')
('6', 'Heat (1995)')
('7', 'Sabrina (1995)')
('8', 'Tom and Huck (1995)')
('9', 'Sudden Death (1995)')
('10', 'GoldenEye (1995)')


In [11]:
ratings = sc\
.textFile("data/movieslens/ratings.csv")\
.filter(lambda line: not line.startswith("userId"))\
.map(lambda line: line.split(","))\
.map(lambda tokens: (tokens[1], float(tokens[2])))

for line in ratings.take(10):
    print(line)

('31', 2.5)
('1029', 3.0)
('1061', 3.0)
('1129', 2.0)
('1172', 4.0)
('1263', 2.0)
('1287', 2.0)
('1293', 2.0)
('1339', 3.5)
('1343', 2.0)


In [12]:
ratings_avg = ratings.groupByKey().mapValues(lambda values: sum(values) / len(values))

for p in ratings_avg.take(10):
    print(p)


('31', 3.1785714285714284)
('1029', 3.7023809523809526)
('1061', 3.5454545454545454)
('1129', 3.3125)
('1172', 4.260869565217392)
('1263', 3.8645833333333335)
('1287', 3.891304347826087)
('1293', 3.9782608695652173)
('1339', 3.298076923076923)
('1343', 3.7435897435897436)


In [13]:
for p in ratings_avg.join(movies).take(10):
    print(p)

('1129', (3.3125, 'Escape from New York (1981)'))
('1293', (3.9782608695652173, 'Gandhi (1982)'))
('1339', (3.298076923076923, "Dracula (Bram Stoker's Dracula) (1992)"))
('1405', (3.032608695652174, 'Beavis and Butt-Head Do America (1996)'))
('2105', (3.478723404255319, 'Tron (1982)'))
('2150', (3.513888888888889, '"Gods Must Be Crazy'))
('2455', (3.393617021276596, '"Fly'))
('10', (3.4508196721311477, 'GoldenEye (1995)'))
('17', (3.9244186046511627, 'Sense and Sensibility (1995)'))
('50', (4.370646766169155, '"Usual Suspects'))
