In [1]:
# Check that the SparkContext (`sc`) is available

In [2]:
# Display the SparkContext `sc`; it is provided by the notebook environment
# (it is never created in this notebook).
sc

In [3]:
# Distribute the data: create an RDD with one element per line of the text file.
lines = sc.textFile("/FileStore/tables/shakespeare.txt")

# Split every line into words, emit (word, 1) pairs, then sum the counts per word.
# str.split() with no argument splits on runs of whitespace and never yields empty
# strings, so blank lines and repeated spaces do not get counted as a '' "word"
# (which split(' ') would do).
counts = (lines.flatMap(lambda line: line.split())
               .map(lambda word: (word, 1))
               .reduceByKey(lambda a, b: a + b))

# take() brings the first 10 (word, count) pairs back to the driver.
output = counts.take(10)
for word, count in output:
    print("%s: %i" % (word, count))

# Functional Programming with Python

In [5]:
# map: apply a function to every element, producing the results lazily.
my_list = list(range(1, 11))
# Let's say we want to square each element of my_list.
squared_list = map(lambda n: n ** 2, my_list)
print(list(squared_list))

In [6]:
def squared(x):
    """Return ``x`` raised to the second power."""
    return pow(x, 2)
my_list = list(range(1, 11))
# Same squaring as before, but passing a named function to map() instead of a lambda.
squared_list = map(squared, my_list)
print(list(squared_list))


In [7]:
# filter: keep only the elements for which the predicate returns True.
my_list = list(range(1, 11))
# Let's say we want only the even numbers in the list.
filtered_list = filter(lambda n: n % 2 == 0, my_list)
print(list(filtered_list))

In [8]:
# reduce: fold the list pairwise, left to right, into a single value (here, the sum).
import functools

my_list = list(range(1, 6))
sum_list = functools.reduce(lambda acc, item: acc + item, my_list)
print(sum_list)

# sc.parallelize usage

In [10]:
# parallelize() turns a local Python list into an RDD split into 4 partitions.
data = list(range(1, 11))
new_rdd = sc.parallelize(data, numSlices=4)
new_rdd

# Understanding Transformations

## 1. map

In [13]:
# map on an RDD: square every element; collect() returns the results as a list.
data = list(range(1, 11))
rdd = sc.parallelize(data, numSlices=4)
squared_rdd = rdd.map(lambda n: n ** 2)
squared_rdd.collect()

## 2. filter

In [15]:
# filter on an RDD: keep only the even elements.
data = list(range(1, 11))
rdd = sc.parallelize(data, numSlices=4)
filtered_rdd = rdd.filter(lambda n: n % 2 == 0)
filtered_rdd.collect()

## 3. distinct

In [17]:
# distinct: drop duplicate elements from the RDD.
data = [1, 2, 2, 2, 2, 3, 3, 3, 3, 4, 5, 6, 7, 7, 7, 8, 8, 8, 9, 10]
rdd = sc.parallelize(data, numSlices=4)
distinct_rdd = rdd.distinct()
distinct_rdd.collect()

## 4. flatMap

In [19]:
# flatMap: each element maps to a list [x, x**3], and the lists are flattened into one RDD.
data = list(range(1, 5))
rdd = sc.parallelize(data, numSlices=4)
flat_rdd = rdd.flatMap(lambda n: [n, n ** 3])
flat_rdd.collect()

## 5. reduceByKey

In [21]:
# (product, category, price) records distributed over 4 partitions.
data = [
    ('Apple', 'Fruit', 200),
    ('Banana', 'Fruit', 24),
    ('Tomato', 'Fruit', 56),
    ('Potato', 'Vegetable', 103),
    ('Carrot', 'Vegetable', 34),
]
rdd = sc.parallelize(data, numSlices=4)

In [22]:
# Re-key each record as (category, price).
category_price_rdd = rdd.map(lambda rec: (rec[1], rec[2]))
category_price_rdd.collect()

In [23]:
# Sum the prices within each category.
category_total_price_rdd = category_price_rdd.reduceByKey(lambda total, price: total + price)
category_total_price_rdd.collect()

## 6. groupByKey

In [25]:
# (product, category, price) records, re-keyed as (category, product).
data = [
    ('Apple', 'Fruit', 200),
    ('Banana', 'Fruit', 24),
    ('Tomato', 'Fruit', 56),
    ('Potato', 'Vegetable', 103),
    ('Carrot', 'Vegetable', 34),
]
rdd = sc.parallelize(data, numSlices=4)
category_product_rdd = rdd.map(lambda rec: (rec[1], rec[0]))
category_product_rdd.collect()

In [26]:
# groupByKey gathers all product names of a category into one iterable per key.
grouped_products_by_category_rdd = category_product_rdd.groupByKey()
findata = grouped_products_by_category_rdd.collect()
# Unpack into descriptive names instead of rebinding `data`, which holds the
# input records built in the previous cell.
for category, products in findata:
    print(category, list(products))

# Actions

## 1. reduce

In [29]:
# reduce is an action: it folds all elements into one value on the driver.
rdd = sc.parallelize(list(range(1, 6)))
rdd.reduce(lambda acc, n: acc + n)

## 2. take

In [31]:
# take(n) returns the first n elements to the driver.
rdd = sc.parallelize(list(range(1, 6)))
rdd.take(3)

## 3. takeOrdered

In [33]:
rdd = sc.parallelize([5, 3, 12, 23])
# Descending order: sorting by the negated value puts the largest first.
rdd.takeOrdered(3, key=lambda n: -n)


In [34]:
rdd = sc.parallelize([(5, 23), (3, 34), (12, 344), (23, 29)])
# Descending order on the second element of each pair.
rdd.takeOrdered(3, key=lambda pair: -pair[1])

# Spark in Action

In [36]:
# Load the user, rating and movie files as RDDs of raw text lines,
# then peek at the first line of each.
userRDD = sc.textFile("/FileStore/tables/u.user")
ratingRDD = sc.textFile("/FileStore/tables/u.data")
movieRDD = sc.textFile("/FileStore/tables/u.item")
for label, raw_rdd in (("userRDD:", userRDD),
                       ("ratingRDD:", ratingRDD),
                       ("movieRDD:", movieRDD)):
    print(label, raw_rdd.take(1))

## 25 most rated movie titles

In [38]:
# From ratingRDD keep only the two columns of interest: (movie_id, rating).
# Each tab-separated line is split once and then indexed.
RDD_movid_rating = (ratingRDD
                    .map(lambda line: line.split("\t"))
                    .map(lambda fields: (fields[1], fields[2])))
print("RDD_movid_rating:", RDD_movid_rating.take(4))

# From movieRDD keep only (movie_id, title); lines are pipe-separated.
RDD_movid_title = (movieRDD
                   .map(lambda line: line.split("|"))
                   .map(lambda fields: (fields[0], fields[1])))
print("RDD_movid_title:", RDD_movid_title.take(2))

# Merge the two pair RDDs on movie_id -> (movie_id, (rating, title)).
# leftOuterJoin keeps every rating; the title is None when a movie_id has no match.
rdd_movid_title_rating = RDD_movid_rating.leftOuterJoin(RDD_movid_title)
print("rdd_movid_title_rating:", rdd_movid_title_rating.take(1))

# One (title, 1) pair per rating.
# NOTE(review): keying by title merges any distinct movie_ids that share a title.
rdd_title_rating = rdd_movid_title_rating.map(lambda kv: (kv[1][1], 1))
print("rdd_title_rating:", rdd_title_rating.take(2))

# Count the ratings per title.
rdd_title_ratingcnt = rdd_title_rating.reduceByKey(lambda a, b: a + b)
print("rdd_title_ratingcnt:", rdd_title_ratingcnt.take(2))

# takeOrdered is an action (not a transformation): using the negated count as the
# sort key returns the 25 titles with the most ratings, largest first.
print("#####################################")
print("25 most rated movies:", rdd_title_ratingcnt.takeOrdered(25, lambda kv: -kv[1]))
print("#####################################")

We could have done all of this in a single chained expression:

In [40]:
print(
    ratingRDD
    .map(lambda line: line.split("\t"))
    .map(lambda f: (f[1], f[2]))                  # (movie_id, rating)
    .leftOuterJoin(movieRDD
                   .map(lambda line: line.split("|"))
                   .map(lambda f: (f[0], f[1])))  # (movie_id, title)
    .map(lambda kv: (kv[1][1], 1))                # (title, 1)
    .reduceByKey(lambda a, b: a + b)              # (title, number of ratings)
    .takeOrdered(25, lambda kv: -kv[1])           # 25 largest counts first
)



Find the 25 highest-rated movies in the same dataset, considering only movies that have been rated at least 100 times.

In [42]:
# rdd_movid_title_rating (built earlier) holds (movie_id, (rating, title)) pairs, e.g.
# [('429', ('5', 'Day the Earth Stood Still, The (1951)'))]; ratings are still strings.
MIN_RATINGS = 100  # only keep movies rated at least this many times
TOP_N = 25

# Sum and count the ratings per title in a single reduceByKey pass:
# (title, (rating_sum, rating_count)). This needs no second aggregation and no join.
rdd_title_sum_count = (rdd_movid_title_rating
                       .map(lambda kv: (kv[1][1], (int(kv[1][0]), 1)))
                       .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1])))

rdd_title_ratingsum = rdd_title_sum_count.mapValues(lambda s: s[0])
print("rdd_title_ratingsum:", rdd_title_ratingsum.take(2))

# (title, (mean_rating, rating_count))
rdd_title_ratingmean_rating_count = rdd_title_sum_count.mapValues(
    lambda s: (float(s[0]) / s[1], s[1]))
print("rdd_title_ratingmean_rating_count:", rdd_title_ratingmean_rating_count.take(1))

# Keep only the movies rated at least MIN_RATINGS times.
rdd_title_rating_rating_count_gt_100 = rdd_title_ratingmean_rating_count.filter(
    lambda kv: kv[1][1] >= MIN_RATINGS)
print("rdd_title_rating_rating_count_gt_100:", rdd_title_rating_rating_count_gt_100.take(1))

# takeOrdered is an action: the negated mean rating as the key returns the
# TOP_N best-rated titles, highest first.
print("#####################################")
print("25 highly rated movies:")
print(rdd_title_rating_rating_count_gt_100.takeOrdered(TOP_N, lambda kv: -kv[1][0]))
print("#####################################")

# Spark DataFrames

## 1.Reading file

In [45]:
# Read the tab-separated ratings file (no header row) into a DataFrame,
# letting Spark infer the column types.
ratings = spark.read.csv("/FileStore/tables/u.data",
                         sep="\t", inferSchema=True, header=False)

## 2. Show File

In [47]:
# Print the first rows of the DataFrame as a plain-text table.
ratings.show()

In [48]:
# display() is a notebook-provided helper that renders the DataFrame as a rich table.
display(ratings)

_c0,_c1,_c2,_c3
196,242,3,881250949
186,302,3,891717742
22,377,1,878887116
244,51,2,880606923
166,346,1,886397596
298,474,4,884182806
115,265,2,881171488
253,465,5,891628467
305,451,3,886324817
6,86,3,883603013


## 3. Change Column names

In [50]:
# Replace the default _c0.._c3 column names with meaningful ones.
ratings = ratings.toDF('user_id', 'movie_id', 'rating', 'unix_timestamp')

In [51]:
# Show the DataFrame again, now with the renamed columns.
display(ratings)

user_id,movie_id,rating,unix_timestamp
196,242,3,881250949
186,302,3,891717742
22,377,1,878887116
244,51,2,880606923
166,346,1,886397596
298,474,4,884182806
115,265,2,881171488
253,465,5,891628467
305,451,3,886324817
6,86,3,883603013


## 4. Basic Stats

In [53]:
n_rows = ratings.count()       # row count
n_cols = len(ratings.columns)  # column count
print(n_rows)
print(n_cols)

In [54]:
# Summary statistics (count, mean, stddev, min, max) for every column.
summary_stats = ratings.describe()
display(summary_stats)

summary,user_id,movie_id,rating,unix_timestamp
count,100000.0,100000.0,100000.0,100000.0
mean,462.48475,425.53013,3.52986,883528851.48862
stddev,266.61442012750905,330.79835632558473,1.1256735991443214,5343856.189502848
min,1.0,1.0,1.0,874724710.0
max,943.0,1682.0,5.0,893286638.0


## 5. Select a few columns

In [56]:
# Project the DataFrame down to two columns.
display(ratings.select(['user_id', 'movie_id']))

user_id,movie_id
196,242
186,302
22,377
244,51
166,346
298,474
115,265
253,465
305,451
6,86


## 6. Filter

In [58]:
# Keep only the 5-star ratings (where() is an alias of filter()).
display(ratings.where(ratings["rating"] == 5))

user_id,movie_id,rating,unix_timestamp
253,465,5,891628467
286,1014,5,879781125
200,222,5,876042340
122,387,5,879270459
38,95,5,892430094
160,234,5,876861185
278,603,5,891295330
287,327,5,875333916
246,201,5,884921594
242,1137,5,879741196


In [59]:
# Combine Column conditions with & ; each comparison is built separately so no
# extra parentheses are needed around the operands.
five_star = ratings["rating"] == 5
by_user_253 = ratings["user_id"] == 253
display(ratings.filter(five_star & by_user_253))

user_id,movie_id,rating,unix_timestamp
253,465,5,891628467
253,510,5,891628416
253,183,5,891628341
253,483,5,891628122
253,198,5,891628392
253,127,5,891628060
253,173,5,891628483
253,527,5,891628518
253,117,5,891628535
253,87,5,891628278


## 7. Groupby

In [61]:
from pyspark.sql import functions as F

# Number of ratings and mean rating per user.
per_user_stats = ratings.groupBy("user_id").agg(F.count("user_id"), F.mean("rating"))
display(per_user_stats)

user_id,count(user_id),avg(rating)
148,65,4.0
463,133,2.8646616541353382
471,31,3.3870967741935485
496,129,3.031007751937985
833,267,3.056179775280899
243,81,3.641975308641976
392,111,4.045045045045045
540,63,3.7142857142857135
623,45,3.733333333333333
737,33,3.9696969696969697


## 8. Sort

In [63]:
# Ascending sort by user_id (orderBy() is an alias of sort()).
display(ratings.orderBy("user_id"))

user_id,movie_id,rating,unix_timestamp
1,78,1,878543176
1,212,4,875072895
1,17,3,875073198
1,143,1,875072631
1,151,4,875072865
1,51,4,878543275
1,265,4,878542441
1,20,4,887431883
1,202,5,875072442
1,171,5,889751711


In [64]:
# Descending sort by user_id.
from pyspark.sql import functions as F
display(ratings.sort(F.col("user_id").desc()))

user_id,movie_id,rating,unix_timestamp
943,570,1,888640125
943,229,2,888693158
943,76,4,888639523
943,566,4,888639886
943,230,1,888693158
943,188,4,888639269
943,122,1,875502576
943,431,4,888639724
943,356,4,888639598
943,471,5,875502042


## SQL, Joins and Merging with Spark DataFrames

In [66]:
# Run some SQL on ratings: register the DataFrame as a temporary view so SQL can
# refer to it by name. createOrReplaceTempView / spark.sql replace the deprecated
# registerTempTable / sqlContext.sql.
ratings.createOrReplaceTempView('ratings_table')
newDF = spark.sql('select * from ratings_table where rating > 4')
display(newDF)

user_id,movie_id,rating,unix_timestamp
253,465,5,891628467
286,1014,5,879781125
200,222,5,876042340
122,387,5,879270459
38,95,5,892430094
160,234,5,876861185
278,603,5,891295330
287,327,5,875333916
246,201,5,884921594
242,1137,5,879741196


In [67]:
# Get one more DataFrame to join: the movies file is pipe-separated with no header row.
movies = spark.read.load("/FileStore/tables/u.item",
                         format="csv", sep="|", inferSchema="true", header="false")
# Name all 24 columns. Names must not carry stray whitespace: a column named
# "Animation " (trailing space) could not be referenced as "Animation".
movies = movies.toDF("movie_id", "movie_title", "release_date", "video_release_date", "IMDb_URL",
                     "unknown", "Action", "Adventure", "Animation", "Children", "Comedy",
                     "Crime", "Documentary", "Drama", "Fantasy", "Film_Noir", "Horror",
                     "Musical", "Mystery", "Romance", "Sci_Fi", "Thriller", "War", "Western")

In [68]:
# Inspect the movies DataFrame: id, title, dates, URL and one 0/1 column per genre.
display(movies)

movie_id,movie_title,release_date,video_release_date,IMDb_URL,unknown,Action,Adventure,Animation,Children,Comedy,Crime,Documentary,Drama,Fantasy,Film_Noir,Horror,Musical,Mystery,Romance,Sci_Fi,Thriller,War,Western
1,Toy Story (1995),01-Jan-1995,,http://us.imdb.com/M/title-exact?Toy%20Story%20(1995),0,0,0,1,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0
2,GoldenEye (1995),01-Jan-1995,,http://us.imdb.com/M/title-exact?GoldenEye%20(1995),0,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0
3,Four Rooms (1995),01-Jan-1995,,http://us.imdb.com/M/title-exact?Four%20Rooms%20(1995),0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0
4,Get Shorty (1995),01-Jan-1995,,http://us.imdb.com/M/title-exact?Get%20Shorty%20(1995),0,1,0,0,0,1,0,0,1,0,0,0,0,0,0,0,0,0,0
5,Copycat (1995),01-Jan-1995,,http://us.imdb.com/M/title-exact?Copycat%20(1995),0,0,0,0,0,0,1,0,1,0,0,0,0,0,0,0,1,0,0
6,Shanghai Triad (Yao a yao yao dao waipo qiao) (1995),01-Jan-1995,,http://us.imdb.com/Title?Yao+a+yao+yao+dao+waipo+qiao+(1995),0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0
7,Twelve Monkeys (1995),01-Jan-1995,,http://us.imdb.com/M/title-exact?Twelve%20Monkeys%20(1995),0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,1,0,0,0
8,Babe (1995),01-Jan-1995,,http://us.imdb.com/M/title-exact?Babe%20(1995),0,0,0,0,1,1,0,0,1,0,0,0,0,0,0,0,0,0,0
9,Dead Man Walking (1995),01-Jan-1995,,http://us.imdb.com/M/title-exact?Dead%20Man%20Walking%20(1995),0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0
10,Richard III (1995),22-Jan-1996,,http://us.imdb.com/M/title-exact?Richard%20III%20(1995),0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,1,0


In [69]:
# Register movies as a temp view (createOrReplaceTempView replaces the deprecated
# registerTempTable) and left-join every rating with its movie title.
movies.createOrReplaceTempView('movies_table')
display(spark.sql("""
    select ratings_table.*, movies_table.movie_title
    from ratings_table
    left join movies_table on movies_table.movie_id = ratings_table.movie_id
"""))

user_id,movie_id,rating,unix_timestamp,movie_title
196,242,3,881250949,Kolya (1996)
186,302,3,891717742,L.A. Confidential (1997)
22,377,1,878887116,Heavyweights (1994)
244,51,2,880606923,Legends of the Fall (1994)
166,346,1,886397596,Jackie Brown (1997)
298,474,4,884182806,Dr. Strangelove or: How I Learned to Stop Worrying and Love the Bomb (1963)
115,265,2,881171488,"Hunt for Red October, The (1990)"
253,465,5,891628467,"Jungle Book, The (1994)"
305,451,3,886324817,Grease (1978)
6,86,3,883603013,"Remains of the Day, The (1993)"


In [70]:
# top 25 most rated movies:

In [71]:
# Number of ratings per movie, most-rated first, limited to the top 25.
mostrateddf = spark.sql("""
    select r.movie_id,
           m.movie_title,
           count(r.user_id) as num_ratings
    from ratings_table r
    left join movies_table m on m.movie_id = r.movie_id
    group by r.movie_id, m.movie_title
    order by num_ratings desc
    limit 25
""")

display(mostrateddf)

movie_id,movie_title,num_ratings
50,Star Wars (1977),583
258,Contact (1997),509
100,Fargo (1996),508
181,Return of the Jedi (1983),507
294,Liar Liar (1997),485
286,"English Patient, The (1996)",481
288,Scream (1996),478
1,Toy Story (1995),452
300,Air Force One (1997),431
121,Independence Day (ID4) (1996),429


In [72]:
# Top 25 highest-rated movies, considering only movies rated at least 100 times
# (num_ratings >= 100, matching the "at least 100 times" requirement).
highrateddf = spark.sql("""
    select r.movie_id,
           m.movie_title,
           avg(r.rating) as avg_rating,
           count(r.movie_id) as num_ratings
    from ratings_table r
    left join movies_table m on m.movie_id = r.movie_id
    group by r.movie_id, m.movie_title
    having num_ratings >= 100
    order by avg_rating desc
    limit 25
""")

display(highrateddf)


movie_id,movie_title,avg_rating,num_ratings
408,"Close Shave, A (1995)",4.491071428571429,112
318,Schindler's List (1993),4.466442953020135,298
169,"Wrong Trousers, The (1993)",4.466101694915254,118
483,Casablanca (1942),4.45679012345679,243
64,"Shawshank Redemption, The (1994)",4.445229681978798,283
603,Rear Window (1954),4.3875598086124405,209
12,"Usual Suspects, The (1995)",4.385767790262173,267
50,Star Wars (1977),4.358490566037736,583
178,12 Angry Men (1957),4.344,125
134,Citizen Kane (1941),4.292929292929293,198


In [73]:
# NOTE(review): this repeats the display from the previous cell; consider removing.
display(highrateddf)

movie_id,movie_title,avg_rating,num_ratings
408,"Close Shave, A (1995)",4.491071428571429,112
318,Schindler's List (1993),4.466442953020135,298
169,"Wrong Trousers, The (1993)",4.466101694915254,118
483,Casablanca (1942),4.45679012345679,243
64,"Shawshank Redemption, The (1994)",4.445229681978798,283
603,Rear Window (1954),4.3875598086124405,209
12,"Usual Suspects, The (1995)",4.385767790262173,267
50,Star Wars (1977),4.358490566037736,583
178,12 Angry Men (1957),4.344,125
134,Citizen Kane (1941),4.292929292929293,198


# Converting between RDDs and DataFrames

In [75]:
# .rdd exposes the DataFrame's rows as an RDD of Row objects.
highratedrdd = highrateddf.rdd
highratedrdd.take(2)

In [76]:
from pyspark.sql import Row

# Build an RDD of Row objects, then let Spark infer the DataFrame schema from them.
data = [('A', 1), ('B', 2), ('C', 3), ('D', 4)]
rdd = sc.parallelize(data)
rdd_new = rdd.map(lambda kv: Row(key=kv[0], value=int(kv[1])))
# SparkSession.createDataFrame replaces the deprecated SQLContext entry point.
rdd_as_df = spark.createDataFrame(rdd_new)
display(rdd_as_df)

key,value
A,1
B,2
C,3
D,4
