In [1]:
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

from spark_utils import init_spark_utils

# S3A / MinIO connection settings. Endpoint and credentials are read from the environment so they
# do not have to be committed with the notebook; the fallbacks are the values this notebook used
# originally (presumably local docker-compose MinIO defaults — confirm before reusing elsewhere).
S3_ENDPOINT = os.environ.get("MINIO_ENDPOINT", "http://minio:9000")
S3_ACCESS_KEY = os.environ.get("MINIO_ACCESS_KEY", "accesskey")
S3_SECRET_KEY = os.environ.get("MINIO_SECRET_KEY", "secretkey")
HIVE_METASTORE_URI = "thrift://hive-metastore:9083"
WAREHOUSE_DIR = "s3a://wba/warehouse"

# Maven coordinates resolved by Spark at startup (Delta Lake + S3A support).
SPARK_PACKAGES = ",".join([
    "io.delta:delta-spark_2.12:3.2.0",
    "io.delta:delta-storage:3.2.0",
    "org.antlr:antlr4-runtime:4.9.3",
    "org.apache.hadoop:hadoop-aws:3.3.4",
    "com.amazonaws:aws-java-sdk-bundle:1.12.262",
])

spark = (
    SparkSession.builder
    .appName("S3Init")
    .master("spark://spark-master:7077")
    .config("spark.executor.memory", "512mb")
    .config("spark.cores.max", "3")
    .config("spark.jars.packages", SPARK_PACKAGES)
    # Register Delta Lake's SQL extensions and use its catalog as the session catalog.
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    # S3A pointed at MinIO: static key/secret credentials, path-style addressing, plain HTTP.
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.hadoop.fs.s3a.endpoint", S3_ENDPOINT)
    .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
    .config("spark.hadoop.fs.s3a.access.key", S3_ACCESS_KEY)
    .config("spark.hadoop.fs.s3a.secret.key", S3_SECRET_KEY)
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
    # Hive metastore catalog, with the warehouse stored on S3A.
    .config("spark.sql.catalogImplementation", "hive")
    .config("hive.metastore.uris", HIVE_METASTORE_URI)
    .config("spark.hadoop.hive.metastore.warehouse.dir", WAREHOUSE_DIR)
    .getOrCreate()
)

utils = init_spark_utils(spark)

# Sanity-check that the S3A settings reached the Hadoop configuration.
hadoop_conf = spark._jsc.hadoopConfiguration()
for key in ("fs.s3a.endpoint", "fs.s3a.access.key", "fs.s3a.secret.key", "fs.s3a.path.style.access"):
    value = hadoop_conf.get(key)
    if key == "fs.s3a.secret.key" and value:
        # Never echo the secret into the saved notebook output; only show that it is set.
        value = "<redacted>"
    print(f"{key}:", value)

fs.s3a.endpoint: http://minio:9000
fs.s3a.access.key: accesskey
fs.s3a.secret.key: <redacted>
fs.s3a.path.style.access: true


In [2]:
# List the databases currently registered in the catalog.
databases_df = spark.sql("SHOW DATABASES")
databases_df.show()

+------------+
|   namespace|
+------------+
|     default|
|warehouse_db|
+------------+



In [3]:
# Create the working database (a no-op if it already exists), confirm it is listed,
# and make it the current database for any unqualified table names used later.
spark.sql("CREATE DATABASE IF NOT EXISTS warehouse_db")
spark.sql("SHOW DATABASES").show()
# The trailing `;` suppresses the empty `DataFrame[]` repr that USE returns.
spark.sql("USE warehouse_db");

+------------+
|   namespace|
+------------+
|     default|
|warehouse_db|
+------------+



DataFrame[]

In [4]:
# Switch to warehouse_db, then list the tables it contains.
spark.sql("USE warehouse_db")
tables_df = spark.sql("SHOW TABLES")
tables_df.show()

+------------+---------+-----------+
|   namespace|tableName|isTemporary|
+------------+---------+-----------+
|warehouse_db|new_table|      false|
+------------+---------+-----------+



In [None]:
# Sample rows as (age, first_name) tuples with an explicit DDL schema.
# Inferring a schema from a list of dicts is deprecated in PySpark. The explicit schema pins the
# column types and keeps the `age, first_name` column order that the dict inference produced.
data = [
    (47, "bob"),
    (23, "li"),
    (51, "leah"),
    (30, "ivan"),
]
df = spark.createDataFrame(data=data, schema="age BIGINT, first_name STRING")
df.show()

In [None]:
# Persist the sample DataFrame as a Delta table in the metastore.
# The table name is database-qualified so this cell does not depend on an earlier
# `USE warehouse_db`. overwriteSchema lets a re-run replace the table even if the columns changed.
(
    df.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("warehouse_db.new_table")
)

In [5]:
# Read the table back through the catalog.
# new_table is written as Delta. Reading its directory with `spark.read.parquet` would bypass the
# Delta transaction log, and would also pick up data files that earlier overwrites logically
# removed (they stay on storage until VACUUM), which duplicates rows.
df = spark.read.table("warehouse_db.new_table")
df.show()

+---+----------+
|age|first_name|
+---+----------+
| 51|      leah|
| 30|      ivan|
| 47|       bob|
| 23|        li|
+---+----------+



In [6]:
# Catalog view of the databases, including each one's storage location (locationUri).
databases = spark.catalog.listDatabases()
databases

[Database(name='default', catalog='spark_catalog', description='Default Hive database', locationUri='s3a://wba/warehouse'),
 Database(name='warehouse_db', catalog='spark_catalog', description='', locationUri='s3a://wba/warehouse/warehouse_db.db')]