In [9]:
from pyspark.sql import SparkSession, functions as F

# Create (or reuse, via getOrCreate) the SparkSession used by every cell below.
# The session time zone is pinned to UTC: the log timestamps carry an explicit
# "+0000" offset, and both the displayed log_timestamp values and the derived
# log_date partition column are computed in the session time zone. Without this,
# the HDFS partition a record lands in would depend on the cluster's default zone.
# NOTE(review): spark.driver.memory only takes effect if this call is what launches
# the driver JVM (i.e. no SparkContext exists yet in this kernel).
spark = (
    SparkSession.builder
    .appName('ReadLogsAndWriteHDFS')
    .config('spark.hadoop.fs.defaultFS', 'hdfs://cluster-master:9000')
    .config('spark.sql.catalogImplementation', 'hive')
    .config('spark.driver.memory', '2g')
    .config('spark.executor.memory', '1g')
    .config('spark.sql.session.timeZone', 'UTC')
    .getOrCreate()
)

In [14]:
# Read the raw access logs (every local file matching access.log*); each line
# becomes one row in a single string column named "value".
access_df = spark.read.format('text').load('file:///dataops/logs/access.log*')

In [15]:
# Preview the first 5 raw lines without truncating long values.
access_df.show(n=5, truncate=False)

[Stage 0:>                                                          (0 + 1) / 1]

+----------------------------------------------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                                                     |
+----------------------------------------------------------------------------------------------------------------------------------------------------------+
|192.168.100.100 - - [11/Apr/2025:20:45:48 +0000] "DELETE /static/script.js HTTP/1.1" 500 447 "-" "Mozilla/5.0 (iPhone; CPU iPhone OS 13_5 like Mac OS X)" |
|192.168.100.100 - - [11/Apr/2025:00:28:44 +0000] "DELETE /downloads/product_1 HTTP/1.1" 200 1184 "-" "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"          |
|192.168.100.100 - - [11/Apr/2025:18:02:41 +0000] "PUT /downloads/product_1 HTTP/1.1" 301 1286 "-" "Mozilla/5.0 (iPhone; CPU iPhone OS 13_5 like Mac OS X)"|
|192.168.100.100 - - [11/Apr/2025:02:09:16 +0000] "GET /st

                                                                                

In [16]:
# Parse the access logs

In [17]:
# Access-log line pattern: ip - - [timestamp] "method url protocol" status size "referer" "user_agent".
# The ident/user fields must both be a literal "-" for a line to match.
# Capture groups (1-based, as passed to regexp_extract):
#   1 ip, 2 timestamp, 3 method, 4 url, 5 protocol, 6 status,
#   7 response size (digits or "-"), 8 referer, 9 user agent
access_parse_regexp = (
    r'(\S+) - - '            # 1: client IP, followed by the "- -" ident/user fields
    r'\[(.*?)\] '            # 2: timestamp inside square brackets
    r'"(\S+) (\S+) (\S+)" '  # 3-5: request method, URL, protocol
    r'(\d+) '                # 6: HTTP status code
    r'(\d+|-) '              # 7: response size, or "-"
    r'"(.*?)" '              # 8: referer
    r'"(.*?)"'               # 9: user agent
)

In [18]:
# Parse each raw access-log line into named columns.
# regexp_extract returns "" for every group when a line does not match, so
# non-matching lines are filtered out first. Otherwise they would be kept as
# rows of empty strings with a NULL timestamp, and hence a NULL log_date
# partition downstream. rlike uses the same "find anywhere" semantics as
# regexp_extract, so exactly the lines that would yield a match are kept.
access_fields = [
    "ip",             # group 1: client IP
    "log_timestamp",  # group 2: timestamp, converted to TimestampType below
    "log_method",     # group 3: HTTP method
    "url",            # group 4: request path
    "protocol",       # group 5: protocol
    "status",         # group 6: status code (kept as string)
    "log_size",       # group 7: response size, digits or "-" (kept as string)
    "referer",        # group 8: referer
    "user_agent",     # group 9: user agent
]
access_parsed_df = (
    access_df
    .filter(F.col("value").rlike(access_parse_regexp))
    .select(*[
        F.regexp_extract("value", access_parse_regexp, group).alias(name)
        for group, name in enumerate(access_fields, start=1)
    ])
    # withColumn on an existing name replaces it in place, so column order is unchanged.
    .withColumn("log_timestamp",
                F.to_timestamp("log_timestamp", "dd/MMM/yyyy:HH:mm:ss Z"))
)


In [19]:
# Inspect the first 15 parsed access-log rows in full.
access_parsed_df.show(n=15, truncate=False)

+---------------+-------------------+----------+--------------------+--------+------+--------+-------+------------------------------------------------------+
|ip             |log_timestamp      |log_method|url                 |protocol|status|log_size|referer|user_agent                                            |
+---------------+-------------------+----------+--------------------+--------+------+--------+-------+------------------------------------------------------+
|192.168.100.100|2025-04-11 20:45:48|DELETE    |/static/script.js   |HTTP/1.1|500   |447     |-      |Mozilla/5.0 (iPhone; CPU iPhone OS 13_5 like Mac OS X)|
|192.168.100.100|2025-04-11 00:28:44|DELETE    |/downloads/product_1|HTTP/1.1|200   |1184    |-      |Mozilla/5.0 (Windows NT 10.0; Win64; x64)             |
|192.168.100.100|2025-04-11 18:02:41|PUT       |/downloads/product_1|HTTP/1.1|301   |1286    |-      |Mozilla/5.0 (iPhone; CPU iPhone OS 13_5 like Mac OS X)|
|192.168.100.100|2025-04-11 02:09:16|GET       |/sta

In [21]:
# Read and parse the error logs

In [22]:
# Read the raw error logs (every local file matching error.log*); each line
# becomes one row in a single string column named "value".
error_df = spark.read.format('text').load('file:///dataops/logs/error.log*')

In [23]:
# Preview the first 5 raw error lines without truncation.
error_df.show(n=5, truncate=False)

+-----------------------------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                                    |
+-----------------------------------------------------------------------------------------------------------------------------------------+
|[14/Apr/2025:08:59:20 +0000] [error] [client 24.147.180.192] script not found or unable to stat: /cgi-bin/test.cgi                       |
|[14/Apr/2025:20:56:40 +0000] [error] [client 159.162.98.228] script not found or unable to stat: /cgi-bin/test.cgi                       |
|[14/Apr/2025:01:14:59 +0000] [error] [client 160.232.0.253] upstream timed out (110: Connection timed out) while reading response header |
|[14/Apr/2025:01:31:46 +0000] [error] [client 193.76.143.129] upstream timed out (110: Connection timed out) while reading response header|
|[14/Apr/2025:16:01:

In [24]:
# Error-log line pattern: [timestamp] [level] [client ip] message
# Capture groups (1-based): 1 timestamp, 2 level, 3 client IP, 4 rest of line.
error_parse_regexp = (
    r'\[(.*?)\] '         # 1: timestamp inside square brackets
    r'\[(\w+)\] '         # 2: level word
    r'\[client (\S+)\] '  # 3: client address
    r'(.*)'               # 4: remaining message text
)

In [25]:
# Parse each raw error-log line into named columns.
# regexp_extract returns "" for every group when a line does not match, so
# non-matching lines are filtered out first. Otherwise they would be kept as
# rows of empty strings with a NULL timestamp, and hence a NULL log_date
# partition downstream.
error_fields = [
    "log_timestamp",  # group 1: timestamp, converted to TimestampType below
    "level",          # group 2: error level
    "client_ip",      # group 3: client IP
    "message",        # group 4: error message
]
error_parsed_df = (
    error_df
    .filter(F.col("value").rlike(error_parse_regexp))
    .select(*[
        F.regexp_extract("value", error_parse_regexp, group).alias(name)
        for group, name in enumerate(error_fields, start=1)
    ])
    # withColumn on an existing name replaces it in place, so column order is unchanged.
    .withColumn("log_timestamp",
                F.to_timestamp("log_timestamp", "dd/MMM/yyyy:HH:mm:ss Z"))
)

In [26]:
# Inspect the first 5 parsed error-log rows in full.
error_parsed_df.show(n=5, truncate=False)

+-------------------+-----+--------------+----------------------------------------------------------------------------+
|log_timestamp      |level|client_ip     |message                                                                     |
+-------------------+-----+--------------+----------------------------------------------------------------------------+
|2025-04-14 08:59:20|error|24.147.180.192|script not found or unable to stat: /cgi-bin/test.cgi                       |
|2025-04-14 20:56:40|error|159.162.98.228|script not found or unable to stat: /cgi-bin/test.cgi                       |
|2025-04-14 01:14:59|error|160.232.0.253 |upstream timed out (110: Connection timed out) while reading response header|
|2025-04-14 01:31:46|error|193.76.143.129|upstream timed out (110: Connection timed out) while reading response header|
|2025-04-14 16:01:31|error|135.207.120.68|script not found or unable to stat: /cgi-bin/test.cgi                       |
+-------------------+-----+-------------

In [27]:
# Add a log_date column to the DataFrames

In [28]:
# Derive the calendar date of each request; it is the Parquet partition key
# used when writing to HDFS. log_timestamp is already a TimestampType column,
# so no format string is needed (to_date ignores a format for timestamp input).
# The date is taken in the Spark session time zone.
access_parsed_df = access_parsed_df.withColumn("log_date", F.to_date(F.col("log_timestamp")))

In [29]:
# Confirm the new log_date column on the first 5 access rows.
access_parsed_df.show(n=5, truncate=False)

+---------------+-------------------+----------+--------------------+--------+------+--------+-------+------------------------------------------------------+----------+
|ip             |log_timestamp      |log_method|url                 |protocol|status|log_size|referer|user_agent                                            |log_date  |
+---------------+-------------------+----------+--------------------+--------+------+--------+-------+------------------------------------------------------+----------+
|192.168.100.100|2025-04-11 20:45:48|DELETE    |/static/script.js   |HTTP/1.1|500   |447     |-      |Mozilla/5.0 (iPhone; CPU iPhone OS 13_5 like Mac OS X)|2025-04-11|
|192.168.100.100|2025-04-11 00:28:44|DELETE    |/downloads/product_1|HTTP/1.1|200   |1184    |-      |Mozilla/5.0 (Windows NT 10.0; Win64; x64)             |2025-04-11|
|192.168.100.100|2025-04-11 18:02:41|PUT       |/downloads/product_1|HTTP/1.1|301   |1286    |-      |Mozilla/5.0 (iPhone; CPU iPhone OS 13_5 like Mac OS X

In [30]:
# Derive the calendar date of each error entry; it is the Parquet partition key
# used when writing to HDFS. log_timestamp is already a TimestampType column,
# so no format string is needed (to_date ignores a format for timestamp input).
# The date is taken in the Spark session time zone.
error_parsed_df = error_parsed_df.withColumn("log_date", F.to_date(F.col("log_timestamp")))

In [31]:
# Confirm the new log_date column on the first 5 error rows.
error_parsed_df.show(n=5, truncate=False)

+-------------------+-----+--------------+----------------------------------------------------------------------------+----------+
|log_timestamp      |level|client_ip     |message                                                                     |log_date  |
+-------------------+-----+--------------+----------------------------------------------------------------------------+----------+
|2025-04-14 08:59:20|error|24.147.180.192|script not found or unable to stat: /cgi-bin/test.cgi                       |2025-04-14|
|2025-04-14 20:56:40|error|159.162.98.228|script not found or unable to stat: /cgi-bin/test.cgi                       |2025-04-14|
|2025-04-14 01:14:59|error|160.232.0.253 |upstream timed out (110: Connection timed out) while reading response header|2025-04-14|
|2025-04-14 01:31:46|error|193.76.143.129|upstream timed out (110: Connection timed out) while reading response header|2025-04-14|
|2025-04-14 16:01:31|error|135.207.120.68|script not found or unable to stat: /cgi-

In [32]:
# Write to HDFS in Parquet format, partitioned by the log_date column

In [34]:
# Persist the parsed access logs to HDFS as Parquet, with one log_date=...
# sub-directory per distinct date.
# NOTE(review): 'append' adds new files on every run. The input glob re-reads
# every access.log* file, so re-running this cell against the same files
# duplicates rows. Consider dynamic partition overwrite if reruns must be idempotent.
(
    access_parsed_df.write
    .mode('append')
    .partitionBy('log_date')
    .parquet('hdfs://cluster-master:9000/logs/access')
)

                                                                                

In [35]:
# Persist the parsed error logs to HDFS as Parquet, with one log_date=...
# sub-directory per distinct date.
# NOTE(review): 'append' adds new files on every run. The input glob re-reads
# every error.log* file, so re-running this cell against the same files
# duplicates rows. Consider dynamic partition overwrite if reruns must be idempotent.
(
    error_parsed_df.write
    .mode('append')
    .partitionBy('log_date')
    .parquet('hdfs://cluster-master:9000/logs/errors')
)

                                                                                

In [73]:
# Shut down the SparkSession (and its underlying SparkContext) once all writes are done.
spark.stop()