## Retrieve current AWS account and Region

In [None]:
import boto3
sts_client = boto3.client('sts')
account_id = sts_client.get_caller_identity()['Account']
current_region = sts_client.meta.region_name

print(f"Current AWS Account ID: {account_id}")
print(f"Current Region: {current_region}")

## Configure the Spark session to use an RMS-backed catalog

This code initializes a PySpark session configured to work with Apache Iceberg tables through the AWS Glue catalog. It sets up:

1. A custom catalog named 'images'
2. AWS Glue as the catalog implementation
3. Iceberg extensions for Spark

The session uses Iceberg as the table format and AWS Glue as the metadata store, which enables data lake operations.

In the code below, replace *Account-number* with the AWS account ID printed by the previous step. If your Region is not 'us-east-1', update the 'client.region' setting as well.

In [None]:
%%pyspark emr-s.emr-image-processing
from pyspark.sql import SparkSession
import logging

spark = SparkSession.builder.appName('rms_images') \
.config('spark.sql.catalog.images', 'org.apache.iceberg.spark.SparkCatalog') \
.config('spark.sql.defaultCatalog', 'images') \
.config('spark.sql.catalog.images.catalog-impl', 'org.apache.iceberg.aws.glue.GlueCatalog') \
.config('spark.sql.catalog.images.glue.id','Account-number:image-data-rms/dev') \
.config('spark.sql.catalog.images.client.region','us-east-1') \
.config('spark.sql.extensions','org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') \
.getOrCreate()
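
Rather than editing the 'glue.id' string by hand, the same settings can be assembled from the account ID and Region retrieved earlier. The following is a minimal sketch, not part of the original notebook: `build_catalog_conf` is a hypothetical helper, the account ID shown is a placeholder, and it assumes both values are available in the kernel that builds the Spark session (with the `%%pyspark` magic the session may run on remote compute, in which case the values still need to be pasted in).

```python
def build_catalog_conf(account_id, region, catalog_name="images",
                       catalog_path="image-data-rms/dev"):
    """Return the Spark settings from the cell above as a plain dict.

    Hypothetical helper: only the key names and class names come from
    the notebook; the function itself is an illustration.
    """
    prefix = f"spark.sql.catalog.{catalog_name}"
    return {
        prefix: "org.apache.iceberg.spark.SparkCatalog",
        "spark.sql.defaultCatalog": catalog_name,
        f"{prefix}.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
        # The Glue catalog ID is "<account id>:<catalog path>"
        f"{prefix}.glue.id": f"{account_id}:{catalog_path}",
        f"{prefix}.client.region": region,
        "spark.sql.extensions":
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    }


# Placeholder account ID, for illustration only
conf = build_catalog_conf("123456789012", "us-east-1")
for key, value in conf.items():
    print(f"{key} = {value}")
```

Each key and value pair could then be passed to `SparkSession.builder.config(key, value)` in a loop before calling `getOrCreate()`.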

## Let's configure the Spark session to use the public namespace/database

In [None]:
%%pyspark emr-s.emr-image-processing
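# Switch to the 'public' namespace in the default catalog
spark.sql("use public")

# List the available namespaces to confirm the catalog is reachable
spark.sql("show databases").show()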

## Create a DataFrame from the JSON file

This code sets up a data processing pipeline using PySpark. It defines a structured schema for the images dataset, reads a JSON file from an S3 location using this schema, and loads it into a DataFrame. It then drops the 'associated_entities' column and creates a temporary SQL view, 'view_images', for later SQL queries.

In the code section below, replace *insert your S3 URI here* with the S3 URI you saved after uploading the images JSON file.
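
A forgotten placeholder or a malformed URI only surfaces once Spark tries to read the file, so it can help to check the string first. The sketch below is an illustration under stated assumptions: `check_s3_uri` is a hypothetical helper, and the bucket and key shown are made-up examples, not the location of the real dataset.

```python
from urllib.parse import urlparse


def check_s3_uri(uri):
    """Return (bucket, key) for an S3 URI, or raise ValueError.

    Hypothetical helper: it only inspects the string and does not
    contact S3 or confirm that the object exists.
    """
    parsed = urlparse(uri)
    if parsed.scheme not in ("s3", "s3a"):
        raise ValueError(f"Expected an s3:// URI, got: {uri!r}")
    if not parsed.netloc:
        raise ValueError(f"Missing bucket name in: {uri!r}")
    key = parsed.path.lstrip("/")
    if not key:
        raise ValueError(f"Missing object key in: {uri!r}")
    return parsed.netloc, key


# Made-up bucket and key, for illustration only
bucket, key = check_s3_uri("s3://example-bucket/data/images.json")
print(bucket, key)
```

Passing the unedited placeholder text to `check_s3_uri` raises `ValueError`, which catches the most common mistake before the read is attempted.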

In [None]:
%%pyspark emr-s.emr-image-processing
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType

# Define the schema for the nested struct in associated_entities
associated_entities_schema = StructType([
    StructField("entity_submitter_id", StringType(), True),
    StructField("entity_type", StringType(), True),
    StructField("case_id", StringType(), True),
    StructField("entity_id", StringType(), True)
])

# Define the main schema
images_schema = StructType([
    StructField("data_format", StringType(), True),
    StructField("access", StringType(), True),
    StructField("associated_entities", ArrayType(associated_entities_schema), True),
    StructField("s3_uri", StringType(), True),
    StructField("file_name", StringType(), True),
    StructField("md5sum", StringType(), True),
    StructField("file_id", StringType(), True),
    StructField("data_type", StringType(), True),
    StructField("submitter_id", StringType(), True),
    StructField("data_category", StringType(), True),
    StructField("state", StringType(), True),
    StructField("experimental_strategy", StringType(), True),
    StructField("file_size", IntegerType(), True)
])

# This schema can be used to create a DataFrame from in-memory data, for example:
# df = spark.createDataFrame(data, schema=images_schema)

# Here it is applied when reading from a data source:
df = spark.read.schema(images_schema).json("insert your S3 URI here")

# Show the first few rows
df.show(5)

# Print the schema to verify
df.printSchema()

# Get basic statistics of the numerical columns
#df.describe().show()

# Count total rows
print(f"Total number of records: {df.count()}")

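# Drop the nested column, which has no counterpart in the target table
df_dropped = df.drop("associated_entities")

df_dropped.show(5)

# Register a temporary view so the data can be queried with SQL
df_dropped.createOrReplaceTempView("view_images")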
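
The schema above declares 'file_size' as `IntegerType`, which is a 32-bit signed integer in Spark, so it cannot hold sizes above 2,147,483,647 bytes. Whether this dataset contains files that large is not stated here. The sketch below simply shows the boundary; `fits_integer_type` is a hypothetical helper and the sample sizes are made up.

```python
INT32_MIN = -2**31
INT32_MAX = 2**31 - 1  # largest value a 32-bit signed integer can hold


def fits_integer_type(file_size_bytes):
    """Return True if the value fits in a 32-bit signed integer."""
    return INT32_MIN <= file_size_bytes <= INT32_MAX


# Made-up file sizes in bytes, for illustration only
sample_sizes = [512_000, 2_147_483_647, 3_000_000_000]
for size in sample_sizes:
    print(f"{size}: fits = {fits_integer_type(size)}")
```

If any file turns out to be larger than the limit, using `LongType` in the schema together with `BIGINT` in the table definition would be the matching choice.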

## Create Iceberg table in RMS catalog

This code creates a new table named 'images_rms_table' if it doesn't already exist. The table uses the Apache Iceberg format, and the table property 'aws.write.format' is set to 'RMS' so that the data is written to Redshift Managed Storage (RMS).

If the SQL fails the first time, re-run the step.
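
The manual re-run described above can also be expressed as a small retry loop. This is a generic sketch rather than anything the notebook requires: `run_with_retry` is a hypothetical helper, and the failing function below only simulates a first-run failure so the example runs without a Spark session.

```python
import time


def run_with_retry(action, attempts=3, delay_seconds=5.0):
    """Call action() until it succeeds or the attempts run out."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    last_error = None
    for attempt in range(1, attempts + 1):
        try:
            return action()
        except Exception as error:  # broad on purpose for a sketch
            last_error = error
            print(f"Attempt {attempt} failed: {error}")
            if attempt < attempts:
                time.sleep(delay_seconds)
    raise last_error


# Simulated step that fails once and then succeeds
calls = {"count": 0}


def flaky_create():
    calls["count"] += 1
    if calls["count"] == 1:
        raise RuntimeError("simulated first-run failure")
    return "created"


result = run_with_retry(flaky_create, attempts=2, delay_seconds=0)
print(result)
```

In the notebook, the same helper could wrap the statement as `run_with_retry(lambda: spark.sql(create_table_sql))`, where `create_table_sql` is an assumed variable holding the CREATE TABLE text.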

In [None]:
%%pyspark emr-s.emr-image-processing
spark.sql("""
CREATE TABLE IF NOT EXISTS images_rms_table (
    data_format STRING,
    access STRING,
    s3_uri STRING,
    file_name STRING,
    md5sum STRING,
    file_id STRING,
    data_type STRING,
    submitter_id STRING,
    data_category STRING,
    state STRING,
    experimental_strategy STRING,
    file_size INTEGER
) USING iceberg TBLPROPERTIES ('aws.write.format'='RMS')
""")

## Add data into the Iceberg table from the temporary view

In [None]:
%%pyspark emr-s.emr-image-processing
spark.sql("insert into public.images_rms_table select * from view_images")
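
The `select *` form matches columns by position. It works here because the view keeps the schema order with 'associated_entities' removed, which is the same order as the table definition. A more defensive variant names the columns explicitly. The sketch below only builds the SQL string; `build_insert_sql` is a hypothetical helper, and the column list is copied from the CREATE TABLE statement above.

```python
# Column names in the order used by the CREATE TABLE statement
COLUMNS = [
    "data_format", "access", "s3_uri", "file_name", "md5sum", "file_id",
    "data_type", "submitter_id", "data_category", "state",
    "experimental_strategy", "file_size",
]


def build_insert_sql(target_table, source_view, columns):
    """Build an INSERT ... SELECT statement with an explicit column list."""
    column_list = ", ".join(columns)
    return f"INSERT INTO {target_table} SELECT {column_list} FROM {source_view}"


insert_sql = build_insert_sql("public.images_rms_table", "view_images", COLUMNS)
print(insert_sql)
```

The resulting string could be passed to `spark.sql(insert_sql)` in place of the `select *` statement, so a later change to the view's column order would not silently shift values into the wrong columns.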
