# Consumer for Performing Pyrometry Analysis
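This notebook consumes the hyperspectral camera data streamed by the initial producer and analyzes it to determine temperatures and emissivities. The code is based on the notebooks at github.com/openmsi/openmsistream_short_course. At this stage, the stream processor below is a placeholder that only writes a timestamp file for each reconstructed file.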

## Config

In [1]:
# imports
import pathlib, importlib, logging, datetime, json, platform
from threading import Thread
from openmsitoolbox.logging import OpenMSILogger
from openmsistream import (
    DataFileDownloadDirectory,
    DataFileStreamProcessor,
    MetadataJSONReproducer,
)

In [2]:
# Configure a logger (only needed when running in a Jupyter notebook like this)
# NOTE(review): filelevel=None presumably disables writing log records to a file —
# confirm against the openmsitoolbox OpenMSILogger documentation
logger = OpenMSILogger("OpenMSIConsumers", filelevel=None)
# Reload the stdlib logging module after the logger has been created.
# NOTE(review): presumably this is what makes the logger's messages show up in the
# notebook cell output — confirm why it is required and that order matters
importlib.reload(logging)

<module 'logging' from '/Users/namanparikh/opt/anaconda3/envs/openmsi/lib/python3.9/logging/__init__.py'>

In [3]:
CONSUMER_TOPIC_NAME = "tutorial_data"  # topic that the consumers below read files from

# Repository root: the notebook is assumed to be launched from a direct subdirectory
# of the repo, so the root is the parent of the fully resolved working directory
repo_root_dir = pathlib.Path.cwd().resolve().parent

## Consuming to the Local Filesystem

In [4]:
def download_task(download_directory):
    """Drive ``reconstruct`` on a DataFileDownloadDirectory and log a summary once it stops.

    After ``reconstruct`` returns, the directory is closed and a single summary
    message is logged. The message covers the number of messages read and processed,
    the number of reconstructed files, and the most recently completed file paths.

    Args:
        download_directory (DataFileDownloadDirectory): the DataFileDownloadDirectory to run
    """
    # reconstruct() only returns once the program is shut down
    results = download_directory.reconstruct()
    n_read, n_processed, n_complete_files, complete_filepaths = results
    download_directory.close()

    if len(complete_filepaths) == 0:
        detail = f" and {n_processed} messages were successfully processed"
    else:
        # Keep noun and verb in agreement: "1 file was" / "N files were"
        files_phrase = "file was" if n_complete_files == 1 else "files were"
        detail = (
            f", {n_processed} messages were successfully processed, and "
            f"{n_complete_files} {files_phrase} successfully reconstructed"
        )
    recent_files = "\n\t".join(map(str, complete_filepaths))
    header = (
        f". Most recent completed files (up to {download_directory.N_RECENT_FILES}):\n\t"
    )
    download_directory.logger.info(
        f"{n_read} total messages were consumed{detail}{header}{recent_files}"
    )

In [5]:
# Paths to the config file and the directory holding the test files
CONFIG_FILE_PATH = repo_root_dir / "streaming" / "config_files" / "confluent_cloud_broker.config"
TEST_RECO_DIR = repo_root_dir / "streaming" / "reconstructed_test_folder"

In [6]:
# Display the resolved config file path (a bare last expression is echoed as cell output)
CONFIG_FILE_PATH

PosixPath('/Users/namanparikh/Documents/GitHub/paradim/reu2024-hyperspectral-camera/streaming/config_files/confluent_cloud_broker.config')

In [7]:
# Display the resolved reconstruction directory (echoed as cell output)
TEST_RECO_DIR

PosixPath('/Users/namanparikh/Documents/GitHub/paradim/reu2024-hyperspectral-camera/streaming/reconstructed_test_folder')

In [18]:
# Build the DataFileDownloadDirectory that reconstructs files from the
# CONSUMER_TOPIC_NAME topic into TEST_RECO_DIR
dfdd = DataFileDownloadDirectory(
    TEST_RECO_DIR, CONFIG_FILE_PATH, CONSUMER_TOPIC_NAME, logger=logger
)
# download_task blocks inside "reconstruct", so drive it from a background thread;
# the notebook stays free to send the shutdown command later
download_thread = Thread(target=download_task, args=(dfdd,))
download_thread.start()

[OpenMSIConsumers 2024-07-18 09:38:42] Will reconstruct files from messages in the tutorial_data topic using 2 threads
[OpenMSIConsumers 2024-07-18 09:38:45] 2 total messages were consumed, 2 messages were successfully processed, and 2 files were successfully reconstructed. Most recent completed files (up to 50):
	test_data.txt
	.ipynb_checkpoints/test_data-checkpoint.txt


In [21]:
# Manually shut down the download directory (if running from the command line this would
# be like typing "q" in the Terminal window)
dfdd.control_command_queue.put("q")
# Wait for download_task to finish: once "reconstruct" returns it closes the directory
# and logs its summary message
download_thread.join()

## Stream Processor

In [13]:
class PlaceholderStreamProcessor(DataFileStreamProcessor):
    """Performs a placeholder task (writing out a file to the local system) for every
    data file reconstructed from a topic.

    For each reconstructed file, a small text file recording the processing timestamp
    is written to the processor's output directory.
    """

    def _process_downloaded_data_file(self, datafile, lock):
        """Write out a timestamp file for one reconstructed data file.

        Args:
            datafile: the reconstructed data file; only its ``relative_filepath``
                (a path object providing ``as_posix()``) is used here
            lock: lock held while writing the output file. NOTE(review): presumably
                shared across the processor's worker threads — confirm.

        Returns:
            None on success, or the exception raised while processing. The exception
            is returned rather than raised; presumably this is how the base class
            expects failures to be reported — confirm against openmsistream docs.
        """
        try:
            timestamp = datetime.datetime.now()
            # Flatten the relative path into a single filename, e.g. "a/b.txt" -> "a_b_txt".
            # Note: distinct inputs can collide (e.g. "a/b.txt" and "a_b.txt").
            rel_fp_str = (
                datafile.relative_filepath.as_posix().replace("/", "_").replace(".", "_")
            )
            output_filepath = self._output_dir / f"{rel_fp_str}_placeholder.txt"
            contents = (
                f"Processing timestamp: {timestamp.strftime('%m/%d/%Y, %H:%M:%S')}"
            )
            # Only the file write needs the lock; the formatting above does not.
            with lock:
                # Explicit encoding so the output does not depend on the platform locale
                with open(output_filepath, "w", encoding="utf-8") as filep:
                    filep.write(contents)
        except Exception as exc:  # report any failure back to the caller, don't raise
            return exc
        return None

    @classmethod
    def run_from_command_line(cls, args=None):
        """Command-line entry point; intentionally a no-op in this notebook example.

        The processor is driven from the notebook instead, so this returns None.
        """

In [14]:
def stream_processor_task(stream_processor):
    """Run "process_files_as_read" for the given stream processor, and log a summary
    message when it gets shut down.

    After ``process_files_as_read`` returns, the processor is closed. One message is
    then logged covering the message counts, the number of processed files and the
    most recently processed file paths.

    Args:
        stream_processor (openmsistream.DataFileStreamProcessor): The stream processor to run
    """
    # This call to "process_files_as_read" hangs until the stream processor is shut down
    (
        n_m_r,  # The number of messages read
        n_m_p,  # The number of messages processed
        n_f_p,  # The number of files successfully processed
        p_fps,  # Paths to the most recently-processed files
    ) = stream_processor.process_files_as_read()
    stream_processor.close()
    msg = f"{n_m_r} total messages were consumed"
    if n_f_p > 0:
        # Keep noun and verb in agreement ("1 file was" / "N files were"),
        # the same way download_task phrases its summary
        file_phrase = "file was" if n_f_p == 1 else "files were"
        msg += (
            f", {n_m_p} messages were processed,"
            f" and {n_f_p} {file_phrase} successfully processed"
        )
    else:
        msg += f" and {n_m_p} messages were successfully processed"
    msg += (
        f". Up to {stream_processor.N_RECENT_FILES} most recently "
        "processed files:\n\t"
    )
    msg += "\n\t".join(str(fp) for fp in p_fps)
    stream_processor.logger.info(msg)

In [15]:
# Directory that receives PlaceholderStreamProcessor's per-file output
STREAM_PROCESSOR_OUTPUT_DIR = repo_root_dir.joinpath(
    "streaming", "PlaceholderStreamProcessor_output"
)

In [16]:
# Build the stream processor that reads files from CONSUMER_TOPIC_NAME and writes its
# results into STREAM_PROCESSOR_OUTPUT_DIR
psp = PlaceholderStreamProcessor(
    CONFIG_FILE_PATH,
    CONSUMER_TOPIC_NAME,
    output_dir=STREAM_PROCESSOR_OUTPUT_DIR,
    logger=logger,
)
# stream_processor_task blocks inside "process_files_as_read", so drive it from a
# background thread; the notebook stays free to send the shutdown command later
processor_thread = Thread(target=stream_processor_task, args=(psp,))
processor_thread.start()

[OpenMSIConsumers 2024-07-18 09:36:35] Log files and output will be in /Users/namanparikh/Documents/GitHub/paradim/PlaceholderStreamProcessor_output
[OpenMSIConsumers 2024-07-18 09:36:35] Will process files from messages in the tutorial_data topic using 2 threads
[OpenMSIConsumers 2024-07-18 09:38:38] 11 total messages were consumed, 11 messages were processed, and 11 files were successfully processed. Up to 50 most recently processed files:
	.ipynb_checkpoints/test_data-checkpoint.txt
	test_data.txt
	test_watchdog.txt
	test_watchdog_folder/test_1.txt
	test_watchdog_folder/.ipynb_checkpoints/test_1-checkpoint.txt
	test_watchdog_folder/test_2.txt
	test_watchdog_folder/.ipynb_checkpoints/test_2-checkpoint.txt
	test_watchdog_process_folder/test_2.txt
	test_watchdog_process_folder/.ipynb_checkpoints/test_1-checkpoint.txt
	test_watchdog_process_folder/test_1.txt
	test_watchdog_process_folder/.ipynb_checkpoints/test_2-checkpoint.txt


In [17]:
# Manually shut down the stream processor (if running from the command line this would
# be like typing "q" in the Terminal window)
psp.control_command_queue.put("q")
# Wait for stream_processor_task to finish: once "process_files_as_read" returns it
# closes the processor and logs its summary message
processor_thread.join()