In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import (
    StructType, StructField, StringType, FloatType,
    LongType, TimestampType, IntegerType
)
from pyspark.sql import functions as F, Window
from delta import configure_spark_with_delta_pip
from pathlib import Path
from helpers import build_schema, load_schema_json, paths
import os


In [2]:
# Maven coordinate of the PostgreSQL JDBC driver to be resolved when the session starts.
postgres_package = "org.postgresql:postgresql:42.7.1"

# (key, value) pairs applied to the session builder, in this order.
session_options = [
    # Parquet timestamp encoding and calendar-rebase behaviour on write.
    ("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS"),
    ("spark.sql.parquet.datetimeRebaseModeInWrite", "CORRECTED"),
    ("spark.sql.parquet.int96RebaseModeInWrite", "CORRECTED"),
    # Delta Lake SQL extension + Delta-aware default catalog.
    ("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension"),
    ("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog"),
]

builder = SparkSession.builder.appName("TesteLocal").master("local[*]")
for option_key, option_value in session_options:
    builder = builder.config(option_key, option_value)

# The Postgres driver is added through extra_packages alongside the Delta jars.
spark = configure_spark_with_delta_pip(builder, extra_packages=[postgres_package]).getOrCreate()

print("Sess√£o iniciada com pacotes Delta + Postgres!")

# Session diagnostics: Spark version, application id and Spark UI address.
print("Spark version:", spark.version)
print(spark.sparkContext.applicationId)
print(spark.sparkContext.uiWebUrl)

Picked up _JAVA_OPTIONS: -Xms512m -Xmx4g
Picked up _JAVA_OPTIONS: -Xms512m -Xmx4g


:: loading settings :: url = jar:file:/workspaces/ApacheSpark-CD/.venv/lib/python3.11/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/vscode/.ivy2/cache
The jars for the packages stored in: /home/vscode/.ivy2/jars
io.delta#delta-spark_2.12 added as a dependency
org.postgresql#postgresql added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-00f986c9-852a-494d-a0c6-b8a4e39568d2;1.0
	confs: [default]
	found io.delta#delta-spark_2.12;3.2.0 in central
	found io.delta#delta-storage;3.2.0 in central
	found org.antlr#antlr4-runtime;4.9.3 in central
	found org.postgresql#postgresql;42.7.1 in central
	found org.checkerframework#checker-qual;3.41.0 in central
downloading https://repo1.maven.org/maven2/io/delta/delta-spark_2.12/3.2.0/delta-spark_2.12-3.2.0.jar ...
	[SUCCESSFUL ] io.delta#delta-spark_2.12;3.2.0!delta-spark_2.12.jar (242ms)
downloading https://repo1.maven.org/maven2/org/postgresql/postgresql/42.7.1/postgresql-42.7.1.jar ...
	[SUCCESSFUL ] org.postgresql#postgresql;42.7.1!postgresql.jar (64ms)
downloading https://repo1.maven.org/maven2/io/delta/delta

Sess√£o iniciada com pacotes Delta + Postgres!
Spark version: 3.5.2
local-1767740710769
http://936b530c46dc:4040


In [None]:
# Connection settings are read from the environment so credentials are not baked into
# the notebook. The fallbacks are the values this cell previously hard-coded, so runs
# without the variables set behave as before.
db_host = os.getenv("POSTGRES_HOST", "db")
db_port = os.getenv("POSTGRES_PORT", "5432")
db_name = os.getenv("POSTGRES_DB", "retail_db")

print(f"Tentando conectar em: {db_host}...")

db_url = f"jdbc:postgresql://{db_host}:{db_port}/{db_name}"

db_properties = {
    "user": os.getenv("POSTGRES_USER", "postgres"),
    # NOTE(review): set POSTGRES_PASSWORD explicitly; the fallback is kept only so the
    # existing (presumably local/dev) setup keeps working — confirm and remove if possible.
    "password": os.getenv("POSTGRES_PASSWORD", "postgres_password"),
    "driver": "org.postgresql.Driver",
}

# Read the `orders` table over JDBC as a connectivity smoke test.
try:
    df_orders = spark.read.jdbc(url=db_url, table="orders", properties=db_properties)
    print("Sucesso! Schema da tabela orders:")
    df_orders.printSchema()

    print("A mostrar 5 registos:")
    df_orders.show(5)
    print("Conex√£o Spark -> Postgres realizada com sucesso!")
except Exception as e:
    # Best-effort: report the failure without aborting the notebook, and make the
    # failed state explicit instead of leaving a stale/undefined df_orders around.
    df_orders = None
    print(f"Erro ao conectar: {e}")

Tentando conectar em: db...


Sucesso! Schema da tabela orders:
root
 |-- order_id: integer (nullable = true)
 |-- order_date: timestamp (nullable = true)
 |-- order_customer_id: integer (nullable = true)
 |-- order_status: string (nullable = true)

A mostrar 5 registos:
+--------+----------+-----------------+------------+
|order_id|order_date|order_customer_id|order_status|
+--------+----------+-----------------+------------+
+--------+----------+-----------------+------------+

Conex√£o Spark -> Postgres realizada com sucesso! üöÄ


In [None]:
# Optional teardown: uncomment to stop the SparkSession and release its resources.
# Later cells still call `spark`, so keep this commented out during a full run.
# spark.stop()

In [7]:
# All data paths are resolved relative to the notebook's working directory.
repo_root = Path.cwd()
schema_base_dir = repo_root.joinpath("data", "retail_db").as_posix()
print(schema_base_dir)  # e.g. /workspaces/ApacheSpark-CD/data/retail_db

/workspaces/ApacheSpark-CD/data/retail_db


In [8]:
# Type-name -> Spark SQL data type lookup passed to build_schema() in the ETL loop;
# presumably the keys match the type names used in the schemas JSON — verify.
type_mapping = dict(
    integer=IntegerType(),
    string=StringType(),
    timestamp=TimestampType(),
    float=FloatType(),
)

In [9]:
# Convert every retail_db dataset folder (CSV part files) into a Parquet copy and a
# Delta copy, next to the source as <base>_parquet/<ds> and <base>_delta/<ds>.
schema_base_dir = (repo_root / "data" / "retail_db").as_posix()
schema_paths = paths(schema_base_dir, "file", "schemas*")
schema_json = load_schema_json(schema_paths)
ds_list = paths(schema_base_dir, "folder")

for ds in ds_list:
    # Only the folder name identifies the dataset (paths() presumably returns full paths).
    ds = Path(ds).name
    print(f"Processing {ds} data")

    output_parq = (Path(f"{schema_base_dir}_parquet") / ds).as_posix()
    output_delta = (Path(f"{schema_base_dir}_delta") / ds).as_posix()

    schema_table = build_schema(ds, schema_json, type_mapping)
    files = paths(f"{schema_base_dir}/{ds}", "file", "part-*")
    if not files:
        # No part files in this folder: nothing to convert.
        continue
    print(files)

    df = (
        spark.read
        .schema(schema_table)
        .option("header", "false")
        .option("sep", ",")
        .option("mode", "PERMISSIVE")
        .csv(files)
    )

    # The same source is written twice, so cache it to avoid re-reading the CSV for the
    # second write. cache() uses PySpark's default storage level; use
    # df.persist(StorageLevel...) for an explicit level.
    df.cache()
    try:
        df.write.mode("overwrite").format("parquet").save(output_parq)  # or "append"
        print(f"{output_parq} written successfully.")

        df.write.mode("overwrite").format("delta").save(output_delta)  # or "append"
        print(f"{output_delta} written successfully.")
    finally:
        # Release the cached data even when one of the writes fails.
        df.unpersist()


Processing Departments data
Processing departments data
['/workspaces/ApacheSpark-CD/data/retail_db/departments/part-00000']


                                                                                

/workspaces/ApacheSpark-CD/data/retail_db_parquet/departments written successfully.


                                                                                

/workspaces/ApacheSpark-CD/data/retail_db_delta/departments written successfully.
Processing Categories data
Processing categories data
['/workspaces/ApacheSpark-CD/data/retail_db/categories/part-00000']
/workspaces/ApacheSpark-CD/data/retail_db_parquet/categories written successfully.
/workspaces/ApacheSpark-CD/data/retail_db_delta/categories written successfully.
Processing Orders data
Processing orders data
['/workspaces/ApacheSpark-CD/data/retail_db/orders/part-00000']


                                                                                

/workspaces/ApacheSpark-CD/data/retail_db_parquet/orders written successfully.


                                                                                

/workspaces/ApacheSpark-CD/data/retail_db_delta/orders written successfully.
Processing Customers data
Processing customers data
['/workspaces/ApacheSpark-CD/data/retail_db/customers/part-00000']
/workspaces/ApacheSpark-CD/data/retail_db_parquet/customers written successfully.


                                                                                

/workspaces/ApacheSpark-CD/data/retail_db_delta/customers written successfully.
Processing Products data
Processing products data
['/workspaces/ApacheSpark-CD/data/retail_db/products/part-00000']
/workspaces/ApacheSpark-CD/data/retail_db_parquet/products written successfully.
/workspaces/ApacheSpark-CD/data/retail_db_delta/products written successfully.
Processing Order_items data
Processing order_items data
['/workspaces/ApacheSpark-CD/data/retail_db/order_items/part-00000']


                                                                                

/workspaces/ApacheSpark-CD/data/retail_db_parquet/order_items written successfully.


                                                                                

/workspaces/ApacheSpark-CD/data/retail_db_delta/order_items written successfully.


In [None]:
import shutil

# Location of the demo Delta table, kept under the repository root instead of a
# hard-coded absolute path.
tabela_deltasql = (repo_root / "minha_delta_table").as_posix()

# 1) Drop the table from the catalog (if it exists).
spark.sql("DROP TABLE IF EXISTS minha_tabela_delta")

# 2) Delete the physical directory: the table is created with an explicit LOCATION,
#    so dropping it from the catalog does not remove its files.
path = Path(tabela_deltasql)
shutil.rmtree(path, ignore_errors=True)

# 3) Recreate the Delta table from a one-row query.
spark.sql(f"""
CREATE TABLE minha_tabela_delta
USING DELTA
LOCATION '{tabela_deltasql}'
AS
SELECT 'b' as letra, 2 as numero
""")

In [None]:
# A plain CREATE TABLE fails once minha_tabela_delta exists (e.g. right after the
# previous cell). CREATE OR REPLACE keeps this cell re-runnable; Delta records the
# replacement as a new table version.
spark.sql(f"""
CREATE OR REPLACE TABLE minha_tabela_delta
USING DELTA
LOCATION '{tabela_deltasql}'
AS
SELECT 'b' as letra, 2 as numero
""")

# primeiras consultas

In [None]:
# Column layout of the NYSE daily files; every column is nullable.
nyse_columns = [
    ("stock_id", StringType()),
    ("trans_date", StringType()),
    ("open_price", FloatType()),
    ("low_price", FloatType()),
    ("high_price", FloatType()),
    ("close_price", FloatType()),
    ("volume", LongType()),
]
schema = StructType([StructField(name, dtype, True) for name, dtype in nyse_columns])

# NOTE(review): header=True discards the first line of every file — confirm the
# .txt.gz files really start with a header row.
dir_data = (repo_root / "data" / "nyse_all" / "nyse_data" / "*.txt.gz").as_posix()
df = (
    spark.read
    .schema(schema)
    .option("header", True)
    .option("sep", ",")
    .csv(dir_data)
)
df.show(5)

In [None]:
# Persist the NYSE DataFrame as Parquet, replacing any previous output.
dir_data = (repo_root / "data" / "nyse_data_parquet").as_posix()
df.write.parquet(dir_data, mode="overwrite")

In [None]:
# Inspect the NYSE DataFrame: column -> type mapping, full schema, and row count.
# Only a cell's last expression is auto-displayed, so the dtype mapping is shown
# explicitly rather than being computed and discarded.
display(dict(df.dtypes))

df.printSchema()

df.count()

In [None]:
# Number of rows per stock_id (lazy; nothing runs until an action is called).
count_filter = df.groupBy("stock_id").agg(
    F.count("*").alias("num_records")
)

In [None]:
# Number each stock's rows from the latest trans_date downwards (1 = latest).
# NOTE(review): trans_date is a string, so this ordering is lexicographic — fine only if
# dates are formatted to sort chronologically (e.g. yyyyMMdd); confirm.
# NOTE(review): despite its name, num_records is a per-stock row number, not a count.
w = Window.partitionBy(F.col("stock_id")).orderBy(F.col("trans_date").desc())

count_filter = df.select("stock_id", "trans_date", "close_price").withColumn(
    "num_records", F.row_number().over(w)
)

In [None]:
# Same per-stock row numbering (latest trans_date = 1), restricted to stock "ABRN".
w = Window.partitionBy(F.col("stock_id")).orderBy(F.col("trans_date").desc())

abrn_rows = df.where(F.col("stock_id") == "ABRN")
count_filter = abrn_rows.select("stock_id", "trans_date", "close_price").withColumn(
    "num_records", F.row_number().over(w)
)

In [None]:
# Print the parsed, analyzed, optimized and physical plans for count_filter.
count_filter.explain(mode="extended")
