In [1]:
from functools import reduce

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, when

# Obtain the Spark session for this job; getOrCreate() reuses an already
# running session in the notebook instead of starting a second one.
spark = (
    SparkSession.builder
    .appName("StackOverflowSalaryMerge")
    .getOrCreate()
)

# 1. Core columns used by the project. Every yearly DataFrame is projected onto
#    exactly this list (in this order) so the frames can be stacked later.
_profile_cols = [
    "ResponseId",
    "Country",
    "Age",
    "Employment",
    "EdLevel",
    "YearsCode",
    "YearsCodePro",
    "DevType",
    "OrgSize",
    "Currency",
]
_tech_cols = [
    "LanguageHaveWorkedWith",
    "DatabaseHaveWorkedWith",
    "PlatformHaveWorkedWith",
    "WebframeHaveWorkedWith",
    "MiscTechHaveWorkedWith",
    "ToolsTechHaveWorkedWith",
    "NEWCollabToolsHaveWorkedWith",
]
_feedback_cols = [
    "SurveyLength",
    "SurveyEase",
]
# Compensation column, kept last (presumably the salary target of this project).
base_cols = _profile_cols + _tech_cols + _feedback_cols + ["ConvertedCompYearly"]

# Survey year -> path of that year's public CSV export.
year_files = {
    2021: "data/survey_results_public_2021.csv",
    2022: "data/survey_results_public_2022.csv",
    2023: "data/survey_results_public_2023.csv",
    2024: "data/survey_results_public_2024.csv",
}

# Read each yearly CSV, project it onto base_cols and tag every row with its
# survey year. Produces one DataFrame per entry of year_files, in dict order.
dfs = []

for year, path in year_files.items():
    raw_df = (
        spark.read
        .option("header", "true")
        # Long free-text answers may contain embedded newlines, so one record
        # can span several physical lines.
        .option("multiLine", "true")
        .option("escape", "\"")
        # NOTE(review): the public survey CSVs appear to use the literal string
        # "NA" for unanswered questions; reading it as null keeps the later
        # numeric casts from failing under ANSI mode — confirm against the files.
        .option("nullValue", "NA")
        .csv(path)
    )

    # Column sets drift between survey editions; a plain select(*base_cols)
    # would abort the whole job on the first absent column. Fill gaps with
    # string nulls (without inferSchema the reader yields string columns) so
    # every yearly frame has an identical schema for unionByName.
    present = set(raw_df.columns)
    missing = [c for c in base_cols if c not in present]
    if missing:
        print(f"[{year}] columns missing from {path}, filled with null: {missing}")

    df = (
        raw_df
        .select(*[
            col(c) if c in present else lit(None).cast("string").alias(c)
            for c in base_cols
        ])
        .withColumn("SurveyYear", lit(year))
    )
    dfs.append(df)

# 2. Stack the yearly DataFrames into one, matching columns by name rather
#    than by position.
if not dfs:
    # Fail with a clear message instead of an opaque IndexError/TypeError.
    raise ValueError("no survey DataFrames were loaded; check year_files")
merged_df = reduce(lambda left, right: left.unionByName(right), dfs)

# 3. Cast the numeric fields needed for modelling.
# NOTE(review): in the 2021–2024 public files the experience columns mix plain
# numbers with the labels "Less than 1 year" / "More than 50 years". A bare
# cast("double") turns those into null (or raises under ANSI mode), so they are
# mapped to their lower bounds first — confirm the labels in each year's schema.
def _years_to_double(col_name):
    """Return column `col_name` cast to double, mapping the two open-ended
    experience labels to 0 and 50 before casting."""
    c = col(col_name)
    return (
        when(c == "Less than 1 year", lit("0"))
        .when(c == "More than 50 years", lit("50"))
        .otherwise(c)
        .cast("double")
    )


merged_df = (
    merged_df
    .withColumn("ConvertedCompYearly", col("ConvertedCompYearly").cast("double"))
    .withColumn("YearsCode", _years_to_double("YearsCode"))
    .withColumn("YearsCodePro", _years_to_double("YearsCodePro"))
    # NOTE(review): Age is deliberately left as a string. In these survey
    # editions it appears to be published as a bracket label (e.g.
    # "25-34 years old"), which cast("double") would turn into all-null —
    # confirm before reintroducing a numeric cast.
)

# 4. Persist the merged data as Parquet (Spark's native columnar format: fast to
#    read back and compact on disk). Any previous output at this path is replaced.
output_path = "data/merged_survey_2021_2024.parquet"
(
    merged_df.write
    .mode("overwrite")
    .parquet(output_path)
)


ModuleNotFoundError: No module named 'pyspark'