In [1]:
spark.stop()

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.functions import split
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import *
import sys
import os

In [None]:
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

In [2]:
spark = SparkSession.builder \
    .appName("Batch Log Processing with Iceberg") \
    .config("spark.ui.port", "4041") \
    .config(
        "spark.jars",
        ",".join([
            "/opt/spark/jars/spark-sql-kafka-0-10_2.12-3.5.5.jar",
            "/opt/spark/jars/hadoop-aws-3.2.2.jar",
            "/opt/spark/jars/aws-java-sdk-1.11.375.jar",
            "/opt/spark/jars/iceberg-spark-runtime-3.5_2.12-1.8.1.jar",
            "/opt/spark/jars/postgresql-42.5.0.jar",
            "/opt/spark/jars/commons-pool2-2.11.1.jar"
        ])
    ) \
    .config("spark.sql.catalog.hadoop_catalog", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.hadoop_catalog.type", "hadoop") \
    .config("spark.sql.catalog.hadoop_catalog.warehouse", "s3a://warehouse/logs") \
    .config("spark.sql.catalog.hadoop_catalog.io", "org.apache.iceberg.aws.s3.S3FileIO") \
    .config("spark.sql.catalog.hadoop_catalog.s3.endpoint", "http://minio:9000") \
    .config("spark.sql.catalog.hadoop_catalog.s3.access.key.id", "admin") \
    .config("spark.sql.catalog.hadoop_catalog.s3.secret.access.key", "password") \
    .config("spark.sql.catalog.hadoop_catalog.s3.region", "us-east-1") \
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000") \
    .config("spark.hadoop.fs.s3a.access.key", "admin") \
    .config("spark.hadoop.fs.s3a.secret.key", "password") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false") \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.hadoop.fs.s3a.aws.credentials.provider", "") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog") \
    .getOrCreate()

25/11/19 05:13:37 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [5]:
spark.sparkContext.setLogLevel("ERROR")

In [6]:

customers_df = spark.read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "customers_dataset") \
    .load()

geolocation_df = spark.read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "geolocation_dataset") \
    .load()

order_items_df = spark.read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "order_items_dataset") \
    .load()

order_payments_df = spark.read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "order_payments_dataset") \
    .load()

order_reviews_df = spark.read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "order_reviews_dataset") \
    .load()

orders_df = spark.read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "orders_dataset") \
    .load()

products_df = spark.read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "products_dataset") \
    .load()

sellers_df = spark.read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "sellers_dataset") \
    .load()

product_category_name_translation_df = spark.read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "product_category_name_translation") \
    .load()

In [7]:

# Convertir value a STRING
#logs_df = logs_df.selectExpr("CAST(value AS STRING) as log_line")
customers_df = customers_df.selectExpr("CAST(value AS STRING) as customers_line")
geolocation_df = geolocation_df.selectExpr("CAST(value AS STRING) as geolocation_line")
order_items_df = order_items_df.selectExpr("CAST(value AS STRING) as order_items_line")
order_payments_df = order_payments_df.selectExpr("CAST(value AS STRING) as order_payments_line")
order_reviews_df = order_reviews_df.selectExpr("CAST(value AS STRING) as order_reviews_line")
orders_df = orders_df.selectExpr("CAST(value AS STRING) as orders_line")
products_df = products_df.selectExpr("CAST(value AS STRING) as products_line")
sellers_df = sellers_df.selectExpr("CAST(value AS STRING) as sellers_line")
product_category_name_translation_df = product_category_name_translation_df.selectExpr("CAST(value AS STRING) as product_category_name_translation_line")


In [8]:
# castear el string de filebeat de customers
if len(customers_df.head(1)) > 0:
    json_rdd = customers_df.select("customers_line").rdd.map(lambda r: r[0])
    json_df = spark.read.json(json_rdd)
    
    clean_df = json_df.withColumn("message_clean", regexp_replace(col("message"), '\\"', '"'))
    clean_df = clean_df.withColumn("message_fields", split(col("message_clean"), ","))
    
    clean_df = clean_df.withColumn("customer_id", col("message_fields").getItem(0)) \
                       .withColumn("customer_unique_id", col("message_fields").getItem(1)) \
                       .withColumn("customer_zip_code_prefix", col("message_fields").getItem(2)) \
                       .withColumn("customer_city", col("message_fields").getItem(3)) \
                       .withColumn("customer_state", col("message_fields").getItem(4))
    #clean_df = clean_df.withColumn("customer_zip_code_prefix", col("customer_zip_code_prefix").cast("int"))
    #clean_df = clean_df.withColumn(
    #    "customer_zip_code_prefix",
    #    regexp_replace(col("customer_zip_code_prefix"), '"', '').cast("int")
    #)
    clean_df1 = clean_df[['customer_id', 'customer_unique_id', 'customer_zip_code_prefix', 'customer_city', 'customer_state']]

                                                                                

In [9]:
# castear el string de filebeat de geolocation_line
if len(geolocation_df.head(1)) > 0:
    
    json_rdd = geolocation_df.select("geolocation_line").rdd.map(lambda r: r[0])
    json_df = spark.read.json(json_rdd)
    
    clean_df = json_df.withColumn("message_clean", regexp_replace(col("message"), '\\"', '"'))
    clean_df = clean_df.withColumn("message_fields", split(col("message_clean"), ","))
    
    clean_df = clean_df.withColumn("geolocation_zip_code_prefix", col("message_fields").getItem(0)) \
                       .withColumn("geolocation_latitude", col("message_fields").getItem(1)) \
                       .withColumn("geolocation_longitude", col("message_fields").getItem(2)) \
                       .withColumn("geolocation_city", col("message_fields").getItem(3)) \
                       .withColumn("geolocation_state", col("message_fields").getItem(4))
    
    clean_df2 = clean_df[['geolocation_zip_code_prefix','geolocation_latitude','geolocation_longitude','geolocation_city','geolocation_state']]

                                                                                

In [10]:
# castear el string de filebeat de order_items_line
if len(order_items_df.head(1)) > 0:
    
    json_rdd = order_items_df.select("order_items_line").rdd.map(lambda r: r[0])
    json_df = spark.read.json(json_rdd)
    
    clean_df = json_df.withColumn("message_clean", regexp_replace(col("message"), '\\"', '"'))
    clean_df = clean_df.withColumn("message_fields", split(col("message_clean"), ","))
    
    clean_df = clean_df.withColumn("order_id", col("message_fields").getItem(0)) \
                       .withColumn("order_item_id", col("message_fields").getItem(1)) \
                       .withColumn("product_id", col("message_fields").getItem(2)) \
                       .withColumn("seller_id", col("message_fields").getItem(3)) \
                       .withColumn("shipping_limit_date", col("message_fields").getItem(4)) \
                       .withColumn("price", col("message_fields").getItem(5)) \
                       .withColumn("freight_value", col("message_fields").getItem(6))
    
    clean_df3 = clean_df[['order_id','order_item_id','product_id','seller_id','shipping_limit_date','price','freight_value']]
    

                                                                                

In [11]:

# castear el string de filebeat de order_payments_line
if len(order_payments_df.head(1)) > 0:
    
    json_rdd = order_payments_df.select("order_payments_line").rdd.map(lambda r: r[0])
    json_df = spark.read.json(json_rdd)
    
    clean_df = json_df.withColumn("message_clean", regexp_replace(col("message"), '\\"', '"'))
    clean_df = clean_df.withColumn("message_fields", split(col("message_clean"), ","))
    
    clean_df = clean_df.withColumn("order_id", col("message_fields").getItem(0)) \
                       .withColumn("payment_sequential", col("message_fields").getItem(1)) \
                       .withColumn("payment_type", col("message_fields").getItem(2)) \
                       .withColumn("payment_installments", col("message_fields").getItem(3)) \
                       .withColumn("payment_value", col("message_fields").getItem(4))
    
    clean_df4 = clean_df[['order_id','payment_sequential','payment_type','payment_installments','payment_value']]
    

                                                                                

In [12]:

# castear el string de filebeat de order_reviews_line
if len(order_reviews_df.head(1)) > 0:
    
    json_rdd = order_reviews_df.select("order_reviews_line").rdd.map(lambda r: r[0])
    json_df = spark.read.json(json_rdd)
    
    clean_df = json_df.withColumn("message_clean", regexp_replace(col("message"), '\\"', '"'))
    clean_df = clean_df.withColumn("message_fields", split(col("message_clean"), ","))
    
    clean_df = clean_df.withColumn("review_id", col("message_fields").getItem(0)) \
                       .withColumn("order_id", col("message_fields").getItem(1)) \
                       .withColumn("review_score", col("message_fields").getItem(2)) \
                       .withColumn("review_comment_title", col("message_fields").getItem(3)) \
                       .withColumn("review_comment_message", col("message_fields").getItem(4)) \
                       .withColumn("review_creation_date", col("message_fields").getItem(5)) \
                       .withColumn("review_answer_timestamp", col("message_fields").getItem(6))
    
    clean_df5 = clean_df[['review_id','order_id','review_score','review_comment_title','review_comment_message','review_creation_date','review_answer_timestamp']]
    

                                                                                

In [13]:

# castear el string de filebeat de orders_line
if len(orders_df.head(1)) > 0:
    
    json_rdd = orders_df.select("orders_line").rdd.map(lambda r: r[0])
    json_df = spark.read.json(json_rdd)
    
    clean_df = json_df.withColumn("message_clean", regexp_replace(col("message"), '\\"', '"'))
    clean_df = clean_df.withColumn("message_fields", split(col("message_clean"), ","))
    
    clean_df = clean_df.withColumn("order_id", col("message_fields").getItem(0)) \
                       .withColumn("customer_id", col("message_fields").getItem(1)) \
                       .withColumn("order_status", col("message_fields").getItem(2)) \
                       .withColumn("order_purchase_timestamp", col("message_fields").getItem(3)) \
                       .withColumn("order_approved_at", col("message_fields").getItem(4)) \
                       .withColumn("order_delivered_carrier_date", col("message_fields").getItem(5)) \
                       .withColumn("order_delivered_customer_date", col("message_fields").getItem(6)) \
                       .withColumn("order_estimated_delivery_date", col("message_fields").getItem(6))
    
    clean_df6 = clean_df[['order_id','customer_id','order_status','order_purchase_timestamp','order_approved_at','order_delivered_carrier_date','order_delivered_customer_date','order_estimated_delivery_date']]
    

                                                                                

In [14]:

# castear el string de filebeat de products_line
if len(products_df.head(1)) > 0:
    
    json_rdd = products_df.select("products_line").rdd.map(lambda r: r[0])
    json_df = spark.read.json(json_rdd)
    
    clean_df = json_df.withColumn("message_clean", regexp_replace(col("message"), '\\"', '"'))
    clean_df = clean_df.withColumn("message_fields", split(col("message_clean"), ","))
    
    clean_df = clean_df.withColumn("product_id", col("message_fields").getItem(0)) \
                       .withColumn("product_category_name", col("message_fields").getItem(1)) \
                       .withColumn("product_name_lenght", col("message_fields").getItem(2)) \
                       .withColumn("product_description_lenght", col("message_fields").getItem(3)) \
                       .withColumn("product_photos_qty", col("message_fields").getItem(4)) \
                       .withColumn("product_weight_g", col("message_fields").getItem(5)) \
                       .withColumn("product_length_cm", col("message_fields").getItem(6)) \
                       .withColumn("product_height_cm", col("message_fields").getItem(7)) \
                       .withColumn("product_width_cm", col("message_fields").getItem(8))
    
    clean_df7 = clean_df[['product_id','product_category_name','product_name_lenght','product_description_lenght','product_photos_qty','product_weight_g','product_length_cm','product_height_cm','product_width_cm']]
    

                                                                                

In [15]:

# castear el string de filebeat de sellers_line
if len(sellers_df.head(1)) > 0:
    
    json_rdd = sellers_df.select("sellers_line").rdd.map(lambda r: r[0])
    json_df = spark.read.json(json_rdd)
    
    clean_df = json_df.withColumn("message_clean", regexp_replace(col("message"), '\\"', '"'))
    clean_df = clean_df.withColumn("message_fields", split(col("message_clean"), ","))
    
    clean_df = clean_df.withColumn("seller_id", col("message_fields").getItem(0)) \
                       .withColumn("seller_zip_code_prefix", col("message_fields").getItem(1)) \
                       .withColumn("seller_city", col("message_fields").getItem(2)) \
                       .withColumn("seller_state", col("message_fields").getItem(3)) 
    
    clean_df8 = clean_df[['seller_id','seller_zip_code_prefix','seller_city','seller_state']]
    

In [16]:

# castear el string de filebeat de product_category_name_translation_line
if len(product_category_name_translation_df.head(1)) > 0:
    
    json_rdd = product_category_name_translation_df.select("product_category_name_translation_line").rdd.map(lambda r: r[0])
    json_df = spark.read.json(json_rdd)
    
    clean_df = json_df.withColumn("message_clean", regexp_replace(col("message"), '\\"', '"'))
    clean_df = clean_df.withColumn("message_fields", split(col("message_clean"), ","))
    
    clean_df = clean_df.withColumn("product_category_name", col("message_fields").getItem(0)) \
                       .withColumn("product_category_name_english", col("message_fields").getItem(1))
    
    clean_df9 = clean_df[['product_category_name','product_category_name_english']]
    

In [18]:
spark.sql("DROP TABLE IF EXISTS hadoop_catalog.default.logs")
spark.sql("DROP TABLE IF EXISTS hadoop_catalog.default.customers")
spark.sql("DROP TABLE IF EXISTS hadoop_catalog.default.geolocation")
spark.sql("DROP TABLE IF EXISTS hadoop_catalog.default.order_items")
spark.sql("DROP TABLE IF EXISTS hadoop_catalog.default.order_payments")
spark.sql("DROP TABLE IF EXISTS hadoop_catalog.default.order_reviews")
spark.sql("DROP TABLE IF EXISTS hadoop_catalog.default.orders")
spark.sql("DROP TABLE IF EXISTS hadoop_catalog.default.products")
spark.sql("DROP TABLE IF EXISTS hadoop_catalog.default.sellers")
spark.sql("DROP TABLE IF EXISTS hadoop_catalog.default.product_category_name_translation")

DataFrame[]

In [None]:
NO
spark.sql("""
CREATE TABLE IF NOT EXISTS hadoop_catalog.default.logs (
    log_line STRING
)
USING iceberg
LOCATION 's3a://warehouse/logs/default/logs'
""")


spark.sql("""
CREATE TABLE IF NOT EXISTS hadoop_catalog.default.customers (
    customers_line STRING
)
USING iceberg
LOCATION 's3a://warehouse/logs/default/customers'
""")

In [19]:

spark.sql("""
CREATE TABLE IF NOT EXISTS hadoop_catalog.default.customers (
    customer_id STRING,
    customer_unique_id STRING,
    customer_zip_code_prefix STRING,
    customer_city STRING,
    customer_state STRING
)
USING iceberg
LOCATION 's3a://warehouse/logs/default/customers'
""")

DataFrame[]

In [20]:

spark.sql("""
CREATE TABLE IF NOT EXISTS hadoop_catalog.default.geolocation (
    geolocation_zip_code_prefix STRING,
    geolocation_latitude STRING,
    geolocation_longitude STRING,
    geolocation_city STRING,
    geolocation_state STRING
)
USING iceberg
LOCATION 's3a://warehouse/logs/default/geolocation'
""")

DataFrame[]

In [21]:
spark.sql("""
CREATE TABLE IF NOT EXISTS hadoop_catalog.default.order_items (
    order_id STRING,
    order_item_id STRING,
    product_id STRING,
    seller_id STRING,
    shipping_limit_date STRING,
    price STRING,
    freight_value STRING
)
USING iceberg
LOCATION 's3a://warehouse/logs/default/order_items'
""")

DataFrame[]

In [22]:
spark.sql("""
CREATE TABLE IF NOT EXISTS hadoop_catalog.default.order_payments (
    order_id STRING,
    payment_sequential STRING,
    payment_type STRING,
    payment_installments STRING,
    payment_value STRING
)
USING iceberg
LOCATION 's3a://warehouse/logs/default/order_payments'
""")

DataFrame[]

In [23]:
spark.sql("""
CREATE TABLE IF NOT EXISTS hadoop_catalog.default.order_reviews (
    review_id STRING,
    order_id STRING,
    review_score STRING,
    review_comment_title STRING,
    review_comment_message STRING,
    review_creation_date STRING,
    review_answer_timestamp STRING
)
USING iceberg
LOCATION 's3a://warehouse/logs/default/order_reviews'
""")

DataFrame[]

In [24]:
spark.sql("""
CREATE TABLE IF NOT EXISTS hadoop_catalog.default.orders (
    order_id STRING,
    customer_id STRING,
    order_status STRING,
    order_purchase_timestamp STRING,
    order_approved_at STRING,
    order_delivered_carrier_date STRING,
    order_delivered_customer_date STRING,
    order_estimated_delivery_date STRING
)
USING iceberg
LOCATION 's3a://warehouse/logs/default/orders'
""")

DataFrame[]

In [25]:
spark.sql("""
CREATE TABLE IF NOT EXISTS hadoop_catalog.default.products (
    product_id STRING,
    product_category_name STRING,
    product_name_lenght STRING,
    product_description_lenght STRING,
    product_photos_qty STRING,
    product_weight_g STRING,
    product_length_cm STRING,
    product_height_cm STRING,
    product_width_cm STRING
)
USING iceberg
LOCATION 's3a://warehouse/logs/default/products'
""")

DataFrame[]

In [26]:
spark.sql("""
CREATE TABLE IF NOT EXISTS hadoop_catalog.default.sellers (
    seller_id STRING,
    seller_zip_code_prefix STRING,
    seller_city STRING,
    seller_state STRING
)
USING iceberg
LOCATION 's3a://warehouse/logs/default/sellers'
""")

DataFrame[]

In [27]:
spark.sql("""
CREATE TABLE IF NOT EXISTS hadoop_catalog.default.product_category_name_translation (
    product_category_name STRING,
    product_category_name_english STRING
)
USING iceberg
LOCATION 's3a://warehouse/logs/default/product_category_name_translation'
""")

DataFrame[]

In [28]:
clean_df1.writeTo("hadoop_catalog.default.customers").append()
clean_df2.writeTo("hadoop_catalog.default.geolocation").append()
clean_df3.writeTo("hadoop_catalog.default.order_items").append()
clean_df4.writeTo("hadoop_catalog.default.order_payments").append()
clean_df5.writeTo("hadoop_catalog.default.order_reviews").append()
clean_df6.writeTo("hadoop_catalog.default.orders").append()
clean_df7.writeTo("hadoop_catalog.default.products").append()
clean_df8.writeTo("hadoop_catalog.default.sellers").append()
clean_df9.writeTo("hadoop_catalog.default.product_category_name_translation").append()

                                                                                

In [None]:

no
# Si prefieres escribir en PostgreSQL, usa este código
logs_parsed.write \
 .format("jdbc") \
 .option("url", "jdbc:postgresql://postgres:5432/iceberg") \
 .option("dbtable", "logs") \
 .option("user", "iceberg") \
 .option("password", "password") \
 .option("driver", "org.postgresql.Driver") \
 .mode("append") \
 .save()


In [29]:

df_test = spark.sql("SELECT * FROM hadoop_catalog.default.customers LIMIT 5")
df_test.show(truncate=False)


+----------------------------------+----------------------------------+--------------------------+---------------------+----------------+
|customer_id                       |customer_unique_id                |customer_zip_code_prefix  |customer_city        |customer_state  |
+----------------------------------+----------------------------------+--------------------------+---------------------+----------------+
|b2d1536598b73a9abd18e0d75d92f0a3  |"918dc87cd72cd9f6ed4bd442ed785235"|"18682"                   |lencois paulista     |SP              |
|"customer_id"                     |"customer_unique_id"              |"customer_zip_code_prefix"|"customer_city"      |"customer_state"|
|"06b8999e2fba1a1fbc88172c00ba8bc7"|"861eff4711a542e4b93843c6dd7febb0"|"14409"                   |franca               |SP              |
|"18955e83d337fd6b2def6b18a428ac77"|"290c77bc529b7ac935b93aa66c333dc3"|"09790"                   |sao bernardo do campo|SP              |
|"4e7b3e00288586ebd08712fdd0374a03

In [31]:
df_test = spark.sql("SELECT * FROM hadoop_catalog.default.geolocation LIMIT 5")
df_test.show(truncate=False)

+-----------------------------+--------------------+---------------------+------------------+-------------------+
|geolocation_zip_code_prefix  |geolocation_latitude|geolocation_longitude|geolocation_city  |geolocation_state  |
+-----------------------------+--------------------+---------------------+------------------+-------------------+
|"01035"                      |-23.541577961711493 |-46.64160722329613   |sao paulo         |SP                 |
|"01012"                      |-23.547762303364266 |-46.63536053788448   |são paulo         |SP                 |
|"01047"                      |-23.546273112412678 |-46.64122516971552   |sao paulo         |SP                 |
|"01013"                      |-23.546923208436723 |-46.6342636964915    |sao paulo         |SP                 |
|"geolocation_zip_code_prefix"|"geolocation_lat"   |"geolocation_lng"    |"geolocation_city"|"geolocation_state"|
+-----------------------------+--------------------+---------------------+--------------

In [32]:
df_test = spark.sql("SELECT * FROM hadoop_catalog.default.order_items LIMIT 5")
df_test.show(truncate=False)

+----------------------------------+---------------+----------------------------------+----------------------------------+---------------------+-------+---------------+
|order_id                          |order_item_id  |product_id                        |seller_id                         |shipping_limit_date  |price  |freight_value  |
+----------------------------------+---------------+----------------------------------+----------------------------------+---------------------+-------+---------------+
|"00048cc3ae777c65dbb7d2a0634bc1ea"|1              |ef92defde845ab8450f9d70c526ef70f  |"6426d21aca402a131fc0a5d0960a3c90"|2017-05-23 03:55:27  |21.90  |12.69          |
|"00054e8431b9d7675808bcb819fb4a32"|1              |"8d4f2bb7e93e6710a28f34fa83ee7d28"|"7040e82f899a04d1b434b795a43b4617"|2017-12-14 12:10:31  |19.90  |11.85          |
|"000576fe39319847cbb9d288c5617fa6"|1              |"557d850972a7d6f792fd18ae1400d9b6"|"5996cddab893a4652a15592fb58ab8db"|2018-07-10 12:30:45  |810.00 |70.

In [33]:
df_test = spark.sql("SELECT * FROM hadoop_catalog.default.order_payments LIMIT 5")
df_test.show(truncate=False)

+----------------------------------+------------------+------------+--------------------+-------------+
|order_id                          |payment_sequential|payment_type|payment_installments|payment_value|
+----------------------------------+------------------+------------+--------------------+-------------+
|ba78997921bbcdc1373bb41e913ab953  |1                 |credit_card |8                   |107.78       |
|"42fdf880ba16b47b59251dd489d4441a"|1                 |credit_card |2                   |128.45       |
|"298fcdf1f73eb413e4d26d01b25bc1cd"|1                 |credit_card |2                   |96.12        |
|"771ee386b001f06208a7419e4fc1bbd7"|1                 |credit_card |1                   |81.16        |
|"3d7239c394a212faae122962df514ac7"|1                 |credit_card |3                   |51.84        |
+----------------------------------+------------------+------------+--------------------+-------------+



In [34]:
df_test = spark.sql("SELECT * FROM hadoop_catalog.default.order_reviews LIMIT 5")
df_test.show(truncate=False)

+----------------------------------+----------------------------------+------------+--------------------+----------------------------------------------------------------------------------------------------+--------------------+-----------------------+
|review_id                         |order_id                          |review_score|review_comment_title|review_comment_message                                                                              |review_creation_date|review_answer_timestamp|
+----------------------------------+----------------------------------+------------+--------------------+----------------------------------------------------------------------------------------------------+--------------------+-----------------------+
|"e64fb393e7b32834bb789ff8bb30750e"|"658677c97b385a9be170737859d3511b"|5           |                    |Recebi bem antes do prazo estipulado.                                                               |2017-04-21 00:00:00 |2017-04-21 22:02:

In [35]:
df_test = spark.sql("SELECT * FROM hadoop_catalog.default.orders LIMIT 5")
df_test.show(truncate=False)

+----------------------------------+----------------------------------+--------------+--------------------------+-------------------+------------------------------+-------------------------------+-------------------------------+
|order_id                          |customer_id                       |order_status  |order_purchase_timestamp  |order_approved_at  |order_delivered_carrier_date  |order_delivered_customer_date  |order_estimated_delivery_date  |
+----------------------------------+----------------------------------+--------------+--------------------------+-------------------+------------------------------+-------------------------------+-------------------------------+
|ad21c59c0840e6cb83a9ceb5573f8159  |"8ab97904e6daea8866dbdbc4fb7aad2c"|delivered     |2018-02-13 21:18:39       |2018-02-13 22:20:29|2018-02-14 19:46:34           |2018-02-16 18:17:02            |2018-02-16 18:17:02            |
|a4591c265e18cb1dcee52889e2d8acc3  |"503740e9ca751ccdda7ba28e9ab8f608"|delivered    

In [36]:
df_test = spark.sql("SELECT * FROM hadoop_catalog.default.products LIMIT 5")
df_test.show(truncate=False)

+----------------------------------+-----------------------+---------------------+----------------------------+--------------------+------------------+-------------------+-------------------+------------------+
|product_id                        |product_category_name  |product_name_lenght  |product_description_lenght  |product_photos_qty  |product_weight_g  |product_length_cm  |product_height_cm  |product_width_cm  |
+----------------------------------+-----------------------+---------------------+----------------------------+--------------------+------------------+-------------------+-------------------+------------------+
|"9dc1a7de274444849c219cff195d0b71"|utilidades_domesticas  |37                   |402                         |4                   |625               |20                 |17                 |13                |
|"41d3672d4792049fa1779bb35283ed13"|instrumentos_musicais  |60                   |745                         |1                   |200               |38   

In [37]:
df_test = spark.sql("SELECT * FROM hadoop_catalog.default.sellers LIMIT 5")
df_test.show(truncate=False)

+----------------------------------+----------------------+-----------------+------------+
|seller_id                         |seller_zip_code_prefix|seller_city      |seller_state|
+----------------------------------+----------------------+-----------------+------------+
|c0f3eea2e14555b6faeea3dd58c1b1c3  |"04195"               |sao paulo        |SP          |
|"51a04a8a6bdcb23deccc82b0b80742cf"|"12914"               |braganca paulista|SP          |
|c240c4061717ac1806ae6ee72be3533b  |"20920"               |rio de janeiro   |RJ          |
|e49c26c3edfa46d227d5121a6b6e4d37  |"55325"               |brejao           |PE          |
|"1b938a7ec6ac5061a66a3766e0e75f90"|"16304"               |penapolis        |SP          |
+----------------------------------+----------------------+-----------------+------------+



In [3]:
df_test = spark.sql("SELECT * FROM hadoop_catalog.default.product_category_name_translation LIMIT 5")
df_test.show(truncate=False)

25/11/19 05:14:01 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
25/11/19 05:14:01 WARN FileSystem: Failed to initialize fileystem s3a://warehouse/logs: java.io.IOException: From option fs.s3a.aws.credentials.provider java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.auth.IAMInstanceCredentialsProvider not found


Py4JJavaError: An error occurred while calling o36.sql.
: org.apache.iceberg.exceptions.RuntimeIOException: Failed to get file system for path: s3a://warehouse/logs
	at org.apache.iceberg.hadoop.Util.getFs(Util.java:58)
	at org.apache.iceberg.hadoop.HadoopCatalog.initialize(HadoopCatalog.java:112)
	at org.apache.iceberg.CatalogUtil.loadCatalog(CatalogUtil.java:274)
	at org.apache.iceberg.CatalogUtil.buildIcebergCatalog(CatalogUtil.java:328)
	at org.apache.iceberg.spark.SparkCatalog.buildIcebergCatalog(SparkCatalog.java:153)
	at org.apache.iceberg.spark.SparkCatalog.initialize(SparkCatalog.java:752)
	at org.apache.spark.sql.connector.catalog.Catalogs$.load(Catalogs.scala:65)
	at org.apache.spark.sql.connector.catalog.CatalogManager.$anonfun$catalog$1(CatalogManager.scala:54)
	at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:86)
	at org.apache.spark.sql.connector.catalog.CatalogManager.catalog(CatalogManager.scala:54)
	at org.apache.spark.sql.connector.catalog.LookupCatalog$CatalogAndIdentifier$.unapply(LookupCatalog.scala:122)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.$anonfun$resolveRelation$1(Analyzer.scala:1297)
	at scala.Option.orElse(Option.scala:447)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveRelations$$resolveRelation(Analyzer.scala:1296)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$14.applyOrElse(Analyzer.scala:1153)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$14.applyOrElse(Analyzer.scala:1117)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$3(AnalysisHelper.scala:138)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$1(AnalysisHelper.scala:138)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning(AnalysisHelper.scala:134)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning$(AnalysisHelper.scala:130)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUpWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$2(AnalysisHelper.scala:135)
	at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1216)
	at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1215)
	at org.apache.spark.sql.catalyst.plans.logical.Project.mapChildren(basicLogicalOperators.scala:71)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$1(AnalysisHelper.scala:135)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning(AnalysisHelper.scala:134)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning$(AnalysisHelper.scala:130)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUpWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$2(AnalysisHelper.scala:135)
	at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1216)
	at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1215)
	at org.apache.spark.sql.catalyst.plans.logical.LocalLimit.mapChildren(basicLogicalOperators.scala:1608)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$1(AnalysisHelper.scala:135)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning(AnalysisHelper.scala:134)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning$(AnalysisHelper.scala:130)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUpWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$2(AnalysisHelper.scala:135)
	at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1216)
	at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1215)
	at org.apache.spark.sql.catalyst.plans.logical.GlobalLimit.mapChildren(basicLogicalOperators.scala:1587)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$1(AnalysisHelper.scala:135)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning(AnalysisHelper.scala:134)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning$(AnalysisHelper.scala:130)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUpWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:1117)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:1076)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:222)
	at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
	at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
	at scala.collection.immutable.List.foldLeft(List.scala:91)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:219)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:211)
	at scala.collection.immutable.List.foreach(List.scala:431)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:211)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:240)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$execute$1(Analyzer.scala:236)
	at org.apache.spark.sql.catalyst.analysis.AnalysisContext$.withNewAnalysisContext(Analyzer.scala:187)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:236)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:202)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:182)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:89)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:182)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:223)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:330)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:222)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:77)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:138)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:219)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:219)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:218)
	at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:77)
	at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:74)
	at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:66)
	at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:99)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
	at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:638)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:629)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:659)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:569)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.io.IOException: From option fs.s3a.aws.credentials.provider java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.auth.IAMInstanceCredentialsProvider not found
	at org.apache.hadoop.fs.s3a.S3AUtils.loadAWSProviderClasses(S3AUtils.java:631)
	at org.apache.hadoop.fs.s3a.S3AUtils.createAWSCredentialProviderSet(S3AUtils.java:597)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:268)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3469)
	at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
	at org.apache.iceberg.hadoop.Util.getFs(Util.java:56)
	... 99 more
Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.auth.IAMInstanceCredentialsProvider not found
	at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2592)
	at org.apache.hadoop.conf.Configuration.getClasses(Configuration.java:2663)
	at org.apache.hadoop.fs.s3a.S3AUtils.loadAWSProviderClasses(S3AUtils.java:628)
	... 108 more


### otros ejemplos

In [4]:
df_count = spark.sql("SELECT count(*) FROM hadoop_catalog.default.customers")
df_count.show(truncate=False)

25/11/19 05:14:24 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
25/11/19 05:14:24 WARN FileSystem: Failed to initialize fileystem s3a://warehouse/logs: java.io.IOException: From option fs.s3a.aws.credentials.provider java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.auth.IAMInstanceCredentialsProvider not found


Py4JJavaError: An error occurred while calling o36.sql.
: org.apache.iceberg.exceptions.RuntimeIOException: Failed to get file system for path: s3a://warehouse/logs
	at org.apache.iceberg.hadoop.Util.getFs(Util.java:58)
	at org.apache.iceberg.hadoop.HadoopCatalog.initialize(HadoopCatalog.java:112)
	at org.apache.iceberg.CatalogUtil.loadCatalog(CatalogUtil.java:274)
	at org.apache.iceberg.CatalogUtil.buildIcebergCatalog(CatalogUtil.java:328)
	at org.apache.iceberg.spark.SparkCatalog.buildIcebergCatalog(SparkCatalog.java:153)
	at org.apache.iceberg.spark.SparkCatalog.initialize(SparkCatalog.java:752)
	at org.apache.spark.sql.connector.catalog.Catalogs$.load(Catalogs.scala:65)
	at org.apache.spark.sql.connector.catalog.CatalogManager.$anonfun$catalog$1(CatalogManager.scala:54)
	at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:86)
	at org.apache.spark.sql.connector.catalog.CatalogManager.catalog(CatalogManager.scala:54)
	at org.apache.spark.sql.connector.catalog.LookupCatalog$CatalogAndIdentifier$.unapply(LookupCatalog.scala:122)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.$anonfun$resolveRelation$1(Analyzer.scala:1297)
	at scala.Option.orElse(Option.scala:447)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveRelations$$resolveRelation(Analyzer.scala:1296)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$14.applyOrElse(Analyzer.scala:1153)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$14.applyOrElse(Analyzer.scala:1117)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$3(AnalysisHelper.scala:138)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$1(AnalysisHelper.scala:138)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning(AnalysisHelper.scala:134)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning$(AnalysisHelper.scala:130)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUpWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$2(AnalysisHelper.scala:135)
	at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1216)
	at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1215)
	at org.apache.spark.sql.catalyst.plans.logical.Project.mapChildren(basicLogicalOperators.scala:71)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$1(AnalysisHelper.scala:135)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning(AnalysisHelper.scala:134)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning$(AnalysisHelper.scala:130)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUpWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:1117)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:1076)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:222)
	at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
	at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
	at scala.collection.immutable.List.foldLeft(List.scala:91)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:219)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:211)
	at scala.collection.immutable.List.foreach(List.scala:431)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:211)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:240)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$execute$1(Analyzer.scala:236)
	at org.apache.spark.sql.catalyst.analysis.AnalysisContext$.withNewAnalysisContext(Analyzer.scala:187)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:236)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:202)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:182)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:89)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:182)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:223)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:330)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:222)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:77)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:138)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:219)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:219)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:218)
	at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:77)
	at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:74)
	at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:66)
	at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:99)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
	at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:638)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:629)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:659)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:569)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.io.IOException: From option fs.s3a.aws.credentials.provider java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.auth.IAMInstanceCredentialsProvider not found
	at org.apache.hadoop.fs.s3a.S3AUtils.loadAWSProviderClasses(S3AUtils.java:631)
	at org.apache.hadoop.fs.s3a.S3AUtils.createAWSCredentialProviderSet(S3AUtils.java:597)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:268)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3469)
	at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
	at org.apache.iceberg.hadoop.Util.getFs(Util.java:56)
	... 81 more
Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.auth.IAMInstanceCredentialsProvider not found
	at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2592)
	at org.apache.hadoop.conf.Configuration.getClasses(Configuration.java:2663)
	at org.apache.hadoop.fs.s3a.S3AUtils.loadAWSProviderClasses(S3AUtils.java:628)
	... 90 more


In [18]:
df_count = spark.sql("SELECT count(*) FROM hadoop_catalog.default.geolocation")
df_count.show(truncate=False)

+--------+
|count(1)|
+--------+
|0       |
+--------+



In [19]:
spark.sql("SHOW TABLES IN hadoop_catalog.default").show()

+---------+-----------+-----------+
|namespace|  tableName|isTemporary|
+---------+-----------+-----------+
|  default|  customers|      false|
|  default|geolocation|      false|
+---------+-----------+-----------+



In [20]:
spark.table('hadoop_catalog.default.customers').show()

+--------------------+--------------------+------------------------+--------------------+----------------+
|         customer_id|  customer_unique_id|customer_zip_code_prefix|       customer_city|  customer_state|
+--------------------+--------------------+------------------------+--------------------+----------------+
|"879864dab9bc3047...|"4c93744516667ad3...|                 "89254"|      jaragua do sul|              SC|
|fd826e7cf63160e53...|addec96d2e059c80c...|                 "04534"|           sao paulo|              SP|
|"5e274e7a0c3809e1...|"57b2a98a409812fe...|                 "35182"|             timoteo|              MG|
|       "customer_id"|"customer_unique_id"|    "customer_zip_cod...|     "customer_city"|"customer_state"|
|"4b7139f34592b3a3...|"9afe194fb833f79e...|                 "30575"|      belo horizonte|              MG|
|"9fb35e4ed6f0a14a...|"2a7745e1ed516b28...|                 "39400"|       montes claros|              MG|
|"5aa9e4fdd4dfd209...|"2a46fb94aef5cb

In [21]:
spark.table('hadoop_catalog.default.geolocation').show()

+---------------------------+--------------------+---------------------+----------------+-----------------+
|geolocation_zip_code_prefix|geolocation_latitude|geolocation_longitude|geolocation_city|geolocation_state|
+---------------------------+--------------------+---------------------+----------------+-----------------+
+---------------------------+--------------------+---------------------+----------------+-----------------+

