In [1]:
import os
os.environ['PYSPARK_PYTHON'] = "C:/Users/user/anaconda3/envs/torchenv/python.exe"
os.environ['PYSPARK_DRIVER_PYTHON'] = "C:/Users/user/anaconda3/envs/torchenv/python.exe"

In [19]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructField, StructType, IntegerType, LongType
import codecs # 브로드캐스트에 필요한 u.item 파일을 로드하는데 사용할 것 

### 파일 로드 함수 정의

In [20]:
def load_movie_names():
  movienames = {}
  
  with codecs.open("C:/nvidia_course/nvidia-course/week1/day5/AdvancedSpark/ml-100k/u.item", 'r',
                   encoding = 'ISO-8859-1', errors = 'ignore') as f:
    for line in f:
      fields = line.split("|") # 해당 데이터는 |로 구분되어있으므로 구분자로 분리
      
      # movienames라는 딕셔너리의 키를 영화 ID, value 값을 영화 제목으로 저장 
      movienames[int(fields[0])] = fields[1]
      
  return movienames

In [21]:
spark = SparkSession.builder\
                    .master('local')\
                    .appName('popular-movie-dataframe')\
                    .getOrCreate()
spark

### `spark.sparkContext`로 RDD API 활용

- `broadcast`
  - 클러스터의 모든 노드에 공통 데이터를 효율적으로 공유할 수 있게 해주는 변수, 전체 영화 목록을 중복으로 전달받지 않고 공유된 참조만 사용  
  - 한 번만 클러스터로 전송해서, 모든 워커가 그 값을 메모리에 캐시하고 공유하도록 함
  - **broadcast** 객체를 반환하므로 내부 딕셔너리에 접근하려면 `.value`로 접근해야됨 

In [22]:
name_dict = spark.sparkContext.broadcast(load_movie_names())
name_dict

<pyspark.broadcast.Broadcast at 0x1f1affc8c10>

In [23]:
schema = StructType([
  StructField('userID', IntegerType(), True),
  StructField('movieID', IntegerType(), True),
  StructField('rating', IntegerType(), True),
  StructField('timestamp', LongType(), True)
])

In [24]:
df = spark.read.schema(schema = schema).option('sep', '\t').csv("file:///nvidia_course/nvidia-course/week1/day5/AdvancedSpark/ml-100k/u.data")
df.show(10)

+------+-------+------+---------+
|userID|movieID|rating|timestamp|
+------+-------+------+---------+
|   196|    242|     3|881250949|
|   186|    302|     3|891717742|
|    22|    377|     1|878887116|
|   244|     51|     2|880606923|
|   166|    346|     1|886397596|
|   298|    474|     4|884182806|
|   115|    265|     2|881171488|
|   253|    465|     5|891628467|
|   305|    451|     3|886324817|
|     6|     86|     3|883603013|
+------+-------+------+---------+
only showing top 10 rows



### 영화 ID를 기준으로 빈도수 추출

In [25]:
movie_cnt = df.groupby('movieID').count()
movie_cnt.show(10)

+-------+-----+
|movieID|count|
+-------+-----+
|    496|  231|
|    471|  221|
|    463|   71|
|    148|  128|
|   1342|    2|
|    833|   49|
|   1088|   13|
|   1591|    6|
|   1238|    8|
|   1580|    1|
+-------+-----+
only showing top 10 rows



### 영화 이름 조회 사용자 정의 함수
- Spark에서는 사용자 정의 함수를 **UDF(User Defined Function)** 로 등록해야함
- Spark SQL에서 쓸 수 있도록 만든 것이므로, `.withColumn()`과 같은 연산 사용 가능

In [26]:
def look_up_name(movie_id):
  return name_dict.value[movie_id] # broadcast 객체에서 내부 딕셔너리의 제목 조회

### UDF를 사용해서 movieID가 포함된 새로운 DataFrame 생성

In [28]:
look_up_name_udf = F.udf(look_up_name) # UDF로 등록

movies_with_names = movie_cnt.withColumn('movieTitle', look_up_name_udf(F.col('movieID')))

In [29]:
sorted_movie_with_names = movies_with_names.orderBy(F.desc("count"))
sorted_movie_with_names.show(10)

+-------+-----+--------------------+
|movieID|count|          movieTitle|
+-------+-----+--------------------+
|     50|  583|    Star Wars (1977)|
|    258|  509|      Contact (1997)|
|    100|  508|        Fargo (1996)|
|    181|  507|Return of the Jed...|
|    294|  485|    Liar Liar (1997)|
|    286|  481|English Patient, ...|
|    288|  478|       Scream (1996)|
|      1|  452|    Toy Story (1995)|
|    300|  431|Air Force One (1997)|
|    121|  429|Independence Day ...|
+-------+-----+--------------------+
only showing top 10 rows



In [30]:
spark.stop()