In [None]:
from pyspark.sql import SparkSession
from dotenv import load_dotenv
from pyspark.sql.types import LongType
from pyspark.sql.functions import col, year, month
import os

In [None]:
# Shell escape: print the IPAddress line(s) from `docker inspect` for the
# `minioserver` container.
# NOTE(review): presumably this is how the fs.s3a.endpoint IP used in the
# Spark session cell is obtained — confirm.
!docker inspect minioserver | grep IPAddress

In [None]:
# Shell escape: print the IPAddress line(s) from `docker inspect` for the
# `spark-master` container.
# NOTE(review): presumably this is how the Spark master IP used in the
# Spark session cell is obtained — confirm.
!docker inspect spark-master | grep IPAddress

In [None]:
# Load variables from the project's .env file into the process environment.
load_dotenv()

# Get credentials from .env
# Root (admin) account of the MinIO server.
MINIO_USER = os.getenv("MINIO_ROOT_USER")
MINIO_PASSWORD = os.getenv("MINIO_ROOT_PASSWORD")
# Access/secret key pair handed to the S3A connector in the Spark session cell.
MINIO_ACCESS_KEY = os.getenv("MINIO_ACCESS_KEY")
MINIO_SECRET_KEY = os.getenv("MINIO_SECRET_KEY")

# Fail fast: os.getenv returns None for an unset variable, and a missing key
# would otherwise only surface later as an obscure Spark configuration or
# S3A authentication error.
_missing = [
    name
    for name, value in (
        ("MINIO_ACCESS_KEY", MINIO_ACCESS_KEY),
        ("MINIO_SECRET_KEY", MINIO_SECRET_KEY),
    )
    if not value
]
if _missing:
    raise RuntimeError(
        "Missing required environment variable(s): "
        + ", ".join(_missing)
        + ". Define them in .env or export them before running this notebook."
    )

In [None]:
# Connection endpoints for the Docker cluster. Container IPs are looked up by
# hand with `docker inspect`, so both values can be overridden through the
# environment (or .env) instead of editing this cell; the defaults are the
# addresses the notebook was written against.
SPARK_MASTER_URL = os.getenv("SPARK_MASTER_URL", "spark://172.18.0.2:7077")
MINIO_S3A_ENDPOINT = os.getenv("MINIO_S3A_ENDPOINT", "http://172.18.0.3:9000")

# Jars that provide the s3a:// filesystem. Listed once and joined below:
# `spark.jars` takes a comma-separated list, the classpath options a
# colon-separated one.
S3A_JARS = [
    "./shared-data/hadoop-aws-3.3.4.jar",
    "./shared-data/aws-java-sdk-bundle-1.12.792.jar",
]
s3a_jars_csv = ",".join(S3A_JARS)
s3a_jars_classpath = ":".join(S3A_JARS)

# Create Spark session connecting to your Docker cluster
spark = (SparkSession.builder
    .appName("MinIOUpload")
    .master(SPARK_MASTER_URL)  # Spark master running in Docker
    .config("spark.jars", s3a_jars_csv)
    .config("spark.driver.extraClassPath", s3a_jars_classpath)
    .config("spark.executor.extraClassPath", s3a_jars_classpath)  # executors need the jars too
    # S3A settings pointing at MinIO: plain HTTP, path-style bucket addressing,
    # and static access/secret keys taken from the environment.
    .config("spark.hadoop.fs.s3a.endpoint", MINIO_S3A_ENDPOINT)
    .config("spark.hadoop.fs.s3a.access.key", MINIO_ACCESS_KEY)
    .config("spark.hadoop.fs.s3a.secret.key", MINIO_SECRET_KEY)
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
    .config("spark.hadoop.fs.s3a.aws.credentials.provider",
            "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
    .getOrCreate())

# The check mark was previously stored as mis-decoded UTF-8 and printed as garbage.
print("✓ Spark session created!")
print(f"Spark version: {spark.version}")

In [None]:
# Every location lives in the same bucket and uses the same dataset folder
# name, so both are spelled out once and the three paths are derived from them.
_BUCKET_URI = "s3a://datalake"
_DATASET_DIR = "nyc yellow taxi"

# Directory the source parquet files are read from.
MINIO_SOURCE_PATH = f"{_BUCKET_URI}/staging/{_DATASET_DIR}"
# Output location for the combined, unpartitioned dataset.
MINIO_UNPARTITIONED_TARGET_PATH = f"{_BUCKET_URI}/raw-data/unpartitioned/{_DATASET_DIR}"
# Output location for the dataset partitioned by trip year and month.
MINIO_PARTITIONED_TARGET_PATH = f"{_BUCKET_URI}/raw-data/partitioned/{_DATASET_DIR}"

In [None]:
# List the staging directory through the Hadoop FileSystem API (via the JVM
# gateway) so each parquet file can be read and normalised on its own.
hadoop_fs = spark._jvm.org.apache.hadoop.fs
source_dir = hadoop_fs.Path(MINIO_SOURCE_PATH)
source_fs = hadoop_fs.FileSystem.get(source_dir.toUri(), spark._jsc.hadoopConfiguration())
file_status = source_fs.listStatus(source_dir)

# Keep only *.parquet entries; sorted so the first file (whose schema seeds the
# union below) does not depend on listing order.
parquet_files = sorted(
    str(f.getPath()) for f in file_status if str(f.getPath()).endswith('.parquet')
)

# Without this guard an empty directory fails further down with a bare
# IndexError on dfs[0].
if not parquet_files:
    raise FileNotFoundError(f"No .parquet files found under {MINIO_SOURCE_PATH}")

dfs = []
for file_path in parquet_files:
    print(f"Reading: {file_path}")
    df = spark.read.parquet(file_path)

    # Cast all int columns to LongType
    # NOTE(review): presumably the source files disagree on 32- vs 64-bit
    # integer columns and this gives them a common type — confirm.
    for col_name, col_type in df.dtypes:
        if col_type == 'int':
            df = df.withColumn(col_name, col(col_name).cast(LongType()))

    dfs.append(df)

# Union by column name; a column absent from a file is filled with nulls.
combined_df = dfs[0]
for df in dfs[1:]:
    combined_df = combined_df.unionByName(df, allowMissingColumns=True)

# Unpartitioned copy, collapsed to a single partition before writing.
# NOTE: combined_df is not cached, so this write and the partitioned write
# below each re-read the source files.
combined_df.coalesce(1).write.mode("overwrite").parquet(MINIO_UNPARTITIONED_TARGET_PATH)

# Derive the partition columns from the pickup timestamp.
df_partitioned = (combined_df
    .withColumn("trip_year", year(col("tpep_pickup_datetime")))
    .withColumn("trip_month", month(col("tpep_pickup_datetime")))
)

# Partitioned copy: one directory level per trip_year / trip_month value.
df_partitioned.write.mode("overwrite").partitionBy("trip_year", "trip_month").parquet(MINIO_PARTITIONED_TARGET_PATH)


In [None]:
# Stop the Spark session created earlier in this notebook.
spark.stop()