# Устанавливаем библиотеки

In [None]:
pip --version


pip 24.1.2 from /usr/local/lib/python3.10/dist-packages/pip (python 3.10)


In [None]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m4.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488490 sha256=5ad36f2c06001929cb50b51bd481d7bbf316fad31555a23f80d3094c7acff5bc
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [1]:
pip install faker

Collecting faker
  Downloading Faker-26.2.0-py3-none-any.whl.metadata (15 kB)
Downloading Faker-26.2.0-py3-none-any.whl (1.8 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.8/1.8 MB[0m [31m15.9 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: faker
Successfully installed faker-26.2.0


# Часть 1. Генерация информации

In [2]:
import csv
from faker import Faker
import random

fake = Faker()

num_records = 100000

http_methods = ['GET', 'POST', 'PUT', 'DELETE']
response_codes = [200, 301, 404, 500]

file_path = "/content/sample_data/web_server_logs.csv"

with open(file_path, mode='w', newline='') as file:
    writer = csv.writer(file)
    writer.writerow(['ip', 'timestamp', 'method', 'url', 'response_code', 'response_size'])

    for _ in range(num_records):
        ip = fake.ipv4()
        timestamp = fake.date_time_this_year().isoformat()
        method = random.choice(http_methods)
        url = fake.uri_path()
        response_code = random.choice(response_codes)
        response_size = random.randint(100, 10000)

        writer.writerow([ip, timestamp, method, url, response_code, response_size])

print(f"Сгенерировано {num_records} записей и сохранено в {file_path}")

Сгенерировано 100000 записей и сохранено в /content/sample_data/web_server_logs.csv


# Часть 2. Анализ информации

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_date, mean, col, when, max, sum, year, count

# Создание SparkSession
spark = SparkSession.builder.appName("Final_task").getOrCreate()

# Чтение CSV-файла
df = spark.read.csv("/content/sample_data/web_server_logs.csv", header=True, inferSchema=True)

# Печать схемы DataFrame
df.printSchema()

# Показ первых 5 строк
df.show(5)

root
 |-- ip: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- method: string (nullable = true)
 |-- url: string (nullable = true)
 |-- response_code: integer (nullable = true)
 |-- response_size: integer (nullable = true)

+--------------+--------------------+------+-------------+-------------+-------------+
|            ip|           timestamp|method|          url|response_code|response_size|
+--------------+--------------------+------+-------------+-------------+-------------+
|79.166.211.234|2024-03-21 07:42:...|   PUT|       search|          200|          179|
|189.140.237.90|2024-04-02 13:08:...|DELETE|      tag/tag|          200|         7911|
|196.182.31.155|2024-06-06 00:03:...|DELETE|    main/main|          404|         1339|
|162.201.98.112|2024-03-10 02:37:...|   GET|main/app/main|          301|         5854|
|  39.92.44.196|2024-05-19 23:03:...|   PUT|   categories|          500|         3450|
+--------------+--------------------+------+-----------

#### 1. Сгруппируем данные по IP и посчитаем количество запросов для каждого IP, выведем 10 самых активных IP.

In [None]:
df.groupBy("ip").agg(count("ip").alias("request_count")).sort(count("ip").desc()).show(10)

+---------------+-------------+
|             ip|request_count|
+---------------+-------------+
|   77.36.178.19|            2|
|  65.233.223.21|            2|
| 222.18.125.243|            2|
| 66.129.249.206|            1|
|  155.99.228.20|            1|
| 163.74.167.232|            1|
| 31.220.240.211|            1|
| 220.218.35.110|            1|
|131.136.150.155|            1|
| 118.189.122.83|            1|
+---------------+-------------+
only showing top 10 rows



#### 2. Сгруппируем данные по HTTP-методу и посчитаем количество запросов для каждого метода.

In [None]:
df.groupBy("method").agg(count("method").alias("method_count")).show()

+------+------------+
|method|method_count|
+------+------------+
|  POST|       24947|
|DELETE|       25157|
|   PUT|       24930|
|   GET|       24966|
+------+------------+



#### 3. Профильтруем и посчитаем количество запросов с кодом ответа 404.

In [None]:
error_df = df.filter(df['response_code'] == 404)
error_df.agg(count("response_code").alias("error_count")).show()

+-----------+
|error_count|
+-----------+
|      25137|
+-----------+



#### 4. Сгруппируем данные по дате и просуммируем размер ответов, отсортируем по дате.

In [None]:
df = df.withColumn("timestamp", to_date(df["timestamp"], "yyyy-MM-dd"))
df.show(10)


+---------------+----------+------+--------------------+-------------+-------------+
|             ip| timestamp|method|                 url|response_code|response_size|
+---------------+----------+------+--------------------+-------------+-------------+
| 79.166.211.234|2024-03-21|   PUT|              search|          200|          179|
| 189.140.237.90|2024-04-02|DELETE|             tag/tag|          200|         7911|
| 196.182.31.155|2024-06-06|DELETE|           main/main|          404|         1339|
| 162.201.98.112|2024-03-10|   GET|       main/app/main|          301|         5854|
|   39.92.44.196|2024-05-19|   PUT|          categories|          500|         3450|
|  92.226.112.87|2024-06-16|   GET|       list/category|          301|         7700|
|  54.49.242.118|2024-07-29|   PUT|        tag/blog/app|          301|         2245|
| 84.255.125.235|2024-08-06|  POST|explore/main/wp-c...|          200|         2348|
|  60.237.127.63|2024-01-23|DELETE|           main/tags|         

In [None]:
df.groupBy("timestamp").agg(sum("response_size").alias("sum_response_size")).sort(df.timestamp.asc()).show()

+----------+-----------------+
| timestamp|sum_response_size|
+----------+-----------------+
|2024-01-01|          2275909|
|2024-01-02|          2107770|
|2024-01-03|          2602718|
|2024-01-04|          2325032|
|2024-01-05|          2445210|
|2024-01-06|          2282068|
|2024-01-07|          2086482|
|2024-01-08|          2347392|
|2024-01-09|          2346266|
|2024-01-10|          2221562|
|2024-01-11|          2312272|
|2024-01-12|          2304178|
|2024-01-13|          2219949|
|2024-01-14|          2347645|
|2024-01-15|          2247323|
|2024-01-16|          2316285|
|2024-01-17|          2357095|
|2024-01-18|          2397670|
|2024-01-19|          2180152|
|2024-01-20|          2461393|
+----------+-----------------+
only showing top 20 rows

