# Installing the required packages

Installing the Java Development Kit (JDK) version 8 (OpenJDK).

Importing the os module.

Setting the JAVA_HOME environment variable.

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

Installing the PySpark library, which provides a Python API for working with Apache Spark.

Importing the SparkSession class from the pyspark.sql module, which provides the tools for working with data in Spark SQL.

Creating a SparkSession instance, which is the entry point for Spark SQL operations. The builder attribute is used to configure the session, and the getOrCreate method returns the existing Spark session if there is one, or creates a new one otherwise.


In [2]:
!pip install pyspark

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425345 sha256=04f4e2a04833ee9ae55d5281e507b88165c4de41046ebf26259fc218bdac985a
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


# Importing the log file and analyzing it

In [3]:
df_log = spark.read.text("/content/drive/MyDrive/Проект 3.log")

In [4]:
df_log.printSchema()

root
 |-- value: string (nullable = true)



In [5]:
df_log.show(truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                                                                                                                                                                                                                                                                                                                                                                            

In [6]:
column_names = df_log.columns
print(column_names)

['value']


We can see that structurally the log file consists of a single column ('value') and many rows. Each row holds information about one HTTP request, including the IP address, date and time, URL, response status, and other parameters.

To join this file later with the tabular CSV file, the data must first be transformed and organized.

In [7]:
from pyspark.sql.functions import split, substring, count, sum, col, round

Split each row's value into several columns:

"IP" - the IP address;

"Timestamp" - the timestamp of the request;

"Request" - the HTTP request;

"Status" - the HTTP response status code;

"Size" - the size of the response;

"Referrer" - the referring source;

"User-Agent" - information about the browser and operating system.

In [8]:
df_log = df_log.withColumn("IP", split(df_log["value"], r" - - \[")[0])  # IP address
df_log = df_log.withColumn("Timestamp", split(df_log["value"], r"\[")[1].substr(1, 20))  # request timestamp
df_log = df_log.withColumn("Request", split(df_log["value"], "\"")[1])  # HTTP request
df_log = df_log.withColumn("Status", split(split(df_log["value"], "\"")[2], " ")[1])  # HTTP response status code
df_log = df_log.withColumn("Size", split(split(df_log["value"], "\"")[2], " ")[2])  # response size
df_log = df_log.withColumn("Referrer", split(df_log["value"], "\"")[5])  # referring source
df_log = df_log.withColumn("User-Agent", split(df_log["value"], "\"")[7])  # browser and operating system information
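
The column split can be previewed on a single line with Python's standard `re` module before running it in Spark. This is a minimal sketch, assuming the log follows the common "combined" access-log layout; the sample line, the `LOG_PATTERN` regex, and the `parse_line` helper are made up for illustration and are not taken from the project's log file.

```python
import re

# Assumed layout: IP - - [timestamp] "request" status size "referrer" "user-agent"
LOG_PATTERN = re.compile(
    r'^(?P<ip>\S+) \S+ \S+ '
    r'\[(?P<timestamp>[^\]]+)\] '
    r'"(?P<request>[^"]*)" '
    r'(?P<status>\d{3}) (?P<size>\S+) '
    r'"(?P<referrer>[^"]*)" '
    r'"(?P<user_agent>[^"]*)"'
)


def parse_line(line):
    """Return a dict of named fields, or None if the line does not match."""
    match = LOG_PATTERN.match(line)
    return match.groupdict() if match else None


# Hypothetical sample line (documentation IP address and example.com URL)
sample = (
    '192.0.2.10 - - [25/Jan/2019:00:12:34 +0330] '
    '"GET /static/images/exists.png HTTP/1.1" 200 1024 '
    '"https://example.com/" '
    '"Mozilla/5.0 (Windows NT 10.0; Win64; x64) Gecko/20100101 Firefox/64.0"'
)

fields = parse_line(sample)
print(fields["ip"], fields["status"], fields["size"])
```

Named groups make it easy to see which quoted field lands in which column, which helps when checking the positional indexes used with `split`.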

In [9]:
columns_to_select = ["IP", "Timestamp", "Request", "Status", "Size", "Referrer", "User-Agent"]
df_log_new = df_log.select(columns_to_select)

In [10]:
df_log_new.show(truncate=False)

+-------------+--------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------+-----+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------+
|IP           |Timestamp           |Request                                                                                                                                                                                    |Status|Size |Referrer                                                                                                                                                                                              |User-Agent|
+-------------+--------------------+--------------------------------------------------------------------

The structure has improved considerably, but the device information ends up in the "Referrer" column; we would like these values to sit in a separate column.

In [11]:
unique_user_agents = df_log_new.select("User-Agent").distinct()
count_unique_user_agents = unique_user_agents.count()
print(count_unique_user_agents)

1558


In [12]:
from pyspark.sql.functions import regexp_extract, regexp_replace

Extract the values in parentheses into a separate column.

In [13]:
df_log_new = df_log_new.withColumn("Device", regexp_extract(df_log_new.Referrer, r"\((.*?)\)", 1))

In [14]:
# Remove from the Referrer column the data that is now held in the Device column
df_log_new = df_log_new.withColumn("Referrer", regexp_replace(df_log_new.Referrer, r"\(.*?\)", ""))
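
To see what the two regular expressions do, the same patterns can be tried on one string with Python's `re` module. This is only an illustration: the user-agent string below is a hypothetical example, and `re.search` / `re.sub` stand in for Spark's `regexp_extract` / `regexp_replace`.

```python
import re

# Hypothetical user-agent string, similar in shape to the values in the log
user_agent = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
    "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0 Safari/537.36"
)

# Non-greedy group: capture stops at the first closing parenthesis
match = re.search(r"\((.*?)\)", user_agent)
device = match.group(1) if match else ""

# Remove every parenthesised fragment from the string
remainder = re.sub(r"\(.*?\)", "", user_agent)

print(device)
print(remainder)
```

Only the first parenthesised group becomes the device, while every parenthesised fragment is removed from the remaining text, which is why doubled spaces appear where a fragment used to be.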

In [15]:
df_log_new.show(truncate=False)

+-------------+--------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------+-----+--------------------------------------------------------------------------+----------+----------------------------------------------------------+
|IP           |Timestamp           |Request                                                                                                                                                                                    |Status|Size |Referrer                                                                  |User-Agent|Device                                                    |
+-------------+--------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------+---

In [16]:
import pyspark.sql.functions as F

In [17]:
# Check for missing values
df_log_new.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df_log_new.columns]).show()

+---+---------+-------+------+----+--------+----------+------+
| IP|Timestamp|Request|Status|Size|Referrer|User-Agent|Device|
+---+---------+-------+------+----+--------+----------+------+
|  0|        0|      0|     0|   0|       0|         0|     0|
+---+---------+-------+------+----+--------+----------+------+



In [18]:
unique_user_agents = df_log_new.groupBy("User-Agent").count()
unique_user_agents.show(20, truncate=False)

+---------------+-----+
|User-Agent     |count|
+---------------+-----+
|5.160.222.39   |7    |
|10.16.209.128  |1    |
|5.120.126.172  |9    |
|5.125.28.183   |14   |
|79.127.0.178   |4    |
|37.129.253.117 |16   |
|5.112.157.124  |2    |
|78.38.193.27   |3    |
|5.235.199.183  |65   |
|83.123.163.193 |18   |
|5.236.217.44   |18   |
|216.52.207.114 |68   |
|5.120.251.244  |1    |
|5.116.116.63   |56   |
|5.122.249.42   |1    |
|5.115.154.42   |6    |
|5.233.212.248  |2    |
|5.116.149.132  |1    |
|151.233.253.107|10   |
|94.23.238.218  |1    |
+---------------+-----+
only showing top 20 rows



In [19]:
df_log_new.printSchema()

root
 |-- IP: string (nullable = true)
 |-- Timestamp: string (nullable = true)
 |-- Request: string (nullable = true)
 |-- Status: string (nullable = true)
 |-- Size: string (nullable = true)
 |-- Referrer: string (nullable = true)
 |-- User-Agent: string (nullable = true)
 |-- Device: string (nullable = true)



In [20]:
# Count the total number of rows in the DataFrame
total_rows = df_log_new.count()

In [21]:
# Count the number of unique rows in the DataFrame
unique_rows = df_log_new.distinct().count()

In [22]:
# Compute the number of duplicates
duplicate_rows = total_rows - unique_rows

In [23]:
print(f"Number of duplicates: {duplicate_rows}")

Number of duplicates: 110791


In [24]:
print(f"Number of unique rows: {unique_rows}")

Number of unique rows: 10254361
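
The duplicate count here is simply the total row count minus the count of distinct rows. A small plain-Python sketch of the same arithmetic, using hypothetical rows in place of the parsed log records:

```python
from collections import Counter

# Hypothetical rows standing in for parsed log records
rows = [
    ("192.0.2.1", "GET /a", "200"),
    ("192.0.2.1", "GET /a", "200"),  # exact duplicate of the first row
    ("192.0.2.2", "GET /b", "404"),
    ("192.0.2.1", "GET /a", "200"),  # another duplicate
]

total_rows = len(rows)
unique_rows = len(set(rows))
duplicate_rows = total_rows - unique_rows

# Which rows repeat, and how often (comparable to grouping by all columns and counting)
repeated = {row: n for row, n in Counter(rows).items() if n > 1}

print(total_rows, unique_rows, duplicate_rows)
```

Note that a row occurring three times contributes two to the duplicate count, because one copy is still counted as unique.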


In [25]:
# Group the data by unique patterns and count the occurrences
pattern_counts = df_log_new.groupBy("IP", "Timestamp", "Request", "Status", "Size", "Referrer", "User-Agent", "Device").count()


In [26]:
# Sort by number of occurrences in descending order
sorted_patterns = pattern_counts.orderBy(col("count").desc())

In [27]:
# Show the sorted list of repeated patterns
sorted_patterns.show(truncate=False)

+---------------+--------------------+---------------------------------------------------------------------------------------------------------------------------+------+----+-------------------------------------------------------------------------+----------+------------------------------------+-----+
|IP             |Timestamp           |Request                                                                                                                    |Status|Size|Referrer                                                                 |User-Agent|Device                              |count|
+---------------+--------------------+---------------------------------------------------------------------------------------------------------------------------+------+----+-------------------------------------------------------------------------+----------+------------------------------------+-----+
|5.219.193.129  |25/Jan/2019:00:12:34|GET /static/images/exists.png HTTP/1.1               

# Importing the CSV file and analyzing it

In [28]:
df_csv = spark.read.csv("/content/drive/MyDrive/Проект 3.csv", header=True)

In [29]:
df_csv.printSchema()

root
 |-- client: string (nullable = true)
 |-- hostname: string (nullable = true)
 |-- alias_list: string (nullable = true)
 |-- address_list: string (nullable = true)



In [30]:
df_csv.show(truncate=False)

+---------------+-------------------------------+--------------------------------+-------------------+
|client         |hostname                       |alias_list                      |address_list       |
+---------------+-------------------------------+--------------------------------+-------------------+
|5.123.144.95   |5.123.144.95                   |[Errno 1] Unknown host          |NULL               |
|5.122.76.187   |5.122.76.187                   |[Errno 1] Unknown host          |NULL               |
|5.215.249.99   |5.215.249.99                   |[Errno 1] Unknown host          |NULL               |
|31.56.102.211  |31-56-102-211.shatel.ir        |['211.102.56.31.in-addr.arpa']  |['31.56.102.211']  |
|5.123.166.223  |5.123.166.223                  |[Errno 1] Unknown host          |NULL               |
|5.160.26.98    |5.160.26.98                    |[Errno 1] Unknown host          |NULL               |
|5.127.147.132  |5.127.147.132                  |[Errno 1] Unknown host  

# Joining the two data streams

Join the two DataFrames using an "inner" join.

In [31]:
joined_df = df_csv.join(df_log_new, df_csv["client"] == df_log_new["IP"], "inner")
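
For intuition, an inner join keeps only the pairs of rows whose keys match on both sides. The sketch below reproduces that behaviour in plain Python on two tiny hypothetical tables; the `inner_join` helper is illustrative and is not part of PySpark.

```python
# Hypothetical miniature versions of the two tables
csv_rows = [
    {"client": "192.0.2.1", "hostname": "host-1.example"},
    {"client": "192.0.2.3", "hostname": "host-3.example"},
]
log_rows = [
    {"IP": "192.0.2.1", "Status": "200"},
    {"IP": "192.0.2.1", "Status": "404"},
    {"IP": "192.0.2.2", "Status": "200"},
]


def inner_join(left, right, left_key, right_key):
    """Pair every left row with every right row that has the same key value."""
    index = {}
    for row in right:
        index.setdefault(row[right_key], []).append(row)
    return [
        {**l_row, **r_row}
        for l_row in left
        for r_row in index.get(l_row[left_key], [])
    ]


joined = inner_join(csv_rows, log_rows, "client", "IP")
for row in joined:
    print(row)
```

Rows without a partner on the other side (`192.0.2.3` and `192.0.2.2` here) drop out, and in every surviving row the two key columns hold the same value.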

In [32]:
joined_df.printSchema()

root
 |-- client: string (nullable = true)
 |-- hostname: string (nullable = true)
 |-- alias_list: string (nullable = true)
 |-- address_list: string (nullable = true)
 |-- IP: string (nullable = true)
 |-- Timestamp: string (nullable = true)
 |-- Request: string (nullable = true)
 |-- Status: string (nullable = true)
 |-- Size: string (nullable = true)
 |-- Referrer: string (nullable = true)
 |-- User-Agent: string (nullable = true)
 |-- Device: string (nullable = true)



Because the DataFrames were combined with an inner join, the values of the "client" and "IP" columns are identical. To keep the DataFrame lean, we drop one of the columns (I chose "IP").

In [33]:
joined_df = joined_df.drop("IP")

In [34]:
joined_df.show()

+-----------+-----------+--------------------+------------+--------------------+--------------------+------+-----+--------------------+----------+--------------------+
|     client|   hostname|          alias_list|address_list|           Timestamp|             Request|Status| Size|            Referrer|User-Agent|              Device|
+-----------+-----------+--------------------+------------+--------------------+--------------------+------+-----+--------------------+----------+--------------------+
|1.234.99.77|1.234.99.77|[Errno 1] Unknown...|        NULL|25/Jan/2019:04:12:00|GET /filter/b1,p6...|   200|31739|Mozilla/5.0  like...|         -|Windows NT 10.0; ...|
|1.234.99.77|1.234.99.77|[Errno 1] Unknown...|        NULL|25/Jan/2019:04:12:01|GET /image/%7B%7B...|   200|    5|Mozilla/5.0  like...|         -|Windows NT 10.0; ...|
|1.234.99.77|1.234.99.77|[Errno 1] Unknown...|        NULL|25/Jan/2019:04:12:01|GET /image/19/bra...|   200| 2595|Mozilla/5.0  like...|         -|Windows NT 10.

# Running the scripts that build the data mart

In [35]:
from pyspark.sql.window import Window
from pyspark.sql.functions import monotonically_increasing_id, split
from pyspark.sql import functions as F

In [36]:
# 1. Device surrogate key
df_with_device_id = joined_df.withColumn("Device_id", monotonically_increasing_id())
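
Note that `monotonically_increasing_id` numbers rows, so two rows with the same `Device` string receive different ids. If one key per distinct device is wanted instead, one option is to map each distinct value to an id. The plain-Python sketch below illustrates the idea only; `assign_device_ids` and the sample values are hypothetical.

```python
def assign_device_ids(devices):
    """Map each distinct device string to an integer id (first seen gets 0)."""
    ids = {}
    for device in devices:
        if device not in ids:
            ids[device] = len(ids)
    return ids


# Hypothetical Device values, one per log row
devices = [
    "Windows NT 10.0; Win64; x64",
    "Linux; Android 8.0",
    "Windows NT 10.0; Win64; x64",
]

device_ids = assign_device_ids(devices)
row_ids = [device_ids[d] for d in devices]
print(row_ids)
```

With such a key, grouping by the id aggregates all rows of one device rather than producing one group per row.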

In [37]:
# 2. Device name
df_with_device_name = df_with_device_id.withColumn("Device_name", split(df_with_device_id["Device"], ";")[0])

In [38]:
# 3. Number of users
user_count = joined_df.select("client").distinct().count()
df_with_user_count = joined_df.withColumn("User_count", F.lit(user_count))

In [39]:
# 4. Share of users of this device out of the total number of users
device_user_count = df_with_device_name.select("client", "Device_id").distinct().groupBy("Device_id").count()
device_user_count = device_user_count.withColumn("User_percentage", device_user_count["count"] / user_count)
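
The share computed in this step is "distinct users seen on the device" divided by "distinct users overall". A plain-Python sketch of that calculation on hypothetical data (the client addresses and device names are invented):

```python
from collections import defaultdict

# Hypothetical (client, device) pairs, one per logged action
actions = [
    ("192.0.2.1", "Windows"),
    ("192.0.2.1", "Windows"),
    ("192.0.2.2", "Windows"),
    ("192.0.2.3", "Android"),
    ("192.0.2.2", "Android"),
]

# Distinct clients per device
users_by_device = defaultdict(set)
for client, device in actions:
    users_by_device[device].add(client)

total_users = len({client for client, _ in actions})
user_share = {
    device: len(users) / total_users
    for device, users in users_by_device.items()
}
print(user_share)
```

Because one client can appear on several devices, the shares need not add up to 1.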

In [40]:
# 5. Number of actions performed from this device
action_count = df_with_device_name.groupBy("Device_id").count()
df_with_action_count = df_with_device_name \
    .join(action_count.withColumnRenamed("count", "Action_count"), "Device_id")

In [41]:
# 6. Share of actions performed from this device relative to all devices
total_action_count = df_with_device_name.count()  # total number of actions across all devices
device_action_count = df_with_device_name.groupBy("Device_id").count()
device_action_percentage = device_action_count \
    .withColumn("Action_percentage", F.col("count") / F.lit(total_action_count))

In [42]:
# 7. The 5 most popular browsers used on this device by different users,
# with each browser's share of usage relative to the other browsers
top_browsers = df_with_device_name.groupBy("Device_id", "User-Agent").count()
window_spec = Window.partitionBy("Device_id").orderBy(F.desc("count"))
top_browsers = top_browsers.withColumn("Rank", F.row_number().over(window_spec))
top_browsers = top_browsers.filter(F.col("Rank") <= 5)
total_counts = top_browsers.groupBy("Device_id").agg(F.sum("count").alias("Total_count"))
top_browsers = top_browsers.join(total_counts, on="Device_id", how="inner")
top_browsers = top_browsers.withColumn("% of Total", F.expr("(`count` / Total_count) * 100"))
top_browsers = top_browsers.select(
    "Device_id",
    F.col("User-Agent").alias("Browser"),
    "% of Total"
)
top_browsers = top_browsers.orderBy(F.desc("% of Total")).limit(5)
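
The "top N per group with shares" pattern can be checked on a small sample in plain Python. This is a sketch under stated assumptions: the records are hypothetical, `top_browsers_per_device` is an illustrative helper, and the share is taken relative to all actions on the device, whereas the cell above divides by the total of the top five.

```python
from collections import Counter, defaultdict


def top_browsers_per_device(records, n=5):
    """Return {device: [(browser, share_percent), ...]} with at most n browsers each."""
    counts = defaultdict(Counter)
    for device, browser in records:
        counts[device][browser] += 1

    result = {}
    for device, browser_counts in counts.items():
        total = sum(browser_counts.values())  # all actions on this device
        result[device] = [
            (browser, 100 * count / total)
            for browser, count in browser_counts.most_common(n)
        ]
    return result


# Hypothetical (device, browser) pairs
records = [
    ("Windows", "Chrome"), ("Windows", "Chrome"), ("Windows", "Firefox"),
    ("Windows", "Edge"), ("Android", "Chrome"),
]

top = top_browsers_per_device(records, n=2)
print(top)
```

The ranking is done separately inside each device, which is the role the window partitioned by `Device_id` plays in the Spark version.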

In [43]:
# 8. Number of server responses other than 200 for this device
error_count = df_with_device_name.filter(F.col("Status") != "200").groupBy("Device_id").count()

df_with_error_count = df_with_device_name.join(
    error_count.select("Device_id", "count").withColumnRenamed("count", "Error_count"),
    on="Device_id",
    how="left"
)

In [44]:
# 9. For each server response other than 200, create a field holding
# the number of responses of that type
error_responses = df_with_device_name.filter(F.col("Status") != "200").groupBy("Device_id", "Status").count()
error_response_cols = error_responses.select("Status").distinct().rdd.flatMap(lambda x: x).collect()

# Use a loop variable that does not shadow the imported col() function
for status_code in error_response_cols:
    error_responses = error_responses.withColumn(status_code, F.when(F.col("Status") == status_code, F.col("count")).otherwise(0))
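
The target shape of this step is one row per device with one column per non-200 status code. A plain-Python sketch of that reshaping on hypothetical data:

```python
from collections import Counter, defaultdict

# Hypothetical (device, status) pairs
responses = [
    ("Windows", "200"), ("Windows", "404"), ("Windows", "404"),
    ("Android", "500"), ("Android", "200"), ("Windows", "301"),
]

# Count non-200 responses per device and status
errors = defaultdict(Counter)
for device, status in responses:
    if status != "200":
        errors[device][status] += 1

# Every non-200 status seen anywhere becomes a column
status_columns = sorted({status for counts in errors.values() for status in counts})

# One row per device; 0 where a status never occurred on that device
error_table = {
    device: {status: counts.get(status, 0) for status in status_columns}
    for device, counts in errors.items()
}
print(error_table)
```

In Spark, this kind of reshaping is often written with `groupBy(...).pivot(...)`, which may be worth considering as an alternative to adding the columns in a loop.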

# Creating the final DataFrame

In [45]:
data_mart = df_with_device_name \
    .join(df_with_user_count.select("client", "User_count"), "client", "left") \
    .join(device_user_count.select("Device_id", "User_percentage"), "Device_id", "left") \
    .join(df_with_action_count.select("Device_id", "Action_count"), "Device_id", "left") \
    .join(device_action_percentage.select("Device_id", "Action_percentage"), "Device_id", "left") \
    .join(top_browsers.select("Device_id", "Browser", "% of Total"), "Device_id", "left") \
    .join(df_with_error_count.select("Device_id", "Error_count"), "Device_id", "left") \
    .join(error_responses, "Device_id", "left") \
    .select("Device_id", "Device_name", "User_count", "User_percentage", "Action_count", "Action_percentage", "% of Total", "Error_count", *error_response_cols)

data_mart.printSchema()

root
 |-- Device_id: long (nullable = false)
 |-- Device_name: string (nullable = true)
 |-- User_count: integer (nullable = true)
 |-- User_percentage: double (nullable = true)
 |-- Action_count: long (nullable = true)
 |-- Action_percentage: double (nullable = true)
 |-- % of Total: double (nullable = true)
 |-- Error_count: long (nullable = true)
 |-- 408: long (nullable = true)
 |-- 401: long (nullable = true)
 |-- 302: long (nullable = true)
 |-- 502: long (nullable = true)
 |-- 404: long (nullable = true)
 |-- 403: long (nullable = true)
 |-- 500: long (nullable = true)
 |-- 304: long (nullable = true)
 |-- 400: long (nullable = true)
 |-- 499: long (nullable = true)
 |-- 504: long (nullable = true)
 |-- 301: long (nullable = true)
 |-- 414: long (nullable = true)
 |-- 405: long (nullable = true)
 |-- 206: long (nullable = true)



# Example of using Apache Airflow to run the script automatically

In [46]:
!pip install apache-airflow

Collecting apache-airflow
  Downloading apache_airflow-2.7.3-py3-none-any.whl (12.9 MB)
Collecting WTForms<3.1.0 (from apache-airflow)
  Downloading WTForms-3.0.1-py3-none-any.whl (136 kB)
Collecting alembic<2.0,>=1.6.3 (from apache-airflow)
  Downloading alembic-1.13.0-py3-none-any.whl (230 kB)
Collecting argcomplete>=1.10 (from apache-airflow)
  Downloading argcomplete-3.1.6-py3-none-any.whl (41 kB)
Collecting asgiref (from apache-airflow)
  Downloading asgiref-3.7.2-py3-none-any.wh

In [47]:
!mkdir ~/airflow

In [48]:
!echo "[core]" >> ~/airflow/airflow.cfg
!echo "dags_folder = /content/dags" >> ~/airflow/airflow.cfg

In [49]:
!mkdir /content/dags

In [55]:
from datetime import datetime, timedelta
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python import PythonOperator

In [56]:
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2023, 12, 25),
    'retry_delay': timedelta(minutes=5),
    'schedule_interval': '0 0 * * MON',
}
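
The cron expression `'0 0 * * MON'` means "at 00:00 every Monday". The sketch below only illustrates which calendar moments that expression matches; the `next_monday_midnight` helper is hypothetical and does not reproduce Airflow's own scheduling logic.

```python
from datetime import datetime, timedelta


def next_monday_midnight(now):
    """Return the first Monday 00:00 strictly after `now`."""
    midnight = now.replace(hour=0, minute=0, second=0, microsecond=0)
    days_ahead = (7 - midnight.weekday()) % 7  # Monday is weekday 0
    candidate = midnight + timedelta(days=days_ahead)
    if candidate <= now:
        candidate += timedelta(days=7)
    return candidate


start_date = datetime(2023, 12, 25)  # the start_date from default_args (a Monday)
following = next_monday_midnight(start_date)
print(following)
```

Successive matches are exactly seven days apart, so the script would be triggered once a week.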

In [57]:
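def execute_my_dag():
    # Read and run the script stored in the DAGs folder; the file is closed automatically
    with open('/content/dags/my_dag.py') as script:
        exec(script.read())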

In [58]:
with DAG('my_dag', default_args=default_args, max_active_tasks=1, schedule=default_args['schedule_interval']) as dag:
    start = DummyOperator(task_id='start')
    execute = PythonOperator(task_id='execute_my_dag', python_callable=execute_my_dag)
    end = DummyOperator(task_id='end')

start >> execute >> end

<Task(EmptyOperator): end>

In [59]:
!airflow webserver --port 8080

[2023-12-04T18:54:06.250+0000] {configuration.py:2049} INFO - Creating new FAB webserver config file in: /root/airflow/webserver_config.py

Please confirm database initialize (or wait 4 seconds to skip it). Are you sure? [y/N]
[2023-12-04T18:54:11.966+0000] {db.py:903} INFO - Log template table does not exist (added in 2.3.0); skipping log template sync.
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
Running the Gunicorn Server with:
Workers: 4 sync
Host: 0.0.0.0:8080
Timeout: 120
Logfiles: - -
Access Logformat: 
[2023-12-04T18:54:13.868+0000] {manager.py:102} INFO - Security DB not found Creating all Models from Base
[2023-12-04T18:54:14.819+0000] {manager.py:104} INFO - Security DB Created

In [None]:
!airflow scheduler



Please confirm database initialize (or wait 4 seconds to skip it). Are you sure? [y/N]
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
[2023-12-04T18:56:18.611+0000] {executor_loader.py:117} INFO - Loaded executor: SequentialExecutor
[2023-12-04 18:56:18 +0000] [10332] [INFO] Starting gunicorn 21.2.0
[2023-12-04 18:56:18 +0000] [10332] [INFO] Listening at: http://[::]:8793 (10332)
[2023-12-04 18:56:18 +0000] [10332] [INFO] Using worker: sync
[2023-12-04 18:56:18 +0000] [10333] [INFO] Booting worker with pid: 10333
[2023-12-04T18:56:18.856+0000] {scheduler_job_runner.py:797} INFO - Starting the scheduler
[2023-12-04T18:56:18.858+0000] {scheduler_job_runner.py:804} INFO - Processing each file at most -1 times