In [1]:
%%configure -f
{
  "conf": {
    "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
    "spark.sql.extensions": "org.apache.spark.sql.hudi.HoodieSparkSessionExtension",
    "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.hudi.catalog.HoodieCatalog",
    "spark.sql.hive.convertMetastoreParquet": "false",
    "spark.sql.legacy.pathOptionBehavior.enabled": "true"
  }
}

In [2]:
# Import all the necessary libraries 

import os, uuid, sys, boto3, time, sys
from pyspark.context import SparkContext
from pyspark.sql.functions import lit, udf
from pyspark.sql.session import SparkSession
from awsglue.context import GlueContext
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame



# MinIO (S3-compatible) connection settings for the s3a:// filesystem.
# Values are read from environment variables so credentials are not committed
# with the notebook; the fallbacks are the values this notebook originally
# hardcoded, suitable only for a local MinIO.
# NOTE(review): under sparkmagic/Livy this cell executes in the remote Spark
# session, so the variables must be set in that driver's environment — confirm.
MINIO_ENDPOINT = os.environ.get("MINIO_ENDPOINT", "http://minio:9000/")
MINIO_ACCESS_KEY = os.environ.get("MINIO_ACCESS_KEY", "minioadmin")
MINIO_SECRET_KEY = os.environ.get("MINIO_SECRET_KEY", "minioadmin")

s3a_settings = {
    "fs.s3a.endpoint": MINIO_ENDPOINT,
    "fs.s3a.access.key": MINIO_ACCESS_KEY,
    "fs.s3a.secret.key": MINIO_SECRET_KEY,
    # Address buckets as http://host/bucket instead of bucket.host.
    "fs.s3a.path.style.access": "true",
    # The endpoint above is plain HTTP.
    "fs.s3a.connection.ssl.enabled": "false",
    "fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
    # Authenticate with the static access/secret key pair configured above.
    "fs.s3a.aws.credentials.provider": "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider",
}

# `spark` and `sc` are provided by the sparkmagic session
# ("SparkSession available as 'spark'" in the output).
hadoop_conf = spark._jsc.hadoopConfiguration()
for setting_key, setting_value in s3a_settings.items():
    hadoop_conf.set(setting_key, setting_value)

glueContext = GlueContext(sc)

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
0,,pyspark,idle,,,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…



In [4]:
# Load the sample CSV from the MinIO bucket as a Spark DataFrame: the first
# line supplies the column names and Spark infers each column's type.
df = spark.read.csv(
    "s3a://my-glue-bucket/input/annual-enterprise-survey-2023-financial-year-provisional.csv",
    header=True,
    inferSchema=True,
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [5]:
# Print the schema Spark inferred from the CSV. In the output, Year is typed as
# integer but Value stays a string (presumably some rows hold non-numeric
# entries — TODO confirm), so numeric comparisons on Value need a cast.
df.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- Year: integer (nullable = true)
 |-- Industry_aggregation_NZSIOC: string (nullable = true)
 |-- Industry_code_NZSIOC: string (nullable = true)
 |-- Industry_name_NZSIOC: string (nullable = true)
 |-- Units: string (nullable = true)
 |-- Variable_code: string (nullable = true)
 |-- Variable_name: string (nullable = true)
 |-- Variable_category: string (nullable = true)
 |-- Value: string (nullable = true)
 |-- Industry_code_ANZSIC06: string (nullable = true)

In [6]:
# Display the first row of the DataFrame as a quick sanity check of the load.
df.show(n=1)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----+---------------------------+--------------------+--------------------+------------------+-------------+-------------+--------------------+------+----------------------+
|Year|Industry_aggregation_NZSIOC|Industry_code_NZSIOC|Industry_name_NZSIOC|             Units|Variable_code|Variable_name|   Variable_category| Value|Industry_code_ANZSIC06|
+----+---------------------------+--------------------+--------------------+------------------+-------------+-------------+--------------------+------+----------------------+
|2023|                    Level 1|               99999|      All industries|Dollars (millions)|          H01| Total income|Financial perform...|930995|  ANZSIC06 division...|
+----+---------------------------+--------------------+--------------------+------------------+-------------+-------------+--------------------+------+----------------------+
only showing top 1 row

In [7]:
# Read the same CSV through Glue as a DynamicFrame. The s3a:// endpoint and
# credentials come from the Hadoop configuration set in the setup cell, so none
# are needed here.
file_path = "s3a://my-glue-bucket/input/annual-enterprise-survey-2023-financial-year-provisional.csv"
dynamic_frame = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={"paths": [file_path]},
    format="csv",
    # Glue's CSV reader has no "inferSchema" option (it was silently ignored).
    # In the output, Year and Value come back as strings, e.g. "Year": "2023".
    format_options={"withHeader": True},
)
dynamic_frame.show(1)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

{"Industry_name_NZSIOC": "All industries", "Industry_code_NZSIOC": "99999", "Value": "930995", "Variable_name": "Total income", "Industry_code_ANZSIC06": "ANZSIC06 divisions A-S (excluding classes K6330, L6711, O7552, O760, O771, O772, S9540, S9601, S9602, and S9603)", "Industry_aggregation_NZSIOC": "Level 1", "Variable_category": "Financial performance", "Variable_code": "H01", "Year": "2023", "Units": "Dollars (millions)"}

In [8]:
# Keep only rows whose numeric Value exceeds a threshold. In this DynamicFrame
# Value is a string, so it must be parsed before comparing. A plain string
# comparison (x["Value"] > "80000") is lexicographic: "9" > "80000" is True
# while "100000" > "80000" is False.
VALUE_THRESHOLD = 80000


def value_exceeds(record, threshold=VALUE_THRESHOLD):
    """Return True if record["Value"] parses as a number greater than threshold.

    Missing or non-numeric values are treated as not exceeding the threshold
    instead of raising inside the Spark job. Thousands separators (",") are
    stripped before parsing.
    """
    raw_value = record["Value"]
    if raw_value is None:
        return False
    try:
        return float(str(raw_value).replace(",", "")) > threshold
    except ValueError:
        return False


dynamic_frame_filtered = dynamic_frame.filter(value_exceeds)
dynamic_frame_filtered.show(1)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

{"Industry_name_NZSIOC": "All industries", "Industry_code_NZSIOC": "99999", "Value": "930995", "Variable_name": "Total income", "Industry_code_ANZSIC06": "ANZSIC06 divisions A-S (excluding classes K6330, L6711, O7552, O760, O771, O772, S9540, S9601, S9602, and S9603)", "Industry_aggregation_NZSIOC": "Level 1", "Variable_category": "Financial performance", "Variable_code": "H01", "Year": "2023", "Units": "Dollars (millions)"}

In [9]:
# Write the filtered DynamicFrame back to MinIO as CSV under the output prefix.
output_file_path = "s3a://my-glue-bucket/output/filtered/"
glueContext.write_dynamic_frame.from_options(
    frame=dynamic_frame_filtered,
    connection_type="s3",
    connection_options={"path": output_file_path},
    format="csv",
    # "writeHeader" is Glue's CSV *write* option ("withHeader" only applies
    # when reading). It defaults to True; set explicitly for clarity.
    format_options={"writeHeader": True},
)
# NOTE(review): re-running this cell likely adds new part files under the same
# prefix rather than replacing earlier output — clear the prefix first if a
# clean result is needed. TODO confirm for the Glue version in use.
print("successfully written to s3")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

successfully written to s3

In [11]:
# Stop the SparkSession, which also stops its underlying SparkContext. After
# this, `spark`, `sc` and `glueContext` can no longer run jobs.
# NOTE(review): under sparkmagic/Livy this probably leaves the remote session
# unusable; deleting the session through sparkmagic may be cleaner — confirm.
spark.stop()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…