# Exercise 7: Spark and Hive Integration

## The Power of Unified Metadata

Spark can read and write tables registered in the **Hive Metastore**, the shared catalog that records each table's schema, file format, and storage location. This means:

- Tables created in Hive can be queried by Spark
- Tables created by Spark are visible to Hive
- Other tools (Presto, Trino, Flink) can also access the same tables

```
┌──────────────────────────────────────────────────────────┐
│                  HIVE METASTORE                         │
│              (Single Source of Truth)                   │
└────────────────────────┬─────────────────────────────────┘
                         │
         ┌───────────────┼───────────────┐
         │               │               │
         ▼               ▼               ▼
    ┌─────────┐    ┌─────────┐    ┌─────────┐
    │  Spark  │    │  Hive   │    │ Presto  │
    │ (Fast!) │    │  (SQL)  │    │ (OLAP)  │
    └─────────┘    └─────────┘    └─────────┘
         │               │               │
         └───────────────┼───────────────┘
                         ▼
                    ┌─────────┐
                    │  HDFS   │
                    │  Data   │
                    └─────────┘
```

## Learning Objectives
- Connect Spark to the Hive Metastore
- Read tables created by Hive using Spark
- Create tables from Spark that are visible in Hive
- Use Spark's DataFrame API with Hive tables

---

## Part 1: Connecting Spark to Hive Metastore

In [1]:
from pyspark.sql import SparkSession

# Key configuration: enableHiveSupport() and metastore URI
spark = SparkSession.builder \
    .appName("Spark-Hive Integration Lab") \
    .master("yarn") \
    .config("spark.sql.warehouse.dir", "/user/hive/warehouse") \
    .config("hive.metastore.uris", "thrift://hive-metastore:9083") \
    .config("spark.sql.catalogImplementation", "hive") \
    .enableHiveSupport() \
    .getOrCreate()

print("✓ Spark connected to Hive Metastore!")
print(f"Application ID: {spark.sparkContext.applicationId}")

✓ Spark connected to Hive Metastore!
Application ID: application_1768170757723_0001
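
The settings from the cell above can also be gathered in one place, which makes it easier to see which of them Hive integration depends on. The following is a minimal sketch rather than the lab's required setup: `hive_session_conf` and `apply_conf` are hypothetical helper names, and the URI and warehouse path are simply the values used above.

```python
def hive_session_conf(metastore_uri, warehouse_dir):
    """Return the Spark settings that point a session at a Hive Metastore."""
    return {
        # Where managed tables store their data files
        "spark.sql.warehouse.dir": warehouse_dir,
        # Thrift endpoint of the metastore service
        "hive.metastore.uris": metastore_uri,
    }


def apply_conf(builder, conf):
    """Apply each key/value pair to a SparkSession builder and return it."""
    for key, value in conf.items():
        builder = builder.config(key, value)
    return builder


conf = hive_session_conf("thrift://hive-metastore:9083", "/user/hive/warehouse")
for key, value in conf.items():
    print(f"{key} = {value}")
```

With PySpark available, `apply_conf(SparkSession.builder.appName("demo"), conf).enableHiveSupport().getOrCreate()` would build the session. Afterwards, `spark.conf.get("spark.sql.catalogImplementation")` should report `hive`.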


In [2]:
# Verify connection by listing databases
print("=== Available Databases ===")
spark.sql("SHOW DATABASES").show(truncate=False)

=== Available Databases ===
+---------------+
|namespace      |
+---------------+
|default        |
|sales_analytics|
+---------------+
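
The same check can be made programmatically through the catalog API instead of reading the `SHOW DATABASES` output by eye. This is a minimal sketch that assumes an active `spark` session; `missing_databases` is a hypothetical helper name.

```python
def missing_databases(spark, expected):
    """Return the expected database names that the metastore does not list."""
    # spark.catalog.listDatabases() returns objects with a .name attribute
    present = {db.name for db in spark.catalog.listDatabases()}
    return sorted(set(expected) - present)
```

Calling `missing_databases(spark, ["default", "sales_analytics"])` should return an empty list when both databases shown above are visible to the session.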



## Part 2: Accessing Hive Tables from Spark

Tables registered in the Hive Metastore can be created and queried from Spark with ordinary SQL:

In [None]:
# Create a database if it doesn't exist
spark.sql("CREATE DATABASE IF NOT EXISTS spark_hive_demo")
spark.sql("USE spark_hive_demo")

# Create a sample table (this will be visible in Hive!)
spark.sql("""
    CREATE TABLE IF NOT EXISTS employees (
        emp_id INT,
        name STRING,
        department STRING,
        salary DECIMAL(10,2),
        hire_date DATE
    )
    STORED AS PARQUET
""")

print("Table 'employees' created in Hive Metastore!")

In [None]:
# Insert data using Spark DataFrame API
from datetime import date

employee_data = [
    (1, "Alice Johnson", "Engineering", 85000.00, date(2020, 3, 15)),
    (2, "Bob Smith", "Marketing", 72000.00, date(2019, 7, 1)),
    (3, "Carol Williams", "Engineering", 92000.00, date(2018, 11, 20)),
    (4, "David Brown", "Sales", 68000.00, date(2021, 1, 10)),
    (5, "Eva Martinez", "Engineering", 95000.00, date(2017, 5, 5)),
]

emp_df = spark.createDataFrame(
    employee_data,
    ["emp_id", "name", "department", "salary", "hire_date"]
)

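# Write to the Hive table.
# Note: saveAsTable with mode("overwrite") drops and recreates the table from the
# DataFrame's inferred schema (emp_id BIGINT, salary DOUBLE), replacing the DDL above.
emp_df.write.mode("overwrite").saveAsTable("employees")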
print("Data inserted into employees table!")
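
In current Spark versions, `saveAsTable` in overwrite mode replaces an existing table's definition with the DataFrame's inferred schema, so the declared `DECIMAL(10,2)` salary column would become a double. One alternative, sketched here under assumptions, is to build the DataFrame with an explicit schema and use `insertInto`, which keeps the table definition. The sketch assumes the `employees` table still has the schema from the `CREATE TABLE` statement. `EMPLOYEE_SCHEMA`, `with_decimal_salary`, and `insert_employees` are hypothetical names.

```python
from datetime import date
from decimal import Decimal

# DDL-formatted schema string matching the CREATE TABLE statement
EMPLOYEE_SCHEMA = (
    "emp_id INT, name STRING, department STRING, "
    "salary DECIMAL(10,2), hire_date DATE"
)


def with_decimal_salary(rows, salary_index=3):
    """Convert the float salary in each row to a two-place Decimal."""
    cents = Decimal("0.01")
    return [
        tuple(
            Decimal(str(value)).quantize(cents) if i == salary_index else value
            for i, value in enumerate(row)
        )
        for row in rows
    ]


def insert_employees(spark, rows, table="employees"):
    """Insert rows into an existing table without replacing its definition."""
    df = spark.createDataFrame(with_decimal_salary(rows), schema=EMPLOYEE_SCHEMA)
    # insertInto matches columns by position and keeps the table's schema
    df.write.insertInto(table, overwrite=True)


sample = [(1, "Alice Johnson", "Engineering", 85000.00, date(2020, 3, 15))]
print(with_decimal_salary(sample)[0][3])
```

With a live session, `insert_employees(spark, employee_data)` would load the five sample rows. The conversion step is needed because Spark's schema verification expects `decimal.Decimal` objects for decimal columns rather than Python floats.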

In [None]:
# Query the table - works the same in Spark AND Hive!
print("=== Employees Table (via Spark SQL) ===")
spark.sql("SELECT * FROM employees").show()

## Part 3: Using DataFrame API with Hive Tables

Once a Hive table is loaded as a DataFrame, the full DataFrame API (grouping, aggregation, ordering) applies to it:

In [None]:
# Read Hive table as DataFrame
employees_df = spark.table("employees")

# Use DataFrame operations
from pyspark.sql.functions import avg, count, col, max as spark_max

# Department statistics
dept_stats = employees_df \
    .groupBy("department") \
    .agg(
        count("*").alias("employee_count"),
        avg("salary").alias("avg_salary"),
        spark_max("salary").alias("max_salary")
    ) \
    .orderBy(col("avg_salary").desc())

print("=== Department Statistics ===")
dept_stats.show()
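
For comparison, the same aggregation can be written as a SQL query, which is the form that would also run in Hive. This is a sketch of an equivalent query, not output from the lab; `department_stats_sql` is a hypothetical helper name.

```python
def department_stats_sql(table="employees"):
    """Build a SQL query equivalent to the groupBy/agg chain."""
    return f"""
        SELECT department,
               COUNT(*)    AS employee_count,
               AVG(salary) AS avg_salary,
               MAX(salary) AS max_salary
        FROM {table}
        GROUP BY department
        ORDER BY avg_salary DESC
    """


print(department_stats_sql())
```

Running `spark.sql(department_stats_sql()).show()` should return the same rows as `dept_stats.show()`.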

In [None]:
# Save aggregated results as a new Hive table
dept_stats.write.mode("overwrite").saveAsTable("department_summary")

print("Aggregated data saved as new Hive table!")
spark.sql("SHOW TABLES").show()

## Part 4: Creating Partitioned Tables with Spark

Spark can create and write to partitioned Hive tables:

In [None]:
# Create sales data with dates
from datetime import datetime
from pyspark.sql.functions import year, month

sales_data = [
    ("S001", "Widget A", 100, 29.99, datetime(2024, 1, 15)),
    ("S002", "Widget B", 50, 49.99, datetime(2024, 1, 20)),
    ("S003", "Widget A", 75, 29.99, datetime(2024, 2, 5)),
    ("S004", "Widget C", 200, 19.99, datetime(2024, 2, 10)),
    ("S005", "Widget B", 30, 49.99, datetime(2024, 3, 1)),
]

sales_df = spark.createDataFrame(
    sales_data,
    ["sale_id", "product", "quantity", "unit_price", "sale_date"]
)

# Add partition columns
sales_df = sales_df \
    .withColumn("sale_year", year("sale_date")) \
    .withColumn("sale_month", month("sale_date"))

sales_df.show()

In [None]:
# Write as partitioned Hive table
sales_df.write \
    .mode("overwrite") \
    .partitionBy("sale_year", "sale_month") \
    .saveAsTable("sales_partitioned")

print("Partitioned table created!")
spark.sql("SHOW PARTITIONS sales_partitioned").show(truncate=False)
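
Hive-style partitioning stores each partition in a directory named `column=value`, nested in the order given to `partitionBy`. The plain-Python sketch below derives the partition paths the five sample rows should fall into, which can be compared with the `SHOW PARTITIONS` output. `partition_path` is a hypothetical helper, and the dates are copied from `sales_data`.

```python
from datetime import datetime


def partition_path(sale_date):
    """Return the Hive-style partition directory for a sale timestamp."""
    return f"sale_year={sale_date.year}/sale_month={sale_date.month}"


sale_dates = [
    datetime(2024, 1, 15),
    datetime(2024, 1, 20),
    datetime(2024, 2, 5),
    datetime(2024, 2, 10),
    datetime(2024, 3, 1),
]

# Distinct partitions the five sample rows fall into
partitions = sorted({partition_path(d) for d in sale_dates})
for path in partitions:
    print(path)
```

Five rows land in three partitions, so a query filtered to one month only needs to read one of the three directories.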

In [None]:
# Query with partition pruning
print("=== January 2024 Sales ===")
jan_sales = spark.sql("""
    SELECT sale_id, product, quantity, unit_price, sale_date
    FROM sales_partitioned
    WHERE sale_year = 2024 AND sale_month = 1
""")
jan_sales.show()

# Verify partition pruning in query plan
print("\n=== Query Plan (Notice PartitionFilters) ===")
jan_sales.explain()
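
Instead of scanning the plan by eye, the check for `PartitionFilters` can be automated. This is a rough sketch under two assumptions: that `explain()` prints through Python's `sys.stdout`, and that the plan lists filters as `PartitionFilters: [...]`. The exact plan text varies between Spark versions, and both function names are hypothetical.

```python
import io
from contextlib import redirect_stdout


def plan_text(df):
    """Capture what df.explain() prints and return it as a string."""
    buffer = io.StringIO()
    with redirect_stdout(buffer):
        df.explain()
    return buffer.getvalue()


def partition_filter_columns(plan, columns):
    """Return the columns that appear in a PartitionFilters entry of the plan."""
    found = set()
    for line in plan.splitlines():
        _, marker, rest = line.partition("PartitionFilters: [")
        if marker:
            # Keep only the text up to the bracket that closes the filter list
            filters = rest.split("]", 1)[0]
            found.update(c for c in columns if c in filters)
    return sorted(found)
```

For the query above, `partition_filter_columns(plan_text(jan_sales), ["sale_year", "sale_month"])` would be expected to return both column names if pruning is applied.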

## Part 5: Verifying Cross-Tool Access

Tables created by Spark are visible to Hive and vice versa:

In [None]:
# Check table metadata - same query works in HiveServer2
print("=== Table Details (visible in Hive too!) ===")
spark.sql("DESCRIBE FORMATTED employees").show(50, truncate=False)
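
The `DESCRIBE FORMATTED` output is long, and only a few rows matter for cross-tool access: the table type, the provider (storage format), and the location of the data files. The sketch below picks those rows out. It assumes an active `spark` session and that the detail rows use the labels `Type`, `Provider`, and `Location`, which may differ between Spark versions. `table_details` is a hypothetical helper name.

```python
def table_details(spark, table, keys=("Type", "Provider", "Location")):
    """Pick selected rows out of DESCRIBE FORMATTED as a dict."""
    rows = spark.sql(f"DESCRIBE FORMATTED {table}").collect()
    # Each row has col_name, data_type and comment; detail rows keep their
    # label in col_name and their value in data_type
    return {
        row["col_name"]: row["data_type"]
        for row in rows
        if row["col_name"] in keys
    }
```

For example, `table_details(spark, "employees")` would return a small dictionary that is easier to compare with what Hive reports for the same table.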

In [None]:
# You can run the same query from Beeline, the HiveServer2 command-line client:
# beeline -u "jdbc:hive2://hiveserver2:10000/spark_hive_demo"
# > SELECT * FROM employees;

print("""
╔════════════════════════════════════════════════════════════════╗
║  To verify in Hive, run in terminal:                          ║
║                                                                ║
║  docker exec -it hiveserver2 beeline -u \\                     ║
║    "jdbc:hive2://localhost:10000/spark_hive_demo"              ║
║                                                                ║
║  Then: SELECT * FROM employees;                                ║
╚════════════════════════════════════════════════════════════════╝
""")
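
The Hive check can also be scripted rather than typed into an interactive shell, using Beeline's `-e` option to run a single query. This is a sketch that assumes Docker is available on the machine running the code and that the container is named `hiveserver2` as in the box above. `beeline_command` and `run_in_hive` are hypothetical helper names.

```python
import subprocess


def beeline_command(query, database="spark_hive_demo", container="hiveserver2"):
    """Build a docker exec command that runs one query through Beeline."""
    url = f"jdbc:hive2://localhost:10000/{database}"
    return ["docker", "exec", container, "beeline", "-u", url, "-e", query]


def run_in_hive(query):
    """Run the query in Hive and return Beeline's standard output."""
    result = subprocess.run(
        beeline_command(query), capture_output=True, text=True, check=True
    )
    return result.stdout


print(" ".join(beeline_command("SELECT * FROM employees;")))
```

Passing the command as a list avoids shell quoting problems with the JDBC URL and the query text.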

## Part 6: Advanced Integration Patterns

In [None]:
# Pattern 1: Join Hive tables in Spark
print("=== Joining Hive Tables in Spark ===")

# Create a departments reference table
dept_data = [
    ("Engineering", "Building A", 50),
    ("Marketing", "Building B", 25),
    ("Sales", "Building C", 40),
]

dept_df = spark.createDataFrame(
    dept_data,
    ["dept_name", "location", "budget_millions"]
)
dept_df.write.mode("overwrite").saveAsTable("departments")

# Join employees with departments
joined = spark.sql("""
    SELECT e.name, e.department, e.salary, d.location
    FROM employees e
    JOIN departments d ON e.department = d.dept_name
""")
joined.show()

In [None]:
# Pattern 2: Insert data into existing Hive table
new_employee = [(6, "Frank Lee", "Marketing", 75000.00, date(2024, 1, 15))]
new_emp_df = spark.createDataFrame(
    new_employee,
    ["emp_id", "name", "department", "salary", "hire_date"]
)

# Append to existing table
new_emp_df.write.mode("append").saveAsTable("employees")

print("=== Updated Employees ===")
spark.sql("SELECT * FROM employees ORDER BY emp_id").show()
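
Append mode adds the rows every time the cell runs, so re-running it in a notebook duplicates employee 6. One way to make the step repeatable, sketched here under assumptions, is to skip rows whose key already exists. `append_new_rows` is a hypothetical helper, and it assumes `emp_id` uniquely identifies an employee.

```python
def append_new_rows(spark, rows, columns, table="employees", key="emp_id"):
    """Append only rows whose key is not already in the table."""
    key_index = columns.index(key)
    # Collecting keys to the driver is only reasonable for small tables
    existing = {r[key] for r in spark.table(table).select(key).collect()}
    fresh = [row for row in rows if row[key_index] not in existing]
    if fresh:
        spark.createDataFrame(fresh, columns).write.mode("append").saveAsTable(table)
    return len(fresh)
```

The function returns the number of rows written, so a second call with the same data returns 0. For large tables a join-based filter would be a better fit than collecting keys to the driver.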

In [None]:
# Pattern 3: CTAS (Create Table As Select)
spark.sql("""
    CREATE TABLE IF NOT EXISTS high_earners AS
    SELECT emp_id, name, department, salary
    FROM employees
    WHERE salary > 80000
""")

print("=== High Earners Table ===")
spark.sql("SELECT * FROM high_earners").show()
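
Because of `IF NOT EXISTS`, the CTAS statement does nothing when `high_earners` already exists, so the table does not pick up later changes to `employees`. A simple way to refresh it, sketched here under assumptions, is to drop and recreate it. `rebuild_high_earners` is a hypothetical helper, and the table is briefly absent between the two statements.

```python
def rebuild_high_earners(spark, threshold=80000, table="high_earners"):
    """Drop and recreate the derived table from the current employees data."""
    spark.sql(f"DROP TABLE IF EXISTS {table}")
    spark.sql(f"""
        CREATE TABLE {table} AS
        SELECT emp_id, name, department, salary
        FROM employees
        WHERE salary > {threshold}
    """)
```

Calling `rebuild_high_earners(spark)` after appending new employees would bring the derived table up to date.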

## Summary: Spark + Hive Integration Benefits

| Feature | Benefit |
|---------|--------|
| Unified Catalog | Single source of truth for all tools |
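| Schema on Read | Schema is applied when data is queried, so every engine interprets the same files the same way |
| Partition Pruning | Queries that filter on partition columns read only the matching directories |
| Parallel Processing | Queries run on Spark's distributed execution engine |
| SQL Compatibility | Most queries run unchanged in both Spark SQL and Hive |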
| DataFrame API | Python/Scala programmatic access |

In [None]:
# Cleanup (optional)
# spark.sql("DROP TABLE IF EXISTS employees")
# spark.sql("DROP TABLE IF EXISTS departments")
# spark.sql("DROP TABLE IF EXISTS department_summary")
# spark.sql("DROP TABLE IF EXISTS sales_partitioned")
# spark.sql("DROP TABLE IF EXISTS high_earners")
# spark.sql("DROP DATABASE IF EXISTS spark_hive_demo CASCADE")

In [None]:
spark.stop()
print("Session stopped.")