## Necessary imports

In [None]:
import os
import subprocess
import time
from datetime import datetime, timedelta

import numpy as np
import pyspark
from delta import *
from minio import Minio
from minio.commonconfig import CopySource
from pyspark.sql.functions import col, max
from pyspark.sql.types import *

from settings import RAW_BUCKET_NAME, S3A_ENDPOINT, LANDING_BUCKET_NAME
from signal_processer import SignalProcessor
from video_rppg_extraction import VideoRppgExtraction

## Build spark session

In [8]:

# MinIO credentials: taken from the environment when set, otherwise the
# local-development defaults that were previously hard-coded here.
MINIO_ACCESS_KEY = os.environ.get("MINIO_ACCESS_KEY", "minioadmin")
MINIO_SECRET_KEY = os.environ.get("MINIO_SECRET_KEY", "minioadmin")

# Spark session with Delta Lake enabled and S3A pointed at the MinIO endpoint.
# "spark.jars.packages" is intentionally not set here: configure_spark_with_delta_pip
# (below) sets it to the Delta artifact plus the extra packages it is given.
builder = (
    pyspark.sql.SparkSession.builder
    .appName("MyApp")
    .master("local[*]")
    .config("spark.driver.memory", "10g")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.sql.warehouse.dir", "delta")
    .config("spark.hadoop.fs.s3a.endpoint", S3A_ENDPOINT)
    .config("spark.hadoop.fs.s3a.access.key", MINIO_ACCESS_KEY)
    .config("spark.hadoop.fs.s3a.secret.key", MINIO_SECRET_KEY)
    .config("spark.hadoop.fs.s3a.path.style.access", True)
    .config("spark.hadoop.fs.s3a.fast.upload", True)
    # NOTE(review): 104857608 is 8 bytes above 100 MiB (104857600) - confirm this is intended.
    .config("spark.hadoop.fs.s3a.multipart.size", 104857608)
    # Hadoop options only reach the S3A connector when prefixed with "spark.hadoop.".
    .config("spark.hadoop.fs.s3a.connection.maximum", 100)
    .config("spark.delta.logStore.class", "org.apache.spark.sql.delta.storage.S3SingleDriverLogStore")
    # New Delta tables get the change data feed enabled by default.
    .config("spark.databricks.delta.properties.defaults.enableChangeDataFeed", "true")
)

spark = configure_spark_with_delta_pip(
    builder,
    ["org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1", "org.apache.hadoop:hadoop-aws:3.3.1"],
).getOrCreate()

# MinIO client used for bucket listing and object download/copy/removal.
client = Minio("localhost:9000", access_key=MINIO_ACCESS_KEY, secret_key=MINIO_SECRET_KEY, secure=False)

Ivy Default Cache set to: /Users/cunha/.ivy2/cache
The jars for the packages stored in: /Users/cunha/.ivy2/jars
io.delta#delta-core_2.12 added as a dependency
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
org.apache.hadoop#hadoop-aws added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-dbb061ce-2151-460d-854c-4941d1262cc4;1.0
	confs: [default]


:: loading settings :: url = jar:file:/Users/cunha/opt/anaconda3/envs/delta-lake-env/lib/python3.7/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


	found io.delta#delta-core_2.12;2.2.0 in central
	found io.delta#delta-storage;2.2.0 in central
	found org.antlr#antlr4-runtime;4.8 in central
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.3.1 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.3.1 in central
	found org.apache.kafka#kafka-clients;2.8.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.8.4 in central
	found org.slf4j#slf4j-api;1.7.32 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.2 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.hadoop#hadoop-client-api;3.3.2 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.apache.commons#commons-pool2;2.11.1 in central
	found org.apache.hadoop#hadoop-aws;3.3.1 in central
	found com.amazonaws#aws-java-sdk-bundle;1.11.901 in central
	found org.wildfly.openssl#wildfly-openssl;1.0.7.Fin

23/05/31 15:20:55 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


NameError: name 'ACCESS_KEY' is not defined

## Initialization and Table Creation

In [9]:
# Helpers used by the processing cells: rPPG extraction and BVP signal analysis.
video_extractor = VideoRppgExtraction()
signal_processer = SignalProcessor()


def _create_delta_table(zone, table_name, schema):
    """Create the Delta table `table_name` at s3a://<zone>/<table_name> if it does not exist yet."""
    return (
        DeltaTable.createIfNotExists(spark)
        .location(f"s3a://{zone}/{table_name}")
        .tableName(table_name)
        .addColumns(schema)
        .execute()
    )


# --- trusted zone: filtered BVP signal extracted from each video ---
filtered_bvp_schema = StructType([
    StructField("filtered_bvp", ArrayType(DoubleType())),
    StructField("fps", DoubleType()),
    StructField("start_timestamp", TimestampType()),
    StructField("end_timestamp", TimestampType()),
    StructField("subject_uuid", StringType()),
    StructField("video", StringType()),
])
_create_delta_table("trusted-zone", "trusted_filtered_bvp_signal", filtered_bvp_schema)


# --- trusted zone: scalar measures computed from the BVP signal ---
trusted_heartpy_measures_schema = StructType([
    StructField("bpm", DoubleType()),
    StructField("breathingrate", DoubleType()),
    StructField("end_timestamp", TimestampType()),
    StructField("hr_mad", DoubleType()),
    StructField("ibi", DoubleType()),
    StructField("pnn20", DoubleType()),
    StructField("pnn50", DoubleType()),
    StructField("rmssd", DoubleType()),
    StructField("s", DoubleType()),
    StructField("sd1", DoubleType()),
    StructField("sd1/sd2", DoubleType()),
    StructField("sd2", DoubleType()),
    StructField("sdnn", DoubleType()),
    StructField("sdsd", DoubleType()),
    StructField("start_timestamp", TimestampType()),
    StructField("subject_uuid", StringType()),
    StructField("video", StringType()),
])
_create_delta_table("trusted-zone", "trusted_heartpy_measures", trusted_heartpy_measures_schema)


# --- trusted zone: intermediate working data of the signal processing ---
trusted_working_data_schema = StructType([
    StructField("RR_diff", ArrayType(DoubleType())),
    StructField("RR_list", ArrayType(DoubleType())),
    StructField("RR_sqdiff", ArrayType(DoubleType())),
    StructField("breathing_frq", ArrayType(DoubleType())),
    StructField("breathing_psd", ArrayType(DoubleType())),
    StructField("breathing_signal", ArrayType(DoubleType())),
    StructField("end_timestamp", TimestampType()),
    StructField("hr", ArrayType(DoubleType())),
    StructField("nn20", ArrayType(DoubleType())),
    StructField("nn50", ArrayType(DoubleType())),
    StructField("removed_beats_y", ArrayType(DoubleType())),
    StructField("rolling_mean", ArrayType(DoubleType())),
    StructField("rrsd", DoubleType()),
    StructField("sample_rate", DoubleType()),
    StructField("start_timestamp", TimestampType()),
    StructField("subject_uuid", StringType()),
    StructField("video", StringType()),
])
_create_delta_table("trusted-zone", "trusted_working_data", trusted_working_data_schema)


# --- refined zone: breathing signal and rate ---
refined_breathing_data_schema = StructType([
    StructField("breathing_frq", ArrayType(DoubleType())),
    StructField("breathing_psd", ArrayType(DoubleType())),
    StructField("breathing_rate", LongType()),
    StructField("breathing_signal", ArrayType(DoubleType())),
    StructField("end_timestamp", TimestampType()),
    StructField("start_timestamp", TimestampType()),
    StructField("subject_uuid", StringType()),
    StructField("video", StringType()),
])
_create_delta_table("refined-zone", "refined_breathing_data", refined_breathing_data_schema)


# --- refined zone: PSD frequencies and powers, one pair of columns per band ---
refined_psd_frequencies_data_schema = StructType([
    StructField("freq_ULF", ArrayType(DoubleType())),
    StructField("power_ULF", ArrayType(DoubleType())),
    StructField("freq_VLF", ArrayType(DoubleType())),
    StructField("power_VLF", ArrayType(DoubleType())),
    StructField("freq_LF", ArrayType(DoubleType())),
    StructField("power_LF", ArrayType(DoubleType())),
    StructField("freq_HF", ArrayType(DoubleType())),
    StructField("power_HF", ArrayType(DoubleType())),
    StructField("freq_VHF", ArrayType(DoubleType())),
    StructField("power_VHF", ArrayType(DoubleType())),
    StructField("start_timestamp", TimestampType()),
    StructField("end_timestamp", TimestampType()),
    StructField("subject_uuid", StringType()),
    StructField("video", StringType()),
])
_create_delta_table("refined-zone", "refined_psd_frequencies_data", refined_psd_frequencies_data_schema)


# --- refined zone: RR-interval histogram ---
refined_rri_histogram_data_schema = StructType([
    StructField("bins", ArrayType(DoubleType())),
    StructField("counts", ArrayType(LongType())),
    StructField("end_timestamp", TimestampType()),
    StructField("rri", ArrayType(DoubleType())),
    StructField("start_timestamp", TimestampType()),
    StructField("subject_uuid", StringType()),
    StructField("video", StringType()),
])
_create_delta_table("refined-zone", "refined_rri_histogram_data", refined_rri_histogram_data_schema)


# --- refined zone: heart-rate series and rounded BPM ---
refined_hr_data_schema = StructType([
    StructField("hr", ArrayType(DoubleType())),
    StructField("fps", DoubleType()),
    StructField("bpm", LongType()),
    StructField("start_timestamp", TimestampType()),
    StructField("end_timestamp", TimestampType()),
    StructField("subject_uuid", StringType()),
    StructField("video", StringType()),
])
_create_delta_table("refined-zone", "refined_hr_data", refined_hr_data_schema)


# --- trusted zone: one row per processed video (session) ---
trusted_sessions_schema = StructType([
    StructField("id", LongType()),
    StructField("start_timestamp", TimestampType()),
    StructField("end_timestamp", TimestampType()),
    StructField("subject_uuid", StringType()),
    StructField("video", StringType()),
])
_create_delta_table("trusted-zone", "trusted_sessions", trusted_sessions_schema)

23/05/31 15:21:59 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
23/05/31 15:22:01 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


                                                                                

<delta.tables.DeltaTable at 0x7ff409323f50>

## Function that processes a video to extract the BVP signal using rPPG

In [None]:
# Extract BVP signal from a video
def process_video(video_obj):
    """Extract the BVP signal of a raw-bucket video and record it in Delta tables.

    The object is downloaded from RAW_BUCKET_NAME to /tmp, passed to
    `video_extractor.extract_bvp_signal`, and the result is appended as one row
    to the filtered BVP table and one row to the sessions table. The temporary
    file is removed afterwards, even when a step fails.

    Args:
        video_obj: object exposing `object_name`; the text before the first "_"
            in that name is used as the subject UUID.
    """
    object_name = str(video_obj.object_name)
    print("Processing: ", video_obj.object_name)

    subject_uuid = object_name.split("_")[0]
    tmp_video_path = "/tmp/" + object_name

    try:
        client.fget_object(RAW_BUCKET_NAME, object_name, tmp_video_path)

        # TODO: Decrypt video first

        filtered_bvp, fps, duration = video_extractor.extract_bvp_signal(tmp_video_path)

        # Map all values from np.float64 to python float
        filtered_bvp = list(map(float, filtered_bvp))

        # Session window: starts when extraction finished, lasts for the video duration.
        start_timestamp = datetime.now()
        end_timestamp = start_timestamp + timedelta(seconds=float(duration))

        # Create and store data frame into a delta table. The explicit schema keeps
        # the column types aligned with the table instead of relying on inference.
        bvp_row = {
            "filtered_bvp": filtered_bvp,
            "fps": float(fps),
            "start_timestamp": start_timestamp,
            "end_timestamp": end_timestamp,
            "subject_uuid": subject_uuid,
            "video": object_name,
        }
        filtered_bvp_df = spark.createDataFrame(data=[bvp_row], schema=filtered_bvp_schema)
        filtered_bvp_df.write.format("delta").mode("append").save(
            "s3a://trusted-zone/trusted_filtered_bvp_signal"
        )

        # Next session id = current maximum + 1. `max` here is pyspark's aggregate
        # (it shadows the builtin); it yields None on an empty table, so ids start at 0.
        session_df = spark.read.format("delta").load("s3a://trusted-zone/trusted_sessions")
        max_id = session_df.select(max(col("id"))).collect()[0][0]
        next_id = 0 if max_id is None else max_id + 1

        # Add a new session row to the sessions table
        session_row = {
            "id": next_id,
            "start_timestamp": start_timestamp,
            "end_timestamp": end_timestamp,
            "subject_uuid": subject_uuid,
            "video": object_name,
        }
        session_data_df = spark.createDataFrame(data=[session_row], schema=trusted_sessions_schema)
        session_data_df.write.format("delta").mode("append").save(
            "s3a://trusted-zone/trusted_sessions"
        )
    finally:
        # Always clean up the downloaded copy, including on failure.
        if os.path.exists(tmp_video_path):
            os.remove(tmp_video_path)

    print("Processing over")

## Wait for a new BVP signal
### Then process it and store the measures and working data in new Delta tables

In [None]:
# Streaming reader over the change data feed of the filtered BVP table,
# starting from the table's latest version.
bvp_change_feed_reader = (
    spark.readStream.format("delta")
    .option("readChangeFeed", "true")
    .option("startingVersion", "latest")
    .option("path", "s3a://trusted-zone/trusted_filtered_bvp_signal")
)
filtered_bvp_signal_stream_changes_df = bvp_change_feed_reader.load()

def _session_fields(row):
    """Return the four identifying columns shared by every output table."""
    return {
        "start_timestamp": row["start_timestamp"],
        "end_timestamp": row["end_timestamp"],
        "subject_uuid": row["subject_uuid"],
        "video": row["video"],
    }


def _ndarray_to_list(values):
    """Convert a NumPy array to a list of Python floats/ints; None if unsupported."""
    # Decide from the dtype rather than from element 0, so empty arrays work too.
    if np.issubdtype(values.dtype, np.floating):
        return [float(v) for v in values]
    if np.issubdtype(values.dtype, np.integer):
        return [int(v) for v in values]
    # Other dtypes (e.g. object arrays): fall back to inspecting the first element.
    if values.size:
        if isinstance(values[0], float):
            return [float(v) for v in values]
        if isinstance(values[0], int):
            return [int(v) for v in values]
    return None


def _convert_working_data(raw_working_data):
    """Keep the numeric arrays and float scalars of `raw_working_data` as plain Python types.

    Entries of any other type are left out, so the result can be handed to Spark.
    """
    converted = {}
    for key, value in raw_working_data.items():
        if isinstance(value, np.ndarray):
            as_list = _ndarray_to_list(value)
            if as_list is not None:
                converted[key] = as_list
        # `np.float` / `np.int` were removed in NumPy 1.24; use the real types.
        elif isinstance(value, (float, np.floating)):
            converted[key] = float(value)
    return converted


def process_batch(df, epoch_id):
    """Process every BVP row of a streaming micro-batch and append the derived tables.

    For each row, `signal_processer.process_bvp_signal` is run on the stored
    signal and the results are appended to the trusted measures / working-data
    tables and to the refined breathing, HR, RRI-histogram and PSD tables.

    Args:
        df: micro-batch DataFrame handed over by `foreachBatch`.
        epoch_id: batch identifier supplied by `foreachBatch` (unused).
    """
    # Collect all rows on the driver; the signal processing runs in plain Python.
    for row in df.collect():
        # Process extracted BVP signal (BPM, HRV ...)
        tmp_working_data, measures = signal_processer.process_bvp_signal(row["filtered_bvp"], row["fps"])

        # Change data types of working data, to be compatible with spark df
        working_data = _convert_working_data(tmp_working_data)
        session_fields = _session_fields(row)

        # CREATE MEASURES DATA FRAME
        measures.update(session_fields)
        measures_df = spark.createDataFrame(data=[measures])

        # CREATE WORKING_DATA DATA FRAME
        # Explicit schema: only the table's columns are used and empty arrays
        # no longer break type inference.
        working_data.update(session_fields)
        working_data_df = spark.createDataFrame(data=[working_data], schema=trusted_working_data_schema)

        # CREATE BREATHING SIGNAL DATA FRAME
        # TODO: keep the breathing rate in trusted or refined?
        # TODO: Have the index as a column too, to build the plot with no transformations needed?
        breathing_data = dict(session_fields)
        breathing_data["breathing_signal"] = working_data["breathing_signal"]
        breathing_data["breathing_psd"] = working_data["breathing_psd"]
        breathing_data["breathing_frq"] = working_data["breathing_frq"]
        # 60 * rate is the same value as 60 / (1 / rate) without dividing by zero when rate is 0.
        breathing_data["breathing_rate"] = int(round(60 * measures["breathingrate"]))
        breathing_data_df = spark.createDataFrame(data=[breathing_data], schema=refined_breathing_data_schema)

        # CREATE HR DATA FRAME
        hr_data = dict(session_fields)
        hr_data["hr"] = working_data["hr"]
        hr_data["fps"] = row["fps"]
        hr_data["bpm"] = int(round(measures["bpm"]))
        hr_data_df = spark.createDataFrame(data=[hr_data], schema=refined_hr_data_schema)

        # CREATE RRI HISTOGRAM DATAFRAME
        rr_list = working_data["RR_list"]
        # ceil(log2(n)) + 1 bins; a single bin when there are no RR intervals
        # (log2(0) is -inf and cannot be converted to int).
        n_bins = int(np.ceil(np.log2(len(rr_list))) + 1) if rr_list else 1
        counts, bins = np.histogram(rr_list, n_bins)

        rri_histogram_data = dict(session_fields)
        rri_histogram_data["rri"] = rr_list
        rri_histogram_data["counts"] = [int(c) for c in counts]
        rri_histogram_data["bins"] = [float(b) for b in bins]
        rri_histogram_data_df = spark.createDataFrame(
            data=[rri_histogram_data], schema=refined_rri_histogram_data_schema
        )

        # CREATE PSD FREQUENCIES DATAFRAME
        freq, power, frequency_band_index, labels = signal_processer.get_psd_frequencies(
            tmp_working_data["peaklist"], tmp_working_data["RR_list"], row["fps"]
        )

        # One "freq_<label>" / "power_<label>" pair of columns per frequency band.
        psd_frequencies_data = dict(session_fields)
        for band_index, label in zip(frequency_band_index, labels):
            psd_frequencies_data["freq_" + str(label)] = [float(v) for v in freq[band_index]]
            psd_frequencies_data["power_" + str(label)] = [float(v) for v in power[band_index]]
        psd_frequencies_data_df = spark.createDataFrame(
            data=[psd_frequencies_data], schema=refined_psd_frequencies_data_schema
        )

        # TODO: Create a data frame with the nk.hrv results?

        # Store results into the delta tables (only after every frame was built).
        outputs = [
            (measures_df, "s3a://trusted-zone/trusted_heartpy_measures"),
            (working_data_df, "s3a://trusted-zone/trusted_working_data"),
            (breathing_data_df, "s3a://refined-zone/refined_breathing_data"),
            (hr_data_df, "s3a://refined-zone/refined_hr_data"),
            (rri_histogram_data_df, "s3a://refined-zone/refined_rri_histogram_data"),
            (psd_frequencies_data_df, "s3a://refined-zone/refined_psd_frequencies_data"),
        ]
        for output_df, table_path in outputs:
            output_df.write.format("delta").mode("append").save(table_path)

# Start the streaming query: every micro-batch of BVP changes goes through process_batch.
(
    filtered_bvp_signal_stream_changes_df
    .writeStream
    .foreachBatch(process_batch)
    .start()
)

### Listens for changes in "rppg_hrv_parameters" and prepares a new table for the dashboard

In [None]:
""" from pyspark.sql.functions import col, round

measures_stream_changes_df = (
    spark.readStream.format("delta").option("readChangeFeed", "true")
    .option("startingVersion", "latest")
    .load("s3a://trusted-zone/heartpy_measures")
)


def process_rppg_hrv_parameters(df, _):
    df = (
        df.withColumn(
            "breathingrate", round(60 / (1 / col("breathingrate"))).cast("Integer")
        )
        .withColumn("bpm", round(col("bpm")).cast("Integer"))
        .select(
            "bpm", "breathingrate", "end_timestamp", "start_timestamp", "subject_uuid"				# TODO: Add Video column
        )
    )

    df.write.format("delta").mode("append").save("s3a://refined-zone/heartpy_measures")



write_heartpy_measures_stream = (
    measures_stream_changes_df.writeStream
    .foreachBatch(process_rppg_hrv_parameters)
    .start()
) """


<!-- ## Listens for changes in the BVP Signal table and in the Working Data table
### Then joins the two changes of each table into one
#### Used for HR Time Series Plot -->

In [None]:
""" filtered_bvp_signal_stream_changes_df = (
    spark.readStream
		.format("delta") 
    	.option("readChangeFeed", "true") 
	    .option("startingVersion", "latest")
    	.load("s3a://trusted-zone/filtered_bvp_signal")
        .withWatermark("start_timestamp", "10 minutes")
)


working_data_stream_changes_df = (
    spark.readStream
		.format("delta") 
    	.option("readChangeFeed", "true") 
	    .option("startingVersion", "latest")
    	.load("s3a://trusted-zone/working_data")
		.withWatermark("start_timestamp", "10 minutes")
)

joined_stream = filtered_bvp_signal_stream_changes_df.join(working_data_stream_changes_df, ["start_timestamp", "end_timestamp", "subject_uuid", "video"])

def write_to_hr_data(df, _):
	df.show()
	df.write.format("delta").mode("append").save("s3a://refined-zone/hr_data")


write_joined_stream = joined_stream.select("hr", "fps", "start_timestamp", "end_timestamp", "subject_uuid", "video").writeStream.foreachBatch(write_to_hr_data).start()


working_data_stream_changes_df = (
    spark.readStream
		.format("delta") 
    	.option("readChangeFeed", "true") 
	    .option("startingVersion", "latest")
    	.load("s3a://trusted-zone/working_data")
		.withWatermark("start_timestamp", "10 minutes")
) """

""" def write_to_hr_data(df, _):
	df.show()

	# Maybe:  (fazer distribuição dos hr por timestamps)
	
	#	duration = end_timestamp - start_timestamp
	#	time_interval = duration / len(hr_list)


	df.write.format("delta").mode("append").save("s3a://refined-zone/hr_data")

working_data_stream_changes_df.select("hr", "start_timestamp", "end_timestamp", "subject_uuid", "video").writeStream.foreachBatch(write_to_hr_data).start() """

## Infinite Loop waiting for new files (videos) to arrive at the raw bucket

In [None]:
# Legacy raw-bucket polling loop, kept disabled as a single string like the other
# disabled cells. A stray closing triple quote after the `already_processed` setup
# used to leave the final one unterminated, which made this cell a SyntaxError.
""" already_processed = []
objects = client.list_objects(RAW_BUCKET_NAME)
for obj in objects:
    already_processed.append(obj.etag)

    #TODO: REMOVE

try:
    most_recent_object = None
    most_recent_time = None
    while True:
        # List all objects with prefix
        objects = client.list_objects(RAW_BUCKET_NAME)

        # Get the most recent object                                    # TODO: Problem if two videos are added why most recent?
        for obj in objects:
            if most_recent_time is None or obj.last_modified > most_recent_time:
                most_recent_object = obj
                most_recent_time = obj.last_modified

        if most_recent_object is not None:
            file_extension = str(obj.object_name).split(".")[-1]

            if file_extension in ["avi", "mov", "mp4"] and obj.etag not in already_processed:
                process_video(most_recent_object)
                #print("process", most_recent_object.object_name)
                most_recent_object = None
            
        # Wait for new events
        time.sleep(5)
except KeyboardInterrupt:
    # Stop watching on keyboard interrupt
    spark.stop()
    pass """

In [None]:
def process_new_obj(obj):
    """Move a landing-bucket object into the raw bucket under a timestamped name.

    The object is copied to RAW_BUCKET_NAME as "<name>_<YYYY-mm-dd-HH-MM-SS>.<ext>"
    and then removed from LANDING_BUCKET_NAME.

    Args:
        obj: landing-bucket object exposing `object_name`.

    Returns:
        The name of the new object in the raw bucket.
    """
    object_name = str(obj.object_name)
    print(f"New file: {obj.object_name}")

    # TODO: Encrypt file (cant be done with snakebite, because the file contents cannot be read)
    # ...

    # Split on the last dot only, so names containing extra dots do not raise.
    subject_uuid, extension = object_name.rsplit(".", 1)
    filename_date = datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
    new_filename = f"{subject_uuid}_{filename_date}.{extension}"

    client.copy_object(
        RAW_BUCKET_NAME,
        new_filename,
        CopySource(LANDING_BUCKET_NAME, object_name),
    )
    client.remove_object(LANDING_BUCKET_NAME, object_name)
    print(f"File processed: {obj.object_name}")
    return new_filename


# Poll the landing bucket every 5 seconds until interrupted.
try:
    while True:
        for obj in client.list_objects(LANDING_BUCKET_NAME):
            file_extension = str(obj.object_name).split(".")[-1].lower()

            if file_extension in ["avi", "mov", "mp4"]:
                raw_object_name = process_new_obj(obj)
                # process_video downloads from the raw bucket, so it must be given
                # the renamed raw object rather than the (now removed) landing one.
                process_video(client.stat_object(RAW_BUCKET_NAME, raw_object_name))

        # Wait for new events
        time.sleep(5)
except KeyboardInterrupt:
    # Stop watching on keyboard interrupt
    pass

In [None]:
# Inspect the refined heart-rate table: schema first, then a preview of its rows.
refined_hr_path = "s3a://refined-zone/refined_hr_data"
delta_table = DeltaTable.forPath(spark, refined_hr_path)
df = delta_table.toDF()
df.printSchema()
df.show()

In [None]:
# Inspect the trusted working-data table: schema first, then a preview of its rows.
trusted_working_data_path = "s3a://trusted-zone/trusted_working_data"
delta_table = DeltaTable.forPath(spark, trusted_working_data_path)
df = delta_table.toDF()
df.printSchema()
df.show()

In [None]:
# Inspect the refined breathing table: schema first, then a preview of its rows.
refined_breathing_path = "s3a://refined-zone/refined_breathing_data"
delta_table = DeltaTable.forPath(spark, refined_breathing_path)
df = delta_table.toDF()
df.printSchema()
df.show()

In [None]:
# Inspect the refined PSD-frequencies table: schema first, then untruncated rows.
refined_psd_path = "s3a://refined-zone/refined_psd_frequencies_data"
delta_table = DeltaTable.forPath(spark, refined_psd_path)
df = delta_table.toDF()
df.printSchema()
df.show(truncate=False)

In [None]:
# Inspect the refined RRI-histogram table: schema first, then a preview of its rows.
refined_rri_path = "s3a://refined-zone/refined_rri_histogram_data"
delta_table = DeltaTable.forPath(spark, refined_rri_path)
df = delta_table.toDF()
df.printSchema()
df.show()

In [None]:
# Inspect the trusted sessions table: schema first, then untruncated rows.
trusted_sessions_path = "s3a://trusted-zone/trusted_sessions"
delta_table = DeltaTable.forPath(spark, trusted_sessions_path)
df = delta_table.toDF()
df.printSchema()
df.show(truncate=False)

In [10]:
# Load the refined heart-rate table as a DataFrame and preview its rows.
refined_hr_path = "s3a://refined-zone/refined_hr_data"
delta_table = DeltaTable.forPath(spark, refined_hr_path)
df = delta_table.toDF()

# TODO: EXAMPLE OF DATA QUERY WITH SPARK SQL AND DATAFRAME

df.show()

[Stage 48:>                                                         (0 + 1) / 1]

+---------+
|   string|
+---------+
|test_text|
+---------+



                                                                                