In [5]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

In [6]:
# Initialise the Spark session (an already-running session is reused by
# getOrCreate). The application is named "LogAnalysis" and the Spark UI
# port option is set to 4040.
spark = (
    SparkSession.builder
    .appName("LogAnalysis")
    .config("spark.ui.port", 4040)
    .getOrCreate()
)

In [10]:
def _load_log(path):
    """Read JSON log files at ``path`` and turn the ``ts`` column into a timestamp.

    ``ts`` is treated as seconds since the Unix epoch. The value is converted
    numerically rather than through ``from_unixtime`` + ``cast("timestamp")``:
    that route formats the value as a whole-second string in the session time
    zone and parses it back, which drops any fractional seconds and can
    mis-resolve local times that are ambiguous around DST changes.

    Args:
        path: Path or glob pattern passed to ``spark.read.json``.

    Returns:
        A DataFrame with the file's columns, where ``ts`` is a timestamp.
    """
    # The cast to double makes the conversion work whether the JSON reader
    # inferred ``ts`` as a number or as a numeric string.
    # NOTE(review): assumes ``ts`` is epoch *seconds* -- confirm against the data.
    return spark.read.json(path).withColumn(
        "ts", F.timestamp_seconds(F.col("ts").cast("double"))
    )


# The brace glob selects sub-directories 00 through 05 of the data set.
http_path = "maccdc-2012/{00,01,02,03,04,05}/http.log.gz"
dns_path = "maccdc-2012/{00,01,02,03,04,05}/dns.log.gz"

df_http = _load_log(http_path)
df_dns = _load_log(dns_path)

# Expose both logs to Spark SQL under stable view names.
df_http.createOrReplaceTempView("http_log")
df_dns.createOrReplaceTempView("dns_log")

                                                                                

In [11]:
# Count requests per URI, restricted to GET requests answered with
# status code 200, and list the most frequently requested URIs first.
task2_result = (
    df_http
    .where((F.col("method") == "GET") & (F.col("status_code") == 200))
    .groupBy("uri")
    .count()
    .withColumnRenamed("count", "access_count")
    .sort(F.col("access_count").desc())
)
task2_result.show()



+--------------------+------------+
|                 uri|access_count|
+--------------------+------------+
|                   /|        9475|
|/admin/config.php...|         556|
|  /main.php?logout=1|         194|
|/top.php?stuff=15...|         191|
|            /top.php|         179|
|/main.php?stuff=1...|         172|
|  /get_latest_id.php|         159|
|/admin/config.php...|         138|
|    /cacti/index.php|         129|
|/en-US/api/messag...|         118|
|          /index.php|         105|
|/phpmyadmin/index...|          77|
|             /cacti/|          68|
|        /phpmyadmin/|          56|
|        /favicon.ico|          55|
|              /admin|          42|
|  /scripts/index.php|          40|
|             /icons/|          39|
|/en-US/api/search...|          39|
|  /cgi-bin/index.php|          37|
+--------------------+------------+
only showing top 20 rows



                                                                                

In [13]:
# Collapse the DNS log to one row per uid BEFORE joining. Joining the raw DNS
# rows would repeat every HTTP row once per matching DNS row, which inflates
# http_access_count for any uid that has more than one DNS record.
dns_per_uid = (
    df_dns
    .groupBy("uid")
    .agg(
        # Total DNS records for this uid.
        F.count("*").alias("dns_rows"),
        # DNS records for this uid whose proto is "tcp".
        F.sum(F.when(F.col("proto") == "tcp", 1).otherwise(0)).alias("dns_tcp_rows"),
    )
)

# Per URI (GET requests with status 200 only):
#   http_access_count - number of HTTP requests with a non-null uid
#   tcp_percentage    - share of the DNS records matched via uid whose proto
#                       is "tcp", in percent, rounded to 2 decimals; 0.0 when
#                       no DNS record matched.
# NOTE(review): this assumes http and dns entries actually share uid values.
# If uid identifies a single connection, the two logs may never match and
# tcp_percentage will be 0.0 for every URI -- confirm the intended join key.
task3_result = (
    df_http
    .filter((F.col("status_code") == 200) & (F.col("method") == "GET"))
    .alias("h")
    .join(dns_per_uid.alias("d"), "uid", "left")
    .groupBy("h.uri")
    .agg(
        F.count("h.uid").alias("http_access_count"),
        F.round(
            F.coalesce(
                # Both sums are NULL when nothing matched, so the ratio is
                # NULL and coalesce falls back to 0.0. dns_rows is >= 1 for
                # every matched uid, so the denominator is never zero.
                F.sum("d.dns_tcp_rows") * 100.0 / F.sum("d.dns_rows"),
                F.lit(0.0),
            ),
            2,
        ).alias("tcp_percentage"),
    )
    .orderBy(F.desc("http_access_count"))
)
task3_result.show()



+--------------------+-----------------+--------------+
|                 uri|http_access_count|tcp_percentage|
+--------------------+-----------------+--------------+
|                   /|             9475|           0.0|
|/admin/config.php...|              556|           0.0|
|  /main.php?logout=1|              194|           0.0|
|/top.php?stuff=15...|              191|           0.0|
|            /top.php|              179|           0.0|
|/main.php?stuff=1...|              172|           0.0|
|  /get_latest_id.php|              159|           0.0|
|/admin/config.php...|              138|           0.0|
|    /cacti/index.php|              129|           0.0|
|/en-US/api/messag...|              118|           0.0|
|          /index.php|              105|           0.0|
|/phpmyadmin/index...|               77|           0.0|
|             /cacti/|               68|           0.0|
|        /phpmyadmin/|               56|           0.0|
|        /favicon.ico|               55|        

                                                                                