In [1]:
import os, shutil, glob, time
import pandas as pd
import numpy as np


# Local Hadoop install. Presumably needed so Spark/Hadoop on Windows can find its
# native binaries via HADOOP_HOME and PATH — machine-specific path, adjust per host.
HADOOP_HOME = "D:/hadoop"
os.environ["HADOOP_HOME"] = HADOOP_HOME

# Append <HADOOP_HOME>/bin to PATH only when it is not already an entry, so
# re-running this cell does not keep growing PATH.
hadoop_bin = f"{HADOOP_HOME}/bin"
current_path = os.environ.get("PATH", "")
if hadoop_bin not in current_path.split(os.pathsep):
    os.environ["PATH"] = current_path + os.pathsep + hadoop_bin

os.makedirs(f"{HADOOP_HOME}/checkpoint", exist_ok=True)

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, from_unixtime
from pyspark.sql.types import StructType, IntegerType, DoubleType, TimestampType

from sqlalchemy import create_engine
import urllib

In [3]:
# Working directories (local Windows paths).
output_dir = "D:/sensor-data/output"      # CSV sink of the streaming query (part-* files)
checkpoint_dir = "D:/hadoop/checkpoint"   # streaming checkpoint location
backup_dir = "D:/sensor-data/backup"      # merged, timestamped CSV snapshots

# Create each directory if it does not exist yet (exist_ok makes re-runs safe).
for _working_dir in (output_dir, checkpoint_dir, backup_dir):
    os.makedirs(_working_dir, exist_ok=True)
del _working_dir


def merge_csv_files_clean_headers(input_dir, output_file):
    """Merge the Spark CSV sink's ``part-*`` files into one CSV with a single header.

    The streaming query writes its CSV output with ``header=True``, so every part
    file starts with its own header row. Files are therefore read without a
    header, rows whose first field is the literal ``'sensor_id'`` are dropped,
    and the remaining rows are concatenated and written with one header.

    Args:
        input_dir: Directory holding the ``part-*`` files.
        output_file: Path of the merged CSV to write.

    Returns:
        True if a merged CSV containing at least one data row was written.
        False if there was nothing valid to merge (every file was unreadable,
        had the wrong number of columns, or held only header rows); no file is
        written in that case.
        None if ``input_dir`` contains no ``part-*`` files at all.
    """
    expected_columns = ['sensor_id', 'temperature', 'humidity', 'timestamp']

    # Sorted so the merged row order is deterministic (glob order is arbitrary).
    csv_files = sorted(glob.glob(os.path.join(input_dir, "part-*")))
    if not csv_files:
        print("‚ö†Ô∏è Kh√¥ng c√≥ file ƒë·ªÉ g·ªôp.")
        return None

    dfs = []
    for file in csv_files:
        try:
            # Zero-byte part files (e.g. from empty micro-batches) make
            # pd.read_csv raise EmptyDataError; they hold no rows, so skip them.
            if os.path.getsize(file) == 0:
                continue
            df = pd.read_csv(file, header=None)
        except Exception as e:
            print(f"‚ùå L·ªói ƒë·ªçc file {file}: {e}")
            continue

        # A file with a different width would break the column renaming below.
        if df.shape[1] != len(expected_columns):
            print(f"Skipping {file}: expected {len(expected_columns)} columns, found {df.shape[1]}")
            continue

        # Drop the per-file header rows (read as data because header=None).
        dfs.append(df[df[0] != 'sensor_id'])

    merged_df = pd.concat(dfs, ignore_index=True) if dfs else None
    if merged_df is None or merged_df.empty:
        # Header-only input must not be reported as a successful (empty) merge.
        print("‚ö†Ô∏è Kh√¥ng c√≥ n·ªôi dung h·ª£p l·ªá ƒë·ªÉ g·ªôp.")
        return False

    merged_df.columns = expected_columns
    merged_df.to_csv(output_file, index=False)
    print(f"‚úÖ ƒê√£ g·ªôp v√† l√†m s·∫°ch th√†nh {output_file}")
    return True


# Remove the previous run's Spark output and checkpoint directories.
def cleanup_dirs():
    """Delete the module-level ``output_dir`` and then ``checkpoint_dir``.

    Uses ``shutil.rmtree(..., ignore_errors=True)``, so missing directories or
    deletion errors are silently ignored, then prints a confirmation message.
    """
    for stale_dir in (output_dir, checkpoint_dir):
        shutil.rmtree(stale_dir, ignore_errors=True)
    print("üßπ ƒê√£ xo√° output v√† checkpoint.")


In [4]:
import pandas as pd

def validate_data(file_path):
    """Run data-quality checks on a merged sensor CSV.

    Checks run in this order and stop at the first failure:
      1. no missing values in sensor_id, temperature, humidity, timestamp;
      2. temperature within [-50, 100] (inclusive);
      3. humidity within [0, 100] (inclusive);
      4. every timestamp parses with the format ``%Y-%m-%d %H:%M:%S``.

    Prints a message for the first failing check (or a success message) and
    returns True/False. Any other error while reading or checking the file
    (e.g. missing file, missing column, non-numeric values) is printed and
    reported as False instead of being raised.
    """
    try:
        frame = pd.read_csv(file_path)

        # 1. Missing values, reported for the first offending column.
        for column in ('sensor_id', 'temperature', 'humidity', 'timestamp'):
            if frame[column].isnull().any():
                print(f"‚ùå C√≥ gi√° tr·ªã thi·∫øu trong '{column}'")
                return False

        # 2 & 3. Inclusive value ranges: (column, low, high, failure message).
        range_rules = (
            ('temperature', -50, 100, "‚ùå Nhi·ªát ƒë·ªô ngo√†i kho·∫£ng h·ª£p l·ªá (-50 ƒë·∫øn 100¬∞C)"),
            ('humidity', 0, 100, "‚ùå ƒê·ªô ·∫©m ngo√†i kho·∫£ng h·ª£p l·ªá (0 ƒë·∫øn 100%)"),
        )
        for column, low, high, message in range_rules:
            if not frame[column].between(low, high).all():
                print(message)
                return False

        # 4. Strict timestamp format; errors='raise' turns any mismatch into ValueError.
        try:
            pd.to_datetime(frame['timestamp'], format="%Y-%m-%d %H:%M:%S", errors='raise')
        except ValueError:
            print("‚ùå ƒê·ªãnh d·∫°ng 'timestamp' kh√¥ng h·ª£p l·ªá")
            return False

        print("‚úÖ T·∫•t c·∫£ ki·ªÉm tra d·ªØ li·ªáu ƒë·ªÅu ƒë·∫°t y√™u c·∫ßu.")
        return True

    except Exception as e:
        print(f"‚ùå L·ªói khi ƒë·ªçc ho·∫∑c ki·ªÉm tra d·ªØ li·ªáu: {e}")
        return False


In [5]:
# SparkSession (local mode) with the Kafka connector fetched via spark.jars.packages.
# NOTE(review): `spark.hadoop.*` settings are copied into the Hadoop Configuration with
# the prefix stripped (here as key `home.dir`); Hadoop normally finds its home through
# the HADOOP_HOME environment variable set earlier, so this entry is likely a no-op —
# confirm before relying on it.
spark = (
    SparkSession.builder
    .appName("KafkaSensorConsumerWithValidation")
    .master("local[*]")
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.2")
    .config("spark.hadoop.home.dir", "D:/hadoop")
    .getOrCreate()
)
spark.sparkContext.setLogLevel("WARN")

# Schema of the JSON payload carried in each Kafka message value.
schema = (
    StructType()
    .add('sensor_id', IntegerType())
    .add('temperature', DoubleType())
    .add('humidity', DoubleType())
    .add('timestamp', DoubleType())  # Unix epoch seconds
)

# Subscribe to the 'sensor-data' topic. 'latest' only applies when the query starts
# without an existing checkpoint; otherwise it resumes from the checkpointed offsets.
df_raw = (
    spark.readStream
    .format('kafka')
    .option('kafka.bootstrap.servers', 'localhost:9092')
    .option('subscribe', 'sensor-data')
    .option('startingOffsets', 'latest')
    .load()
)

# Decode the value bytes as a JSON string, parse it with `schema`, flatten the struct,
# and render the epoch timestamp as a 'yyyy-MM-dd HH:mm:ss' string (from_unixtime's
# default format, which already returns a string). Payloads that fail to parse yield
# null fields, which the later validation step rejects as missing values.
df_parsed = (
    df_raw.selectExpr("CAST(value AS STRING)")
    .select(from_json(col("value"), schema).alias("data"))
    .select("data.*")
    .withColumn("timestamp", from_unixtime(col("timestamp")))
)

# Continuously write the parsed rows as CSV part files (one header per file) into
# output_dir. No validation happens here; the merge/validate step checks the data after.
STREAM_TIMEOUT_S = 120  # 2 minutes
query = (
    df_parsed.writeStream
    .option("path", output_dir)
    .option("checkpointLocation", checkpoint_dir)
    .option("header", True)
    .format("csv")
    .start()
)

try:
    # Returns after STREAM_TIMEOUT_S seconds; re-raises if the query itself failed.
    query.awaitTermination(STREAM_TIMEOUT_S)
finally:
    # Always stop the query — also when the cell is interrupted or the query raised —
    # so no background stream keeps writing into output_dir / checkpoint_dir.
    query.stop()
print("üî¥ ƒê√£ d·ª´ng stream sau 2 ph√∫t.")

🔴 Đã dừng stream sau 2 phút.


In [6]:
# ==== MERGE FILES + VALIDATE DATA ====
timestamp_str = time.strftime("%Y%m%d_%H%M%S")
merged_file_path = f"{backup_dir}/merged_{timestamp_str}.csv"

# Reset on every run so later cells never see a stale `is_clean` from a previous
# run (or an undefined name) when this run's merge fails and validation is skipped.
is_clean = False

# merge_csv_files_clean_headers returns True only after it has written the merged file.
success = merge_csv_files_clean_headers(output_dir, merged_file_path)

if success and os.path.exists(merged_file_path):
    is_clean = validate_data(merged_file_path)
    if is_clean:
        print("‚úÖ D·ªØ li·ªáu ƒë·∫°t y√™u c·∫ßu ch·∫•t l∆∞·ª£ng, c√≥ th·ªÉ ƒë∆∞a v√†o SQL/Azure.")
    else:
        print("‚ùå D·ªØ li·ªáu KH√îNG ƒë·∫°t ch·∫•t l∆∞·ª£ng. H·ªßy ƒë·∫©y v√†o h·ªá th·ªëng ch√≠nh.")
    # Remove the raw part files and checkpoint once checked (pass or fail);
    # the merged copy stays in backup_dir.
    cleanup_dirs()
else:
    print("‚ö†Ô∏è G·ªôp file th·∫•t b·∫°i ho·∫∑c kh√¥ng t·ªìn t·∫°i file ƒë·∫ßu ra. B·ªè qua ki·ªÉm tra.")


✅ Đã gộp và làm sạch thành D:/sensor-data/backup/merged_20250616_152345.csv
✅ Tất cả kiểm tra dữ liệu đều đạt yêu cầu.
✅ Dữ liệu đạt yêu cầu chất lượng, có thể đưa vào SQL/Azure.
🧹 Đã xoá output và checkpoint.


In [7]:
# Merged CSV produced by the merge/validate cell.
csv_path = merged_file_path

# SQL Server instance on this machine (named instance; use just "localhost" for a
# default instance). Raw string: "\S" in a normal literal is an invalid escape
# sequence (SyntaxWarning on Python 3.12+); the value is unchanged.
server = r"LAPTOP-CUA-QUAN\SQLSERVER1"
database = "SensorData"

# pyodbc connection string using Windows authentication (trusted_connection=yes).
connection_string = (
    f"mssql+pyodbc://{server}/{database}"
    "?driver=ODBC+Driver+17+for+SQL+Server"
    "&trusted_connection=yes"
)

# create_engine() does not connect until first use, so building it unconditionally
# is cheap and keeps `engine` available for the ETL-log cell.
engine = create_engine(connection_string)

data_ready = bool(success) and os.path.exists(csv_path)
if data_ready:
    # Read whenever the merged file exists so `df` is also available when logging
    # a failed run.
    df = pd.read_csv(csv_path)

# Only load data that passed validation. `is_clean` is evaluated only when
# data_ready is True, i.e. when the merge/validate cell ran validate_data() on it.
if data_ready and is_clean:
    df.to_sql('sensor_data', con=engine, if_exists='append', index=False)
    print("‚úÖ D·ªØ li·ªáu ƒë√£ ƒë∆∞·ª£c ƒë·∫©y v√†o SQL Server th√†nh c√¥ng.")
else:
    print("Skipped SQL Server load: merged data is missing or failed validation.")


✅ Dữ liệu đã được đẩy vào SQL Server thành công.


In [16]:
# Record the outcome of this ETL run in the ETL_Log table (after validating the data).
if success and os.path.exists(merged_file_path):
    is_clean = validate_data(merged_file_path)

    run_time = pd.Timestamp.now()
    job_name = "KafkaSensorETL"

    # Count rows from the merged file itself rather than relying on a DataFrame
    # left over from another cell; an unreadable file is logged with 0 records.
    try:
        record_count = len(pd.read_csv(merged_file_path))
    except (OSError, ValueError):  # pandas parser/empty-data errors subclass ValueError
        record_count = 0

    if is_clean:
        status, message = 'Success', 'Data validation passed'
    else:
        status, message = 'Failed', 'Data validation failed'

    # NOTE(review): start_time and end_time are both this moment; the actual stream
    # start time is not captured anywhere, so run duration is not logged.
    log_df = pd.DataFrame({
        'start_time': [run_time],
        'end_time': [run_time],
        'job_name': [job_name],
        'status': [status],
        'records_processed': [record_count],
        'error_message': [message],
    })
    log_df.to_sql('ETL_Log', con=engine, if_exists='append', index=False)

    if is_clean:
        print("üìù ƒê√£ ghi log ETL th√†nh c√¥ng v√†o ETL_Log.")
    else:
        print("üìù ƒê√£ ghi log ETL th·∫•t b·∫°i v√†o ETL_Log.")

else:
    print("‚ö†Ô∏è G·ªôp file th·∫•t b·∫°i ho·∫∑c kh√¥ng t·ªìn t·∫°i file ƒë·∫ßu ra. B·ªè qua ki·ªÉm tra.")


✅ Tất cả kiểm tra dữ liệu đều đạt yêu cầu.
📝 Đã ghi log ETL thành công vào ETL_Log.
