In [1]:
import re
from pyspark.sql import types as T
from pyspark.sql import SparkSession
from pyspark.sql import Row

In [2]:
# Output schema for the parsed access-log DataFrame: every column is a
# nullable string.  The column order below is the order of the resulting
# DataFrame columns; `error_line` comes last.
S3_ACCESS_LOG_OUTPUT_SCHEMA = T.StructType([
    T.StructField(column_name, T.StringType(), True)
    for column_name in (
        "bucket_owner",
        "bucket",
        "time",
        "remote_ip",
        "requester",
        "request_id",
        "operation",
        "key",
        "request_uri",
        "http_status",
        "error_code",
        "bytes_sent",
        "object_size",
        "total_time",
        "turn_around_time",
        "referer",
        "user_agent",
        "version_id",
        "host_id",
        "signature_version",
        "cipher_suite",
        "authentication_type",
        "host_header",
        "tls_version",
        "error_line",
    )
])

In [3]:
# Regular expression for a single S3 server access log line.
#
# Fields are separated by single spaces and captured as named groups in the
# order they appear on the line.  `time` is captured without its surrounding
# square brackets; the quoted fields (`request_uri`, `referer`, `user_agent`)
# keep their surrounding double quotes in the captured value.
#
# Each quoted field accepts either a bare `-` or any double-quoted string,
# including an empty one (`""`) and one that contains spaces, so such lines
# are parsed instead of being rejected as a whole.
#
# The pattern is not anchored at either end, so text following `tls_version`
# does not prevent a match.
S3_ACCESS_LOG_PATTERN = re.compile(
    r'(?P<bucket_owner>\S+) '
    r'(?P<bucket>\S+) '
    r'\[(?P<time>[^\]]+)\] '
    r'(?P<remote_ip>\S+) '
    r'(?P<requester>\S+) '
    r'(?P<request_id>\S+) '
    r'(?P<operation>\S+) '
    r'(?P<key>\S+) '
    r'(?P<request_uri>-|"[^"]*") '
    r'(?P<http_status>\S+) '
    r'(?P<error_code>\S+) '
    r'(?P<bytes_sent>\S+) '
    r'(?P<object_size>\S+) '
    r'(?P<total_time>\S+) '
    r'(?P<turn_around_time>\S+) '
    r'(?P<referer>-|"[^"]*") '
    r'(?P<user_agent>-|"[^"]*") '
    r'(?P<version_id>\S+) '
    r'(?P<host_id>\S+) '
    r'(?P<signature_version>\S+) '
    r'(?P<cipher_suite>\S+) '
    r'(?P<authentication_type>\S+) '
    r'(?P<host_header>\S+) '
    r'(?P<tls_version>\S+)'
)

In [4]:
# Create the SparkSession used by the rest of the notebook, or reuse one that
# already exists.  The three settings are applied in the same order as before:
# partition overwrite mode, case-sensitive SQL, and eager evaluation in the REPL.
spark = (
    SparkSession.builder
    .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
    .config("spark.sql.caseSensitive", True)
    .config("spark.sql.repl.eagerEval.enabled", True)
    .appName("s3_server_logs")
    .getOrCreate()
)

In [5]:
def parse_apache_log_line(logline):
    """Turn one access-log line into a ``Row``.

    The line is searched with ``S3_ACCESS_LOG_PATTERN``.

    Args:
        logline: A single line of log text.

    Returns:
        A ``Row`` with one field per named group of the pattern, in pattern
        order, followed by ``error_line``.  When the line matches, the fields
        hold the captured text and ``error_line`` is ``None``.  When it does
        not match, every parsed field is ``None`` and ``error_line`` holds the
        original line.
    """
    parsed = S3_ACCESS_LOG_PATTERN.search(logline)
    if parsed is not None:
        # groupdict() yields the named groups in the order they are defined.
        return Row(**parsed.groupdict(), error_line=None)

    # No match: same field names, all empty, and keep the raw line for inspection.
    empty_fields = dict.fromkeys(S3_ACCESS_LOG_PATTERN.groupindex)
    return Row(**empty_fields, error_line=logline)

In [6]:
def parser_logs(path_logs):
    """Read access-log files and return them as a parsed DataFrame.

    Args:
        path_logs: Either a single path string or an iterable of path
            strings.  The paths are joined with commas and passed to
            ``spark.sparkContext.textFile`` in one call.

    Returns:
        A DataFrame built with ``S3_ACCESS_LOG_OUTPUT_SCHEMA``, holding one
        row per input line as produced by ``parse_apache_log_line``.
    """
    if isinstance(path_logs, str):
        # ','.join on a bare string would join its individual characters,
        # producing a bogus list of one-character paths.
        path_logs = [path_logs]

    raw_lines = spark.sparkContext.textFile(','.join(path_logs))
    parsed_rows = raw_lines.map(parse_apache_log_line)
    return spark.createDataFrame(parsed_rows, S3_ACCESS_LOG_OUTPUT_SCHEMA)

In [7]:
# Parse the sample access log kept next to the notebook.  The bare name on the
# last line makes the notebook display the resulting DataFrame.
sample_log_paths = ['./assets/s3_server_access_log.txt']
access_logs_df = parser_logs(sample_log_paths)
access_logs_df

bucket_owner,bucket,time,remote_ip,requester,request_id,operation,key,request_uri,http_status,error_code,bytes_sent,object_size,total_time,turn_around_time,referer,user_agent,version_id,host_id,signature_version,cipher_suite,authentication_type,host_header,tls_version,error_line
79a59df900b949e55...,awsexamplebucket1,06/Feb/2019:00:00...,192.0.2.3,79a59df900b949e55...,3E57427F3EXAMPLE,REST.GET.VERSIONING,-,"""GET /awsexampleb...",200,-,113,-,7,-,"""-""","""S3Console/0.4""",-,s9lzHYrFp76ZVxRcp...,SigV2,ECDHE-RSA-AES128-...,AuthHeader,awsexamplebucket1...,TLSV1.1,
79a59df900b949e55...,awsexamplebucket1,06/Feb/2019:00:00...,192.0.2.3,79a59df900b949e55...,891CE47D2EXAMPLE,REST.GET.LOGGING_...,-,"""GET /awsexampleb...",200,-,242,-,11,-,"""-""","""S3Console/0.4""",-,9vKBE6vMhrNiWHZmb...,SigV2,ECDHE-RSA-AES128-...,AuthHeader,awsexamplebucket1...,TLSV1.1,
79a59df900b949e55...,awsexamplebucket1,06/Feb/2019:00:00...,192.0.2.3,79a59df900b949e55...,A1206F460EXAMPLE,REST.GET.BUCKETPO...,-,"""GET /awsexampleb...",404,NoSuchBucketPolicy,297,-,38,-,"""-""","""S3Console/0.4""",-,BNaBsXZQQDbssi6xM...,SigV2,ECDHE-RSA-AES128-...,AuthHeader,awsexamplebucket1...,TLSV1.1,
79a59df900b949e55...,awsexamplebucket1,06/Feb/2019:00:01...,192.0.2.3,79a59df900b949e55...,7B4A0FABBEXAMPLE,REST.GET.VERSIONING,-,"""GET /awsexampleb...",200,-,113,-,33,-,"""-""","""S3Console/0.4""",-,Ke1bUcazaN1jWuUlP...,SigV2,ECDHE-RSA-AES128-...,AuthHeader,awsexamplebucket1...,TLSV1.1,
79a59df900b949e55...,awsexamplebucket1,06/Feb/2019:00:01...,192.0.2.3,79a59df900b949e55...,DD6CC733AEXAMPLE,REST.PUT.OBJECT,s3-dg.pdf,"""PUT /awsexampleb...",200,-,-,4406583,41754,28,"""-""","""S3Console/0.4""",-,10S62Zv81kBW7BB6S...,SigV4,ECDHE-RSA-AES128-SHA,AuthHeader,awsexamplebucket1...,TLSV1.1,
