In [25]:
## 1. Инициализация
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
from pyspark.sql.functions import col, count, desc, sum, to_date
from pyspark.sql.types import StructType, StructField, StringType, IntegerType


In [35]:
# Создаем SparkSession
#spark = SparkSession.builder.appName("WebServerLogAnalysis").getOrCreate()
conf = SparkConf().setAppName("WebServerLogAnalysis").setMaster("local[*]")
spark = SparkSession.builder.config(conf=conf).getOrCreate()


25/05/16 22:25:40 WARN Utils: Your hostname, oleg-VirtualBox resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
25/05/16 22:25:40 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/16 22:25:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [36]:
## 2. Загрузка данных
schema = StructType([
    StructField("ip", StringType(), True),
    StructField("timestamp", StringType(), True),
    StructField("method", StringType(), True),
    StructField("url", StringType(), True),
    StructField("response_code", IntegerType(), True),
    StructField("response_size", IntegerType(), True)
    ])

In [37]:
# Путь к CSV-файлу
file_path = "web_server_logs.csv"

In [38]:
# Загружаем данные из CSV
df = spark.read.csv(file_path, header=True, schema=schema)

In [39]:
df.show()
df.printSchema()

                                                                                

+---------------+--------------------+------+--------------------+-------------+-------------+
|             ip|           timestamp|method|                 url|response_code|response_size|
+---------------+--------------------+------+--------------------+-------------+-------------+
|    62.133.45.2|2025-04-03T15:23:...|   PUT|                main|          500|         2154|
|173.214.210.237|2025-03-10T03:06:...|DELETE|           tags/tags|          500|          492|
| 113.51.202.172|2025-02-21T14:35:...|   PUT| categories/tag/blog|          301|         5748|
|   54.64.188.71|2025-02-23T20:54:...|   PUT|     main/categories|          500|         6459|
| 41.119.224.255|2025-01-21T17:14:...|   PUT|                tags|          301|         7371|
|  76.254.10.215|2025-01-18T04:50:...|   GET|                 app|          404|         1959|
|169.107.219.131|2025-02-18T22:34:...|  POST|category/categori...|          200|         4121|
|117.172.129.199|2025-02-20T07:37:...|  POST|     

In [40]:
# Кэшируем DataFrame, так как он будет использоваться несколько раз
df.cache()

DataFrame[ip: string, timestamp: string, method: string, url: string, response_code: int, response_size: int]

In [41]:
# Показываем пример данных для проверки загрузки
print("Пример загруженных данных:")
df.show(5, truncate=False)

Пример загруженных данных:


[Stage 1:>                                                          (0 + 2) / 2]

+---------------+--------------------------+------+-------------------+-------------+-------------+
|ip             |timestamp                 |method|url                |response_code|response_size|
+---------------+--------------------------+------+-------------------+-------------+-------------+
|62.133.45.2    |2025-04-03T15:23:46.899331|PUT   |main               |500          |2154         |
|173.214.210.237|2025-03-10T03:06:57.220068|DELETE|tags/tags          |500          |492          |
|113.51.202.172 |2025-02-21T14:35:11.105501|PUT   |categories/tag/blog|301          |5748         |
|54.64.188.71   |2025-02-23T20:54:39.036629|PUT   |main/categories    |500          |6459         |
|41.119.224.255 |2025-01-21T17:14:47.670850|PUT   |tags               |301          |7371         |
+---------------+--------------------------+------+-------------------+-------------+-------------+
only showing top 5 rows



                                                                                

In [42]:
# 3. Задачи анализа

In [43]:
# Задача 1: Топ-10 активных IP-адресов
#Группируем данные по IP и подсчитываем количество запросов для каждого IP. Отображаем 10 наиболее активных IP.

In [45]:
print("Топ-10 активных IP-адресов:")
top_ips = df.groupBy("ip").agg(count("*").alias("request_count")).orderBy(desc("request_count")).limit(10)
top_ips.show(truncate=False)

Топ-10 активных IP-адресов:


[Stage 3:>                                                          (0 + 2) / 2]

+---------------+-------------+
|ip             |request_count|
+---------------+-------------+
|144.233.244.70 |2            |
|167.97.66.152  |1            |
|205.35.92.97   |1            |
|211.104.48.9   |1            |
|37.163.161.154 |1            |
|182.92.119.187 |1            |
|195.225.213.82 |1            |
|136.70.10.229  |1            |
|217.215.88.189 |1            |
|198.178.100.135|1            |
+---------------+-------------+



                                                                                

In [None]:
# Задача 2: Количество запросов по HTTP-методам
# Группируем данные по HTTP-методу и подсчитываем количество запросов для каждого метода.

In [46]:
print("Количество запросов по HTTP-методам:")

method_counts = df.groupBy("method").agg(count("*").alias("method_count")).orderBy(desc("method_count"))
# Отсортировано по количеству для типичного анализа\n",

method_counts.show(truncate=False)


Количество запросов по HTTP-методам:
+------+------------+
|method|method_count|
+------+------------+
|DELETE|25292       |
|POST  |25102       |
|PUT   |24805       |
|GET   |24801       |
+------+------------+



In [47]:
# Задача 3: Количество ответов с кодом 404
# Фильтруем и подсчитываем количество запросов с кодом ответа 404.

In [48]:
count_404 = df.filter(col("response_code") == 404).count()
print(f"\nКоличество ответов с кодом 404: {count_404}")


Количество ответов с кодом 404: 25069


In [49]:
# Задача 4: Общий размер ответов по дням
#Группируем данные по дате и суммируем размеры ответов для каждого дня. Сортируем по дате.

In [51]:
print("\nОбщий размер ответов по дням:")

# Преобразуем строку временной метки в дату.
# Временная метка от Faker имеет формат ISO, например, 'YYYY-MM-DDTHH:MM:SS' или 'YYYY-MM-DDTHH:MM:SS.ffffff'
# to_date() корректно извлекает часть с датой.
df_with_date = df.withColumn("date", to_date(col("timestamp")))
daily_response_size = df_with_date.groupBy("date").agg(sum("response_size").alias("total_response_size")).orderBy("date")

# Показываем результаты. show(truncate=False) по умолчанию отобразит до 20 строк.
# На изображении-примере было 17 строк; это совпадет, если уникальных дат <=20
# или если первые 17 показанных дат совпадают с примером.
daily_response_size.show(20, truncate=False)


Общий размер ответов по дням:
+----------+-------------------+
|date      |total_response_size|
+----------+-------------------+
|2025-01-01|3606675            |
|2025-01-02|3597901            |
|2025-01-03|3651909            |
|2025-01-04|3863015            |
|2025-01-05|3528132            |
|2025-01-06|4041495            |
|2025-01-07|3845266            |
|2025-01-08|3776612            |
|2025-01-09|3495800            |
|2025-01-10|3674878            |
|2025-01-11|3829383            |
|2025-01-12|3573539            |
|2025-01-13|3763178            |
|2025-01-14|3959373            |
|2025-01-15|3648677            |
|2025-01-16|3728369            |
|2025-01-17|3421429            |
|2025-01-18|3700340            |
|2025-01-19|3844888            |
|2025-01-20|3751651            |
+----------+-------------------+
only showing top 20 rows



In [52]:
# 4. Остановка SparkSession
#Останавливаем SparkSession для освобождения ресурсов

In [54]:
spark.stop()