In [1]:
from kafka import KafkaConsumer
bootstrap_servers = ['localhost:29092']
consumer = KafkaConsumer( bootstrap_servers=bootstrap_servers)
consumer.topics()

{'__confluent.support.metrics',
 '_schemas',
 'connect-status',
 'connect_configs',
 'connect_offsets',
 'source.public.grades_streaming',
 'source.public.raw_grades',
 'test'}

# PROCESSING AND WRITE TO HUDI USING STRUCTURED STREAMING #

In [8]:
from scripts.data_cleaners import BaseDataCleaner, AcademicDataCleaner, GradeDataCleaner
import json
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    from_json, col, when, lit, current_timestamp, 
    concat_ws, split, expr
)
from pyspark.sql.types import (
    StructType, StructField, StringType, IntegerType, LongType, 
    FloatType, TimestampType
)
import sys
sys.path.append('etl/scripts')  # Add scripts directory to path
from scripts.data_cleaners import AcademicDataCleaner, BaseDataCleaner

schema = StructType([
    StructField("id", LongType(), True),  # bigint → LongType()
    StructField("schoolyear", StringType(), True),  # character varying → StringType()
    StructField("semester", StringType(), True),
    StructField("code", StringType(), True),
    StructField("description", StringType(), True),
    StructField("units", IntegerType(), True),  # integer → IntegerType()
    StructField("instructor_id", StringType(), True),
    StructField("instructor_name", StringType(), True),
    StructField("srcode", StringType(), True),
    StructField("fullname", StringType(), True),
    StructField("campus", StringType(), True),
    StructField("program", StringType(), True),
    StructField("major", StringType(), True),
    StructField("yearlevel", StringType(), True),
    StructField("curriculum", StringType(), True),
    StructField("class_section", StringType(), True),
    StructField("grade_final", StringType(), True),
    StructField("grade_reexam", StringType(), True),
    StructField("status", StringType(), True)
])


# Define transform function
def transform(df, spark):
    """Cleans and processes the extracted data."""
    grade_cleaner = GradeDataCleaner()
    df = BaseDataCleaner.standardize_case(df, ['grade_final', 'campus', 'semester', 'schoolyear'])
    df = AcademicDataCleaner.clean_semesters(df)
    df = BaseDataCleaner.remove_null_strings(df, 'semester')
    
    df = BaseDataCleaner.clean_strings(df, [
        'schoolyear', 'semester', 'code', 'description', 'units', 'instructor_id', 
        'instructor_name', 'srcode', 'fullname', 'campus', 'program', 
        'grade_final', 'grade_reexam', 'status', 'major', 'curriculum', 'class_section'
    ])
    
    df = AcademicDataCleaner.clean_schoolyear(df)
    df = grade_cleaner.process_grades(df)
    df = BaseDataCleaner.remove_null_strings(df, 'program')
    
    df = grade_cleaner.allow_numerical_data(df, "grade_reexam")

    df = AcademicDataCleaner.cast_columns(df, [("id", "int"), ("units", "int"), 
                                               ("grade_numeric", "decimal(5,2)")])
    
    df = grade_cleaner.filter_incomplete_grades(df)

    df = AcademicDataCleaner.get_valid_schoolyears(df)

    df = AcademicDataCleaner.create_yearsem_order(df)
    df = AcademicDataCleaner.map_program_ids(df, spark, "C:/LEONAIDAS/program_with_id.csv")
    
    return df

# Configure Spark with additional Kafka configs
spark = (SparkSession.builder
         .appName("KafkaToHudiProcessor")
         .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer')
         .config("spark.jars.packages", 
                ("org.apache.hudi:hudi-spark3.3-bundle_2.12:0.14.0,"
                 "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0,"
                 "org.apache.kafka:kafka-clients:3.3.0"))
         .config('spark.sql.extensions', 'org.apache.hudi.spark3.sql.HoodieSparkSessionExtension')
         .config('spark.sql.catalog.spark_catalog', 'org.apache.spark.sql.hudi.catalog.HoodieCatalog')
         .config("spark.sql.streaming.checkpointLocation", "C:/tmp/spark_warehouse/checkpoints")
         .getOrCreate())

# Define Kafka configuration
kafka_options = {
    "kafka.bootstrap.servers": "localhost:29092",  # Update with your Kafka broker
    "subscribe": "source.public.grades_streaming",
    "startingOffsets": "earliest",
    "kafka.security.protocol": "PLAINTEXT",
    "failOnDataLoss": "false",
    "maxOffsetsPerTrigger": "100"  # Control batch size
}

# Read from Kafka with improved configuration
kafka_df = (spark
            .readStream
            .format("kafka")
            .options(**kafka_options)
            .load())

# Parse JSON with error handling
parsed_df = (kafka_df
             .select(from_json(
                 col("value").cast("string"),
                 schema
             ).alias("data"))
             .select("data.*")
             .withColumn("processing_time", current_timestamp()))

# Enhanced Hudi options
hudi_options = {
    'hoodie.table.name': 'grades',
    'hoodie.datasource.write.recordkey.field': 'id',
    'hoodie.datasource.write.partitionpath.field': 'schoolyear',
    'hoodie.datasource.write.table.name': 'grades',
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.datasource.write.precombine.field': 'processing_time',
    'hoodie.upsert.shuffle.parallelism': '2',
    'hoodie.insert.shuffle.parallelism': '2',
    'hoodie.cleaner.policy': 'KEEP_LATEST_COMMITS',
    'hoodie.cleaner.commits.retained': '2',
    'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.SimpleKeyGenerator'
}

# Enhanced batch processing function
def process_batch(batch_df, batch_id):
    try:
        if batch_df.count() > 0:
            # Apply transformations
            transformed_df = transform(batch_df, spark)
            
            # Write to Hudi with error handling
            transformed_df.write \
                .format("hudi") \
                .options(**hudi_options) \
                .mode("append") \
                .save("C:/tmp/spark_warehouse/from_kafka")
            
            print(f"Successfully processed batch {batch_id} with {batch_df.count()} records")
    except Exception as e:
        print(f"Error processing batch {batch_id}: {str(e)}")
        raise e

# Start streaming with improved configuration
query = (parsed_df.writeStream
         .foreachBatch(process_batch)
         .outputMode("update")
         .option("checkpointLocation", "C:/tmp/spark_warehouse/checkpoints")
         .trigger(processingTime="2 minutes")
         .start())

# Add query monitoring
while query.isActive:
    print(f"Active stream stats: {query.status}")
    print(f"Recent progress: {query.recentProgress}")
    query.awaitTermination(60)  # Check status every minute

Active stream stats: {'message': 'Initializing sources', 'isDataAvailable': False, 'isTriggerActive': False}
Recent progress: []


StreamingQueryException: Query [id = f64b7897-5c67-4433-8be8-6c168c2ecfe4, runId = 6a6f0405-55c4-4b9a-b760-0ad9bd9e2258] terminated with exception: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
  File "c:\Users\denve\AppData\Local\Programs\Python\Python310\lib\site-packages\py4j\clientserver.py", line 617, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
  File "c:\Users\denve\AppData\Local\Programs\Python\Python310\lib\site-packages\pyspark\sql\utils.py", line 272, in call
    raise e
  File "c:\Users\denve\AppData\Local\Programs\Python\Python310\lib\site-packages\pyspark\sql\utils.py", line 269, in call
    self.func(DataFrame(jdf, self.session), batch_id)
  File "C:\Users\denve\AppData\Local\Temp\ipykernel_26044\2848879293.py", line 142, in process_batch
    raise e
  File "C:\Users\denve\AppData\Local\Temp\ipykernel_26044\2848879293.py", line 137, in process_batch
    .save("C:/tmp/spark_warehouse/from_kafka")
  File "c:\Users\denve\AppData\Local\Programs\Python\Python310\lib\site-packages\pyspark\sql\readwriter.py", line 968, in save
    self._jwrite.save(path)
  File "c:\Users\denve\AppData\Local\Programs\Python\Python310\lib\site-packages\py4j\java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "c:\Users\denve\AppData\Local\Programs\Python\Python310\lib\site-packages\pyspark\sql\utils.py", line 190, in deco
    return f(*a, **kw)
  File "c:\Users\denve\AppData\Local\Programs\Python\Python310\lib\site-packages\py4j\protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o1228.save.
: org.apache.hudi.exception.HoodieUpsertException: Failed upsert schema compatibility check
	at org.apache.hudi.table.HoodieTable.validateUpsertSchema(HoodieTable.java:852)
	at org.apache.hudi.client.SparkRDDWriteClient.upsert(SparkRDDWriteClient.java:140)
	at org.apache.hudi.DataSourceUtils.doWriteOperation(DataSourceUtils.java:224)
	at org.apache.hudi.HoodieSparkSqlWriter$.writeInternal(HoodieSparkSqlWriter.scala:431)
	at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:132)
	at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:150)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:47)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:116)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.sendCommand(ClientServerConnection.java:244)
	at py4j.CallbackClient.sendCommand(CallbackClient.java:384)
	at py4j.CallbackClient.sendCommand(CallbackClient.java:356)
	at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:106)
	at com.sun.proxy.$Proxy32.call(Unknown Source)
	at org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$1(ForeachBatchSink.scala:51)
	at org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$1$adapted(ForeachBatchSink.scala:51)
	at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:32)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:666)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:664)
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:664)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:256)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:219)
	at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:213)
	at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:307)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:285)
	at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:208)
Caused by: org.apache.hudi.exception.HoodieException: Failed to read schema/check compatibility for base path C:/tmp/spark_warehouse/from_kafka
	at org.apache.hudi.table.HoodieTable.validateSchema(HoodieTable.java:840)
	at org.apache.hudi.table.HoodieTable.validateUpsertSchema(HoodieTable.java:850)
	... 75 more
Caused by: org.apache.hudi.exception.SchemaCompatibilityException: Failed schema compatibility check
writerSchema: {"type":"record","name":"grades_record","namespace":"hoodie.grades","fields":[{"name":"_hoodie_commit_time","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_commit_seqno","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_record_key","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_partition_path","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_file_name","type":["null","string"],"doc":"","default":null},{"name":"id","type":["null","int"],"default":null},{"name":"schoolyear","type":["null","string"],"default":null},{"name":"semester","type":["null","string"],"default":null},{"name":"code","type":["null","string"],"default":null},{"name":"description","type":["null","string"],"default":null},{"name":"units","type":["null","int"],"default":null},{"name":"instructor_id","type":["null","string"],"default":null},{"name":"instructor_name","type":["null","string"],"default":null},{"name":"srcode","type":["null","string"],"default":null},{"name":"fullname","type":["null","string"],"default":null},{"name":"campus","type":["null","string"],"default":null},{"name":"program","type":["null","string"],"default":null},{"name":"major","type":["null","string"],"default":null},{"name":"yearlevel","type":["null","string"],"default":null},{"name":"curriculum","type":["null","string"],"default":null},{"name":"class_section","type":["null","string"],"default":null},{"name":"grade_final","type":["null","string"],"default":null},{"name":"grade_reexam","type":["null","string"],"default":null},{"name":"status","type":["null","string"],"default":null},{"name":"processing_time","type":{"type":"long","logicalType":"timestamp-micros"}},{"name":"grade_numeric","type":["null",{"type":"fixed","name":"fixed","namespace":"hoodie.grades.grades_record.grade_numeric","size":3,"logicalType":"decimal","precision":5,"scale":2}],"default":null},{"name":"grade_classification","type":"string"},{"name":"start_year","type":["null","int"],"default":null},{"name":"year_sem","type":"string"},{"name":"program_id","type":["null","int"],"default":null}]}
tableSchema: {"type":"record","name":"grades_record","namespace":"hoodie.grades","fields":[{"name":"_hoodie_commit_time","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_commit_seqno","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_record_key","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_partition_path","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_file_name","type":["null","string"],"doc":"","default":null},{"name":"id","type":["null","long"],"default":null},{"name":"schoolyear","type":["null","string"],"default":null},{"name":"semester","type":["null","string"],"default":null},{"name":"code","type":["null","string"],"default":null},{"name":"description","type":["null","string"],"default":null},{"name":"units","type":["null","int"],"default":null},{"name":"instructor_id","type":["null","string"],"default":null},{"name":"instructor_name","type":["null","string"],"default":null},{"name":"srcode","type":["null","string"],"default":null},{"name":"fullname","type":["null","string"],"default":null},{"name":"campus","type":["null","string"],"default":null},{"name":"program","type":["null","string"],"default":null},{"name":"major","type":["null","string"],"default":null},{"name":"yearlevel","type":["null","string"],"default":null},{"name":"curriculum","type":["null","string"],"default":null},{"name":"class_section","type":["null","string"],"default":null},{"name":"grade_final","type":["null","string"],"default":null},{"name":"grade_reexam","type":["null","string"],"default":null},{"name":"status","type":["null","string"],"default":null}]}
	at org.apache.hudi.avro.AvroSchemaUtils.checkSchemaCompatible(AvroSchemaUtils.java:340)
	at org.apache.hudi.table.HoodieTable.validateSchema(HoodieTable.java:838)
	... 76 more



# PROCESS AND WRITE TO BOTH HUDI AND DATABASE #

In [None]:
from scripts.data_cleaners import BaseDataCleaner, AcademicDataCleaner, GradeDataCleaner
import json
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    from_json, col, when, lit, current_timestamp, 
    concat_ws, split, expr
)
from pyspark.sql.types import (
    StructType, StructField, StringType, IntegerType, LongType, 
    FloatType, TimestampType
)
import sys
from datetime import datetime
sys.path.append('etl/scripts')  # Add scripts directory to path
from scripts.data_cleaners import AcademicDataCleaner, BaseDataCleaner

schema = StructType([
    StructField("id", LongType(), True),  # bigint → LongType()
    StructField("schoolyear", StringType(), True),  # character varying → StringType()
    StructField("semester", StringType(), True),
    StructField("code", StringType(), True),
    StructField("description", StringType(), True),
    StructField("units", IntegerType(), True),  # integer → IntegerType()
    StructField("instructor_id", StringType(), True),
    StructField("instructor_name", StringType(), True),
    StructField("srcode", StringType(), True),
    StructField("fullname", StringType(), True),
    StructField("campus", StringType(), True),
    StructField("program", StringType(), True),
    StructField("major", StringType(), True),
    StructField("yearlevel", StringType(), True),
    StructField("curriculum", StringType(), True),
    StructField("class_section", StringType(), True),
    StructField("grade_final", StringType(), True),
    StructField("grade_reexam", StringType(), True),
    StructField("status", StringType(), True)
])


# Define transform function
def transform(df, spark):
    """Cleans and processes the extracted data."""
    grade_cleaner = GradeDataCleaner()
    df = BaseDataCleaner.standardize_case(df, ['grade_final', 'campus', 'semester', 'schoolyear'])
    df = AcademicDataCleaner.clean_semesters(df)
    df = BaseDataCleaner.remove_null_strings(df, 'semester')
    
    df = BaseDataCleaner.clean_strings(df, [
        'schoolyear', 'semester', 'code', 'description', 'units', 'instructor_id', 
        'instructor_name', 'srcode', 'fullname', 'campus', 'program', 
        'grade_final', 'grade_reexam', 'status', 'major', 'curriculum', 'class_section'
    ])
    
    df = AcademicDataCleaner.clean_schoolyear(df)
    df = grade_cleaner.process_grades(df)
    df = BaseDataCleaner.remove_null_strings(df, 'program')
    
    df = grade_cleaner.allow_numerical_data(df, "grade_reexam")

    df = AcademicDataCleaner.cast_columns(df, [("id", "int"), ("units", "int"), 
                                               ("grade_numeric", "decimal(5,2)")])
    
    df = grade_cleaner.filter_incomplete_grades(df)

    df = AcademicDataCleaner.get_valid_schoolyears(df)

    df = AcademicDataCleaner.create_yearsem_order(df)
    df = AcademicDataCleaner.map_program_ids(df, spark, "C:/LEONAIDAS/program_with_id.csv")

    # Add processing_time column
    #df = df.withColumn("processing_time", current_timestamp())
    
    return df

# Configure Spark with Kafka and Hudi dependencies
spark = (SparkSession.builder
         .appName("KafkaToHudiProcessor")
         .master("local[*]")
         .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer')
         .config('spark.jars.packages', 
                ('org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1,'  # Match your Spark version
                 'org.apache.hudi:hudi-spark3.3-bundle_2.12:0.14.0,'
                 'org.postgresql:postgresql:42.7.5'))  # PostgreSQL driver
         .config('spark.hadoop.javax.jdo.option.ConnectionDriverName', 'org.postgresql.Driver')
         .config("spark.sql.extensions", "org.apache.hudi.spark3.sql.HoodieSparkSessionExtension")
         .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog")
         .config("spark.streaming.stopGracefullyOnShutdown", "true")
         .config("spark.sql.streaming.checkpointLocation", "C:/tmp/spark_warehouse/checkpoints")
         .getOrCreate())

# Optimized Kafka configuration
kafka_options = {
    "kafka.bootstrap.servers": "localhost:29092",
    "subscribe": "source.public.grades_streaming",
    "startingOffsets": "earliest",
    "kafka.security.protocol": "PLAINTEXT",
    "failOnDataLoss": "false",
    "fetchOffset.numRetries": "5",
    "fetch.max.wait.ms": "5000",
    "maxOffsetsPerTrigger": "100"  # Adjust based on your needs
}

# Read from Kafka with improved configuration
kafka_df = (spark
            .readStream
            .format("kafka")
            .options(**kafka_options)
            .load())

# Parse JSON with error handling
parsed_df = (kafka_df
             .select(from_json(
                 col("value").cast("string"),
                 schema
             ).alias("data"))
             .select("data.*"))

# Enhanced Hudi options for update handling
hudi_options = {
    'hoodie.table.name': 'grades',
    'hoodie.datasource.write.recordkey.field': 'id',
    'hoodie.datasource.write.partitionpath.field': 'schoolyear',
    'hoodie.datasource.write.table.name': 'grades',
    'hoodie.datasource.write.operation': 'upsert',  # Use upsert for updates
    'hoodie.datasource.write.precombine.field': 'processing_time',  # Change back to processing_time
    'hoodie.upsert.shuffle.parallelism': '2',
    'hoodie.insert.shuffle.parallelism': '2',
    'hoodie.bulkinsert.shuffle.parallelism': '2',
    'hoodie.cleaner.policy': 'KEEP_LATEST_COMMITS',
    'hoodie.cleaner.commits.retained': '2',
    'hoodie.write.concurrency.mode': 'optimistic_concurrency_control',
    'hoodie.write.lock.provider': 'org.apache.hudi.client.transaction.lock.InProcessLockProvider'
}

def process_batch(batch_df, batch_id):
    try:
        if batch_df.rdd.isEmpty():
            print(f"Batch {batch_id} is empty, skipping...")
            return

        # Cache the batch for multiple operations
        batch_df.cache()
        
        start_time = datetime.now()
        
        # Apply transformations
        transformed_df = transform(batch_df, spark)
        transformed_df.cache()
        
        try:
            # Add processing_time for Hudi
            hudi_df = transformed_df.withColumn("processing_time", current_timestamp())
            
            # Write to Hudi
            hudi_df.write \
                .format("hudi") \
                .options(**hudi_options) \
                .mode("append") \
                .save("C:/tmp/spark_warehouse/from_kafka")
            
            transformed_df.write \
            .format("jdbc") \
            .option("url", "jdbc:postgresql://localhost:5433/CAIST") \
            .option("dbtable", "processed_grades") \
            .option("user", "postgres") \
            .option("password", "password") \
            .option("driver", "org.postgresql.Driver") \
            .mode("append") \
            .save()

            # Log metrics
            end_time = datetime.now()
            duration = (end_time - start_time).total_seconds()
            input_count = batch_df.count()
            output_count = transformed_df.count()
            
            print(f"""
            Batch {batch_id} metrics:
            - Processing time: {duration:.2f} seconds
            - Input records: {input_count}
            - Output records: {output_count}
            - Processing rate: {output_count/duration:.2f} records/second
            """)
            
        finally:
            # Clean up cached DataFrames
            transformed_df.unpersist()
            hudi_df.unpersist()
            batch_df.unpersist()
            
    except Exception as e:
        print(f"Error processing batch {batch_id}: {str(e)}")
        raise e

# Start streaming with monitoring
query = (parsed_df.writeStream
         .foreachBatch(process_batch)
         .outputMode("update")
         .option("checkpointLocation", "C:/tmp/spark_warehouse/checkpoints")
         .trigger(processingTime="2 minutes")
         .start())

# Enhanced monitoring
while query.isActive:
    try:
        stats = query.status
        progress = query.recentProgress
        
        print(f"""
        Stream Status: {stats['message']}
        Data Available: {stats.get('isDataAvailable', False)}
        Trigger Active: {stats.get('isTriggerActive', False)}
        Records Processed: {progress[-1]['numInputRows'] if progress else 0}
        """)
        
        query.awaitTermination(30)  # Check every 2 minutes
        
    except Exception as e:
        print(f"Monitoring error: {str(e)}")

Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
: org.apache.spark.SparkException: Invalid Spark URL: spark://HeartbeatReceiver@Geo_Lappy.mshome.net:53849
	at org.apache.spark.rpc.RpcEndpointAddress$.apply(RpcEndpointAddress.scala:66)
	at org.apache.spark.rpc.netty.NettyRpcEnv.asyncSetupEndpointRefByURI(NettyRpcEnv.scala:140)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.executor.Executor.<init>(Executor.scala:244)
	at org.apache.spark.scheduler.local.LocalEndpoint.<init>(LocalSchedulerBackend.scala:64)
	at org.apache.spark.scheduler.local.LocalSchedulerBackend.start(LocalSchedulerBackend.scala:132)
	at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:222)
	at org.apache.spark.SparkContext.<init>(SparkContext.scala:585)
	at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
	at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:238)
	at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
	at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:834)


# IMPLEMENTING UPSERTS TO DB # 

In [2]:
from scripts.data_cleaners import BaseDataCleaner, AcademicDataCleaner, GradeDataCleaner
import json
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    from_json, col, when, lit, current_timestamp, 
    concat_ws, split, expr
)
from pyspark.sql.types import (
    StructType, StructField, StringType, IntegerType, LongType, 
    FloatType, TimestampType
)
import sys
from datetime import datetime
sys.path.append('etl/scripts')  # Add scripts directory to path
from scripts.data_cleaners import AcademicDataCleaner, BaseDataCleaner

schema = StructType([
    StructField("id", LongType(), True),  # bigint → LongType()
    StructField("schoolyear", StringType(), True),  # character varying → StringType()
    StructField("semester", StringType(), True),
    StructField("code", StringType(), True),
    StructField("description", StringType(), True),
    StructField("units", IntegerType(), True),  # integer → IntegerType()
    StructField("instructor_id", StringType(), True),
    StructField("instructor_name", StringType(), True),
    StructField("srcode", StringType(), True),
    StructField("fullname", StringType(), True),
    StructField("campus", StringType(), True),
    StructField("program", StringType(), True),
    StructField("major", StringType(), True),
    StructField("yearlevel", StringType(), True),
    StructField("curriculum", StringType(), True),
    StructField("class_section", StringType(), True),
    StructField("grade_final", StringType(), True),
    StructField("grade_reexam", StringType(), True),
    StructField("status", StringType(), True)
])


# Define transform function
def transform(df, spark):
    """Cleans and processes the extracted data."""
    grade_cleaner = GradeDataCleaner()
    df = BaseDataCleaner.standardize_case(df, ['grade_final', 'campus', 'semester', 'schoolyear'])
    df = AcademicDataCleaner.clean_semesters(df)
    df = BaseDataCleaner.remove_null_strings(df, 'semester')
    
    df = BaseDataCleaner.clean_strings(df, [
        'schoolyear', 'semester', 'code', 'description', 'units', 'instructor_id', 
        'instructor_name', 'srcode', 'fullname', 'campus', 'program', 
        'grade_final', 'grade_reexam', 'status', 'major', 'curriculum', 'class_section'
    ])
    
    df = AcademicDataCleaner.clean_schoolyear(df)
    df = grade_cleaner.process_grades(df)
    df = BaseDataCleaner.remove_null_strings(df, 'program')
    
    df = grade_cleaner.allow_numerical_data(df, "grade_reexam")

    df = AcademicDataCleaner.cast_columns(df, [("id", "int"), ("units", "int"), 
                                               ("grade_numeric", "decimal(5,2)")])
    
    df = grade_cleaner.filter_incomplete_grades(df)

    df = AcademicDataCleaner.get_valid_schoolyears(df)

    df = AcademicDataCleaner.create_yearsem_order(df)
    df = AcademicDataCleaner.map_program_ids(df, spark, "C:/LEONAIDAS/program_with_id.csv")

    # Add processing_time column
    #df = df.withColumn("processing_time", current_timestamp())
    
    return df

# Configure Spark with Kafka and Hudi dependencies
spark = (SparkSession.builder
         .appName("KafkaToHudiProcessor")
         .master("local[*]")
         .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer')
         .config('spark.jars.packages', 
                ('org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1,'  # Match your Spark version
                 'org.apache.hudi:hudi-spark3.3-bundle_2.12:0.14.0,'
                 'org.postgresql:postgresql:42.7.5'))  # PostgreSQL driver
         .config('spark.hadoop.javax.jdo.option.ConnectionDriverName', 'org.postgresql.Driver')
         .config("spark.sql.extensions", "org.apache.hudi.spark3.sql.HoodieSparkSessionExtension")
         .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog")
         .config("spark.streaming.stopGracefullyOnShutdown", "true")
         .config("spark.sql.streaming.checkpointLocation", "C:/tmp/spark_warehouse/checkpoints")
         .getOrCreate())

# Optimized Kafka configuration
kafka_options = {
    "kafka.bootstrap.servers": "localhost:29092",
    "subscribe": "source.public.grades_streaming",
    "startingOffsets": "earliest",
    "kafka.security.protocol": "PLAINTEXT",
    "failOnDataLoss": "false",
    "fetchOffset.numRetries": "5",
    "fetch.max.wait.ms": "5000",
    "maxOffsetsPerTrigger": "100"  # Adjust based on your needs
}

# Read from Kafka with improved configuration
kafka_df = (spark
            .readStream
            .format("kafka")
            .options(**kafka_options)
            .load())

# Parse JSON with error handling
parsed_df = (kafka_df
             .select(from_json(
                 col("value").cast("string"),
                 schema
             ).alias("data"))
             .select("data.*"))

# Enhanced Hudi options for update handling
hudi_options = {
    'hoodie.table.name': 'grades',
    'hoodie.datasource.write.recordkey.field': 'id',
    'hoodie.datasource.write.partitionpath.field': 'schoolyear',
    'hoodie.datasource.write.table.name': 'grades',
    'hoodie.datasource.write.operation': 'upsert',  # Use upsert for updates
    'hoodie.datasource.write.precombine.field': 'processing_time',  # Change back to processing_time
    'hoodie.upsert.shuffle.parallelism': '2',
    'hoodie.insert.shuffle.parallelism': '2',
    'hoodie.bulkinsert.shuffle.parallelism': '2',
    'hoodie.cleaner.policy': 'KEEP_LATEST_COMMITS',
    'hoodie.cleaner.commits.retained': '2',
    'hoodie.write.concurrency.mode': 'optimistic_concurrency_control',
    'hoodie.write.lock.provider': 'org.apache.hudi.client.transaction.lock.InProcessLockProvider'
}

def process_batch(batch_df, batch_id):
    try:
        if batch_df.rdd.isEmpty():
            print(f"Batch {batch_id} is empty, skipping...")
            return

        # Cache the batch for multiple operations
        batch_df.cache()
        
        start_time = datetime.now()
        
        # Apply transformations
        transformed_df = transform(batch_df, spark)
        transformed_df.cache()
        
        try:
            # Add processing_time for Hudi
            hudi_df = transformed_df.withColumn("processing_time", current_timestamp())
            
            # Write to Hudi
            hudi_df.write \
                .format("hudi") \
                .options(**hudi_options) \
                .mode("append") \
                .save("C:/tmp/spark_warehouse/from_kafka")
            
            # Inside process_batch function, modify the PostgreSQL write section:
            transformed_df.write \
                .format("jdbc") \
                .option("url", "jdbc:postgresql://localhost:5433/CAIST") \
                .option("dbtable", """(
                    CREATE TABLE IF NOT EXISTS processed_grades (
                        id BIGINT PRIMARY KEY,
                        schoolyear VARCHAR,
                        semester VARCHAR,
                        code VARCHAR,
                        description VARCHAR, 
                        units INTEGER,
                        instructor_id VARCHAR,
                        instructor_name VARCHAR,
                        srcode VARCHAR,
                        fullname VARCHAR,
                        campus VARCHAR,
                        program VARCHAR,
                        major VARCHAR,
                        yearlevel VARCHAR,
                        curriculum VARCHAR,
                        class_section VARCHAR,
                        grade_final VARCHAR,
                        grade_reexam VARCHAR,
                        status VARCHAR,
                        grade_numeric DECIMAL(5,2),
                        grade_classification VARCHAR,
                        year_sem VARCHAR,
                        program_id INTEGER
                    );
                    INSERT INTO processed_grades VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                    ON CONFLICT (id) DO UPDATE SET
                        schoolyear = EXCLUDED.schoolyear,
                        semester = EXCLUDED.semester,
                        code = EXCLUDED.code,
                        description = EXCLUDED.description,
                        units = EXCLUDED.units,
                        instructor_id = EXCLUDED.instructor_id,
                        instructor_name = EXCLUDED.instructor_name,
                        srcode = EXCLUDED.srcode,
                        fullname = EXCLUDED.fullname,
                        campus = EXCLUDED.campus,
                        program = EXCLUDED.program,
                        major = EXCLUDED.major,
                        yearlevel = EXCLUDED.yearlevel,
                        curriculum = EXCLUDED.curriculum,
                        class_section = EXCLUDED.class_section,
                        grade_final = EXCLUDED.grade_final,
                        grade_reexam = EXCLUDED.grade_reexam,
                        status = EXCLUDED.status,
                        grade_numeric = EXCLUDED.grade_numeric,
                        grade_classification = EXCLUDED.grade_classification,
                        year_sem = EXCLUDED.year_sem,
                        program_id = EXCLUDED.program_id
                )""") \
                .option("user", "postgres") \
                .option("password", "password") \
                .option("driver", "org.postgresql.Driver") \
                .option("batchsize", "1000") \
                .mode("append") \
                .save()

            # Log metrics
            end_time = datetime.now()
            duration = (end_time - start_time).total_seconds()
            input_count = batch_df.count()
            output_count = transformed_df.count()
            
            print(f"""
            Batch {batch_id} metrics:
            - Processing time: {duration:.2f} seconds
            - Input records: {input_count}
            - Output records: {output_count}
            - Processing rate: {output_count/duration:.2f} records/second
            """)
            
        finally:
            # Clean up cached DataFrames
            transformed_df.unpersist()
            hudi_df.unpersist()
            batch_df.unpersist()
            
    except Exception as e:
        print(f"Error processing batch {batch_id}: {str(e)}")
        raise e

# Start streaming with monitoring
query = (parsed_df.writeStream
         .foreachBatch(process_batch)
         .outputMode("update")
         .option("checkpointLocation", "C:/tmp/spark_warehouse/checkpoints")
         .trigger(processingTime="2 minutes")
         .start())

# Enhanced monitoring
while query.isActive:
    try:
        stats = query.status
        progress = query.recentProgress
        
        print(f"""
        Stream Status: {stats['message']}
        Data Available: {stats.get('isDataAvailable', False)}
        Trigger Active: {stats.get('isTriggerActive', False)}
        Records Processed: {progress[-1]['numInputRows'] if progress else 0}
        """)
        
        query.awaitTermination(30)  # Check every 2 minutes
        
    except Exception as e:
        print(f"Monitoring error: {str(e)}")


        Stream Status: Initializing sources
        Data Available: False
        Trigger Active: False
        Records Processed: 0
        

        Stream Status: Waiting for next trigger
        Data Available: False
        Trigger Active: False
        Records Processed: 0
        

        Stream Status: Waiting for next trigger
        Data Available: False
        Trigger Active: False
        Records Processed: 0
        

        Stream Status: Waiting for next trigger
        Data Available: False
        Trigger Active: False
        Records Processed: 0
        

        Stream Status: Waiting for next trigger
        Data Available: False
        Trigger Active: False
        Records Processed: 0
        

        Stream Status: Waiting for next trigger
        Data Available: False
        Trigger Active: False
        Records Processed: 0
        

        Stream Status: Waiting for next trigger
        Data Available: False
        Trigger Active: False
        Recor

# Spark Psycopg 2 ETL #

In [None]:
import psycopg2, os
from psycopg2.extras import execute_values
from pyspark.sql.utils import AnalysisException

from scripts.data_cleaners import BaseDataCleaner, AcademicDataCleaner, GradeDataCleaner
import json
from pyspark.sql import SparkSession
from pyspark.sql.functions import (    
    from_json, col, when, lit, current_timestamp, 
    concat_ws, split, expr
)
from pyspark.sql.types import (
    StructType, StructField, StringType, IntegerType, LongType, 
    FloatType, TimestampType
)
import sys
from datetime import datetime
sys.path.append('etl/scripts')  # Add scripts directory to path
from scripts.data_cleaners import AcademicDataCleaner, BaseDataCleaner

schema = StructType([
    StructField("id", LongType(), True),  # bigint → LongType()
    StructField("schoolyear", StringType(), True),  # character varying → StringType()
    StructField("semester", StringType(), True),
    StructField("code", StringType(), True),
    StructField("description", StringType(), True),
    StructField("units", IntegerType(), True),  # integer → IntegerType()
    StructField("instructor_id", StringType(), True),
    StructField("instructor_name", StringType(), True),
    StructField("srcode", StringType(), True),
    StructField("fullname", StringType(), True),
    StructField("campus", StringType(), True),
    StructField("program", StringType(), True),
    StructField("major", StringType(), True),
    StructField("yearlevel", StringType(), True),
    StructField("curriculum", StringType(), True),
    StructField("class_section", StringType(), True),
    StructField("grade_final", StringType(), True),
    StructField("grade_reexam", StringType(), True),
    StructField("status", StringType(), True)
])






# Define transform function
def transform(df, spark):
    """Cleans and processes the extracted data."""
    grade_cleaner = GradeDataCleaner()
    df = BaseDataCleaner.standardize_case(df, ['grade_final', 'campus', 'semester', 'schoolyear'])
    df = AcademicDataCleaner.clean_semesters(df)
    df = BaseDataCleaner.remove_null_strings(df, 'semester')
    
    df = BaseDataCleaner.clean_strings(df, [
        'schoolyear', 'semester', 'code', 'description', 'units', 'instructor_id', 
        'instructor_name', 'srcode', 'fullname', 'campus', 'program', 
        'grade_final', 'grade_reexam', 'status', 'major', 'curriculum', 'class_section'
    ])
    
    df = AcademicDataCleaner.clean_schoolyear(df)
    df = grade_cleaner.process_grades(df)
    df = BaseDataCleaner.remove_null_strings(df, 'program')
    
    df = grade_cleaner.allow_numerical_data(df, "grade_reexam")

    df = AcademicDataCleaner.cast_columns(df, [("id", "int"), ("units", "int"), 
                                               ("grade_numeric", "decimal(5,2)")])
    
    df = grade_cleaner.filter_incomplete_grades(df)

    df = AcademicDataCleaner.get_valid_schoolyears(df)

    df = AcademicDataCleaner.create_yearsem_order(df)
    df = AcademicDataCleaner.map_program_ids(df, spark, "C:/LEONAIDAS/program_with_id.csv")

    # Add processing_time column
    #df = df.withColumn("processing_time", current_timestamp())
    
    return df

# Configure Spark with Kafka and Hudi dependencies
spark = (SparkSession.builder
         .appName("KafkaToHudiProcessor")
         .master("local[*]")
         .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer')
         .config('spark.jars.packages', 
                ('org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1,'  # Match your Spark version
                 'org.apache.hudi:hudi-spark3.3-bundle_2.12:0.14.0,'
                 'org.postgresql:postgresql:42.7.5'))  # PostgreSQL driver
         .config('spark.hadoop.javax.jdo.option.ConnectionDriverName', 'org.postgresql.Driver')
         .config("spark.sql.extensions", "org.apache.hudi.spark3.sql.HoodieSparkSessionExtension")
         .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog")
         .config("spark.streaming.stopGracefullyOnShutdown", "true")
         .config("spark.sql.streaming.checkpointLocation", "C:/tmp/spark_warehouse/checkpoints")
         .getOrCreate())

# Optimized Kafka configuration
kafka_options = {
    "kafka.bootstrap.servers": "localhost:29092",
    "subscribe": "source.public.grades_streaming",
    "startingOffsets": "earliest",
    "kafka.security.protocol": "PLAINTEXT",
    "failOnDataLoss": "false",
    "fetchOffset.numRetries": "5",
    "fetch.max.wait.ms": "5000",
    #"maxOffsetsPerTrigger": "100"  # Adjust based on your needs
}

# Read from Kafka with improved configuration
kafka_df = (spark
            .readStream
            .format("kafka")
            .options(**kafka_options)
            .load())

# Parse JSON with error handling
parsed_df = (kafka_df
             .select(from_json(
                 col("value").cast("string"),
                 schema
             ).alias("data"))
             .select("data.*"))

# Enhanced Hudi options for update handling
hudi_options = {
    'hoodie.table.name': 'grades',
    'hoodie.datasource.write.recordkey.field': 'id',
    'hoodie.datasource.write.partitionpath.field': 'schoolyear',
    'hoodie.datasource.write.table.name': 'grades',
    'hoodie.datasource.write.operation': 'upsert',  # Use upsert for updates
    'hoodie.datasource.write.precombine.field': 'processing_time',  # Change back to processing_time
    'hoodie.upsert.shuffle.parallelism': '2',
    'hoodie.insert.shuffle.parallelism': '2',
    'hoodie.bulkinsert.shuffle.parallelism': '2',
    'hoodie.cleaner.policy': 'KEEP_LATEST_COMMITS',
    'hoodie.cleaner.commits.retained': '2',
    'hoodie.write.concurrency.mode': 'optimistic_concurrency_control',
    'hoodie.write.lock.provider': 'org.apache.hudi.client.transaction.lock.InProcessLockProvider'
}

def initialize_hudi_table():
    table_path = "C:/tmp/spark_warehouse/from_kafka"
    
    if not os.path.exists(table_path) or not os.listdir(table_path):
        print("Hudi table does not exist. Initializing now...")

        empty_df = spark.createDataFrame([], schema)
        empty_df.write.format("hudi") \
            .options(**hudi_options) \
            .mode("overwrite") \
            .save(table_path)
        
        print("Hudi table initialized successfully.")
    else:
        try:
            spark.read.format("hudi").load(table_path).show(1)
            print("Hudi table exists. Proceeding...")
        except AnalysisException:
            print("Error reading Hudi table. Reinitializing...")
            empty_df = spark.createDataFrame([], schema)
            empty_df.write.format("hudi") \
                .options(**hudi_options) \
                .mode("overwrite") \
                .save(table_path)
        
        print("Hudi table initialized successfully.")

def write_to_postgres(transformed_df):
    """
    Writes the transformed Spark DataFrame to PostgreSQL using Psycopg2.
    Uses batch insert/update for efficiency.
    """
    
    db_params = {
        "dbname": "CAIST",
        "user": "postgres",
        "password": "password",
        "host": "localhost",
        "port": "5433"
    }
    
    # Collect data from Spark DataFrame into a list of tuples
    data = [tuple(row) for row in transformed_df.collect()]
    
    if not data:
        print("No data to write.")
        return
    
    insert_query = """
        INSERT INTO processed_grades (
            id, schoolyear, semester, code, description, units, instructor_id, 
            instructor_name, srcode, fullname, campus, program, major, yearlevel, 
            curriculum, class_section, grade_final, grade_reexam, status, 
            grade_numeric, grade_classification, start_year, year_sem, program_id
        ) VALUES %s 
        ON CONFLICT (id) DO UPDATE SET 
            schoolyear = EXCLUDED.schoolyear,
            semester = EXCLUDED.semester,
            code = EXCLUDED.code,
            description = EXCLUDED.description,
            units = EXCLUDED.units,
            instructor_id = EXCLUDED.instructor_id,
            instructor_name = EXCLUDED.instructor_name,
            srcode = EXCLUDED.srcode,
            fullname = EXCLUDED.fullname,
            campus = EXCLUDED.campus,
            program = EXCLUDED.program,
            major = EXCLUDED.major,
            yearlevel = EXCLUDED.yearlevel,
            curriculum = EXCLUDED.curriculum,
            class_section = EXCLUDED.class_section,
            grade_final = EXCLUDED.grade_final,
            grade_reexam = EXCLUDED.grade_reexam,
            status = EXCLUDED.status,
            grade_numeric = EXCLUDED.grade_numeric,
            grade_classification = EXCLUDED.grade_classification,
            start_year = EXCLUDED.start_year,
            year_sem = EXCLUDED.year_sem,
            program_id = EXCLUDED.program_id;
    """
    
    try:
        with psycopg2.connect(**db_params) as conn:
            with conn.cursor() as cur:
                execute_values(cur, insert_query, data)
                conn.commit()
                print(f"Inserted/Updated {len(data)} records into PostgreSQL.")
    except Exception as e:
        print(f"Error writing to PostgreSQL: {e}")


def process_batch(batch_df, batch_id):
    try:
        if batch_df.rdd.isEmpty():
            print(f"Batch {batch_id} is empty, skipping...")
            return
        
        batch_df.cache()
        start_time = datetime.now()

        # Ensure the Hudi table is initialized before writing
        initialize_hudi_table()
        
        # Apply transformations
        transformed_df = transform(batch_df, spark)
        transformed_df.cache()
        
        try:
            # Write to Hudi
            hudi_df = transformed_df.withColumn("processing_time", current_timestamp())
            hudi_df.write \
                .format("hudi") \
                .options(**hudi_options) \
                .mode("append") \
                .save("C:/tmp/spark_warehouse/from_kafka")
            
            # Write to PostgreSQL using Psycopg2
            write_to_postgres(transformed_df)
            
            end_time = datetime.now()
            duration = (end_time - start_time).total_seconds()
            print(f"Batch {batch_id} processed in {duration:.2f} seconds")
        
        finally:
            transformed_df.unpersist()
            hudi_df.unpersist()
            batch_df.unpersist()
            
    except Exception as e:
        print(f"Error processing batch {batch_id}: {str(e)}")
        raise e


# Start streaming with monitoring
query = (parsed_df.writeStream
         .foreachBatch(process_batch)
         .outputMode("update")
         .option("checkpointLocation", "C:/tmp/spark_warehouse/checkpoints")
         .trigger(processingTime="30 seconds")
         .start())

# Enhanced monitoring
while query.isActive:
    try:
        stats = query.status
        progress = query.recentProgress
        
        print(f"""
        Stream Status: {stats['message']}
        Data Available: {stats.get('isDataAvailable', False)}
        Trigger Active: {stats.get('isTriggerActive', False)}
        Records Processed: {progress[-1]['numInputRows'] if progress else 0}
        """)
        
        query.awaitTermination(30)  # Check every 2 minutes
        
    except Exception as e:
        print(f"Monitoring error: {str(e)}")




        Stream Status: Initializing sources
        Data Available: False
        Trigger Active: False
        Records Processed: 0
        
+-------------------+--------------------+------------------+----------------------+--------------------+------+--------+-------+--------------------+-----+-------------+------------------+------+----------------+---------+--------------------+-----+---------+----------+-------------+-----------+------------+------+-------------+--------------------+----------+----------------+----------+--------------------+----------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|    id|semester|   code|         description|units|instructor_id|   instructor_name|srcode|        fullname|   campus|             program|major|yearlevel|curriculum|class_section|grade_final|grade_reexam|status|grade_numeric|grade_classification|start_year|        year_sem|program_id|     processing_time|schoolyear|
+--------

# WORKING ETL EXCEPT INITIALIZING HUDI, TABLE #

In [None]:
import psycopg2
from psycopg2.extras import execute_values

from scripts.data_cleaners import BaseDataCleaner, AcademicDataCleaner, GradeDataCleaner
import json
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    from_json, col, when, lit, current_timestamp, 
    concat_ws, split, expr
)
from pyspark.sql.types import (
    StructType, StructField, StringType, IntegerType, LongType, 
    FloatType, TimestampType
)
import sys
from datetime import datetime
sys.path.append('etl/scripts')  # Add scripts directory to path
from scripts.data_cleaners import AcademicDataCleaner, BaseDataCleaner

schema = StructType([
    StructField("id", LongType(), True),  # bigint → LongType()
    StructField("schoolyear", StringType(), True),  # character varying → StringType()
    StructField("semester", StringType(), True),
    StructField("code", StringType(), True),
    StructField("description", StringType(), True),
    StructField("units", IntegerType(), True),  # integer → IntegerType()
    StructField("instructor_id", StringType(), True),
    StructField("instructor_name", StringType(), True),
    StructField("srcode", StringType(), True),
    StructField("fullname", StringType(), True),
    StructField("campus", StringType(), True),
    StructField("program", StringType(), True),
    StructField("major", StringType(), True),
    StructField("yearlevel", StringType(), True),
    StructField("curriculum", StringType(), True),
    StructField("class_section", StringType(), True),
    StructField("grade_final", StringType(), True),
    StructField("grade_reexam", StringType(), True),
    StructField("status", StringType(), True)
])


# Define transform function
def transform(df, spark):
    """Cleans and processes the extracted data."""
    grade_cleaner = GradeDataCleaner()
    df = BaseDataCleaner.standardize_case(df, ['grade_final', 'campus', 'semester', 'schoolyear'])
    df = AcademicDataCleaner.clean_semesters(df)
    df = BaseDataCleaner.remove_null_strings(df, 'semester')
    
    df = BaseDataCleaner.clean_strings(df, [
        'schoolyear', 'semester', 'code', 'description', 'units', 'instructor_id', 
        'instructor_name', 'srcode', 'fullname', 'campus', 'program', 
        'grade_final', 'grade_reexam', 'status', 'major', 'curriculum', 'class_section'
    ])
    
    df = AcademicDataCleaner.clean_schoolyear(df)
    df = grade_cleaner.process_grades(df)
    df = BaseDataCleaner.remove_null_strings(df, 'program')
    
    df = grade_cleaner.allow_numerical_data(df, "grade_reexam")

    df = AcademicDataCleaner.cast_columns(df, [("id", "int"), ("units", "int"), 
                                               ("grade_numeric", "decimal(5,2)")])
    
    df = grade_cleaner.filter_incomplete_grades(df)

    df = AcademicDataCleaner.get_valid_schoolyears(df)

    df = AcademicDataCleaner.create_yearsem_order(df)
    df = AcademicDataCleaner.map_program_ids(df, spark, "C:/LEONAIDAS/program_with_id.csv")

    # Add processing_time column
    #df = df.withColumn("processing_time", current_timestamp())
    
    return df

# Configure Spark with Kafka and Hudi dependencies
spark = (SparkSession.builder
         .appName("KafkaToHudiProcessor")
         .master("local[*]")
         .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer')
         .config('spark.jars.packages', 
                ('org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1,'  # Match your Spark version
                 'org.apache.hudi:hudi-spark3.3-bundle_2.12:0.14.0,'
                 'org.postgresql:postgresql:42.7.5'))  # PostgreSQL driver
         .config('spark.hadoop.javax.jdo.option.ConnectionDriverName', 'org.postgresql.Driver')
         .config("spark.sql.extensions", "org.apache.hudi.spark3.sql.HoodieSparkSessionExtension")
         .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog")
         .config("spark.streaming.stopGracefullyOnShutdown", "true")
         .config("spark.sql.streaming.checkpointLocation", "C:/tmp/spark_warehouse/checkpoints")
         .getOrCreate())

# Optimized Kafka configuration
kafka_options = {
    "kafka.bootstrap.servers": "localhost:29092",
    "subscribe": "source.public.grades_streaming",
    "startingOffsets": "earliest",
    "kafka.security.protocol": "PLAINTEXT",
    "failOnDataLoss": "false",
    "fetchOffset.numRetries": "5",
    "fetch.max.wait.ms": "5000",
    "maxOffsetsPerTrigger": "100"  # Adjust based on your needs
}

# Read from Kafka with improved configuration
kafka_df = (spark
            .readStream
            .format("kafka")
            .options(**kafka_options)
            .load())

# Parse JSON with error handling
parsed_df = (kafka_df
             .select(from_json(
                 col("value").cast("string"),
                 schema
             ).alias("data"))
             .select("data.*"))

# Enhanced Hudi options for update handling
hudi_options = {
    'hoodie.table.name': 'grades',
    'hoodie.datasource.write.recordkey.field': 'id',
    'hoodie.datasource.write.partitionpath.field': 'schoolyear',
    'hoodie.datasource.write.table.name': 'grades',
    'hoodie.datasource.write.operation': 'upsert',  # Use upsert for updates
    'hoodie.datasource.write.precombine.field': 'processing_time',  # Change back to processing_time
    'hoodie.upsert.shuffle.parallelism': '2',
    'hoodie.insert.shuffle.parallelism': '2',
    'hoodie.bulkinsert.shuffle.parallelism': '2',
    'hoodie.cleaner.policy': 'KEEP_LATEST_COMMITS',
    'hoodie.cleaner.commits.retained': '2',
    'hoodie.write.concurrency.mode': 'optimistic_concurrency_control',
    'hoodie.write.lock.provider': 'org.apache.hudi.client.transaction.lock.InProcessLockProvider'
}

def write_to_postgres(transformed_df):
    """
    Writes the transformed Spark DataFrame to PostgreSQL using Psycopg2.
    Uses batch insert/update for efficiency.
    """
    
    db_params = {
        "dbname": "caist_db_v4",
        "user": "postgres",
        "password": "postgres",
        "host": "192.168.20.11",
        "port": "5432"
    }
    
    # Collect data from Spark DataFrame into a list of tuples
    data = [tuple(row) for row in transformed_df.collect()]
    
    if not data:
        print("No data to write.")
        return
    
    insert_query = """
        INSERT INTO processed_grades (
            id, schoolyear, semester, code, description, units, instructor_id, 
            instructor_name, srcode, fullname, campus, program, major, yearlevel, 
            curriculum, class_section, grade_final, grade_reexam, status, 
            grade_numeric, grade_classification, start_year, year_sem, program_id
        ) VALUES %s 
        ON CONFLICT (id) DO UPDATE SET 
            schoolyear = EXCLUDED.schoolyear,
            semester = EXCLUDED.semester,
            code = EXCLUDED.code,
            description = EXCLUDED.description,
            units = EXCLUDED.units,
            instructor_id = EXCLUDED.instructor_id,
            instructor_name = EXCLUDED.instructor_name,
            srcode = EXCLUDED.srcode,
            fullname = EXCLUDED.fullname,
            campus = EXCLUDED.campus,
            program = EXCLUDED.program,
            major = EXCLUDED.major,
            yearlevel = EXCLUDED.yearlevel,
            curriculum = EXCLUDED.curriculum,
            class_section = EXCLUDED.class_section,
            grade_final = EXCLUDED.grade_final,
            grade_reexam = EXCLUDED.grade_reexam,
            status = EXCLUDED.status,
            grade_numeric = EXCLUDED.grade_numeric,
            grade_classification = EXCLUDED.grade_classification,
            start_year = EXCLUDED.start_year,
            year_sem = EXCLUDED.year_sem,
            program_id = EXCLUDED.program_id;
    """
    
    try:
        with psycopg2.connect(**db_params) as conn:
            with conn.cursor() as cur:
                execute_values(cur, insert_query, data)
                conn.commit()
                print(f"Inserted/Updated {len(data)} records into PostgreSQL.")
    except Exception as e:
        print(f"Error writing to PostgreSQL: {e}")


def process_batch(batch_df, batch_id):
    try:
        if batch_df.rdd.isEmpty():
            print(f"Batch {batch_id} is empty, skipping...")
            return
        
        batch_df.cache()
        start_time = datetime.now()
        
        # Apply transformations
        transformed_df = transform(batch_df, spark)
        transformed_df.cache()
        
        try:
            # Write to Hudi
            hudi_df = transformed_df.withColumn("processing_time", current_timestamp())
            hudi_df.write \
                .format("hudi") \
                .options(**hudi_options) \
                .mode("append") \
                .save("C:/tmp/spark_warehouse/from_kafka")
            
            # Write to PostgreSQL using Psycopg2
            write_to_postgres(transformed_df)
            
            end_time = datetime.now()
            duration = (end_time - start_time).total_seconds()
            print(f"Batch {batch_id} processed in {duration:.2f} seconds")
        
        finally:
            transformed_df.unpersist()
            hudi_df.unpersist()
            batch_df.unpersist()
            
    except Exception as e:
        print(f"Error processing batch {batch_id}: {str(e)}")
        raise e


# Start streaming with monitoring
query = (parsed_df.writeStream
         .foreachBatch(process_batch)
         .outputMode("update")
         .option("checkpointLocation", "C:/tmp/spark_warehouse/checkpoints")
         .trigger(processingTime="2 minutes")
         .start())

# Enhanced monitoring
while query.isActive:
    try:
        stats = query.status
        progress = query.recentProgress
        
        print(f"""
        Stream Status: {stats['message']}
        Data Available: {stats.get('isDataAvailable', False)}
        Trigger Active: {stats.get('isTriggerActive', False)}
        Records Processed: {progress[-1]['numInputRows'] if progress else 0}
        """)
        
        query.awaitTermination(30)  # Check every 2 minutes
        
    except Exception as e:
        print(f"Monitoring error: {str(e)}")




        Stream Status: Initializing sources
        Data Available: False
        Trigger Active: False
        Records Processed: 0
        
Error writing to PostgreSQL: ON CONFLICT DO UPDATE command cannot affect row a second time
HINT:  Ensure that no rows proposed for insertion within the same command have duplicate constrained values.

Batch 0 processed in 14.66 seconds

        Stream Status: Waiting for next trigger
        Data Available: True
        Trigger Active: False
        Records Processed: 200
        
Error writing to PostgreSQL: ON CONFLICT DO UPDATE command cannot affect row a second time
HINT:  Ensure that no rows proposed for insertion within the same command have duplicate constrained values.

Batch 1 processed in 7.58 seconds

        Stream Status: Waiting for next trigger
        Data Available: True
        Trigger Active: False
        Records Processed: 200
        

        Stream Status: Waiting for next trigger
        Data Available: True
        Trig

In [None]:
import json
# Kafka consumer setup
topicName = 'source.public.raw_grades'
consumer = KafkaConsumer(topicName, auto_offset_reset='latest', 
                         bootstrap_servers=bootstrap_servers, group_id='grades-group')

# Process each message and write to Hudi
for msg in consumer:
    # Print the message for monitoring
    message_data = json.loads(msg.value)
    print("Received message:", message_data)

# EVAN CLEANING THROUGH ETL

In [None]:
import psycopg2, os
from psycopg2.extras import execute_values
from pyspark.sql.utils import AnalysisException

from scripts.data_cleaners import BaseDataCleaner, AcademicDataCleaner, GradeDataCleaner
import json
from pyspark.sql import SparkSession
from pyspark.sql.functions import (    
    from_json, col, when, lit, current_timestamp, 
    concat_ws, split, expr
)
from pyspark.sql.types import (
    StructType, StructField, StringType, IntegerType, LongType, 
    FloatType, TimestampType, DecimalType
)
import sys
from datetime import datetime
sys.path.append('etl/scripts')  # Add scripts directory to path
from scripts.data_cleaners import AcademicDataCleaner, BaseDataCleaner

schema = StructType([
    StructField("id", LongType(), True),  # bigint → LongType()
    StructField("schoolyear", StringType(), True),  # character varying → StringType()
    StructField("semester", StringType(), True),
    StructField("code", StringType(), True),
    StructField("description", StringType(), True),
    StructField("units", IntegerType(), True),  # integer → IntegerType()
    StructField("instructor_id", StringType(), True),
    StructField("instructor_name", StringType(), True),
    StructField("srcode", StringType(), True),
    StructField("fullname", StringType(), True),
    StructField("campus", StringType(), True),
    StructField("college", StringType(), True),
    StructField("program", StringType(), True),
    StructField("major", StringType(), True),
    StructField("yearlevel", StringType(), True),
    StructField("curriculum", StringType(), True),
    StructField("class_section", StringType(), True),
    StructField("grade_final", StringType(), True),
    StructField("grade_reexam", StringType(), True),
    StructField("status", StringType(), True)
])

hudi_schema = StructType([
    StructField("id", IntegerType(), True),  # bigint → LongType()
    StructField("schoolyear", StringType(), True),
    StructField("semester", StringType(), True),
    StructField("code", StringType(), True),
    StructField("description", StringType(), True),
    StructField("units", IntegerType(), True),
    StructField("instructor_id", StringType(), True),
    StructField("instructor_name", StringType(), True),
    StructField("srcode", StringType(), True),
    StructField("fullname", StringType(), True),
    StructField("campus", StringType(), True),
    StructField("college", StringType(), True),
    StructField("program", StringType(), True),
    StructField("major", StringType(), True),
    StructField("yearlevel", StringType(), True),
    StructField("curriculum", StringType(), True),
    StructField("class_section", StringType(), True),
    StructField("grade_final", StringType(), True),
    StructField("grade_reexam", StringType(), True),
    StructField("status", StringType(), True),
    StructField("grade_numeric", DecimalType(5,2), True),  # Changed from FloatType to DecimalType
    StructField("grade_classification", StringType(), True),
    StructField("start_year", IntegerType(), True),  # Changed from StringType to IntegerType
    StructField("year_sem", StringType(), True),
    StructField("program_id", IntegerType(), True),
    StructField("processing_time", TimestampType(), True),
])






# Define transform function
def transform(df, spark):
    """Cleans and processes the extracted data."""
    grade_cleaner = GradeDataCleaner()
    df = BaseDataCleaner.standardize_case(df, ['grade_final', 'campus', 'semester', 'schoolyear'])
    df = AcademicDataCleaner.clean_semesters(df)
    df = BaseDataCleaner.remove_null_strings(df, 'semester')
    
    df = BaseDataCleaner.clean_strings(df, [
        'schoolyear', 'semester', 'code', 'description', 'units', 'instructor_id', 
        'instructor_name', 'srcode', 'fullname', 'campus', 'program', 
        'grade_final', 'grade_reexam', 'status', 'major', 'curriculum', 'class_section'
    ])
    
    df = AcademicDataCleaner.clean_schoolyear(df)
    df = grade_cleaner.process_grades(df)
    df = BaseDataCleaner.remove_null_strings(df, 'program')
    
    df = grade_cleaner.allow_numerical_data(df, "grade_reexam")

    df = AcademicDataCleaner.cast_columns(df, [("id", "int"), ("units", "int"), 
                                               ("grade_numeric", "decimal(5,2)")])
    
    df = grade_cleaner.filter_incomplete_grades(df)

    df = AcademicDataCleaner.get_valid_schoolyears(df)

    df = AcademicDataCleaner.create_yearsem_order(df)
    df = AcademicDataCleaner.map_program_ids(df, spark, "C:/LEONAIDAS/program_with_id.csv")

    # Add processing_time column
    #df = df.withColumn("processing_time", current_timestamp())
    
    return df

# Configure Spark with Kafka and Hudi dependencies
spark = (SparkSession.builder
         .appName("KafkaToHudiProcessor")
         .master("local[*]")
         .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer')
         .config('spark.jars.packages', 
                ('org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1,'  # Match your Spark version
                 'org.apache.hudi:hudi-spark3.3-bundle_2.12:0.14.0,'
                 'org.postgresql:postgresql:42.7.5'))  # PostgreSQL driver
         .config('spark.hadoop.javax.jdo.option.ConnectionDriverName', 'org.postgresql.Driver')
         .config("spark.sql.extensions", "org.apache.hudi.spark3.sql.HoodieSparkSessionExtension")
         .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog")
         .config("spark.streaming.stopGracefullyOnShutdown", "true")
         .config("spark.sql.streaming.checkpointLocation", "C:/tmp/spark_warehouse/checkpoints")
         # Add these network configurations
         .config("spark.driver.host", "localhost")
         .config("spark.driver.bindAddress", "localhost")
         .config("spark.network.timeout", "600s")
         .config("spark.local.dir", "C:/tmp/spark-temp")
         .config("spark.sql.warehouse.dir", "C:/tmp/spark_warehouse")
         .getOrCreate())

# Optimized Kafka configuration
kafka_options = {
    "kafka.bootstrap.servers": "localhost:29092",
    "subscribe": "source.public.raw_grades",
    "startingOffsets": "earliest",
    "kafka.security.protocol": "PLAINTEXT",
    "failOnDataLoss": "false",
    "fetchOffset.numRetries": "5",
    "fetch.max.wait.ms": "5000",
    "maxOffsetsPerTrigger": "50000" # Adjust based on your needs
}

# Read from Kafka with improved configuration
kafka_df = (spark
            .readStream
            .format("kafka")
            .options(**kafka_options)
            .load())

# Parse JSON with error handling
parsed_df = (kafka_df
             .select(from_json(
                 col("value").cast("string"),
                 schema
             ).alias("data"))
             .select("data.*"))

# Enhanced Hudi options for update handling
hudi_options = {
    'hoodie.table.name': 'grades',
    'hoodie.datasource.write.recordkey.field': 'id',
    'hoodie.datasource.write.partitionpath.field': 'schoolyear',
    'hoodie.datasource.write.table.name': 'grades',
    'hoodie.datasource.write.operation': 'upsert',  # Use upsert for updates
    'hoodie.datasource.write.precombine.field': 'processing_time',  # Change back to processing_time
    'hoodie.upsert.shuffle.parallelism': '2',
    'hoodie.insert.shuffle.parallelism': '2',
    'hoodie.bulkinsert.shuffle.parallelism': '2',
    'hoodie.cleaner.policy': 'KEEP_LATEST_COMMITS',
    'hoodie.cleaner.commits.retained': '2',
    'hoodie.write.concurrency.mode': 'optimistic_concurrency_control',
    'hoodie.write.lock.provider': 'org.apache.hudi.client.transaction.lock.InProcessLockProvider',
}

def initialize_hudi_table():
    table_path = "C:/tmp/spark_warehouse/from_kafka"
    
    if not os.path.exists(table_path) or not os.listdir(table_path):
        print("Hudi table does not exist. Initializing now...")

        empty_df = spark.createDataFrame([], hudi_schema)
        empty_df.write.format("hudi") \
            .options(**hudi_options) \
            .mode("overwrite") \
            .save(table_path)
        
        print("Hudi table initialized successfully.")
    else:
        try:
            spark.read.format("hudi").load(table_path).show(1)
            print("Hudi table exists. Proceeding...")
        except AnalysisException:
            print("Error reading Hudi table. Reinitializing...")
            empty_df = spark.createDataFrame([], hudi_schema)
            empty_df.write.format("hudi") \
                .options(**hudi_options) \
                .mode("overwrite") \
                .save(table_path)
        
        print("Hudi table initialized successfully.")

def write_to_postgres(transformed_df):
    """
    Writes the transformed Spark DataFrame to PostgreSQL using Psycopg2.
    Uses batch insert/update for efficiency.
    """
    
    db_params = {
        "dbname": "caist_db_v4",
        "user": "postgres",
        "password": "postgres",
        "host": "192.168.20.11",
        "port": "5432"
    }
    
    # Collect data from Spark DataFrame into a list of tuples
    data = [tuple(row) for row in transformed_df.collect()]
    
    if not data:
        print("No data to write.")
        return
    
    insert_query = """
        INSERT INTO processed_grades_store (
            id, schoolyear, semester, code, description, units, instructor_id, 
            instructor_name, srcode, fullname, campus, college, program, major, yearlevel, 
            curriculum, class_section, grade_final, grade_reexam, status, 
            grade_numeric, grade_classification, start_year, year_sem, program_id
        ) VALUES %s 
        ON CONFLICT (id) DO UPDATE SET 
            schoolyear = EXCLUDED.schoolyear,
            semester = EXCLUDED.semester,
            code = EXCLUDED.code,
            description = EXCLUDED.description,
            units = EXCLUDED.units,
            instructor_id = EXCLUDED.instructor_id,
            instructor_name = EXCLUDED.instructor_name,
            srcode = EXCLUDED.srcode,
            fullname = EXCLUDED.fullname,
            campus = EXCLUDED.campus,
            program = EXCLUDED.program,
            major = EXCLUDED.major,
            yearlevel = EXCLUDED.yearlevel,
            curriculum = EXCLUDED.curriculum,
            class_section = EXCLUDED.class_section,
            grade_final = EXCLUDED.grade_final,
            grade_reexam = EXCLUDED.grade_reexam,
            status = EXCLUDED.status,
            grade_numeric = EXCLUDED.grade_numeric,
            grade_classification = EXCLUDED.grade_classification,
            start_year = EXCLUDED.start_year,
            year_sem = EXCLUDED.year_sem,
            program_id = EXCLUDED.program_id;
    """
    
    try:
        with psycopg2.connect(**db_params) as conn:
            with conn.cursor() as cur:
                execute_values(cur, insert_query, data)
                conn.commit()
                print(f"Inserted/Updated {len(data)} records into PostgreSQL.")
    except Exception as e:
        print(f"Error writing to PostgreSQL: {e}")


def process_batch(batch_df, batch_id):
    try:
        if batch_df.rdd.isEmpty():
            print(f"Batch {batch_id} is empty, skipping...")
            return
        
        batch_df.cache()
        start_time = datetime.now()

        # Ensure the Hudi table is initialized before writing
        initialize_hudi_table()
        
        # Apply transformations
        transformed_df = transform(batch_df, spark)
        transformed_df.cache()
        
        try:
            # Write to Hudi
            hudi_df = transformed_df.withColumn("processing_time", current_timestamp())
            hudi_df.write \
                .format("hudi") \
                .options(**hudi_options) \
                .mode("append") \
                .save("C:/tmp/spark_warehouse/from_kafka")
            
            # Write to PostgreSQL using Psycopg2
            write_to_postgres(transformed_df)
            
            end_time = datetime.now()
            duration = (end_time - start_time).total_seconds()
            print(f"Batch {batch_id} processed in {duration:.2f} seconds")
        
        finally:
            transformed_df.unpersist()
            hudi_df.unpersist()
            batch_df.unpersist()
            
    except Exception as e:
        print(f"Error processing batch {batch_id}: {str(e)}")
        raise e


# Start streaming with monitoring
query = (parsed_df.writeStream
         .foreachBatch(process_batch)
         .outputMode("update")
         .option("checkpointLocation", "C:/tmp/spark_warehouse/checkpoints")
         .trigger(processingTime="0 seconds")
         .start())

# Enhanced monitoring
while query.isActive:
    try:
        stats = query.status
        progress = query.recentProgress
        
        print(f"""
        Stream Status: {stats['message']}
        Data Available: {stats.get('isDataAvailable', False)}
        Trigger Active: {stats.get('isTriggerActive', False)}
        Records Processed: {progress[-1]['numInputRows'] if progress else 0}
        """)
        
        query.awaitTermination(30)  # Check every 2 minutes
        
    except Exception as e:
        print(f"Monitoring error: {str(e)}")




        Stream Status: Initializing sources
        Data Available: False
        Trigger Active: False
        Records Processed: 0
        
Hudi table does not exist. Initializing now...
Hudi table initialized successfully.

        Stream Status: Processing new data
        Data Available: True
        Trigger Active: True
        Records Processed: 0
        

        Stream Status: Processing new data
        Data Available: True
        Trigger Active: True
        Records Processed: 0
        

        Stream Status: Processing new data
        Data Available: True
        Trigger Active: True
        Records Processed: 0
        

        Stream Status: Processing new data
        Data Available: True
        Trigger Active: True
        Records Processed: 0
        

        Stream Status: Processing new data
        Data Available: True
        Trigger Active: True
        Records Processed: 0
        
Inserted/Updated 49594 records into PostgreSQL.
Batch 0 processed in 168.

# CHECK CREATED HUDI TABLE

In [None]:
# Read the Hudi table
hudi_df = spark.read \
    .format("hudi") \
    .load("C:/tmp/spark_warehouse/from_kafka")

# Show schema
print("Table Schema:")
hudi_df.printSchema()

# Show sample data
print("\nSample Data:")
hudi_df.show(5)

# Get total count
print(f"\nTotal Records: {hudi_df.count()}")

# Get statistics by partition (schoolyear)
print("\nRecords by Schoolyear:")
hudi_df.groupBy("schoolyear").count().show()