# Анализ логов

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, FloatType, DateType, TimestampType

from datetime import datetime
from pyspark.sql.functions import *

!pip install pandas
import pandas as pd

!pip install pyyaml ua-parser user-agents
from user_agents import parse
# библиотека для парсинга user-agent


# импорт модулей и библиотек



In [91]:
spark = SparkSession \
    .builder \
    .appName("Log analysis") \
    .config("spark.jars", "C:/Users/raspa/Desktop/spark-3.3.1-bin-hadoop3/jars/postgresql-42.5.1.jar") \
    .config("spark.driver.bindAddress","localhost") \
    .config("spark.ui.port","4040") \
    .getOrCreate()

# создание spark сессии

In [4]:
client_hostname = spark.read.csv('C:/Users/raspa/Desktop/data_ignore/client_hostname.csv', header=True)
# csv файл с данными клиентов

In [5]:
client_hostname.show(15)

+---------------+--------------------+--------------------+-------------------+
|         client|            hostname|          alias_list|       address_list|
+---------------+--------------------+--------------------+-------------------+
|   5.123.144.95|        5.123.144.95|[Errno 1] Unknown...|               null|
|   5.122.76.187|        5.122.76.187|[Errno 1] Unknown...|               null|
|   5.215.249.99|        5.215.249.99|[Errno 1] Unknown...|               null|
|  31.56.102.211|31-56-102-211.sha...|['211.102.56.31.i...|  ['31.56.102.211']|
|  5.123.166.223|       5.123.166.223|[Errno 1] Unknown...|               null|
|    5.160.26.98|         5.160.26.98|[Errno 1] Unknown...|               null|
|  5.127.147.132|       5.127.147.132|[Errno 1] Unknown...|               null|
|  158.58.30.218|       158.58.30.218|[Errno 1] Unknown...|               null|
|   86.55.230.86|        86.55.230.86|[Errno 1] Unknown...|               null|
|   89.35.65.186|        89.35.65.186|[E

In [6]:
def edit_alias_list(str):
    if str == "[Errno 1] Unknown host":
        return "unknown"
    else:
        return str[2:len(str)-2]
    
    
def edit_address_list(str):
    if str == "null":
        return "unknown"
    elif str is None:
        return "unknown"
    else:
        return str[2:len(str)-2]
    
# функция для замены пропущенных значений и редактировании строк(очищает от скобок и кавычек в строке)

In [7]:
func_alias_list = udf(edit_alias_list, StringType())
func_address_list = udf(edit_address_list, StringType())

In [8]:
client_hostname = client_hostname.withColumn("alias_list_new", func_alias_list("alias_list"))
client_hostname = client_hostname.drop("alias_list")
client_hostname = client_hostname.withColumn("address_list_new", func_address_list("address_list"))
client_hostname = client_hostname.drop("address_list")

In [9]:
client_hostname = client_hostname.withColumnRenamed("alias_list_new", "alias_list")\
       .withColumnRenamed("address_list_new", "address_list")

In [10]:
df = pd.read_table('C:/Users/raspa/Desktop/data_ignore/access.log', engine='python', header=None, 
                   names=["ip", "datetime", "request", "code_req", "port", "user_agent"], 
                   sep=" - - \[{1}(.+)\] \"(.*?)\" (\d+) (\d+) .+ \"(.{2,}?)\".*",  index_col=False)

# загрузка главного файла с данными
# первичная обработка с помощью RexEx

In [11]:
df.head(5)

Unnamed: 0,ip,datetime,request,code_req,port,user_agent
0,54.36.149.41,22/Jan/2019:03:56:14 +0330,GET /filter/27|13%20%D9%85%DA%AF%D8%A7%D9%BE%D...,200.0,30577.0,Mozilla/5.0 (compatible; AhrefsBot/6.1; +http:...
1,31.56.96.51,22/Jan/2019:03:56:16 +0330,GET /image/60844/productModel/200x200 HTTP/1.1,200.0,5667.0,Mozilla/5.0 (Linux; Android 6.0; ALE-L21 Build...
2,31.56.96.51,22/Jan/2019:03:56:16 +0330,GET /image/61474/productModel/200x200 HTTP/1.1,200.0,5379.0,Mozilla/5.0 (Linux; Android 6.0; ALE-L21 Build...
3,40.77.167.129,22/Jan/2019:03:56:17 +0330,GET /image/14925/productModel/100x100 HTTP/1.1,200.0,1696.0,Mozilla/5.0 (compatible; bingbot/2.0; +http://...
4,91.99.72.15,22/Jan/2019:03:56:17 +0330,GET /product/31893/62100/%D8%B3%D8%B4%D9%88%D8...,200.0,41483.0,Mozilla/5.0 (Windows NT 6.2; Win64; x64; rv:16...


In [26]:
df.code_req = df.code_req.fillna(0)
df.port = df.port.fillna(0)
df = df.astype({"code_req": int, "port": int, "user_agent": str})


# замена пропущенных значений в столбцах на 0
# для конвертации в тип *integer*

In [14]:
format_date = '%d/%b/%Y:%H:%M:%S +0330'

In [15]:
df.datetime = df.datetime.apply(lambda x: datetime.strptime(x, format_date) if x is not(None) else 0)

# перевод даты из строки в формат datetime

In [None]:
df["browser"] = df.user_agent.apply(lambda x: parse(x).browser.family)

# вытащил из user-agent название браузера

In [None]:
df["device"] = df.user_agent.apply(lambda x: parse(x).device.brand)

# здесь вытащил бренд смартфона

In [56]:
df.head(5)

Unnamed: 0,ip,datetime,request,code_req,port,user_agent
0,54.36.149.41,2019-01-22 03:56:14,GET /filter/27|13%20%D9%85%DA%AF%D8%A7%D9%BE%D...,200,30577,Mozilla/5.0 (compatible; AhrefsBot/6.1; +http:...
1,31.56.96.51,2019-01-22 03:56:16,GET /image/60844/productModel/200x200 HTTP/1.1,200,5667,Mozilla/5.0 (Linux; Android 6.0; ALE-L21 Build...
2,31.56.96.51,2019-01-22 03:56:16,GET /image/61474/productModel/200x200 HTTP/1.1,200,5379,Mozilla/5.0 (Linux; Android 6.0; ALE-L21 Build...
3,40.77.167.129,2019-01-22 03:56:17,GET /image/14925/productModel/100x100 HTTP/1.1,200,1696,Mozilla/5.0 (compatible; bingbot/2.0; +http://...
4,91.99.72.15,2019-01-22 03:56:17,GET /product/31893/62100/%D8%B3%D8%B4%D9%88%D8...,200,41483,Mozilla/5.0 (Windows NT 6.2; Win64; x64; rv:16...


In [72]:
df2 = df[0:100000]
df2["browser"] = df2.user_agent.apply(lambda x: parse(x).browser.family)
df2["device"] = df2.user_agent.apply(lambda x: parse(x).device.brand if parse(x).device.brand is not(None) else parse(x).os.family)

# взял 100 тысяч строк из 10 миллионов, так как вылезает ошибка при создании датафрейма *java.lang.OutOfMemoryError: Java heap space*
# и добавление новых столбцов занимает очень долгое время

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df2["browser"] = df2.user_agent.apply(lambda x: parse(x).browser.family)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df2["device"] = df2.user_agent.apply(lambda x: parse(x).device.brand if parse(x).device.brand is not(None) else parse(x).os.family)


In [77]:
schema = StructType(fields=[
    StructField("ip", StringType()),
    StructField("datetime", TimestampType()),
    StructField("request", StringType()),
    StructField("code_req", IntegerType()),
    StructField("port", IntegerType()),
    StructField("user_agent", StringType()),
    StructField("browser", StringType()),
    StructField("device", StringType())
])

# схема для создания DataFrame pyspark из Pandas DataFrame

In [78]:
logs = spark.createDataFrame(df2, schema=schema)


In [79]:
logs.show(5)

+-------------+-------------------+--------------------+--------+-----+--------------------+-------------+-------+
|           ip|           datetime|             request|code_req| port|          user_agent|      browser| device|
+-------------+-------------------+--------------------+--------+-----+--------------------+-------------+-------+
| 54.36.149.41|2019-01-22 03:56:14|GET /filter/27|13...|     200|30577|Mozilla/5.0 (comp...|    AhrefsBot| Spider|
|  31.56.96.51|2019-01-22 03:56:16|GET /image/60844/...|     200| 5667|Mozilla/5.0 (Linu...|Chrome Mobile| Huawei|
|  31.56.96.51|2019-01-22 03:56:16|GET /image/61474/...|     200| 5379|Mozilla/5.0 (Linu...|Chrome Mobile| Huawei|
|40.77.167.129|2019-01-22 03:56:17|GET /image/14925/...|     200| 1696|Mozilla/5.0 (comp...|      bingbot| Spider|
|  91.99.72.15|2019-01-22 03:56:17|GET /product/3189...|     200|41483|Mozilla/5.0 (Wind...|      Firefox|Windows|
+-------------+-------------------+--------------------+--------+-----+---------

In [94]:
logs.select("ip", "datetime", "request", "code_req", "port", "user_agent", "browser", "device").write.format("jdbc")\
    .option("url", "jdbc:postgresql://localhost:5432/logs") \
    .mode("Overwrite") \
    .option("driver", "org.postgresql.Driver").option("dbtable", "log") \
    .option("user", "postgres").option("password", "0000").save()

# сохранение датафрейма logs в БД postgres

Py4JJavaError: An error occurred while calling o285.save.
: java.lang.ClassNotFoundException: org.postgresql.Driver
	at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:445)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:588)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521)
	at org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(DriverRegistry.scala:46)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1(JDBCOptions.scala:101)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1$adapted(JDBCOptions.scala:101)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:101)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:229)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:233)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:47)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:47)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:116)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:247)
	at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:104)
	at java.base/java.lang.reflect.Method.invoke(Method.java:578)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:1589)


In [95]:
client_hostname.select("client", "hostname", "alias_list", "address_list").write.format("jdbc")\
    .option("url", "jdbc:postgresql://localhost:5432/logs") \
    .mode("Overwrite") \
    .option("driver", "org.postgresql.Driver").option("dbtable", "hostname") \
    .option("user", "postgres").option("password", "0000").save()

# сохранение датафрейма client_hostname в БД postgres

Py4JJavaError: An error occurred while calling o301.save.
: java.lang.ClassNotFoundException: org.postgresql.Driver
	at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:445)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:588)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521)
	at org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(DriverRegistry.scala:46)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1(JDBCOptions.scala:101)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1$adapted(JDBCOptions.scala:101)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:101)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:229)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:233)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:47)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:47)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:116)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:247)
	at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:104)
	at java.base/java.lang.reflect.Method.invoke(Method.java:578)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:1589)
