## Summary: All Lab Pillars Covered

✅ **Pillar 1: Installation & Setup** - Docker cluster with Hadoop and Spark  
✅ **Pillar 2: SparkPi Example** - Calculate PI with different partitions  
✅ **Pillar 3: Scala WordCount** - spark-shell RDD operations  
✅ **Pillar 4: Python Applications** - RDD and DataFrame based implementations  
✅ **Pillar 5: PySpark on Colab** - Complete installation and setup steps  
✅ **Pillar 6: Data Manipulation** - CSV loading, filtering, groupBy, sorting  
✅ **Pillar 7: Spark SQL** - SQL queries with aggregations and joins  
✅ **Pillar 8: MongoDB Integration** - Atlas connection, CRUD, aggregations  
✅ **Pillar 9: Visualization** - Barplots, histograms, status comparisons  

### Key Takeaways

1. **Spark is a unified framework** for batch and stream processing
2. **RDDs** are low-level, **DataFrames** are optimized via Catalyst
3. **Spark SQL** provides SQL interface for complex queries
4. **MongoDB integration** enables NoSQL data analysis
5. **Visualization** makes insights actionable
6. **Colab** allows free Spark experimentation

### Files Generated

- `wordcount_analysis.py` - Exercise 1 (DataFrame + SQL)
- `dataframe_agg.py` - Exercise 2 (aggregations)
- `dataframe_join.py` - Exercise 3 (joins)
- `mongodb_integration.py` - Exercise 4 (MongoDB)
- This complete notebook covering all pillars

In [None]:
# 8.4 Advanced visualization - Amount by Type and Status
df_detailed = df.groupBy("Transaction Type", "Transaction Status").agg(spark_sum("Amount")).toPandas()
df_detailed.columns = ["Transaction Type", "Transaction Status", "Total Amount"]

plt.figure(figsize=(12, 6))
sns.barplot(data=df_detailed, x="Transaction Type", y="Total Amount", hue="Transaction Status", palette="Set2")
plt.title("Amount by Transaction Type and Status", fontsize=14, fontweight='bold')
plt.xlabel("Transaction Type", fontsize=12)
plt.ylabel("Total Amount (€)", fontsize=12)
plt.legend(title="Status", loc="upper right")
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()

print("✓ Advanced visualization created")

In [None]:
# 8.3 Successful vs Failed Transactions (Countplot)
df_status = df.groupBy("Transaction Status").count().toPandas()
df_status.columns = ["Transaction Status", "Count"]

plt.figure(figsize=(8, 5))
sns.barplot(data=df_status, x="Transaction Status", y="Count", palette="pastel")
plt.title("Successful vs Failed Transactions", fontsize=14, fontweight='bold')
plt.xlabel("Transaction Status", fontsize=12)
plt.ylabel("Number of Transactions", fontsize=12)
plt.tight_layout()
plt.show()

print("✓ Status comparison plot created")

In [None]:
# 8.2 Distribution of transaction amounts (Histogram)
df_pandas = df.select("Amount").toPandas()

plt.figure(figsize=(10, 5))
sns.histplot(df_pandas["Amount"], bins=30, kde=True, color="skyblue", edgecolor="black")
plt.title("Distribution of Transaction Amounts", fontsize=14, fontweight='bold')
plt.xlabel("Amount (€)", fontsize=12)
plt.ylabel("Frequency", fontsize=12)
plt.tight_layout()
plt.show()

print("✓ Histogram created")

In [None]:
# 8.1 Convert Spark DF to Pandas and visualize
import matplotlib.pyplot as plt
import seaborn as sns

# Aggregate data by transaction type
df_grouped = df.groupBy("Transaction Type").agg(spark_sum("Amount")).toPandas()
df_grouped.columns = ["Transaction Type", "Total Amount"]
df_grouped = df_grouped.sort_values(by="Total Amount", ascending=False)

# Create barplot
plt.figure(figsize=(10, 5))
sns.barplot(data=df_grouped, x="Transaction Type", y="Total Amount", palette="coolwarm")
plt.title("Total Amount by Transaction Type", fontsize=14, fontweight='bold')
plt.xlabel("Transaction Type", fontsize=12)
plt.ylabel("Total Amount (€)", fontsize=12)
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()

print("✓ Barplot created")

## Section 8: Data Visualization with Matplotlib & Seaborn

### 8.1 Visualizing Total Amount by Transaction Type

In [None]:
# 7.3 Aggregation - Accounts with multiple transactions
print("\n=== Accounts with More Than 2 Transactions ===")
pipeline2 = [
    {
        "$group": {
            "_id": "$Sender Account ID",
            "count": {"$sum": 1},
            "total_amount": {"$sum": "$Amount"}
        }
    },
    {"$match": {"count": {"$gt": 2}}},
    {"$sort": {"count": -1}}
]

result2 = transactions_col.aggregate(pipeline2)
for doc in result2:
    print(f"Account: {doc['_id']}, Transactions: {doc['count']}, Total: ${doc['total_amount']:.2f}")

In [None]:
# 7.2 Aggregation - Average amount by transaction type
print("\n=== MongoDB Aggregation: Average Amount by Type ===")
pipeline = [
    {
        "$group": {
            "_id": "$Transaction Type",
            "avg_amount": {"$avg": "$Amount"},
            "count": {"$sum": 1},
            "total": {"$sum": "$Amount"}
        }
    },
    {"$sort": {"total": -1}}
]

result = transactions_col.aggregate(pipeline)
for doc in result:
    print(f"Type: {doc['_id']}, Avg: ${doc['avg_amount']:.2f}, Count: {doc['count']}, Total: ${doc['total']:.2f}")

In [None]:
# 7.1 Insert transaction data into MongoDB Atlas
from pymongo import MongoClient

# MongoDB Atlas connection
MONGO_URI = "mongodb+srv://abdelilahhalim05_db_user:C2X4AXnO7MJWm52Z@cluster0.htrgtlb.mongodb.net/"
client = MongoClient(MONGO_URI)

# Access database and collection
db = client['spark_lab']
transactions_col = db['transactions']

# Clear existing data
transactions_col.delete_many({})

# Convert DataFrame to dictionary records
records = df_transactions.to_dict('records')

# Insert into MongoDB
if records:
    result = transactions_col.insert_many(records)
    print(f"✓ Inserted {len(result)} transaction records into MongoDB")
else:
    print("No records to insert")

# Display sample
sample = list(transactions_col.find().limit(3))
print(f"\nSample from MongoDB:")
for record in sample:
    print(f"  Transaction ID: {record.get('Transaction ID')}, Amount: {record.get('Amount')}")

## Section 7: Case Study - Spark Integration with MongoDB Atlas

### MongoDB Setup with Your Credentials

Use your MongoDB credentials to connect and analyze data.

In [None]:
# 6.3 SQL Query - Successful transactions > 500
print("\n=== SQL: Successful Transactions > 500 ===")
result2 = spark.sql("""
    SELECT 
        `Transaction ID`,
        `Sender Account ID`,
        `Amount`,
        `Transaction Type`,
        `Date`
    FROM transactions
    WHERE `Transaction Status` = 'Success' AND `Amount` > 500
    ORDER BY `Amount` DESC
    LIMIT 10
""")
result2.show()

In [None]:
# 6.2 SQL Query - Total amount by transaction type
print("\n=== SQL: Total Amount by Transaction Type ===")
result = spark.sql("""
    SELECT 
        `Transaction Type`, 
        SUM(`Amount`) as Total,
        COUNT(*) as Count,
        ROUND(AVG(`Amount`), 2) as Average
    FROM transactions
    GROUP BY `Transaction Type`
    ORDER BY Total DESC
""")
result.show()

In [None]:
# 6.1 Create temporary table/view
df.createOrReplaceTempView("transactions")
print("✓ Temporary view 'transactions' created")

## Section 6: Spark SQL Operations

### 6.1 Creating Temporary Views and SQL Queries

In [None]:
# 5.7 Sorting operations - Order by amount descending
print("\n=== Top 10 Transactions by Amount (Descending) ===")
from pyspark.sql.functions import col, desc
df.orderBy(desc("Amount")).show(10)

In [None]:
# 5.6 GroupBy aggregation - Total amount by transaction type
print("\n=== Total Amount by Transaction Type (GroupBy + Sum) ===")
from pyspark.sql.functions import sum as spark_sum
df.groupBy("Transaction Type").agg(spark_sum("Amount")).show()

In [None]:
# 5.5 Column selection
print("\n=== Selected Columns (ID, Type, Amount) ===")
df.select("Transaction ID", "Transaction Type", "Amount").show(5)

In [None]:
# 5.4 Filtering operations - Transactions > 1000
print("\n=== Transactions with Amount > 1000 ===")
df.filter(df["Amount"] > 1000).show()

In [None]:
# 5.3 Display schema information
print("=== DataFrame Schema ===")
df.printSchema()

In [None]:
# 5.2 Load CSV data into Spark DataFrame
df = spark.createDataFrame(df_transactions)
print("✓ Spark DataFrame created from transaction data")
df.show(5)

In [None]:
# Create sample transaction data
import pandas as pd
from datetime import datetime, timedelta
import random

# Generate sample transaction data
data = []
transaction_types = ["Purchase", "Transfer", "Withdrawal", "Deposit"]
statuses = ["Success", "Failed", "Pending"]

for i in range(100):
    data.append({
        "Transaction ID": i + 1,
        "Sender Account ID": f"ACC{random.randint(1000, 9999)}",
        "Amount": round(random.uniform(10, 5000), 2),
        "Transaction Type": random.choice(transaction_types),
        "Transaction Status": random.choices(statuses, weights=[0.8, 0.1, 0.1])[0],
        "Date": (datetime.now() - timedelta(days=random.randint(0, 90))).strftime("%Y-%m-%d")
    })

# Create DataFrame
df_transactions = pd.DataFrame(data)
print("✓ Sample transaction data created")
print(f"Shape: {df_transactions.shape}")
df_transactions.head()

## Section 5: Loading and Manipulating Data with Spark

### 5.1 Creating Sample Transaction Data

In [None]:
# 4.5 Create Spark Session
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("ColabSpark") \
    .config("spark.driver.memory", "2g") \
    .getOrCreate()

print("✓ Spark session created successfully!")
print(f"Spark version: {spark.version}")

In [None]:
# 4.4 Configure environment and initialize Spark
import os
import sys
import findspark

# Configure environment (commented out - uncomment if needed)
# os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
# os.environ["SPARK_HOME"] = "/content/spark-3.2.1-bin-hadoop3.2"

# Initialize findspark
findspark.init()
findspark.find()
print("✓ findspark initialized")

In [None]:
# 4.3 Install Python packages
!pip install -q findspark
!pip install -q pyspark
!pip install -q py4j
!pip install -q pymongo matplotlib seaborn
print("✓ All Python packages installed")

In [None]:
# 4.2 Download and install Apache Spark
!wget -q https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
!tar xf spark-3.2.1-bin-hadoop3.2.tgz
print("✓ Apache Spark downloaded and extracted")

In [None]:
# 4.1 Install Java JDK and dependencies
!sudo apt update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
print("✓ Java JDK installed")

## Section 4: Installing PySpark on Google Colab

Execute these commands in Colab cells (in order):

### 3.2 DataFrame-Based WordCount in Python

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, split, count

spark = SparkSession.builder.master("yarn").appName('wordcount_df').getOrCreate()

# Using DataFrame API (higher-level, optimized)
lines = spark.read.text("hdfs://hadoop-master:9000/user/root/input/alice.txt")
words = lines.select(explode(split(col("value"), " ")).alias("word"))
word_counts = words.filter(col("word") != "").groupBy("word").count().orderBy(col("count").desc())

word_counts.show(20)
word_counts.write.csv("hdfs://hadoop-master:9000/user/root/output/df_wordcount", mode="overwrite")
spark.stop()
```

**Difference:** RDD vs DataFrame
- **RDD:** Low-level, functional programming, manual optimization
- **DataFrame:** High-level, SQL-like, Catalyst optimizer

## Section 3: Submitting Python Applications

### 3.1 RDD-Based WordCount in Python

Create `wordcount_rdd.py`:

```python
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("yarn").appName('wordcount').getOrCreate()

# Using RDD API (lower-level)
data = spark.sparkContext.textFile("hdfs://hadoop-master:9000/user/root/input/alice.txt")
words = data.flatMap(lambda line: line.split(" "))
wordCounts = words.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
wordCounts.saveAsTextFile("hdfs://hadoop-master:9000/user/root/output/rdd_wordcount")

print("✓ WordCount RDD processing complete")
spark.stop()
```

Submit with:
```bash
spark-submit --master yarn wordcount_rdd.py
```

### 2.2 WordCount with Scala (spark-shell Equivalent)

Execute in spark-shell (inside Docker container):

```scala
// Open spark-shell
spark-shell

// Write and execute Scala code
val data = sc.textFile("hdfs://hadoop-master:9000/user/root/input/alice.txt")
val count = data.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_+_)
count.saveAsTextFile("hdfs://hadoop-master:9000/user/root/output/respark1")

// Exit spark-shell
:quit
```

**Note:** This demonstrates RDD operations in Scala, showing how to:
- Load text files from HDFS
- Use flatMap to split lines
- Use map/reduce to count occurrences
- Save results back to HDFS

## Section 2: First Examples with Apache Spark

### 2.1 SparkPi Example - Calculate PI Using spark-submit

Execute the following command in your Docker container to calculate PI:

```bash
# Basic SparkPi with 100 partitions
spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master local[*] \
  $SPARK_HOME/examples/jars/spark-examples_2.12-3.2.1.jar \
  100

# Test with different partition values
spark-submit --class org.apache.spark.examples.SparkPi --master local[*] \
  $SPARK_HOME/examples/jars/spark-examples_2.12-3.2.1.jar 50

spark-submit --class org.apache.spark.examples.SparkPi --master local[*] \
  $SPARK_HOME/examples/jars/spark-examples_2.12-3.2.1.jar 500
```

**Expected Output:** PI is approximately 3.14159...

## Section 1: Installation and Setup of Spark Cluster with Docker

### Docker Cluster Initialization Commands

```bash
# Start the Docker cluster
docker-compose up -d

# Access the Hadoop master container
docker exec -it hadoop-master bash

# Start Hadoop (inside the container)
./start-hadoop.sh

# Start Spark (inside the container)
./start-spark.sh

# Verify Spark and YARN are running using jps
jps

# Access web UIs
# - Yarn Web UI: https://localhost:8088
# - Spark Web UI: https://localhost:8080
```

After startup verification, exit the container with `exit` and continue with this notebook.

# Complete Apache Spark Lab - All Pillars Covered
## BIG DATA - ANNÉE UNIVERSITAIRE 2025-2026
## TP: Cluster Spark avec Docker

This comprehensive notebook covers all required pillars:
- ✅ Spark Cluster installation with Docker
- ✅ SparkPi example calculation
- ✅ WordCount with Scala (spark-shell equivalent)
- ✅ Python application submission
- ✅ PySpark installation on Colab
- ✅ CSV data loading and manipulation
- ✅ Spark SQL operations
- ✅ MongoDB Atlas integration
- ✅ Data visualization with matplotlib/seaborn