In [1]:
# Cell 1 - Install Java, PySpark and required Python packages
# Run this cell first. Restart runtime if prompted.
!apt-get update -qq
!apt-get install -y openjdk-11-jdk-headless -qq
!pip install -q pyspark pandas scikit-learn joblib matplotlib seaborn nbformat
print("Install complete.")

W: Skipping acquire of configured file 'main/source/Sources' as repository 'https://r2u.stat.illinois.edu/ubuntu jammy InRelease' does not seem to provide it (sources.list entry misspelt?)
Selecting previously unselected package java-common.
(Reading database ... 125080 files and directories currently installed.)
Preparing to unpack .../java-common_0.72build2_all.deb ...
Unpacking java-common (0.72build2) ...
Selecting previously unselected package libpcsclite1:amd64.
Preparing to unpack .../libpcsclite1_1.9.5-3ubuntu1_amd64.deb ...
Unpacking libpcsclite1:amd64 (1.9.5-3ubuntu1) ...
Selecting previously unselected package openjdk-11-jre-headless:amd64.
Preparing to unpack .../openjdk-11-jre-headless_11.0.28+6-1ubuntu1~22.04.1_amd64.deb ...
Unpacking openjdk-11-jre-headless:amd64 (11.0.28+6-1ubuntu1~22.04.1) ...
Selecting previously unselected package ca-certificates-java.
Preparing to unpack .../ca-certificates-java_20190909ubuntu1.2_all.deb ...
Unpacking ca-certificates-java (20190909u

In [4]:
!pip install findspark

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [5]:
import findspark, os
findspark.init()

# Check where pyspark is actually installed
import pyspark
print("‚úÖ PySpark Installed At:", pyspark.__file__)

# Extract the folder path automatically
spark_path = os.path.dirname(pyspark.__file__)
print("üìÅ Spark Path Detected:", spark_path)


‚úÖ PySpark Installed At: /usr/local/lib/python3.12/dist-packages/pyspark/__init__.py
üìÅ Spark Path Detected: /usr/local/lib/python3.12/dist-packages/pyspark


In [6]:
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/usr/local/lib/python3.12/dist-packages/pyspark"
os.environ["HADOOP_HOME"] = "/usr/local/lib/python3.12/dist-packages/pyspark"
os.environ["PATH"] += os.pathsep + os.path.join(spark_path, "bin")

print("‚úÖEnvironment variables set successfully!")

‚úÖEnvironment variables set successfully!


In [7]:
from pyspark.sql import SparkSession
spark = SparkSession.builder     .appName("Smart_Agri_Colab")     .config("spark.ui.enabled", "false")     .getOrCreate()
print('Spark version:', spark.version)
print('SparkSession created:', spark)

Spark version: 3.5.1
SparkSession created: <pyspark.sql.session.SparkSession object at 0x7e9e3e257bf0>


In [9]:
# Cell 3 - Upload your CSV file (Crop_recommendation.csv)
# Use the file upload dialog to upload Crop_recommendation.csv and any existing model .pkl if you have one.
from google.colab import files
uploaded = files.upload()  # select Crop_recommendation.csv (and optionally crop_recommendation_pipeline.pkl)
for fn in uploaded.keys():
    print('Uploaded file:', fn)

Saving Crop_recommendation.csv to Crop_recommendation.csv
Uploaded file: Crop_recommendation.csv


In [10]:
# Cell 4 - ETL: Load CSV with Spark, basic cleaning, and inspection
from pyspark.sql.functions import col

CSV_PATH = "Crop_recommendation.csv"  # change if your filename differs

# Read CSV
df = spark.read.csv(CSV_PATH, header=True, inferSchema=True)
print('Schema:')
df.printSchema()
print('Sample rows:')
df.show(5)

# Basic cleaning: drop nulls in essential columns and cast to double
essential = ["N","P","K","temperature","humidity","ph","rainfall"]
df_clean = df.dropna(subset=essential)
for c in essential:
    df_clean = df_clean.withColumn(c, col(c).cast("double"))

# Filter unrealistic values
df_clean = df_clean.filter(
    (col("ph") > 0) & (col("ph") < 14) &
    (col("temperature") > -30) & (col("temperature") < 60) &
    (col("rainfall") >= 0)
).cache()

print('After cleaning - count:', df_clean.count())
df_clean.show(5)

Schema:
root
 |-- N: integer (nullable = true)
 |-- P: integer (nullable = true)
 |-- K: integer (nullable = true)
 |-- temperature: double (nullable = true)
 |-- humidity: double (nullable = true)
 |-- ph: double (nullable = true)
 |-- rainfall: double (nullable = true)
 |-- label: string (nullable = true)

Sample rows:
+---+---+---+-----------+-----------+-----------------+-----------+-----+
|  N|  P|  K|temperature|   humidity|               ph|   rainfall|label|
+---+---+---+-----------+-----------+-----------------+-----------+-----+
| 90| 42| 43|20.87974371|82.00274423|6.502985292000001|202.9355362| rice|
| 85| 58| 41|21.77046169|80.31964408|      7.038096361|226.6555374| rice|
| 60| 55| 44|23.00445915| 82.3207629|      7.840207144|263.9642476| rice|
| 74| 35| 40|26.49109635|80.15836264|      6.980400905|242.8640342| rice|
| 78| 42| 42|20.13017482|81.60487287|      7.628472891|262.7173405| rice|
+---+---+---+-----------+-----------+-----------------+-----------+-----+
only showin

In [11]:
# Cell 5 - Convert to pandas and prepare features/target
pandas_df = df_clean.toPandas()
features = ["N","P","K","temperature","humidity","ph","rainfall"]

# Detect label column common names
label_col = None
for candidate in ["label","crop","Crop","CropName","crop_name"]:
    if candidate in pandas_df.columns:
        label_col = candidate
        break

print("Detected label column:", label_col)
if label_col is None:
    raise ValueError("No label column found. Ensure your CSV contains target crop name column (label/crop).")

X = pandas_df[features].copy()
y = pandas_df[label_col]

Detected label column: label


In [12]:
# Cell 6 - Train ML pipeline and evaluate
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
import joblib

# Train/test split
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)

pipeline = Pipeline([
    ("scaler", StandardScaler()),
    ("clf", RandomForestClassifier(n_estimators=200, random_state=42))
])

pipeline.fit(X_train, y_train)
y_pred = pipeline.predict(X_test)

print("Accuracy:", accuracy_score(y_test, y_pred))
print("\nClassification report:\n", classification_report(y_test, y_pred))

Accuracy: 0.9954545454545455

Classification report:
               precision    recall  f1-score   support

       apple       1.00      1.00      1.00        20
      banana       1.00      1.00      1.00        20
   blackgram       1.00      0.95      0.97        20
    chickpea       1.00      1.00      1.00        20
     coconut       1.00      1.00      1.00        20
      coffee       1.00      1.00      1.00        20
      cotton       1.00      1.00      1.00        20
      grapes       1.00      1.00      1.00        20
        jute       0.95      1.00      0.98        20
 kidneybeans       1.00      1.00      1.00        20
      lentil       1.00      1.00      1.00        20
       maize       0.95      1.00      0.98        20
       mango       1.00      1.00      1.00        20
   mothbeans       1.00      1.00      1.00        20
    mungbean       1.00      1.00      1.00        20
   muskmelon       1.00      1.00      1.00        20
      orange       1.00    

In [14]:
# Cell 8 - Save trained pipeline and predictions
joblib.dump(pipeline, "crop_recommendation_pipeline_colab.pkl")
print("Saved pipeline as crop_recommendation_pipeline_colab.pkl")

pandas_df["Predicted_Crop"] = pipeline.predict(X)
pandas_df.to_csv("agri_predictions_colab.csv", index=False)
print("Saved predictions as agri_predictions_colab.csv")

# Provide download links
from google.colab import files
files.download("crop_recommendation_pipeline_colab.pkl")
files.download("agri_predictions_colab.csv")

Saved pipeline as crop_recommendation_pipeline_colab.pkl
Saved predictions as agri_predictions_colab.csv


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

In [15]:
# Cell 9 - Generate Airflow DAG file and instructions
# This cell writes a DAG python file you can copy to your Airflow DAGs folder on a local machine or VM.
DAG_CONTENT = r"""
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'ankush',
    'depends_on_past': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    dag_id='smart_agri_pipeline',
    default_args=default_args,
    start_date=datetime(2025, 11, 1),
    schedule_interval='@weekly',
    catchup=False
) as dag:

    run_etl = BashOperator(
        task_id='run_agri_etl',
        bash_command='spark-submit /full/path/to/agri_etl_pipeline.py'
    )

    run_etl
"""

DAG_PATH = 'agri_pipeline_dag.py'
with open(DAG_PATH, 'w') as f:
    f.write(DAG_CONTENT)
print('Wrote DAG to', DAG_PATH)

# Also write a sample agri_etl_pipeline.py you can run with spark-submit
ETL_SCRIPT = r"""
# agri_etl_pipeline.py - simplified script for spark-submit
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import joblib

spark = SparkSession.builder.appName('Smart_Agri_Pipeline').getOrCreate()

# Adjust path to your CSV and model
RAW_CSV = '/full/path/to/Crop_recommendation.csv'
MODEL_PKL = '/full/path/to/crop_recommendation_pipeline_colab.pkl'
OUTPUT_PRED_CSV = '/full/path/to/agri_predictions.csv'

# Read and clean
df = spark.read.csv(RAW_CSV, header=True, inferSchema=True)
essential = ['N','P','K','temperature','humidity','ph','rainfall']
df_clean = df.dropna(subset=essential)
for c in essential:
    df_clean = df_clean.withColumn(c, col(c).cast('double'))

pandas_df = df_clean.toPandas()
model = joblib.load(MODEL_PKL)
X = pandas_df[essential]
pandas_df['Predicted_Crop'] = model.predict(X)
pandas_df.to_csv(OUTPUT_PRED_CSV, index=False)

spark.stop()
print('ETL + prediction done')
"""

with open('agri_etl_pipeline.py', 'w') as f:
    f.write(ETL_SCRIPT)
print('Wrote example ETL script agri_etl_pipeline.py')

print('\n\n--- Airflow instructions (local machine) ---\n')
print('1. Install Airflow on your local machine or server (follow official docs).')
print('2. Copy agri_pipeline_dag.py to your Airflow DAGs folder.')
print('3. Update the bash_command path in the DAG to the full path of agri_etl_pipeline.py')
print('4. Ensure spark-submit is available on the machine that runs Airflow (set PATH).')
print('5. Start Airflow scheduler and webserver, then trigger the DAG.')

Wrote DAG to agri_pipeline_dag.py
Wrote example ETL script agri_etl_pipeline.py


--- Airflow instructions (local machine) ---

1. Install Airflow on your local machine or server (follow official docs).
2. Copy agri_pipeline_dag.py to your Airflow DAGs folder.
3. Update the bash_command path in the DAG to the full path of agri_etl_pipeline.py
4. Ensure spark-submit is available on the machine that runs Airflow (set PATH).
5. Start Airflow scheduler and webserver, then trigger the DAG.
