# Cleaning data into DataFrame

In [7]:

// Build the movies DataFrame (MovieID, Title, Genres) from u.item.
//
// Each line is split on "::" exactly once and its first three fields become one
// row, so a movie's id, title and genres stay together by construction.
// Previously each field was extracted into its own single-column DataFrame and
// the columns were glued back together by joining on
// monotonically_increasing_id(). Those ids are only guaranteed to be unique and
// increasing; they encode the partition id, so two DataFrames (especially one
// that is itself the output of a join) are not guaranteed to give the same id
// to the same input line. That could silently mismatch or drop rows.
//
// All three columns are strings. A line with fewer than three "::"-separated
// fields fails with ArrayIndexOutOfBoundsException, as it did before.
val movies=spark.read.textFile("u.item")
val m_result=movies.map { line =>
  val fields=line.split("::")
  (fields(0), fields(1), fields(2))
}.toDF("MovieID", "Title", "Genres")

// No longer needed to build m_result; kept so that anything else relying on
// this import still resolves.
import org.apache.spark.sql.functions.monotonically_increasing_id

// This will give us the valid data with schema
m_result.show()

// Examples
m_result.where("MovieID=1").show()
// String equality: matches only rows whose Genres value is exactly "Action",
// not rows where Action is one of several genres.
m_result.filter("Genres == 'Action'").show()
m_result.select("Title").show()

// Will continue
// Terminates the JVM once the examples have been printed.
System.exit(0)

# Top 10 movies

In [6]:
// Top 10 most-rated movies, saved under "Top-10-CSV" as lines of the form
// movieId,title,ratingCount in descending order of ratingCount.
val ratingsRDD=sc.textFile("u.data")
// Field 1 of each "::"-delimited rating line is parsed as the integer movie id.
val movies=ratingsRDD.map(line=>line.split("::")(1).toInt)
val movies_pair=movies.map(mv=>(mv,1))

// Number of ratings per movie id, then ordered by that count, highest first.
val movies_count=movies_pair.reduceByKey(_ + _)
val movies_sorted=movies_count.sortBy(x=>x._2,false,1)

// Only the ten highest counts are kept; they are turned back into an RDD so
// they can be joined with the movie titles.
val mv_top10List=movies_sorted.take(10).toList
val mv_top10RDD=sc.parallelize(mv_top10List)

// (movieId, title) pairs from u.item; each line is split only once.
val mv_names=sc.textFile("u.item").map { line =>
  val fields=line.split("::")
  (fields(0).toInt, fields(1))
}
val join_out=mv_names.join(mv_top10RDD)

// Sort directly into a single partition. The previous sortBy(...).repartition(1)
// reshuffled the rows after sorting, so the saved file was not guaranteed to be
// in descending order. Sorting into one partition keeps the order and still
// produces a single part file.
// NOTE(review): titles are written unquoted, so a title containing a comma
// produces extra fields in the output line.
join_out
  .sortBy(x=>x._2._2, ascending=false, numPartitions=1)
  .map(x=>s"${x._1},${x._2._1},${x._2._2}")
  .saveAsTextFile("Top-10-CSV")
System.exit(0)

# Movie counts per genre, sorted by genre name

In [None]:
// Count how many times each genre occurs across all movies in u.item and save
// the (genre, count) pairs ordered by genre name under "result-csv".
val movies_rdd=sc.textFile("u.item")

// Field 2 of each "::"-delimited line is a "|"-separated genre list; emit one
// (genre, 1) pair per genre occurrence.
val genre_ones=movies_rdd
  .flatMap(line=>line.split("::")(2).split("\\|"))
  .map(name=>(name,1))

// Sum the ones per genre to get the number of occurrences.
val genre_totals=genre_ones.reduceByKey(_ + _)

// Each pair is written using its default string form.
genre_totals.sortByKey().saveAsTextFile("result-csv")
System.exit(0)

# List of Distinct Genres

In [None]:
  
// Save the distinct genre names found in u.item, in alphabetical order, under
// "result".
val movies_rdd=sc.textFile("u.item")
// Field 2 of each "::"-delimited line is the "|"-separated genre list.
val genres=movies_rdd.map(lines=>lines.split("::")(2))
val testing=genres.flatMap(line=>line.split('|'))
// Sort on the whole genre name. Sorting on the first character only (`_(0)`)
// left genres that share an initial letter in arbitrary relative order and
// would throw on an empty genre string.
val genres_distinct_sorted=testing.distinct().sortBy(genre=>genre)
genres_distinct_sorted.saveAsTextFile("result")
System.exit(0)

# List of Oldest Movies

In [None]:
  
// Find the movie(s) with the earliest release year in u.item and save them
// under "result".
val movies_rdd=sc.textFile("u.item")

// 1st method: convert the existing RDD into a DataFrame with toDF and register
// it as a temporary view. createOrReplaceTempView returns Unit, so its result
// is deliberately not bound to a name.
movies_rdd.toDF.createOrReplaceTempView("movies_view")

// spark.sql can only query registered views or tables.
// The year is taken as the 4 characters that end one position before the end
// of the title field, i.e. this assumes titles end in "(YYYY)" -- TODO confirm
// against the data. The year stays a string.
spark.sql(""" select
split(value,'::')[0] as movieid,
split(value,'::')[1] as moviename,
substring(split(value,'::')[1],length(split(value,'::')[1])-4,4) as year
from movies_view """).createOrReplaceTempView("movies")

// To view the records, use spark.sql("select * from movies").show()
// Keep every row whose year equals the minimum year and write them out as a
// single part file. saveAsTextFile returns Unit, so nothing is assigned.
// NOTE(review): the distinct-genres cell also writes to "result"; if both run
// in the same working directory the later save will presumably fail because
// the directory already exists -- confirm.
spark.sql("Select * from movies m1 where m1.year=(Select min(m2.year) from movies m2)")
  .repartition(1)
  .rdd
  .saveAsTextFile("result")
System.exit(0)