# Setup

In [1]:
import boto3

from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.transforms import *
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.types import *
from pyspark.sql import Row
from pyspark.sql import SparkSession

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
1,,pyspark,idle,,,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [2]:
# S3 endpoint shared by the S3A connector and the boto3 client, plus the target bucket
BUCKET = "glue-bucket"
S3_MOCK_ENDPOINT = "http://localstack:4566"

# Placeholder credentials: the concrete values do not really matter
ACCESS_KEY = SECRET_KEY = "mock"

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [3]:
# Wrap the shared SparkContext (fetched or created on demand) in a GlueContext;
# later cells pass this object to the Glue DynamicFrame and writer APIs.
glueContext = GlueContext(SparkContext.getOrCreate())

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
# Get (or create) the Spark session and grab its Hadoop configuration so the
# S3A filesystem can be pointed at the S3 mock endpoint.
spark = SparkSession.builder.getOrCreate()
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()

# S3A options, applied in the order listed.
_s3a_settings = (
    # Filesystem implementation and addressing style
    ("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem"),
    ("fs.s3a.path.style.access", "true"),
    # The mock endpoint is plain HTTP
    ("fs.s3a.connection.ssl.enabled", "false"),
    # Static credentials taken from the configuration cell
    ("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"),
    ("fs.s3a.access.key", ACCESS_KEY),
    ("fs.s3a.secret.key", SECRET_KEY),
    ("fs.s3a.session.token", "mock"),
    # Where requests are sent
    ("fs.s3a.endpoint", S3_MOCK_ENDPOINT),
)
for _option, _setting in _s3a_settings:
    hadoop_conf.set(_option, _setting)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# Glue example 

Taken from https://aws.amazon.com/blogs/big-data/building-an-aws-glue-etl-pipeline-locally-without-an-aws-account/

In [5]:

# Sample orders; each row is [order_id, customer_id, essential_item, timestamp, zipcode],
# all given as strings (see order_schema below for the column names).
order_list = [
    ['1005', '623', 'YES', '1418901234', '75091'],
    ['1006', '547', 'NO', '1418901256', '75034'],
    ['1007', '823', 'YES', '1418901300', '75023'],
    ['1008', '912', 'NO', '1418901400', '82091'],
    ['1009', '321', 'YES', '1418902000', '90093'],
]


# Schema for order_list: five string-typed columns, in row order
order_schema = StructType([
    StructField(column_name, StringType())
    for column_name in ("order_id", "customer_id", "essential_item", "timestamp", "zipcode")
])

# Build a Spark DataFrame from the Python list using that schema
df_orders = spark.createDataFrame(order_list, schema=order_schema)

# Convert the DataFrame into a Glue DynamicFrame named "dyf"
dyf_orders = DynamicFrame.fromDF(df_orders, glueContext, "dyf")

# Map each source column to its target name and type:
# (source name, source type, target name, target type).
# All columns except essential_item are mapped to Long; zipcode is renamed to zip.
dyf_applyMapping = ApplyMapping.apply(
    frame=dyf_orders,
    mappings=[
        ("order_id", "String", "order_id", "Long"),
        ("customer_id", "String", "customer_id", "Long"),
        ("essential_item", "String", "essential_item", "String"),
        ("timestamp", "String", "timestamp", "Long"),
        ("zipcode", "String", "zip", "Long"),
    ],
)


def next_day_air(rec):
    """Flag a record for next-day air when its zip is 75034.

    The record is modified in place: ``rec["next_day_air"]`` is set to
    ``True`` only for the matching zip; other records are left without
    that field. The same record object is returned in both cases.
    """
    if rec["zip"] != 75034:
        return rec
    rec["next_day_air"] = True
    return rec


# Run next_day_air over every record of the mapped frame
mapped_dyF = Map.apply(f=next_day_air, frame=dyf_applyMapping)

# Write the result as JSON under the "output" prefix of the bucket (via s3a://)
glueContext.write_dynamic_frame.from_options(
    frame=mapped_dyF,
    connection_type='s3',
    connection_options={
        'path': f's3a://{BUCKET}/output',
    },
    format='json',
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

<awsglue.dynamicframe.DynamicFrame object at 0x7fef6a868e10>

# Browse output using boto3

In [6]:
# boto3 S3 client aimed at the same endpoint and credentials as the S3A connector
s3 = boto3.client(
    's3',
    endpoint_url=S3_MOCK_ENDPOINT,
    region_name="eu-central-1",
    aws_access_key_id=ACCESS_KEY,
    aws_secret_access_key=SECRET_KEY,
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [7]:
# List items in bucket
response = s3.list_objects(Bucket=BUCKET)
# The response carries no 'Contents' key when the bucket holds no objects,
# so fall back to an empty list instead of raising KeyError.
# NOTE: a single call returns a limited page of keys; check
# response['IsTruncated'] if the bucket may hold more than one page.
for item in response.get('Contents', []):
    print(item['Key'])

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

output/run-1649940782552-part-r-00000
output/run-1649942603534-part-r-00000