In [1]:
!pip install pyspark
!apt-get install openjdk-8-jdk-headless -qq

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m4.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425345 sha256=6171c8529e87d5e174c9f6d2e23fdcf3a317b9a273e02db40a15cf90a9d0f003
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0
Selecting previously unselected package libxtst6:amd64.
(Reading database ... 120899 files and directories currently installed.)
Preparing to unpack .../libxtst6_2%3a1.2.3-1build4_amd64.deb ...
Unpacking libxtst6:amd64 (2:1.2.3-1build4) ...
Selecting previously unselected pa

In [2]:
import pyspark
import numpy as np
import pandas as pd
import os
from pyspark.context import SparkContext
from pyspark.ml import Pipeline
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, col, broadcast
from pyspark.sql import functions as F
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

In [7]:
import os
from google.colab import drive
drive.mount('/content/gdrive')

path= "/content/gdrive/MyDrive/광운대학교/4학년1학기/빅데이터처리및응용/"
PATH = "/content/gdrive/MyDrive/광운대학교/4학년1학기/빅데이터처리및응용/"

os.listdir(path)

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


['ratings_refined.csv',
 '1210 all_pp.csv',
 '1207 OpenDF.ipynb',
 'movies_refined.csv',
 '1210 EucSim_Genres.csv',
 '1210 CosSim_Genres.csv',
 '1210 CosSim_Title.csv',
 'Spark_story.ipynb',
 '불Finda_Spark_movie_recommendation (1).ipynb',
 '1210 CosSim_Story.csv',
 'Spark_EucSim_genre.ipynb',
 '1210 CosSim_Tag.csv',
 'Spark_EucSim_All.ipynb']

In [8]:
CosSim_Tag  = pd.read_csv(path + '1210 CosSim_Tag.csv')
k = pd.read_csv(path + "1210 all_pp.csv")
movie = pd.read_csv(path + "movies_refined.csv")
rating = pd.read_csv(path + 'ratings_refined.csv')

In [9]:
new_data = pd.merge(movie, k, on='movieId', how='right')
new_df = pd.merge(movie, k, on='movieId', how='left')
new_ratings = pd.merge(rating, k, on='movieId', how='left')
title_list = new_data['title_x'].tolist()
movie_id_list = new_data['movieId'].tolist()

In [10]:
CosSim_Tag_id  = CosSim_Tag.set_index(pd.Index(movie_id_list)).rename(columns=dict(zip(CosSim_Tag.columns, movie_id_list)))
CosSim_Tag  = CosSim_Tag.set_index(pd.Index(title_list)).rename(columns=dict(zip(CosSim_Tag.columns, title_list)))

In [11]:
new_ratings_refined = rating[rating['movieId'].isin(movie_id_list)]
new_ratings_refined = pd.merge(new_ratings_refined, movie, on='movieId', how='left')
new_ratings_refined = new_ratings_refined[['userId', 'movieId', 'rating', 'title']]

In [12]:
from pyspark import SparkContext, SparkConf

def calculate_predictions(sub_sim_mat, watched_movies_y, sim_N):
    # Initialize SparkContext
    conf = SparkConf().setAppName("ColabApp").setMaster("local[*]")\
                      .set('spark.executor.memory', '4g')\
                      .set('spark.driver.memory', '4g')
    sc = SparkContext.getOrCreate(conf=conf)

    # Create RDDs for sub_sim_mat, watched_movies_y, and sim_N
    sub_sim_mat_rdd = sc.parallelize(sub_sim_mat, numSlices=4)
    watched_movies_y_rdd = sc.parallelize(watched_movies_y, numSlices=4)
    sim_N_bc = sc.broadcast(sim_N)

    # Calculate pred_y using Spark transformations and actions
    def calculate_pred_y(sub_sim_mat_row):
        return np.dot(sub_sim_mat_row, watched_movies_y) / sim_N_bc.value

    pred_y_rdd = sub_sim_mat_rdd.map(calculate_pred_y)
    pred_y = pred_y_rdd.collect()

    # Flatten the result if necessary
    pred_y_flat = np.array(pred_y).flatten()

    # Stop SparkContext
    sc.stop()

    return pred_y_flat


In [13]:
from tqdm.notebook import tqdm

def modeling_spark(similarity_matrix, data):
    # 빈 데이터프레임
    df_pred_all = pd.DataFrame()
    # 영화 이름
    titles = title_list
    # 모든 사용자
    all_users = sorted(data['userId'].unique())
    # 영화 개수
    n_titles = len(titles)

    for user in tqdm(all_users):
        idx = data[data['userId'] == user].index

        # 유사도
        watched_movies = data.loc[idx, 'movieId'].tolist()
        sub_sim_mat = similarity_matrix.loc[watched_movies]
        sub_sim_mat = sub_sim_mat.T.to_numpy()
        sim_N = np.sum(sub_sim_mat, axis=1) + 1

        # 평점 예측
        watched_movies_y = data.loc[idx, 'rating']
        watched_movies_y = np.array(watched_movies_y.tolist()).reshape(-1, 1)

        # pred_y 계산 (PySpark)
        pred_y_spark = calculate_predictions(sub_sim_mat, watched_movies_y, sim_N)

        # pred_y를 NumPy 배열로 변환하고 계산
        pred_y = np.array(pred_y_spark)
        users_list = [user] * n_titles
        cur_pred = pd.DataFrame(zip(users_list, titles, pred_y),
                                columns=['userId', 'title', 'pred_rating'])

        # 결과 기록
        df_pred_all = pd.concat([df_pred_all, cur_pred], axis=0)

    return df_pred_all


In [14]:
df_pred_cos_story_sp = modeling_spark(CosSim_Tag_id, new_ratings_refined)

  0%|          | 0/610 [00:00<?, ?it/s]

In [15]:
df_pred_cos_story_sp.to_csv(PATH + 'df_pred_cos_tag_sp.csv', index=False)

In [16]:
df_pred_cos_story_sp.head()

Unnamed: 0,userId,title,pred_rating
0,1,Toy Story,4.324068
1,1,Jumanji,3.925345
2,1,Grumpier Old Men,3.926638
3,1,Waiting to Exhale,4.262462
4,1,Father of the Bride Part II,3.597066


In [17]:
df_pred_cos_story_sp.shape

(5915170, 3)