In [None]:
import argparse
import os

from pyspark import SparkContext
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T

In [None]:

def load_config(spark_context: SparkContext):
    """Configure the S3A filesystem of ``spark_context`` for the MinIO store.

    The access key, secret key and endpoint are taken from the environment
    variables ``AWS_ACCESS_KEY_ID``, ``AWS_SECRET_ACCESS_KEY`` and
    ``ENDPOINT``. When a variable is unset, the local-development value that
    used to be hard-coded here is used instead, so existing setups keep
    working without any environment changes.

    Args:
        spark_context: Spark context whose Hadoop configuration is updated
            in place.

    Returns:
        None.
    """
    # Fetch the Hadoop configuration handle once instead of once per setting.
    hadoop_conf = spark_context._jsc.hadoopConfiguration()

    s3a_settings = {
        # Credentials / endpoint: environment first, dev defaults as fallback.
        "fs.s3a.access.key": os.getenv("AWS_ACCESS_KEY_ID", "minio_access_key"),
        "fs.s3a.secret.key": os.getenv("AWS_SECRET_ACCESS_KEY", "minio_secret_key"),
        "fs.s3a.endpoint": os.getenv("ENDPOINT", "http://minio:9000"),
        # The default endpoint is plain HTTP, so SSL is switched off.
        "fs.s3a.connection.ssl.enabled": "false",
        # Use path-style bucket addressing rather than virtual-host style.
        "fs.s3a.path.style.access": "true",
        # Optional tuning knobs, left disabled:
        # "fs.s3a.attempts.maximum": "1",
        # "fs.s3a.connection.establish.timeout": "5000",
        # "fs.s3a.connection.timeout": "10000",
    }

    for option_name, option_value in s3a_settings.items():
        hadoop_conf.set(option_name, option_value)


# Create (or reuse) the Spark session with Kryo serialization and the Hudi
# catalog / SQL extensions enabled. Optional settings that are currently
# left disabled:
#   .config("spark.jars.packages", "org.apache.hudi:hudi-spark3.3-bundle_2.12:0.13.1")
#   .config("spark.sql.legacy.timeParserPolicy", "LEGACY")
spark = (
    SparkSession.builder
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog")
    .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
    .getOrCreate()
)

# Keep driver logging at ERROR; comment this line out to see INFO logs.
spark.sparkContext.setLogLevel("ERROR")

# Apply the S3A / MinIO filesystem settings to the new session's context.
load_config(spark.sparkContext)


In [None]:
# S3A location of the raw GH Archive JSON for a single year/month/day/hour
# partition; this is the path the transform cell reads with spark.read.json.
read_filepath = "s3a://gh-archive-data-raw/gh-archives/year=2023/month=01/day=01/hour=4/"

In [None]:
if __name__ == "__main__":
    # Script-mode parameterisation (disabled while running as a notebook):
    # parser = argparse.ArgumentParser()
    # parser.add_argument("--date", help="Date in format YYYY-MM-DD", required=True)
    # parser.add_argument("--source_files_pattern", help="Source files pattern for the GH archive to process.", required=True)
    # parser.add_argument("--destination_files_pattern", help="Destination files pattern for the GH archive to process.", required=True)
    # args = parser.parse_args()
    # date = args.date
    # read_filepath = args.source_files_pattern.format(date)
    # write_filepath = args.destination_files_pattern
    # print(f"date received: {date}")

    # Load the raw events; the schema is inferred from the JSON files.
    print(f"read_filepath: {read_filepath}")
    df = spark.read.json(read_filepath)

    # Only these GitHub event types are kept in the curated table.
    allowed_events = [
        "PushEvent",
        "ForkEvent",
        "PublicEvent",
        "WatchEvent",
        "PullRequestEvent",
    ]

    # Filter on the raw `type` column *before* projecting. The projection
    # renames it to `event_type`, so filtering afterwards on `type` would
    # reference a column that is no longer part of the selected frame.
    #
    # NOTE(review): 'Z' is quoted in the timestamp pattern, so it is matched
    # as a literal character and not parsed as a zone offset; the value is
    # then interpreted in the Spark session time zone -- confirm that is UTC.
    main_df = df.filter(
        F.col("type").isin(allowed_events)
    ).select(
        F.col("id").alias("event_id"),
        F.col("type").alias("event_type"),
        F.to_timestamp(F.col("created_at"), "yyyy-MM-dd'T'HH:mm:ss'Z'").alias("created_at"),
        F.col("repo.id").alias("repository_id"),
        F.col("repo.name").alias("repository_name"),
        F.col("repo.url").alias("repository_url"),
        F.col("actor.id").alias("user_id"),
        F.col("actor.login").alias("user_name"),
        F.col("actor.url").alias("user_url"),
        F.col("actor.avatar_url").alias("user_avatar_url"),
        F.col("org.id").alias("org_id"),
        F.col("org.login").alias("org_name"),
        F.col("org.url").alias("org_url"),
        F.col("org.avatar_url").alias("org_avatar_url"),
        F.col("payload.push_id").alias("push_id"),
        F.col("payload.distinct_size").alias("number_of_commits"),
        F.col("payload.pull_request.base.repo.language").alias("language"),
    )

    # Break the event timestamp into calendar parts. Parenthesised chaining
    # is used so that no trailing line continuation is left dangling.
    main_df = (
        main_df
        .withColumn("year", F.year("created_at"))
        .withColumn("month", F.month("created_at"))
        .withColumn("day", F.dayofmonth("created_at"))
        .withColumn("hour", F.hour("created_at"))
        .withColumn("minute", F.minute("created_at"))
        .withColumn("second", F.second("created_at"))
    )

    # Add the event time as epoch seconds. `created_at` is already a
    # timestamp column at this point, so no parse pattern is needed.
    main_df = main_df.withColumn("ts", F.unix_timestamp("created_at"))

    # Add the time at which this job processed the record.
    main_df = main_df.withColumn("updated_at", F.current_timestamp())



In [None]:
# Preview the transformed events before they are written to the Hudi table.
main_df.show()

In [None]:
# Hudi writer settings for the curated `gh_archive` table.
# Full list of available options: https://hudi.apache.org/docs/configurations/
hudi_options = {
    # Table identity and write mode.
    "hoodie.table.name": "gh_archive",
    "hoodie.datasource.write.storage.type": "COPY_ON_WRITE",
    "hoodie.datasource.write.operation": "upsert",
    # Record key, precombine field and partition layout.
    "hoodie.datasource.write.recordkey.field": "event_id",
    "hoodie.datasource.write.precombine.field": "updated_at",
    "hoodie.datasource.write.partitionpath.field": "year, month, day, hour",
    "hoodie.datasource.write.hive_style_partitioning": "true",
    # Hive sync is not enabled; template values are kept for reference.
    # "hoodie.datasource.hive_sync.enable": "true",
    # "hoodie.datasource.hive_sync.database": "<your_database_name>",
    # "hoodie.datasource.hive_sync.table": "<your_table_name>",
    # "hoodie.datasource.hive_sync.partition_fields": "<your_partitionkey_field>",
    # "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
    # "hoodie.datasource.hive_sync.use_jdbc": "false",
    # "hoodie.datasource.hive_sync.mode": "hms"
    # Shuffle parallelism for upsert and insert.
    "hoodie.upsert.shuffle.parallelism": 2,
    "hoodie.insert.shuffle.parallelism": 2,
}

# Write the transformed events to the curated bucket in append mode.
(
    main_df.write
    .format("hudi")
    .options(**hudi_options)
    .mode("append")
    .save("s3a://gh-archive-data-curated/hudi/gh_archive/")
)

In [None]:
# Load the curated Hudi table and register it as the `gh_archive` temp view
# so it can be queried with Spark SQL.
gh_archive_df = (
    spark.read
    .format("hudi")
    .load("s3a://gh-archive-data-curated/hudi/gh_archive/")
)
gh_archive_df.createOrReplaceTempView("gh_archive")

In [None]:
# Number of records in the Hudi table as loaded into gh_archive_df.
gh_archive_df.count()

In [None]:
# Display rows of the loaded table with column values shown untruncated.
gh_archive_df.show(truncate=False)