In [None]:
# Update the AWS Profile depending on which AWS account and role you want to use

profiles = '''
[default]
sso_start_url = https://brtech.awsapps.com/start#/
sso_region = us-west-2
sso_account_id = 548136830426
sso_role_name = mesa_dev_admin
region = us-west-2
output = json
'''

PROFILE_NAME = 'default'

In [None]:
import os


# Create an .aws folder
path = '/root/.aws'
try:
  os.mkdir(path)
except FileExistsError:
  pass

# Write the AWS profile defined at the top of the notebook
with open(f'{path}/config', 'w') as f:
  f.write(profiles)

print('Profile saved to disk')

Profile saved to disk


In [None]:
import subprocess
import re
import os

import boto3


def get_profile_region(profile_name, profiles):
  profile_data_match = re.search(f'(\[{profile_name}\]\n)((.+ = .+\n)+)\[?', profiles)
  if profile_data_match is None:
    return None
  profile_data = profile_data_match.group(2)
  profile_region_match = re.search(f'region = (.*)\n', profile_data)
  if profile_region_match is None:
    return None
  profile_region = profile_region_match.group(1)
  return profile_region

def aws_sso_login(profile_name, profiles):
  if profile_name != 'default':
    config_profile_name = f'profile {profile_name}'
  else:
    config_profile_name = profile_name
  # Execute the aws command to authenticate the environment based
  # on the profile provided at the top of the notebook
  p = subprocess.Popen(
    ['aws', 'sso', 'login', '--profile', profile_name],
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
  )
  # TODO: detect errors
  # Loop through stdout until we find the SSO code
  while True:
    line = p.stdout.readline()
    try:
      line = line.decode('utf-8')
    except Exception:
      pass
    match = re.search(r'[A-Z]{4}-[A-Z]{4}', line)
    if match:
      print('Please open this URL:')
      region = get_profile_region(config_profile_name, profiles)
      aws_sso_auth_url = f'https://device.sso.{region}.amazonaws.com/'
      link = f'{aws_sso_auth_url}?user_code={match.group(0)}'
      print(link)
      break
  # Exiting loop – waiting for URL to be opened
  print('Waiting for SSO code to be sent to AWS...')
  p.wait()
  print('Code received')

In [None]:
aws_sso_login(PROFILE_NAME, profiles)

Please open this URL:
https://device.sso.us-west-2.amazonaws.com/?user_code=QPRH-HCVB
Waiting for SSO code to be sent to AWS...
Code received


In [None]:
#ONE TIME LOADER FROM HERE
sqlContext.sql("use mesa_prod.mesa_lake_prod")
df = spark.sql(f'''
 SELECT image_jupiter.id,
    image_artifact_jupiter.kind,
    image_artifact_jupiter.web_s3_bucket,
    image_artifact_jupiter.web_s3_key,
    image_artifact_jupiter.id AS artifact_id
FROM image_jupiter
JOIN image_artifact_jupiter
    ON image_jupiter.id = image_artifact_jupiter.image
    WHERE image_artifact_jupiter.web_s3_key IS NOT NULL
ORDER BY group_id
LIMIT 10
'''
)

In [None]:
from pyspark.sql.functions import when
df = df.withColumn("region", when(df.web_s3_bucket == "mesa-data","us-west-2")
                                 .when(df.web_s3_bucket == "tartarus.images","us-west-2")
                                 .when(df.web_s3_bucket == "s3.aletheia.imports.prod" ,"us-west-1"))

In [None]:

import pyspark.sql.functions as f
df_agg = df.groupby("id").agg(f.concat_ws(", ", f.collect_list("artifact_id")).alias("artifact_id"),f.concat_ws(", ", f.collect_list("kind")).alias("kind"),f.concat_ws(", ", f.collect_list("web_s3_key")).alias("web_s3_key"),f.concat_ws(", ", f.collect_list("web_s3_bucket")).alias("web_s3_bucket"),f.concat_ws(", ", f.collect_list("region")).alias("region"))
#pd.set_option('display.max_rows', None)


In [None]:
p_df = df_agg.toPandas()

In [None]:
import pandas as pd
pd.set_option('display.max_rows', None)
p_df




Unnamed: 0,id,artifact_id,kind,web_s3_key,web_s3_bucket,region
0,61c3ba81909d1e70f761cc51,61c3ba81909d1e70f761cc52,rgb,dev/images/helm.png,mesa-data,us-west-2
1,61c3ba8068a1d5ad414da630,61c3ba8068a1d5ad414da631,rgb,dev/images/helm.png,mesa-data,us-west-2
2,61c3ba816b0c478c85d9138b,61c3ba816b0c478c85d9138c,rgb,dev/images/helm.png,mesa-data,us-west-2
3,61c3ba81a11c7feb82540ad4,61c3ba81a11c7feb82540ad5,rgb,dev/images/helm.png,mesa-data,us-west-2
4,61c3ba7d8bbdee5f30081319,61c3ba7d8bbdee5f3008131a,rgb,dev/images/helm.png,mesa-data,us-west-2
5,61c3ba7f1d52d718700c8865,61c3ba7f1d52d718700c8866,rgb,dev/images/helm.png,mesa-data,us-west-2
6,61c3ba7fbc33e04138f4d009,61c3ba7fbc33e04138f4d00a,rgb,dev/images/helm.png,mesa-data,us-west-2
7,632255f8180be9cb03faf2bb,632255f8180be9cb03faf2bc,rgb,prod/images/03dd8a32-6687-45aa-a32a-56c2595589...,mesa-data,us-west-2
8,61c3ba80a4294a09d6f67191,61c3ba80a4294a09d6f67192,rgb,dev/images/helm.png,mesa-data,us-west-2
9,61c3ba7e6f194498d0148643,61c3ba7e6f194498d0148644,rgb,dev/images/helm.png,mesa-data,us-west-2


In [None]:
json_array = []
#ADD REGIOIN AND ARTIFACT ID IS LOWERCASE
for index, row in p_df.iterrows():
    #check how many items in one of them, pattern follows for all 
    length = len(row['artifact_id'].split(","))
    id = row['id']
    artifacts = []
    for i in range(length):
        artifacts.append({"id":row['artifact_id'].split(",")[i],"kind": row['kind'].split(",")[i], "s3_bucket":row['web_s3_bucket'].split(",")[i],"s3_key":row['web_s3_key'].split(",")[i],"region":row['region'].split(",")[i]})
    json_format = {"image_id": id, "artifacts": artifacts }
    json_array.append(json_format)

In [None]:
json_array

In [None]:
'''
import time
import random
import boto3
import json
import numpy as np
client = boto3.client("kinesis", region_name="us-west-2")
STREAM_NAME = "mesa.images.ingest"

#in production we are splitting it up into 50 arrays, 1000 / 50 = 20 Kinesis put records call
#split_up_jsonarray = np.array_split(json_array,10)
try:
        im_id = json_arrays[0]['image_id']
        data_dict = json_arrays.tolist()
        data = json.dumps(data_dict, indent=2).encode('utf-8')

        print(f"Sending {data=}")
        
        response = client.put_records(Records =[{'Data':data, 'PartitionKey':im_id}],StreamName=STREAM_NAME)
        #response = client.put_record(StreamName=STREAM_NAME, Data =data, PartitionKey = 'A')
        print('NEXT BATCH')

        #print(f"Received {response=}")
except KeyboardInterrupt:
        print("Unexpected error")
'''

Out[41]: '\nimport time\nimport random\nimport boto3\nimport json\nimport numpy as np\nclient = boto3.client("kinesis", region_name="us-west-2")\nSTREAM_NAME = "mesa.images.ingest"\n\n#in production we are splitting it up into 50 arrays, 1000 / 50 = 20 Kinesis put records call\n#split_up_jsonarray = np.array_split(json_array,10)\ntry:\n        im_id = json_arrays[0][\'image_id\']\n        data_dict = json_arrays.tolist()\n        data = json.dumps(data_dict, indent=2).encode(\'utf-8\')\n\n        print(f"Sending {data=}")\n        \n        response = client.put_records(Records =[{\'Data\':data, \'PartitionKey\':im_id}],StreamName=STREAM_NAME)\n        #response = client.put_record(StreamName=STREAM_NAME, Data =data, PartitionKey = \'A\')\n        print(\'NEXT BATCH\')\n\n        #print(f"Received {response=}")\nexcept KeyboardInterrupt:\n        print("Unexpected error")\n'

In [None]:
import json
records = []
def create():
    for i in range(len(json_array)):
        record = json_array[i]
        image_id = record['image_id']
        records.append({"Data": json.dumps(record,indent =2).encode('utf-8'),"PartitionKey":image_id })
create()

In [None]:
import time
import random
import boto3
client = boto3.client("kinesis", region_name="us-west-2")
STREAM_NAME = "mesa.images.ingest"

#in production we are splitting it up into 50 arrays, 1000 / 50 = 20 Kinesis put records call
#split_up_jsonarray = np.array_split(json_array,10)

try:
    
        print(records)
        response = client.put_records(Records=records,StreamName=STREAM_NAME)
        #print(f"Received {response=}")
except KeyboardInterrupt:
        print("Unexpected error")

[{'Data': b'{\n  "image_id": "61c3ba81909d1e70f761cc51",\n  "artifacts": [\n    {\n      "id": "61c3ba81909d1e70f761cc52",\n      "kind": "rgb",\n      "s3_bucket": "mesa-data",\n      "s3_key": "dev/images/helm.png",\n      "region": "us-west-2"\n    }\n  ]\n}', 'PartitionKey': '61c3ba81909d1e70f761cc51'}, {'Data': b'{\n  "image_id": "61c3ba8068a1d5ad414da630",\n  "artifacts": [\n    {\n      "id": "61c3ba8068a1d5ad414da631",\n      "kind": "rgb",\n      "s3_bucket": "mesa-data",\n      "s3_key": "dev/images/helm.png",\n      "region": "us-west-2"\n    }\n  ]\n}', 'PartitionKey': '61c3ba8068a1d5ad414da630'}, {'Data': b'{\n  "image_id": "61c3ba816b0c478c85d9138b",\n  "artifacts": [\n    {\n      "id": "61c3ba816b0c478c85d9138c",\n      "kind": "rgb",\n      "s3_bucket": "mesa-data",\n      "s3_key": "dev/images/helm.png",\n      "region": "us-west-2"\n    }\n  ]\n}', 'PartitionKey': '61c3ba816b0c478c85d9138b'}, {'Data': b'{\n  "image_id": "61c3ba81a11c7feb82540ad4",\n  "artifacts": [\n