# PySpark 환경 설정

In [None]:
from pyspark.sql import SparkSession

# SparkSession 생성 (HDFS 연결 포함)
spark = SparkSession.builder \
    .appName("PySpark Example") \
    .config("spark.hadoop.fs.defaultFS", "hdfs://hdfs-cluster:8020") \
    .getOrCreate()

print("SparkSession created successfully")

# 데이터프레임 생성

In [None]:
# 샘플 데이터 생성 (또는 HDFS에서 읽기)
data = [
    ("Alice", 34, "Engineer"),
    ("Bob", 45, "Manager"),
    ("Charlie", 29, "Analyst"),
    ("David", 38, "Engineer")
]

columns = ["name", "age", "job"]

df = spark.createDataFrame(data, columns)
df.show()

# HDFS에서 CSV 읽기 예시 (파일이 존재할 경우)
# df_hdfs = spark.read.csv("hdfs://hdfs-cluster/user/data/sample.csv", header=True)
# df_hdfs.show()

# 데이터 변환 및 필터링

In [None]:
# 나이가 35세 이상인 사람 필터링
df_filtered = df.filter(df.age > 35)
df_filtered.show()

# 새로운 열 추가 (나이 + 10)
from pyspark.sql.functions import col
df_transformed = df.withColumn("age_plus_10", col("age") + 10)
df_transformed.show()

# 집계 및 그룹화

In [None]:
# 직업별 평균 나이 계산
from pyspark.sql.functions import avg
df_grouped = df.groupBy("job").agg(avg("age").alias("avg_age"))
df_grouped.show()

# 데이터 저장 및 출력

In [None]:
# HDFS에 CSV로 저장
df_grouped.write.csv("hdfs://hdfs-cluster/user/data/output.csv", header=True, mode="overwrite")

# PostgreSQL에 저장 (JDBC 사용)
df_grouped.write \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://postgres-db-rw:5432/postgres") \
    .option("dbtable", "example_table") \
    .option("user", "postgres") \
    .option("password", "your_password") \
    .option("driver", "org.postgresql.Driver") \
    .mode("overwrite") \
    .save()

print("Data saved to HDFS and PostgreSQL")

# SparkSession 종료
spark.stop()