In [1]:
import findspark

# Look up the local Spark installation directory; the returned path is shown
# as this cell's output.
spark_home = findspark.find()
spark_home

'/home/hadoop/spark-3.1.2'

In [2]:
# Module imports
import os
import time

import pyspark
import pyspark.sql.functions as f
from pyspark.sql.functions import translate
from pyspark.sql.session import SparkSession

In [3]:
# Create the SparkSession instance.
# Spark does not expand "~" in spark.jars, so resolve the MySQL connector path
# to an absolute path before handing it to the builder.
mysql_connector_jar = os.path.expanduser("~/mysql-connector-java-8.0.27.jar")
spark = (
    SparkSession.builder
    .appName('hyunmin')
    .config("spark.jars", mysql_connector_jar)
    .getOrCreate()
)


In [21]:
# Load the pipe-delimited file from HDFS; each line becomes a list of fields.
# Take the SparkContext from the session instead of relying on a global `sc`
# that is not defined in any visible cell.
sc = spark.sparkContext
rdd = sc.textFile("hdfs://hadoop01:9000/nouns_hdfs/000000_0").map(lambda x: x.split("|"))
# The first record is the header row: use it for the column names and drop it
# from the data.
headers = rdd.first()
rdd2 = rdd.filter(lambda x: x != headers)
df = rdd2.toDF(headers)
# Preview a small sample instead of dumping up to 1000 rows into the notebook
df.show(n=20)
# DataFrame schema
df.printSchema()

+----------+-----+-------------------------------+
|      date|event|                          nouns|
+----------+-----+-------------------------------+
|2022-01-01|  CUR|  ['신년올림픽기획', '평창',...|
|2022-01-01|  SKN|    ['스켈레톤', '정승기', '...|
|2022-01-01|  SKN|     ['첫', '월드컵', '메달'...|
|2022-01-01|  SKN|    ['스켈레톤', '정승기', '...|
|2022-01-01|  SKN|    ['스켈레톤', '정승기', '...|
|2022-01-01|  SKN|    ['스켈레톤', '기대주', '...|
|2022-01-01|  SKN|    ['스켈레톤', '기대주', '...|
|2022-01-01|  SKN|    ['정승기', '윤성빈', '스...|
|2022-01-01|  SKN|    ['스켈레톤', '정승기', '...|
|2022-01-01|  SKN|    ['스켈레톤', '정승기', '...|
|2022-01-02|  BOB|   ['봅슬레이', '원윤종팀', ...|
|2022-01-02|  BOB|    ['원윤종', '김진수', '시...|
|2022-01-02|  BOB|    ['원윤종', '김진수', '봅...|
|2022-01-02|  BOB|     ['봅슬레이', '2인승', '...|
|2022-01-02|  SKN|     ['생애', '첫', '월드컵'...|
|2022-01-03|  BOB|   ['봅슬레이', '원윤종팀', ...|
|2022-01-03|  BOB|    ['원윤종팀', '올림픽', '...|
|2022-01-03|  BOB|     ['희망', '불씨', '원윤종']|
|2022-01-03|  BOB|   ['봅슬레이', '원윤종팀', ...|
|2022-01-03| 

In [5]:
# Preprocessing: strip the list brackets "[" and "]" from the raw `nouns`
# string and store the result in a new `noun` column.
df_reg = df.withColumn('noun', f.regexp_replace(f.col('nouns'), '[\\[\\]]', ''))

In [6]:
# Remove all whitespace from the noun string.
noun_no_space = f.regexp_replace(f.col('noun'), "\\s", "")
# Rewrite the token '루' as '루지'. The replacement keeps its surrounding quotes
# so it has the same form as every other token in the string and is grouped
# together with any '루지' already present when tokens are counted.
noun_fixed = f.regexp_replace(noun_no_space, "'루'", "'루지'")
df_reg = df_reg.withColumn('noun', noun_fixed)

In [7]:
# Build the noun array: split the cleaned `noun` string on commas and keep the
# date and event columns alongside it.
noun_tokens = f.split(f.col('noun'), ',').alias("noun")
token_array = df_reg.select("date", "event", noun_tokens)

In [8]:
# Inspect the tokenised frame: schema first, then a preview of the first rows
# (the arguments spell out the defaults of show()).
token_array.printSchema()
token_array.show(n=20, truncate=True)

root
 |-- date: string (nullable = true)
 |-- event: string (nullable = true)
 |-- noun: array (nullable = true)
 |    |-- element: string (containsNull = true)

+----------+-----+-----------------------------+
|      date|event|                         noun|
+----------+-----+-----------------------------+
|2022-01-01|  CUR|['신년올림픽기획', '평창',...|
|2022-01-01|  SKN|  ['스켈레톤', '정승기', '...|
|2022-01-01|  SKN|   ['첫', '월드컵', '메달'...|
|2022-01-01|  SKN|  ['스켈레톤', '정승기', '...|
|2022-01-01|  SKN|  ['스켈레톤', '정승기', '...|
|2022-01-01|  SKN|  ['스켈레톤', '기대주', '...|
|2022-01-01|  SKN|  ['스켈레톤', '기대주', '...|
|2022-01-01|  SKN|  ['정승기', '윤성빈', '스...|
|2022-01-01|  SKN|  ['스켈레톤', '정승기', '...|
|2022-01-01|  SKN|  ['스켈레톤', '정승기', '...|
|2022-01-02|  BOB| ['봅슬레이', '원윤종팀', ...|
|2022-01-02|  BOB|  ['원윤종', '김진수', '시...|
|2022-01-02|  BOB|  ['원윤종', '김진수', '봅...|
|2022-01-02|  BOB|   ['봅슬레이', '2인승', '...|
|2022-01-02|  SKN|   ['생애', '첫', '월드컵'...|
|2022-01-03|  BOB| ['봅슬레이', '원윤종팀', ...|
|2022-01-03|  BOB|  

In [36]:
# Word count of nouns per (date, event).
# Quote characters are stripped and empty tokens dropped BEFORE grouping, so
# tokens that differ only by surrounding quotes are counted together instead
# of producing duplicate rows with split counts.
token_count = (
    token_array
    .select("date", "event", f.explode("noun").alias("token"))
    .withColumn("token", f.translate("token", "'", ""))
    .filter("token != ''")
    .groupBy("date", "event", "token")
    .count()
    .orderBy("date", "event", "token")
)
# Show a bounded preview rather than up to 10000 rows
token_count.show(truncate=False, n=100)

+----------+-----+------------------+-----+
|date      |event|token             |count|
+----------+-----+------------------+-----+
|2022-01-01|CUR  |4년               |1    |
|2022-01-01|CUR  |끝                |1    |
|2022-01-01|CUR  |돌풍              |1    |
|2022-01-01|CUR  |베이징            |1    |
|2022-01-01|CUR  |신년올림픽기획    |1    |
|2022-01-01|CUR  |우여곡절          |1    |
|2022-01-01|CUR  |팀킴              |1    |
|2022-01-01|CUR  |평창              |1    |
|2022-01-01|CUR  |후                |1    |
|2022-01-01|SKN  |17위              |1    |
|2022-01-01|SKN  |3위               |1    |
|2022-01-01|SKN  |6차               |2    |
|2022-01-01|SKN  |감격              |1    |
|2022-01-01|SKN  |금메달            |1    |
|2022-01-01|SKN  |기대주            |2    |
|2022-01-01|SKN  |대회              |1    |
|2022-01-01|SKN  |동메달            |1    |
|2022-01-01|SKN  |메달              |8    |
|2022-01-01|SKN  |생애              |4    |
|2022-01-01|SKN  |스켈레톤          |8    |
|2022-01-01|SKN  |아이언맨          |1 

In [37]:
# Word count of nouns per date.
# Quote characters are stripped and empty tokens dropped BEFORE grouping, so
# tokens that differ only by surrounding quotes are counted together instead
# of producing duplicate rows with split counts.
date_count = (
    token_array
    .select("date", f.explode("noun").alias("token"))
    .withColumn("token", f.translate("token", "'", ""))
    .filter("token != ''")
    .groupBy("date", "token")
    .count()
    .orderBy("date", "token")
)
date_count.show(truncate=False, n=100)

+----------+--------------+-----+
|date      |token         |count|
+----------+--------------+-----+
|2022-01-01|17위          |1    |
|2022-01-01|3위           |1    |
|2022-01-01|4년           |1    |
|2022-01-01|6차           |2    |
|2022-01-01|감격          |1    |
|2022-01-01|금메달        |1    |
|2022-01-01|기대주        |2    |
|2022-01-01|끝            |1    |
|2022-01-01|대회          |1    |
|2022-01-01|돌풍          |1    |
|2022-01-01|동메달        |1    |
|2022-01-01|메달          |8    |
|2022-01-01|베이징        |1    |
|2022-01-01|생애          |4    |
|2022-01-01|스켈레톤      |8    |
|2022-01-01|신년올림픽기획|1    |
|2022-01-01|아이언맨      |1    |
|2022-01-01|우여곡절      |1    |
|2022-01-01|월드컵        |7    |
|2022-01-01|윤성빈        |5    |
|2022-01-01|정승기        |9    |
|2022-01-01|첫            |9    |
|2022-01-01|추월          |1    |
|2022-01-01|팀킴          |1    |
|2022-01-01|평창          |2    |
|2022-01-01|한국          |1    |
|2022-01-01|획득          |4    |
|2022-01-01|후            |1    |
|2022-01-02|2

In [38]:
# Word count of nouns per event.
# Quote characters are stripped and empty tokens dropped BEFORE grouping, so
# tokens that differ only by surrounding quotes are counted together instead
# of producing duplicate rows with split counts.
event_count = (
    token_array
    .select("event", f.explode("noun").alias("token"))
    .withColumn("token", f.translate("token", "'", ""))
    .filter("token != ''")
    .groupBy("event", "token")
    .count()
    .orderBy("event", "token")
)
event_count.show(truncate=False, n=100)

+-----+------------+-----+
|event|token       |count|
+-----+------------+-----+
|ALP  |100         |1    |
|ALP  |10명        |1    |
|ALP  |10위        |1    |
|ALP  |11일        |1    |
|ALP  |18위        |5    |
|ALP  |19일        |1    |
|ALP  |1위         |4    |
|ALP  |2           |1    |
|ALP  |2022        |5    |
|ALP  |2022베이징  |2    |
|ALP  |21위        |5    |
|ALP  |28년        |2    |
|ALP  |29위        |1    |
|ALP  |2관왕       |6    |
|ALP  |2연속       |2    |
|ALP  |2연패       |4    |
|ALP  |2종목       |1    |
|ALP  |2회         |1    |
|ALP  |3           |2    |
|ALP  |30분        |1    |
|ALP  |33명        |1    |
|ALP  |39th        |1    |
|ALP  |39위        |4    |
|ALP  |3관왕       |1    |
|ALP  |3번         |1    |
|ALP  |3연속       |2    |
|ALP  |3연패       |1    |
|ALP  |3주         |1    |
|ALP  |3회         |3    |
|ALP  |40㎞        |1    |
|ALP  |4명         |1    |
|ALP  |4번         |3    |
|ALP  |4살         |1    |
|ALP  |4연속       |1    |
|ALP  |4위         |4    |
|ALP  |4종목    

In [39]:
# Save the word-count tables to MySQL.
# Connection settings are read from the environment; the defaults are the
# values this notebook originally hard-coded, so it still runs unconfigured.
# NOTE(review): credentials should not live in the notebook at all, and the
# job should not connect as root -- set MYSQL_URL / MYSQL_USER / MYSQL_PASSWORD.
MYSQL_URL = os.environ.get("MYSQL_URL", "jdbc:mysql://59.18.172.39:3306/olympic")
MYSQL_USER = os.environ.get("MYSQL_USER", "root")
MYSQL_PASSWORD = os.environ.get("MYSQL_PASSWORD", "root")


def write_to_mysql(frame, table):
    """Append the rows of `frame` to the MySQL table `table` over JDBC.

    Args:
        frame: Spark DataFrame to persist.
        table: Name of the destination table.
    """
    (
        frame.write.format('jdbc')
        .option("url", MYSQL_URL)
        .option("driver", "com.mysql.cj.jdbc.Driver")
        .option("dbtable", table)
        .option("user", MYSQL_USER)
        .option("password", MYSQL_PASSWORD)
        .mode('append')
        .save()
    )


# mode('append') adds rows on every execution, so re-running this cell
# duplicates the data already stored in the tables.
# save() returns only after the write has finished, so no delay is needed
# between tables.
for frame, table in (
    (token_count, "token"),
    (date_count, "per_date"),
    (event_count, "event"),
):
    write_to_mysql(frame, table)


In [54]:
# Save the results to HDFS as CSV (disabled: the writes below are commented out)
# token_count.write.option("header",False) \
#         .mode("overwrite") \
#         .csv("hdfs://hadoop01:9000/token/")
# event_count.write.option("header",False) \
#         .mode("overwrite") \
#         .csv("hdfs://hadoop01:9000/event/")
# date_count.write.option("header",False) \
#         .mode("overwrite") \
#         .csv("hdfs://hadoop01:9000/date/")

KeyboardInterrupt: 