In [None]:
# Export format for the records: 'csv' or 'json' (currently JSON)
output_format = 'json'

# Cut-off date for the CIJ data ingestion: used as the default stored timestamp
# on the first run, so any record at or before this date is ignored
cutoffdate = "2024-05-09 09:46:55"

# Root folder that receives the exported records
output_file_path = "Files/adls2_fabricoutput/emailclicked"
# Text file that stores the timestamp of the last exported record
ts_file_path = "Files/adls2_fabricoutput/ts_emailclicked"

In [None]:
from notebookutils import mssparkutils
from pyspark.sql import SparkSession

# Obtain the Spark session used by every cell below
spark = (
    SparkSession.builder
    .appName("ADLS2 Example")
    .getOrCreate()
)



In [None]:
from pyspark.sql.utils import AnalysisException
from datetime import datetime
from pyspark.sql.types import StructType, StructField, StringType


# Probe a path by attempting to load it as a text DataFrame
def file_exists(path):
    """Try to read ``path`` as text and hand back the resulting DataFrame.

    Despite the name, the result is not a boolean.

    Args:
        path: Location passed to ``spark.read.text``.

    Returns:
        The DataFrame produced by ``spark.read.text(path)``, or ``None`` when
        that call raises ``AnalysisException``.
    """
    try:
        loaded_df = spark.read.text(path)
    except AnalysisException:
        # The file does not exist or the read failed during analysis
        return None
    return loaded_df
    


# Load the stored timestamp, seeding the file with a default on first use
def writeReadTS(ts_file_path, cutoffdate):
    """Return the timestamp DataFrame, creating the timestamp file if needed.

    Args:
        ts_file_path: Location of the text file holding the timestamp.
        cutoffdate: Default timestamp value written when no file can be read.

    Returns:
        The DataFrame returned by ``file_exists`` when the file is readable;
        otherwise a new single-row DataFrame (column ``timestamp``) holding
        ``cutoffdate``, which is also written to ``ts_file_path``.
    """
    existing_df = file_exists(ts_file_path)
    if existing_df is not None:
        print("File already exists or read successfully.")
        return existing_df

    # Nothing readable at the path: build the default and store it as a
    # single text file (coalesce(1) keeps the output to one partition)
    default_df = spark.createDataFrame([(cutoffdate,)], ["timestamp"])
    default_df.coalesce(1).write.mode("overwrite").text(ts_file_path)
    print(f"File created with default timestamp: {cutoffdate}")
    return default_df



In [None]:
# Make sure the timestamp file exists (it is created with the default
# cutoffdate on the first run). The returned DataFrame is not used here
# because the file is re-read below with an explicit schema.
writeReadTS(ts_file_path, cutoffdate)

# Schema for reading the text file: a single string column named "timestamp"
schema = StructType([StructField("timestamp", StringType(), True)])

# Read the text file back into a DataFrame with the specified schema
ts_df = spark.read.schema(schema).text(ts_file_path)

# Collect the DataFrame to a list of Row objects
rows = ts_df.collect()

# Extract the timestamp value from the first row. Fail with a clear message
# when the file holds no usable value instead of raising a bare IndexError
# or handing an empty string to the Timestamp filter in the next cell.
last_ts = rows[0]["timestamp"] if rows else None
if last_ts is None or not last_ts.strip():
    raise ValueError(
        f"Timestamp file '{ts_file_path}' contains no timestamp value; "
        "cannot determine last_ts"
    )

print(f"last_ts = {last_ts}")




In [None]:
from pyspark.sql.functions import col, max

# Keep only the records whose Timestamp is later than the stored last_ts
filtered_df = spark.read.parquet("Files/EmailClicked").filter(col("Timestamp") > last_ts)

# Count once and reuse the number: every count() call is a separate Spark action
filtered_count = filtered_df.count()

# Nothing new to export: finish the notebook with a success message
if filtered_count == 0:
    print("filtered_df is empty. Exiting notebook with positive finish state.")
    mssparkutils.notebook.exit("Success, filtered_df is empty")

# Deduplicate the DataFrame. The result is cached so that the count, the max
# aggregation below and the later cells that reuse dup_filtered_df do not
# each re-read and re-deduplicate the parquet source.
print("before dedup count = ", filtered_count)
dup_filtered_df = filtered_df.dropDuplicates().cache()
dedup_count = dup_filtered_df.count()
print("after dedup count = ", dedup_count)

# Get the maximum value of the Timestamp column.
# `max` is the pyspark.sql.functions.max imported at the top of this cell,
# not the Python builtin.
if dedup_count > 0:
    newlast_ts = dup_filtered_df.agg(max(col("Timestamp"))).collect()[0][0]
    print("Last Timestamp: ", newlast_ts)
else:
    print("dup_filtered_df DataFrame is empty after deduplication.")
    mssparkutils.notebook.exit("Success, dup_filtered_df is empty")

# The timestamp file stores text, so normalise the value to a string
if not isinstance(newlast_ts, str):
    newlast_ts = str(newlast_ts)

print("newlast_ts", newlast_ts)

# Show the filtered DataFrame
#dup_filtered_df.show()

In [None]:
#Sample code for manipulating columns

# Get a list of all column names
#columns = dup_filtered_df.columns
#print("Columns: ", columns)

# Rename columns
#df = dup_filtered_df.withColumnRenamed("old_name1", "new_name1")    

# Select only the "ActivityId" and "Timestamp" columns
#selected_df = dup_filtered_df.select("ActivityId", "Timestamp")

# Show the DataFrame with only the selected columns
#selected_df.show()

In [None]:
# Debug aid: report the row counts before and after deduplication
for _label, _frame in (
    ("filtered_record_count:", filtered_df),
    ("dedup_record_count:", dup_filtered_df),
):
    print(_label, _frame.count())


In [None]:
# append record to ADLS2
from datetime import datetime

# Reject unknown formats before doing any work. Without this check nothing
# would be written, while the later cell still saves newlast_ts, so the
# unexported records would be skipped by the next run.
if output_format not in ('csv', 'json'):
    raise ValueError(
        f"Unsupported output_format {output_format!r}; expected 'csv' or 'json'"
    )

# Get the current timestamp
current_timestamp = datetime.now()

# Split the date into the year/month/day parts of the folder structure
year, month, day = current_timestamp.strftime("%Y %m %d").split()

# Define the output path: <output_file_path>/<YYYY>/<MM>/<DD>/
output_path = f"{output_file_path}/{year}/{month}/{day}/"

# Append the deduplicated records to the day folder in the selected format.
# coalesce(1) reduces the DataFrame to one partition before writing.
writer = dup_filtered_df.coalesce(1).write.mode("append")
if output_format == 'csv':
    writer.csv(output_path, header=True)
else:
    writer.json(output_path)

# Print the output path
print("Output path:", output_path)

In [None]:
# Persist the newest exported Timestamp so the next run starts after it

# Single-row DataFrame holding the new timestamp value
newts_df = spark.createDataFrame([(newlast_ts,)], ["timestamp"])

# Overwrite the timestamp file in ADLS2 with a single TXT file
(
    newts_df
    .coalesce(1)
    .write
    .mode("overwrite")
    .text(ts_file_path)
)




In [None]:
# just for debug
# Validate the export by reading the output folder back and comparing counts.
# NOTE(review): the export appends into a per-day folder, so output_path may
# also hold rows from earlier runs on the same day; in that case the loaded
# count exceeds the count of this run — confirm whether that is acceptable.
if output_format == 'csv':
    loaded_df = spark.read.csv(output_path, header=True, inferSchema=True)
elif output_format == 'json':
    loaded_df = spark.read.json(output_path)
else:
    # Without this branch loaded_df would be undefined below
    raise ValueError(
        f"Unsupported output_format {output_format!r}; expected 'csv' or 'json'"
    )

# Run each count once; every count() call is a separate Spark action
loaded_count = loaded_df.count()
expected_count = dup_filtered_df.count()

# Validate the count
if loaded_count == expected_count:
    print("Validation successful! Record counts match.")
else:
    print(f"Validation failed! Filtered record count: {expected_count}, Loaded record count: {loaded_count}")

In [None]:
# just for debug
# Read the stored timestamp back and print it next to the in-memory value

# Single string column named "timestamp" for the text file
schema = StructType().add("timestamp", StringType(), True)

# Load the timestamp file with that schema and pull its rows to the driver
newts_df = spark.read.schema(schema).text(ts_file_path)
rows = newts_df.collect()

# The first row carries the stored timestamp
read_newlast_ts = rows[0]["timestamp"]

# Stored value first, then the value computed in this run
print(f"read newlast_ts= {read_newlast_ts}")
print(f"newlast_ts = {newlast_ts}")


In [None]:
# If the Fabric capacity is F2/F4 and you run more than one concurrent session,
# uncomment the mssparkutils.session.stop() line below so Fabric terminates this session before starting another one.
# After stopping the session, you have to wait for a new session to start up.

# release session resources
# mssparkutils.session.stop()