In [9]:
from kfp.dsl import component, pipeline
from google.cloud import aiplatform
from vertexai.generative_models import GenerativeModel

from kfp.dsl import component, pipeline

@component(
    base_image="python:3.9",
    # Only google-cloud-aiplatform is installed: that distribution already
    # provides the `vertexai` namespace imported below.
    # NOTE(review): the standalone `vertexai` distribution is a thin wrapper that
    # depends on the same-numbered google-cloud-aiplatform release, so pinning
    # `vertexai==1.43.0` next to aiplatform 1.60.0 gives pip two conflicting
    # requirements. The extra pin was therefore dropped.
    packages_to_install=["google-cloud-aiplatform==1.60.0"],
)
def interact_with_gemini_model_once(
    initial_prompt: str,
    project: str,
    region: str,
    endpoint_id: str
) -> str:
    """Send one prompt to a Vertex AI generative model and return the reply text.

    Args:
        initial_prompt: Text sent as the first (and only) message of the chat.
        project: Google Cloud project passed to ``vertexai.init``.
        region: Location passed to ``vertexai.init``.
        endpoint_id: Identifier handed to ``GenerativeModel``; the caller in this
            file supplies a full endpoint resource name.

    Returns:
        The ``text`` attribute of the model's response.
    """
    # Imports live inside the function because KFP runs the body in its own container.
    import vertexai
    from vertexai.generative_models import GenerativeModel

    # Initialise the Vertex AI SDK for the target project and region.
    vertexai.init(project=project, location=region)

    # Load the generative model through the given endpoint.
    model = GenerativeModel(endpoint_id)

    # Open a fresh chat session and send the first message.
    chat = model.start_chat()
    response = chat.send_message(initial_prompt)

    # Only the plain text of the reply is passed downstream.
    return response.text

@component(
    base_image="python:3.9",
    # Only google-cloud-aiplatform is installed: that distribution already
    # provides the `vertexai` namespace imported below.
    # NOTE(review): the standalone `vertexai` distribution is a thin wrapper that
    # depends on the same-numbered google-cloud-aiplatform release, so pinning
    # `vertexai==1.43.0` next to aiplatform 1.60.0 gives pip two conflicting
    # requirements. The extra pin was therefore dropped.
    packages_to_install=["google-cloud-aiplatform==1.60.0"],
)
def interact_with_gemini_model_second(
    previous_response_text: str,
    user_prompt: str,
    project: str,
    region: str,
    endpoint_id: str
) -> str:
    """Run a follow-up turn against a Vertex AI generative model.

    A new chat session is started here, so no server-side history from an
    earlier component is reused. The earlier turn is carried over only by
    placing ``previous_response_text`` in front of ``user_prompt``, separated
    by a blank line, and sending the result as a single message.

    Args:
        previous_response_text: Text produced by the earlier model call.
        user_prompt: New prompt appended after the earlier text.
        project: Google Cloud project passed to ``vertexai.init``.
        region: Location passed to ``vertexai.init``.
        endpoint_id: Identifier handed to ``GenerativeModel``; the caller in this
            file supplies a full endpoint resource name.

    Returns:
        The ``text`` attribute of the model's response.
    """
    # Imports live inside the function because KFP runs the body in its own container.
    import vertexai
    from vertexai.generative_models import GenerativeModel

    # Initialise the Vertex AI SDK for the target project and region.
    vertexai.init(project=project, location=region)

    # Load the generative model through the given endpoint.
    model = GenerativeModel(endpoint_id)

    # Open a fresh chat session (independent from the first component's chat).
    chat = model.start_chat()

    # Emulate conversational context by prefixing the previous reply to the new prompt.
    combined_prompt = f"{previous_response_text}\n\n{user_prompt}"
    response = chat.send_message(combined_prompt)

    # Only the plain text of the reply is passed downstream.
    return response.text

@component(base_image="asia-northeast3-docker.pkg.dev/andong-24-team-101/components-image/com-image:latest")
def generate_music(prompt: str, gcs_bucket_name: str, gcs_output_path: str):
    """Generate music from a text prompt and store it in GCS as a WAV object.

    The audio is produced with the ``facebook/musicgen-medium`` checkpoint,
    encoded as float32 WAV in memory, and uploaded without touching local disk.

    Args:
        prompt: Text description handed to the MusicGen processor.
        gcs_bucket_name: Name of the destination bucket.
        gcs_output_path: Object path inside the bucket for the WAV file.

    Raises:
        Exception: Any failure is logged with ``print`` and then re-raised unchanged.
    """
    import io

    import numpy as np
    import scipy.io.wavfile
    import torch
    from google.cloud import storage
    from transformers import AutoProcessor, MusicgenForConditionalGeneration

    try:
        # Progress log emitted before the (potentially slow) checkpoint load.
        print("Loading MusicGen model...")

        checkpoint = "facebook/musicgen-medium"
        processor = AutoProcessor.from_pretrained(checkpoint)
        model = MusicgenForConditionalGeneration.from_pretrained(checkpoint)

        # Prefer the GPU when one is visible, otherwise stay on the CPU.
        has_cuda = torch.cuda.is_available()
        device = "cuda" if has_cuda else "cpu"
        print(f"Using device: {device}")
        model = model.to(device)

        # Extra CUDA diagnostics for the run log.
        if not has_cuda:
            print("CUDA is not available.")
        else:
            print(f"CUDA is available. Number of GPUs: {torch.cuda.device_count()}")
            print(f"CUDA Device Name: {torch.cuda.get_device_name(0)}")

        # Tokenise the prompt and place the tensors on the same device as the model.
        model_inputs = processor(text=[prompt], padding=True, return_tensors="pt")
        model_inputs = model_inputs.to(device)

        # Inference only, so gradient tracking is switched off.
        with torch.no_grad():
            generated = model.generate(**model_inputs, guidance_scale=2, max_new_tokens=800)

        # The sampling rate comes from the model's audio encoder configuration.
        sample_rate = model.config.audio_encoder.sampling_rate

        # Take element [0, 0] of the generated tensor and turn it into float32 samples.
        samples = generated[0, 0].cpu().numpy().astype(np.float32)

        # Encode the WAV into an in-memory buffer, then rewind it for the upload.
        wav_buffer = io.BytesIO()
        scipy.io.wavfile.write(wav_buffer, rate=sample_rate, data=samples)
        wav_buffer.seek(0)

        # Upload the buffer straight to the requested bucket/object.
        gcs_client = storage.Client()
        target_blob = gcs_client.bucket(gcs_bucket_name).blob(gcs_output_path)
        target_blob.upload_from_file(wav_buffer, content_type="audio/wav")

        print(f"File uploaded to {gcs_bucket_name}/{gcs_output_path}")

    except Exception as err:
        # Log the failure for the component logs, then let the step fail.
        print(f"Error during generate_music: {err!s}")
        raise

@pipeline(name="music-generation-pipeline", description="A pipeline to generate music based on two rounds of conversation with Gemini model.")
def music_generation_pipeline(
    initial_prompt: str,
    user_prompt: str,
    endpoint_id: str,
    project: str,
    region: str,
    gcs_bucket_name: str,
    gcs_output_path: str
):
    """Chain two Gemini calls and feed the final text into music generation.

    Steps, in dependency order:
      1. ``interact_with_gemini_model_once`` answers ``initial_prompt``.
      2. ``interact_with_gemini_model_second`` receives that answer plus ``user_prompt``.
      3. ``generate_music`` turns the second answer into a WAV stored in GCS.

    Caching is enabled on every step.
    NOTE(review): the music step has no outputs and exists only for its GCS
    upload; confirm that reusing a cached execution for it is intended.
    """
    # Step 1: first conversation round.
    opening_turn = interact_with_gemini_model_once(
        initial_prompt=initial_prompt,
        project=project,
        region=region,
        endpoint_id=endpoint_id,
    )
    opening_turn.set_caching_options(True)

    # Step 2: second round, seeded with the first round's reply.
    follow_up_turn = interact_with_gemini_model_second(
        previous_response_text=opening_turn.output,
        user_prompt=user_prompt,
        project=project,
        region=region,
        endpoint_id=endpoint_id,
    )
    follow_up_turn.set_caching_options(True)

    # Step 3: generate music from the final text and store it directly in GCS.
    music_task = generate_music(
        prompt=follow_up_turn.output,
        gcs_bucket_name=gcs_bucket_name,
        gcs_output_path=gcs_output_path,
    )
    music_task.set_caching_options(True)

In [10]:
from google.cloud import aiplatform
from kfp.v2 import compiler
from google.cloud.aiplatform.pipeline_jobs import PipelineJob

if __name__ == "__main__":
    # Compile the pipeline definition into a JSON template on local disk.
    compiled_template = "music_generation_pipeline.json"
    compiler.Compiler().compile(
        pipeline_func=music_generation_pipeline,
        package_path=compiled_template,
    )

    # Point the Vertex AI SDK at the project/region that will run the job.
    aiplatform.init(project='andong-24-team-101', location='asia-northeast3')

    # Runtime arguments for the pipeline's parameters.
    run_parameters = {
        "initial_prompt": "Input Description of desire for musical expression!",
        # Placeholder for the text produced by analysing an image on the web side.
        "user_prompt": "beautiful woman is looking at Eiffel Tower",
        "endpoint_id": "projects/535442247184/locations/asia-northeast3/endpoints/7748060528844472320",
        "project": "andong-24-team-101",
        "region": "asia-northeast3",
        # Destination bucket name.
        "gcs_bucket_name": "test_music_team_101",
        # Object path inside the bucket where the WAV is stored.
        "gcs_output_path": "test_wav/advanced_New.wav",
    }

    # Describe the pipeline job from the compiled template.
    pipeline_job = PipelineJob(
        display_name="music-generation-pipeline",
        template_path=compiled_template,
        pipeline_root="gs://test_music_team_101/test_pipeline",
        parameter_values=run_parameters,
    )

    # Submit and block until the run finishes (synchronous execution).
    pipeline_job.run(sync=True)


Creating PipelineJob
PipelineJob created. Resource name: projects/535442247184/locations/asia-northeast3/pipelineJobs/music-generation-pipeline-20240827213328
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/535442247184/locations/asia-northeast3/pipelineJobs/music-generation-pipeline-20240827213328')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/asia-northeast3/pipelines/runs/music-generation-pipeline-20240827213328?project=535442247184
PipelineJob projects/535442247184/locations/asia-northeast3/pipelineJobs/music-generation-pipeline-20240827213328 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/535442247184/locations/asia-northeast3/pipelineJobs/music-generation-pipeline-20240827213328 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/535442247184/locations/asia-northeast3/pipelineJobs/music-generation-pipeline-20240827213328 current state:
PipelineState.PIPELINE_S