In [1]:
import os
os.environ["SPARK_HOME"] = "/home/hau/Downloads/spark-3.5.3-bin-hadoop3"
os.environ["PYTHONPATH"] = os.environ.get("PYTHONPATH", "") + ":/home/hau/Downloads/spark-3.5.3-bin-hadoop3/python"
os.environ["PYSPARK_PYTHON"] = "/home/hau/anaconda3/envs/myenv/bin/python3.9"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/home/hau/anaconda3/envs/myenv/bin/python3.9"
# Thêm gói Kafka vào biến môi trường
os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.3 pyspark-shell"
)

In [3]:
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import *
from pyspark.sql.functions import from_json, col, to_json
from elasticsearch import Elasticsearch
import os
from pyspark.sql.functions import from_unixtime, to_timestamp, lit


In [4]:
import os
import pandas as pd
from functools import reduce

directory = '/home/hau/train/datasets/KETI'
dataframes = {}
dataframes_room = {}

# Thứ tự cột thực tế khi đọc file (cần thay đổi nếu dữ liệu thực tế khác)
columns = ['co2', 'humidity', 'light', 'pir', 'temperature']

def create_separate_dataframes() -> dict:
    """
    Creates a dictionary that includes room numbers as keys and dataframes per room as values.
    """
    for filename in os.listdir(directory):
        new_directory = os.path.join(directory, filename)

        # Bỏ qua nếu không phải là thư mục hoặc là thư mục ẩn
        if not os.path.isdir(new_directory) or filename.startswith('.'):
            continue

        count = 0
        for new_file in sorted(os.listdir(new_directory)):
            f = os.path.join(new_directory, new_file)

            # Bỏ qua các file ẩn
            if new_file.startswith('.'):
                continue

            # Tạo key cho dataframe, ví dụ: '656_pir'
            sensor_type = columns[count]  # Lấy tên cột tương ứng (pir, humidity, co2, ...)
            my_path = f"{filename}_{sensor_type}"
            dataframes[my_path] = pd.read_csv(f, names=['ts_min_bignt', sensor_type])
            count += 1

        # Tạo dataframe tổng hợp cho từng phòng bằng cách merge các dataframe lại
        try:
            dataframes_room[filename] = reduce(
                lambda left, right: pd.merge(left, right, on='ts_min_bignt', how='inner'),
                [dataframes[f"{filename}_{col}"] for col in columns]
            )
            dataframes_room[filename]['room'] = filename  # Thêm cột 'room' để lưu số phòng
        except KeyError as e:
            print(f"KeyError: {e} - Có thể thiếu một file dữ liệu trong phòng {filename}")

    return dataframes_room

def create_main_dataframe(separate_dataframes: dict) -> pd.DataFrame:
    """
    Concatenates all per-room dataframes vertically to create a final dataframe.
    """
    dataframes_to_concat = list(separate_dataframes.values())

    # Nối tất cả các dataframe
    df = pd.concat(dataframes_to_concat, ignore_index=True)
    df = df.sort_values('ts_min_bignt')  # Sắp xếp theo timestamp

    df.dropna(inplace=True)  # Xóa các dòng có giá trị NaN
    df["event_ts_min"] = pd.to_datetime(df["ts_min_bignt"], unit='s')  # Chuyển đổi timestamp thành datetime
    return df

def write_main_dataframe(df: pd.DataFrame):
    """
    Writes the final dataframe to the local CSV file.
    """
    output_path = '/home/hau/train/data-generator/input/sensors.csv'
    df.to_csv(output_path, index=False)
    print(f"DataFrame đã được ghi vào: {output_path}")

# Thực thi các hàm
separate_dataframes = create_separate_dataframes()
if separate_dataframes:
    main_df = create_main_dataframe(separate_dataframes)
    write_main_dataframe(main_df)
else:
    print("Không có dữ liệu để xử lý.")

DataFrame đã được ghi vào: /home/hau/train/data-generator/input/sensors.csv


In [6]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, FloatType, TimestampType
from pyspark.sql.functions import from_unixtime

# Khởi tạo Spark session
spark = SparkSession.builder \
    .appName("KETI Data") \
    .config("spark.jars", "/home/hau/Downloads/tlcn/Real-time-IoT-Sensor-Data-Dashboard-main/elasticsearch-spark-30_2.12-8.0.0-beta1.jar") \
    .config("spark.jars.packages",
            "org.apache.kafka:kafka-clients:3.5.3,"
            "commons-httpclient:commons-httpclient:3.1") \
    .config("spark.executor.memory", "2g") \
    .config("spark.driver.memory", "2g") \
    .getOrCreate()

# Định nghĩa schema
schema = StructType([
    StructField("ts_min_bignt", StringType(), True),
    StructField("co2", FloatType(), True),
    StructField("humidity", FloatType(), True),
    StructField("light", FloatType(), True),
    StructField("pir", FloatType(), True),
    StructField("temperature", FloatType(), True),
    StructField("room", StringType(), True)
])

# Đường dẫn tới tệp sensors.csv
file_path = "/home/hau/train/data-generator/input/sensors.csv"

# Đọc dữ liệu từ tệp sensors.csv
final_df = spark.read.format("csv") \
    .option("header", True) \
    .schema(schema) \
    .load(f"file://{file_path}")

# Bổ sung cột event_ts_min bằng cách chuyển đổi ts_min_bignt thành kiểu Timestamp
final_df = final_df.withColumn("event_ts_min", from_unixtime(final_df["ts_min_bignt"]).cast(TimestampType()))

# Hiển thị dữ liệu
final_df.show(truncate=False)

24/12/07 12:47:33 WARN Utils: Your hostname, hau resolves to a loopback address: 127.0.1.1; using 192.168.81.128 instead (on interface ens33)
24/12/07 12:47:33 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/home/hau/Downloads/spark-3.5.3-bin-hadoop3/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/hau/.ivy2/cache
The jars for the packages stored in: /home/hau/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-6ad2aae9-5dd6-4e7f-a2ea-30e8a5b28f0e;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.5.3 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.5.3 in central
	found org.apache.kafka#kafka-clients;3.4.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.10.5 in central
	found org.slf4j#slf4j-api;2.0.7 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
	found org.apache.hadoop#hadoop-client-api;3.3.4 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.apache.commons#commons-pool2;2.11.1 in central
:: resolution report :: resolve 432ms :: artifacts dl 22ms
	:: modul

+------------+-----+--------+------+----+-----------+----+-------------------+
|ts_min_bignt|co2  |humidity|light |pir |temperature|room|event_ts_min       |
+------------+-----+--------+------+----+-----------+----+-------------------+
|1377299093  |387.0|52.75   |252.0 |0.0 |22.62      |511 |2013-08-24 06:04:53|
|1377299097  |465.0|52.4    |165.0 |0.0 |22.8       |644 |2013-08-24 06:04:57|
|1377299097  |175.0|50.32   |191.0 |0.0 |23.32      |648 |2013-08-24 06:04:57|
|1377299097  |579.0|49.9    |176.0 |30.0|24.37      |656A|2013-08-24 06:04:57|
|1377299101  |434.0|49.94   |11.0  |29.0|24.08      |564 |2013-08-24 06:05:01|
|1377299101  |423.0|53.51   |3.0   |0.0 |23.11      |558 |2013-08-24 06:05:01|
|1377299105  |347.0|50.03   |3.0   |0.0 |23.64      |664 |2013-08-24 06:05:05|
|1377299105  |437.0|49.39   |148.0 |0.0 |24.14      |666 |2013-08-24 06:05:05|
|1377299105  |538.0|46.49   |102.0 |30.0|25.26      |656B|2013-08-24 06:05:05|
|1377299105  |421.0|49.06   |1977.0|0.0 |24.81      

In [7]:
final_df.printSchema()

root
 |-- ts_min_bignt: string (nullable = true)
 |-- co2: float (nullable = true)
 |-- humidity: float (nullable = true)
 |-- light: float (nullable = true)
 |-- pir: float (nullable = true)
 |-- temperature: float (nullable = true)
 |-- room: string (nullable = true)
 |-- event_ts_min: timestamp (nullable = true)



In [8]:
import subprocess
command = [
    "/home/hau/Downloads/kafka/bin/kafka-topics.sh",
    "--bootstrap-server", "localhost:9092",
    "--create",
    "--topic", "dataframe-to-kafka",
    "--partitions", "5",
    "--replication-factor", "1"
]

try:
    result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=True)
    print("Output:", result.stdout)
except subprocess.CalledProcessError as e:
    print("Error:", e.stderr)

Error: [2024-12-07 12:47:43,843] ERROR org.apache.kafka.common.errors.TopicExistsException: Topic 'dataframe-to-kafka' already exists.
 (org.apache.kafka.tools.TopicCommand)



In [9]:
from pyspark.sql.functions import to_json, struct

# Định nghĩa Kafka bootstrap servers và topic
kafka_bootstrap_servers = "localhost:9092"  # Thay bằng địa chỉ Kafka của bạn
kafka_topic = "dataframe-to-kafka"  # Thay bằng topic Kafka mà bạn muốn gửi dữ liệu

# Chuyển đổi DataFrame thành định dạng JSON
final_df_with_json = final_df.withColumn("value", to_json(struct([col(c) for c in final_df.columns])))

# Gửi dữ liệu đến Kafka
final_df_with_json.select("value") \
    .write \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("topic", kafka_topic) \
    .save()

print("Data sent to Kafka successfully!")


24/12/07 12:47:44 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 8, schema size: 7
CSV file: file:///home/hau/train/data-generator/input/sensors.csv
[Stage 1:>                                                          (0 + 2) / 2]

Data sent to Kafka successfully!


                                                                                

In [10]:
es = Elasticsearch(
    hosts=["http://localhost:9200"],
)

In [11]:
es.indices.delete(index='office-index', ignore=[400, 404])

  es.indices.delete(index='office-index', ignore=[400, 404])


ObjectApiResponse({'acknowledged': True})

In [12]:
office_index = {
    "settings": {
        "index": {
            "analysis": {
                "analyzer": {
                    "custom_analyzer":
                        {
                            "type": "custom",
                            "tokenizer": "standard",
                            "filter": [
                                "lowercase", "custom_edge_ngram", "asciifolding"
                            ]
                        }
                },
                "filter": {
                    "custom_edge_ngram": {
                        "type": "edge_ngram",
                        "min_gram": 2,
                        "max_gram": 10
                    }
                }
            }
        }
    },
    "mappings": {
        "properties": {
            "event_ts_min": {"type": "date",
            "format": "yyyy-MM-d hh:mm:ss||yyyy-MM-dd hh:mm:ss||yyyy-MM-dd HH:mm:ss||yyyy-MM-d HH:mm:ss",
            "ignore_malformed": "true"
      },
            "co2": {"type": "float"},
            "humidity": {"type": "float"},
            "light": {"type": "float"},
            "temperature": {"type": "float"},
            "room": {"type": "keyword"},
            "pir": {"type": "float"},
            "if_movement": {"type": "keyword"}

        }
    }
}

In [13]:
es.indices.create(index="office-index", body=office_index)

ObjectApiResponse({'acknowledged': True, 'shards_acknowledged': True, 'index': 'office-index'})

In [None]:
# Khởi tạo SparkSession
spark = SparkSession.builder \
    .appName("KafkaStreamingToElasticsearch") \
    .getOrCreate()

# Định nghĩa schema cho dữ liệu JSON
schema = StructType([
    StructField("ts_min_bignt", StringType(), True),
    StructField("co2", FloatType(), True),
    StructField("humidity", FloatType(), True),
    StructField("light", FloatType(), True),
    StructField("pir", FloatType(), True),
    StructField("temperature", FloatType(), True),
    StructField("room", StringType(), True)
])

# Đọc dữ liệu từ Kafka
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "dataframe-to-kafka") \
    .option("failOnDataLoss", "false") \
    .option("startingOffsets", "earliest") \
    .load()

# Chuyển đổi giá trị từ Kafka (dạng byte) thành chuỗi JSON
df_parsed = df.selectExpr("CAST(value AS STRING) as json_value") \
    .withColumn("data", F.from_json(F.col("json_value"), schema)) \
    .select("data.*")

# Thêm cột event_ts_min (chuyển đổi từ ts_min_bignt sang timestamp)
df_parsed = df_parsed.withColumn(
    "event_ts_min", 
    F.from_unixtime(F.col("ts_min_bignt").cast("long"), "yyyy-MM-dd HH:mm:ss")
)

# Áp dụng SELECT với logic CASE
df_final = df_parsed.selectExpr(
    "event_ts_min",
    "co2",
    "humidity",
    "light",
    "temperature",
    "room",
    "pir",
    "CASE WHEN pir > 0 THEN 'movement' ELSE 'no_movement' END as if_movement"
)

# Hàm để ghi dữ liệu vào Elasticsearch
def write_to_elasticsearch(batch_df, batch_id):
    # Debug: In dữ liệu của batch
    batch_df.show()

    # Ghi vào Elasticsearch
    batch_df.write \
        .format("org.elasticsearch.spark.sql") \
        .option("es.resource", "office-index") \
        .option("es.nodes", "localhost") \
        .option("es.port", "9200") \
        .mode("append") \
        .save()

# Cấu hình Streaming Query và trigger mỗi 3 giây
query = df_final.writeStream \
    .trigger(processingTime="3 seconds") \
    .outputMode("append") \
    .foreachBatch(write_to_elasticsearch) \
    .option("checkpointLocation", "/home/hau/Downloads/tlcn/Real-time-IoT-Sensor-Data-Dashboard-main/checkpoint/dir") \
    .start()

query.awaitTermination()

24/12/07 12:47:49 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
24/12/07 12:47:49 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
24/12/07 12:47:50 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.


+-------------------+-----+--------+-----+-----------+----+----+-----------+
|       event_ts_min|  co2|humidity|light|temperature|room| pir|if_movement|
+-------------------+-----+--------+-----+-----------+----+----+-----------+
|2013-08-28 14:41:48|473.0|   58.67|  5.0|      22.78| 446| 0.0|no_movement|
|2013-08-28 14:41:58|480.0|   58.67|  3.0|      22.78| 446| 0.0|no_movement|
|2013-08-28 14:42:08|467.0|   58.67|  3.0|      22.77| 446| 0.0|no_movement|
|2013-08-24 06:17:20|459.0|   48.16|  4.0|      24.53| 726| 0.0|no_movement|
|2013-08-28 14:42:18|464.0|   58.67|  4.0|      22.78| 446| 0.0|no_movement|
|2013-08-28 14:42:28|464.0|   58.67|  5.0|      22.78| 446| 0.0|no_movement|
|2013-08-24 06:17:22|543.0|   50.45|  6.0|      23.23| 748| 0.0|no_movement|
|2013-08-28 14:42:38|472.0|   58.67|  3.0|      22.79| 446| 0.0|no_movement|
|2013-08-24 06:17:23|677.0|   52.97|144.0|      23.47| 419|30.0|   movement|
|2013-08-28 14:42:48|472.0|   58.67|  3.0|      22.78| 446| 0.0|no_movement|

24/12/07 12:48:00 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 3000 milliseconds, but spent 10440 milliseconds
