In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql.functions import col

# Spark session used by every cell in this notebook
spark = (
    SparkSession.builder
    .appName("IncrementalLoadDemo")
    .getOrCreate()
)

# Shared layout for both DataFrames: nullable integer `id`, nullable string `name`
schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (("id", IntegerType()), ("name", StringType()))
])

# 🔵 Rows that count as already loaded into the target
target_data = [(1, "Alice"), (2, "Bob"), (3, "Charlie")]

# 🟢 Rows currently in the source: the target rows plus four not-yet-loaded ones
source_data = target_data + [
    (4, "David"),
    (5, "Eve"),
    (6, "Frank"),
    (7, "Grace"),
]

# Materialise both row lists as DataFrames with the shared schema
source_df = spark.createDataFrame(source_data, schema=schema)
target_df = spark.createDataFrame(target_data, schema=schema)

# Display the starting state: target first, then source
print("🔵 Target Data:")
target_df.show()

print("🟢 Source Data:")
source_df.show()


🔵 Target Data:
+---+-------+
| id|   name|
+---+-------+
|  1|  Alice|
|  2|    Bob|
|  3|Charlie|
+---+-------+

🟢 Source Data:
+---+-------+
| id|   name|
+---+-------+
|  1|  Alice|
|  2|    Bob|
|  3|Charlie|
|  4|  David|
|  5|    Eve|
|  6|  Frank|
|  7|  Grace|
+---+-------+



Load max_id from log file

In [2]:
# prompt: implement python function that is going to read the txt file and return last line in the file

def get_max_id(path="/content/log.txt"):
  """Return the last non-blank line of the log file, stripped of whitespace.

  The value is returned as a string; callers that need a number must
  convert it themselves (the notebook wraps the call in ``int(...)``).

  Args:
    path: Location of the log file. Defaults to ``/content/log.txt``.

  Returns:
    The last line that contains non-whitespace characters, or ``None`` when
    the file is missing, empty, holds only blank lines, or cannot be read.
    Read problems are reported with ``print`` rather than raised.
  """
  try:
    last_entry = None
    with open(path, 'r') as f:
      # Stream the file and remember the most recent line with content, so a
      # trailing blank line cannot mask the real last entry.
      for line in f:
        text = line.strip()
        if text:
          last_entry = text
    return last_entry
  except FileNotFoundError:
    print(f"Error: File not found at {path}")
    return None
  except Exception as e:
    # Best-effort read: report the problem and signal "no value" to the caller.
    print(f"An error occurred: {e}")
    return None


Save max_id to log file

In [3]:
def save_message_to_file(message, path="/content/log.txt"):
    """Append ``message`` as one new line at the end of the log file.

    Args:
        message: Value to record. It is formatted with ``str()`` semantics,
            so non-string values such as an integer id are accepted.
        path: Location of the log file. Defaults to ``/content/log.txt``.

    Returns:
        None. Success or failure is reported with ``print``; errors while
        writing are caught and not re-raised.
    """
    try:
        # Append mode keeps earlier entries; each call adds exactly one line.
        with open(path, 'a') as file:
            file.write(f"{message}\n")
        print(f"Message saved to {path}")
    except Exception as e:
        print(f"An error occurred while saving the message: {e}")


In [5]:
# Seed the log with 3, the highest id present in target_data.
save_message_to_file(message="3")

Message saved to /content/log.txt


Change data capture

In [14]:
# The last id recorded in the log marks what has already been loaded.
max_id = int(get_max_id())

# Keep only source rows above that id, cache them for reuse, and display them.
changes_df = source_df.filter(col("id") > max_id).cache()
changes_df.show()

+---+-----+
| id| name|
+---+-----+
|  4|David|
|  5|  Eve|
|  6|Frank|
|  7|Grace|
+---+-----+



In [15]:
# Append the captured rows to the target and display the merged result.
target_df = target_df.union(changes_df)
target_df.show()

# Record the target's new highest id in the log for the next run.
max_id = target_df.agg({"id": "max"}).first()[0]
save_message_to_file(f"{max_id}")

+---+-------+
| id|   name|
+---+-------+
|  1|  Alice|
|  2|    Bob|
|  3|Charlie|
|  4|  David|
|  5|    Eve|
|  6|  Frank|
|  7|  Grace|
+---+-------+

Message saved to /content/log.txt


In [16]:
# Show target_df again; it was reassigned to the union with changes_df in the previous cell.
target_df.show()

+---+-------+
| id|   name|
+---+-------+
|  1|  Alice|
|  2|    Bob|
|  3|Charlie|
|  4|  David|
|  5|    Eve|
|  6|  Frank|
|  7|  Grace|
+---+-------+



In [20]:
# Refreshed source snapshot: ids 1-15 assigned in list order.
# Ids 8-15 are rows that were not in the first snapshot.
source_data = list(enumerate(
    [
        "Alice", "Bob", "Charlie", "David", "Eve",
        "Frank", "Grace", "Henry", "Ivy", "Jack",
        "Kelly", "Liam", "Mia", "Noah", "Olivia",
    ],
    start=1,
))
source_df = spark.createDataFrame(source_data, schema=schema)

In [21]:
# Second incremental run against the refreshed source.
# 1) Read the last id recorded in the log.
max_id = int(get_max_id())

# 2) Select and display the source rows above that id.
changes_df = source_df.filter(col("id") > max_id).cache()
changes_df.show()

# 3) Append them to the target and display the merged result.
target_df = target_df.union(changes_df)
target_df.show()

# 4) Record the target's new highest id in the log.
max_id = target_df.agg({"id": "max"}).first()[0]
save_message_to_file(f"{max_id}")

+---+------+
| id|  name|
+---+------+
|  8| Henry|
|  9|   Ivy|
| 10|  Jack|
| 11| Kelly|
| 12|  Liam|
| 13|   Mia|
| 14|  Noah|
| 15|Olivia|
+---+------+

+---+-------+
| id|   name|
+---+-------+
|  1|  Alice|
|  2|    Bob|
|  3|Charlie|
|  4|  David|
|  5|    Eve|
|  6|  Frank|
|  7|  Grace|
|  8|  Henry|
|  9|    Ivy|
| 10|   Jack|
| 11|  Kelly|
| 12|   Liam|
| 13|    Mia|
| 14|   Noah|
| 15| Olivia|
+---+-------+

Message saved to /content/log.txt
