In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from spark_utils import init_spark_utils

spark = SparkSession.builder \
    .appName("ReadDataFromBucket") \
    .master("spark://spark-master:7077") \
    .config("spark.executor.memory", "512mb") \
    .config("spark.cores.max", "3") \
    .config("spark.jars.packages", "io.delta:delta-spark_2.12:3.2.0,io.delta:delta-storage:3.2.0,org.antlr:antlr4-runtime:4.9.3,org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.12.262") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000") \
    .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider") \
    .config("spark.hadoop.fs.s3a.access.key", "accesskey") \
    .config("spark.hadoop.fs.s3a.secret.key", "secretkey") \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false") \
    .config("spark.sql.catalogImplementation", "hive") \
    .config("hive.metastore.uris", "thrift://hive-metastore:9083") \
    .config("spark.hadoop.hive.metastore.warehouse.dir", "s3a://wba/warehouse") \
    .getOrCreate()

utils = init_spark_utils(spark)

hadoop_conf = spark._jsc.hadoopConfiguration()
print("fs.s3a.endpoint:", hadoop_conf.get("fs.s3a.endpoint"))
print("fs.s3a.access.key:", hadoop_conf.get("fs.s3a.access.key"))
print("fs.s3a.secret.key:", hadoop_conf.get("fs.s3a.secret.key"))
print("fs.s3a.path.style.access:", hadoop_conf.get("fs.s3a.path.style.access"))

fs.s3a.endpoint: http://minio:9000
fs.s3a.access.key: accesskey
fs.s3a.secret.key: secretkey
fs.s3a.path.style.access: true


In [2]:
paths = utils.fs.ls("s3a://wba/warehouse/ga4_anima_cpf_user")

In [3]:
df_list = []
for path in paths:
    print(path['path'])
    try:
        df = spark.read.format("parquet").option("header", "true").load(path['path'])
        df_list.append(df)
    except Exception as e:
        print(f"Erro ao carregar o arquivo {path['path']}: {str(e)}")

s3a://wba/warehouse/ga4_anima_cpf_user/ga4__anima_cpf_user
s3a://wba/warehouse/ga4_anima_cpf_user/ga4__anima_cpf_user_property295122410
s3a://wba/warehouse/ga4_anima_cpf_user/ga4__anima_cpf_user_property295356968


In [6]:
df_concat = df_list[0]
for df in df_list[1:]:
    df_concat = df_concat.union(df)

In [7]:
# Registrar o DataFrame como uma tabela temporária
df_concat.createOrReplaceTempView("anima_test")

# Executar consulta SQL
resultado = spark.sql("SELECT * FROM anima_test WHERE customUser_insc_cpf_user != '(not set)' LIMIT 5")
resultado.limit(100).toPandas()

Unnamed: 0,_airbyte_raw_id,_airbyte_extracted_at,_airbyte_meta,_airbyte_generation_id,city,date,hour,hostName,pagePath,sessions,property_id,customUser_insc_cpf_user,customUser_insc_ies_user
0,5940da3d-3f49-46a0-a413-01819c075b76,2025-04-16 16:00:15.227,"(52, [])",44,(not set),20250401,13,portal.anhembi.br,/unidades/piracicaba/,3,255713153,ecf4f3fbfbc4acfec8bcb7d1d72a4d54fb34,1630780969.174342
1,f9994406-73d5-4031-b647-486b775c0368,2025-04-16 16:00:15.229,"(52, [])",44,(not set),20250401,20,portal.anhembi.br,/blog/cientistas-brasileiros,3,255713153,0a1949c57ebe2c1fe92c3a647f66727c74f3,1810342158.1730497
2,60255f2e-11ec-440d-b07b-9889f13a7a3b,2025-04-16 16:00:15.234,"(52, [])",44,Osasco,20250401,15,portal.anhembi.br,/internacional/,3,255713153,e3b0c44298fc1c149afbf4c8996fb92427ae,1992312009.1742184
3,9902664d-9fb0-4b80-9c61-cdd8a5127162,2025-04-16 16:00:15.238,"(52, [])",44,Sao Paulo,20250401,10,portal.anhembi.br,/internacional/,3,255713153,617928564e7a9104e9ed825f266df634e8f0,689723803.1743513
4,7067d675-234b-4c5b-86ec-723b902fd64f,2025-04-16 16:00:15.241,"(52, [])",44,Sao Paulo,20250401,13,portal.anhembi.br,/internacional/,3,255713153,617928564e7a9104e9ed825f266df634e8f0,1034025654.173928


In [8]:
df_final = df_concat.drop("_airbyte_raw_id", "_airbyte_extracted_at", "_airbyte_meta", "_airbyte_generation_id")

In [9]:
df_final.limit(10).toPandas()

Unnamed: 0,city,date,hour,hostName,pagePath,sessions,property_id,customUser_insc_cpf_user,customUser_insc_ies_user
0,Sao Paulo,20250401,11,portal.anhembi.br,/,57,255713153,(not set),(not set)
1,Sao Paulo,20250401,16,portal.anhembi.br,/,55,255713153,(not set),(not set)
2,Sao Paulo,20250401,10,portal.anhembi.br,/,53,255713153,(not set),(not set)
3,Sao Paulo,20250401,13,portal.anhembi.br,/,51,255713153,(not set),(not set)
4,Sao Paulo,20250401,14,portal.anhembi.br,/,50,255713153,(not set),(not set)
5,Sao Paulo,20250401,17,portal.anhembi.br,/,48,255713153,(not set),(not set)
6,Sao Paulo,20250401,9,portal.anhembi.br,/,46,255713153,(not set),(not set)
7,Sao Paulo,20250401,15,portal.anhembi.br,/,39,255713153,(not set),(not set)
8,(not set),20250401,11,landing.anhembi.br,/cursos-de-faculdade/,38,255713153,(not set),(not set)
9,Sao Paulo,20250401,20,landing.anhembi.br,/curso/biomedicina-bacharelado/,37,255713153,(not set),(not set)


In [10]:
# Criar e verificar uma tabela teste
spark.sql("CREATE DATABASE IF NOT EXISTS sb_digital")
spark.sql("SHOW DATABASES").show()
spark.sql("USE sb_digital")

+------------+
|   namespace|
+------------+
|     default|
|  sb_digital|
|warehouse_db|
+------------+



DataFrame[]

In [44]:
df_final.write.format("delta").mode("overwrite").option("overwriteSchema", "true").saveAsTable("ga4_cpf_user")

In [11]:
spark.sql("SHOW TABLES").show()

+----------+------------+-----------+
| namespace|   tableName|isTemporary|
+----------+------------+-----------+
|sb_digital|ga4_cpf_user|      false|
|          |  anima_test|      false|
+----------+------------+-----------+



In [13]:
df_minio = spark.read.parquet("s3a://wba/warehouse/sb_digital.db/ga4_cpf_user")
df_minio.limit(10).toPandas()

Unnamed: 0,city,date,hour,hostName,pagePath,sessions,property_id,customUser_insc_cpf_user,customUser_insc_ies_user
0,Sao Paulo,20250401,11,portal.anhembi.br,/,57,255713153,(not set),(not set)
1,Sao Paulo,20250401,16,portal.anhembi.br,/,55,255713153,(not set),(not set)
2,Sao Paulo,20250401,10,portal.anhembi.br,/,53,255713153,(not set),(not set)
3,Sao Paulo,20250401,13,portal.anhembi.br,/,51,255713153,(not set),(not set)
4,Sao Paulo,20250401,14,portal.anhembi.br,/,50,255713153,(not set),(not set)
5,Sao Paulo,20250401,17,portal.anhembi.br,/,48,255713153,(not set),(not set)
6,Sao Paulo,20250401,9,portal.anhembi.br,/,46,255713153,(not set),(not set)
7,Sao Paulo,20250401,15,portal.anhembi.br,/,39,255713153,(not set),(not set)
8,(not set),20250401,11,landing.anhembi.br,/cursos-de-faculdade/,38,255713153,(not set),(not set)
9,Sao Paulo,20250401,20,landing.anhembi.br,/curso/biomedicina-bacharelado/,37,255713153,(not set),(not set)


In [4]:
# Registrar o DataFrame como uma tabela temporária
df_minio.createOrReplaceTempView("minio_test")

# Executar consulta SQL
resultado = spark.sql("SELECT DISTINCT COUNT(*) FROM minio_test")
resultado.limit(100).toPandas()

Unnamed: 0,count(1)
0,7727258


In [16]:
# Executar consulta SQL
resultado = spark.sql("""
CREATE OR REPLACE TABLE ga4_cpf_user AS
SELECT
    CAST(city AS STRING) AS city,
    TO_DATE(date, 'yyyyMMdd') AS dt_date,
    CAST(hour AS INT) AS hour,
    CAST(hostName AS STRING) AS hostName,
    CAST(pagePath AS STRING) AS pagePath,
    CAST(sessions AS BIGINT) AS sessions,
    CAST(property_id AS STRING) AS property_id,
    CAST(customUser_insc_cpf_user AS STRING) AS customUser_insc_cpf_user,
    CAST(customUser_insc_ies_user AS STRING) AS customUser_insc_ies_user
FROM ga4_cpf_user;
""")
resultado.limit(100).toPandas()

AnalysisException: [UNSUPPORTED_FEATURE.TABLE_OPERATION] The feature is not supported: Table `spark_catalog`.`sb_digital`.`ga4_cpf_user` does not support REPLACE TABLE AS SELECT. Please check the current catalog and namespace to make sure the qualified table name is expected, and also check the catalog implementation which is configured by "spark.sql.catalog".

[Database(name='default', catalog='spark_catalog', description='Default Hive database', locationUri='s3a://wba/warehouse'),
 Database(name='sb_digital', catalog='spark_catalog', description='', locationUri='s3a://wba/warehouse/sb_digital.db'),
 Database(name='warehouse_db', catalog='spark_catalog', description='', locationUri='s3a://wba/warehouse/warehouse_db.db')]