##### Importando as bibliotecas

In [None]:
#libs
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, regexp_replace

##### Criando a pasta que irá receber os arquivos parquet

In [None]:
# dbutils.fs works with DBFS paths. The Parquet files are written by Spark to
# "/FileStore/data/..." (i.e. dbfs:/FileStore/data) and listed from
# "dbfs:/FileStore/data", so that is the folder to create. A "/dbfs/..." argument
# here would create dbfs:/dbfs/FileStore/data, a different folder.
create_bdfs = dbutils.fs.mkdirs("dbfs:/FileStore/data")
if create_bdfs:
    print("DBFS Created")
else:
    print("DBFS creation Failed")

##### Código para os itens 3 e 4

In [None]:
class PostgresqlToParquet:
    """Export PostgreSQL tables to Parquet files and keep those files in sync.

    Each table in ``table_list`` is read over JDBC and stored at the Spark path
    ``/FileStore/data/<table_name>.parquet``.
    """

    def __init__(self, host: str, database_name: str, username: str, password: str, table_list: list) -> None:
        """Store the JDBC connection settings and obtain the Spark session.

        Args:
            host: PostgreSQL host; it is inserted verbatim into the JDBC URL,
                so a ``:port`` suffix may be included.
            database_name: Database to connect to.
            username: Login user.
            password: Login password.
            table_list: Names of the tables to export / synchronise.
        """
        self.db_params = {
            "url": f"jdbc:postgresql://{host}/{database_name}",
            "user": username,
            "password": password,
            "driver": "org.postgresql.Driver"
        }
        # getOrCreate() reuses an already running session (e.g. the notebook's).
        self.spark = SparkSession.builder.appName("Items_3_and_4").getOrCreate()
        self.table_list = table_list

    @staticmethod
    def _parquet_path(table_name: str) -> str:
        """Return the Spark path of the Parquet copy of ``table_name``."""
        return f"/FileStore/data/{table_name}.parquet"

    def _read_table(self, table_name: str):
        """Read one PostgreSQL table into a Spark DataFrame over JDBC."""
        query = f"(SELECT * FROM {table_name}) AS subquery"
        return self.spark.read.jdbc(url=self.db_params["url"], table=query, properties=self.db_params)

    def sql_to_parquet(self) -> None:
        """Export every table to Parquet, skipping tables that cannot be written.

        Each table is handled independently. A file that already exists for one
        table no longer prevents the remaining tables from being exported.
        Errors other than Spark's ``AnalysisException`` (e.g. a failed JDBC
        connection) propagate instead of being silently swallowed.
        """
        from pyspark.sql.utils import AnalysisException

        for table_name in self.table_list:
            parquet_filename = self._parquet_path(table_name)
            df = self._read_table(table_name)
            try:
                # Default save mode is "errorifexists": writing to an existing
                # path raises AnalysisException.
                df.write.parquet(parquet_filename)
            except AnalysisException as err:
                print(f"{parquet_filename} not written (already saved?): {err}")
                continue
            print(f"{parquet_filename} Saved!!!")

    def load_postgresql_table(self) -> dict:
        """Return a mapping ``table_name -> DataFrame`` read from PostgreSQL."""
        return {table_name: self._read_table(table_name) for table_name in self.table_list}

    def merge_postgresql_to_parquet(self) -> None:
        """Bring every Parquet copy in line with the current PostgreSQL contents.

        Spark's ``MERGE INTO`` only accepts Delta tables as target, so a plain
        Parquet file cannot be merged into. A full-sync merge has three parts:
        update all columns on match, insert when not matched by target, and
        delete when not matched by source. On a unique key, that merge leaves
        the target equal to the source. The same end state is reached here by
        overwriting each Parquet file with the freshly read table, which works
        for every table and not only for the ``payments`` columns.
        """
        postgresql_dfs = self.load_postgresql_table()

        for table_name in self.table_list:
            parquet_path = self._parquet_path(table_name)
            # Overwriting is safe: the source DataFrame comes from JDBC, not from
            # the Parquet path being replaced.
            postgresql_dfs[table_name].write.mode("overwrite").parquet(parquet_path)
            print(f"Merge operation for {table_name} Completed!!!")
        # The Spark session is deliberately left running: it is shared with the
        # rest of the notebook, which keeps using it after this call.

# Tables exported from PostgreSQL (item 3) and synchronised with Parquet (item 4).
table_list = [
    "payments", "customers", "employees", "offices",
    "orderdetails", "orders", "product_lines", "products",
]

# NOTE(review): connection credentials are committed in plain text; consider
# moving them to a Databricks secret scope.
database = PostgresqlToParquet(
    host="psql-mock-database-cloud.postgres.database.azure.com",
    database_name="ecom1692111647951egphgoeocortbeiy",
    username="deumjhxhzvgcoaunnugtbenu@psql-mock-database-cloud",
    password="tcinixmdjluazpkqptdhnxrh",
    table_list=table_list,
)

#### 3 - Extraindo cada tabela do PostgreSQL e salvando como parquet

In [None]:
# Item 3: dump every configured table to its own Parquet file.
database.sql_to_parquet()

# Show what ended up in the Parquet folder.
parquet_listing = dbutils.fs.ls("dbfs:/FileStore/data")
display(parquet_listing)

#### 4 - Fazendo o Merge entre as tabelas PostgreSQL e os arquivos parquet

In [None]:
# Item 4: synchronise each Parquet copy with its PostgreSQL table.
database.merge_postgresql_to_parquet()

#### 5 - Lendo os arquivos parquet para a análise

In [None]:
# Reuse (or start) the Spark session for the analysis of item 5.
spark = SparkSession.builder.appName("Item_5").getOrCreate()

# Load each Parquet copy into its own DataFrame, in the order listed below.
(
    customers_df,
    orders_df,
    orderdetails_df,
    products_df,
    payments_df,
    employees_df,
    offices_df,
) = (
    spark.read.parquet(f"dbfs:/FileStore/data/{table}.parquet")
    for table in (
        "customers",
        "orders",
        "orderdetails",
        "products",
        "payments",
        "employees",
        "offices",
    )
)

##### 5.1 - Qual país possui a maior quantidade de itens cancelados?

In [None]:
# Item 5.1: number of cancelled orders per customer country, largest first.
cancelled_orders_df = (
    customers_df
    .join(orders_df, "customer_number")
    .where(col("status") == "Cancelled")
    .groupBy("country")
    .count()
    .orderBy(col("count").desc())
)

cancelled_orders_df.show(5)


##### 5.2 - Qual o faturamento da linha de produto mais vendida, considerando como vendidos os itens com status Shipped cujo pedido foi realizado no ano de 2005?

In [None]:
from pyspark.sql import functions as F

# Orders with status "Shipped" placed during calendar year 2005. year() avoids the
# inclusive upper bound of between(..., "2006-01-01"), which also matched orders
# placed on 2006-01-01.
shipped_2005_df = orders_df.filter(
    (F.col("status") == "Shipped") & (F.year(F.col("order_date")) == 2005)
)

# Item 5.2: units sold and revenue per product line, best-selling line first.
# Revenue is computed from the order lines (quantity * unit price). payments is
# not joined: it is linked to orders only through customer_number, so joining it
# repeats every order line once per payment of that customer.
# NOTE(review): assumes orderdetails has `quantity_ordered` / `price_each` and
# products has `product_line` (snake_case classicmodels schema) - confirm.
product_sales_df = (
    shipped_2005_df
    .join(orderdetails_df, "order_number")
    .join(products_df, "product_code")
    .groupBy("product_line")
    .agg(
        F.sum(F.col("quantity_ordered") * F.col("price_each")).alias("billing"),
        F.sum("quantity_ordered").alias("sales_quantity"),
    )
    .orderBy(F.col("sales_quantity").desc())
)

product_sales_df.show(5)

##### 5.3 - Nome, sobrenome e e-mail dos vendedores do Japão, o local-part do e-mail deve estar mascarado.

In [None]:
# Item 5.3: first name, last name and masked e-mail of employees whose office is in Japan.
# The filter on offices_df.country drops rows without a matching office, so the
# "left" join behaves like an inner join here.
japan_sellers = (
    employees_df
    .join(offices_df, employees_df.office_code == offices_df.office_code, "left")
    .filter(offices_df.country == "Japan")
    .select(
        employees_df.first_name,
        employees_df.last_name,
        # Mask the local part: the greedy ".*@" swallows everything up to the
        # last "@", which is replaced by a fixed "*****@".
        regexp_replace(employees_df.email, ".*@", "*****@").alias("masked_email"),
        offices_df.country,
    )
)

japan_sellers.show()


#### 6 - Salvando os resultados em tabelas com o formato delta.

In [None]:
# Item 6: persist each analysis result as a Delta table, replacing earlier runs.
# NOTE(review): Spark resolves scheme-less paths against DBFS, so these land in
# dbfs:/dbfs/FileStore/table rather than dbfs:/FileStore/table - confirm intended.
for result_df, delta_path in (
    (cancelled_orders_df, "/dbfs/FileStore/table/cancelled_orders_result.delta"),
    (product_sales_df, "/dbfs/FileStore/table/product_sales_result.delta"),
    (japan_sellers, "/dbfs/FileStore/table/japan_sellers.delta"),
):
    result_df.write.format("delta").mode("overwrite").save(delta_path)

# List the folder holding the saved Delta tables.
display(dbutils.fs.ls("/dbfs/FileStore/table"))

##### Conferindo se os dados estão corretos

In [None]:
# Read the Delta tables back from the paths they were saved to.
cancelled_orders_df = spark.read.format("delta").load("/dbfs/FileStore/table/cancelled_orders_result.delta")
product_sales_df = spark.read.format("delta").load("/dbfs/FileStore/table/product_sales_result.delta")
japan_sellers_df = spark.read.format("delta").load("/dbfs/FileStore/table/japan_sellers.delta")

# Print a sample of each table to check the stored results.
cancelled_orders_df.show(5)
product_sales_df.show(5)
japan_sellers_df.show(5)

# The session is intentionally not stopped: on Databricks its lifecycle is managed
# by the platform, and stopping it breaks any cell run afterwards in this notebook.