In [7]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_extract, col


# Build the Spark session for this analysis, reusing an existing one if present.
spark = (
    SparkSession.builder
    .appName('log-analysis')
    .enableHiveSupport()
    .getOrCreate()
)


# Path to the log file in HDFS.
# NOTE(review): the path is relative, so it presumably resolves against the
# cluster's default filesystem — confirm for your environment.
hdfs_path = 'logfiles.log'

# Load the file as plain text: one row per line, in a single `value` column.
logs = spark.read.format('text').load(hdfs_path)


# Regular expressions for parsing Common Log Format lines.
# Layout: host ident authuser [date] "request" status bytes
#
# Client host: the first whitespace-delimited token on the line.
host_re = r'^(\S+)'
# Quoted request line: group 1 is the HTTP method, group 2 the requested path.
# The dot in the protocol version is escaped so that only a literal '.' matches,
# and all standard methods are accepted so that e.g. HEAD requests are not
# reported with an empty path.
request_re = r'"(GET|HEAD|POST|PUT|PATCH|DELETE|OPTIONS|CONNECT|TRACE)\s(\S+)\sHTTP/\d\.\d"'
# Status code: three digits right after the closing quote of a quoted field.
# Anchoring on the quote keeps a three-digit ident/authuser earlier on the line
# from being mistaken for the status.
status_re = r'"\s(\d{3})\s'


# Derive one column per field from the raw `value` text column.
# The status is cast to an integer; host and path stay as strings.
host_column = regexp_extract('value', host_re, 1).alias('host')
path_column = regexp_extract('value', request_re, 2).alias('path')
status_column = regexp_extract('value', status_re, 1).cast('integer').alias('status')

logs_parsed = logs.select(host_column, path_column, status_column)


# Register the parsed rows as the temporary view `access_logs` for Spark SQL.
logs_parsed.createOrReplaceTempView('access_logs')

# Example query 1: request count per status code, most frequent first.
q1 = spark.sql('SELECT status, count(*) as cnt FROM access_logs GROUP BY status ORDER BY cnt DESC')
q1.show()

# Example query 2: the 20 most requested paths.
q2 = spark.sql("SELECT path, count(*) as hits FROM access_logs GROUP BY path ORDER BY hits DESC LIMIT 20")
q2.show()


# Save the top-paths result to HDFS as CSV, overwriting any previous output.
# The write is wrapped in try/finally so the Spark session is stopped even
# when the write raises; otherwise a failed run would leave it running.
try:
    q2.write.mode('overwrite').csv('hdfs://namenode:9000/jupyter/youruser/results/top_paths')
finally:
    spark.stop()

+------+------+
|status|   cnt|
+------+------+
|   500|143467|
|   404|142938|
|   303|142821|
|   502|142793|
|   403|142697|
|   200|142653|
|   304|142631|
+------+------+

+--------------------+------+
|                path|  hits|
+--------------------+------+
|/usr/admin/developer|200224|
|       /usr/register|200099|
|          /usr/login|200040|
|          /usr/admin|199881|
|                /usr|199756|
+--------------------+------+

