In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, sum as spark_sum, to_date, desc

# Создание SparkSession
spark = SparkSession.builder.appName("WebLogsAnalysis").getOrCreate()

# Загрузка данных
logs_df = spark.read.csv("web_server_logs.csv", header=True, inferSchema=True)

# 1. Группировка по IP и подсчёт количества запросов для каждого IP
ip_request_counts = logs_df.groupBy("ip").agg(count("*").alias("request_count"))
top_10_ips = ip_request_counts.orderBy(desc("request_count")).limit(10)
print("Топ-10 самых активных IP:")
top_10_ips.show()

# 2. Группировка по HTTP-методу и подсчёт количества запросов для каждого метода
method_request_counts = logs_df.groupBy("method").agg(count("*").alias("request_count"))
print("\nКоличество запросов по HTTP-методам:")
method_request_counts.show()

# 3. Фильтрация и подсчёт количества запросов с кодом ответа 404
filtered_404 = logs_df.filter(col("response_code") == 404)
count_404 = filtered_404.count()
print(f"\nКоличество запросов с кодом ответа 404: {count_404}")

# 4. Группировка по дате и суммирование размера ответов, сортировка по дате
logs_df = logs_df.withColumn("date", to_date(col("timestamp")))
daily_response_size = logs_df.groupBy("date").agg(spark_sum("response_size").alias("total_response_size"))
daily_response_size = daily_response_size.orderBy("date")
print("\nСуммарный размер ответов по дням:")
daily_response_size.show()