# 006 Find the film(s) with the largest cast. Return the movie title and the size of the cast. By "cast size" we mean the number of distinct actors that played in that movie: if an actor played multiple roles, or if it simply occurs multiple times in casts, we still count her/him only once.

In [32]:
spark

org.apache.spark.sql.SparkSession@3a9f69c6

In [2]:
import org.apache.spark.sql.functions._

In [3]:
import org.apache.spark.sql.types._

In [4]:
import org.apache.spark.sql.expressions.Window

In [5]:
val spark=SparkSession.builder().appName("movies app").master("local[*]").getOrCreate()

spark = org.apache.spark.sql.SparkSession@3a9f69c6


org.apache.spark.sql.SparkSession@3a9f69c6

In [6]:
import spark.implicits._

# UDFs

In [7]:
val getYear=udf((year:String)=>{
    val p="\\d{4}".r
    val yr=p.findFirstMatchIn(year).getOrElse("").toString
    if(yr!=""){
        yr.toInt
    }
    else{
        0
    }
})

getYear = UserDefinedFunction(<function1>,IntegerType,Some(List(StringType)))


UserDefinedFunction(<function1>,IntegerType,Some(List(StringType)))

# loading movies

In [8]:
val Movie=spark.read.option("header","true").csv("/home/use2cobadmin/practice_data/imdb/imdb_csv/Movie.csv")
.dropDuplicates()

Movie = [index: string, MID: string ... 4 more fields]


[index: string, MID: string ... 4 more fields]

# printing table structure to know what are the columns and it's data type

In [9]:
Movie.printSchema

root
 |-- index: string (nullable = true)
 |-- MID: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- num_votes: string (nullable = true)



# Displaying first two records to see sample data
# observation 1: year,rating,num_votes,index columns data types should be Integer
# observation 2: also we need do trim to remove any extra spaces

In [10]:
Movie.show(2)

+-----+---------+-----------------+----+------+---------+
|index|      MID|            title|year|rating|num_votes|
+-----+---------+-----------------+----+------+---------+
|   85|tt3224288|Beyond the Clouds|2017|   7.0|     1123|
|  913|tt2178508|   Son of Sardaar|2012|   4.0|     7895|
+-----+---------+-----------------+----+------+---------+
only showing top 2 rows



# displaying total records in the table

In [11]:
Movie.count

3475

# as per observation need to change the data type

In [12]:
val movies=Movie.withColumn("year",getYear(Movie("year")).cast(IntegerType))
.withColumn("title",trim(Movie("title")))
.withColumn("MID",trim(Movie("MID")))
.withColumn("rating",trim(Movie("rating")).cast(DoubleType))
.withColumn("num_votes",trim(Movie("num_votes")).cast(LongType))
.withColumn("index",trim(Movie("index")).cast(IntegerType))
.dropDuplicates("MID")

movies = [index: int, MID: string ... 4 more fields]


[index: int, MID: string ... 4 more fields]

# now print the schema to verify columns and it's data type

In [13]:
movies.printSchema

root
 |-- index: integer (nullable = true)
 |-- MID: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: integer (nullable = false)
 |-- rating: double (nullable = true)
 |-- num_votes: long (nullable = true)



# print the total number of records in the table after data cleaning

In [14]:
movies.count

3475

# show the first two records as a sample data

In [15]:
movies.show(2)

+-----+---------+--------------------+----+------+---------+
|index|      MID|               title|year|rating|num_votes|
+-----+---------+--------------------+----+------+---------+
| 3008|tt0036077|              Kismet|1943|   7.5|       71|
| 2969|tt0102701|Prahaar: The Fina...|1991|   7.9|     1696|
+-----+---------+--------------------+----+------+---------+
only showing top 2 rows



# Loading and cleaning M_Cast table

In [27]:
val M_Cast=spark.read.option("header","true").csv("/home/use2cobadmin/practice_data/imdb/imdb_csv/M_Cast.csv")

M_Cast = [index: string, MID: string ... 2 more fields]


[index: string, MID: string ... 2 more fields]

# Sample data from M_Cast table

In [28]:
M_Cast.printSchema
M_Cast.show(2)
val m_c1Count=M_Cast.count

root
 |-- index: string (nullable = true)
 |-- MID: string (nullable = true)
 |-- PID: string (nullable = true)
 |-- ID: string (nullable = true)

+-----+---------+----------+---+
|index|      MID|       PID| ID|
+-----+---------+----------+---+
|    0|tt2388771| nm0000288|  0|
|    1|tt2388771| nm0000949|  1|
+-----+---------+----------+---+
only showing top 2 rows



m_c1Count = 82837


82837

# Cleaning data m_cast

In [29]:
val m_cast=M_Cast.withColumn("ID",trim(col("ID")).cast(LongType))
.withColumn("index",trim(col("index")).cast(LongType))
.withColumn("MID",trim(col("MID")))
.withColumn("PID",trim(col("PID")))
.dropDuplicates("MID","PID")
//.dropDuplicates("MID","PID","ID")
.dropDuplicates("ID")
.dropDuplicates()

m_cast = [index: bigint, MID: string ... 2 more fields]


[index: bigint, MID: string ... 2 more fields]

# Sample data after data cleaning for m_cast

In [30]:
m_cast.printSchema
m_cast.show(2)
val m_c2Count=m_cast.count

root
 |-- index: long (nullable = true)
 |-- MID: string (nullable = true)
 |-- PID: string (nullable = true)
 |-- ID: long (nullable = true)

+-----+---------+----------+---+
|index|      MID|       PID| ID|
+-----+---------+----------+---+
|   26|tt2388771|nm10302077| 26|
|   29|tt2388771|nm10302080| 29|
+-----+---------+----------+---+
only showing top 2 rows



m_c2Count = 82837


82837

# Print difference of total count after and before data cleaning

In [33]:
val diff_m_cast_count=m_c1Count-m_c2Count
println(diff_m_cast_count+" d")

0 d


diff_m_cast_count = 0


0

In [56]:
val finalResult=m_cast.dropDuplicates("MID","PID").groupBy("MID").count().join(movies,"MID").select("MID","title","count").sort(desc("count"))
finalResult.show

+---------+--------------------+-----+
|      MID|               title|count|
+---------+--------------------+-----+
|tt5164214|       Ocean's Eight|  238|
|tt0451631|            Apaharan|  233|
|tt6173990|                Gold|  215|
|tt1188996|     My Name Is Khan|  213|
|tt3498820|Captain America: ...|  191|
|tt1981128|            Geostorm|  170|
|tt1573482|             Striker|  165|
|tt1190080|                2012|  154|
|tt2120120|              Pixels|  144|
|tt2510874|Yamla Pagla Deewa...|  140|
|tt0848228|        The Avengers|  138|
|tt4559046|         Housefull 3|  129|
|tt3495026|                 Fan|  127|
|tt0215196|     Split Wide Open|  126|
|tt3863552|   Bajrangi Bhaijaan|  124|
|tt2575290|       Train Station|  122|
|tt6926486|               Daddy|  121|
|tt1647668|  Million Dollar Arm|  117|
|tt0086034|           Octopussy|  116|
|tt1833673|             Dhoom:3|  115|
+---------+--------------------+-----+
only showing top 20 rows



finalResult = [MID: string, title: string ... 1 more field]


[MID: string, title: string ... 1 more field]

# validation 1

In [52]:
movies.filter($"MID"==="tt2355921").show
m_cast.filter($"MID"==="tt2355921").show

+-----+---------+-------------+----+------+---------+
|index|      MID|        title|year|rating|num_votes|
+-----+---------+-------------+----+------+---------+
| 3123|tt2355921|The Wish Fish|2012|   3.4|       60|
+-----+---------+-------------+----+------+---------+

+-----+---------+----+-----+
|index|      MID| PID|   ID|
+-----+---------+----+-----+
|77014|tt2355921|null|77014|
+-----+---------+----+-----+



In [53]:
m_cast.groupBy("MID").count().join(movies,"MID").select("MID","title","count").sort(desc("count")).explain

== Physical Plan ==
*(10) Sort [count#1577L DESC NULLS LAST], true, 0
+- Exchange rangepartitioning(count#1577L DESC NULLS LAST, 200)
   +- *(9) Project [MID#485, title#71, count#1577L]
      +- *(9) BroadcastHashJoin [MID#485], [MID#78], Inner, BuildRight
         :- *(9) HashAggregate(keys=[MID#485], functions=[count(1)])
         :  +- Exchange hashpartitioning(MID#485, 200)
         :     +- *(4) HashAggregate(keys=[MID#485], functions=[partial_count(1)])
         :        +- *(4) HashAggregate(keys=[index#480L, MID#485, PID#490, ID#475L], functions=[])
         :           +- *(4) HashAggregate(keys=[index#480L, MID#485, PID#490, ID#475L], functions=[])
         :              +- *(4) Filter isnotnull(MID#485)
         :                 +- SortAggregate(key=[ID#475L], functions=[first(index#480L, false), first(MID#485, false), first(PID#490, false)])
         :                    +- *(3) Sort [ID#475L ASC NULLS FIRST], false, 0
         :                       +- Exchange hashpart

In [54]:
m_cast.groupBy("MID").count().join(movies.select("MID","title"),"MID")
.select("MID","title","count").sort(desc("count")).explain

== Physical Plan ==
*(10) Sort [count#1632L DESC NULLS LAST], true, 0
+- Exchange rangepartitioning(count#1632L DESC NULLS LAST, 200)
   +- *(9) Project [MID#485, title#71, count#1632L]
      +- *(9) BroadcastHashJoin [MID#485], [MID#78], Inner, BuildRight
         :- *(9) HashAggregate(keys=[MID#485], functions=[count(1)])
         :  +- Exchange hashpartitioning(MID#485, 200)
         :     +- *(4) HashAggregate(keys=[MID#485], functions=[partial_count(1)])
         :        +- *(4) HashAggregate(keys=[index#480L, MID#485, PID#490, ID#475L], functions=[])
         :           +- *(4) HashAggregate(keys=[index#480L, MID#485, PID#490, ID#475L], functions=[])
         :              +- *(4) Filter isnotnull(MID#485)
         :                 +- SortAggregate(key=[ID#475L], functions=[first(index#480L, false), first(MID#485, false), first(PID#490, false)])
         :                    +- *(3) Sort [ID#475L ASC NULLS FIRST], false, 0
         :                       +- Exchange hashpart