In [1]:
import datetime
import json
import logging
import os
import shlex
import subprocess

import joblib
import mlflow
import mlflow as mf
from hydra import initialize, compose

import ftzard.utils.mlflow as mf_utils

# Configure the root logger at INFO level so the progress messages logged below are emitted.
logging.basicConfig(level=logging.INFO)
# Notebook-wide logger shared by the helper functions and the driver cell.
logger = logging.getLogger(__name__)

In [2]:
# Project locations and the name of the Hydra config to compose.
base_path = '/app/ftzard'
config_path = f'{base_path}/config/'
config_name = 'config'

# NOTE(review): the symlink presumably exists so that Hydra's initialize(), which is
# called with config_path="config_link", can locate the project config — confirm.
try:
    os.symlink(config_path, "config_link")
except FileExistsError:
    # Expected on every run after the first one: the link is already in place.
    print("Symlink already created...")
except Exception as e:
    # Any other failure (permissions, unsupported filesystem, ...) is a different
    # problem; report the real cause instead of claiming the link already exists.
    # Still best-effort: the notebook keeps running.
    logger.warning(f"Could not create symlink 'config_link' -> {config_path}: {e}")

Symlink already created...


In [3]:
# Load the run metadata stored under the project's data directory; other cells read
# its "run_name" and "run_id" entries.
# NOTE(review): joblib.load unpickles the file, which can execute arbitrary code —
# only load files written by a trusted pipeline step.
metadata = joblib.load(f"{base_path}/data/step1_run_metadata.joblib")

In [4]:
# Compose the Hydra configuration from the "config_link" directory and pull the
# MLflow connection settings out of it.
with initialize(config_path="config_link", version_base=None):
    cfg = compose(config_name=config_name)
    tracking_uri = cfg.MLFLOW_TRACKING_URI
    experiment_name = cfg.MLFLOW_EXPERIMENT_NAME
    

In [5]:
# Publish the tracking URI through the process environment (presumably picked up by
# the MLflow calls made later — confirm), then record which experiment/run is targeted.
os.environ.update({'MLFLOW_TRACKING_URI': tracking_uri})
logger.info(f'Mlflow Experiment Name: {experiment_name}')
logger.info(f'Mlflow Run Name: {metadata["run_name"]}')

INFO:__main__:Mlflow Experiment Name: senetiment_analysis
INFO:__main__:Mlflow Run Name: data_cleaning


In [6]:
# Helper functions: timestamping, shell commands, DVC/Git versioning and MLflow artifact access
def get_current_date_time():
    """Return the current local time as a ``<YYYY-MM-DD>_<hour>_<minute>`` string.

    Hour and minute are rendered as plain integers, i.e. without zero padding
    (for example ``2024-05-01_9_5``).
    """
    moment = datetime.datetime.now()
    return f"{moment.date()}_{moment.hour}_{moment.minute}"
    

def run_command(command):
    """Run a command and return its stripped standard output.

    Args:
        command: Either a shell command line (``str`` or ``bytes``), which is
            executed through the shell, or a sequence of arguments such as
            ``["git", "commit", "-m", message]``, which is executed directly
            without a shell so the arguments need no quoting or escaping.

    Returns:
        The command's standard output as text, with leading and trailing
        whitespace removed.

    Raises:
        subprocess.CalledProcessError: If the command exits with a non-zero
            status. The captured output is available on the exception's
            ``stdout`` and ``stderr`` attributes.
    """
    # Only hand command *strings* to the shell. Combining shell=True with an
    # argument sequence makes POSIX shells run just the first element, so
    # sequences are executed directly instead.
    use_shell = isinstance(command, (str, bytes))
    result = subprocess.run(command, shell=use_shell, check=True, text=True, capture_output=True)
    return result.stdout.strip()

def check_dvc_files(directory):
    """Report whether ``directory`` contains at least one entry ending in ``.dvc``.

    Only the directory's immediate entries are inspected. When none matches, a
    warning is logged and ``False`` is returned; otherwise ``True``.
    """
    has_dvc_entry = any(name.endswith('.dvc') for name in os.listdir(directory))
    if has_dvc_entry:
        return True

    logger.warning(f"No .dvc files found in {directory}")
    return False


def main(run_name, date, repo_dir='/app/ftzard/'):
    """Version the ``data`` directory with DVC and publish the pointer files via Git.

    Inside ``repo_dir`` the function runs, in order: ``dvc add data``,
    ``git add -f *.dvc``, ``git commit``, ``git push -f origin main``,
    ``git rev-parse HEAD`` and ``dvc checkout``.

    Args:
        run_name: Run name embedded in the commit message.
        date: Timestamp string embedded in the commit message.
        repo_dir: Repository root to operate in. Defaults to the location that
            was previously hard-coded.

    Returns:
        The full hash of ``HEAD`` after the commit, or ``None`` if any step
        failed. Failures are logged, not raised. Note that a failure in the
        final ``dvc checkout`` also yields ``None`` even though the commit and
        push have already happened.

    Side effects:
        The process working directory is changed to ``repo_dir`` and is not
        restored afterwards.
    """
    try:
        os.chdir(repo_dir)
        parent_dir = os.getcwd()

        logger.info(f"Changed working directory to: {parent_dir}")

        # Refuse to continue when the repository has no DVC pointer files at all.
        if not check_dvc_files(parent_dir):
            raise FileNotFoundError("DVC files not found in the parent directory.")

        run_command("dvc add data")

        # 1. Stage the DVC pointer files (-f: stage them even if ignored)
        run_command("git add -f *.dvc")

        # 2. Commit them. The message contains caller-supplied text, so it is
        #    shell-quoted: a run name containing quotes, `$(...)` or backticks can
        #    no longer break or inject into the command line.
        commit_message = f"Update DVC lock file {run_name} {date}"
        run_command(f"git commit -m {shlex.quote(commit_message)}")

        # 3. Push changes to GitHub
        # NOTE(review): this is a *force* push to main and overwrites remote history;
        # confirm this is intended (adjust the branch name if needed).
        run_command("git push -f origin main")

        # 4. Get the commit ID
        commit_id = run_command("git rev-parse HEAD")
        logger.info(f"Commit ID: {commit_id}")

        # 5. Perform DVC checkout
        run_command("dvc checkout")

        return commit_id

    except subprocess.CalledProcessError as e:
        logger.error(f"Error executing command: {e.cmd}")
        logger.error(f"Error output: {e.stderr}")
    except Exception as e:
        logger.error(f"An error occurred: {str(e)}")

    # Reached only after a logged failure.
    return None

def get_artifact(run_id:str, artifact_name:str):
    """Download a JSON artifact of an MLflow run and return its parsed content.

    Args:
        run_id: ID of the MLflow run that owns the artifact.
        artifact_name: Path of the JSON artifact within the run.

    Returns:
        The deserialized JSON document, or ``None`` when the artifact cannot be
        downloaded, opened or parsed (the error is logged, not raised).
    """
    try:
        client = mf.MlflowClient()
        artifact_path = client.download_artifacts(run_id, artifact_name)
        # Open for reading: mode 'w' would truncate the downloaded file and make
        # json.load fail because the handle is not readable.
        with open(artifact_path, 'r', encoding='utf-8') as f:
            return json.load(f)
    except Exception as e:
        logger.info(e)
        return None


In [7]:
if __name__ == "__main__":
    run_name = metadata["run_name"]
    experiment_id = mf_utils.create_experiment(exp_name=experiment_name)
    print('Experiment Id: ', experiment_id)
    run_id = mf_utils.get_run_id_by_name(run_name=run_name, experiment_ids=[experiment_id])
    print('Run Id: ', run_id)

    # Resume the run with this name when one was found, otherwise start a new one.
    if run_id:
        mf.start_run(run_id=run_id, run_name=run_name, experiment_id=experiment_id)
    else:
        mf.start_run(run_name=run_name, experiment_id=experiment_id)

    try:
        artifact_name = "commit_history.json"
        # Fall back to the run id stored in the metadata when no run was found by name.
        run_id = run_id if run_id else metadata["run_id"]
        artifact = get_artifact(run_id, artifact_name)
        now = get_current_date_time()

        # Version the data; main() returns None on failure, recorded as 'NA'.
        commit_id = main(run_name, now)
        commit_id = commit_id if commit_id else 'NA'
        mf.set_tag("current_full_commit_id", commit_id)

        # hash() of a str is salted per interpreter process, so it produced a
        # different metric for the same commit on every kernel restart. Use the
        # first 12 hex digits of the commit hash instead: reproducible, and small
        # enough (48 bits) to be stored exactly as a float. -1 marks 'NA' or any
        # other non-hex value.
        try:
            commit_metric = int(commit_id[:12], 16)
        except ValueError:
            commit_metric = -1
        mf.log_metric("commit_id", commit_metric)

        # Append to the history whenever it could be loaded. An existing but empty
        # history ({}) is extended too; only a failed load (None) is skipped.
        if artifact is not None:
            artifact[now] = commit_id
            mf.log_dict(artifact,
                        artifact_name,
                        metadata["run_id"])
    except Exception as e:
        logger.info(e)
    finally:
        # Always close the active run, even if the cell is interrupted.
        mf.end_run()


INFO:alembic.runtime.migration:Context impl SQLiteImpl.
INFO:alembic.runtime.migration:Will assume non-transactional DDL.
INFO:__main__:No such file or directory: '/e:/FTzard/ftzard/pipeline/notebooks/mlruns/1/6f2f7a3538de462eb0137477aa1fcf7d/artifacts/commit_history.json'
INFO:__main__:Changed working directory to: /app/ftzard


The provided experiment name senetiment_analysis already exists, the run will be logged in this experiment.
                                 
Experiment Id:  1
Run Id:  6f2f7a3538de462eb0137477aa1fcf7d


INFO:__main__:Commit ID: 28460ad33c29e928d7b0a332bea21092dfa83be9
