In [1]:
import os
from pyspark.sql import SparkSession

def merge_csv_files_with_spark(input_directory, output_file='merged_output.csv'):
    # Khởi tạo SparkSession
    spark = SparkSession.builder \
        .appName("CSVFilesMerger") \
        .getOrCreate()

    # Kiểm tra xem thư mục đầu vào có tồn tại không
    if not os.path.exists(input_directory):
        print(f"Error: Directory {input_directory} does not exist.")
        return 0

    # Danh sách các tệp CSV
    csv_files = [os.path.join(input_directory, f) for f in os.listdir(input_directory) if f.endswith('.csv')]

    # Kiểm tra nếu không có tệp nào
    if not csv_files:
        print("No CSV files found in the directory.")
        return 0

    print(f"Found {len(csv_files)} CSV files. Merging...")

    # Đọc và hợp nhất tất cả các tệp CSV
    merged_df = spark.read.option("header", "true").csv(csv_files)

    # Tạo thư mục đầu ra nếu chưa tồn tại
    output_directory = os.path.dirname(output_file) or '.'
    os.makedirs(output_directory, exist_ok=True)

    # Ghi tệp hợp nhất ra CSV
    merged_df.coalesce(1).write.option("header", "true").csv(output_file, mode="overwrite")

    print(f"\nMerging complete!")
    print(f"Total files merged: {len(csv_files)}")
    print(f"Output directory: {output_file}")
    print(f"Total rows in merged file: {merged_df.count()}")

    # Dừng SparkSession
    spark.stop()

    return len(csv_files)

def main():
    # Example usage
    input_directory = 'D:/app/Bigdata-IS405.P11/Crawl/Airflow/airflow/data/amazon_scraper_output'  # Directory containing CSV files
    output_file = 'amazon_products1'  # Output directory for merged file (Spark outputs as a directory)

    merge_csv_files_with_spark(input_directory, output_file)

if __name__ == "__main__":
    main()


Found 6 CSV files. Merging...

Merging complete!
Total files merged: 6
Output directory: amazon_products1
Total rows in merged file: 8974
