## Import Modules and Create Spark Session <a class="anchor" id="import_module"></a>

In [1]:
import org.apache.spark.sql.SparkSession //For Spark Session 
import org.apache.spark.sql.types.{IntegerType, StringType, DoubleType, StructType} //For make a schema

Intitializing Scala interpreter ...

Spark Web UI available at http://Hendry:4040
SparkContext available as 'sc' (version = 3.3.0, master = local[*], app id = local-1667134957473)
SparkSession available as 'spark'


import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, DoubleType, StructType}


## Create Schema <a class="anchor" id="create_schema"></a>

In [3]:
val movies_schema = (new StructType() //make a new class for movies schema
                     .add("Actor",StringType,true) //define column Actor as string
                     .add("Movie Title",StringType,true) //define Movie Title Actor as string
                     .add("Release Year",IntegerType,true)) //define Release Year Actor as string

val movie_ratings_schema = (new StructType() //make a new class for movie_ratings schema
                            .add("Rating",DoubleType,true) //define column Rating as double
                            .add("Movie Title",StringType,true) //define column Movie Title as string
                            .add("Release Year",IntegerType,true)) //define column Release Year as integer

movies_schema: org.apache.spark.sql.types.StructType = StructType(StructField(Actor,StringType,true),StructField(Movie Title,StringType,true),StructField(Release Year,IntegerType,true))
movie_ratings_schema: org.apache.spark.sql.types.StructType = StructType(StructField(Rating,DoubleType,true),StructField(Movie Title,StringType,true),StructField(Release Year,IntegerType,true))


## Read Dataset <a class="anchor" id="read_dataset"></a>

In [4]:
val movies = (spark.read //spark.read implies that we want to read a data
              .options(Map("header"->"true","inferSchema"->"true","delimiter"->"\t")) //read data with header, infer schema, and define the dellimiter as ';'
              .schema(movies_schema) //add a schema that we created before
              .csv("C:\\Users\\isalo\\OneDrive\\Documents\\Tugas Spark\\movies\\movies.tsv")) //define file path

val movies_ratings = (spark.read
                      .options(Map("header"->"true","inferSchema"->"true","delimiter"->"\t"))
                      .schema(movie_ratings_schema)
                      .csv("C:\\Users\\isalo\\OneDrive\\Documents\\Tugas Spark\\movies\\movie-ratings.tsv"))

movies: org.apache.spark.sql.DataFrame = [Actor: string, Movie Title: string ... 1 more field]
movies_ratings: org.apache.spark.sql.DataFrame = [Rating: double, Movie Title: string ... 1 more field]


## Create Temporary Table <a class="anchor" id="create_temp_table"></a>

In [5]:
movies.createOrReplaceTempView("movies") //store movies table into spark session
movies_ratings.createOrReplaceTempView("movie_ratings") //store movie_ratings table into spark session

In [6]:
spark.catalog.listTables.show //check the available table in spark session

+-------------+--------+-----------+---------+-----------+
|         name|database|description|tableType|isTemporary|
+-------------+--------+-----------+---------+-----------+
|movie_ratings|    null|       null|TEMPORARY|       true|
|       movies|    null|       null|TEMPORARY|       true|
+-------------+--------+-----------+---------+-----------+



## Priview Schema <a class="anchor" id="preview_schema"></a>

In [7]:
movies.printSchema //show movies table schema 

root
 |-- Actor: string (nullable = true)
 |-- Movie Title: string (nullable = true)
 |-- Release Year: integer (nullable = true)



In [8]:
movies_ratings.printSchema //show movie_ratings table schema 

root
 |-- Rating: double (nullable = true)
 |-- Movie Title: string (nullable = true)
 |-- Release Year: integer (nullable = true)



## 1.	Compute the number of movies each actor was in. <a class="anchor" id="no_1"></a>
- The output should have two columns: actor, count. 
- The output should be ordered by the count in descending order

In [14]:
spark.sql("""
SELECT `Actor`, COUNT(`Actor`) as `Count`
FROM movies 
GROUP BY `Actor`
ORDER BY `Count` DESC
""").show()

22/10/30 23:16:06 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: McClure, Marc (I)
 Schema: Actor
Expected: Actor but found: McClure, Marc (I)
CSV file: file:///C:/Users/isalo/OneDrive/Documents/Tugas%20Spark/movies/movies.tsv
+-------------------+-----+
|              Actor|Count|
+-------------------+-----+
|   Tatasciore, Fred|   38|
|      Welker, Frank|   38|
| Jackson, Samuel L.|   32|
|      Harnell, Jess|   31|
|        Damon, Matt|   27|
|      Willis, Bruce|   27|
|  Cummings, Jim (I)|   26|
|         Hanks, Tom|   25|
|   Lynn, Sherry (I)|   25|
|    Bergen, Bob (I)|   25|
|    McGowan, Mickie|   25|
|      Proctor, Phil|   24|
|        Cruise, Tom|   23|
|         Pitt, Brad|   23|
|   Wilson, Owen (I)|   23|
|       Depp, Johnny|   22|
|Freeman, Morgan (I)|   22|
|     Morrison, Rana|   22|
|Williams, Robin (I)|   22|
|      Diaz, Cameron|   21|
+-------------------+-----+
only showing top 20 rows



### Code Breakdown
1. **SELECT `Actor`, COUNT(`Actor`) as `Count`** : Menseleksi kolom Actor dan menampilkan count untuk kolom actor
2. **FROM movies** : Mengambil data dari tabel movies
3. **GROUP BY `Actor`** : mengelompokkan berdasarkan nama Actor
4. **ORDER BY `Count` DESC** : Mengurutkan count secara descending (Besar -> Kecil)

## 2.	Compute the highest-rated movie per year and include all the actors played in that movie. <a class="anchor" id="no_2"></a>
- The output should have only one movie per year, and it should contain four columns: year, movie title, rating, a semicolon-separated list of actor names. 
- This question requires a join between movies.tsv and movie-ratings.tsv files. 
- There are two approaches to this problem. 
  1. The first is to figure out the highest-rated movies per year and then join with a list of actors. 
  2. The second one is to perform the join first and then figure out the highest-rated movies per year and a list of actors. 

  The result of each approach is different from the other one. Why do you think that is?

### 2.1. The first is to figure out the highest-rated movies per year and then join with a list of actors. 

In [10]:
spark.sql("""
WITH high_rating_movies_per_year AS ( 
SELECT `Release Year`, `Rating`, `Movie Title` 
FROM (SELECT *, ROW_NUMBER() OVER(PARTITION BY `Release Year` ORDER BY `Rating` DESC) AS `row_n` FROM movie_ratings) 
WHERE row_n = 1
), movie_actors AS(
SELECT `Movie Title`, CONCAT_WS(';', COLLECT_SET(`Actor`)) AS `Actor List` 
FROM movies
GROUP BY `Movie Title`
)

SELECT Movie.`Release Year`, Movie.`Rating`, Movie.`Movie Title`, Actor.`Actor List` 
FROM high_rating_movies_per_year AS `Movie`
INNER JOIN movie_actors AS `Actor` ON Movie.`Movie Title` = Actor.`Movie Title`
ORDER BY Movie.`Release Year`
""").show()

22/10/30 20:03:21 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: 1.6339, 'Crocodile' Dundee II, 1988
 Schema: Rating, Movie Title, Release Year
Expected: Rating but found: 1.6339
CSV file: file:///C:/Users/isalo/OneDrive/Documents/Tugas%20Spark/movies/movie-ratings.tsv
22/10/30 20:03:22 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: McClure, Marc (I), Freaky Friday
 Schema: Actor, Movie Title
Expected: Actor but found: McClure, Marc (I)
CSV file: file:///C:/Users/isalo/OneDrive/Documents/Tugas%20Spark/movies/movies.tsv
+------------+-------+--------------------+--------------------+
|Release Year| Rating|         Movie Title|          Actor List|
+------------+-------+--------------------+--------------------+
|        1940| 7.8557|           Pinocchio|Dori, Sandro;Idle...|
|        1953| 5.4756|           Peter Pan|Palmer, Geoffrey ...|
|        1961| 0.6726|One Hundred and O...|Wickes, Mary;Wrig...|
|        1979|13.5028|       

#### Code Breakdown
1. **WITH high_rating_movies_per_year AS**: membuat tabel high_rating_movies_per_year
    1. **SELECT `Release Year`, `Rating`, `Movie Title`**: Menseleksi kolom `Release Year`, `Rating`, `Movie Title`
    2. **FROM (SELECT \*, ROW_NUMBER() OVER(PARTITION BY `Release Year` ORDER BY `Rating` DESC) AS `row_n` FROM movie_ratings)**: 
       Menggunakan tabel yang berisi semua kolom yang diseleksi dan tambahan *row number* yang berdasarkan pada `Release Year` dan urutan terbesar dari `Rating`. data tabel diambil dari tabel movie_ratings
    3. **WHERE row_n = 1**: Mengambil setiap row number yang bernilai 1 
2. **movie_actors AS**: membuat tabel movie_actors
    1. **SELECT `Movie Title`, CONCAT_WS(';', COLLECT_SET(`Actor`)) AS `Actor List`**: Menseleksi `Movie Title` dan mengambil setiap actor untuk dijadikan list yang dipisahkan dengan tanda';' 
    2. **FROM movies**: Data diambil dari table movies
    3. **GROUP BY `Movie Title`**: Setiap list actor dikelompokkan berdasarkan `Movie Title`
3. **SELECT Movie.`Release Year`, Movie.`Rating`, Movie.`Movie Title`, Actor.`Actor List`**: Menseleksi kolom `Release Year`, `Rating`, `Movie Title` dari tabel Movie dan `Actor List` dari tabel Actor
4. **FROM high_rating_movies_per_year AS `Movie`**: Mendefinisikan tabel Movie adalah alias dari tabel high_rating_movies_per_year
5. **INNER JOIN movie_actors AS `Actor` ON Movie.`Movie Title` = Actor.`Movie Title`**: Mendefinisikan tabel Actor adalah alias dari tabel movie_actors, lalu digabungkan dengan tabel Actor berdasarkan irisan Movie Title
6. **ORDER BY Movie.`Release Year`**: Data diurutkan berdasarkan Release Year dari tabel Movie

### 2.2. The second one is to perform the join first and then figure out the highest-rated movies per year and a list of actors. <a class="anchor" id="no_2_2"></a>

In [11]:
spark.sql("""
WITH movie_rate AS (
SELECT table1.`Release Year`, table1.`Rating`, table1.`Movie Title`, table2.`Actor` 
FROM movie_ratings AS `table1`
INNER JOIN movies AS `table2` ON table1.`Movie Title` = table2.`Movie Title`
), moviesRating AS (
SELECT `Release Year`, MAX(`Rating`) AS `Rating`, `Movie Title`, CONCAT_WS(';', COLLECT_SET(`Actor`)) AS `Actor List` 
FROM movie_rate
GROUP BY `Movie Title`, `Release Year`
ORDER BY `Release Year`
)

SELECT `Release Year`, `Rating`, `Movie Title`, `Actor List`
FROM (SELECT *, ROW_NUMBER() OVER(PARTITION BY `Release Year` ORDER BY `Rating`) AS `row_n` FROM moviesRating)
WHERE row_n = 1
""").show()

22/10/30 20:03:26 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: 1.6339, 'Crocodile' Dundee II, 1988
 Schema: Rating, Movie Title, Release Year
Expected: Rating but found: 1.6339
CSV file: file:///C:/Users/isalo/OneDrive/Documents/Tugas%20Spark/movies/movie-ratings.tsv
22/10/30 20:03:26 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: McClure, Marc (I), Freaky Friday
 Schema: Actor, Movie Title
Expected: Actor but found: McClure, Marc (I)
CSV file: file:///C:/Users/isalo/OneDrive/Documents/Tugas%20Spark/movies/movies.tsv
+------------+------+--------------------+--------------------+
|Release Year|Rating|         Movie Title|          Actor List|
+------------+------+--------------------+--------------------+
|        1940|7.8557|           Pinocchio|Dori, Sandro;Idle...|
|        1953|5.4756|           Peter Pan|Palmer, Geoffrey ...|
|        1961|0.6726|One Hundred and O...|Wickes, Mary;Wrig...|
|        1967|1.3485|     The Jungl

#### Code Breakdown 
1. **WITH movie_rate AS**: membuat tabel movie_rate
    1. **SELECT table1.`Release Year`, table1.`Rating`, table1.`Movie Title`, table2.`Actor`**: Menseleksi `Release Year`, `Rating` dari table1 dan menseleksi `Movie Title` dan `Actor` dari table2
    2. **FROM movie_ratings AS `table1`**: Mendefiniskan table1 merupakan alias dari table movie_ratings
    3. **INNER JOIN movies AS `table2` ON table1.`Movie Title` = table2.`Movie Title`**: Mendefiniskan table2 merupakan alias dari table movies, lalu menggabungkan table1 dan table2 dengan menggunakan irisan dari `Movie Title`
2. **moviesRating AS**: membuat tabel moviesRating 
    1. **SELECT `Release Year`, MAX(`Rating`) AS `Rating`, `Movie Title`, CONCAT_WS(';', COLLECT_SET(`Actor`)) AS `Actor List`**: Menseleksi `Release Year`, `Rating` yang bernilai maximum, `Movie Title`, dan list actor yang dipisahkan dengan ';'
    2. **FROM movie_rate**: Mengambil data dari table movie_rate
    3. **GROUP BY `Movie Title`, `Release Year`**: Data dikelompokkan berdasarkan `Movie Title`
    4. **ORDER BY `Release Year`**: Mengurutkan data berdasarkan `Release Year`
3. **SELECT `Release Year`, `Rating`, `Movie Title`, `Actor List`**: Menseleksi `Release Year`, `Rating`, `Movie Title`, dan `Actor List`
4. **FROM (SELECT *, ROW_NUMBER() OVER(PARTITION BY `Release Year` ORDER BY `Rating`) AS `row_n` FROM moviesRating)**: mengambil data dari tabel dengan isi kolom yang sudah diseleksi dan _row number_ yang berdasarkan pada `Release Year` dan urutan terbesar dari `Rating`. data tabel diambil dari tabel moviesRating
5. **WHERE row_n = 1**: Menampilkan data yang memiliki _row number_ = 1

### Conclusion
The result of each approach is different from the other one. Why do you think that is?

Data yang ditampilkan dari query pendekatan 1, lebih sedikit dari pendekatan 2. Hal ini dikarenakan pendekatan 1 kita melakukan filtering terlebih dahulu untuk movie rating, lalu kita menggabungkannya berdasarkan irisan dengan actor list, sehingga hasil irisannya lebih sedikit dibandingkan dengan pendekatan 2 yang hasil irisan datanya masih belum ada filter sama sekali.

## 3. Determine which pair of actors worked together most. Working together is defined as appearing in the same movie. <a class="anchor" id="no_3"></a>
- The output should have three columns: actor1, actor2, and count. 
- The output should be sorted by the count in descending order. 

The solution to this question requires doing self-join

In [12]:
spark.sql("""
WITH actor_pair AS (
SELECT actor1.`Actor` AS `Actor 1`, actor2.`Actor` AS `Actor 2`
FROM movies AS actor1 CROSS JOIN movies AS actor2
WHERE actor1.`Actor` <> actor2.`Actor` AND actor1.`Movie Title` = actor2.`Movie Title`
)

SELECT `Actor 1`,`Actor 2`, COUNT(*) AS `Count` 
FROM actor_pair
GROUP BY `Actor 1`,`Actor 2`
ORDER BY `Count` DESC
""").show()

22/10/30 20:03:31 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: McClure, Marc (I), Freaky Friday
 Schema: Actor, Movie Title
Expected: Actor but found: McClure, Marc (I)
CSV file: file:///C:/Users/isalo/OneDrive/Documents/Tugas%20Spark/movies/movies.tsv
22/10/30 20:03:31 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: McClure, Marc (I), Freaky Friday
 Schema: Actor, Movie Title
Expected: Actor but found: McClure, Marc (I)
CSV file: file:///C:/Users/isalo/OneDrive/Documents/Tugas%20Spark/movies/movies.tsv
+------------------+-----------------+-----+
|           Actor 1|          Actor 2|Count|
+------------------+-----------------+-----+
|   McGowan, Mickie| Lynn, Sherry (I)|   23|
|  Lynn, Sherry (I)|  McGowan, Mickie|   23|
|   McGowan, Mickie|  Bergen, Bob (I)|   19|
|   Bergen, Bob (I)|  McGowan, Mickie|   19|
|  Lynn, Sherry (I)|  Bergen, Bob (I)|   19|
|   Bergen, Bob (I)| Lynn, Sherry (I)|   19|
|   Angel, Jack (I)| Lynn, Sh

#### Code Breakdown
 1. **WITH actor_pair AS**: Membuat table actor_pair
 2. **SELECT actor1.`Actor` AS `Actor 1`, actor2.`Actor` AS `Actor 2`** : Menseleksi Actor dari actor1 dan Actor dari actor 2
 3. **FROM movies AS actor1 CROSS JOIN movies AS actor2** : Mengambil dan menggabungkan data actor1 dan actor 2 dari tabel movies 
 4. **WHERE actor1.`Actor` <> actor2.`Actor` AND actor1.`Movie Title` = actor2.`Movie Title`** : Membuat kondisi dimana Actor tidak disandingkan dengan dirinya sendiri dan Actor muncul pada Movie Title yang sama 
 5. **SELECT `Actor 1`, `Actor 2`, COUNT(*) AS `Count`** : Menampilkan `Actor 1`,`Actor 2`, dan `Count` dari gabungan `Actor 1` dan `Actor 2`
 6. **FROM actor_pair** : Mengambil data dari tabel actor_pairs
 7. **GROUP BY `Actor 1`,`Actor 2`** : Mengelompokkan berdasarkan `Actor 1` dan `Actor 2` 
 8. **ORDER BY `Count` DESC** : Menampilkan data berdasarkan nilai Count dari yang terbesar  