In [1]:
import sagemaker
import boto3
import time
import json
import os

from sagemaker.processing import ProcessingInput, ProcessingOutput, ScriptProcessor
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.parameters import ParameterString
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.pipeline_context import PipelineSession

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml


In [4]:
%run sparkquest_pipeline.py

Initializing SageMaker session...


INFO:sagemaker.image_uris:Defaulting to only available Python version: py3
INFO:sagemaker.image_uris:Defaulting to only available Python version: py3


Region: us-east-2
Role: arn:aws:iam::717494507037:role/service-role/AmazonSageMaker-ExecutionRole-20251115T112643
Bucket: sagemaker-us-east-2-717494507037

Defining pipeline parameters...
Creating processing scripts...
Uploading processing scripts to S3...
  âœ“ text_analysis.py uploaded
  âœ“ video_analysis.py uploaded
  âœ“ aggregator.py uploaded

Defining pipeline steps...


INFO:sagemaker.image_uris:Defaulting to only available Python version: py3



Creating pipeline...
Pipeline 'SparkQuest-Career-Matcher-Pipeline' defined with 3 steps

Deploying pipeline to SageMaker...





âœ… SUCCESS! Pipeline deployed successfully!

Pipeline Name: SparkQuest-Career-Matcher-Pipeline
Region: us-east-2
Bucket: sagemaker-us-east-2-717494507037

Pipeline ARN: arn:aws:sagemaker:us-east-2:717494507037:pipeline/SparkQuest-Career-Matcher-Pipeline

Pipeline steps:
  1. TextAnalysis - Analyzes personal assessment text
  2. VideoAnalysis - Analyzes creative content video
  3. TalentAssessmentAggregator - Matches candidate to careers

You can now run the interactive script to execute this pipeline.


In [5]:
import sagemaker
import boto3
import time
import os
import json
from sagemaker.workflow.pipeline import Pipeline

# --- 1. CONFIGURE AWS & SAGEMAKER ---
sagemaker_session = sagemaker.Session()
boto_session = boto3.Session()
s3_client = boto_session.client('s3')

role = sagemaker.get_execution_role()
default_bucket = sagemaker_session.default_bucket()
pipeline_name = "SparkQuest-Career-Matcher-Pipeline"

# --- 2. DEFINE HELPER FUNCTION ---
def upload_text_to_s3(content, s3_key):
    """Uploads a string as a text file to S3."""
    try:
        s3_client.put_object(
            Bucket=default_bucket,
            Key=s3_key,
            Body=content
        )
        return f"s3://{default_bucket}/{s3_key}"
    except Exception as e:
        print(f"Error uploading to S3: {e}")
        return None

# --- 3. GET USER INPUT ---
print("--- SparkQuest Interactive Analysis ---")

# Prompt for Personal Assessment
print("\n[1/2] Please paste your personal assessment text (or transcript):")
# --- FIX: Changed instructions to be clear ---
print("(You can type or paste multiple lines. Press [Enter] on an empty line when you are done.)")
assessment_lines = []
while True:
    # --- FIX: Added a '>' prompt to make it clear it's waiting for input ---
    line = input("> ") 
    if line == "":
        break
    assessment_lines.append(line)
assessment_text = "\n".join(assessment_lines)

if not assessment_text:
    print("No text provided, using default demo text.")
    assessment_text = "I am a creative person who enjoys analytics and structured projects."

print("\n...Assessment text captured.")

# Prompt for Video S3 URI
print("\n[2/2] Please provide the S3 URI for your creative video:")
print("(e.g., s3://my-videos-bucket/my-reel.mp4)")
video_s3_uri = input("Video S3 URI: ").strip()

if not video_s3_uri:
    print("No S3 URI provided, using default demo URI.")
    # You MUST upload a demo file to this exact path for the default to work
    video_s3_uri = f"s3://{default_bucket}/sparkquest/demo_video.mp4" 
    print(f"NOTE: Make sure a demo video actually exists at {video_s3_uri}")

print("...Video URI captured.")

# --- 4. PREPARE & UPLOAD INPUTS ---
print("\n--- Preparing and uploading inputs to S3... ---")

# Create a unique execution ID
execution_id = f"exec-{int(time.time())}"

# Define S3 paths for the text files
assessment_s3_key = f"sparkquest/inputs/{execution_id}/personal_assessment.txt"
video_uri_s3_key = f"sparkquest/inputs/{execution_id}/creative_content_uri.txt"

# Upload the collected data as text files to S3
assessment_s3_path = upload_text_to_s3(assessment_text, assessment_s3_key)
# We don't upload the video, just the *path* to the video in a text file
video_uri_s3_path = upload_text_to_s3(video_s3_uri, video_uri_s3_key)

if not (assessment_s3_path and video_uri_s3_path):
    print("Failed to upload input files to S3. Aborting.")
    # Use exit() in a script, but in a notebook, we'll just return
    # exit() 
else:
    print(f"   -> Assessment text uploaded to: {assessment_s3_path}")
    print(f"   -> Video URI uploaded to:     {video_uri_s3_path}")

    # --- 5. START THE PIPELINE ---
    print(f"\n--- Starting SageMaker Pipeline '{pipeline_name}'... ---")

    try:
        # Get the pipeline object by name
        pipeline = Pipeline(
            name=pipeline_name,
            sagemaker_session=sagemaker_session,
        )

        # Start the pipeline execution, passing in the S3 paths
        # of the files we just created as parameters.
        execution = pipeline.start(
            parameters={
                "PersonalAssessmentS3Uri": assessment_s3_path,
                "CreativeContentS3Uri": video_uri_s3_path,
            }
        )
        
        # --- THIS IS THE FIX ---
        execution_arn = execution.arn 
        print(f"\nSUCCESS: Pipeline execution started.")
        print(f"Execution ARN: {execution_arn}")
        
        print("\n---")
        print("You can monitor the pipeline progress in the SageMaker console:")
        print(f"httpsExample:://{boto_session.region_name}.console.aws.amazon.com/sagemaker/home?region={boto_session.region_name}#/pipelines/executions")
        
        # Wait for the execution to finish
        print("\nWaiting for pipeline to complete... (This can take 5-10 minutes)")
        execution.wait()
        
        print("Pipeline execution FINISHED.")
        
        # --- 6. GET AND DISPLAY RESULTS ---
        print("\n--- Retrieving Final Report... ---")
        
        # Find the S3 output of the final step
        step_list = execution.list_steps()
        final_step_output = None
        for step in step_list:
            if step['StepName'] == 'TalentAssessmentAggregator':
                # Find the output named 'final_job_list'
                for output in step['Metadata']['ProcessingJob']['ProcessingOutputConfig']['Outputs']:
                    if output['OutputName'] == 'final_job_list':
                        output_s3_uri = output['S3Output']['S3Uri']
                        final_step_output = os.path.join(output_s3_uri, 'final_job_list.json')
                        break
            if final_step_output:
                break
                
        if final_step_output:
            print(f"Loading results from: {final_step_output}")
            
            # Parse S3 URI
            s3_path_parts = final_step_output.replace("s3://", "").split("/")
            bucket = s3_path_parts[0]
            key = "/".join(s3_path_parts[1:])
            
            # Download and print the file
            result_obj = s3_client.get_object(Bucket=bucket, Key=key)
            result_data = json.loads(result_obj['Body'].read().decode('utf-8'))
            
            print("\n--- ðŸš€ Your SparkQuest Career Report ---")
            print(json.dumps(result_data, indent=2))
            
        else:
            print("Could not find the final output step. Please check the pipeline execution details.")

    except Exception as e:
        print(f"\n--- ERROR ---")
        print(f"Could not start pipeline. Did you run the 'sparkquest_pipeline.py' script first?")
        print(f"Error: {e}")

--- SparkQuest Interactive Analysis ---

[1/2] Please paste your personal assessment text (or transcript):
(You can type or paste multiple lines. Press [Enter] on an empty line when you are done.)


>  creative
>  design
>  



...Assessment text captured.

[2/2] Please provide the S3 URI for your creative video:
(e.g., s3://my-videos-bucket/my-reel.mp4)


Video S3 URI:  


No S3 URI provided, using default demo URI.
NOTE: Make sure a demo video actually exists at s3://sagemaker-us-east-2-717494507037/sparkquest/demo_video.mp4
...Video URI captured.

--- Preparing and uploading inputs to S3... ---
   -> Assessment text uploaded to: s3://sagemaker-us-east-2-717494507037/sparkquest/inputs/exec-1763235898/personal_assessment.txt
   -> Video URI uploaded to:     s3://sagemaker-us-east-2-717494507037/sparkquest/inputs/exec-1763235898/creative_content_uri.txt

--- Starting SageMaker Pipeline 'SparkQuest-Career-Matcher-Pipeline'... ---

SUCCESS: Pipeline execution started.
Execution ARN: arn:aws:sagemaker:us-east-2:717494507037:pipeline/SparkQuest-Career-Matcher-Pipeline/execution/2poe3pxjezsr

---
You can monitor the pipeline progress in the SageMaker console:
httpsExample:://us-east-2.console.aws.amazon.com/sagemaker/home?region=us-east-2#/pipelines/executions

Waiting for pipeline to complete... (This can take 5-10 minutes)

--- ERROR ---
Could not start pipe