In [34]:
import luigi
import pandas as pd
import os
import shutil

class ReadFile(luigi.Task):
    input_file = luigi.Parameter()

    def output(self):
        # Generate a standardized output file path with a .csv extension
        output_file = os.path.splitext(os.path.basename(self.input_file))[0]
        return luigi.LocalTarget(output_file)

    def run(self):
        file_extension = os.path.splitext(self.input_file)[1].lower()
        
        if file_extension == '.csv':
            # If the input format is CSV, just copy the file
            shutil.copyfile(self.input_file, self.output().path)
        elif file_extension == '.xlsx':
            # Read Excel file and save as CSV
            df = pd.read_excel(self.input_file)
            df.to_csv(self.output().path, index=False)
        else:
            raise ValueError(f"Unsupported file format: {file_extension}")


class ConsolidateData(luigi.Task):
    input_folder = luigi.Parameter()
    output_folder = luigi.Parameter(default="./processed_data")

    def requires(self):
        input_files = [os.path.join(self.input_folder, filename) for filename in os.listdir(self.input_folder)]
        return [ReadFile(input_file=input_file) for input_file in input_files]

    def output(self):
        return luigi.LocalTarget(self.output_folder)

    def complete(self):
        remaining_files = [filename for filename in os.listdir(self.input_folder)]
        return len(remaining_files) == 0

    def run(self):
        for input_task in self.requires():
            input_file = input_task.input_file  # Store the input file path
            output_format = os.path.splitext(input_file)[1].lower()
            print(os.path.splitext(input_file))
            if output_format == '.csv':
                # If the output format is CSV, you can continue with your existing CSV processing logic
                input_df = pd.read_csv(input_file)
                category = input("Enter the category for dataset '{}': ".format(os.path.basename(input_file)))
                category_folder = os.path.join(self.output_folder, category)
                os.makedirs(category_folder, exist_ok=True)
                output_csv_path = os.path.join(category_folder, os.path.basename(input_file))
                input_df.to_csv(output_csv_path, index=False)
                print(f"Processed CSV file: {input_file}")
            elif output_format == '.xlsx':
                # Read Excel file and save as CSV
                input_df = pd.read_excel(input_file)  # Corrected this line
                category = input("Enter the category for dataset '{}': ".format(os.path.basename(input_file)))
                category_folder = os.path.join(self.output_folder, category)
                os.makedirs(category_folder, exist_ok=True)
                output_csv_path = os.path.join(category_folder, os.path.basename(input_file))
                input_df.to_csv(output_csv_path, index=False)
                print(f"Processed XLSX file: {input_file}")
            
            # Delete the input file after processing
            os.remove(input_file)

if __name__ == "__main__":
    input_folder_path = "./input_data" 
    output_folder_path = "./processed_data" 
    luigi.build([ConsolidateData(input_folder=input_folder_path, output_folder=output_folder_path)], local_scheduler=True)


DEBUG: Checking if ConsolidateData(input_folder=./input_data, output_folder=./processed_data) is complete
DEBUG: Checking if ReadFile(input_file=./input_data/Unemployment_Insurance_Program_Monthly_Claims_Data_for_California.csv) is complete
INFO: Informed scheduler that task   ConsolidateData___input_data___processed_data_bae0eb4364   has status   PENDING
INFO: Informed scheduler that task   ReadFile___input_data_Une_3cd13d818e   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
INFO: [pid 97078] Worker Worker(salt=7535332105, workers=1, host=Rohits-MacBook-Pro.local, username=rohitvalmeekam, pid=97078) running   ReadFile(input_file=./input_data/Unemployment_Insurance_Program_Monthly_Claims_Data_for_California.csv)
INFO: [pid 97078] Worker Worker(salt=7535332105, workers=1, host=Rohits-MacBook-Pro.local, username=rohitvalmeekam, pid=97078) done      ReadFile(input_file=./input_data/Unemploy

('./input_data/Unemployment_Insurance_Program_Monthly_Claims_Data_for_California', '.csv')


INFO: [pid 97078] Worker Worker(salt=7535332105, workers=1, host=Rohits-MacBook-Pro.local, username=rohitvalmeekam, pid=97078) done      ConsolidateData(input_folder=./input_data, output_folder=./processed_data)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   ConsolidateData___input_data___processed_data_bae0eb4364   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=7535332105, workers=1, host=Rohits-MacBook-Pro.local, username=rohitvalmeekam, pid=97078) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 2 tasks of which:
* 2 ran successfully:
    - 1 ConsolidateData(input_folder=./input_data, output_folder=./processed_data)
    - 1 ReadFile(input_file=./input_data/Unemployment_Insurance_Program_Monthly_Claims_Data_for_California.csv)

This progress looks :) because there were no failed tasks or missing 

Processed CSV file: ./input_data/Unemployment_Insurance_Program_Monthly_Claims_Data_for_California.csv
