# Distributed Data Processing Pipeline - Example Notebook

This notebook demonstrates how to use the data processing pipeline for exploratory data analysis.

**Author:** Gabriel Demetrios Lafis

## 1. Setup

Import the required libraries and initialize a local Spark session with Delta Lake support.

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import matplotlib.pyplot as plt
import seaborn as sns

# Initialize a local Spark session with Delta Lake enabled.
# The two .config() entries register Delta's SQL extension and catalog, but they
# do not put the Delta JARs on the JVM classpath. With a pip-installed
# delta-spark (required anyway for `delta.tables` later in this notebook),
# configure_spark_with_delta_pip adds the matching Delta package to the builder.
# NOTE: on first run Spark resolves that package from Maven, so network access
# may be needed.
from delta import configure_spark_with_delta_pip

builder = (
    SparkSession.builder
    .appName("DataExploration")
    .master("local[*]")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)
spark = configure_spark_with_delta_pip(builder).getOrCreate()

print(f"Spark version: {spark.version}")

## 2. Generate Sample Data

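Create 1,000 synthetic transactions (random customers, products, amounts, quantities, countries and categories over the last 30 days) to analyze.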

In [None]:
import builtins
import random
from datetime import datetime, timedelta

# Generate N_TRANSACTIONS synthetic transactions spread over the last 30 days.
#
# NOTE: the setup cell's `from pyspark.sql.functions import *` shadows Python
# builtins such as `round`, `sum`, `min` and `max` with PySpark Column functions.
# PySpark's `round` rejects a plain float with a TypeError, so the builtin is
# called explicitly via `builtins.round`.
N_TRANSACTIONS = 1000
SEED = 42  # fixed seed: IDs, amounts and choices are reproducible across runs
COUNTRIES = ["US", "UK", "CA", "AU", "DE"]
CATEGORIES = ["Electronics", "Books", "Clothing", "Food", "Home"]

rng = random.Random(SEED)
# One reference time for all rows (timestamps still shift with the wall clock).
now = datetime.now()

data = []
for i in range(N_TRANSACTIONS):
    data.append((
        f"tx_{i}",
        f"cust_{rng.randint(1, 100)}",
        f"prod_{rng.randint(1, 50)}",
        builtins.round(rng.uniform(10.0, 500.0), 2),
        rng.randint(1, 10),
        # Unix epoch seconds, between 0 and 30 days before `now`
        int((now - timedelta(days=rng.randint(0, 30))).timestamp()),
        rng.choice(COUNTRIES),
        rng.choice(CATEGORIES),
    ))

columns = ["transactionId", "customerId", "productId", "amount", "quantity", "timestamp", "country", "category"]
df = spark.createDataFrame(data, columns)

print(f"Generated {df.count()} transactions")
df.show(5)

## 3. Data Transformation

Derive the line total (`amount × quantity`) and the calendar columns (`date`, `year`, `month`, `day`) from the Unix `timestamp`.

In [None]:
# Derived columns: the line total, plus a calendar date taken from the Unix
# `timestamp` (from_unixtime renders it in the Spark session time zone).
# `date` must exist before the year/month/day columns, which are read from it.
df_transformed = (
    df
    .withColumn("total_amount", col("amount") * col("quantity"))
    .withColumn("date", to_date(from_unixtime(col("timestamp"))))
)

# Calendar parts, added in this order: year, month, day.
date_parts = {"year": year, "month": month, "day": dayofmonth}
for part_name, extract in date_parts.items():
    df_transformed = df_transformed.withColumn(part_name, extract(col("date")))

df_transformed.show(5)

## 4. Data Quality Checks

Count null values per column and check that every `transactionId` is unique.

In [None]:
# Null check: a single row containing, for every column, its count of nulls.
# (In Spark a floating-point NaN is not null, so NaNs are not counted here.)
print("Null counts:")
df_transformed.select(
    [count(when(col(c).isNull(), c)).alias(c) for c in df_transformed.columns]
).show()

# Duplicate check: only `transactionId` is compared, so this counts distinct
# transaction IDs (not fully distinct rows). Any gap between the row count and
# the distinct-ID count means some transaction ID appears more than once.
total_records = df_transformed.count()
distinct_records = df_transformed.select("transactionId").distinct().count()
print(f"\nTotal records: {total_records}")
print(f"Distinct transaction IDs: {distinct_records}")
print(f"Duplicates: {total_records - distinct_records}")

## 5. Aggregations

Summarize transactions, revenue and distinct customers for each country/category pair.

In [None]:
# Per (country, category): transaction count, total and mean revenue, and the
# number of distinct customers, with the highest total revenue first.
# `sum` here is pyspark.sql.functions.sum (from the setup cell's star import),
# not the Python builtin.
aggregations = [
    count("*").alias("transaction_count"),
    sum("total_amount").alias("total_revenue"),
    avg("total_amount").alias("avg_revenue"),
    countDistinct("customerId").alias("unique_customers"),
]

df_agg = (
    df_transformed
    .groupBy("country", "category")
    .agg(*aggregations)
    .orderBy(desc("total_revenue"))
)

df_agg.show()

## 6. Visualization

Plot total revenue by country and by category from the aggregated table.

In [None]:
# Collect the aggregated (country x category) table to the driver for plotting.
df_pandas = df_agg.toPandas()


def _plot_revenue_by(ax, key, color, title, xlabel):
    """Bar-plot total_revenue summed over `key` onto `ax`, largest first; return the totals."""
    totals = (
        df_pandas
        .groupby(key)['total_revenue']
        .sum()
        .sort_values(ascending=False)
    )
    totals.plot(kind='bar', color=color, ax=ax)
    ax.set_title(title)
    ax.set_xlabel(xlabel)
    ax.set_ylabel('Revenue')
    ax.tick_params(axis='x', labelrotation=45)
    return totals


# Two side-by-side panels: revenue by country (left) and by category (right).
fig, (ax_country, ax_category) = plt.subplots(1, 2, figsize=(12, 5))
revenue_by_country = _plot_revenue_by(ax_country, 'country', 'steelblue', 'Total Revenue by Country', 'Country')
revenue_by_category = _plot_revenue_by(ax_category, 'category', 'coral', 'Total Revenue by Category', 'Category')

fig.tight_layout()
plt.show()

## 7. Write Results to Delta Lake

Save the transformed data in Delta Lake format, partitioned by country and category, then read it back to verify the write.

In [None]:
# Persist the transformed rows as a Delta table, partitioned on disk by country
# and category. "overwrite" replaces any output from a previous run.
output_path = "/tmp/delta-output/transactions"

delta_writer = (
    df_transformed.write
    .format("delta")
    .mode("overwrite")
    .partitionBy("country", "category")
)
delta_writer.save(output_path)
print(f"Data written to: {output_path}")

# Read the table back from disk to confirm the write.
df_read = spark.read.format("delta").load(output_path)
print(f"Records written: {df_read.count()}")

## 8. Delta Lake Features

Inspect the table's commit history and metadata using the `DeltaTable` API.

In [None]:
from delta.tables import DeltaTable

# Handle on the Delta table stored at output_path.
deltaTable = DeltaTable.forPath(spark, output_path)

# Commit history, restricted to the columns of interest.
history_columns = ["version", "timestamp", "operation", "operationMetrics"]
print("Delta table history:")
deltaTable.history().select(*history_columns).show(truncate=False)

# Table-level metadata as reported by DeltaTable.detail().
print("\nTable details:")
deltaTable.detail().show(truncate=False)

## 9. Cleanup

Stop the Spark session.

In [None]:
# Shut down the Spark session created in the setup cell; `spark` is unusable afterwards.
spark.stop()
print("Spark session stopped")

## Conclusion

This notebook demonstrated:
- Setting up a Spark session with Delta Lake
- Generating and transforming data
- Performing data quality checks
- Creating aggregations
- Visualizing results
- Writing to Delta Lake
- Using Delta Lake features

For more examples, see the [README](../README.md) and [code examples](../src/).