## Data Programming - Spark: 인기 영화 찾기

### 실습 목표

MovieLens 데이터셋을 사용하여 인기 있는 영화를 찾고, Repartition Join과 Broadcast Join의 성능을 비교합니다.

**참고:**

* `../data/ml-100k/u.data` 와 `../data/ml-100k/u.item` 파일 경로를 실제 파일 경로로 변경해야 합니다.



### Colab 환경 설정

In [None]:
#!sudo apt-get install -y openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.2.4/spark-3.2.4-bin-hadoop3.2.tgz
!tar xf spark-3.2.4-bin-hadoop3.2.tgz
!pip install -q findspark

In [None]:
# findspark를 사용해 Spark 초기화
import findspark
findspark.init("/content/spark-3.2.4-bin-hadoop3.2")

### 1. Repartition Join

#### 개념 설명

Repartition Join은 데이터를 키로 분할하여 조인하는 방법입니다.  `join()` 함수는 두 RDD를 영화 ID를 기준으로 결합합니다.

#### Spark 설정 및 컨텍스트 생성

### TODO: 참고 주석과 pandas에 대한 지식을 기반으로 빈칸을 채워주세요.

In [33]:
from pyspark.sql import SparkSession

# SparkSession 생성
# SparkSession은 DataFrame과 SQL 작업의 메인 엔트리 포인트입니다. 'PopularMovies_Repartition'라는 이름으로 세션을 생성합니다.
# 참고: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.SparkSession.html
spark = SparkSession.builder.master("local").appName("PopularMovies_Repartition").getOrCreate()

# SparkContext에 접근이 필요한 경우, SparkSession에서 sparkContext 속성을 통해 접근할 수 있습니다.
sc = spark.sparkContext

#### 데이터 불러오기 및 전처리

1. **u.data 파일**: 영화 평가 수 집계

In [None]:
from pyspark.sql.functions import col

# u.data 파일을 읽고 스키마 정의
# read.option(): 파일 읽기 시 옵션을 설정합니다.
# schema: DataFrame의 스키마(데이터 타입 및 열 이름)를 지정하여 데이터의 구조를 정의합니다.
# delimiter 옵션을 사용해 탭("\t")을 구분자로 설정합니다.
# 참고: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.csv.html
data = spark.read.option("delimiter", "\t").csv("./ml-100k/u.data", schema="UserID INT, MovieID INT, Rating INT, Timestamp LONG")

# 영화 ID별 평가 수 집계
# groupBy("column"): 지정한 열을 기준으로 데이터를 그룹화합니다.
# count(): 각 그룹에 속하는 데이터 개수를 계산하여 새 열(count)을 생성합니다.
# 참고: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.groupBy.html
# 참고: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.GroupedData.count.html
movieCounts = data.__________("MovieID").count()

# 결과 출력
# show(n): DataFrame의 상위 n개 행을 출력합니다. 기본값은 20개이며, n 값을 조정할 수 있습니다.
# 참고: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.show.html
movieCounts.show()

+-------+-----+
|MovieID|count|
+-------+-----+
|    496|  231|
|    471|  221|
|    463|   71|
|    148|  128|
|   1342|    2|
|    833|   49|
|   1088|   13|
|   1591|    6|
|   1238|    8|
|   1580|    1|
|   1645|    1|
|    392|   68|
|    623|   39|
|    540|   43|
|    858|    3|
|    737|   59|
|    243|  132|
|   1025|   44|
|   1084|   21|
|   1127|   11|
+-------+-----+
only showing top 20 rows



2. **u.item 파일**: 영화 ID와 이름 매칭

In [None]:
# u.item 파일을 읽고 스키마 정의
# read.option(): 파일을 읽을 때 특정 옵션을 설정합니다. 여기서는 "delimiter" 옵션을 사용하여 필드 구분자로 "|"을 지정합니다.
# schema: 스키마를 지정하여 각 열의 데이터 타입을 명시합니다. MovieID는 정수형, Title은 문자열로 설정합니다.
# 참고: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.csv.html
item_df = spark.read.option("delimiter", "|").csv("./ml-100k/u.item", schema="MovieID INT, Title STRING")

# 필요한 열만 선택하여 (MovieID, Title) 형식의 DataFrame 생성
# select("column1", "column2", ...): DataFrame에서 필요한 열만 선택하여 새로운 DataFrame을 생성합니다.
# 참고: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.select.html
id_name_df = item_df.__________("__________", "__________")

# 결과 출력
id_name_df.show()

+-------+--------------------+
|MovieID|               Title|
+-------+--------------------+
|      1|    Toy Story (1995)|
|      2|    GoldenEye (1995)|
|      3|   Four Rooms (1995)|
|      4|   Get Shorty (1995)|
|      5|      Copycat (1995)|
|      6|Shanghai Triad (Y...|
|      7|Twelve Monkeys (1...|
|      8|         Babe (1995)|
|      9|Dead Man Walking ...|
|     10|  Richard III (1995)|
|     11|Seven (Se7en) (1995)|
|     12|Usual Suspects, T...|
|     13|Mighty Aphrodite ...|
|     14|  Postino, Il (1994)|
|     15|Mr. Holland's Opu...|
|     16|French Twist (Gaz...|
|     17|From Dusk Till Da...|
|     18|White Balloon, Th...|
|     19|Antonia's Line (1...|
|     20|Angels and Insect...|
+-------+--------------------+
only showing top 20 rows



3. **조인 수행**

In [None]:
# 두 DataFrame을 MovieID 기준으로 조인
# join(other, on): 두 DataFrame을 조인할 때 사용됩니다. `on` 파라미터로 조인할 열을 지정하며, 같은 MovieID 값을 기준으로 조인합니다.
# 결과로 영화 제목(Title)과 평가 수(count) 열을 포함하는 DataFrame을 생성합니다.
# 참고: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.join.html
results = movieCounts.__________(id_name_df, on="__________").select("Title", "count")

#### 결과 확인

In [27]:
# 결과 출력
for result in results.collect()[:20]:
    print(result)

Row(Title="It's a Wonderful Life (1946)", count=231)
Row(Title='Courage Under Fire (1996)', count=221)
Row(Title='Secret of Roan Inish, The (1994)', count=71)
Row(Title='Ghost and the Darkness, The (1996)', count=128)
Row(Title='Convent, The (Convento, O) (1995)', count=2)
Row(Title='Bulletproof (1996)', count=49)
Row(Title='Double Team (1997)', count=13)
Row(Title='Duoluo tianshi (1995)', count=6)
Row(Title='Full Speed (1996)', count=8)
Row(Title='Liebelei (1933)', count=1)
Row(Title='Butcher Boy, The (1998)', count=1)
Row(Title='Man Without a Face, The (1993)', count=68)
Row(Title='Angels in the Outfield (1994)', count=39)
Row(Title='Money Train (1995)', count=43)
Row(Title='Amityville: Dollhouse (1996)', count=3)
Row(Title='Sirens (1994)', count=59)
Row(Title='Jungle2Jungle (1997)', count=132)
Row(Title='Fire Down Below (1997)', count=44)
Row(Title='Anne Frank Remembered (1995)', count=21)
Row(Title='Truman Show, The (1998)', count=11)


#### Repartition Join 요약

* 데이터를 재분배해야 하므로 대규모 데이터셋에서는 성능 비용이 발생할 수 있습니다.
* 데이터가 매우 큰 경우 비효율적일 수 있지만, 모든 키가 적절히 분산되어 있으면 사용할 수 있습니다.


### 2. Broadcast Join

#### 개념 설명

Broadcast Join은 작은 데이터를 클러스터의 모든 노드에 복사(브로드캐스트)하여 효율적인 조인을 수행하는 방법입니다.

#### Spark 설정 및 컨텍스트 생성

### TODO: 참고 주석과 pandas에 대한 지식을 기반으로 빈칸을 채워주세요.

#### 영화 이름 로드 함수

In [36]:
def loadMovieNames():
    movieNames = {}
    with open("./ml-100k/u.item") as f:
        for line in f:
            fields = line.split('|')
            movieNames[int(fields[0])] = fields[1]
    return movieNames

# 영화 이름 딕셔너리 생성 및 브로드캐스트
# spark.sparkContext.broadcast(value): Spark 애플리케이션 전체에 걸쳐 공유할 수 있는 변수를 생성합니다.
# `broadcast` 변수는 각 워커 노드에 저장되어 네트워크 오버헤드를 줄입니다. 여기서는 loadMovieNames() 함수에서 생성된 영화 이름 딕셔너리를 브로드캐스트 변수로 만듭니다.
# 참고: https://spark.apache.org/docs/latest/api/python/reference/pyspark.html#pyspark.SparkContext.broadcast
nameDict = spark.sparkContext.broadcast(loadMovieNames())

#### 데이터 불러오기 및 전처리

1. **u.data 파일**: 영화 평가 수 집계

In [None]:
# u.data 파일을 DataFrame으로 로드하고 스키마 정의
# read.option("key", "value"): 파일을 읽을 때 특정 옵션을 설정합니다. 여기서는 필드 구분자를 탭("\t")으로 설정합니다.
# csv(path, schema): 지정된 경로의 CSV 파일을 읽고, 스키마를 정의하여 각 열의 데이터 타입을 명시합니다.
# 참고: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.csv.html
data_df = spark.read.option("delimiter", "\t").csv("./ml-100k/u.data", schema="UserID INT, MovieID INT, Rating INT, Timestamp LONG")

# 영화 ID별 평가 수를 집계
# groupBy("column"): 지정한 열을 기준으로 데이터를 그룹화합니다. 여기서는 MovieID 열로 그룹화합니다.
# count(): 그룹별로 속한 데이터 개수를 세어 count 열을 생성합니다.
# 참고: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.groupBy.html
# 참고: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.GroupedData.count.html
movieCounts = data_df.__________("MovieID").count()

2. **영화 이름과 조인**

In [None]:
# 영화 이름을 브로드캐스트 변수에서 가져오는 UDF 정의
# udf(f, returnType): 사용자 정의 함수를 Spark UDF로 등록하여 DataFrame 열에 적용할 수 있게 합니다.
# 여기서는 영화 ID를 받아서 해당 영화 이름을 반환하는 함수 getMovieName을 UDF로 등록합니다.
# 참고: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.udf.html
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# 영화 ID로 영화 이름을 검색하는 함수 정의 및 UDF 생성
# nameDict.value.get(movie_id, "Unknown"): Broadcast 변수를 사용하여 주어진 영화 ID에 해당하는 영화 이름을 가져옵니다.
# 만약 해당 영화 ID가 존재하지 않으면 기본값 "Unknown"을 반환합니다.
# 참고: https://spark.apache.org/docs/latest/api/python/reference/pyspark.html#pyspark.Broadcast.value
def getMovieName(movie_id):
    return nameDict.value.get(movie_id, "Unknown")

# StringType: UDF 반환 타입을 문자열로 지정합니다.
# 참고: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.types.StringType.html
getMovieNameUDF = __________(getMovieName, StringType())


In [None]:
# 영화 ID별 평가 수에 영화 이름 추가
# withColumn(colName, col): 새로운 열을 추가하거나 기존 열을 업데이트합니다. 여기서는 "MovieName"이라는 새 열을 추가하고, 영화 ID를 기반으로 영화 이름을 가져오는 UDF를 적용합니다.
# 참고: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.withColumn.html
results = movieCounts.__________("MovieName", getMovieNameUDF(col("__________"))).select("MovieName", "count")

# select("column1", "column2"): 필요한 열만 선택하여 최종 결과를 생성합니다. 여기서는 "MovieName"과 "count" 열을 선택합니다.
# 참고: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.select.html


#### 결과 확인

In [38]:
# 결과 출력
for result in results.collect()[:20]:
    print(result)

Row(MovieName="It's a Wonderful Life (1946)", count=231)
Row(MovieName='Courage Under Fire (1996)', count=221)
Row(MovieName='Secret of Roan Inish, The (1994)', count=71)
Row(MovieName='Ghost and the Darkness, The (1996)', count=128)
Row(MovieName='Convent, The (Convento, O) (1995)', count=2)
Row(MovieName='Bulletproof (1996)', count=49)
Row(MovieName='Double Team (1997)', count=13)
Row(MovieName='Duoluo tianshi (1995)', count=6)
Row(MovieName='Full Speed (1996)', count=8)
Row(MovieName='Liebelei (1933)', count=1)
Row(MovieName='Butcher Boy, The (1998)', count=1)
Row(MovieName='Man Without a Face, The (1993)', count=68)
Row(MovieName='Angels in the Outfield (1994)', count=39)
Row(MovieName='Money Train (1995)', count=43)
Row(MovieName='Amityville: Dollhouse (1996)', count=3)
Row(MovieName='Sirens (1994)', count=59)
Row(MovieName='Jungle2Jungle (1997)', count=132)
Row(MovieName='Fire Down Below (1997)', count=44)
Row(MovieName='Anne Frank Remembered (1995)', count=21)
Row(MovieName='Tru

#### Broadcast Join 요약

* 작은 데이터셋을 모든 노드에 복사하여 더 효율적인 조인을 제공합니다.
* 대규모 데이터를 조인할 때 성능을 크게 향상시킬 수 있습니다.


### 결론 및 비교

* **Repartition Join**: 데이터가 클수록 성능이 저하될 수 있으며, 재분배 비용이 큽니다. 그러나 모든 데이터를 고르게 분산시키는 데 적합합니다.
* **Broadcast Join**: 작은 데이터셋을 브로드캐스트하면 성능이 크게 향상됩니다. 대규모 데이터를 조인할 때 효율적입니다.
