## Pandas API on Spark 사용해보기

- 유저 별로 많이 사용한 태그 상위 3개를 구하기
- 유저 별로 상위 3개 태그가 포함된 영화들과 포함되지 않은 영화들의 평균평점을 각각 구하기
- 만약, 태그가 포함된 영화들의 평균평점이 포함되지 않은 영화들의 평균평점보다 높다면

  =>해당 유저는 상위 태그에 대한 선호가 있고, 해당 태그를 가지는 영화를 찾아보려는 경향이 있음

In [1]:
# 스파크 드라이버와 워커의 파이썬 버전이 다를 경우, 
# PYSPARK_PYTHON 와 PYSPARK_DRIVER_PYTHON 를 환경변수 지정 
# 예시 : .zshrc 파일에 아래 코드 입력
#           export PYSPARK_PYTHON=$HOME/.pyenv/versions/3.12.1/bin/python3.12
#           export PYSPARK_DRIVER_PYTHON=$HOME/.pyenv/versions/3.12.1/bin/python3.12
# 위의 설정을 해도 오류가 날 경우, 스크립트에 아래 코드 입력
import os
import sys
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

In [2]:
# 스파크 세션 생성
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Spark study - 231209") \
    .getOrCreate()

131072x1 화면 크기가 잘못됐습니다. 문제가 예상됩니다
23/12/11 21:00:38 WARN Utils: Your hostname, KJH-DESKTOP resolves to a loopback address: 127.0.1.1; using 192.168.69.220 instead (on interface eth0)
23/12/11 21:00:38 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/12/11 21:00:40 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# 데이터프레임 생성
df_genome_scores = spark.read.csv("file:///home/kjh/data/ml-latest/genome-scores.csv", header=True, inferSchema=True)
df_genome_tags = spark.read.csv("file:///home/kjh/data/ml-latest/genome-tags.csv", header=True, inferSchema=True)
df_movies = spark.read.csv("file:///home/kjh/data/ml-latest/movies.csv", header=True, inferSchema=True)
df_links = spark.read.csv("file:///home/kjh/data/ml-latest/links.csv", header=True, inferSchema=True)
df_ratings = spark.read.csv("file:///home/kjh/data/ml-latest/ratings.csv", header=True, inferSchema=True)
df_tags = spark.read.csv("file:///home/kjh/data/ml-latest/tags.csv", header=True, inferSchema=True)

                                                                                

In [4]:
# 필요한 데이터프레임 확인
df_ratings.show(4)
df_tags.show(4)

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|      1|   4.0|1225734739|
|     1|    110|   4.0|1225865086|
|     1|    158|   4.0|1225733503|
|     1|    260|   4.5|1225735204|
+------+-------+------+----------+
only showing top 4 rows

+------+-------+-------------+----------+
|userId|movieId|          tag| timestamp|
+------+-------+-------------+----------+
|    10|    260| good vs evil|1430666558|
|    10|    260|Harrison Ford|1430666505|
|    10|    260|       sci-fi|1430666538|
|    14|   1221|    Al Pacino|1311600756|
+------+-------+-------------+----------+
only showing top 4 rows



In [5]:
# Pandas API on Spark 사용
import pandas as pd
import numpy as np
import pyspark.pandas as ps



In [6]:
# 스파크 데이터프레임을 판다스 on Spark 데이터프레임으로 변환
psdf_ratings = df_ratings.pandas_api()
psdf_tags = df_tags.pandas_api()

In [7]:
# 위의 df_ratings 와 비교 시 출력모양 다름. 출력메소드 차이(show()와 head())
psdf_ratings.head()

                                                                                

Unnamed: 0,userId,movieId,rating,timestamp
0,1,1,4.0,1225734739
1,1,110,4.0,1225865086
2,1,158,4.0,1225733503
3,1,260,4.5,1225735204
4,1,356,5.0,1225735119


In [10]:
# to_datetime 메소드 : timestapmp 를 datetime 형식으로 변환
# errors='coerce' : 변환 과정에서 오류가 발생하면 그 행은 무시하고 진행. 주로 변환할 수 없는 값이 나왔을 시 오류발생. 그 값은 NaT로 대체
# unit='s' : timestamp 의 단위 지정 's' 는 초 단위
psdf_ratings['rating_datetime'] = ps.to_datetime(psdf_ratings['timestamp'], errors='coerce', unit='s')
psdf_tags['tag_datetime'] = ps.to_datetime(psdf_tags['timestamp'], errors='coerce', unit='s')

In [11]:
psdf_ratings.head()

                                                                                

Unnamed: 0,userId,movieId,rating,timestamp,rating_datetime
0,1,1,4.0,1225734739,2008-11-03 17:52:19
1,1,110,4.0,1225865086,2008-11-05 06:04:46
2,1,158,4.0,1225733503,2008-11-03 17:31:43
3,1,260,4.5,1225735204,2008-11-03 18:00:04
4,1,356,5.0,1225735119,2008-11-03 17:58:39


In [13]:
# drop() : timestamp 컬럼 삭제
# axis : 삭제 기준 설정. 0은 행 기준, 1은 컬럼 기준. 기본값은 0
psdf_ratings = psdf_ratings.drop('timestamp', axis=1)
psdf_tags = psdf_tags.drop('timestamp', axis=1)

In [14]:
psdf_tags.head()

                                                                                

Unnamed: 0,userId,movieId,tag,tag_datetime
0,10,260,good vs evil,2015-05-03 15:21:36
1,10,260,Harrison Ford,2015-05-03 15:21:36
2,10,260,sci-fi,2015-05-03 15:21:36
3,14,1221,Al Pacino,2011-07-25 13:32:48
4,14,1221,mafia,2011-07-25 13:32:48


In [15]:
# merge() : 데이터프레임 합치기
psdf_user = ps.merge(psdf_ratings, psdf_tags, on=['userId', 'movieId'], how='inner')

In [16]:
psdf_user.head(10)

23/12/11 21:01:38 WARN AttachDistributedSequenceExec: clean up cached RDD(116) in AttachDistributedSequenceExec(356)
23/12/11 21:01:39 WARN AttachDistributedSequenceExec: clean up cached RDD(126) in AttachDistributedSequenceExec(361)
                                                                                

Unnamed: 0,userId,movieId,rating,rating_datetime,tag,tag_datetime
0,26,296,4.5,2015-04-18 22:59:59,crime,2015-04-18 23:15:12
1,26,296,4.5,2015-04-18 22:59:59,cult film,2015-04-18 23:15:12
2,26,296,4.5,2015-04-18 22:59:59,quentin tarantino,2015-04-18 23:15:12
3,37,1036,5.0,2020-01-04 18:53:43,action,2020-01-04 18:52:48
4,37,1036,5.0,2020-01-04 18:53:43,Alan Rickman,2020-01-04 18:54:56
5,37,1036,5.0,2020-01-04 18:53:43,classic,2020-01-04 18:54:56
6,37,1036,5.0,2020-01-04 18:53:43,tense,2020-01-04 18:54:56
7,37,69844,4.5,2020-01-04 20:03:48,Alan Rickman,2020-01-04 20:03:12
8,37,69844,4.5,2020-01-04 20:03:48,fantasy,2020-01-04 20:03:12
9,37,92259,5.0,2020-01-04 19:24:38,friendship,2020-01-04 19:24:48


In [18]:
# isna().sum() : 결측치 확인
psdf_user.isna().sum()

23/12/11 21:02:30 WARN AttachDistributedSequenceExec: clean up cached RDD(242) in AttachDistributedSequenceExec(1480)
23/12/11 21:02:31 WARN AttachDistributedSequenceExec: clean up cached RDD(252) in AttachDistributedSequenceExec(1485)
                                                                                

userId             0
movieId            0
rating             0
rating_datetime    0
tag                0
tag_datetime       3
dtype: int64

In [19]:
# dropna() : 결측치 제거
psdf_user_dropna = psdf_user.dropna()

In [21]:
# 각 유저 별 영화 갯수 추출
user_movie_counts = psdf_user_dropna.groupby('userId')['movieId'].nunique()

In [22]:
# 영화 10개 이상 본 유저만 필터
# Pandas API on Spark 는 기존 판다스와 다르게 index가 호환이 안되기 때문에 numpy로 바꿈
user_movie_counts_10more = user_movie_counts[user_movie_counts >= 10].index.to_numpy()

23/12/11 21:03:00 WARN AttachDistributedSequenceExec: clean up cached RDD(310) in AttachDistributedSequenceExec(1978)
23/12/11 21:03:01 WARN AttachDistributedSequenceExec: clean up cached RDD(320) in AttachDistributedSequenceExec(1983)
                                                                                

In [24]:
# 10개 이상 본 유저로 데이터프레임 필터링
psdf_10more = psdf_user_dropna[psdf_user_dropna['userId'].isin(user_movie_counts_10more)]

In [26]:
# sort_values() : userId 로 정렬
psdf_orderbyuserId = psdf_10more.sort_values(by='userId')

In [29]:
# 각 유저 별 빈도가 높은 상위 3개 tag 추출
# userId와 tag를 기준으로 그룹화(groupby()) 하여 각 그룹의 크기를 계산(size()) 하고, 그것을 count 열로 저장(reset_index())
tag_counts = psdf_orderbyuserId.groupby(['userId', 'tag']).size().reset_index(name='count')
# userId 로 그룹화하여 count가 가장 높은 3개를 추출(nlargest())
top_tags = tag_counts.groupby('userId').apply(lambda x: x.nlargest(3, 'count')).reset_index(drop=True)

23/12/11 21:03:48 WARN AttachDistributedSequenceExec: clean up cached RDD(442) in AttachDistributedSequenceExec(3044)
23/12/11 21:03:48 WARN AttachDistributedSequenceExec: clean up cached RDD(432) in AttachDistributedSequenceExec(3039)
                                                                                

In [30]:
# Pandas API on Spark 데이터프레임의 type
type(top_tags)

pyspark.pandas.frame.DataFrame

In [31]:
# 상위 태그 중 의미없는 카운트 1은 제외
top_tags_drop1 = top_tags.query('count != 1')

In [34]:
# 사용자별 태그 목록 생성
# 딕셔너리로 바꾸는 이유는 연산속도를 빠르게 하기 위해
top_tags_per_user = top_tags_drop1.groupby('userId')['tag'].apply(list).to_dict()

23/12/11 22:04:37 WARN AttachDistributedSequenceExec: clean up cached RDD(588) in AttachDistributedSequenceExec(4353)
23/12/11 22:04:37 WARN AttachDistributedSequenceExec: clean up cached RDD(578) in AttachDistributedSequenceExec(4348)
23/12/11 22:04:38 WARN AttachDistributedSequenceExec: clean up cached RDD(605) in AttachDistributedSequenceExec(4729)
23/12/11 22:04:59 WARN AttachDistributedSequenceExec: clean up cached RDD(636) in AttachDistributedSequenceExec(4871)
23/12/11 22:04:59 WARN AttachDistributedSequenceExec: clean up cached RDD(626) in AttachDistributedSequenceExec(4866)
23/12/11 22:05:00 WARN AttachDistributedSequenceExec: clean up cached RDD(653) in AttachDistributedSequenceExec(5308)
23/12/11 22:05:07 WARN AttachDistributedSequenceExec: clean up cached RDD(662) in AttachDistributedSequenceExec(5394)
                                                                                

In [37]:
# top_tags_per_user가 딕셔너리, psdf_orderbyuserId는 데이터프레임이기 때문에 추가 설정 필요
from pyspark.pandas.config import set_option

# 서로 다른 데이터프레임 간의 연산 허용
set_option("compute.ops_on_diff_frames", True)

In [38]:
# psdf_orderbyuserId의 각 행에 대해 태그가 사용자별 태그 목록에 있는지 확인
psdf_orderbyuserId['tag_included'] = psdf_orderbyuserId.apply(
    lambda row: row['tag'] in top_tags_per_user.get(row['userId'], []),
    axis=1
)

23/12/11 22:12:25 WARN AttachDistributedSequenceExec: clean up cached RDD(720) in AttachDistributedSequenceExec(5720)
23/12/11 22:12:25 WARN AttachDistributedSequenceExec: clean up cached RDD(710) in AttachDistributedSequenceExec(5715)
                                                                                

In [39]:
psdf_orderbyuserId = psdf_orderbyuserId.sort_values(by=['userId', 'movieId'])

In [42]:
psdf_orderbyuserId.head()

23/12/12 02:44:30 WARN AttachDistributedSequenceExec: clean up cached RDD(873) in AttachDistributedSequenceExec(8090)
23/12/12 02:44:32 WARN AttachDistributedSequenceExec: clean up cached RDD(863) in AttachDistributedSequenceExec(8085)
23/12/12 02:44:35 WARN AttachDistributedSequenceExec: clean up cached RDD(887) in AttachDistributedSequenceExec(8718)
23/12/12 02:44:37 WARN AttachDistributedSequenceExec: clean up cached RDD(898) in AttachDistributedSequenceExec(8774)
                                                                                

Unnamed: 0,userId,movieId,rating,rating_datetime,tag,tag_datetime,tag_included
1515598,37,47,5.0,2020-01-04 19:47:14,twist ending,2020-01-04 19:48:16,False
1515595,37,47,5.0,2020-01-04 19:47:14,Kevin Spacey,2020-01-04 19:48:16,False
1515597,37,47,5.0,2020-01-04 19:47:14,powerful ending,2020-01-04 19:48:16,False
1515596,37,47,5.0,2020-01-04 19:47:14,Morgan Freeman,2020-01-04 19:48:16,False
1515599,37,165,4.0,2020-01-04 18:55:02,action,2020-01-04 18:54:56,True


In [43]:
# 유저별로 tag_included에 따른 평균평점 계산
average_ratings = psdf_orderbyuserId.groupby(['userId', 'tag_included'])['rating'].mean().reset_index()



In [44]:
# 새로운 데이터프레임 생성 및 컬럼명 설정
result_df = average_ratings.rename(columns={'rating': '평균평점'})

In [47]:
result_df = result_df.sort_values(by='userId')

In [50]:
result_df.head(20)

23/12/12 04:31:48 WARN AttachDistributedSequenceExec: clean up cached RDD(1209) in AttachDistributedSequenceExec(15101)
23/12/12 04:31:49 WARN AttachDistributedSequenceExec: clean up cached RDD(1199) in AttachDistributedSequenceExec(15096)
23/12/12 04:31:51 WARN AttachDistributedSequenceExec: clean up cached RDD(1223) in AttachDistributedSequenceExec(15908)
23/12/12 04:31:53 WARN AttachDistributedSequenceExec: clean up cached RDD(1234) in AttachDistributedSequenceExec(15964)
                                                                                

Unnamed: 0,userId,tag_included,평균평점
1760,37,False,4.322034
2151,37,True,4.568182
1708,137,True,4.7
2160,137,False,4.690367
1557,302,False,4.320755
1598,302,True,4.409091
1616,370,False,2.85
1682,418,True,4.8
1908,418,False,4.892857
1521,499,True,3.055556
