In [0]:
%run "../../config/project_config"

In [0]:
# Make sure the gold schema exists before any table is written into it.
# `db_gold` is not defined in this notebook; presumably it comes from the
# project_config notebook pulled in by %run — confirm.
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {db_gold}")
# The message text is Turkish for "Schema ready".
print(f"✅ Schema Hazır: {db_gold}")

In [0]:
def create_dim_products():
    """Build the gold-layer product dimension and save it as a Delta table.

    Reads ``{db_silver}.products`` and ``{db_silver}.category_translation``,
    left-joins them on ``product_category_name`` and overwrites
    ``{db_gold}.dim_products`` (data and schema) with the result.

    ``category_name`` is the English category name; when a product has no
    matching translation (or the translated name is null) it falls back to
    the original ``product_category_name`` from the products table.

    Uses the notebook globals ``spark``, ``db_silver`` and ``db_gold``
    (the latter two presumably come from the project_config notebook).
    """
    # Imported locally because this cell runs before the notebook's
    # pyspark.sql.functions import cell.
    from pyspark.sql.functions import coalesce

    products_df = spark.read.table(f"{db_silver}.products")
    translations_df = spark.read.table(f"{db_silver}.category_translation")

    # Left join so that products without a translation row are kept.
    joined_df = products_df.join(
        translations_df,
        products_df.product_category_name == translations_df.product_category_name,
        "left",
    )

    final_df = joined_df.select(
        products_df.product_id,
        # If there is no English name (null), use the original category name.
        coalesce(
            translations_df.product_category_name_english,
            products_df.product_category_name,
        ).alias("category_name"),
        products_df.product_weight_g,
        products_df.product_length_cm,
        products_df.product_height_cm,
        products_df.product_width_cm,
    )

    target_table = f"{db_gold}.dim_products"

    # "overwriteSchema" lets the overwrite also replace the table schema;
    # the option name must be spelled exactly for Delta to honour it.
    (
        final_df.write.format("delta")
        .mode("overwrite")
        .option("overwriteSchema", "true")
        .saveAsTable(target_table)
    )


create_dim_products()

In [0]:
%sql
-- Preview up to 10 product-dimension rows that have a category name.
-- NOTE(review): the catalog/schema is hard-coded here; presumably it is the
-- same location as the notebook's db_gold value — confirm.
SELECT * FROM end_to_end_project.olist_gold.dim_products
WHERE category_name IS NOT NULL
LIMIT 10;

In [0]:
from pyspark.sql.functions import col

def create_fact_orders():
    """Build the gold-layer order fact table and save it as a Delta table.

    Inner-joins ``{db_silver}.order_items`` with ``{db_silver}.orders`` on
    ``order_id``, which yields one row per order item, keeps the keys needed
    to reach the product, seller and customer dimensions, and adds
    ``total_amount`` (price + freight_value). The result overwrites
    ``{db_gold}.fact_orders`` (data and schema).

    Uses the notebook globals ``spark``, ``db_silver`` and ``db_gold``.
    """
    orders = spark.read.table(f"{db_silver}.orders")
    order_items = spark.read.table(f"{db_silver}.order_items")

    same_order = order_items["order_id"] == orders["order_id"]
    items_with_orders = order_items.join(orders, on=same_order, how="inner")

    fact_columns = [
        order_items["order_id"],
        order_items["product_id"],           # key to the product dimension
        order_items["seller_id"],            # key to the seller dimension
        orders["customer_id"],               # key to the customer dimension
        orders["order_purchase_timestamp"],  # for time-based analysis
        orders["order_status"],
        order_items["price"],
        order_items["freight_value"],
        # Derived metric: total amount (price + freight)
        (order_items["price"] + order_items["freight_value"]).alias("total_amount"),
    ]
    fact_orders = items_with_orders.select(*fact_columns)

    delta_writer = fact_orders.write.format("delta").mode("overwrite")
    delta_writer = delta_writer.option("overwriteSchema", "true")
    delta_writer.saveAsTable(f"{db_gold}.fact_orders")


create_fact_orders()

In [0]:
%sql
-- Show the 10 fact rows with the highest total_amount.
SELECT 
    order_id,
    product_id,
    price, 
    freight_value, 
    total_amount,   -- the newly computed column (price + freight_value)
    order_purchase_timestamp
FROM end_to_end_project.olist_gold.fact_orders
ORDER BY total_amount DESC -- highest-value rows first
LIMIT 10;

In [0]:
def create_other_dimensions():
    """Build the gold-layer customer and seller dimensions as Delta tables.

    For each dimension, reads the silver source table, keeps only the listed
    columns and overwrites the gold target table:

    * ``{db_silver}.customers`` -> ``{db_gold}.dim_customers``
    * ``{db_silver}.sellers``   -> ``{db_gold}.dim_sellers``

    The writes enable ``overwriteSchema`` so they behave like the other gold
    table writers in this notebook when the column set changes.

    Uses the notebook globals ``spark``, ``db_silver`` and ``db_gold``.
    """
    # (silver source table, gold target table, columns to keep)
    dimension_specs = (
        (
            "customers",
            "dim_customers",
            ["customer_id", "customer_unique_id", "customer_city", "customer_state"],
        ),
        (
            "sellers",
            "dim_sellers",
            ["seller_id", "seller_city", "seller_state"],
        ),
    )

    for source_name, target_name, columns in dimension_specs:
        dim_df = spark.read.table(f"{db_silver}.{source_name}").select(*columns)
        (
            dim_df.write.format("delta")
            .mode("overwrite")
            .option("overwriteSchema", "true")
            .saveAsTable(f"{db_gold}.{target_name}")
        )


create_other_dimensions()

In [0]:
%sql
-- Top 10 (customer state, product category) pairs by revenue.
-- fact_orders holds one row per order item, so orders are counted with
-- COUNT(DISTINCT ...) to avoid counting a multi-item order more than once.
-- Output aliases are Turkish: Eyalet = state, Kategori = category,
-- Toplam_Siparis = total orders, Toplam_Ciro = total revenue.
SELECT 
    c.customer_state AS Eyalet,
    p.category_name AS Kategori,
    COUNT(DISTINCT f.order_id) AS Toplam_Siparis,
    ROUND(SUM(f.total_amount), 2) AS Toplam_Ciro
FROM end_to_end_project.olist_gold.fact_orders f
JOIN end_to_end_project.olist_gold.dim_customers c ON f.customer_id = c.customer_id
JOIN end_to_end_project.olist_gold.dim_products p ON f.product_id = p.product_id
GROUP BY c.customer_state, p.category_name
ORDER BY Toplam_Ciro DESC
LIMIT 10;

In [0]:
# Print the cluster ID stored under this Spark configuration key.
print(spark.conf.get("spark.databricks.clusterUsageTags.clusterId"))