# Spark Processing

### 1. review Processing

In [1]:
from google.cloud import storage

client = storage.Client()
bucket = client.bucket("movie_info_and_reviews")
list(bucket.list_blobs(prefix="movie_reviews/"))

[<Blob: movie_info_and_reviews, movie_reviews/, 1742622341927184>,
 <Blob: movie_info_and_reviews, movie_reviews/cgv_reviews/, 1742808190496410>,
 <Blob: movie_info_and_reviews, movie_reviews/cgv_reviews/3일_cgv_reviews.csv, 1743404394596607>,
 <Blob: movie_info_and_reviews, movie_reviews/cgv_reviews/500일의 썸머_cgv_reviews.csv, 1742895473405637>,
 <Blob: movie_info_and_reviews, movie_reviews/cgv_reviews/9월 5일: 위험한 특종_cgv_reviews.csv, 1742822802547895>,
 <Blob: movie_info_and_reviews, movie_reviews/cgv_reviews/검은 수녀들_cgv_reviews.csv, 1743365355822102>,
 <Blob: movie_info_and_reviews, movie_reviews/cgv_reviews/고독한 미식가 더 무비_cgv_reviews.csv, 1743352623609840>,
 <Blob: movie_info_and_reviews, movie_reviews/cgv_reviews/괜찮아 괜찮아 괜찮아!_cgv_reviews.csv, 1743366802095322>,
 <Blob: movie_info_and_reviews, movie_reviews/cgv_reviews/귀신경찰_cgv_reviews.csv, 1742943230229472>,
 <Blob: movie_info_and_reviews, movie_reviews/cgv_reviews/그 시절, 우리가 좋아했던 소녀_cgv_reviews.csv, 1743366818196011>,
 <Blob: movie_info_a

# spark 연결

In [1]:
### 초기 설정
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, input_file_name, regexp_extract, col, regexp_replace
from pyspark.sql.types import StringType, ArrayType
from dotenv import load_dotenv
import os

load_dotenv()

GOOGLE_APPLICATION_CREDENTIALS = os.getenv("GOOGLE_APPLICATION_CREDENTIALS")
GOOGLE_CONNECTOR_HADOOP = os.getenv("GOOGLE_CONNECTOR_HADOOP")

In [2]:
spark = SparkSession.builder\
        .appName("ReviewPreprocessing")\
        .config("spark.jars.packages", "com.google.cloud.bigdataoss:gcs-connector:hadoop3-2.2.16") \
        .config("spark.jars", GOOGLE_CONNECTOR_HADOOP)\
        .config("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")\
        .config("spark.hadoop.google.cloud.auth.service.account.enable", "true")\
        .config("spark.hadoop.google.cloud.auth.service.account.json.keyfile", GOOGLE_APPLICATION_CREDENTIALS)\
        .config("spark.hadoop.fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS") \
        .getOrCreate()

25/04/08 18:01:35 WARN Utils: Your hostname, haebinui-MacBookAir.local resolves to a loopback address: 127.0.0.1; using 192.168.219.101 instead (on interface en0)
25/04/08 18:01:35 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/Users/haebin/Desktop/project/movie_review_project/Movie_analysis/.venv/lib/python3.11/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/haebin/.ivy2/cache
The jars for the packages stored in: /Users/haebin/.ivy2/jars
com.google.cloud.bigdataoss#gcs-connector added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-2363bc77-cdd3-4ad9-bf7f-74ff1e848596;1.0
	confs: [default]
	found com.google.cloud.bigdataoss#gcs-connector;hadoop3-2.2.16 in central
	found com.google.api-client#google-api-client-jackson2;2.0.1 in central
	found com.google.api-client#google-api-client;2.0.1 in central
	found com.google.oauth-client#google-oauth-client;1.34.1 in central
	found com.google.http-client#google-http-client;1.42.3 in central
	found org.apache.httpcomponents#httpclient;4.5.13 in central
	found org.apache.httpcomponents#httpcore;4.4.15 in central
	found commons-logging#commons-logging;1.2 in central
	found commons-codec#commons-codec;1.15 in central
	found com.google.code.findbugs#jsr305;3.0.2 in central
	found com.google.errorprone#error_prone_annotations;2.16 in centr

In [4]:
### 테스트
df = spark.read.option("header", True).csv("gs://movie_info_and_reviews/movie_reviews/watcha_reviews/")
df.show(5)

[Stage 2:>                                                          (0 + 1) / 1]

+--------------------------------+---------------------------------+-----------------------------------+
|                              id|                             star|                            context|
+--------------------------------+---------------------------------+-----------------------------------+
|                          이윤영|                              2.0|   감독 특유의 만화적인 연출, 풍...|
|  <미키 17>은 나쁘게 말하자면...|    SF 장르 영화로 보자니 지나...|    정작 그 손길의 본질은 들어있...|
|그래도 나쁘진 않다. 앞서 언급...| 상당히 직관적인 풍자는 편안한...|                               NULL|
|     + <미키 17>은 특정한 키워드|                             대사|   도상 등을 반복적으로 노출시켜...|
|                   이동진 평론가|                              4.0|파들어갈수록 넓어지는 흥미진진한...|
+--------------------------------+---------------------------------+-----------------------------------+
only showing top 5 rows



                                                                                

## merged_reviews_parquet 생성 후 gcs 업로드

In [3]:
### 전처리
def review_processing(path, platform):
    
    # 데이터 로드
    df = spark.read\
            .option("inferSchema", "true")\
            .option("recursiveFileLookup", "true") \
            .option("header",True)\
            .csv(path)
    
    
    # 컬럼 이름 통합
    column_mapping = {
        "name" : "id",
        "id" : "id",
        "review_date" : "date",
        "context" : "context",
        "star" : "star",
        "rating" : "star"
    }
    
    for old_col, new_col in column_mapping.items():
        if old_col in df.columns:
            df = df.withColumnRenamed(old_col, new_col)
    
    # 필요 없는 컬럼 제거
    if "who" in df.columns:
        df = df.drop("who")

    
    # 누락된 컬럼
    if "date" not in df.columns:
        df = df.withColumn("date", lit(None).cast(StringType()))
    if "star" not in df.columns:
        df = df.withColumn("star", lit(None).cast(StringType()))
    if "recommend_tags" not in df.columns and platform == "megabox":
        df = df.withColumn("recommend_tags", lit(None).cast(ArrayType(StringType())))
        
    df = df.withColumn("platform", lit(platform))
    
    # 영화명 추출(파일명에서 추출)
    df = df.withColumn("filename", input_file_name())
    df = df.withColumn("movieNm", regexp_extract("filename", f"/([^/]+)_{platform}_reviews.csv", 1))
    
    # 영화명 디코딩
    df = df.withColumn("movieNm", regexp_replace("movieNm", "%20", " "))
    df = df.withColumn("movieNm", regexp_replace("movieNm", "%5B", "["))
    df = df.withColumn("movieNm", regexp_replace("movieNm", "%5D", "]"))
    df = df.withColumn("movieNm", regexp_replace("movieNm", "%2C", ","))
    df = df.withColumn("movieNm", regexp_replace("movieNm", "%26", "&"))
    
    result_cols = ["movieNm", "id", "context", "star", "date", "platform"]
    if "recommend_tags" in df.columns:
        result_cols.append("recommend_tags")
        
    df = df.select(*result_cols)
    
    return df

watcha_df = review_processing("gs://movie_info_and_reviews/movie_reviews/watcha_reviews/", "watcha")
megabox_df = review_processing("gs://movie_info_and_reviews/movie_reviews/megabox_reviews/", "megabox")
cgv_df = review_processing("gs://movie_info_and_reviews/movie_reviews/cgv_reviews/", "cgv")
cine21_df = review_processing("gs://movie_info_and_reviews/movie_reviews/cine_reviews/", "cine")

# 통합
all_reviews_df = watcha_df.unionByName(cgv_df, allowMissingColumns=True)\
    .unionByName(cine21_df, allowMissingColumns=True)\
        .unionByName(megabox_df, allowMissingColumns=True)

# 저장
all_reviews_df.write.mode("overwrite").parquet("gs://movie_info_and_reviews/movie_reviews/merged_reviews.parquet")

25/04/08 17:19:51 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
25/04/08 17:20:15 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
25/04/08 17:20:54 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
                                                                                

In [None]:
### 빅쿼리 업로드
from google.cloud import bigquery
from google.oauth2 import service_account
from dotenv import load_dotenv
import os
import pandas as pd

load_dotenv()

GOOGLE_APPLICATION_CREDENTIALS = os.getenv("GOOGLE_APPLICATION_CREDENTIALS")

gcs_path = "gs://movie_info_and_reviews/movie_reviews/merged_reviews.parquet/*.parquet"

# bigquery 설정
project_id = "<project_id>"     # gcp 프로젝트 id
dataset_id = "movie_dashboard"  # 빅쿼리 데이터셋
table_id = "review_data"    # 빅쿼리 테이블명
key_path = GOOGLE_APPLICATION_CREDENTIALS       # 서비스 계정 키 경로

# 자격증명 및 클라이언트 초기화
credentials = service_account.Credentials.from_service_account_file(key_path)
client = bigquery.Client(credentials=credentials, project=project_id)

# 업로드 설정
job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.PARQUET,
        write_disposition="WRITE_TRUNCATE"  # 기존 테이블 덮어쓰기
    )

# 테이블 id 전체 경로
table_ref = f"{project_id}.{dataset_id}.{table_id}"

# 업로드
load_job = client.load_table_from_uri(
        gcs_path, table_ref, job_config=job_config
    )
load_job.result()

print(f"Bigquery 업로드 완료 : {table_ref}")

Bigquery 업로드 완료 : global-cursor-454210-i1.movie_dashboard.review_data


# 분석하기

In [10]:
### 별점
from pyspark.sql.functions import col, avg, count

df.filter(col("star").isNotNull())\
    .groupBy("platform")\
    .agg(avg(col("star")).alias("avg_star"),count("*").alias("review_count"))\
    .orderBy("avg_star",ascending=False)\
    .show()



+--------+------------------+------------+
|platform|          avg_star|review_count|
+--------+------------------+------------+
|    cine| 7.609803921568627|         759|
| megabox| 4.284972904684332|      150424|
|  watcha|3.7738492155798506|       45187|
+--------+------------------+------------+



                                                                                

In [11]:
### 트렌드 분석
from pyspark.sql.functions import to_date

df_date = df.withColumn("date", to_date("date", "yyyy.MM.dd"))
df_date.groupBy("date").count().orderBy("date").show()



+----------+-----+
|      date|count|
+----------+-----+
|      NULL|81241|
|2011-10-02|    1|
|2011-10-18|    1|
|2011-10-21|    1|
|2011-10-30|    1|
|2011-10-31|    1|
|2011-11-08|    1|
|2011-11-22|    1|
|2011-11-23|    1|
|2011-11-24|    1|
|2011-11-26|    1|
|2011-11-28|    1|
|2012-01-01|    1|
|2012-01-05|    1|
|2012-02-18|    1|
|2012-02-19|    1|
|2012-02-21|    1|
|2012-03-13|    1|
|2012-03-20|    1|
|2012-03-22|    1|
+----------+-----+
only showing top 20 rows



                                                                                

In [13]:
### 영화별 리뷰 수 / 플랫폼별 비중
df.groupBy("movieNm", "platform").count().orderBy("count", ascending=False).show()



+------------------------------+--------+-----+
|                       movieNm|platform|count|
+------------------------------+--------+-----+
|                     미키%2017|     cgv|22035|
|                        위키드|     cgv|18435|
|캡틴%20아메리카:%20브레이브...|     cgv|17833|
|                 검은%20수녀들|     cgv|17524|
|                    모아나%202|     cgv|16178|
|                        하얼빈| megabox|13756|
|                     미키%2017| megabox|12628|
|                        퇴마록|     cgv|11968|
|                        위키드| megabox|11745|
|                    서브스턴스|     cgv|11284|
|         무파사:%20라이온%20킹|     cgv|10810|
|       말할%20수%20없는%20비밀|     cgv|10086|
|                        대가족| megabox| 9019|
|                        소방관| megabox| 9019|
|보고타:%20마지막%20기회의%20땅|     cgv| 8378|
|                       히트맨2|     cgv| 7919|
|             힘내라%20대한민국| megabox| 7786|
|                       히트맨2| megabox| 7786|
|캡틴%20아메리카:%20브레이브...| megabox| 6823|
|       말할%20수%20없는%20비밀| megabox| 6

                                                                                

In [14]:
# 워드클라우드용
review_texts = df.select("context").rdd.map(lambda row: row["context"]).collect()

                                                                                

In [15]:
# 메가박스 추천 키워드
from os import truncate


megabox_recommend_df = df.filter(df.platform == "megabox")
megabox_recommend_df.select("recommend_tags").show(truncate=False)



+------------------+
|recommend_tags    |
+------------------+
|[]                |
|['스토리']        |
|[]                |
|['영상미', '배우']|
|['영상미', '배우']|
|['스토리']        |
|[]                |
|['스토리']        |
|['배우']          |
|['연출', '스토리']|
|['연출', '배우']  |
|[]                |
|['영상미', '배우']|
|[]                |
|[]                |
|['연출', '배우']  |
|[]                |
|['연출', '배우']  |
|['배우']          |
|[]                |
+------------------+
only showing top 20 rows



                                                                                

## megabox 추천 키워드 parquet 생성 후 업로드

In [4]:
### 메가박스 추천 키워드 분석
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, col, count, from_json
from pyspark.sql.types import ArrayType, StringType

spark = SparkSession.builder\
                    .appName("MegaboxProcess")\
                    .getOrCreate()
                    
df = spark.read.parquet("gs://movie_info_and_reviews/movie_reviews/merged_reviews.parquet")

df = df.withColumn(
    "recommend_tags",
    from_json("recommend_tags", ArrayType(StringType()))
)

megabox_keyword_df = df.filter((col("platform") == "megabox") & (col("recommend_tags").isNotNull()))

explode_keyword_df = megabox_keyword_df.withColumn("keyword", explode("recommend_tags"))

keyword_summary = explode_keyword_df.groupBy("movieNm", "keyword").agg(count("*").alias("count"))

keyword_summary.write.mode("overwrite").parquet("gs://movie_info_and_reviews/movie_reviews/megabox_keywords.parquet")

25/04/01 22:56:03 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
                                                                                

## 리뷰 전처리 후 parquet 생성 후 gcs업로드

In [4]:
### 리뷰 전처리
from pyspark.sql.functions import col, when, length, trim, to_date
from pyspark.sql.types import DoubleType

spark = SparkSession.builder\
                    .appName("Reviewprocess")\
                    .getOrCreate()

df = spark.read.parquet("gs://movie_info_and_reviews/movie_reviews/merged_reviews.parquet")

# null, 빈 문자열 제거
df = df.filter(
    (col("context").isNotNull()) & (length(trim(col("context"))) > 0)
)

# 중복 제거 (id, context, movieNm)
df = df.dropDuplicates(['id', 'context', 'movieNm'])

# 별점 있는 경우만 숫자로 변환
df = df.withColumn("star", when(col("star").isNotNull(), col("star").cast(DoubleType())))

# 별점이 존재하는 경우에만 0~5 필터 적용
df = df.filter(
    (col("star").isNull()) | ((col("star") >= 0) & (col("star") <= 5))
)

# date 없는 경우 처리
df = df.withColumn("date", when(col("date") == "", None).otherwise(col("date")))

# 날짜 포맷을 'yyyy-MM-dd' 형식으로 변환
# 예: '2025.03.19' → '2025-03-19'
df = df.withColumn("date", to_date(col("date"), "yyyy.MM.dd"))

# 리뷰 길이 필터링 (너무 짧은건 없애기...의미 없을 확률이 높으니까)
df = df.filter(length(col("context")) >= 10)

# 저장하기
df.write.mode("overwrite").parquet("gs://movie_info_and_reviews/movie_reviews/cleaned_reviews.parquet")

25/04/08 18:02:49 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
                                                                                

In [None]:
# 전처리한 리뷰 데이터, 통합한 데이터 parquet 형식
from google.cloud import bigquery
from google.cloud import bigquery
from google.oauth2 import service_account

# bigquery 설정
project_id = "<project_id>"     # gcp 프로젝트 id
dataset_id = "movie_reviews"  # 빅쿼리 데이터셋
#table_id = "review_data"    # 빅쿼리 테이블명
key_path = GOOGLE_APPLICATION_CREDENTIALS       # 서비스 계정 키 경로

def upload_parquet_to_bigquery(gcs_path: str, table_id: str):
    '''
    GCS에 있는 Parquet 파일을 BigQuery로 업로드하는 함수.
    '''
    # 자격증명 및 클라이언트 초기화
    credentials = service_account.Credentials.from_service_account_file(key_path)
    client = bigquery.Client(credentials=credentials, project=project_id)

    
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.PARQUET,
        write_disposition="WRITE_TRUNCATE"  # 기존 테이블 덮어쓰기
    )
    # 테이블 id 전체 경로
    table_ref = f"{project_id}.{dataset_id}.{table_id}"
    
    load_job = client.load_table_from_uri(
        gcs_path, table_ref, job_config=job_config
    )
    
    load_job.result()
    print(f"[DONE] {gcs_path} -> {table_id} 업로드 완료")

upload_parquet_to_bigquery(
    gcs_path="gs://movie_info_and_reviews/movie_reviews/cleaned_reviews.parquet/*.parquet",
    table_id="cleaned_reviews"
)

[DONE] gs://movie_info_and_reviews/movie_reviews/cleaned_reviews.parquet/*.parquet -> cleaned_reviews 업로드 완료


## 워드 클라우드용 parquet 생성 후 gcs 업로드 & bigQuery 업로드

In [1]:
## 전체 리뷰 워드 클라우드
from numpy import broadcast
import pandas as pd
import re, os
from wordcloud import WordCloud
import matplotlib.pyplot as plt
from collections import Counter
from dotenv import load_dotenv 
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, split, lower, regexp_replace, trim
from pyspark.sql.types import StringType

load_dotenv()

GOOGLE_APPLICATION_CREDENTIALS = os.getenv("GOOGLE_APPLICATION_CREDENTIALS")
GOOGLE_CONNECTOR_HADOOP = os.getenv("GOOGLE_CONNECTOR_HADOOP")

spark = SparkSession.builder\
        .appName("ReviewWordCloud")\
        .config("spark.jars.packages", "com.google.cloud.bigdataoss:gcs-connector:hadoop3-2.2.16") \
        .config("spark.jars", GOOGLE_CONNECTOR_HADOOP)\
        .config("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")\
        .config("spark.hadoop.google.cloud.auth.service.account.enable", "true")\
        .config("spark.hadoop.google.cloud.auth.service.account.json.keyfile", GOOGLE_APPLICATION_CREDENTIALS)\
        .config("spark.hadoop.fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS") \
        .getOrCreate()

df = spark.read.parquet("gs://movie_info_and_reviews/movie_reviews/cleaned_reviews.parquet")

# 결측치 제거 및 한글만 남기고, 소문자화
df = df.withColumn("clean_text", regexp_replace(col("context"), r"[^ㄱ-ㅎ가-힣\s]", ""))
df = df.withColumn("clean_text", trim(lower(col("clean_text"))))

# 단어 단위로 분리
df_words = df.withColumn("word", explode(split(col("clean_text"), "\s+")))

# 불용어
stopwords = ["영화", "정말", "너무", "근데", "그리고"]
broadcast_stopwords = spark.sparkContext.broadcast(stopwords)

# 불용어 및 1글자 제외
df_filtered = df_words.filter(
    (col("word").isNotNull()) &
    (col("word") != "") &
    (~col("word").isin(broadcast_stopwords.value)) &
    (col("word").rlike(r"^[ㄱ-ㅎ가-힣]{2,}$"))
)

# 영화별 키워드 빈도수 집계
df_keywords = df_filtered.groupBy("movieNm", "word").count().withColumnRenamed("word", "keyword")

# 저장
df_keywords.write.mode("overwrite").parquet("gs://movie_info_and_reviews/movie_reviews/movie_keywords.parquet")


25/04/08 22:28:00 WARN Utils: Your hostname, haebinui-MacBookAir.local resolves to a loopback address: 127.0.0.1; using 192.168.219.101 instead (on interface en0)
25/04/08 22:28:00 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/Users/haebin/Desktop/project/movie_review_project/Movie_analysis/.venv/lib/python3.11/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/haebin/.ivy2/cache
The jars for the packages stored in: /Users/haebin/.ivy2/jars
com.google.cloud.bigdataoss#gcs-connector added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-efae861a-5301-47dd-ac73-f23c9eea5282;1.0
	confs: [default]
	found com.google.cloud.bigdataoss#gcs-connector;hadoop3-2.2.16 in central
	found com.google.api-client#google-api-client-jackson2;2.0.1 in central
	found com.google.api-client#google-api-client;2.0.1 in central
	found com.google.oauth-client#google-oauth-client;1.34.1 in central
	found com.google.http-client#google-http-client;1.42.3 in central
	found org.apache.httpcomponents#httpclient;4.5.13 in central
	found org.apache.httpcomponents#httpcore;4.4.15 in central
	found commons-logging#commons-logging;1.2 in central
	found commons-codec#commons-codec;1.15 in central
	found com.google.code.findbugs#jsr305;3.0.2 in central
	found com.google.errorprone#error_prone_annotations;2.16 in centr

In [None]:
# 워드클라우드용 키워드 분리한거 빅쿼리 업로드
from google.cloud import bigquery
from google.cloud import bigquery
from google.oauth2 import service_account

# bigquery 설정
project_id = "<project_id>"     # gcp 프로젝트 id
dataset_id = "movie_reviews"  # 빅쿼리 데이터셋
#table_id = "review_data"    # 빅쿼리 테이블명
key_path = GOOGLE_APPLICATION_CREDENTIALS       # 서비스 계정 키 경로

def upload_parquet_to_bigquery(gcs_path: str, table_id: str):
    '''
    GCS에 있는 Parquet 파일을 BigQuery로 업로드하는 함수.
    '''
    # 자격증명 및 클라이언트 초기화
    credentials = service_account.Credentials.from_service_account_file(key_path)
    client = bigquery.Client(credentials=credentials, project=project_id)

    
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.PARQUET,
        write_disposition="WRITE_TRUNCATE"  # 기존 테이블 덮어쓰기
    )
    # 테이블 id 전체 경로
    table_ref = f"{project_id}.{dataset_id}.{table_id}"
    
    load_job = client.load_table_from_uri(
        gcs_path, table_ref, job_config=job_config
    )
    
    load_job.result()
    print(f"[DONE] {gcs_path} -> {table_id} 업로드 완료")

upload_parquet_to_bigquery(
    gcs_path="gs://movie_info_and_reviews/movie_reviews/movie_keywords.parquet/*.parquet",
    table_id="movie_wordcloud"
)

[DONE] gs://movie_info_and_reviews/movie_reviews/movie_keywords.parquet/*.parquet -> movie_wordcloud 업로드 완료


In [None]:
# 워드클라우드용 키워드 분리한거 빅쿼리 업로드
from google.cloud import bigquery
from google.cloud import bigquery
from google.oauth2 import service_account
from dotenv import load_dotenv 
import os

load_dotenv()

GOOGLE_APPLICATION_CREDENTIALS = os.getenv("GOOGLE_APPLICATION_CREDENTIALS")
GOOGLE_CONNECTOR_HADOOP = os.getenv("GOOGLE_CONNECTOR_HADOOP")

# bigquery 설정
project_id = "<project_id>"     # gcp 프로젝트 id
dataset_id = "movie_reviews"  # 빅쿼리 데이터셋
#table_id = "review_data"    # 빅쿼리 테이블명
key_path = GOOGLE_APPLICATION_CREDENTIALS       # 서비스 계정 키 경로

def upload_parquet_to_bigquery(gcs_path: str, table_id: str):
    '''
    GCS에 있는 Parquet 파일을 BigQuery로 업로드하는 함수.
    '''
    # 자격증명 및 클라이언트 초기화
    credentials = service_account.Credentials.from_service_account_file(key_path)
    client = bigquery.Client(credentials=credentials, project=project_id)

    
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.PARQUET,
        write_disposition="WRITE_TRUNCATE"  # 기존 테이블 덮어쓰기
    )
    # 테이블 id 전체 경로
    table_ref = f"{project_id}.{dataset_id}.{table_id}"
    
    load_job = client.load_table_from_uri(
        gcs_path, table_ref, job_config=job_config
    )
    
    load_job.result()
    print(f"[DONE] {gcs_path} -> {table_id} 업로드 완료")

upload_parquet_to_bigquery(
    gcs_path="gs://movie_info_and_reviews/movie_reviews/movie_word_frequencies.parquet",
    table_id="movie_word_frequencies"
)

[DONE] gs://movie_info_and_reviews/movie_reviews/movie_word_frequencies.parquet -> movie_word_frequencies 업로드 완료


# clean_reviews.parquet 분석

In [7]:
# 병합된 리뷰 불러오기
df = spark.read.parquet("gs://movie_info_and_reviews/movie_reviews/cleaned_reviews.parquet")

#영화별 리뷰 수
review_count_df = df.groupBy("movieNm").agg(count("*").alias("review_count"))

# 날짜 데이터가 존재하는 경우 => 영화별 일 평균 리뷰 수 분석
if 'date' in df.columns:
    from pyspark.sql.functions import to_date, min, max, datediff
    
    df = df.withColumn("date", to_date("date"))
    
    date_range_df = df.groupBy("movieNm").agg(
        min("date").alias("min_date"),
        max("date").alias("max_date"),
        count("*").alias("review_count")
    ).withColumn("days_active", datediff("max_date", "min_date") + 1)
    
    result_df = date_range_df.withColumn(
        "avg_reviews_per_day", (col("review_count") / col("days_active")).cast("double")
    )
    
    result_df.orderBy(col("avg_reviews_per_day").desc()).show(truncate=False)
else:
    review_count_df.orderBy(col("review_count").desc()).show(truncate=False)



+---------------------------------------+----------+----------+------------+-----------+-------------------+
|movieNm                                |min_date  |max_date  |review_count|days_active|avg_reviews_per_day|
+---------------------------------------+----------+----------+------------+-----------+-------------------+
|대가족                                 |2025-02-15|2025-02-15|9232        |1          |9232.0             |
|패딩턴:%20페루에%20가다!               |2025-03-07|2025-03-07|5999        |1          |5999.0             |
|보고타:%20마지막%20기회의%20땅         |2025-01-05|2025-01-06|6481        |2          |3240.5             |
|시빌%20워:%20분열의%20시대             |2025-01-05|2025-01-05|3087        |1          |3087.0             |
|수퍼%20소닉3                           |2025-01-28|2025-01-28|2271        |1          |2271.0             |
|히트맨2                                |2025-01-23|2025-01-26|8510        |4          |2127.5             |
|캡틴%20아메리카:%20브레이브%20뉴%20월드|2025-02-13|2025-02-16|8354 

                                                                                

In [8]:
### 평점 분석
from pyspark.sql.functions import avg, stddev, count

rating_df = df.filter(col("star").isNotNull())

#영화별 평균 평점
rating_summary = rating_df.groupBy("movieNm").agg(
    count("star").alias("rating_count"),
    avg("star").alias("avg_rating"),
    stddev("star").alias("stddev_rating")
)

rating_summary.orderBy(col("avg_rating").desc()).show(truncate=False)



+-----------------------------------------------------------------------------------+------------+------------------+------------------+
|movieNm                                                                            |rating_count|avg_rating        |stddev_rating     |
+-----------------------------------------------------------------------------------+------------+------------------+------------------+
|케이온                                                                             |1           |10.0              |NULL              |
|초혼,%20다시%20부르는%20노래                                                       |2           |10.0              |0.0               |
|양들의%20침묵                                                                      |46          |8.652173913043478 |1.0158169411714961|
|위플래쉬                                                                           |27          |8.25925925925926  |1.1633040674201884|
|아노라                                                          

                                                                                

In [10]:
### 플랫폼별 리뷰 분포
platform_summary = df.groupBy("platform").agg(
    count("*").alias("review_count"),
    avg("star").alias("avg_rating")
)

platform_summary.orderBy(col("review_count").desc()).show()



+--------+------------+-----------------+
|platform|review_count|       avg_rating|
+--------+------------+-----------------+
| megabox|      145580| 4.28784860557769|
|  watcha|       34067|3.503258285143981|
|    cine|         445|7.615730337078651|
+--------+------------+-----------------+



                                                                                

In [12]:
#### 추천 키워드
from pyspark.sql.functions import explode, col, from_json
from pyspark.sql.types import ArrayType, StringType

df = df.withColumn(
    "recommend_tags_array",
    from_json(col("recommend_tags"), ArrayType(StringType()))
)

keyword_df = df.filter(col("recommend_tags_array").isNotNull())

keyword_explode = keyword_df.withColumn("tag", explode("recommend_tags_array"))

tags_summary = keyword_explode.groupBy("tag").count().orderBy(col("count").desc())

tags_summary.show(truncate=False)



+------+-----+
|tag   |count|
+------+-----+
|배우  |38543|
|스토리|28726|
|영상미|23319|
|연출  |21037|
|OST   |9499 |
+------+-----+



                                                                                

### 영화 분석

In [None]:
### 영화별 리뷰 수 및 평균 평점 분석
'''
활용:
리뷰 수가 많은 영화 = 화제성 높은 영화

평균 평점으로 관객 만족도 비교 가능
→ 마케팅 타겟 선정에 활용 (입소문 난 영화 vs. 저평가된 영화)
'''
from pyspark.sql.functions import col, count, avg, desc

# 영화별 리뷰 수와 평균 평점
movie_summary = df.groupBy("movieNm").agg(
    count("context").alias("review_count"),
    avg("star").alias("avg_rating")
).orderBy(desc("review_count"))

movie_summary.show(truncate=False)



+---------------------------------------+------------+------------------+
|movieNm                                |review_count|avg_rating        |
+---------------------------------------+------------+------------------+
|하얼빈                                 |15310       |4.253461789679948 |
|미키%2017                              |14187       |4.284485796856277 |
|위키드                                 |13298       |4.4691306963453155|
|소방관                                 |10035       |4.218634778276034 |
|대가족                                 |9232        |4.3944432409012135|
|히트맨2                                |8510        |3.7485311398354875|
|말할%20수%20없는%20비밀                |8399        |4.178175973330158 |
|캡틴%20아메리카:%20브레이브%20뉴%20월드|8354        |3.908486952358152 |
|힘내라%20대한민국                      |7791        |3.9243999486587087|
|서브스턴스                             |6770        |4.168316100443131 |
|보고타:%20마지막%20기회의%20땅         |6481        |3.6283752507329115|
|무파사:%20라이온%20킹        

                                                                                

In [None]:
### 평점 분포 파악
'''
활용:
특정 영화에 대한 별점 편향(예: 5점 몰림, 1점 테러 등)을 확인

마케팅 타겟팅: "평균은 낮지만 특정 층은 강하게 선호"하는 영화 발견
'''
from pyspark.sql.functions import round

rating_distribution = df.withColumn("round_rating", round(col("star") * 2) / 2)

rating_dist_summary = rating_distribution.groupBy("movieNm", "round_rating")\
                                            .count().orderBy("movieNm", "round_rating")
                                            
rating_dist_summary.show()



+--------------------------+------------+-----+
|                   movieNm|round_rating|count|
+--------------------------+------------+-----+
|            500일의%20썸머|         0.5|   32|
|            500일의%20썸머|         1.0|   22|
|            500일의%20썸머|         1.5|   24|
|            500일의%20썸머|         2.0|   49|
|            500일의%20썸머|         2.5|  100|
|            500일의%20썸머|         3.0|  261|
|            500일의%20썸머|         3.5|  485|
|            500일의%20썸머|         4.0|  861|
|            500일의%20썸머|         4.5|  591|
|            500일의%20썸머|         5.0| 1110|
|            500일의%20썸머|         6.0|    3|
|            500일의%20썸머|         7.0|    5|
|            500일의%20썸머|         8.0|   18|
|            500일의%20썸머|         9.0|    7|
|            500일의%20썸머|        10.0|    7|
|9월%205일:%20위험한%20특종|         2.0|    3|
|9월%205일:%20위험한%20특종|         2.5|   14|
|9월%205일:%20위험한%20특종|         3.0|   32|
|9월%205일:%20위험한%20특종|         3.5|  134|
|9월%205일:%20위험한%20특종|         4.

                                                                                

In [None]:
### 영화별 리뷰 키워드 빈도 분석
'''
활용:
영화별 강점 포인트 파악 (예: “연출”, “OST”, “배우”, “스토리” 등)

마케팅 문구, 예고편 포인트 선정
'''
tags_df = df.filter(col("recommend_tags_array").isNotNull())\
            .withColumn("tag", explode("recommend_tags_array"))

tags_summary = tags_df.groupBy("movieNm", "tag").count().orderBy("movieNm", "count", ascending=[True, False])

tags_summary.show(truncate=False)



+-----------------------------------------------+------+-----+
|movieNm                                        |tag   |count|
+-----------------------------------------------+------+-----+
|500일의%20썸머                                 |배우  |839  |
|500일의%20썸머                                 |스토리|583  |
|500일의%20썸머                                 |연출  |297  |
|500일의%20썸머                                 |영상미|96   |
|500일의%20썸머                                 |OST   |63   |
|검은%20수녀들                                  |스토리|42   |
|검은%20수녀들                                  |영상미|32   |
|검은%20수녀들                                  |연출  |14   |
|검은%20수녀들                                  |OST   |9    |
|검은%20수녀들                                  |배우  |3    |
|그%20시절,%20우리가%20좋아했던%20소녀          |배우  |984  |
|그%20시절,%20우리가%20좋아했던%20소녀          |영상미|734  |
|그%20시절,%20우리가%20좋아했던%20소녀          |스토리|696  |
|그%20시절,%20우리가%20좋아했던%20소녀          |OST   |464  |
|그%20시절,%20우리가%20좋아했던%20소녀          |연출  |420  |
|

                                                                                

In [None]:
### 트렌드 분석 (시간 흐름에 따른 리뷰 증가/감소)
'''
활용:
리뷰 증가 시점 → 이벤트 or 입소문 발생 확인

타이밍 기반 마케팅 (특정 요일/주간에 집중)
'''
from pyspark.sql.functions import to_date

df_with_date = df.withColumn("review_date", to_date("date"))
movie_trend = df_with_date.groupBy("movieNm", "review_date").count().orderBy("movieNm", "review_date")

movie_trend.show()



+--------------+-----------+-----+
|       movieNm|review_date|count|
+--------------+-----------+-----+
|500일의%20썸머|       NULL| 3532|
|500일의%20썸머| 2010-03-16|    1|
|500일의%20썸머| 2010-03-17|    1|
|500일의%20썸머| 2010-03-20|    1|
|500일의%20썸머| 2010-03-22|    1|
|500일의%20썸머| 2010-03-23|    2|
|500일의%20썸머| 2010-03-24|    2|
|500일의%20썸머| 2010-03-26|    2|
|500일의%20썸머| 2010-03-29|    2|
|500일의%20썸머| 2010-03-30|    1|
|500일의%20썸머| 2010-04-02|    1|
|500일의%20썸머| 2010-04-03|    1|
|500일의%20썸머| 2010-04-06|    2|
|500일의%20썸머| 2010-04-07|    1|
|500일의%20썸머| 2010-05-17|    1|
|500일의%20썸머| 2010-05-18|    1|
|500일의%20썸머| 2010-07-24|    1|
|500일의%20썸머| 2010-07-27|    1|
|500일의%20썸머| 2010-09-29|    1|
|500일의%20썸머| 2010-11-12|    1|
+--------------+-----------+-----+
only showing top 20 rows



                                                                                

In [6]:
from google.cloud import storage

client = storage.Client()
bucket = client.bucket("movie_info_and_reviews")

# 이 경로에 있는 파일 목록 보기
blobs = bucket.list_blobs(prefix="movie_reviews/cleaned_reviews")
for blob in blobs:
    print(blob.name)

movie_reviews/cleaned_reviews.parquet/
movie_reviews/cleaned_reviews.parquet/_SUCCESS
movie_reviews/cleaned_reviews.parquet/part-00000-89bba8f7-5a77-4641-a8b1-882f5f1ae2ee-c000.snappy.parquet
movie_reviews/cleaned_reviews.parquet/part-00001-89bba8f7-5a77-4641-a8b1-882f5f1ae2ee-c000.snappy.parquet
movie_reviews/cleaned_reviews.parquet/part-00002-89bba8f7-5a77-4641-a8b1-882f5f1ae2ee-c000.snappy.parquet
movie_reviews/cleaned_reviews.parquet/part-00003-89bba8f7-5a77-4641-a8b1-882f5f1ae2ee-c000.snappy.parquet
movie_reviews/cleaned_reviews.parquet/part-00004-89bba8f7-5a77-4641-a8b1-882f5f1ae2ee-c000.snappy.parquet
movie_reviews/cleaned_reviews.parquet/part-00005-89bba8f7-5a77-4641-a8b1-882f5f1ae2ee-c000.snappy.parquet
movie_reviews/cleaned_reviews.parquet/part-00006-89bba8f7-5a77-4641-a8b1-882f5f1ae2ee-c000.snappy.parquet
movie_reviews/cleaned_reviews.parquet/part-00007-89bba8f7-5a77-4641-a8b1-882f5f1ae2ee-c000.snappy.parquet
movie_reviews/cleaned_reviews.parquet/part-00008-89bba8f7-5a77-464

In [None]:
# bigquery 설정
project_id = "global-cursor-454210-i1"     # gcp 프로젝트 id
dataset_id = "movie_boxoffice"  # 빅쿼리 데이터셋
#table_id = "review_data"    # 빅쿼리 테이블명
key_path = GOOGLE_APPLICATION_CREDENTIALS       # 서비스 계정 키 경로

def get_date():
    from datetime import datetime, timedelta
    
    """
    어제 날짜일자를 구하는 함수입니다.
    """
    return (datetime.today() - timedelta(days=1)).strftime("%Y%m%d")

def upload_csv_to_bigquery(gcs_path:str, table_id:str, schema=None):
    '''
    GCS에 있는 CSV파일을 빅쿼리에 업로드하는 함수
    '''
    # 자격증명 및 클라이언트 초기화
    credentials = service_account.Credentials.from_service_account_file(key_path)
    client = bigquery.Client(credentials=credentials, project=project_id)
    
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.CSV,
        skip_leading_rows=1,
        autodetect=True if schema is None else False,
        schema=schema,
        write_disposition="WRITE_TRUNCATE"  # WRITE_APPEND : 기존 테이블에 추가, WRITE_EMPTY : 테이블이 비어있을 때만 업로드. 비어있지 않으면 에러
    )
    
    # 테이블 id 전체 경로
    table_ref = f"{project_id}.{dataset_id}.{table_id}"
    
    load_job = client.load_table_from_uri(
        gcs_path, table_ref, job_config=job_config
    )
    
    load_job.result()
    print(f"[DONE] {gcs_path} → {table_id} 업로드 완료")
    
upload_csv_to_bigquery(
    f"gs://movie_info_and_reviews/daily_boxoffice/daily_box_office_{get_date()}.csv",
    table_id="daily_boxoffice"
)

upload_csv_to_bigquery(
    f"gs://movie_info_and_reviews/daily_regions_boxoffice/daily_region_box_office_{get_date()}.csv",
    table_id="daily_region_boxoffice"
)

[DONE] gs://movie_info_and_reviews/daily_regions_boxoffice/daily_region_box_office_20250331.csv → daily_region_boxoffice 업로드 완료


In [7]:
upload_parquet_to_bigquery(
    gcs_path="gs://movie_info_and_reviews/movie_reviews/megabox_keywords.parquet/*.parquet",
    table_id="review_tags"
)

[DONE] gs://movie_info_and_reviews/movie_reviews/megabox_keywords.parquet/*.parquet -> review_tags 업로드 완료


25/04/01 23:57:09 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 1288514 ms exceeds timeout 120000 ms
25/04/01 23:57:09 WARN SparkContext: Killing executors is not supported by current scheduler.
25/04/01 23:57:11 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$

## 박스오피스 파일 한번에 업로드

In [5]:
### spark로 병합 후 parquet저장 -> bigquery 업로드
from pyspark.sql import SparkSession
from pyspark.sql.functions import input_file_name, regexp_extract, to_date, lit
import re

spark = SparkSession.builder.appName("BoxOfficeMerge").getOrCreate()

folder_path = "gs://movie_info_and_reviews/daily_boxoffice/daily_box_office_*.csv"

# csv 불러오기
df = spark.read\
            .option("inferSchema", "true")\
            .option("recursiveFileLookup", "true") \
            .option("header",True)\
            .option("encoding", "UTF-8")\
            .option("multiLine", False)\
            .option("sep", ",")\
            .csv(folder_path)

# targetDt 추출
df = df.withColumn("filename", input_file_name())
df = df.withColumn("boxoffice_date", regexp_extract("filename", r"daily_box_office_(\d{8})\.csv", 1))
df = df.withColumn("boxoffice_date", to_date("boxoffice_date", "yyyyMMdd"))

# 컬럼 캐스팅
from pyspark.sql.types import IntegerType, DoubleType, LongType, DateType
df = df.withColumn("rank", col("rank").cast(IntegerType())) \
       .withColumn("rankInten", col("rankInten").cast(IntegerType())) \
       .withColumn("salesAmt", col("salesAmt").cast(LongType())) \
       .withColumn("salesShare", col("salesShare").cast(DoubleType())) \
       .withColumn("salesInten", col("salesInten").cast(LongType())) \
       .withColumn("salesChange", col("salesChange").cast(DoubleType())) \
       .withColumn("salesAcc", col("salesAcc").cast(LongType())) \
       .withColumn("audiCnt", col("audiCnt").cast(IntegerType())) \
       .withColumn("audiInten", col("audiInten").cast(IntegerType())) \
       .withColumn("audiChange", col("audiChange").cast(DoubleType())) \
       .withColumn("audiAcc", col("audiAcc").cast(LongType())) \
       .withColumn("scrnCnt", col("scrnCnt").cast(IntegerType())) \
       .withColumn("showCnt", col("showCnt").cast(IntegerType())) \
       .withColumn("prdtYear", col("prdtYear").cast(IntegerType())) \
       .withColumn("showTm", col("showTm").cast(IntegerType())) \
       .withColumn("openDt", col("openDt").cast(DateType()))

df.write.mode("overwrite").parquet("gs://movie_info_and_reviews/daily_boxoffice/merged_boxoffice.parquet")

25/04/05 09:17:25 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
                                                                                

In [None]:
### 빅쿼리 업로드
from pyspark.sql import SparkSession
from google.cloud import storage
from google.cloud import bigquery
from google.oauth2 import service_account
from dotenv import load_dotenv
import os

load_dotenv()

GOOGLE_APPLICATION_CREDENTIALS = os.getenv("GOOGLE_APPLICATION_CREDENTIALS")
GOOGLE_CONNECTOR_HADOOP = os.getenv("GOOGLE_CONNECTOR_HADOOP")

project_id = "<project_id>"     # gcp 프로젝트 id
bucket_name = "movie_info_and_reviews"
parquet_output_path = "gs://movie_info_and_reviews/daily_boxoffice/merged_boxoffice.parquet/*.parquet"
table_id = "movie_boxoffice.merged_daily_boxoffice"  # ← 프로젝트.데이터셋.테이블

# BigQuery 업로드 함수
def upload_parquet_to_bigquery(gcs_path, table_id):
    credentials = service_account.Credentials.from_service_account_file(GOOGLE_APPLICATION_CREDENTIALS)
    client = bigquery.Client(credentials=credentials, project=project_id)

    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.PARQUET,
        write_disposition="WRITE_TRUNCATE"  # 초기 생성: 덮어쓰기
    )
    table_ref = f'{project_id}.{table_id}'
    load_job = client.load_table_from_uri(gcs_path, table_id, job_config=job_config)
    load_job.result()
    print(f"[DONE] {gcs_path} → {table_id} 업로드 완료")

upload_parquet_to_bigquery(parquet_output_path, table_id)

[DONE] gs://movie_info_and_reviews/daily_boxoffice/merged_boxoffice.parquet/*.parquet → movie_boxoffice.merged_daily_boxoffice 업로드 완료


# 지역별 박스오피스 한번에 업로드

In [3]:
### spark로 병합 후 parquet저장 -> bigquery 업로드
from pyspark.sql import SparkSession
from pyspark.sql.functions import input_file_name, regexp_extract, to_date, lit
import re

spark = SparkSession.builder.appName("BoxOfficeMerge").getOrCreate()

folder_path = "gs://movie_info_and_reviews/daily_regions_boxoffice/daily_region_box_office_*.csv"

# csv 불러오기
df = spark.read\
            .option("inferSchema", "true")\
            .option("recursiveFileLookup", "true") \
            .option("header",True)\
            .csv(folder_path)

# targetDt 추출
df = df.withColumn("filename", input_file_name())
df = df.withColumn("boxoffice_date", regexp_extract("filename", r"daily_region_box_office_(\d{8})\.csv", 1))
df = df.withColumn("boxoffice_date", to_date("boxoffice_date", "yyyyMMdd"))

# 컬럼 캐스팅
from pyspark.sql.types import IntegerType, DoubleType, LongType, DateType
df = df.withColumn("rank", col("rank").cast(IntegerType())) \
       .withColumn("rankInten", col("rankInten").cast(IntegerType())) \
       .withColumn("salesAmt", col("salesAmt").cast(LongType())) \
       .withColumn("salesAcc", col("salesAcc").cast(LongType())) \
       .withColumn("audiCnt", col("audiCnt").cast(IntegerType())) \
       .withColumn("audiAcc", col("audiAcc").cast(LongType())) \
       .withColumn("scrnCnt", col("scrnCnt").cast(IntegerType())) \
       .withColumn("showCnt", col("showCnt").cast(IntegerType())) \
       .withColumn("prdtYear", col("prdtYear").cast(IntegerType())) \
       .withColumn("showTm", col("showTm").cast(IntegerType()))

df.write.mode("overwrite").parquet("gs://movie_info_and_reviews/daily_regions_boxoffice/merged_regions_boxoffice.parquet")

25/04/05 09:27:06 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
25/04/05 09:29:28 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
                                                                                

In [None]:
### 빅쿼리 업로드
from pyspark.sql import SparkSession
from google.cloud import storage
from google.cloud import bigquery
from google.oauth2 import service_account
from dotenv import load_dotenv
import os

load_dotenv()

GOOGLE_APPLICATION_CREDENTIALS = os.getenv("GOOGLE_APPLICATION_CREDENTIALS")
GOOGLE_CONNECTOR_HADOOP = os.getenv("GOOGLE_CONNECTOR_HADOOP")

project_id = "<project_id>"     # gcp 프로젝트 id
bucket_name = "movie_info_and_reviews"
parquet_output_path = "gs://movie_info_and_reviews/daily_regions_boxoffice/merged_regions_boxoffice.parquet/*.parquet"
table_id = "movie_boxoffice.merged_daily_region_boxoffice"  # ← 프로젝트.데이터셋.테이블

# BigQuery 업로드 함수
def upload_parquet_to_bigquery(gcs_path, table_id):
    credentials = service_account.Credentials.from_service_account_file(GOOGLE_APPLICATION_CREDENTIALS)
    client = bigquery.Client(credentials=credentials, project=project_id)

    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.PARQUET,
        write_disposition="WRITE_TRUNCATE"  # 초기 생성: 덮어쓰기
    )
    table_ref = f'{project_id}.{table_id}'
    load_job = client.load_table_from_uri(gcs_path, table_ref, job_config=job_config)
    load_job.result()
    print(f"[DONE] {gcs_path} → {table_id} 업로드 완료")

upload_parquet_to_bigquery(parquet_output_path, table_id)

[DONE] gs://movie_info_and_reviews/daily_regions_boxoffice/merged_regions_boxoffice.parquet/*.parquet → movie_boxoffice.merged_daily_region_boxoffice 업로드 완료


# 박스오피스 장르 나눠서 저장

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import split, explode, trim, col

spark = SparkSession.builder.appName("GenreSplitForTimeAnalysis").getOrCreate()

df = spark.read.parquet("gs://movie_info_and_reviews/daily_boxoffice/merged_boxoffice.parquet/*.parquet")

#genre 분리
df = df.withColumn("genre", explode(split(col("genreNm"), ",")))
df = df.withColumn("genre", trim(col("genre")))

genre_df = df.select("boxoffice_date", "movieNm", "genre", "salesAmt", "audiCnt", "openDt")

genre_df.write.mode("overwrite").parquet("gs://movie_info_and_reviews/daily_boxoffice/genre_analysis.parquet")

25/04/06 22:03:19 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
25/04/06 22:03:45 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
                                                                                

In [None]:
### 빅쿼리 업로드
from google.cloud import bigquery
from google.oauth2 import service_account
from dotenv import load_dotenv
import os

load_dotenv()

GOOGLE_APPLICATION_CREDENTIALS = os.getenv("GOOGLE_APPLICATION_CREDENTIALS")
GOOGLE_CONNECTOR_HADOOP = os.getenv("GOOGLE_CONNECTOR_HADOOP")

project_id = "<project_id>"     # gcp 프로젝트 id
bucket_name = "movie_info_and_reviews"
parquet_output_path = "gs://movie_info_and_reviews/daily_boxoffice/genre_analysis.parquet/*.parquet"
table_id = "movie_boxoffice.analysis_genre"  # ← 프로젝트.데이터셋.테이블

# BigQuery 업로드 함수
def upload_parquet_to_bigquery(gcs_path, table_id):
    credentials = service_account.Credentials.from_service_account_file(GOOGLE_APPLICATION_CREDENTIALS)
    client = bigquery.Client(credentials=credentials, project=project_id)

    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.PARQUET,
        write_disposition="WRITE_TRUNCATE"  # 초기 생성: 덮어쓰기
    )
    table_ref = f'{project_id}.{table_id}'
    load_job = client.load_table_from_uri(gcs_path, table_ref, job_config=job_config)
    load_job.result()
    print(f"[DONE] {gcs_path} → {table_id} 업로드 완료")

upload_parquet_to_bigquery(parquet_output_path, table_id)

[DONE] gs://movie_info_and_reviews/daily_boxoffice/genre_analysis.parquet/*.parquet → movie_boxoffice.analysis_genre 업로드 완료


25/04/07 11:05:46 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 3868537 ms exceeds timeout 120000 ms
25/04/07 11:05:46 WARN SparkContext: Killing executors is not supported by current scheduler.
25/04/07 11:05:54 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$

# 리뷰 감정 분석

### sangrimlee/bert-base-multilingual-cased-nsmc 모델은 네이버 영화 리뷰 데이터셋(NSMC)을 활용하여 학습된 BERT 기반의 감정 분석 모델로, 한국어 텍스트의 긍정/부정을 분류한다. 

In [1]:
import pandas as pd
from transformers import pipeline
from tqdm import tqdm
import os
from dotenv import load_dotenv

load_dotenv()

GOOGLE_APPLICATION_CREDENTIALS = os.getenv("GOOGLE_APPLICATION_CREDENTIALS")

df = pd.read_parquet("gs://movie_info_and_reviews/movie_reviews/cleaned_reviews.parquet")
df = df[["movieNm", "context", "platform", "date"]].dropna()
df["context"] = df["context"].astype(str)

# 감정 분석 모델 로딩 (로컬이라서 cpu 사용)
model_name = "sangrimlee/bert-base-multilingual-cased-nsmc"

sentiment_model = pipeline(
    "sentiment-analysis",
    model=model_name,
    framework="pt",     #pytorch
    device=1    #CPU 사용
)

# tqdm 적용
tqdm.pandas()

# 긴 문장 자르기
df["short_text"] = df["context"].str.slice(0, 200)

# 긍정, 부정 분류
df["raw_sentiment"] = df["short_text"].progress_apply(lambda x: sentiment_model(x)[0]["label"])

sentiment_map = {
    "LABEL_0": "부정",
    "LABEL_1": "긍정"
}
df["sentiment"] = df["raw_sentiment"].map(sentiment_map)

# 저장
df[["movieNm", "context", "sentiment", "platform", "date", "raw_sentiment"]].to_parquet(
    "gs://movie_info_and_reviews/movie_reviews/review_sentiment.parquet", 
    engine="pyarrow",
    storage_options={"token": GOOGLE_APPLICATION_CREDENTIALS},
    index=False
)


  from .autonotebook import tqdm as notebook_tqdm
Device set to use mps:1
100%|██████████| 473771/473771 [9:43:33<00:00, 13.53it/s]    


In [3]:
gcs_path = "gs://movie_info_and_reviews/movie_reviews/review_sentiment.parquet"

df = pd.read_parquet(gcs_path)

df = df.drop(columns=["sentiment"])
df = df.rename(columns={"raw_sentiment" : "sentiment"})

df.to_parquet(
    gcs_path,
    engine="pyarrow",
    storage_options={"token": GOOGLE_APPLICATION_CREDENTIALS},
    index=False
)

In [4]:
# 빅쿼리 업로드 
from google.cloud import bigquery
from google.cloud import bigquery
from google.oauth2 import service_account

# bigquery 설정
project_id = "global-cursor-454210-i1"     # gcp 프로젝트 id
dataset_id = "movie_reviews"  # 빅쿼리 데이터셋
key_path = GOOGLE_APPLICATION_CREDENTIALS       # 서비스 계정 키 경로

def upload_parquet_to_bigquery(gcs_path: str, table_id: str):
    '''
    GCS에 있는 Parquet 파일을 BigQuery로 업로드하는 함수.
    '''
    # 자격증명 및 클라이언트 초기화
    credentials = service_account.Credentials.from_service_account_file(key_path)
    client = bigquery.Client(credentials=credentials, project=project_id)

    
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.PARQUET,
        write_disposition="WRITE_TRUNCATE"  # 기존 테이블 덮어쓰기
    )
    # 테이블 id 전체 경로
    table_ref = f"{project_id}.{dataset_id}.{table_id}"
    
    load_job = client.load_table_from_uri(
        gcs_path, table_ref, job_config=job_config
    )
    
    load_job.result()
    print(f"[DONE] {gcs_path} -> {table_id} 업로드 완료")

upload_parquet_to_bigquery(
    gcs_path="gs://movie_info_and_reviews/movie_reviews/review_sentiment.parquet",
    table_id="review_sentiment"
)

[DONE] gs://movie_info_and_reviews/movie_reviews/review_sentiment.parquet -> review_sentiment 업로드 완료


## 긍정 / 부정별 wordcloud용 테이블 만들고 빅쿼리 업로드하기

In [5]:
import pandas as pd
import re
from konlpy.tag import Okt
from collections import Counter
from tqdm import tqdm

from google.cloud import storage

import os
from dotenv import load_dotenv

load_dotenv()

GOOGLE_APPLICATION_CREDENTIALS = os.getenv("GOOGLE_APPLICATION_CREDENTIALS")

# tqdm 
tqdm.pandas()

# 형태소 분석
okt = Okt()

# 불용어
stopwords = ["영화", "정말", "너무", "그리고", "근데", "그냥", "봤어요", "있어요", "합니다", "한다", "때문"]

df = pd.read_parquet("gs://movie_info_and_reviews/movie_reviews/review_sentiment.parquet")

# 리뷰 context 전처리
df["clean_context"] = df["context"].fillna("").apply(lambda x: re.sub(r"[^ㄱ-ㅎ가-힣\s]", "", str(x)))

result_rows = []       # 결과 리스트

for sentiment in df["sentiment"].unique():
    print(f"[INFO] 처리중 : {sentiment}")
    
    sentiment_df = df[df["sentiment"] == sentiment]
    
    for idx, row in tqdm(sentiment_df.iterrows(), total=len(sentiment_df)):
        movie = row["movieNm"]
        platform = row["platform"]
        date = row["date"]
        text = row["clean_context"]
        
        # 형태소 분석, 품사 필터
        words = [
            word for word, tag in okt.pos(text)
            if tag in ["Noun", "Adjective", "Verb"] and word not in stopwords and len(word) > 1
        ]
        
        freq = Counter(words)
        
        for word, count in freq.items():
            result_rows.append({
                "movieNm": movie,
                "platform": platform,
                "date": date,
                "sentiment": sentiment,
                "word": word,
                "count": count
            })
    

wc_df = pd.DataFrame(result_rows)

wc_df.to_parquet(
    "gs://movie_info_and_reviews/movie_reviews/review_sentiment_wordcloud.parquet", 
    engine="pyarrow",
    storage_options={"token": GOOGLE_APPLICATION_CREDENTIALS},
    index=False
)

[INFO] 처리중 : positive


100%|██████████| 370807/370807 [43:26<00:00, 142.25it/s] 


[INFO] 처리중 : negative


100%|██████████| 102964/102964 [15:45<00:00, 108.85it/s]


In [6]:
# 빅쿼리 업로드 
from google.cloud import bigquery
from google.cloud import bigquery
from google.oauth2 import service_account

# bigquery 설정
project_id = "global-cursor-454210-i1"     # gcp 프로젝트 id
dataset_id = "movie_reviews"  # 빅쿼리 데이터셋
key_path = GOOGLE_APPLICATION_CREDENTIALS       # 서비스 계정 키 경로

def upload_parquet_to_bigquery(gcs_path: str, table_id: str):
    '''
    GCS에 있는 Parquet 파일을 BigQuery로 업로드하는 함수.
    '''
    # 자격증명 및 클라이언트 초기화
    credentials = service_account.Credentials.from_service_account_file(key_path)
    client = bigquery.Client(credentials=credentials, project=project_id)

    
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.PARQUET,
        write_disposition="WRITE_TRUNCATE"  # 기존 테이블 덮어쓰기
    )
    # 테이블 id 전체 경로
    table_ref = f"{project_id}.{dataset_id}.{table_id}"
    
    load_job = client.load_table_from_uri(
        gcs_path, table_ref, job_config=job_config
    )
    
    load_job.result()
    print(f"[DONE] {gcs_path} -> {table_id} 업로드 완료")

upload_parquet_to_bigquery(
    gcs_path="gs://movie_info_and_reviews/movie_reviews/review_sentiment_wordcloud.parquet",
    table_id="review_sentiment_wordcloud"
)

[DONE] gs://movie_info_and_reviews/movie_reviews/review_sentiment_wordcloud.parquet -> review_sentiment_wordcloud 업로드 완료
