In [0]:
# etl_hive_pipeline_databricks.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, year, month, sum as _sum,to_date,expr

# Spark session already exists in Databricks as 'spark'

# ===== STEP 1: Load Data =====
input_path = "/Volumes/workspace/data/data/Sample - Superstore.csv"
df = spark.read.csv(input_path, header=True, inferSchema=True)

df_transformed = df.withColumn(
    "Order_Date_Parsed",
    to_date(col("Order Date"), "M/d/yyyy")
).withColumn(
    "Year",
    year(col("Order_Date_Parsed"))
).withColumn(
    "Month",
    month(col("Order_Date_Parsed"))
).withColumn(
    "Sales_Double",
    expr("try_cast(Sales AS DOUBLE)")
)

agg_df = df_transformed.groupBy(
    "Year",
    "Month",
    "Region"
).agg(
    _sum("Sales_Double").alias("TotalSales")
)


# ===== STEP 4: Save as Hive Table =====
agg_df.write \
    .mode("overwrite") \
    .format("delta") \
    .partitionBy("Year", "Month") \
    .saveAsTable("retail_sales")  # Saves to Hive metastore in Databricks

print("✅ ETL complete — data saved & Hive table created.")


✅ ETL complete — data saved & Hive table created.


In [0]:
# query_hive_databricks.py

# Run SQL query directly on Hive Metastore table
query = """
SELECT Region, SUM(TotalSales) AS TotalSales
FROM retail_sales
WHERE Year = 2017 AND Month = 12
GROUP BY Region
"""

df = spark.sql(query)
df.show()   # Show in Databricks notebook


+-------+------------------+
| Region|        TotalSales|
+-------+------------------+
|   West|         28016.793|
|   East|19715.352999999996|
|  South|         15002.349|
|Central|18493.120799999997|
+-------+------------------+



In [0]:
%sql
CREATE EXTERNAL TABLE IF NOT EXISTS retail_sales_external (
    Region STRING,
    TotalSales DOUBLE
)
PARTITIONED BY (Year INT, Month INT)
STORED AS PARQUET
LOCATION 'my_external_location/retail_sales_external';