# 🌟 Full Stack Big Data Integration

This notebook demonstrates the complete integration of all Big Data components in our environment.

## Learning Objectives
- Orchestrate HDFS, Spark, and Hive together
- Build end-to-end data pipelines
- Perform comprehensive data analytics
- Demonstrate production-ready workflows
- Monitor and optimize performance

## 1. Environment Initialization and Health Check

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from hdfs import InsecureClient
import pandas as pd
import json
import time

print('üöÄ Big Data Environment - Full Stack Integration')
print('=' * 65)

# Initialize Spark session with full configuration
spark = SparkSession.builder \
    .appName("BigDataEnv-FullIntegration") \
    .master("spark://spark-master:7077") \
    .config("spark.sql.warehouse.dir", "hdfs://namenode:9000/user/hive/warehouse") \
    .config("spark.hadoop.fs.defaultFS", "hdfs://namenode:9000") \
    .config("spark.sql.catalogImplementation", "hive") \
    .config("hive.metastore.uris", "thrift://hive-metastore:9083") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.sql.adaptive.enabled", "true") \
    .enableHiveSupport() \
    .getOrCreate()

# Initialize HDFS client
try:
    hdfs_client = InsecureClient('http://namenode:9870', user='root')
except:
    hdfs_client = None

print('‚úÖ Spark Session initialized with full Hadoop/Hive integration')
print(f'üåê Master: {spark.sparkContext.master}')
print(f'üì± Application: {spark.sparkContext.applicationId}')
print(f'‚öôÔ∏è  Parallelism: {spark.sparkContext.defaultParallelism}')

## 2. Comprehensive Data Processing Pipeline

In [None]:
print('üèóÔ∏è  Building Comprehensive Data Pipeline:')

# Create analytics database
spark.sql("CREATE DATABASE IF NOT EXISTS bigdata_analytics")
spark.sql("USE bigdata_analytics")
print('‚úÖ Analytics database ready')

# Enhanced users processing with comprehensive transformations
print('\nüë• Processing Enhanced Users Data:')
try:
    users_df = spark.read \
        .option("header", "true") \
        .option("inferSchema", "true") \
        .csv("hdfs://namenode:9000/user/demo/input/users.csv")
    
    users_enhanced = users_df \
        .withColumn("email_domain", split(col("email"), "@").getItem(1)) \
        .withColumn("age_group", 
                   when(col("age") < 25, "Young")
                   .when(col("age") < 35, "Adult")
                   .when(col("age") < 50, "Mature")
                   .otherwise("Senior")) \
        .withColumn("continent",
                   when(col("country").isin(["USA", "Canada"]), "North America")
                   .when(col("country").isin(["UK", "Germany", "France", "Spain", "Italy"]), "Europe")
                   .when(col("country").isin(["Australia"]), "Oceania")
                   .when(col("country").isin(["Japan", "South Korea"]), "Asia")
                   .otherwise("Other")) \
        .withColumn("load_timestamp", current_timestamp())
    
    print(f'‚úÖ Users enhanced: {users_enhanced.count()} records')
    users_enhanced.cache()
    
except Exception as e:
    print(f'‚ö†Ô∏è  Using sample data: {e}')
    users_enhanced = spark.createDataFrame([
        (1, "John Smith", "john@email.com", 28, "New York", "USA", "email.com", "Adult", "North America"),
        (2, "Emma Johnson", "emma@email.com", 34, "London", "UK", "email.com", "Adult", "Europe"),
        (3, "Michael Chen", "michael@email.com", 22, "Toronto", "Canada", "email.com", "Young", "North America")
    ], ["user_id", "name", "email", "age", "city", "country", "email_domain", "age_group", "continent"])
    users_enhanced = users_enhanced.withColumn("load_timestamp", current_timestamp())

In [None]:
# Enhanced transactions processing: load the JSON transactions from HDFS and
# derive transaction_date, a USD-normalized amount, a size bucket and a load
# timestamp. On any failure, fall back to a small in-memory sample.
print('üí≥ Processing Enhanced Transactions Data:')
try:
    transactions_df = spark.read \
        .option("multiline", "true") \
        .json("hdfs://namenode:9000/user/demo/input/transactions.json")

    transactions_enhanced = transactions_df \
        .withColumn("transaction_date", to_date(col("timestamp"))) \
        .withColumn("amount_usd",
                   # Fixed demo exchange rates. The CAD rate matches the one the
                   # fallback sample rows use (299.00 CAD -> 224.25 USD), so the
                   # live path no longer treats CAD amounts as USD.
                   when(col("currency") == "USD", col("amount"))
                   .when(col("currency") == "EUR", col("amount") * 1.1)
                   .when(col("currency") == "GBP", col("amount") * 1.25)
                   .when(col("currency") == "CAD", col("amount") * 0.75)
                   .otherwise(col("amount"))) \
        .withColumn("transaction_size",
                   when(col("amount") < 50, "Small")
                   .when(col("amount") < 200, "Medium")
                   .otherwise("Large")) \
        .withColumn("load_timestamp", current_timestamp())

    # Mark the DataFrame for caching BEFORE the first action, so the count()
    # below populates the cache and later queries reuse it instead of
    # re-reading the JSON.
    transactions_enhanced.cache()
    print(f'‚úÖ Transactions enhanced: {transactions_enhanced.count()} records')

except Exception as e:
    # Deliberate best-effort: keep the notebook runnable without the input file
    print(f'‚ö†Ô∏è  Using sample data: {e}')
    transactions_enhanced = spark.createDataFrame([
        ("TXN001", 1, 150.50, "USD", "Amazon", "Electronics", "completed", 150.50, "Medium"),
        ("TXN002", 2, 89.99, "GBP", "Tesco", "Groceries", "completed", 112.49, "Medium"),
        ("TXN003", 3, 299.00, "CAD", "Best Buy", "Electronics", "pending", 224.25, "Large")
    ], ["transaction_id", "user_id", "amount", "currency", "merchant", "category", "status", "amount_usd", "transaction_size"])
    transactions_enhanced = transactions_enhanced.withColumn("load_timestamp", current_timestamp())

print('\nüîç Sample Enhanced Data:')
users_enhanced.select("user_id", "name", "age_group", "continent").show(3)
transactions_enhanced.select("transaction_id", "user_id", "amount_usd", "transaction_size").show(3)

## 3. Advanced Analytics and Business Intelligence

In [None]:
# Create comprehensive analytics views
print('üìä Creating Analytics Views:')

# Register both DataFrames as temporary views so they can be queried with SQL
users_enhanced.createOrReplaceTempView("users_enhanced")
transactions_enhanced.createOrReplaceTempView("transactions_enhanced")

# 1. Customer Segmentation Analysis
# The 'completed' filter lives in the ON clause, not in WHERE: a WHERE
# predicate on the right-hand table discards the NULL-extended rows and turns
# the LEFT JOIN into an inner join, dropping every segment whose customers
# have no completed transaction. Revenue sums are coalesced to 0 for those
# segments; the average stays NULL because it is undefined without rows.
print('\nüéØ Customer Segmentation Analysis:')
customer_segments = spark.sql("""
    SELECT
        u.age_group,
        u.continent,
        COUNT(DISTINCT u.user_id) as customer_count,
        COUNT(t.transaction_id) as total_transactions,
        COALESCE(SUM(t.amount_usd), 0) as total_revenue,
        AVG(t.amount_usd) as avg_transaction_value,
        COALESCE(SUM(t.amount_usd), 0) / COUNT(DISTINCT u.user_id) as revenue_per_customer
    FROM users_enhanced u
    LEFT JOIN transactions_enhanced t
        ON u.user_id = t.user_id AND t.status = 'completed'
    GROUP BY u.age_group, u.continent
    ORDER BY total_revenue DESC
""")
customer_segments.show()

# 2. Product Category Performance
# Counts and amounts cover transactions of every status; success_rate is the
# percentage of a category's transactions whose status is 'completed'.
print('\nüì¶ Product Category Performance:')
category_performance = spark.sql("""
    SELECT
        category,
        COUNT(*) as transaction_count,
        SUM(amount_usd) as total_revenue,
        AVG(amount_usd) as avg_amount,
        COUNT(CASE WHEN status = 'completed' THEN 1 END) * 100.0 / COUNT(*) as success_rate
    FROM transactions_enhanced
    GROUP BY category
    ORDER BY total_revenue DESC
""")
category_performance.show()

# 3. Geographic Analysis
# Inner join on purpose: only countries with completed transactions are listed.
print('\nüåç Geographic Revenue Analysis:')
geographic_analysis = spark.sql("""
    SELECT
        u.continent,
        u.country,
        COUNT(DISTINCT u.user_id) as customers,
        SUM(t.amount_usd) as revenue,
        AVG(t.amount_usd) as avg_transaction
    FROM users_enhanced u
    JOIN transactions_enhanced t ON u.user_id = t.user_id
    WHERE t.status = 'completed'
    GROUP BY u.continent, u.country
    ORDER BY revenue DESC
""")
geographic_analysis.show()

## 4. Data Warehouse Implementation

In [None]:
# Create data warehouse tables
print('üèõÔ∏è  Creating Data Warehouse:')

# Persist the dimension table (dim_users) and the fact table
# (fact_transactions) in the current database. Each write has its own
# try/except so a failure on one table does not skip the other.
print('\nüìã Creating Dimension Tables:')
try:
    users_enhanced.write \
        .mode("overwrite") \
        .saveAsTable("dim_users")
    print('‚úÖ dim_users created')
except Exception as e:
    # Best-effort: later cells query the temp views, not these tables
    print(f'‚ö†Ô∏è  Table creation issue: {e}')

try:
    transactions_enhanced.write \
        .mode("overwrite") \
        .saveAsTable("fact_transactions")
    print('‚úÖ fact_transactions created')
except Exception as e:
    print(f'‚ö†Ô∏è  Table creation issue: {e}')

# Create summary tables
print('\nüìä Creating Summary Tables:')
try:
    # Per-user summary. The LEFT JOIN keeps users without transactions;
    # total_transactions counts every status, while total_spent and
    # avg_transaction only consider 'completed' rows and default to 0.
    user_summary = spark.sql("""
        SELECT 
            u.user_id,
            u.name,
            u.age_group,
            u.continent,
            COUNT(t.transaction_id) as total_transactions,
            COALESCE(SUM(CASE WHEN t.status = 'completed' THEN t.amount_usd END), 0) as total_spent,
            COALESCE(AVG(CASE WHEN t.status = 'completed' THEN t.amount_usd END), 0) as avg_transaction
        FROM users_enhanced u
        LEFT JOIN transactions_enhanced t ON u.user_id = t.user_id
        GROUP BY u.user_id, u.name, u.age_group, u.continent
    """)

    # Expose the summary to SQL as a temporary view
    user_summary.createOrReplaceTempView("user_summary")
    print('‚úÖ user_summary view created')

    print('\nüë• User Summary Sample:')
    user_summary.show(5)

except Exception as e:
    print(f'‚ùå Summary creation error: {e}')

## 5. Advanced SQL Analytics

In [None]:
# SQL analytics over the user_summary and transactions_enhanced views,
# using ranking and aggregate window functions.
print('üî¨ Advanced SQL Analytics:')

# --- 1. Customer ranking ---------------------------------------------------
# Ranks spending customers overall and within their continent.
top_customers_sql = """
    SELECT 
        name,
        continent,
        total_spent,
        total_transactions,
        RANK() OVER (ORDER BY total_spent DESC) as spending_rank,
        RANK() OVER (PARTITION BY continent ORDER BY total_spent DESC) as continent_rank
    FROM user_summary
    WHERE total_spent > 0
    ORDER BY total_spent DESC
"""
print('\nüèÜ Top Customers by Spending:')
top_customers = spark.sql(top_customers_sql)
top_customers.show(10)

# --- 2. Category revenue share ---------------------------------------------
# Completed transactions per category, with each category's share of the
# grand total computed by a window over the grouped result.
category_trends_sql = """
    SELECT 
        category,
        COUNT(*) as transactions,
        SUM(amount_usd) as revenue,
        AVG(amount_usd) as avg_amount,
        SUM(amount_usd) / SUM(SUM(amount_usd)) OVER () * 100 as revenue_percentage
    FROM transactions_enhanced
    WHERE status = 'completed'
    GROUP BY category
    ORDER BY revenue DESC
"""
print('\nüìà Category Performance with Trends:')
category_trends = spark.sql(category_trends_sql)
category_trends.show()

# --- 3. Executive summary ---------------------------------------------------
# Three metric/value rows; values are cast to STRING so the UNION ALL
# branches share one schema.
executive_summary_sql = """
    SELECT 
        'Total Customers' as metric,
        CAST(COUNT(DISTINCT user_id) AS STRING) as value
    FROM user_summary
    UNION ALL
    SELECT 
        'Total Revenue (USD)' as metric,
        CAST(ROUND(SUM(total_spent), 2) AS STRING) as value
    FROM user_summary
    UNION ALL
    SELECT 
        'Avg Customer Value' as metric,
        CAST(ROUND(AVG(total_spent), 2) AS STRING) as value
    FROM user_summary
    WHERE total_spent > 0
"""
print('\nüìã Executive Summary:')
executive_summary = spark.sql(executive_summary_sql)
executive_summary.show(truncate=False)

## 6. Data Export and Integration

In [None]:
# Write the analytics results to HDFS in three formats (best effort)
print('üì§ Exporting Processed Data:')

try:
    # Customer segments -> single Parquet part file
    print('\nüíæ Exporting Customer Segments:')
    (
        customer_segments.coalesce(1)
        .write
        .mode("overwrite")
        .parquet("hdfs://namenode:9000/data/exports/customer_segments")
    )
    print('‚úÖ Customer segments exported to Parquet')

    # User summary -> single CSV part file with a header row
    print('\nüíæ Exporting User Summary:')
    (
        user_summary.coalesce(1)
        .write
        .mode("overwrite")
        .option("header", "true")
        .csv("hdfs://namenode:9000/data/exports/user_summary")
    )
    print('‚úÖ User summary exported to CSV')

    # Executive summary -> single JSON part file
    print('\nüíæ Exporting Executive Summary:')
    (
        executive_summary.coalesce(1)
        .write
        .mode("overwrite")
        .json("hdfs://namenode:9000/data/exports/executive_summary")
    )
    print('‚úÖ Executive summary exported to JSON')

except Exception as e:
    # The first failing export ends the try block; the DataFrames stay usable
    print(f'‚ö†Ô∏è  Export may have issues in this environment: {e}')
    print('üí° Data is still available in memory for analysis')

# Static listing of the export targets (dataset, format, HDFS path)
print('\nüìÅ Export Summary:')
export_info = [
    ('Customer Segments', 'Parquet', '/data/exports/customer_segments'),
    ('User Summary', 'CSV', '/data/exports/user_summary'),
    ('Executive Summary', 'JSON', '/data/exports/executive_summary'),
]

for label, fmt, target in export_info:
    print(f'  {label:18}: {fmt:8} ‚Üí {target}')

## 7. Performance Monitoring and Optimization

In [None]:
# Performance monitoring
print('‚ö° Performance Monitoring:')

# Spark application metrics read from the active SparkContext
sc = spark.sparkContext
print('\nüìä Spark Application:')
print(f'  Application ID: {sc.applicationId}')
print(f'  Master: {sc.master}')
print(f'  Cores: {sc.defaultParallelism}')
print(f'  Version: {sc.version}')

# Query performance test: time a join + aggregation end to end.
# perf_counter() is a monotonic, high-resolution clock meant for measuring
# elapsed time; time.time() can jump when the system clock is adjusted.
print('\nüöÄ Query Performance Test:')
start_time = time.perf_counter()

performance_query = spark.sql("""
    SELECT 
        u.continent,
        COUNT(*) as transactions,
        SUM(t.amount_usd) as revenue
    FROM users_enhanced u
    JOIN transactions_enhanced t ON u.user_id = t.user_id
    WHERE t.status = 'completed'
    GROUP BY u.continent
    ORDER BY revenue DESC
""")

# collect() is the action that actually runs the query, so it sits inside
# the timed region
result = performance_query.collect()
end_time = time.perf_counter()

print(f'  Query time: {end_time - start_time:.3f} seconds')
print(f'  Results: {len(result)} rows')

# Show performance results (show() evaluates the query again, outside the timing)
print('\nüìä Performance Query Results:')
performance_query.show()

# Cache statistics: whether each pipeline DataFrame is marked as cached
print('\nüíæ Cache Status:')
cache_status = [
    ('users_enhanced', users_enhanced.is_cached),
    ('transactions_enhanced', transactions_enhanced.is_cached)
]

# Size the name column from the longest name so the status column lines up
# (the previous fixed width of 20 was shorter than 'transactions_enhanced').
name_width = max(len(name) for name, _ in cache_status)
for table, cached in cache_status:
    status = "‚úÖ Cached" if cached else "‚ùå Not Cached"
    print(f'  {table:{name_width}}: {status}')

## 8. Integration Summary and Access Points

In [None]:
# Final integration summary: prints a static overview of the stack, live
# record counts for the two pipeline DataFrames, and the service URLs.
print('üéØ Big Data Integration Summary:')
print('=' * 50)

# Component status (static descriptions; no live check is performed here)
components = {
    'HDFS': '‚úÖ Distributed file system ready',
    'Spark': '‚úÖ Distributed computing active', 
    'Hive': '‚úÖ Data warehouse with PostgreSQL metastore',
    'Jupyter': '‚úÖ Interactive development environment',
    'Data Pipeline': '‚úÖ ETL processing complete',
    'Analytics': '‚úÖ Business intelligence views created',
    'Export': '‚úÖ Multi-format data export ready'
}

print('\nüöÄ Component Status:')
for component, status in components.items():
    print(f'  {component:15}: {status}')

# Data pipeline stats: the two counts are computed, the other rows are fixed text
print('\nüìä Pipeline Statistics:')
try:
    user_count = users_enhanced.count()
    transaction_count = transactions_enhanced.count()
    
    stats = {
        'Users Processed': f'{user_count:,}',
        'Transactions': f'{transaction_count:,}',
        'Data Sources': '3 (CSV, JSON, generated)',
        'Export Formats': '3 (Parquet, CSV, JSON)',
        'Analytics Views': '5+ (segments, performance, geographic)'
    }
    
    for stat, value in stats.items():
        print(f'  {stat:18}: {value}')
except Exception as e:
    # Best-effort: the summary still prints if the counts cannot be computed
    print(f'  Statistics calculation: {e}')

print('\nüåê Access Points:')
access_points = {
    'Jupyter Lab': 'http://localhost:8888 (token: bigdata123)',
    'Spark Master': 'http://localhost:8080',
    'Spark App UI': 'http://localhost:4040',
    'HDFS NameNode': 'http://localhost:9870',
    'YARN ResourceMgr': 'http://localhost:8088',
    'HiveServer2': 'http://localhost:10002'
}

# Size the name column from the longest service name so the URLs line up
# (the previous fixed width of 15 was shorter than 'YARN ResourceMgr').
service_width = max(len(service) for service in access_points)
for service, url in access_points.items():
    print(f'  {service:{service_width}}: {url}')

print('\nüí° Next Steps:')
next_steps = [
    '1. Explore the web UIs listed above',
    '2. Run custom analytics queries',
    '3. Add more data sources and processing',
    '4. Implement machine learning models',
    '5. Set up automated reporting',
    '6. Scale with additional data volumes'
]

for step in next_steps:
    print(f'  {step}')

print('\nüéâ Full Stack Big Data Integration Complete!')
print('\nüåü Environment Features:')
features = [
    '‚Ä¢ Production-ready Big Data stack',
    '‚Ä¢ Reliable PostgreSQL metastore',
    '‚Ä¢ Comprehensive data processing pipeline',
    '‚Ä¢ Advanced analytics and BI capabilities',
    '‚Ä¢ Multi-format data export options',
    '‚Ä¢ Performance monitoring tools',
    '‚Ä¢ Easy Windows automation scripts'
]

for feature in features:
    print(f'  {feature}')

print('\nüöÄ Ready for Big Data Analysis and Development!')
print('\nüìù Use the other notebooks to explore specific components:')
print('  ‚Ä¢ 01-hadoop-basics.ipynb - HDFS operations')
print('  ‚Ä¢ 02-spark-intro.ipynb - Spark fundamentals')
print('  ‚Ä¢ 03-hive-sql.ipynb - Hive SQL operations')