In [7]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.2.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0 pyspark-shell'

In [8]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, max as spark_max
from pyspark.sql.types import StructType, StructField, FloatType, StringType
from pymongo import MongoClient
import pandas as pd
from datetime import datetime

# MongoDB configuration
MONGO_URI = "mongodb://localhost:27017/"
DB_NAME = "Recommendations"
COLLECTION_NAME = "Recommendations"

# Kafka & Excel configuration
KAFKA_TOPIC = "SF_weather_data"
KAFKA_BROKER = "localhost:9092"
EXCEL_FILE = "khuyen-nghi.xlsx"

# Spark session
spark = SparkSession.builder \
    .appName("SFr-recommend") \
    .master("local[*]") \
    .getOrCreate()

In [9]:
# Schema for Kafka data
schema = StructType([
    StructField("temperature", FloatType(), True),
    StructField("humidity_air", FloatType(), True),
    StructField("pressure", FloatType(), True),
    StructField("soil_moisture", FloatType(), True),
    StructField("timestamp", StringType(), True)
])

In [10]:
# MongoDB connection
mongo_client = MongoClient(MONGO_URI)
db = mongo_client[DB_NAME]
collection = db[COLLECTION_NAME]

In [11]:
# Read data from Kafka topic
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", KAFKA_BROKER) \
    .option("subscribe", KAFKA_TOPIC) \
    .option("startingOffsets", "latest") \
    .load() 

In [12]:
# Decode Kafka data

decoded_df = df.selectExpr("CAST(value AS STRING) as json_data") \
    .select(from_json(col("json_data"), schema).alias("data")) \
    .select("data.*")

In [13]:
# Hàm lấy dữ liệu từ Excel
def get_recommendations(crop, soil_type, actual_data):
    # Đọc Excel
    df_excel = pd.read_excel(EXCEL_FILE)

    # Lọc dữ liệu theo crop và soil_type
    crop_data = df_excel[(df_excel["label"] == crop) & (df_excel["Soilcolor"] == soil_type)]
    if crop_data.empty:
        return f"Không tìm thấy dữ liệu cho cây {crop} trên loại đất {soil_type}."

    crop_data = crop_data.iloc[0]  # Dòng đầu tiên

    # Xác định mùa hiện tại
    month = datetime.now().month
    season = "W" if month in [12, 1, 2] else "Sp" if month in [3, 4, 5] else "Su" if month in [6, 7, 8] else "Au"

    # Lấy dữ liệu lý tưởng từ Excel
    ideal_temp = crop_data[f"T2M_AVG-{season}"]
    ideal_humidity = crop_data[f"QV2M-{season}"]
    ideal_precipitation = crop_data[f"PRECTOTCORR-{season}"]
    ideal_soil_moisture = crop_data["GWETTOP"]
    ideal_pressure = crop_data["PS"]
    ideal_ph = crop_data["Ph"]

    # Tính toán khuyến nghị
    temperature_diff = actual_data["temperature"] - ideal_temp
    humidity_diff = actual_data["humidity_air"] - ideal_humidity
    pressure_diff = actual_data["pressure"] - ideal_pressure
    soil_moisture_diff = actual_data["soil_moisture"] - ideal_soil_moisture

    recommendations = {
        "temperature": f"Điều chỉnh nhiệt độ {'giảm' if temperature_diff > 0 else 'tăng'} {abs(temperature_diff):.2f}°C.",
        "humidity_air": f"Điều chỉnh độ ẩm không khí {'giảm' if humidity_diff > 0 else 'tăng'} {abs(humidity_diff):.2f}%.",
        "pressure": f"Điều chỉnh áp suất {'giảm' if pressure_diff > 0 else 'tăng'} {abs(pressure_diff):.2f} hPa.",
        "soil_moisture": f"Điều chỉnh độ ẩm đất {'giảm' if soil_moisture_diff > 0 else 'tăng'} {abs(soil_moisture_diff):.2f}%.",
        "irrigation": f"Tưới {ideal_precipitation * 10:.2f} m³ nước/ha/ngày.",
        "soil_pH": f"pH đất lý tưởng: {ideal_ph}.",
    }

    return recommendations

In [14]:
# Xử lý và lưu vào MongoDB
def process_and_store(crop, soil_type, actual_data):
    recommendations = get_recommendations(crop, soil_type, actual_data)
    if isinstance(recommendations, dict):
        # Lưu khuyến nghị vào MongoDB
        mongo_data = {
            "timestamp": actual_data["timestamp"],
            "crop": crop,
            "soil_type": soil_type,
            "actual_data": actual_data,
            "recommendations": recommendations,
        }
        collection.insert_one(mongo_data)
        print(f"Khuyến nghị đã lưu vào MongoDB: {mongo_data}")
    else:
        print(recommendations)

In [18]:
# Function to process and store recommendations
def process_and_store(crop, soil_type, actual_data):
    recommendations = get_recommendations(crop, soil_type, actual_data)
    if isinstance(recommendations, dict):
        # Lưu khuyến nghị vào MongoDB
        mongo_data = {
            "timestamp": actual_data["timestamp"],
            "crop": crop,
            "soil_type": soil_type,
            "actual_data": actual_data,
            "recommendations": recommendations,
        }
        collection.insert_one(mongo_data)
        print(f"Khuyến nghị đã lưu vào MongoDB: {mongo_data}")
    else:
        print(recommendations)

# Main Spark Streaming Process
def process_row(row):
    actual_data = {
        "temperature": row.temperature,
        "humidity_air": row.humidity_air,
        "pressure": row.pressure,
        "soil_moisture": row.soil_moisture,
        "timestamp": row.timestamp,
    }
    
    # Reinitialize MongoDB connection inside the function
    mongo_client = MongoClient(MONGO_URI)
    db = mongo_client[DB_NAME]
    collection = db[COLLECTION_NAME]
    
    process_and_store(crop, soil_type, actual_data)

# Example crop and soil type; Replace these with dynamic input if needed
crop = "Cây Teff"
soil_type = "Nâu"

query = decoded_df.writeStream \
    .foreach(process_row) \
    .start()

query.awaitTermination()

Traceback (most recent call last):
  File "c:\Users\aboyw\anaconda3\Lib\site-packages\pyspark\serializers.py", line 459, in dumps
    return cloudpickle.dumps(obj, pickle_protocol)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "c:\Users\aboyw\anaconda3\Lib\site-packages\pyspark\cloudpickle\cloudpickle_fast.py", line 73, in dumps
    cp.dump(obj)
  File "c:\Users\aboyw\anaconda3\Lib\site-packages\pyspark\cloudpickle\cloudpickle_fast.py", line 632, in dump
    return Pickler.dump(self, obj)
           ^^^^^^^^^^^^^^^^^^^^^^^
TypeError: cannot pickle '_thread.lock' object


PicklingError: Could not serialize object: TypeError: cannot pickle '_thread.lock' object

In [16]:
def process_batch(df, batch_id):
    # Convert DataFrame from Spark to Pandas for easier processing
    pdf = df.toPandas()
    
    for index, row in pdf.iterrows():
        actual_data = {
            "temperature": row['temperature'],
            "humidity_air": row['humidity_air'],
            "pressure": row['pressure'],
            "soil_moisture": row['soil_moisture'],
            "timestamp": row['timestamp'],
        }
        
        # Reinitialize MongoDB connection inside the function
        mongo_client = MongoClient(MONGO_URI)
        db = mongo_client[DB_NAME]
        collection = db[COLLECTION_NAME]
        
        process_and_store(crop, soil_type, actual_data)

# Use foreachBatch instead of foreach
query = decoded_df.writeStream \
    .foreachBatch(process_batch) \
    .start()