In [1]:
from pyspark.sql import SparkSession,functions as F
from pyspark import StorageLevel
from pyspark import SparkConf
from pyspark.sql.types import StructType, StructField, StringType, BooleanType, ArrayType

# Step 1: Define SparkConf and Set Configurations
#
# Every setting is collected on one SparkConf object, which is handed to the
# SparkSession builder in Step 2.
conf = SparkConf()

# General Application and Driver Settings
conf.set("spark.app.name", "ComprehensiveSparkJob-John Learning ")  # Application name
conf.set("spark.master", "local[*]")                # Run Spark locally with all available cores
conf.set("spark.driver.memory", "4g")               # Memory allocated to the driver program
conf.set("spark.driver.cores", "2")                 # Number of CPU cores for the driver
conf.set("spark.ui.port", "4040")                   # Web UI port (default is 4040)

# Executor Settings
# NOTE(review): with spark.master = local[*] there are no separate executor
# JVMs, so the executor-level settings below presumably only matter once this
# job is submitted to a real cluster -- confirm against the deployment target.
conf.set("spark.executor.memory", "4g")             # Heap memory allocated per executor
conf.set("spark.executor.memoryOverhead", "1g")     # Extra non-heap memory per executor
conf.set("spark.executor.cores", "2")               # Number of cores per executor
conf.set("spark.executor.instances", "2")           # Number of executor instances

# Parallelism Settings
conf.set("spark.default.parallelism", "200")          # Default parallelism (number of partitions)
conf.set("spark.sql.shuffle.partitions", "200")       # Partitions for shuffle operations
# conf.set("spark.task.cpus", "1")                    # Number of CPUs allocated per task

# Data Handling Settings
conf.set("spark.memory.fraction", "0.6")            # Fraction of JVM heap for execution and storage
conf.set("spark.memory.storageFraction", "0.4")     # Share of that region set aside for cached data
conf.set("spark.memory.offHeap.enabled", "true")
conf.set("spark.memory.offHeap.size", "2g")         # Adjust based on available memory

# Serialization Settings
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")  # Use Kryo serialization
# "false" means unregistered classes are still accepted by Kryo; registration
# is optional and used only as an optimisation.
conf.set("spark.kryo.registrationRequired", "false")
conf.set("spark.kryo.classesToRegister", "org.apache.spark.sql.Row")  # Example: Registering a class

# Debugging and Logging
conf.set("spark.eventLog.enabled", "true")           # Enable event logging
conf.set("spark.eventLog.dir", "/root/spark_log/spark-events/")  # Directory for event logs
# NOTE(review): the history-server directory differs from spark.eventLog.dir
# above; verify that this is intentional.
conf.set("spark.history.fs.logDirectory", "/root/spark_log/spark-history/")  # Spark History Server logs
conf.set("spark.local.dir", "/root/spark_log/spark_cache/")  # Custom scratch directory

# Adaptive Query Execution
conf.set("spark.sql.adaptive.enabled", "true")
conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128MB")

# Executor JVM options.
# SparkConf.set() replaces any earlier value for the same key, so every flag
# must be passed in ONE space-separated string; setting the key several times
# keeps only the last call. Heap size flags (-Xmx/-Xms) are deliberately left
# out: Spark does not allow them here, the executor heap is controlled by
# spark.executor.memory instead.
# NOTE(review): PrintGCDetails / PrintGCDateStamps / -Xloggc are JDK 8 style
# GC-logging flags; on newer JVMs they likely need replacing with -Xlog:gc*.
executor_jvm_options = [
    "-XX:+UseG1GC",
    "-XX:+HeapDumpOnOutOfMemoryError",
    "-XX:HeapDumpPath=/root/spark_log/spark_heap_dump.hprof",
    "-XX:+PrintGCDetails",
    "-XX:+PrintGCDateStamps",
    "-Xloggc:/root/spark_log/gc.log",
]
conf.set("spark.executor.extraJavaOptions", " ".join(executor_jvm_options))

# Step 2: Initialize SparkSession with Configurations
spark = SparkSession.builder.config(conf=conf).getOrCreate()


# Load the source JSON document into a DataFrame. The "multiline" reader
# option is enabled so a JSON record may span several lines of the file.
file_path = "/root/docker_dataset/large-file.json"  # Replace with the path to your JSON file
json_reader = spark.read.option("multiline", "true")
df = json_reader.json(file_path)

# Mark the parsed DataFrame for caching (memory first, spilling to disk).
# NOTE(review): `df` is rebound two lines below, so this cached DataFrame can
# no longer be unpersisted through its own handle -- confirm the cache is
# actually wanted here.
df.persist(StorageLevel.MEMORY_AND_DISK)

# Report how many partitions the load produced, then redistribute the rows
# over 10 partitions for the steps that follow.
initial_partition_count = df.rdd.getNumPartitions()
print(initial_partition_count)
df = df.repartition(10)


# Flattening, stage 1: emit one row per element of the top-level 'values'
# array, exposing each element as a struct column named "value".
value_column = F.explode(F.col("values")).alias("value")
df_values = df.select(value_column)

# Schema describing a list of commit records (sha, author, message, distinct, url).
# NOTE(review): commit_schema is not referenced anywhere in the visible code;
# it is kept because other cells may rely on it -- verify before removing.
commit_author_type = StructType([
    StructField("email", StringType(), True),
    StructField("name", StringType(), True),
])
commit_type = StructType([
    StructField("sha", StringType(), True),
    StructField("author", commit_author_type, True),
    StructField("message", StringType(), True),
    StructField("distinct", BooleanType(), True),
    StructField("url", StringType(), True),
])
commit_schema = ArrayType(commit_type)

# Flattening, stage 2: promote every field of "value" to a top-level column and
# add one row per entry of value.payload.commits as a "commits" column.
# explode_outer (rather than explode) keeps rows whose commits array is null
# or empty, yielding a null "commits" value for them.
commit_column = F.explode_outer(F.col("value.payload.commits")).alias("commits")
df_exploded = df_values.select("value.*", commit_column)

df_exploded.persist(StorageLevel.MEMORY_AND_DISK)

# df_exploded.printSchema()
exploded_partition_count = df_exploded.rdd.getNumPartitions()
print(exploded_partition_count)  # Check how many partitions are there

# Projection table for the flattened output: each pair is
# (source column path in df_exploded, column name in df_final).
# The order of the pairs is the column order of the result.
final_column_aliases = (
    # Event details
    ("id", "event_id"),
    ("type", "event_type"),
    ("public", "is_public"),
    ("created_at", "event_timestamp"),
    # Repository details
    ("repo.id", "repo_id"),
    ("repo.name", "repo_name"),
    ("repo.url", "repo_url"),
    # Actor details
    ("actor.id", "actor_id"),
    ("actor.login", "actor_login"),
    ("actor.gravatar_id", "actor_gravatar_id"),
    ("actor.url", "actor_profile_url"),
    ("actor.avatar_url", "actor_avatar_url"),
    # Commit details (after explode)
    ("commits.sha", "commit_sha"),
    ("commits.message", "commit_message"),
    ("commits.distinct", "commit_distinct"),
    ("commits.url", "commit_url"),
    # Author details inside commits
    ("commits.author.email", "author_email"),
    ("commits.author.name", "author_name"),
    # Payload details
    ("payload.ref", "ref"),
    ("payload.ref_type", "ref_type"),
    ("payload.master_branch", "master_branch"),
    ("payload.description", "repo_description"),
    ("payload.pusher_type", "pusher_type"),
    ("payload.push_id", "push_id"),
    ("payload.size", "push_size"),
    ("payload.distinct_size", "distinct_push_size"),
    ("payload.head", "commit_head"),
    ("payload.before", "commit_before"),
)

# Build the final flat DataFrame by selecting and renaming each listed column.
df_final = df_exploded.select(
    *[F.col(source).alias(alias) for source, alias in final_column_aliases]
)

# Single location used for both writing the flattened data and reading it back.
parquet_path = "/root/docker_dataset/big_data.parquet"

# NOTE(review): df_final is consumed by only one action in the visible code
# (the write below); confirm this cache is needed by later cells.
df_final.persist(StorageLevel.MEMORY_AND_DISK)

# Write the flattened result as Parquet, replacing any previous output.
df_final.write.mode("overwrite").parquet(parquet_path)

# Read the Parquet output back and keep at most 100 rows for inspection.
df_parquet = spark.read.parquet(parquet_path).limit(100)

# show() is called without arguments, so it prints with its default settings.
df_parquet.show()

# Step 5: Stop the Spark Session
# Left commented out, so the session keeps running after this cell.
# spark.stop()


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/01/25 20:46:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/01/25 20:46:47 WARN SparkConf: Note that spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone/kubernetes and LOCAL_DIRS in YARN).
25/01/25 20:46:56 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


1
10


                                                                                

+----------+----------------+---------+--------------------+--------+--------------------+--------------------+--------+---------------+-----------------+--------------------+--------------------+--------------------+-----------------------------+---------------+--------------------+--------------------+-------------------+--------------------+--------+-------------+--------------------+-----------+---------+---------+------------------+--------------------+--------------------+
|  event_id|      event_type|is_public|     event_timestamp| repo_id|           repo_name|            repo_url|actor_id|    actor_login|actor_gravatar_id|   actor_profile_url|    actor_avatar_url|          commit_sha|               commit_message|commit_distinct|          commit_url|        author_email|        author_name|                 ref|ref_type|master_branch|    repo_description|pusher_type|  push_id|push_size|distinct_push_size|         commit_head|       commit_before|
+----------+----------------+---