In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("DataFrame").getOrCreate()

sc = spark.sparkContext

#### text 형태의 데이터를 전처리해서 csv 형태로 저장해보자!
(부제: 엔지니어링 A조 MBTI 살펴보기)

- raw_data 생성

In [None]:
raw_data = ["윤건우, ISTP, 24; \
            이가람, ESTJ, 23; \
            이선의, null, null; \
            이수민, ESTP, null; \
            채영대, null, null; \
            이상암, ENTJ, null; \
            유하준, ESTJ, secret; \
            정태형, ESFJ, null; \
            최윤서, INTJ, 24"]

- RDD로 변환

In [None]:
RDD_data = sc.parallelize(raw_data)
RDD_data.collect()

- flatMap() 함수를 이용하여 데이터 전처리

In [None]:
RDD_split_data = RDD_data.flatMap(lambda x : x.split(';'))

In [None]:
RDD_split_data.collect()

- 등등의 로직을 이용하여 남은 전처리

In [None]:
# Split data and create Row objects
def parse_data(row):
    parts = row.strip().split(", ")
    name = parts[0]
    _type = parts[1] if parts[1] != "null" else None
    age = parts[2] if parts[2] != "null" else None
    return [name, _type, age]

In [None]:
RDD_clear_data = RDD_split_data.map(parse_data)
RDD_clear_data.collect()

- Dataframe 형태로 변환

In [None]:
# RDD.toDF()를 이용하여 RDD를 Dataframe 형태로 변환할 수 있다.
DF_data = RDD_clear_data.toDF(["이름","MBTI","나이"])
DF_data

In [None]:
DF_data.collect()

In [None]:
DF_data.show() # show()를 이용해서도 데이터를 볼 수 있다. (action)

In [None]:
type(DF_data)

- MBTI가 null이 아닌 데이터만 추출

In [None]:
DF_not_null = DF_data.filter(DF_data["MBTI"].isNotNull())
DF_not_null.show()

- MBTI 열을 이용하여 새로운 열 생성

In [None]:
from pyspark.sql.functions import col

new_DF = DF_not_null\
            .withColumn("EorI", col("MBTI").substr(1, 1)) \
            .withColumn("NorS", col("MBTI").substr(2, 1)) \
            .withColumn("ForT", col("MBTI").substr(3, 1)) \
            .withColumn("JorP", col("MBTI").substr(4, 1))

In [None]:
new_DF.show()

- 그래프 그려보기

In [None]:
import matplotlib.pyplot as plt

In [None]:
# 각 요소별 분포 데이터 추출
mbti1 = new_DF.groupBy("EorI").count().collect()
mbti2 = new_DF.groupBy("NorS").count().collect()
mbti3 = new_DF.groupBy("ForT").count().collect()
mbti4 = new_DF.groupBy("JorP").count().collect()

In [None]:
plt.figure(figsize=(12, 8))

plt.subplot(2, 2, 1)
plt.bar([row["EorI"] for row in mbti1], [row["count"] for row in mbti1])
plt.xlabel("E/I")
plt.ylabel("Count")
plt.title("E/I dist")

plt.subplot(2, 2, 2)
plt.bar([row["NorS"] for row in mbti2], [row["count"] for row in mbti2])
plt.xlabel("N/S")
plt.ylabel("Count")
plt.title("N/S dist")

plt.subplot(2, 2, 3)
plt.bar([row["ForT"] for row in mbti3], [row["count"] for row in mbti3])
plt.xlabel("F/T")
plt.ylabel("Count")
plt.title("F/T dist")

plt.subplot(2, 2, 4)
plt.bar([row["JorP"] for row in mbti4], [row["count"] for row in mbti4])
plt.xlabel("J/P")
plt.ylabel("Count")
plt.title("J/P dist")

plt.tight_layout()
plt.show()

In [None]:
# spark 세션 종료
spark.stop()