In [0]:
sc.version

In [1]:
%spark.pyspark
# parquet 파일 읽기
df = spark.read.parquet('<your installation folder>/data/parquet/p.parquet')
z.show(df)

In [2]:
%spark.pyspark
df.createOrReplaceTempView("chat")


In [3]:
%sql
-- 닉네임 별 채팅 빈도
SELECT nickname, COUNT(*) AS count
FROM chat
GROUP BY nickname;

In [4]:
%spark.pyspark
# 날짜 형식 통일
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import TimestampType
import datetime
import re

# Spark 세션 생성
spark = SparkSession.builder.appName("DateConversion").getOrCreate()

# Unix 타임스탬프를 변환하는 함수
def unix_to_datetime(timestamp):
    try:
        return datetime.datetime.fromtimestamp(float(timestamp))
    except ValueError:
        return None

# 날짜 형식을 인식하고 변환하는 함수
def parse_date(date_str):
    formats = [
        "%Y-%m-%d %H:%M:%S",  # 형식: 2024-08-27 10:45:24
        "%a %b %d %H:%M:%S %Y",  # 형식: Tue Aug 27 11:04:45 2024
    ]
    
    # Unix 타임스탬프인지 확인
    if re.match(r'^\d+(\.\d+)?$', date_str):
        return unix_to_datetime(date_str)
    
    # 날짜 형식 시도
    for fmt in formats:
        try:
            return datetime.datetime.strptime(date_str, fmt)
        except ValueError:
            continue
    return None

# UDF 등록
parse_date_udf = udf(parse_date, TimestampType())

# 데이터프레임에 UDF 적용
df = df.withColumn("parsed_time", parse_date_udf(col("time")))
z.show(df)


In [5]:
%spark.pyspark
df.createOrReplaceTempView("chat_time")


In [6]:
%sql
-- 날짜 별 채팅 빈도
SELECT 
    to_date(parsed_time) AS date,
    COUNT(*) AS count
FROM 
    chat_time
GROUP BY 
    to_date(parsed_time)
ORDER BY 
    date

In [7]:
%sql
-- 날짜 + 시간 별 채팅 빈도
SELECT 
    date_format(parsed_time, 'yyyy-MM-dd HH:00:00') AS date_hour,
    COUNT(*) AS count
FROM 
    chat_time
GROUP BY 
    date_format(parsed_time, 'yyyy-MM-dd HH:00:00')
ORDER BY 
    date_hour

In [8]:
%spark.pyspark
# 단어 빈도 계산(상위 19개)

from pyspark.sql.functions import col, explode, split, lower, regexp_replace, trim

# 메시지가 빈 경우 필터링
df_filtered = df.filter(col('message').isNotNull() & (trim(col('message')) != ''))

# 메시지에서 특수문자 및 구두점 제거
df_cleaned = df_filtered.withColumn(
    'clean_message',
    regexp_replace(col('message'), '[^\\w\\s가-힣]', '')  # 영문자, 숫자, 공백, 한글만 허용
)

# 메시지에서 단어 분리 및 소문자 변환
words_df = df_cleaned.withColumn('word', explode(split(lower(col('clean_message')), '\\s+')))

# 빈 단어 제거 (단어가 공백이거나 NULL인 경우)
words_df_filtered = words_df.filter(col('word') != '')

# 단어 빈도 계산
word_count_df = words_df_filtered.groupBy('word').count().orderBy(col('count').desc())

# 결과 출력

top_10_words_df = word_count_df.limit(10)
top_10_words_df.show(truncate=False)

top_10_words_df.createOrReplaceTempView("frequent")


In [9]:
%sql
-- 단어 빈도 시각화
select * from frequent
