In [23]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import concat_ws

# Step 1: Build (or reuse) a Spark session that pulls in the Hadoop-AWS connector
print("🚀 Initializing Spark session...")
spark = (
    SparkSession.builder
    .appName("ConnectToS3")
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.1")
    .getOrCreate()
)

# Step 2: Apply the s3a filesystem settings (implementation class,
# credentials provider, regional endpoint) to the session's Hadoop configuration
print("🔐 Configuring Spark to use AWS credentials...")
hadoop_conf = spark._jsc.hadoopConfiguration()
s3a_settings = (
    ("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem"),
    ("fs.s3a.aws.credentials.provider", "com.amazonaws.auth.profile.ProfileCredentialsProvider"),
    ("fs.s3a.endpoint", "s3.us-west-2.amazonaws.com"),
)
for setting_key, setting_value in s3a_settings:
    hadoop_conf.set(setting_key, setting_value)

# Step 3: Get list of years from user
years_input = input("📅 Please enter years to process (comma-separated, e.g. 2021,2022,2023): ")

# Each year is interpolated into S3 read/write paths further down, so only
# plain numeric tokens are accepted:
#   * blank tokens (trailing comma, ",,", empty input) are dropped -- an empty
#     year would leave an empty path segment in both the source and the
#     overwrite-mode destination paths;
#   * non-numeric tokens are reported and skipped rather than sent to S3;
#   * duplicates are removed (first occurrence wins) so a year is not
#     exported twice.
years = []
for raw_token in years_input.split(","):
    candidate_year = raw_token.strip()
    if not candidate_year:
        continue
    if not (candidate_year.isascii() and candidate_year.isdigit()):
        print(f"⚠️ Skipping invalid year '{candidate_year}' (digits only)")
        continue
    if candidate_year not in years:
        years.append(candidate_year)

if not years:
    print("⚠️ No valid years were entered; nothing will be processed.")

channel_name = "esl_dota2"

# Step 4: Loop through each year
# For every requested year: load that year's Parquet files from the silver
# prefix, then write the data back to the gold prefix twice -- once as Parquet
# and once as CSV with the badge array columns flattened to comma-joined strings.
for chat_year in years:
    print(f"\n📦 Processing year {chat_year} for channel '{channel_name}'...")

    # Read from S3 bucket
    parquet_files_path = f"s3a://twitch-emotes-analytics-project/data/processed_silver/{channel_name}/{chat_year}/*.parquet"
    df = None
    try:
        # The data is consumed three times below (count, Parquet write, CSV
        # write). Caching lets count() materialize it once so the two writes
        # reuse it instead of re-scanning S3.
        df = spark.read.parquet(parquet_files_path).cache()
        record_count = df.count()
        print(f"✅ Loaded {record_count:,} records from {parquet_files_path}")
    except Exception as e:
        # Best-effort per year: a missing or unreadable year is reported and
        # skipped so the remaining years are still processed.
        print(f"❌ Failed to read data for {chat_year}: {e}")
        if df is not None:
            df.unpersist()
        continue

    try:
        # Flatten array columns for CSV
        print("🛠 Flattening array columns for CSV compatibility...")
        df_csv = df.withColumn("i_badge_names", concat_ws(",", "i_badge_names")) \
                   .withColumn("i_badge_titles", concat_ws(",", "i_badge_titles")) \
                   .withColumn("i_badge_versions", concat_ws(",", "i_badge_versions"))

        # Define output paths
        output_s3_path_parquet = f"s3a://twitch-emotes-analytics-project/data/gold/{channel_name}/{chat_year}/all_data_parquet"
        output_s3_path_csv = f"s3a://twitch-emotes-analytics-project/data/gold/{channel_name}/{chat_year}/all_data_csv"

        # Write Parquet output
        # coalesce(1) collapses the data to a single partition before writing;
        # mode("overwrite") replaces anything already at the destination.
        # NOTE(review): the recorded run logs "Using standard FileOutputCommitter
        # ... slow and potentially unsafe" around these writes -- consider
        # configuring an S3A committer.
        print(f"📤 Writing full Parquet dataset to: {output_s3_path_parquet}")
        df.coalesce(1).write.mode("overwrite").parquet(output_s3_path_parquet)

        # Write CSV output
        print(f"📤 Writing CSV-friendly dataset to: {output_s3_path_csv}")
        df_csv.coalesce(1).write.mode("overwrite").option("header", True).csv(output_s3_path_csv)

        print(f"✅ Finished writing all data for {chat_year} 🎉")
    finally:
        # Release this year's cached data before moving on; write errors
        # still propagate exactly as before.
        df.unpersist()

print("\n🏁 All requested years processed. Data exported to S3 Gold layer.")


🚀 Initializing Spark session...
🔐 Configuring Spark to use AWS credentials...


📅 Please enter years to process (comma-separated, e.g. 2021,2022,2023):  2021,2022,2023,2024



📦 Processing year 2021 for channel 'esl_dota2'...


                                                                                

✅ Loaded 295,479 records from s3a://twitch-emotes-analytics-project/data/processed/esl_dota2/2021/*.parquet
🛠 Flattening array columns for CSV compatibility...
📤 Writing full Parquet dataset to: s3a://twitch-emotes-analytics-project/data/gold/esl_dota2/2021/all_data_parquet


                                                                                

📤 Writing CSV-friendly dataset to: s3a://twitch-emotes-analytics-project/data/gold/esl_dota2/2021/all_data_csv


25/06/23 18:56:45 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.
25/06/23 18:56:48 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.
                                                                                

✅ Finished writing all data for 2021 🎉

📦 Processing year 2022 for channel 'esl_dota2'...


                                                                                

✅ Loaded 566,898 records from s3a://twitch-emotes-analytics-project/data/processed/esl_dota2/2022/*.parquet
🛠 Flattening array columns for CSV compatibility...
📤 Writing full Parquet dataset to: s3a://twitch-emotes-analytics-project/data/gold/esl_dota2/2022/all_data_parquet


                                                                                

📤 Writing CSV-friendly dataset to: s3a://twitch-emotes-analytics-project/data/gold/esl_dota2/2022/all_data_csv


25/06/23 19:11:46 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.
25/06/23 19:11:49 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.
                                                                                

✅ Finished writing all data for 2022 🎉

📦 Processing year 2023 for channel 'esl_dota2'...


                                                                                

✅ Loaded 617,113 records from s3a://twitch-emotes-analytics-project/data/processed/esl_dota2/2023/*.parquet
🛠 Flattening array columns for CSV compatibility...
📤 Writing full Parquet dataset to: s3a://twitch-emotes-analytics-project/data/gold/esl_dota2/2023/all_data_parquet


                                                                                

📤 Writing CSV-friendly dataset to: s3a://twitch-emotes-analytics-project/data/gold/esl_dota2/2023/all_data_csv


25/06/23 19:21:52 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.
25/06/23 19:21:55 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.
                                                                                

✅ Finished writing all data for 2023 🎉

📦 Processing year 2024 for channel 'esl_dota2'...


                                                                                

✅ Loaded 359,847 records from s3a://twitch-emotes-analytics-project/data/processed/esl_dota2/2024/*.parquet
🛠 Flattening array columns for CSV compatibility...
📤 Writing full Parquet dataset to: s3a://twitch-emotes-analytics-project/data/gold/esl_dota2/2024/all_data_parquet


                                                                                

📤 Writing CSV-friendly dataset to: s3a://twitch-emotes-analytics-project/data/gold/esl_dota2/2024/all_data_csv


25/06/23 19:28:17 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.
25/06/23 19:28:20 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.
                                                                                

✅ Finished writing all data for 2024 🎉

🏁 All requested years processed. Data exported to S3 Gold layer.
