In [None]:
import os

In [None]:
%pwd

In [None]:
# Move the working directory up one level so that later relative paths resolve
# from the parent of the notebook's starting directory (presumably the project
# root — confirm against the repository layout).
# NOTE: this cell is not idempotent; re-running it moves up another level.
os.chdir("../")

In [None]:
%pwd

In [None]:
import asyncio
import cProfile
import logging.handlers
import os
import pstats
import queue
import threading
import zipfile
from concurrent.futures import ThreadPoolExecutor
from urllib.parse import parse_qs, urlparse

import gdown
import nest_asyncio

from se489_group_project import logger
from se489_group_project.utility.common import get_size

In [None]:
# # Create a thread-safe queue for log messages
# log_queue = queue.Queue(-1)  # No size limit

# # Create a QueueHandler to send log messages to the queue
# queue_handler = logging.handlers.QueueHandler(log_queue)

# # Set up the root logger to use the QueueHandler
# logger = logging.getLogger()
# logger.setLevel(logging.DEBUG)
# logger.addHandler(queue_handler)

# # Create a handler for console output (or file output)
# console_handler = logging.StreamHandler()
# console_handler.setLevel(logging.DEBUG)
# formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# console_handler.setFormatter(formatter)

# # Create a QueueListener to process log messages from the queue
# listener = logging.handlers.QueueListener(log_queue, console_handler)

# # Start the listener thread
# listener.start()
# # Note: with this queue-based logging enabled, the run was slower.

In [None]:
from dataclasses import dataclass
from pathlib import Path


@dataclass(frozen=True)
class GettingDataConfig:
    """Immutable settings consumed by the data-ingestion step.

    Instances are built by ``ConfigurationManager.get_data_ingestion_config``
    from the ``data_ingestion`` section of the parsed configuration and read
    by ``DataIngestion``.

    NOTE(review): the values are passed through from the parsed config without
    conversion, so at runtime they may not be ``Path`` instances — confirm
    against what ``read_yaml`` returns.
    """
    # Directory created (via create_directories) before the config is returned.
    root_dir: Path
    # Share URL of the dataset; DataIngestion.download_file derives the file id from it.
    source_URL: str
    # Where the downloaded archive is written, and the zip that is later extracted.
    local_data_file: Path
    # Directory the archive is extracted into.
    unzip_dir: Path

In [None]:
from se489_group_project.constants import *
from se489_group_project.utility.common import read_yaml, create_directories

In [None]:
class ConfigurationManager:
    """Loads the project configuration files and builds per-stage config objects.

    On construction the config and params files are read with ``read_yaml`` and
    the directory named by ``config.data_storage`` is passed to
    ``create_directories``.
    """

    def __init__(
        self,
        config_filepath = CONFIG_FILE_PATH,
        params_filepath = PARAMS_FILE_PATH):
        """Read both configuration files and prepare the data storage directory.

        Args:
            config_filepath: Path of the main configuration file
                (defaults to ``CONFIG_FILE_PATH``).
            params_filepath: Path of the parameters file
                (defaults to ``PARAMS_FILE_PATH``).
        """
        # read_yaml presumably returns an attribute-accessible mapping; the code
        # below relies on ``.data_storage`` and ``.data_ingestion`` attributes.
        self.config = read_yaml(config_filepath)
        self.params = read_yaml(params_filepath)

        create_directories([self.config.data_storage])

    def get_data_ingestion_config(self) -> GettingDataConfig:
        """Build the data-ingestion settings from the ``data_ingestion`` section.

        The ingestion root directory is passed to ``create_directories`` before
        the settings object is returned. No thread pool is created here: the
        ingestion step never used one, and allocating a fresh executor on every
        call left executors behind that were never shut down.

        Returns:
            GettingDataConfig: Settings for ``DataIngestion``. Values are passed
            through exactly as they appear in the parsed configuration, because
            downstream code hands ``local_data_file`` to ``gdown`` unchanged.
        """
        ingestion_section = self.config.data_ingestion

        create_directories([ingestion_section.root_dir])

        return GettingDataConfig(
            root_dir=ingestion_section.root_dir,
            source_URL=ingestion_section.source_URL,
            local_data_file=ingestion_section.local_data_file,
            unzip_dir=ingestion_section.unzip_dir,
        )

In [None]:
class DataIngestion:
    """Downloads the dataset archive from Google Drive and unpacks it.

    The instance only stores its configuration; the work happens in
    ``download_file`` and ``extract_zip_file``. Both are coroutines so callers
    can ``await`` them, but their bodies are ordinary blocking calls that run
    on the calling thread (which keeps them visible to a profiler enabled by
    the caller).
    """

    # Direct-download endpoint handed to gdown; the Drive file id is appended.
    _DRIVE_DOWNLOAD_PREFIX = "https://drive.google.com/uc?export=download&id="

    def __init__(self, config: "GettingDataConfig"):
        """Store the ingestion settings.

        Args:
            config: Object exposing ``source_URL``, ``local_data_file`` and
                ``unzip_dir`` attributes.
        """
        self.config = config

    @staticmethod
    def _drive_file_id(url: str) -> str:
        """Extract the Google Drive file id from a share or download URL.

        Supported shapes, in order of preference:
          * ``.../d/<id>`` with or without a trailing ``/view...`` segment
          * ``...?id=<id>`` (``uc`` / ``open`` style links)
          * anything else: the second-to-last ``/``-separated piece, which is
            how the id was located before these shapes were recognised.

        Raises:
            ValueError: If no non-empty id can be found in ``url``.
        """
        parsed = urlparse(url)

        segments = [segment for segment in parsed.path.split("/") if segment]
        if "d" in segments:
            position = segments.index("d") + 1
            if position < len(segments):
                return segments[position]

        query_ids = parse_qs(parsed.query).get("id")
        if query_ids and query_ids[0]:
            return query_ids[0]

        pieces = url.split("/")
        if len(pieces) >= 2 and pieces[-2]:
            return pieces[-2]

        raise ValueError(f"Could not determine a Google Drive file id from URL: {url!r}")

    async def download_file(self) -> str:
        """Download the dataset archive to ``config.local_data_file``.

        The parent directory of the target file is created if it is missing.

        Returns:
            str: The path the archive was written to.

        Raises:
            ValueError: If the Drive file id cannot be derived from
                ``config.source_URL``.
            RuntimeError: If ``gdown.download`` reports failure by returning
                ``None``.
        """
        dataset_url = self.config.source_URL
        zip_download_dir = self.config.local_data_file

        # Create the directory the archive is actually written to, rather than
        # a fixed location that may not match the configured path.
        target_dir = os.path.dirname(zip_download_dir)
        if target_dir:
            os.makedirs(target_dir, exist_ok=True)

        logger.info(f"Downloading data from {dataset_url} into file {zip_download_dir}")

        file_id = self._drive_file_id(dataset_url)
        downloaded = gdown.download(self._DRIVE_DOWNLOAD_PREFIX + file_id, zip_download_dir)
        if downloaded is None:
            # Some gdown versions signal failure with None instead of raising.
            raise RuntimeError(
                f"Download of {dataset_url} failed: gdown did not produce {zip_download_dir}"
            )

        logger.info(f"Downloaded data from {dataset_url} into file {zip_download_dir}")
        return str(zip_download_dir)

    async def extract_zip_file(self) -> None:
        """Extract ``config.local_data_file`` into ``config.unzip_dir``.

        The target directory is created if it does not exist. Takes no
        arguments besides ``self`` and returns ``None``.
        """
        unzip_path = self.config.unzip_dir
        os.makedirs(unzip_path, exist_ok=True)
        with zipfile.ZipFile(self.config.local_data_file, 'r') as zip_ref:
            zip_ref.extractall(unzip_path)

In [None]:
async def analyze(file):
    """Print two top-10 summaries of a saved cProfile stats file.

    The stats in ``file`` are loaded once, then reported sorted by cumulative
    time and again sorted by total time, each under its own heading.
    Declared ``async`` so callers can ``await`` it; nothing here yields.
    """
    stats = pstats.Stats(file)

    # (heading printed before the table, pstats sort key) in report order.
    reports = (
        ("\nTop 10 functions sorted by cumulative time:", 'cumtime'),
        ("\nTop 10 functions sorted by total time:", 'tottime'),
    )
    for heading, sort_key in reports:
        print(heading)
        stats.sort_stats(sort_key).print_stats(10)
    

In [None]:
import subprocess

#added async to main function
async def main():
    """Profile the data-ingestion steps and report the results.

    Runs ``download_file`` and ``extract_zip_file`` under ``cProfile``, writes
    the stats to ``<cwd>/se489_group_project/visualizations/
    cprofile_stats_data_ingestion.prof``, prints summaries via ``analyze`` and
    then tries to launch ``snakeviz`` on the stats file.

    Raises:
        Exception: Any error from configuration, ingestion or reporting is
            logged and re-raised unchanged.
    """
    try:
        log_dir = os.path.join(os.getcwd(), "se489_group_project", "visualizations")
        # Ensure the output directory exists up front; otherwise dump_stats
        # would fail only after the whole download had already been done.
        os.makedirs(log_dir, exist_ok=True)
        file = os.path.join(log_dir, 'cprofile_stats_data_ingestion.prof')

        config = ConfigurationManager()
        data_ingestion_config = config.get_data_ingestion_config()
        data_ingestion = DataIngestion(config=data_ingestion_config)

        profiler = cProfile.Profile()
        profiler.enable()
        try:
            # Profile the download_file function
            logger.info("Profiling download_file()")
            await data_ingestion.download_file()

            # Profile the extract_zip_file function
            logger.info("Profiling extract_zip_file()")
            await data_ingestion.extract_zip_file()
        finally:
            # Always stop profiling, even when one of the steps raises.
            profiler.disable()
        profiler.dump_stats(file)

        profile_file_full_path = os.path.abspath(file)
        await analyze(profile_file_full_path)

        # Automatically open snakeviz to visualize the profiling results
        try:
            subprocess.Popen(["snakeviz", profile_file_full_path])
        except FileNotFoundError:
            print("snakeviz is not installed or not found in the system path.")
    except Exception as e:
        logger.error(f"Error: {e}") # Log the error
        raise  # bare raise keeps the original traceback intact
    # finally:
    #     # Stop the listener thread
    #     listener.stop()
# Entry point: runs the profiling workflow when this code executes as the
# top-level module.
if __name__ == "__main__":
    # Applied before asyncio.run(); presumably needed because the notebook
    # kernel already has an event loop running — confirm against the
    # nest_asyncio documentation.
    nest_asyncio.apply()
    asyncio.run(main())
    # main()  # plain call from before main() was made a coroutine