## 🔁 2. ETL with PySpark from MongoDB
This notebook loads data from MongoDB Atlas, performs cleaning and transformations using **PySpark**, and saves the output as partitioned **Parquet** files for downstream data warehouse modeling.

### ⚙️ Setup PySpark + Mongo Connector

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!pip install pyspark==3.1.2 pymongo dnspython --quiet

# Download the MongoDB Spark Connector JAR (Scala 2.12 build)
import os
import urllib.request

spark_jars_dir = "/content/jars"
os.makedirs(spark_jars_dir, exist_ok=True)
mongo_jar_url = "https://repo1.maven.org/maven2/org/mongodb/spark/mongo-spark-connector_2.12/3.0.1/mongo-spark-connector_2.12-3.0.1.jar"
mongo_jar_path = os.path.join(spark_jars_dir, "mongo-spark-connector_2.12-3.0.1.jar")

# Skip the download when the JAR is already present from an earlier run
if not os.path.exists(mongo_jar_path):
    urllib.request.urlretrieve(mongo_jar_url, mongo_jar_path)

print("✅ Spark + Mongo connector ready.")

### 🚀 Step 1: Initialize Spark Session

In [None]:
from pyspark.sql import SparkSession

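# Resolve the connector from Maven Central so that its MongoDB Java driver
# dependencies are pulled in too; the standalone connector JAR does not bundle them.
spark = SparkSession.builder \
    .appName("ETL from MongoDB") \
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1") \
    .config("spark.mongodb.input.uri", "mongodb+srv://<username>:<password>@<cluster>.mongodb.net/enterprise_logs") \
    .getOrCreate()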

print("✅ Spark session started.")
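
Hard-coded credentials in a notebook are easy to leak, and special characters in a password can break the connection string. The following is a minimal sketch, not part of the original notebook: `build_mongo_uri` is a hypothetical helper, and the user, password, and cluster values are made-up examples. It percent-encodes the credentials before assembling the `mongodb+srv` URI.

```python
from urllib.parse import quote_plus


def build_mongo_uri(user, password, cluster, database):
    """Return a mongodb+srv URI with percent-encoded credentials.

    Hypothetical helper for illustration; the host pattern mirrors the
    `<cluster>.mongodb.net` form used in this notebook.
    """
    return (
        f"mongodb+srv://{quote_plus(user)}:{quote_plus(password)}"
        f"@{cluster}.mongodb.net/{database}"
    )


# Made-up example values; "@" and "/" in the password get percent-encoded.
example_uri = build_mongo_uri("etl_user", "p@ss/word", "cluster0", "enterprise_logs")
print(example_uri)
```

In practice the arguments could come from `os.environ` or a secrets manager, and the result would be passed as the value of `spark.mongodb.input.uri`.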

### 📥 Step 2: Load Collections from MongoDB

In [None]:
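def load_collection(collection_name):
    """Read one collection from the database named in spark.mongodb.input.uri."""
    # The session-level URI already carries the credentials and database,
    # so only the collection name has to be supplied per read.
    return spark.read.format("mongo") \
        .option("collection", collection_name) \
        .load()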

df_sales = load_collection("sales_logs")
df_activity = load_collection("user_activity_logs")
df_inventory = load_collection("inventory_events")

df_sales.printSchema()
df_sales.show(2)
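
The connector infers each schema by sampling documents, so a field that is missing from the sampled documents will not appear as a column. As a hedged sketch, the check below compares a list of column names against the fields the later steps rely on. `missing_columns` and `SALES_REQUIRED` are hypothetical names introduced here; the field names themselves are the ones used in Steps 3 and 4.

```python
def missing_columns(actual_columns, required_columns):
    """Return the required column names absent from actual_columns, in order."""
    present = set(actual_columns)
    return [name for name in required_columns if name not in present]


# Columns that the sales cleaning and partitioning steps refer to.
SALES_REQUIRED = ["timestamp", "price", "quantity", "region"]

# Example with a plain list; in the notebook, pass df_sales.columns instead.
example_missing = missing_columns(["_id", "timestamp", "price", "region"], SALES_REQUIRED)
print(example_missing)  # ['quantity']
```

Running `missing_columns(df_sales.columns, SALES_REQUIRED)` before cleaning surfaces a missing field early instead of failing inside `withColumn` or `partitionBy`.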

### 🧹 Step 3: Data Cleaning & Transformation

In [None]:
from pyspark.sql.functions import col, to_timestamp

# --- Clean Sales Logs ---
df_sales_clean = df_sales \
    .withColumn("timestamp", to_timestamp("timestamp")) \
    .withColumn("price", col("price").cast("double")) \
    .withColumn("quantity", col("quantity").cast("int"))

# --- Clean Activity Logs ---
df_activity_clean = df_activity \
    .withColumn("timestamp", to_timestamp("timestamp")) \
    .withColumn("session_duration", col("session_duration").cast("double"))

# --- Clean Inventory Logs ---
df_inventory_clean = df_inventory \
    .withColumn("timestamp", to_timestamp("timestamp")) \
    .withColumn("quantity_added", col("quantity_added").cast("int")) \
    .withColumn("quantity_removed", col("quantity_removed").cast("int"))
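
With Spark's default (non-ANSI) settings, `cast` and `to_timestamp` return null for values they cannot convert rather than raising an error, so bad records can pass through silently. One way to make this visible, sketched here under the assumption that a per-column null count is enough, is to generate SQL expressions for `selectExpr`. `null_count_exprs` is a hypothetical helper, not part of the original notebook.

```python
def null_count_exprs(columns):
    """Build one SQL expression per column that counts its null values."""
    return [
        f"sum(CASE WHEN `{name}` IS NULL THEN 1 ELSE 0 END) AS `{name}_nulls`"
        for name in columns
    ]


exprs = null_count_exprs(["timestamp", "price", "quantity"])
for expr in exprs:
    print(expr)

# In the notebook (requires the Spark session from Step 1):
# df_sales_clean.selectExpr(*exprs).show()
```

Comparing these counts with the same counts on the raw DataFrame shows how many values were lost to failed conversions.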

### 💾 Step 4: Save as Partitioned Parquet Files

In [None]:
output_base = "/content/big_data_etl_project/4_data_warehouse/warehouse"

# --- Sales Logs → partition by region ---
df_sales_clean.write.partitionBy("region") \
    .mode("overwrite") \
    .parquet(os.path.join(output_base, "sales_logs"))

# --- Activity Logs → partition by device ---
df_activity_clean.write.partitionBy("device") \
    .mode("overwrite") \
    .parquet(os.path.join(output_base, "user_activity_logs"))

# --- Inventory Logs → partition by warehouse_id ---
df_inventory_clean.write.partitionBy("warehouse_id") \
    .mode("overwrite") \
    .parquet(os.path.join(output_base, "inventory_events"))

print("✅ Partitioned Parquet files saved.")
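
`partitionBy` writes one subdirectory per distinct value, named `column=value`. As a quick sanity check after the write, the sketch below lists the partition values found on disk. `list_partitions` is a hypothetical helper, and the demo runs against a temporary directory with made-up region names rather than the real output.

```python
import os
import tempfile


def list_partitions(dataset_path, partition_column):
    """Return sorted partition values found as `column=value` directories."""
    prefix = f"{partition_column}="
    return sorted(
        entry[len(prefix):]
        for entry in os.listdir(dataset_path)
        if entry.startswith(prefix)
        and os.path.isdir(os.path.join(dataset_path, entry))
    )


# Demo on a throwaway directory that mimics a partitioned Parquet layout.
with tempfile.TemporaryDirectory() as demo_dir:
    os.makedirs(os.path.join(demo_dir, "region=EMEA"))
    os.makedirs(os.path.join(demo_dir, "region=APAC"))
    open(os.path.join(demo_dir, "_SUCCESS"), "w").close()  # marker file, ignored
    demo_values = list_partitions(demo_dir, "region")

print(demo_values)  # ['APAC', 'EMEA']
```

In the notebook this could be called as `list_partitions(os.path.join(output_base, "sales_logs"), "region")`. Rows with a null partition value land in a directory named `region=__HIVE_DEFAULT_PARTITION__`, which would show up in the list as well.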

## ✅ Summary
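- Loaded three MongoDB collections (`sales_logs`, `user_activity_logs`, `inventory_events`) with PySpark.
- Parsed the `timestamp` columns and cast the numeric fields (`price`, `quantity`, `session_duration`, `quantity_added`, `quantity_removed`) to explicit types.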
- Saved partitioned Parquet files for each log type:
   - `sales_logs` → by `region`
   - `user_activity_logs` → by `device`
   - `inventory_events` → by `warehouse_id`
- Data is now ready for Data Warehouse modeling.
