# 1 Calculating the WordCount of Student dataset

# Create SparkSession and sparkcontext

In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder\
                    .master("local")\
                    .appName('Word Count')\
                    .getOrCreate()
sc=spark.sparkContext

# Loading and Reading the dataset

In [3]:
text_file = sc.textFile("/spark/datasets/student.dat")
text_file.collect()

['Ohan sam,123,Mathematics#Physics#Chemistry#Molecular Biology,semseterfee|18000.50#labfee|1000#examfee|2500,011#1023550456']

# Calculating the Word Count

In [4]:
counts = text_file.flatMap(lambda line: line.split(",")) \
                            .flatMap(lambda line: line.split(" ")) \
                            .flatMap(lambda line: line.split("#")) \
                            .flatMap(lambda line: line.split("|")) \
                            .map(lambda word: (word, 1)) \
                           .reduceByKey(lambda x, y: x + y)

# Print the OutPut

In [5]:
output = counts.collect()
for (word, count) in output:
    print("%s: %i" % (word, count))
Word_Count=sqlContext.createDataFrame(output,['Word'],['Count'])
Word_Count.write.format("csv").save("/spark/Word_Count")

Ohan: 1
sam: 1
123: 1
Mathematics: 1
Physics: 1
Chemistry: 1
Molecular: 1
Biology: 1
semseterfee: 1
18000.50: 1
labfee: 1
1000: 1
examfee: 1
2500: 1
011: 1
1023550456: 1


# 2. Top 10 Movie Analysis using RDD

# Read the Movie file

In [6]:
movieFile = sc.textFile("/spark/datasets/movies-csv.item")
moviesHdr = movieFile.first()
moviesWithoutHdr = movieFile.filter(lambda x: x != moviesHdr)
moviesWithoutHdr.collect()

['1,Toy Story (1995),01-Jan-1995,,http://us.imdb.com/M/title-exact?Toy%20Story%20(1995),0,0,0,1,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0',
 '2,GoldenEye (1995),01-Jan-1995,,http://us.imdb.com/M/title-exact?GoldenEye%20(1995),0,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0',
 '3,Four Rooms (1995),01-Jan-1995,,http://us.imdb.com/M/title-exact?Four%20Rooms%20(1995),0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0',
 '4,Get Shorty (1995),01-Jan-1995,,http://us.imdb.com/M/title-exact?Get%20Shorty%20(1995),0,1,0,0,0,1,0,0,1,0,0,0,0,0,0,0,0,0,0',
 '5,Copycat (1995),01-Jan-1995,,http://us.imdb.com/M/title-exact?Copycat%20(1995),0,0,0,0,0,0,1,0,1,0,0,0,0,0,0,0,1,0,0',
 '6,Shanghai Triad (Yao a yao yao dao waipo qiao) (1995),01-Jan-1995,,http://us.imdb.com/Title?Yao+a+yao+yao+dao+waipo+qiao+(1995),0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0',
 '7,Twelve Monkeys (1995),01-Jan-1995,,http://us.imdb.com/M/title-exact?Twelve%20Monkeys%20(1995),0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,1,0,0,0',
 '8,Babe (1995),01-Jan-1995,,http://us.imdb.com/M/title

# Remove Header

In [7]:
movieIdTitle = moviesWithoutHdr.map(lambda x: (x.split(",")[0], x.split(",")[1]))

# Read the Rating FIle

In [8]:
ratingFile = sc.textFile("/spark/datasets/rating-csv.data")
ratingHdr = ratingFile.first()
ratingWithoutHdr = ratingFile.filter(lambda x: x != ratingHdr)
ratingWithoutHdr.collect()

['196,242,3,881250949',
 '186,302,3,891717742',
 '22,377,1,878887116',
 '244,51,2,880606923',
 '166,346,1,886397596',
 '298,474,4,884182806',
 '115,265,2,881171488',
 '253,465,5,891628467',
 '305,451,3,886324817',
 '6,86,3,883603013',
 '62,257,2,879372434',
 '286,1014,5,879781125',
 '200,222,5,876042340',
 '210,40,3,891035994',
 '224,29,3,888104457',
 '303,785,3,879485318',
 '122,387,5,879270459',
 '194,274,2,879539794',
 '291,1042,4,874834944',
 '234,1184,2,892079237',
 '119,392,4,886176814',
 '167,486,4,892738452',
 '299,144,4,877881320',
 '291,118,2,874833878',
 '308,1,4,887736532',
 '95,546,2,879196566',
 '38,95,5,892430094',
 '102,768,2,883748450',
 '63,277,4,875747401',
 '160,234,5,876861185',
 '50,246,3,877052329',
 '301,98,4,882075827',
 '225,193,4,879539727',
 '290,88,4,880731963',
 '97,194,3,884238860',
 '157,274,4,886890835',
 '181,1081,1,878962623',
 '278,603,5,891295330',
 '276,796,1,874791932',
 '7,32,4,891350932',
 '10,16,4,877888877',
 '284,304,4,885329322',
 '201,979,2

In [9]:
ratingsMap = ratingWithoutHdr.map(lambda x:(int(x.split(",")[1]),float(x.split(",")[2])))
aggRatings = ratingsMap.aggregateByKey((0.0,0),lambda x,y:(x[0]+y, x[1]+1),lambda x,y:(x[0]+y[0], x[1]+y[1]))
rddOut = aggRatings.map(lambda x: (str(x[0]), round(x[1][0]/x[1][1], 2)))
rddOut.collect()

[('242', 3.99),
 ('302', 4.16),
 ('377', 2.15),
 ('51', 3.46),
 ('346', 3.64),
 ('474', 4.25),
 ('265', 3.86),
 ('465', 3.56),
 ('451', 3.35),
 ('86', 3.94),
 ('257', 3.75),
 ('1014', 3.06),
 ('222', 3.66),
 ('40', 2.89),
 ('29', 2.67),
 ('785', 3.15),
 ('387', 3.38),
 ('274', 3.5),
 ('1042', 3.14),
 ('1184', 2.5),
 ('392', 3.54),
 ('486', 3.8),
 ('144', 3.87),
 ('118', 3.22),
 ('1', 3.88),
 ('546', 3.03),
 ('95', 3.81),
 ('768', 3.08),
 ('277', 3.46),
 ('234', 3.77),
 ('246', 3.94),
 ('98', 4.29),
 ('193', 3.92),
 ('88', 3.54),
 ('194', 4.06),
 ('1081', 2.75),
 ('603', 4.39),
 ('796', 3.08),
 ('32', 3.79),
 ('16', 3.21),
 ('304', 3.54),
 ('979', 3.2),
 ('564', 2.04),
 ('327', 3.38),
 ('201', 3.52),
 ('1137', 3.97),
 ('241', 3.55),
 ('4', 3.55),
 ('332', 3.46),
 ('100', 4.16),
 ('432', 3.77),
 ('322', 3.09),
 ('181', 4.01),
 ('196', 3.92),
 ('679', 3.05),
 ('384', 2.78),
 ('143', 3.77),
 ('423', 3.83),
 ('515', 4.2),
 ('20', 3.42),
 ('288', 3.44),
 ('219', 3.17),
 ('526', 3.83),
 ('919

# Join MovieRDD and RatingRDD

In [10]:
joinedRdd = movieIdTitle.join(rddOut)
joinedRdd.take(10)

[('1', ('Toy Story (1995)', 3.88)),
 ('4', ('Get Shorty (1995)', 3.55)),
 ('8', ('Babe (1995)', 4.0)),
 ('9', ('Dead Man Walking (1995)', 3.9)),
 ('10', ('Richard III (1995)', 3.83)),
 ('12', ('Usual Suspects', 4.39)),
 ('14', ('Postino', 3.97)),
 ('16', ('French Twist (Gazon maudit) (1995)', 3.21)),
 ('17', ('From Dusk Till Dawn (1996)', 3.12)),
 ('19', ("Antonia's Line (1995)", 3.96))]

In [11]:
movieRating = joinedRdd.map(lambda x: (x[1][0], x[1][1]))
movieRatingSort = movieRating.sortBy( lambda x: x[1],False)
movieRatingSort.take(10)
movieRatingSort.write.format("csv").save("/spark/MovieDF")
movieRatingSort.saveAsTextFile("/spark/MovieRDD")

[('Great Day in Harlem', 5.0),
 ('Marlene Dietrich: Shadow and Light (1996) ', 5.0),
 ('Star Kid (1997)', 5.0),
 ('Saint of Fort Washington', 5.0),
 ("Someone Else's America (1995)", 5.0),
 ('They Made Me a Criminal (1939)', 5.0),
 ('Prefontaine (1997)', 5.0),
 ('Santa with Muscles (1996)', 5.0),
 ('Aiqing wansui (1994)', 5.0),
 ('Entertaining Angels: The Dorothy Day Story (1996)', 5.0)]

# 3. Top 10 movies analysis using DataFrames

In [12]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import count ,col ,sum, explode, date_format ,from_utc_timestamp, from_unixtime
from pyspark.sql import functions as f

In [13]:
spark = SparkSession.builder \
            .master("local[*]") \
            .appName("DataFrame_Movies") \
            .getOrCreate()

In [14]:
ratingsDF = spark.read.format("csv")\
        .options(header = True)\
        .load("/spark/datasets/rating-csv.data")
ratingsDF.printSchema()
ratingsDF.show(5)

root
 |-- cid: string (nullable = true)
 |-- mid: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- stamp: string (nullable = true)

+---+---+------+---------+
|cid|mid|rating|    stamp|
+---+---+------+---------+
|196|242|     3|881250949|
|186|302|     3|891717742|
| 22|377|     1|878887116|
|244| 51|     2|880606923|
|166|346|     1|886397596|
+---+---+------+---------+
only showing top 5 rows



In [15]:
moviesDF = spark.read.format("csv")\
        .options(header = True)\
        .load("/spark/datasets/movies-csv.item")
moviesDF.printSchema()
moviesDF.show(5)

root
 |-- mid: string (nullable = true)
 |-- title: string (nullable = true)
 |-- releasedt: string (nullable = true)
 |-- empty: string (nullable = true)
 |-- imdburl: string (nullable = true)
 |-- r1: string (nullable = true)
 |-- r2: string (nullable = true)
 |-- r3: string (nullable = true)
 |-- r4: string (nullable = true)
 |-- r5: string (nullable = true)
 |-- r6: string (nullable = true)
 |-- r: string (nullable = true)
 |-- r8: string (nullable = true)
 |-- r9: string (nullable = true)
 |-- r10: string (nullable = true)
 |-- r11: string (nullable = true)
 |-- r12: string (nullable = true)
 |-- r13: string (nullable = true)
 |-- r14: string (nullable = true)
 |-- r15: string (nullable = true)
 |-- r16: string (nullable = true)
 |-- r17: string (nullable = true)
 |-- r18: string (nullable = true)
 |-- r19: string (nullable = true)

+---+-----------------+-----------+-----+--------------------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|mid|      

In [16]:
joinDF = ratingsDF.join(moviesDF,ratingsDF.mid == moviesDF.mid, "inner")
joinDF = ratingsDF.alias("r").join(moviesDF.alias("m"),f.col("r.mid") == f.col("m.mid"), "inner")

In [17]:
grpDF = joinDF.groupBy(f.col("title"))\
        .agg(f.round(f.avg("rating"),2).alias("Avg_Ratings"))\
        .orderBy(f.col("Avg_Ratings").desc())
grpDF.show(10,truncate=False)
grpDF.write.csv("spark/MovieDF")

+-------------------------------------------------+-----------+
|title                                            |Avg_Ratings|
+-------------------------------------------------+-----------+
|They Made Me a Criminal (1939)                   |5.0        |
|Prefontaine (1997)                               |5.0        |
|Someone Else's America (1995)                    |5.0        |
|Entertaining Angels: The Dorothy Day Story (1996)|5.0        |
|Star Kid (1997)                                  |5.0        |
|Santa with Muscles (1996)                        |5.0        |
|Aiqing wansui (1994)                             |5.0        |
|Marlene Dietrich: Shadow and Light (1996)        |5.0        |
|Saint of Fort Washington                         |5.0        |
|Great Day in Harlem                              |5.0        |
+-------------------------------------------------+-----------+
only showing top 10 rows



# Finding top 3 and least 10 movie reviewers

In [18]:
review = ratingsDF.select(col('cid'))
groupedData = review.groupBy('cid').count()
sortedDatades = groupedData.orderBy(col('count').asc())
sortedDatades.show(3)
sortedDatades.write.format("csv").save("/spark/least3_MovieReviewers")

+---+-----+
|cid|count|
+---+-----+
|475|   20|
|740|   20|
|926|   20|
+---+-----+
only showing top 3 rows



In [19]:
sortedDataasc = groupedData.orderBy(col('count').desc())
sortedDataasc.show(10)
sortedDataasc.write.format("csv").save("/spark/Top10_MovieReviewers")

+---+-----+
|cid|count|
+---+-----+
|405|  737|
|655|  685|
| 13|  636|
|450|  540|
|276|  518|
|416|  493|
|537|  490|
|303|  484|
|234|  480|
|393|  448|
+---+-----+
only showing top 10 rows

