In [None]:
# Import required libraries
import os
import random
import sys
from datetime import datetime, timedelta

# Spark imports
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType
from pyspark.sql.functions import (
    col,
    when,
    avg,
    count,
    max as spark_max,
    min as spark_min,
    sum as spark_sum,
)

# Silence library warnings so the cell outputs below stay readable
import warnings
warnings.filterwarnings(action='ignore')

# Confirm the imports succeeded and record when this run started
print(
    "✓ Libraries imported successfully",
    f"Notebook started at: {datetime.now()}",
    sep="\n",
)


In [None]:
# Configuration
HDFS_NAMENODE = "hdfs://localhost:9000"  # Update this to your HDFS namenode
BASE_PATH = "/data/acceldata-examples"

# Build the Spark session (getOrCreate reuses an already-running session).
# It runs in local mode on all cores and uses HDFS_NAMENODE as the default
# filesystem, so the relative-looking paths used later resolve against HDFS.
spark = (
    SparkSession.builder
    .appName("HDFS Operations Notebook - Acceldata")
    .master("local[*]")
    .config("spark.hadoop.fs.defaultFS", HDFS_NAMENODE)
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .getOrCreate()
)

# Only warnings and errors from Spark are logged from here on
spark.sparkContext.setLogLevel("WARN")

print(
    "✓ Spark session created successfully",
    f"Spark Version: {spark.version}",
    f"HDFS NameNode: {HDFS_NAMENODE}",
    f"Base Path: {BASE_PATH}",
    sep="\n",
)


In [None]:
def create_employee_dataframe():
    """Build the fixed eight-row sample employee DataFrame.

    Returns:
        A DataFrame created on the notebook-level ``spark`` session with the
        non-nullable columns ``id``, ``name``, ``email``, ``department``,
        ``salary`` and ``hire_date``.
    """
    # (column name, Spark type) pairs; every column is declared non-nullable.
    column_specs = (
        ("id", IntegerType()),
        ("name", StringType()),
        ("email", StringType()),
        ("department", StringType()),
        ("salary", DoubleType()),
        ("hire_date", TimestampType()),
    )
    employee_schema = StructType(
        [StructField(field_name, field_type, False) for field_name, field_type in column_specs]
    )

    # One tuple per employee, in the same order as the schema columns.
    employee_rows = [
        (1, "John Doe", "john.doe@acceldata.io", "Engineering", 75000.0, datetime(2023, 1, 15)),
        (2, "Jane Smith", "jane.smith@acceldata.io", "Sales", 82000.0, datetime(2023, 2, 20)),
        (3, "Bob Johnson", "bob.johnson@acceldata.io", "Engineering", 68000.0, datetime(2023, 3, 10)),
        (4, "Alice Brown", "alice.brown@acceldata.io", "Marketing", 91000.0, datetime(2023, 4, 5)),
        (5, "Charlie Wilson", "charlie.wilson@acceldata.io", "HR", 77500.0, datetime(2023, 5, 12)),
        (6, "Diana Prince", "diana.prince@acceldata.io", "Engineering", 85000.0, datetime(2023, 6, 18)),
        (7, "Frank Miller", "frank.miller@acceldata.io", "Sales", 72000.0, datetime(2023, 7, 22)),
        (8, "Grace Lee", "grace.lee@acceldata.io", "Marketing", 88000.0, datetime(2023, 8, 30)),
    ]

    return spark.createDataFrame(employee_rows, employee_schema)

def create_sales_dataframe(num_records=50, seed=None):
    """Create a sample sales DataFrame of randomly generated transactions.

    Args:
        num_records: Number of transactions to generate. Defaults to 50.
        seed: Optional seed. When given, values are drawn from a private
            ``random.Random(seed)`` so the random fields are repeatable
            without touching the global ``random`` state. When ``None``
            (default) the global ``random`` module is used.

    Returns:
        A DataFrame created on the notebook-level ``spark`` session with the
        non-nullable columns ``transaction_id``, ``employee_id``, ``product``,
        ``amount``, ``transaction_date`` and ``region``.

    Raises:
        ValueError: If ``num_records`` is negative.
    """
    if num_records < 0:
        raise ValueError(f"num_records must be non-negative, got {num_records}")

    schema = StructType([
        StructField("transaction_id", IntegerType(), False),
        StructField("employee_id", IntegerType(), False),
        StructField("product", StringType(), False),
        StructField("amount", DoubleType(), False),
        StructField("transaction_date", TimestampType(), False),
        StructField("region", StringType(), False)
    ])

    products = ["Data Platform", "Analytics Suite", "ML Tools", "Monitoring"]
    regions = ["North", "South", "East", "West"]

    # A private generator keeps seeded runs repeatable; without a seed fall
    # back to the module-level functions.
    rng = random.Random(seed) if seed is not None else random

    # Read the clock once so every transaction date is offset from the same
    # instant. The dates still depend on when the function is called.
    generated_at = datetime.now()

    data = [
        (
            transaction_id,
            rng.randint(1, 8),  # employee_id: ids 1-8 of the sample employees
            rng.choice(products),
            round(rng.uniform(1000, 50000), 2),
            generated_at - timedelta(days=rng.randint(1, 365)),
            rng.choice(regions),
        )
        for transaction_id in range(1, num_records + 1)
    ]

    return spark.createDataFrame(data, schema)

# Materialise the two sample datasets used by every later cell
employee_df, sales_df = create_employee_dataframe(), create_sales_dataframe()

print("✓ Sample DataFrames created")
print(
    f"Employee records: {employee_df.count()}",
    f"Sales records: {sales_df.count()}",
    sep="\n",
)

# Preview the first five rows of each dataset
print("\nEmployee Data Sample:")
employee_df.show(n=5)

print("\nSales Data Sample:")
sales_df.show(n=5)


In [None]:
# Directory under BASE_PATH that receives every dataset written by this notebook
output_base = f"{BASE_PATH}/output"

print("=== Writing Data to HDFS ===")

# 1. Parquet (recommended for analytics)
print("1. Writing employee data as Parquet...")
employee_df.write.parquet(f"{output_base}/employees_parquet", mode="overwrite")
print("   ✓ Employee data written as Parquet")

# 2. CSV with a header row; coalesce(1) reduces the data to one partition first
print("2. Writing employee data as CSV...")
employee_df.coalesce(1).write.csv(
    f"{output_base}/employees_csv",
    mode="overwrite",
    header=True,
)
print("   ✓ Employee data written as CSV")

# 3. JSON, again from a single partition
print("3. Writing employee data as JSON...")
employee_df.coalesce(1).write.json(f"{output_base}/employees_json", mode="overwrite")
print("   ✓ Employee data written as JSON")

# 4. Parquet partitioned by the department column
print("4. Writing employee data with partitioning...")
employee_df.write.parquet(
    f"{output_base}/employees_partitioned",
    mode="overwrite",
    partitionBy="department",
)
print("   ✓ Employee data written with partitioning")

# 5. Sales data as gzip-compressed Parquet
print("5. Writing sales data with compression...")
sales_df.write.parquet(
    f"{output_base}/sales_compressed",
    mode="overwrite",
    compression="gzip",
)
print("   ✓ Sales data written with compression")

print("\n✓ All data successfully written to HDFS")


In [None]:
print("=== Reading Data from HDFS ===")

# 1. Parquet: the schema comes from the files themselves
print("1. Reading employee data from Parquet...")
employees_parquet = spark.read.format("parquet").load(f"{output_base}/employees_parquet")
print(f"   Records read: {employees_parquet.count()}")
print("   Schema:")
employees_parquet.printSchema()

# 2. CSV: first line is the header, column types are inferred from the data
print("\n2. Reading employee data from CSV...")
employees_csv = spark.read.csv(
    f"{output_base}/employees_csv",
    header=True,
    inferSchema=True,
)
print(f"   Records read: {employees_csv.count()}")
employees_csv.show(n=3)

# 3. JSON
print("3. Reading employee data from JSON...")
employees_json = spark.read.format("json").load(f"{output_base}/employees_json")
print(f"   Records read: {employees_json.count()}")

# 4. Partitioned Parquet, followed by a per-department row count
print("4. Reading partitioned employee data...")
employees_partitioned = spark.read.format("parquet").load(f"{output_base}/employees_partitioned")
print(f"   Records read: {employees_partitioned.count()}")
print("   Partitions by department:")
department_counts = employees_partitioned.groupBy("department").count()
department_counts.show()

# 5. Gzip-compressed sales Parquet
print("5. Reading compressed sales data...")
sales_compressed = spark.read.format("parquet").load(f"{output_base}/sales_compressed")
print(f"   Records read: {sales_compressed.count()}")

print("\n✓ All data successfully read from HDFS")


In [None]:
print("=== Data Analysis ===")

# 1. Employee salary analysis: summary statistics for the salary column
print("1. Employee Salary Analysis:")
salary_stats = employees_parquet.select("salary").describe()
salary_stats.show()

# 2. Department-wise analysis: headcount and salary spread per department,
#    highest average salary first
print("2. Department-wise Analysis:")
dept_analysis = employees_parquet.groupBy("department") \
    .agg(
        count("*").alias("employee_count"),
        avg("salary").alias("avg_salary"),
        spark_max("salary").alias("max_salary"),
        spark_min("salary").alias("min_salary")
    ) \
    .orderBy("avg_salary", ascending=False)

dept_analysis.show()

# 3. Sales analysis: transaction count, average and total per region/product.
#    spark_sum is the Spark SQL aggregate; Python's built-in sum() cannot
#    aggregate a column and raises TypeError when given a column name.
print("3. Sales Analysis:")
sales_analysis = sales_compressed.groupBy("region", "product") \
    .agg(
        count("*").alias("transaction_count"),
        avg("amount").alias("avg_amount"),
        spark_sum("amount").alias("total_amount")
    ) \
    .orderBy("total_amount", ascending=False)

sales_analysis.show()

# 4. Create derived columns
print("4. Creating derived columns:")
employees_enhanced = employees_parquet.withColumn(
    "salary_category",
    # Bands: below 70000 -> Low, below 85000 -> Medium, everything else -> High
    when(col("salary") < 70000, "Low")
    .when(col("salary") < 85000, "Medium")
    .otherwise("High")
).withColumn(
    "years_of_service",
    # Elapsed time from hire_date to now (driver clock), in years of 365.25
    # days. Both operands are epoch seconds, so the result does not depend on
    # the local time zone.
    (datetime.now().timestamp() - col("hire_date").cast("long")) / (365.25 * 24 * 3600)
)

employees_enhanced.select("name", "department", "salary", "salary_category").show()

print("\n✓ Data analysis completed")


In [None]:
print("=== Advanced HDFS Operations ===")

# 1. Join operations across HDFS datasets: inner join of employees to their
#    sales on employee id, keeping only the columns needed for reporting
print("1. Performing JOIN operations:")
joined_data = employees_parquet.alias("emp") \
    .join(sales_compressed.alias("sales"), col("emp.id") == col("sales.employee_id")) \
    .select(
        col("emp.name"),
        col("emp.department"),
        col("sales.product"),
        col("sales.amount"),
        col("sales.region")
    )

print(f"   Joined records: {joined_data.count()}")
joined_data.show(10)

# 2. Write joined results back to HDFS, partitioned by department and region
print("2. Writing joined results to HDFS...")
joined_data.write.mode("overwrite") \
    .partitionBy("department", "region") \
    .parquet(f"{output_base}/employee_sales_joined")
print("   ✓ Joined data written to HDFS")

# 3. Aggregated reporting: per-department transaction count, total and average.
#    spark_sum is the Spark SQL aggregate; Python's built-in sum() cannot
#    aggregate a column and raises TypeError when given a column name.
print("3. Creating aggregated reports:")
department_sales_report = joined_data.groupBy("department") \
    .agg(
        count("*").alias("total_transactions"),
        spark_sum("amount").alias("total_sales"),
        avg("amount").alias("avg_transaction_amount")
    ) \
    .orderBy("total_sales", ascending=False)

department_sales_report.show()

# Write report to HDFS as CSV with a header, from a single partition
department_sales_report.coalesce(1).write.mode("overwrite") \
    .option("header", "true") \
    .csv(f"{output_base}/department_sales_report")
print("   ✓ Department sales report written to HDFS")

# 4. Data quality checks
print("4. Data Quality Checks:")
print("   Checking for null values in employee data:")
# isNull() cast to int is 1 for a null and 0 otherwise, so summing it per
# column gives that column's null count
null_counts = employees_parquet.select([
    spark_sum(col(c).isNull().cast("int")).alias(c) for c in employees_parquet.columns
])
null_counts.show()

print("   Checking salary ranges:")
salary_ranges = employees_parquet.select(
    spark_min("salary").alias("min_salary"),
    spark_max("salary").alias("max_salary"),
    avg("salary").alias("avg_salary")
)
salary_ranges.show()

print("\n✓ Advanced operations completed")


In [None]:
print("=== Performance Optimization Examples ===")

# 1. Cache the employee DataFrame, which the steps below reuse several times
print("1. Caching frequently accessed DataFrames:")
employees_parquet.cache()
print("   ✓ Employee DataFrame cached in memory")

# Two separate actions against the same cached DataFrame
print("   Performing multiple operations on cached data...")
count1 = employees_parquet.count()
count2 = employees_parquet.where(col("salary") > 80000).count()
print(
    f"   Total employees: {count1}",
    f"   High salary employees: {count2}",
    sep="\n",
)

# 2. Inspect the current partition counts
print("\n2. Optimal partitioning strategies:")
print(f"   Current partitions in employee data: {employees_parquet.rdd.getNumPartitions()}")
print(f"   Current partitions in sales data: {sales_compressed.rdd.getNumPartitions()}")

# Redistribute the sales rows into 4 partitions keyed on region
sales_repartitioned = sales_compressed.repartition(4, col("region"))
print(f"   Sales data repartitioned by region: {sales_repartitioned.rdd.getNumPartitions()} partitions")

# 3. Coalesce before writing: fewer, larger output files instead of many small ones
print("\n3. Coalescing for output optimization:")
employees_parquet.coalesce(2).write.parquet(
    f"{output_base}/employees_coalesced",
    mode="overwrite",
)
print("   ✓ Employee data written with coalescing (fewer output files)")

# 4. Predicate pushdown: filter and project directly on the Parquet read so
#    less data has to move
print("\n4. Predicate pushdown optimization:")
high_salary_employees = (
    spark.read.format("parquet")
    .load(f"{output_base}/employees_parquet")
    .where(col("salary") > 80000)
    .select("name", "department", "salary")
)

print(f"   High salary employees: {high_salary_employees.count()}")
high_salary_employees.show()

# 5. Write the same data with two codecs so the file sizes can be compared
print("\n5. Compression comparison:")
employees_parquet.write.parquet(
    f"{output_base}/employees_snappy",
    mode="overwrite",
    compression="snappy",
)

employees_parquet.write.parquet(
    f"{output_base}/employees_gzip",
    mode="overwrite",
    compression="gzip",
)

print(
    "   ✓ Data written with different compression formats",
    "   Note: Check HDFS to compare file sizes",
    sep="\n",
)

print("\n✓ Performance optimization examples completed")


In [None]:
print("=== Summary of HDFS Operations ===")

# Recap of what the notebook demonstrated, printed as one block
print("\n".join([
    "✓ Successfully demonstrated:",
    "  1. Spark session configuration for HDFS",
    "  2. Creating sample datasets",
    "  3. Writing data to HDFS in multiple formats:",
    "     - Parquet (recommended for analytics)",
    "     - CSV (for interoperability)",
    "     - JSON (for semi-structured data)",
    "     - Partitioned data (for performance)",
    "     - Compressed data (for storage efficiency)",
    "  4. Reading data from HDFS",
    "  5. Data analysis and transformations",
    "  6. Advanced operations (joins, aggregations)",
    "  7. Performance optimization techniques",
]))

# Output directories produced by the earlier cells
print(f"\nData written to HDFS base path: {BASE_PATH}")
print("\n".join([
    "Files created:",
    "  - employees_parquet/",
    "  - employees_csv/",
    "  - employees_json/",
    "  - employees_partitioned/",
    "  - sales_compressed/",
    "  - employee_sales_joined/",
    "  - department_sales_report/",
    "  - employees_coalesced/",
    "  - employees_snappy/",
    "  - employees_gzip/",
]))

print(f"\nNotebook completed at: {datetime.now()}")
print("Company: Acceldata Inc. (acceldata.io)")

# Cleanup: release the DataFrame that was cached earlier in the notebook
employees_parquet.unpersist()
print("\n✓ Resources cleaned up")

# Note: the Spark session is left running for interactive use.
# Uncomment the next line to stop Spark session
# spark.stop()
