In [1]:
# Parameters
kms_key = "arn:aws:kms:us-west-2:000000000000:1234abcd-12ab-34cd-56ef-1234567890ab"

In [2]:
import sys

!{sys.executable} -m pip install "sagemaker>=2.99.0"



In [3]:
import boto3
import sagemaker
from sagemaker.workflow.pipeline_context import PipelineSession

sagemaker_session = sagemaker.session.Session()
region = sagemaker_session.boto_region_name
role = sagemaker.get_execution_role()
pipeline_session = PipelineSession()
default_bucket = sagemaker_session.default_bucket()
default_bucket_prefix = sagemaker_session.default_bucket_prefix
default_bucket_prefix_path = ""



sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml


In [4]:
!mkdir -p data

In [8]:
# === S3 PARAMETERS ===
S3_BUCKET = "itam-analytics-thmrudolf"
S3_KEY = "final_project/clean/beer_names_breweries_with_images.csv"
# === local parameters
local_path = "data/beer_names_breweries_with_images.csv"

s3 = boto3.resource("s3")
s3.Bucket(f"{S3_BUCKET}").download_file(
    f"{S3_KEY}", local_path
)


base_uri = f"s3://{S3_BUCKET}/{S3_KEY}"
input_data_uri = sagemaker.s3.S3Uploader.upload(
    local_path=local_path,
    desired_s3_uri=base_uri,
)
print(input_data_uri)

s3://itam-analytics-thmrudolf/final_project/clean/beer_names_breweries_with_images.csv/beer_names_breweries_with_images.csv


In [9]:
# === CONFIGURATION ===
import json
with open("credentials_google_search.json", 'r') as f:
    creds = json.load(f)
GOOGLE_API_KEY = creds['google']['api_key']
GOOGLE_CSE_ID = creds['google']['cx']




In [10]:
# === INITIALIZATION ===
# Abres un cliente de S3
import boto3
from io import StringIO
import pandas as pd
from pipeline_actualization_images import add_image_url_column_if_not_exists, find_beer_image_url 
s3 = boto3.client('s3')  # sin profile_name

all_records = []

# === SEARCH LOOP ===
print("Starting image search...")
# Read the CSV from S3 bucket
db_beers_names_breweries = pd.read_csv(local_path)

db_beers = add_image_url_column_if_not_exists(db_beers_names_breweries,
                                              GOOGLE_API_KEY,
                                              GOOGLE_CSE_ID,
                                              s3, S3_BUCKET, S3_KEY)
#all_records = db_beers.to_dict(orient="records")

# === SAVE TO S3 ===
#print(f"Saving {len(all_records)} image URLs to S3...")

#df = pd.DataFrame(all_records)
#csv_buffer = StringIO()
#df.to_csv(csv_buffer, index=False)

#s3.put_object(Bucket=S3_BUCKET, Key=S3_KEY, Body=csv_buffer.getvalue())
#print(f"Done! Saved to s3://{S3_BUCKET}/{S3_KEY}")


Starting image search...
Beer: amber, Brewery: alaskan brewing co.
Skipping row 0, image_url already exists:
https://tse2.mm.bing.net/th?id=OIP.p2UqLwq4WBWB4rhAdHq--gHaLO&pid=Api&P=0&h=180
Beer: double bag, Brewery: long trail brewing co.
Skipping row 1, image_url already exists:
https://www.instacart.com/image-server/1200x1200/www.instacart.com/assets/domains/product-image/file/large_08cdae49-739c-44d8-beaf-be7849984cfa.jpg
Beer: long trail ale, Brewery: long trail brewing co.
Skipping row 2, image_url already exists:
https://longtrail.com/wp-content/uploads/2022/06/LTB313-18-SummerAle-Rebrand-Bottle-3D-LR_1.png
Beer: doppelsticke, Brewery: uerige obergarige hausbrauerei gmbh / zum uerige
Skipping row 3, image_url already exists:
https://www.bierverkostung.de/bilder_bier/4144_2020-07-25_Uerige_DoppelSticke.jpg
Beer: sleigh'r dark double alt ale, Brewery: ninkasi brewing company
Skipping row 4, image_url already exists:
https://cdn.shopify.com/s/files/1/0227/0581/products/Ninkasi-Sleig

In [13]:
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker import get_execution_role
import sagemaker
import boto3

role = get_execution_role()
region = "us-east-1"
bucket = "itam-analytics-thmrudolf"
S3_KEY = "final_project/clean/beer_names_breweries_with_images.csv"
output_path = f"s3://{bucket}/{S3_KEY}/"

sklearn_processor = SKLearnProcessor(
    framework_version="1.2-1",
    role=role,
    instance_type="ml.m5.large",
    instance_count=1,
    base_job_name="csv-image-finder-job"
)

sklearn_processor.run(
    code="pipeline_actualization_images.py",
    outputs=[
        ProcessingOutput(source="/opt/ml/processing/output", destination=output_path)
    ]
)


INFO:sagemaker:Creating processing-job with name csv-image-finder-job-2025-05-27-02-50-02-261


...............
..

In [21]:
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
)

processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)
instance_type = ParameterString(name="FindingNewImageUrls", default_value="ml.m5.xlarge")
model_approval_status = ParameterString(
    name="ApprovalStatus", default_value="PendingManualApproval"
)
input_data = ParameterString(
    name="InputData",
    default_value=input_data_uri,
)
#batch_data = ParameterString(
#    name="BatchData",
#    default_value=batch_data_uri,
#)
#mse_threshold = ParameterFloat(name="MseThreshold", default_value=6.0)

In [23]:
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.session import Session
from sagemaker import get_execution_role

role = get_execution_role()
sagemaker_session = Session()

sklearn_processor = SKLearnProcessor(
    framework_version="1.2-1",
    role=role,
    instance_type="ml.m5.large",
    instance_count=1
)

step_process = ProcessingStep(
    name="GenerateCSVStep",
    processor=sklearn_processor,
    code="pipeline_actualization_images.py",
    outputs=[]
)

pipeline = Pipeline(
    name="DailyCsvPipeline",
    steps=[step_process],
    sagemaker_session=sagemaker_session
)


INFO:sagemaker.image_uris:Defaulting to only available Python version: py3


In [24]:
pipeline.upsert(role_arn=role)



{'PipelineArn': 'arn:aws:sagemaker:us-east-1:535002863660:pipeline/DailyCsvPipeline',
 'ResponseMetadata': {'RequestId': 'f07299ad-dfc5-44f0-8cb3-e88cb650f325',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'f07299ad-dfc5-44f0-8cb3-e88cb650f325',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '84',
   'date': 'Sun, 25 May 2025 16:59:40 GMT'},
  'RetryAttempts': 0}}

In [30]:
execution = pipeline.start()
print(execution)

_PipelineExecution(arn='arn:aws:sagemaker:us-east-1:535002863660:pipeline/DailyCsvPipeline/execution/nm2rad1kkfu3', sagemaker_session=<sagemaker.session.Session object at 0x7fe2002a22a0>)


In [26]:
execution.describe()

{'PipelineArn': 'arn:aws:sagemaker:us-east-1:535002863660:pipeline/DailyCsvPipeline',
 'PipelineExecutionArn': 'arn:aws:sagemaker:us-east-1:535002863660:pipeline/DailyCsvPipeline/execution/53ucogcwao0a',
 'PipelineExecutionDisplayName': 'execution-1748192401909',
 'PipelineExecutionStatus': 'Executing',
 'PipelineExperimentConfig': {'ExperimentName': 'dailycsvpipeline',
  'TrialName': '53ucogcwao0a'},
 'CreationTime': datetime.datetime(2025, 5, 25, 17, 0, 1, 806000, tzinfo=tzlocal()),
 'LastModifiedTime': datetime.datetime(2025, 5, 25, 17, 0, 1, 806000, tzinfo=tzlocal()),
 'CreatedBy': {'UserProfileArn': 'arn:aws:sagemaker:us-east-1:535002863660:user-profile/d-a0uxfxrem9gp/datascientist',
  'UserProfileName': 'datascientist',
  'DomainId': 'd-a0uxfxrem9gp',
  'IamIdentity': {'Arn': 'arn:aws:sts::535002863660:assumed-role/SageMakerStudioExecutionRole/SageMaker',
   'PrincipalId': 'AROAXZEFH3AWBZL57IMHS:SageMaker'}},
 'LastModifiedBy': {'UserProfileArn': 'arn:aws:sagemaker:us-east-1:5350

In [27]:
execution.list_steps()

[{'StepName': 'GenerateCSVStep',
  'StartTime': datetime.datetime(2025, 5, 25, 17, 0, 2, 777000, tzinfo=tzlocal()),
  'StepStatus': 'Executing',
  'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemaker:us-east-1:535002863660:processing-job/pipelines-53ucogcwao0a-GenerateCSVStep-G0JPp6sz7q'}},
  'AttemptCount': 1}]