In [9]:
from pyspark import SparkContext 
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame

sc = SparkContext.getOrCreate()
# NOTE: this name shadows the stdlib `gc` module inside the notebook namespace.
gc = GlueContext(sc)
spark = gc.spark_session

# Inline sample records loaded through the JSON reader; every value is quoted,
# so both columns come out as strings.
plist = '[{"Product":"P1","Qty":"10"},{"Product":"P2","Qty":"5"}]'

# ';'-delimited CSV with a header row. The relative path is resolved against
# Spark's default filesystem (the working directory when it is the local FS).
spark_csv = spark.read.option("delimiter", ";").csv("teste.csv", header=True)
spark_df = spark.read.json(sc.parallelize([plist]))

# Each DynamicFrame gets its own label; previously both were called "glue_df".
glue_df_csv = DynamicFrame.fromDF(spark_csv, gc, "glue_df_csv")
glue_df = DynamicFrame.fromDF(spark_df, gc, "glue_df")

glue_df.printSchema()
glue_df.toDF().show()

glue_df_csv.toDF().show()

root
|-- Product: string
|-- Qty: string

+-------+---+
|Product|Qty|
+-------+---+
|     P1| 10|
|     P2|  5|
+-------+---+

+---+------+-----+
| id|  nome|idade|
+---+------+-----+
|  1|   Ana|   60|
|  2|Carine|   26|
+---+------+-----+



In [11]:
import boto3
import os
from pyspark.sql import SparkSession

In [14]:
def add_to_bucket(bucket_name: str, file_name: str,
                  endpoint_url: str = "http://host.docker.internal:4566",
                  key: "str | None" = None):
    """Create ``bucket_name`` on an S3-compatible endpoint and upload ``file_name`` into it.

    The client uses plain HTTP (``use_ssl=False``), dummy ``mock`` credentials
    and region ``us-east-1``. The default endpoint is ``host.docker.internal``,
    i.e. the Docker host as seen from inside a container.

    Args:
        bucket_name: Bucket to create and upload into.
        file_name: Path of the file to upload. A relative path is resolved
            against the current working directory; an absolute path is used as-is.
        endpoint_url: S3 endpoint URL. Defaults to the previously hard-coded value.
        key: Object key to store the file under. Defaults to ``file_name``.

    Returns:
        The boto3 S3 client on success, or ``None`` if any step failed.
        This is a best-effort helper: the exception is printed, not raised.
    """
    try:
        s3 = boto3.client('s3',
                          endpoint_url=endpoint_url,
                          use_ssl=False,
                          aws_access_key_id='mock',
                          aws_secret_access_key='mock',
                          region_name='us-east-1')
        s3.create_bucket(Bucket=bucket_name)

        # os.path.join leaves absolute paths intact. The former f-string glued
        # the cwd in front of them, producing a nonexistent path.
        file_path = os.path.join(os.getcwd(), file_name)
        object_key = file_name if key is None else key
        with open(file_path, 'rb') as f:
            s3.put_object(Body=f, Bucket=bucket_name, Key=object_key)
        print(file_name)

        return s3
    except Exception as e:
        # Deliberately best-effort: report with context and signal failure via None.
        print(f"add_to_bucket failed for {file_name!r} -> bucket {bucket_name!r}: {e!r}")
        return None

In [16]:
def create_testing_pyspark_session():
    print('creating pyspark session')
    sparksession = (SparkSession.builder
                    .master('local[2]')
                    .appName('pyspark-demo')
                    .enableHiveSupport()
                    .getOrCreate())

    hadoop_conf = sparksession.sparkContext._jsc.hadoopConfiguration()
    hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    hadoop_conf.set("fs.s3a.path.style.access", "true")
    hadoop_conf.set("fs.s3a.connection.ssl.enabled", "false")
    hadoop_conf.set("com.amazonaws.services.s3a.enableV4", "true")
    hadoop_conf.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider")
    hadoop_conf.set("fs.s3a.access.key", "mock")
    hadoop_conf.set("fs.s3a.secret.key", "mock")
    hadoop_conf.set("fs.s3a.session.token", "mock")
    hadoop_conf.set("fs.s3a.endpoint", "http://host.docker.internal:4566")
    return sparksession

In [21]:
test_bucket = 'teste'

# Upload the local CSV to the mock bucket. add_to_bucket prints the error and
# returns None on failure instead of raising. Fail fast here rather than with a
# confusing "path does not exist" error from Spark further down.
if add_to_bucket(bucket_name=test_bucket, file_name='teste.csv') is None:
    raise RuntimeError(f"could not upload teste.csv to bucket {test_bucket!r}")

spark_session = create_testing_pyspark_session()
# create_testing_pyspark_session only sets fs.s3a.* options (endpoint,
# credentials), so address the object with the s3a:// scheme.
file_path = f's3a://{test_bucket}/teste.csv'

# Read back from the bucket. With schema inference off, every column stays a string.
data_df = (spark_session.read
           .option('delimiter', ';')
           .option('header', 'true')
           .option('inferSchema', 'false')
           .format('csv')
           .load(file_path))

# DataFrame.show() prints the table itself and returns None; wrapping it in
# print() used to emit a stray "None".
data_df.show()

teste.csv
creating pyspark session
+---+------+-----+
| id|  nome|idade|
+---+------+-----+
|  1|   Ana|   60|
|  2|Carine|   26|
+---+------+-----+

None


In [22]:
# Persist the DataFrame back to the mock bucket as Parquet, replacing any earlier output.
write_path = f's3a://{test_bucket}/testparquet/'
data_df.write.mode('overwrite').parquet(write_path)