In [1]:
# Import Libraries
import pyspark
from pyspark import SQLContext
from pyspark.sql.types import IntegerType, StructType, StructField, StringType
from pyspark.sql.functions import pandas_udf, monotonically_increasing_id , from_json,col, avg
import pandas as pd
import pyspark.sql.functions as F
from tqdm import trange, notebook
import time
import datetime
import re

In [2]:
spark = SparkSession.builder\
        .master("local[3]")\
        .appName("WordCount")\
        .config("spark.driver.memory", "8g")\
        .config('spark.driver.cores', '2')\
        .config("spark.sql.legacy.json.allowEmptyString.enabled", True)\
        .getOrCreate()
sc=spark.sparkContext

In [3]:
schema = StructType([
    StructField("idol", StringType(), True),
    StructField("tweet", StringType(), True),
    StructField("retweets_count", IntegerType(), True),
    StructField("likes_count", IntegerType(), True),
])
tdf = spark.read.schema(schema).option("encoding", "UTF-8").option("allowNonUTF8", "true").option("multiLine","true").json(f"file:///home/j8a507/cluster/twitter/tweet_data.json")



In [4]:
# 긍부정 분석 모델
from transformers import AutoTokenizer, AutoModelForSequenceClassification, pipeline

tokenizer = AutoTokenizer.from_pretrained("Woonn/bert-base-finetuned-emotion")

model = AutoModelForSequenceClassification.from_pretrained("Woonn/bert-base-finetuned-emotion")

  from .autonotebook import tqdm as notebook_tqdm


In [5]:
# build pipeline
classifier = pipeline("sentiment-analysis", model=model, tokenizer=tokenizer)

In [6]:
%%time

@pandas_udf('float')
def sentiment_batch_udf(row: pd.Series) -> pd.Series:
  pipe = classifier(row.to_list(), truncation=True, batch_size=2)
  result = [round(sentiment['score'], 2) for sentiment in pipe]
  return pd.Series(result)

result = tdf.select(tdf.idol, tdf.likes_count, tdf.retweets_count, sentiment_batch_udf(tdf.tweet).alias("score"))

del tokenizer
del model
del classifier


CPU times: user 323 ms, sys: 879 ms, total: 1.2 s
Wall time: 3.52 s


In [21]:
adf = tdf.select(tdf.idol, tdf.likes_count, tdf.retweets_count)
adf = adf.groupBy("idol").agg((F.sum("likes_count") + F.sum("retweets_count")).alias("action_count"))

pdf = result.groupBy("idol").agg((avg("score")*100).cast("int").alias("pos_neg"))
                              
final_df = pdf.join(adf, adf.idol == pdf.idol, "inner")

# Replace "/path/to/partitioned/data" with the path to your partitioned data in HDFS
hdfs_path = "output/"

# # Replace "/path/to/local/directory" with the path to your local directory
# local_path = "file:///home/j8a507/cluster/news/idol_news_count"

# # Download the partitioned directory from HDFS to your local machine
# subprocess.check_call(["hdfs", "dfs", "-getmerge", hdfs_path, local_path])

[Stage 18:>                                                         (0 + 1) / 1]

+--------+-------+--------+------------+
|    idol|pos_neg|    idol|action_count|
+--------+-------+--------+------------+
|  세븐틴|     79|  세븐틴|         246|
|블랙핑크|     90|블랙핑크|        1333|
+--------+-------+--------+------------+



                                                                                