__Общая задача:__ создать скрипт для формирования витрины на основе логов web-сайта.

In [1]:
import pandas as pd        # импортируем необходимые библиотеки
import numpy as np
import psycopg2
from datetime import datetime
import re
import httpagentparser
import csv

In [42]:
# все что нужно для спарка
from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql.functions import *
import pyspark.sql.functions as F
from pyspark.sql.functions import regexp_extract 
from pyspark.sql.functions import lit
from pyspark.sql.functions import col

In [3]:
# создаем спарк-сессию
spark = SparkSession.builder \
  .master("local[1]") \
  .appName("SparkFirst") \
  .appName("Timeout Troubleshooting") \
  .config("spark.executor.memory", "10g")\
  .config("spark.executor.cores", 5) \
  .config("spark.dynamicAllocation.enabled", "true") \
  .config("spark.dynamicAllocation.maxExecutors", 5) \
  .config("spark.network.timeout", "600s") \
  .config("spark.shuffle.service.enabled", "true") \
  .getOrCreate()

Так как исходный файл очень большой, и мощности моего компьютера не хватает для того, чтобы обсчитать его полностью, я разделила его на 4 файла. Отработаем алгоритм для одного

In [4]:
base_df = spark.read.text('access_4_1_1.log')
base_df.printSchema()

root
 |-- value: string (nullable = true)



In [5]:
base_df.show(3, truncate=False)


+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                                                                                                                                                                                        |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|46.62.164.96 - - [26/Jan/2019:11:43:01 +0330] "GET /image/656/brand HTTP/1.1" 200 2222 "https://www.zanbil.ir/browse/home-appliances/%D

In [6]:
(base_df.filter(base_df['value'].isNull()).count()) # проверим есть ли нулевые строки в исходном датафрейме

0

При помощи регулярных выражений спарсим необходимые данные. Из лога я взяла не все данные, только хост, метод, протокол, статус и самое главное строку с User agent-информацией.

In [7]:
host_pattern = r'(^\S+\.[\S+\.]+\S+)\s'              #регулярное выражение для хоста

In [8]:
method_uri_protocol_pattern = r'\"(\S+) (.*?) (\S+)\"'     #регулярное выражение для метода

In [9]:
status_pattern = r'\s(\d{3})\s'                     #регулярное выражение для ответа

In [10]:
user_agent_top=r'\S+\S+\".\"([^\(]+)\((.*)\)'      #регулярное выражение для начала UA-строки
          #мы его использовать в итоговой таблице не будем так, как оно для всех одинаковое и не несет полезной информации

In [11]:
user_agent_end=r'\((.*) '                       #регулярнео выражение для окончания UA-строки

In [12]:
logs_df = base_df.select(regexp_extract('value', host_pattern, 1).alias('host'),           # теперь объединяем все вместе
                         regexp_extract('value', method_uri_protocol_pattern, 1).alias('method'),
                         regexp_extract('value', status_pattern, 1).cast('integer').alias('status'),
                         regexp_extract('value', user_agent_end, 1).alias('user_agent_end'))
logs_df.show(10, truncate=True)
print((logs_df.count(), len(logs_df.columns)))

+-------------+------+------+--------------------+
|         host|method|status|      user_agent_end|
+-------------+------+------+--------------------+
| 46.62.164.96|   GET|   200|compatible; MSIE ...|
|79.127.88.215|   GET|   200|Windows NT 6.1; r...|
|79.127.88.215|   GET|   200|Windows NT 6.1; r...|
|79.127.88.215|   GET|   200|Windows NT 6.1; r...|
| 46.62.164.96|   GET|   200|compatible; MSIE ...|
|79.127.88.215|   GET|   200|Windows NT 6.1; r...|
| 46.62.164.96|   GET|   200|compatible; MSIE ...|
|79.127.88.215|   GET|   200|Windows NT 6.1; r...|
|104.222.32.91|   GET|   200|code:%20obj[\x5C%...|
| 46.62.164.96|   GET|   200|compatible; MSIE ...|
+-------------+------+------+--------------------+
only showing top 10 rows

(50000, 4)


Проверим схему нашего датафрейма.

In [13]:
logs_df.printSchema()

root
 |-- host: string (nullable = true)
 |-- method: string (nullable = true)
 |-- status: integer (nullable = true)
 |-- user_agent_end: string (nullable = true)



In [14]:
bad_rows_df = logs_df.filter(logs_df['host'].isNull()| 
                            logs_df['method'].isNull() |
                            logs_df['status'].isNull() |
                            logs_df['user_agent_end'].isNull())                    
bad_rows_df.count()   #пустых строчек нет, это хорошо!

0

Приступим к проведению анализа и построению витрины:
- Суррогатный ключ устройства

- Название устройства

- Количество пользователей

- Доля пользователей данного устройства от общего числа пользователей

- Количество совершенных действий для данного устройства

- Доля совершенных действий с данного устройства относительно других устройств

- Список из 5 самых популярных браузеров, используемых на данном устройстве различными пользователями, с указанием доли использования для данного браузера относительно остальных браузеров 

- Количество ответов сервера, отличных от 200 на данном устройстве

- Для каждого из ответов сервера, отличных от 200, сформировать поле, в котором будет содержаться количество ответов данного типа

Для определения устройства и браузера воспользуемся готовым парсером (https://pypi.org/project/httpagentparser/#description) (импортировали его вначале):

Метод httpagentparser.detect(s) дает ответ вида (пример) {'platform': {'name': 'Android', 'version': '6.0'}, 'os': {'name': 'Linux'}, 'bot': False, 'dist': {'name': 'Android', 'version': '6.0'}, 'browser': {'name': 'Chrome', 'version': '66.0.3359.158'}}

In [15]:
#тестовая строка
s='Linux; Android 6.0; ALE-L21 Build/HuaweiALE-L21) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/66.0.3359.158 Mobile Safari/537.36'
y=httpagentparser.detect(s)
print('Платформа',y['platform']['name'],',браузер', y['browser']['name'])


Платформа Android ,браузер Chrome


In [16]:
#@F.udf                 # создадим пользовательскую функцию на основе парсера, которая возвращает название платформы
def device_def(x):
    try:
        y= httpagentparser.detect(x)
        if 'platform' in y:
            return f"{y['platform']['name']}"
        else:
            return 'No platform' 
    except:
        return 'Error'

In [17]:
device_def("Mozilla/5.0 (Linux; Android 6.0; ALE-L21 Build/HuaweiALE-L21) AppleWebKit/537.36 \
           (KHTML, like Gecko) Chrome/66.0.3359.158 Mobile Safari/537.36")

'Android'

In [18]:
#@F.udf                 # создадим пользовательскую функцию на основе парсера, которая возвращает название браузера
def browser_def(x):
    try:
        y= httpagentparser.detect(x)
        if 'browser' in y:
            return f"{y['browser']['name']}"
        else:
            return 'No browser' 
    except:
        return 'Error'

In [19]:
browser_def("Mozilla/5.0 (Linux; Android 6.0; ALE-L21 Build/HuaweiALE-L21) AppleWebKit/537.36 \
           (KHTML, like Gecko) Chrome/66.0.3359.158 Mobile Safari/537.36")

'Chrome'

In [20]:
#logs_df = logs_df.withColumn('platform', device_def(F.col('user_agent_end')))   # Я НЕ СМОГЛА РАЗОБРАТЬСЯ И ПОНЯТЬ 
#logs_df = logs_df.withColumn('browser', browser_def(F.col('user_agent_end')))    #  И ПОНЯТЬ ПОЧЕМУ ЭТО НЕ РАБОТАЕТ какие то жуткие ошибки
#logs_df.show(3) # создадим новый столбец с платформой и браузером        # пришлось искать обходной маневр-костыль

In [21]:
 count = 0                      # делаем костыльное решение для получения колонки с браузером
res = []
keys = []

for row in logs_df.toLocalIterator():
    count += 1
    row_dict = row.asDict()
    row_dict['browser'] = browser_def(row_dict['user_agent_end'])
    res.append(row_dict)

    if count == 1:
        keys = res[0].keys()
        with open('log_browser.csv', 'w', newline='') as output_file:
            dict_writer = csv.DictWriter(output_file, keys)
            dict_writer.writeheader()
            
    if count % 500000 == 0:
        print(count)
        with open('log_browser.csv', 'a', newline='') as output_file:
            dict_writer = csv.DictWriter(output_file, keys)
            dict_writer.writerows(res)
            del res
            res = []

with open('log_browser.csv', 'a', newline='') as output_file:
            dict_writer = csv.DictWriter(output_file, keys)
            dict_writer.writerows(res)
del res

In [22]:
log_browser = spark.read.csv('log_browser.csv', header=True, sep=',') # проверяем что все нормально записалось
log_browser.show(3)

+-------------+------+------+--------------------+--------------------+
|         host|method|status|      user_agent_end|             browser|
+-------------+------+------+--------------------+--------------------+
| 46.62.164.96|   GET|   200|compatible; MSIE ...|Microsoft Interne...|
|79.127.88.215|   GET|   200|Windows NT 6.1; r...|             Firefox|
|79.127.88.215|   GET|   200|Windows NT 6.1; r...|             Firefox|
+-------------+------+------+--------------------+--------------------+
only showing top 3 rows



In [23]:
log_browser=log_browser.drop('method').drop('status').drop('user_agent_end') # оставим только хост и браузер

In [24]:
count = 0                      # делаем аналогично для платформы
res = []
keys = []

for row in logs_df.toLocalIterator():
    count += 1
    row_dict = row.asDict()
    row_dict['platform'] = device_def(row_dict['user_agent_end'])
    res.append(row_dict)

    if count == 1:
        keys = res[0].keys()
        with open('log_platform.csv', 'w', newline='') as output_file:
            dict_writer = csv.DictWriter(output_file, keys)
            dict_writer.writeheader()
            
    if count % 500000 == 0:
        print(count)
        with open('log_platform.csv', 'a', newline='') as output_file:
            dict_writer = csv.DictWriter(output_file, keys)
            dict_writer.writerows(res)
            del res
            res = []

with open('log_platform.csv', 'a', newline='') as output_file:
            dict_writer = csv.DictWriter(output_file, keys)
            dict_writer.writerows(res)
del res

In [25]:
log_platform = spark.read.csv('log_platform.csv', header=True, sep=',') # проверяем что все нормально записалось
log_platform.show(3)

+-------------+------+------+--------------------+--------+
|         host|method|status|      user_agent_end|platform|
+-------------+------+------+--------------------+--------+
| 46.62.164.96|   GET|   200|compatible; MSIE ...| Windows|
|79.127.88.215|   GET|   200|Windows NT 6.1; r...| Windows|
|79.127.88.215|   GET|   200|Windows NT 6.1; r...| Windows|
+-------------+------+------+--------------------+--------+
only showing top 3 rows



In [26]:
df_new = log_platform.join(log_browser, 'host', how = 'inner').drop('user_agent_end') # колонку user_agent_end можем удалить
df_new.show(3)           #теперь наконец-то  у нас есть все для анализа! Уррра!

+------------+------+------+--------+--------------------+
|        host|method|status|platform|             browser|
+------------+------+------+--------+--------------------+
|46.62.164.96|   GET|   200| Windows|Microsoft Interne...|
|46.62.164.96|   GET|   200| Windows|Microsoft Interne...|
|46.62.164.96|   GET|   200| Windows|Microsoft Interne...|
+------------+------+------+--------+--------------------+
only showing top 3 rows



Кэшируем df_new для дальнейшего анализа:

In [27]:
df_new.cache()

DataFrame[host: string, method: string, status: string, platform: string, browser: string]

In [28]:
df_new.printSchema()

root
 |-- host: string (nullable = true)
 |-- method: string (nullable = true)
 |-- status: string (nullable = true)
 |-- platform: string (nullable = true)
 |-- browser: string (nullable = true)



Уберем записи,где нам не удалось определить платформу 

In [51]:
df_p = df_new.filter(df_new.platform != 'None')  

In [52]:
clean_df = df_p.filter(df_new.browser != 'No browser') # убираем те записи, где не удалось определить браузер

In [53]:
clean_df.cache()

DataFrame[host: string, method: string, status: string, platform: string, browser: string]

Количество пользователей - это количество уникальных хостов:

In [54]:
total_users = clean_df.select('host').distinct().count()
total_users

1355

Посмотрим, какие у нас есть типы устройств:

In [55]:
device_name = clean_df.select('platform').distinct().show(13)

+--------------------+
|            platform|
+--------------------+
|                 iOS|
|              Mac OS|
| like Gecko) Chro...|
| like Gecko) Vers...|
|               Linux|
| like Gecko) Vers...|
| like Gecko) Chro...|
| like Gecko) Chro...|
| like Gecko) Chro...|
| like Gecko) Chro...|
|             Android|
|             Windows|
+--------------------+



Количество пользователей устройства - это пользователи, сгруппированные по устройству:

In [56]:
device_users=clean_df.groupby('platform').agg(count('host').alias('device_users'))                

Доля пользователей по устройствам:

In [57]:
df0 = clean_df.groupby(col('platform')).agg(count('platform').alias('device_users'))\
              .withColumn('part_device_users', round(F.col('device_users')/total_users,2))
df0.show(3)

+--------------------+------------+-----------------+
|            platform|device_users|part_device_users|
+--------------------+------------+-----------------+
|                 iOS|       67357|            49.71|
|              Mac OS|       45879|            33.86|
| like Gecko) Chro...|           8|             0.01|
+--------------------+------------+-----------------+
only showing top 3 rows



Количество совершенных действий для данного устройства - это группировка по платформе и подсчет методов (GET, POST):

In [58]:
device_actions=clean_df.groupby(col('platform')).agg(count('method').alias('device_actions'))

Всего действий - это подсчет значений колонки методов:

In [59]:
total_actions = clean_df.select('method').count()
total_actions

8520329

Доля совершенных действий с данного устройства относительно других устройств - это 

In [60]:
df1 = clean_df.groupby(col('platform')).agg(count("method").alias("device_actions"))\
        .withColumn('part_device_actions', round(F.col('device_actions')/total_actions,2))

Посмотрим, какие у нас браузеры существуют в этой выборке:

In [62]:
browser_name = clean_df.select('browser').distinct().show(10)

+--------------------+
|             browser|
+--------------------+
|           GoogleBot|
| like Gecko) Chro...|
|      Yandex.Browser|
|      AndroidBrowser|
|             Firefox|
| like Gecko) Vers...|
|              Safari|
| like Gecko) Vers...|
| like Gecko) Chro...|
|           ChromeiOS|
+--------------------+
only showing top 10 rows



In [63]:
total_browser_cnt = clean_df.select('browser').count() 

In [64]:
browser_cnt = clean_df.groupby(col('platform')).agg(count('browser').alias('browser_cnt'))

Доля использования для данного браузера относительно остальных браузеров - это 

In [68]:
df2 = clean_df.groupby(col('platform')).agg(count('browser').alias('browser_cnt'))\
            .withColumn('part_browser', round(F.col('browser_cnt')/total_browser_cnt,2))  

Теперь все объединим и посмотрим промежуточный итог:

In [69]:
df3=df0.join(df1, on='platform', how='left').join(df2, on='platform', how='left').drop('count').drop('method')
df3.show(3)

+--------------------+------------+-----------------+--------------+-------------------+-----------+------------+
|            platform|device_users|part_device_users|device_actions|part_device_actions|browser_cnt|part_browser|
+--------------------+------------+-----------------+--------------+-------------------+-----------+------------+
|                 iOS|       67357|            49.71|         67357|               0.01|      67357|        0.01|
|              Mac OS|       45879|            33.86|         45879|               0.01|      45879|        0.01|
| like Gecko) Chro...|           8|             0.01|             8|                0.0|          8|         0.0|
+--------------------+------------+-----------------+--------------+-------------------+-----------+------------+
only showing top 3 rows



In [70]:
total_status=clean_df.groupBy('status').count().show()

+------+-------+
|status|  count|
+------+-------+
|   200|7873334|
|   408|     69|
|   401|   8802|
|   302| 326780|
|   502|   6241|
|   404|  20370|
|   403|    592|
|   500|    571|
|   304| 169986|
|   499| 110559|
|   301|   3025|
+------+-------+



Количество ответов 200:

In [73]:
answers_200 = clean_df.filter(clean_df.status == '200').groupBy(col('platform')).agg(count("status").alias("answers_200"))

Количество ответов сервера, отличных от 200 на данном устройстве:

In [74]:
answers_ne200 = clean_df.filter(clean_df.status != '200').groupBy(col('platform')).agg(count("status").alias("answers_ne200"))

Для каждого из ответов сервера, отличных от 200, сформировать поле, в котором будет содержаться количество ответов данного типа:

In [75]:
answers_3XX = clean_df.filter((clean_df.status >= '300') & (df_new.status<'400')).groupby(col('platform')).agg(count("status").alias("answers_3XX"))

In [76]:
answers_4XX = clean_df.filter((clean_df.status >= '400') & (df_new.status<'500')).groupby(col('platform')).agg(count("status").alias("answers_4XX"))

In [77]:
answers_5XX = clean_df.filter(clean_df.status > '500').groupby(col('platform')).agg(count("status").alias("answers_5XX"))

In [78]:
df_mart = df3.join(answers_200, on ='platform', how='left').join(answers_ne200, on='platform', how='left')\
       .join(answers_3XX, on ='platform', how='left').join(answers_4XX, on ='platform', how='left').join(answers_5XX, on ='platform', how='left')
df_mart.show(2)

+--------+------------+-----------------+--------------+-------------------+-----------+------------+-----------+-------------+-----------+-----------+-----------+
|platform|device_users|part_device_users|device_actions|part_device_actions|browser_cnt|part_browser|answers_200|answers_ne200|answers_3XX|answers_4XX|answers_5XX|
+--------+------------+-----------------+--------------+-------------------+-----------+------------+-----------+-------------+-----------+-----------+-----------+
|     iOS|       67357|            49.71|         67357|               0.01|      67357|        0.01|      64013|         3344|       1602|       1691|         51|
|  Mac OS|       45879|            33.86|         45879|               0.01|      45879|        0.01|      43484|         2395|       1795|        405|        195|
+--------+------------+-----------------+--------------+-------------------+-----------+------------+-----------+-------------+-----------+-----------+-----------+
only showing top

Запишем в файл:
 

In [79]:
df_res=df_mart.toPandas()

In [80]:
df_res['id'] = df_res.index + 1

In [81]:
df_res['answers_ne200'] = df_res['answers_ne200'].fillna(0)
df_res['answers_3XX'] = df_res['answers_3XX'].fillna(0) 
df_res['answers_4XX'] = df_res['answers_4XX'].fillna(0) 
df_res['answers_5XX'] = df_res['answers_5XX'].fillna(0)

In [82]:
df_res.to_csv('Log_datamart.csv', index = False)    #сохраним все значения в файл на всякий случай

Запишем в таблицу БД Тестлог (ее создали предварительно)

In [87]:
db_con = psycopg2.connect(database='Log',     # создадим подключение к созданной базе данных exrate
                        user='postgres',
                        password='1234',
                        host='localhost',
                        port=5432)
cur = db_con.cursor()

In [84]:
cur.execute(""" CREATE TABLE IF NOT EXISTS log_mart(        
    platform VARCHAR,
    device_users INTEGER,
    part_device_users FLOAT,
    device_actions INTEGER,
    part_device_actions FLOAT,
    browser_cnt INTEGER,
    part_browser FLOAT,
    answers_200 INTEGER,
    answers_ne200 DECIMAL,
    answers_3XX DECIMAL,
    answers_4XX DECIMAL,
    answers_5XX DECIMAL,
    id SERIAL PRIMARY KEY
)
""") 
db_con.commit()                              # создадим таблицу

In [85]:
with open('Log_datamart.csv', 'r') as f:
    next(f) 
    cur.copy_from(f, 'log_mart', sep=',')
    #Commit Changes
    db_con.commit()

In [88]:
cur.execute("""select * from log_mart;""")            #проверка записи
print(cur.fetchone())
db_con.commit()

('iOS', 67357, 49.71, 67357, 0.01, 67357, 0.01, 64013, Decimal('3344.0'), Decimal('1602.0'), Decimal('1691.0'), Decimal('51.0'), 1)


In [89]:
db_con.close()