<a href="https://colab.research.google.com/github/Yourius/stepik_de/blob/main/stepik_spark_de_20240726.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
!pip install faker

Collecting faker
  Downloading Faker-26.0.0-py3-none-any.whl.metadata (15 kB)
Downloading Faker-26.0.0-py3-none-any.whl (1.8 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.8/1.8 MB[0m [31m15.4 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: faker
Successfully installed faker-26.0.0


In [26]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m4.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488490 sha256=48baa21b40307f4a9a7c5e117f036d43f95061418cf7f4f5e3f8868c82863d11
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [31]:
import csv
from faker import Faker
import random
import pandas as pd

from pyspark.sql import SparkSession

from pyspark.sql.functions import col
from pyspark.sql.functions import to_date

In [3]:
fake = Faker()

num_records = 100000

http_methods = ['GET', 'POST', 'PUT', 'DELETE']
response_codes = [200, 301, 404, 500]

file_path = "web_server_logs.csv"

with open(file_path, mode='w', newline='') as file:
    writer = csv.writer(file)
    writer.writerow(['ip', 'timestamp', 'method', 'url', 'response_code', 'response_size'])

    for _ in range(num_records):
        ip = fake.ipv4()
        timestamp = fake.date_time_this_year().isoformat()
        method = random.choice(http_methods)
        url = fake.uri_path()
        response_code = random.choice(response_codes)
        response_size = random.randint(100, 10000)

        writer.writerow([ip, timestamp, method, url, response_code, response_size])

print(f"Сгенерировано {num_records} записей и сохранено в {file_path}")

Сгенерировано 100000 записей и сохранено в web_server_logs.csv


In [41]:
# Создание SparkSession
spark = SparkSession.builder.appName("Read CSV Example").getOrCreate()

# Чтение CSV-файла
df = spark.read.csv("/content/web_server_logs.csv", header=True, inferSchema=True)

# Печать схемы DataFrame
df.printSchema()

# Показ первых 5 строк
df.show(5)

root
 |-- ip: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- method: string (nullable = true)
 |-- url: string (nullable = true)
 |-- response_code: integer (nullable = true)
 |-- response_size: integer (nullable = true)

+--------------+--------------------+------+--------------------+-------------+-------------+
|            ip|           timestamp|method|                 url|response_code|response_size|
+--------------+--------------------+------+--------------------+-------------+-------------+
|21.177.134.240|2024-04-24 15:48:...|   PUT|wp-content/tags/e...|          301|         7875|
|177.250.27.121|2024-02-08 08:17:...|  POST|              search|          200|         1848|
|   100.21.0.41|2024-04-09 11:32:...|  POST|                tags|          404|         2748|
|   90.148.4.24|2024-05-14 16:01:...|  POST|            category|          200|          944|
|182.185.42.162|2024-01-15 13:17:...|DELETE|category/explore/...|          404|         9351|

In [43]:
print('TOP active IP addresses:')
(df
 .groupBy('ip')
 .agg({'response_code':'count'})
 .withColumnRenamed("count(response_code)", "request_count")
 .orderBy(col("request_count").desc())
 .show(10)
)

print('\nRequest count by HTTP method:')
(df
 .groupBy('method')
 .agg({'response_code':'count'})
 .withColumnRenamed("count(response_code)", "method_count")
 .orderBy(col("method_count").desc())
 .show()
)

num_resp_code_404=(df
 .filter(df['response_code'] == '404')
 .groupBy('response_code')
 .agg({'response_size':'count'})
#  .withColumnRenamed("count(response_code)", "method_count")
#  .orderBy(col("method_count").desc())
#  .show()
)
print(f'\nNumber of 404 response codes - {num_resp_code_404.collect()[0][1]}')

print('\nTotal responze size by day:')
(df
 .withColumn("day", to_date("timestamp"))
 .groupBy('day')
 .agg({'response_size':'sum'})
 .withColumnRenamed("sum(response_size)", "total_response_size")
 .orderBy(col("day"))
 .show(10)
)

spark.stop()

TOP active IP addresses:
+--------------+-------------+
|            ip|request_count|
+--------------+-------------+
| 69.230.227.43|            2|
|   100.0.7.150|            2|
|118.61.163.113|            1|
| 194.186.25.86|            1|
|    59.87.9.62|            1|
| 93.247.42.225|            1|
|  84.92.236.70|            1|
|  44.226.241.4|            1|
|160.235.18.142|            1|
|207.232.59.243|            1|
+--------------+-------------+
only showing top 10 rows


Request count by HTTP method:
+------+------------+
|method|method_count|
+------+------------+
|   PUT|       25254|
|  POST|       25034|
|DELETE|       24904|
|   GET|       24808|
+------+------------+


Number of 404 response codes - 24954

Total responze size by day:
+----------+-------------------+
|       day|total_response_size|
+----------+-------------------+
|2024-01-01|            2498757|
|2024-01-02|            2565887|
|2024-01-03|            2620582|
|2024-01-04|            2580788|
|2024-01-