In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, DateType
from pyspark.sql import functions as F

# Khởi tạo Spark session
spark = SparkSession.builder.appName('Laliga').getOrCreate()

# Định nghĩa schema cho tệp CSV
schema = StructType([
    StructField("FTR", StringType(), True),
    StructField("AwayTeam", StringType(), True),
    StructField("AST", IntegerType(), True),  # Away Team Shots on Target
    StructField("Date", DateType(), True),  # Away Team Shots on Target
    
])

# Đọc dữ liệu từ thư mục (streaming)
df_stream = spark.readStream.format("csv") \
    .option("header", "true") \
    .schema(schema) \
    .load("DataStream_cleaned/")  # Đảm bảo đây là thư mục chứa tệp


# Lọc các trận đấu mà đội khách thắng (FTR = 'A')
away_wins_df = df_stream.filter(df_stream["FTR"] == "A")

# Nhóm theo đội khách (AwayTeam) và tính tổng số trận thắng và tổng số cú sút trúng đích (AST)
result_df = away_wins_df.groupBy("AwayTeam").agg(
    F.count("FTR").alias("away_wins_count"),  # Số trận thắng trên sân khách
    F.sum("AST").alias("total_shots_on_target")  # Tổng số cú sút trúng đích
)

# Sắp xếp theo số trận thắng giảm dần và lấy 3 đội có số trận thắng cao nhất
top_3_teams_df = result_df.orderBy(F.col("away_wins_count").desc()).limit(3)

# Xuất kết quả ra console
query = top_3_teams_df.writeStream.outputMode("complete").format("console").start()

# Đợi cho đến khi stream dừng
query.awaitTermination()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/21 23:16:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/04/21 23:16:27 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
25/04/21 23:16:29 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/20/3zj4bhnd5gl57qd2cr57vsk40000gn/T/temporary-3bb72665-89ca-46ca-a983-a1b741715f18. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
25/04/21 23:16:29 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
                                               

-------------------------------------------
Batch: 0
-------------------------------------------
+--------+---------------+---------------------+
|AwayTeam|away_wins_count|total_shots_on_target|
+--------+---------------+---------------------+
+--------+---------------+---------------------+



                                                                                

-------------------------------------------
Batch: 1
-------------------------------------------
+-----------+---------------+---------------------+
|   AwayTeam|away_wins_count|total_shots_on_target|
+-----------+---------------+---------------------+
|   Sociedad|              5|                   26|
|Real Madrid|              5|                   24|
|    Sevilla|              4|                   12|
+-----------+---------------+---------------------+



                                                                                

-------------------------------------------
Batch: 2
-------------------------------------------
+-----------+---------------+---------------------+
|   AwayTeam|away_wins_count|total_shots_on_target|
+-----------+---------------+---------------------+
|Real Madrid|             16|                   89|
|    Sevilla|             13|                   61|
|   Sociedad|             12|                   59|
+-----------+---------------+---------------------+

