In [None]:
from pyspark.sql import SparkSession, functions as F, Window
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline

import matplotlib.pyplot as plt
import pandas as pd

# SparkSession (adjust paths if needed for your notebook environment)
spark = (
    SparkSession.builder
    .appName("FLW-ETL")
    .config("spark.mongodb.read.connection.uri", "mongodb://localhost:27017")
    .config("spark.executor.memory", "2g")
    .getOrCreate()
)

# Load from MongoDB
df = (
    spark.read.format("mongodb")
    .option("database", "flw")
    .option("collection", "records")
    .load()
)

print("Data loaded from MongoDB:", df.count(), "rows")

# Data Cleaning and Imputation
num_cols = ["m49_code", "year", "loss_percentage", "loss_quantity", "sample_size"]
for col in num_cols:
    df = df.withColumn(col, F.col(col).cast("double"))

cat_cols = ["country", "region", "cpc_code", "commodity", "food_supply_stage",
            "activity", "treatment", "cause_of_loss", "method_data_collection", "reference"]

for col in cat_cols:
    df = df.withColumn(col, F.when(F.col(col).isNull() | (F.trim(F.col(col)) == ""), "UNK").otherwise(F.col(col)))

# Remove outliers/extreme loss percentages
df = df.filter((F.col("loss_percentage").isNotNull()) & (F.col("loss_percentage") >= 0) & (F.col("loss_percentage") <= 100))
df = df.filter(F.col("year").isNotNull())

print("Rows after cleaning:", df.count())

# Bucket rare categories for model stability
for col in cat_cols:
    values = [r[0] for r in df.groupBy(col).count().orderBy(F.desc("count")).limit(50).collect()]
    df = df.withColumn(col, F.when(F.col(col).isin(values), F.col(col)).otherwise(F.lit(f"OTHER_{col}")))

# Feature Engineering: 3-year rolling mean
entity_cols = ["country", "commodity", "food_supply_stage"]
w = Window.partitionBy(*entity_cols).orderBy("year").rowsBetween(-2, 0)
df = df.withColumn("roll3_mean_loss_pct", F.avg("loss_percentage").over(w))

# ML encoding and feature assembly
indexers = [StringIndexer(inputCol=col, outputCol=f"{col}_idx", handleInvalid="keep") for col in cat_cols]
encoders = [OneHotEncoder(inputCols=[f"{col}_idx"], outputCols=[f"{col}_oh"], handleInvalid="keep") for col in cat_cols]
feat_cols = [f"{col}_oh" for col in cat_cols] + ["year", "loss_quantity", "sample_size", "roll3_mean_loss_pct"]
assembler = VectorAssembler(inputCols=feat_cols, outputCol="features")
pipeline = Pipeline(stages=indexers + encoders + [assembler])
model = pipeline.fit(df)
curated = model.transform(df).select("features", "year", F.col("loss_percentage").alias("label"), "commodity", "country", "food_supply_stage", "cause_of_loss")

print("Final feature engineering complete")

# Export sample data to pandas for visualization (use .sample or .limit for large datasets)
sample_pd = df.limit(2000).toPandas()   # Use .limit(N) for notebooks; avoids memory issues

display(sample_pd.head())

# Visualization: Boxplot by commodity (inline in notebook)
plt.figure(figsize=(12,5))
plt.title("Distribution of Loss Percentage by Commodity")
sample_pd.boxplot(column="loss_percentage", by="commodity", rot=60)
plt.ylabel("Loss Percentage (%)")
plt.tight_layout()
plt.show()

# Histogram of global loss percentages
plt.figure(figsize=(10,5))
sample_pd["loss_percentage"].hist(bins=40)
plt.title("Distribution of Loss Percentage (Global)")
plt.xlabel("Loss Percentage (%)")
plt.ylabel("Frequency")
plt.tight_layout()
plt.show()

print("✔ Preprocessing & analytics complete. Visualizations displayed.")


Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
: java.lang.UnsupportedOperationException: getSubject is supported only if a security manager is allowed
	at java.base/javax.security.auth.Subject.getSubject(Subject.java:347)
	at org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:588)
	at org.apache.spark.util.Utils$.$anonfun$getCurrentUserName$1(Utils.scala:2446)
	at scala.Option.getOrElse(Option.scala:201)
	at org.apache.spark.util.Utils$.getCurrentUserName(Utils.scala:2446)
	at org.apache.spark.SparkContext.<init>(SparkContext.scala:339)
	at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:59)
	at java.base/jdk.internal.reflect.DirectConstructorHandleAccessor.newInstance(DirectConstructorHandleAccessor.java:62)
	at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:501)
	at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:485)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:238)
	at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
	at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:184)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:108)
	at java.base/java.lang.Thread.run(Thread.java:1575)


In [3]:
!pip install matplotlib pandas pyspark

Collecting pyspark
  Downloading pyspark-4.0.1.tar.gz (434.2 MB)
     ---------------------------------------- 0.0/434.2 MB ? eta -:--:--
     ---------------------------------------- 1.3/434.2 MB 8.4 MB/s eta 0:00:52
     ---------------------------------------- 2.4/434.2 MB 5.8 MB/s eta 0:01:14
     ---------------------------------------- 3.9/434.2 MB 6.2 MB/s eta 0:01:10
     ---------------------------------------- 5.2/434.2 MB 6.0 MB/s eta 0:01:12
      --------------------------------------- 6.6/434.2 MB 6.0 MB/s eta 0:01:12
      --------------------------------------- 6.8/434.2 MB 6.1 MB/s eta 0:01:11
      --------------------------------------- 6.8/434.2 MB 6.1 MB/s eta 0:01:11
      --------------------------------------- 7.6/434.2 MB 4.4 MB/s eta 0:01:37
      --------------------------------------- 8.9/434.2 MB 4.7 MB/s eta 0:01:32
      -------------------------------------- 10.2/434.2 MB 4.9 MB/s eta 0:01:27
     - ------------------------------------- 11.5/434.2 MB 5.0


[notice] A new release of pip is available: 25.0.1 -> 25.2
[notice] To update, run: python.exe -m pip install --upgrade pip
