In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("web server logs").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/07/01 18:27:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [169]:
df_raw = spark.read.text("access.log")
df_raw.show()

+--------------------+
|               value|
+--------------------+
|54.36.149.41 - - ...|
|31.56.96.51 - - [...|
|31.56.96.51 - - [...|
|40.77.167.129 - -...|
|91.99.72.15 - - [...|
|40.77.167.129 - -...|
|40.77.167.129 - -...|
|40.77.167.129 - -...|
|66.249.66.194 - -...|
|40.77.167.129 - -...|
|207.46.13.136 - -...|
|40.77.167.129 - -...|
|178.253.33.51 - -...|
|40.77.167.129 - -...|
|91.99.72.15 - - [...|
|40.77.167.129 - -...|
|207.46.13.136 - -...|
|40.77.167.129 - -...|
|40.77.167.129 - -...|
|66.249.66.194 - -...|
+--------------------+
only showing top 20 rows



In [170]:
import re
log_str = '54.36.149.41 - - [22/Jan/2019:03:56:14 +0330] "GET /filter/27|13%20%D9%85%DA%AF%D8%A7%D9%BE%DB%8C%DA%A9%D8%B3%D9%84,27|%DA%A9%D9%85%D8%AA%D8%B1%20%D8%A7%D8%B2%205%20%D9%85%DA%AF%D8%A7%D9%BE%DB%8C%DA%A9%D8%B3%D9%84,p53 HTTP/1.1" 200 30577 "-" "Mozilla/5.0 (compatible; AhrefsBot/6.1; +http://ahrefs.com/robot/)" "-"'
temp = '"Mozilla/5.0 (compatible; bingbot/2.0; +http://www.bing.com/bingbot.htm)" "-"'
remote_host = r"(?P<remote_host>[\d\.]+)"
time_stamp = r"\[(?P<timestamp>\d\d/\w\w\w/\d\d\d\d:\d\d:\d\d:\d\d\s[+-]+\d\d\d\d)\]"

request = r"\"(?P<request_type>[\w]+)\s(?P<request_url>/.+)\s(?P<http_version>[\w/\d\.]+)\"\s(?P<response_code>[\d]+)\s(?P<response_bytes>[\d]+)"
user_agent = r"\"(?P<user_agent>[\w/\.\d]+).+"
log_pattern = f'{remote_host} - - {time_stamp} {request} "-" {user_agent} "-"'
re_pattern = re.compile(log_pattern)
print(re_pattern.match(log_str).groups())

('54.36.149.41', '22/Jan/2019:03:56:14 +0330', 'GET', '/filter/27|13%20%D9%85%DA%AF%D8%A7%D9%BE%DB%8C%DA%A9%D8%B3%D9%84,27|%DA%A9%D9%85%D8%AA%D8%B1%20%D8%A7%D8%B2%205%20%D9%85%DA%AF%D8%A7%D9%BE%DB%8C%DA%A9%D8%B3%D9%84,p53', 'HTTP/1.1', '200', '30577', 'Mozilla/5.0')


In [171]:
# 40.77.167.129 - - [22/Jan/2019:03:56:18 +0330] "GET /image/57710/productModel/100x100 HTTP/1.1" 200 1695 "-" "Mozilla/5.0 (compatible; bingbot/2.0; +http://www.bing.com/bingbot.htm)" "-"
from pyspark.sql.functions import udf

def read_log(log_text):
    match_obj = re_pattern.match(log_text)
    if match_obj is None:
        return
    return (
        match_obj.group("remote_host"),
        match_obj.group("timestamp"),
        match_obj.group("request_type"),
        match_obj.group("request_url"),
        match_obj.group("http_version"),
        match_obj.group("response_code"),
        match_obj.group("response_bytes"),
        match_obj.group("user_agent"),
    )


In [172]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StructField, StringType, StructType
log_schema = StructType(
    [
        StructField("remote_host", StringType(), True),
        StructField("time_stamp", StringType(), True),
        StructField("request_type", StringType(), True),
        StructField("request_url", StringType(), True),
        StructField("http_version", StringType(), True),
        StructField("response_code", StringType(), True),
        StructField("response_bytes", StringType(), True),
        StructField("user_agent", StringType(), True),
    ]
)

udf_read_log = udf(read_log, log_schema)



In [173]:
df = df_raw.withColumn('log', udf_read_log(df_raw['value']))

In [179]:
df.select('log.*').show()

+-------------+--------------------+------------+--------------------+------------+-------------+--------------+-----------+
|  remote_host|          time_stamp|request_type|         request_url|http_version|response_code|response_bytes| user_agent|
+-------------+--------------------+------------+--------------------+------------+-------------+--------------+-----------+
| 54.36.149.41|22/Jan/2019:03:56...|         GET|/filter/27|13%20%...|    HTTP/1.1|          200|         30577|Mozilla/5.0|
|         null|                null|        null|                null|        null|         null|          null|       null|
|         null|                null|        null|                null|        null|         null|          null|       null|
|40.77.167.129|22/Jan/2019:03:56...|         GET|/image/14925/prod...|    HTTP/1.1|          200|          1696|Mozilla/5.0|
|  91.99.72.15|22/Jan/2019:03:56...|         GET|/product/31893/62...|    HTTP/1.1|          200|         41483|Mozilla/5.0|


In [180]:
df = df.select("log.*")


In [181]:
df.show()

+-------------+--------------------+------------+--------------------+------------+-------------+--------------+-----------+
|  remote_host|          time_stamp|request_type|         request_url|http_version|response_code|response_bytes| user_agent|
+-------------+--------------------+------------+--------------------+------------+-------------+--------------+-----------+
| 54.36.149.41|22/Jan/2019:03:56...|         GET|/filter/27|13%20%...|    HTTP/1.1|          200|         30577|Mozilla/5.0|
|         null|                null|        null|                null|        null|         null|          null|       null|
|         null|                null|        null|                null|        null|         null|          null|       null|
|40.77.167.129|22/Jan/2019:03:56...|         GET|/image/14925/prod...|    HTTP/1.1|          200|          1696|Mozilla/5.0|
|  91.99.72.15|22/Jan/2019:03:56...|         GET|/product/31893/62...|    HTTP/1.1|          200|         41483|Mozilla/5.0|
