In [0]:
from pyspark.sql import functions as F
from pyspark.sql.functions import col, date_format
from pyspark.sql.types import StringType,BooleanType,DateType,TimestampType

In [0]:
# Location of the raw web-server access log inside the workspace volume.
path = "/Volumes/workspace/default/web-server-access-logs/access.log"

# Read the log through the text source; downstream cells parse the resulting
# `value` column into individual fields.
df_raw = spark.read.format("text").load(path)

# Preview the unparsed lines.
display(df_raw)

In [0]:
# Regex for one access-log line, assembled from one fragment per capture group.
# Adjacent raw-string literals are concatenated by Python, so the resulting
# pattern is a single string; group numbers are the ones passed to
# `regexp_extract` when the line is parsed.
pattern = (
    r'^(\S+)'          # 1: client ip
    r' (\S+)'          # 2: ident
    r' (\S+)'          # 3: authuser
    r' \[([^\]]+)\]'   # 4: timestamp text between [ and ]
    r' "(\S+)'         # 5: HTTP method (opens the quoted request line)
    r'\s([^"]+?)'      # 6: request target (lazy, so the last token is left for group 7)
    r'\s(\S+)"'        # 7: HTTP version (closes the quoted request line)
    r' (\d{3})'        # 8: three-digit status code
    r' (\S+)'          # 9: response size, or "-" when absent
    r' "([^"]*)"'      # 10: referer
    r' "([^"]*)"'      # 11: user agent
    r' "([^"]*)"'      # 12: trailing quoted field
)

In [0]:
# Output column for each capture group of `pattern`, in group order (1-12).
log_fields = [
    "ip", "ident", "authuser", "time_raw", "method", "request",
    "http_ver", "status", "bytes_raw", "referer", "user_agent", "field12",
]

# `regexp_extract` returns "" (not NULL) for a line that does not match
# `pattern`. Converting "" to a number or timestamp yields NULL only when ANSI
# mode is off; with ANSI mode on it raises and fails the whole job. Every
# conversion below is therefore guarded so unparseable input becomes NULL in
# either mode.
unsigned_int = r"^\d+$"

df = (
    df_raw
    # Extract all groups in one projection rather than one withColumn per field.
    .select(
        "*",
        *[
            F.regexp_extract("value", pattern, group).alias(name)
            for group, name in enumerate(log_fields, start=1)
        ],
    )
    # Replacing `status` in place keeps its column position; non-numeric -> NULL.
    .withColumn(
        "status",
        F.when(F.col("status").rlike(unsigned_int), F.col("status").cast("int")),
    )
    # "-" (no body), "" (unmatched line) and any other non-digit text -> NULL.
    .withColumn(
        "bytes",
        F.when(F.col("bytes_raw").rlike(unsigned_int), F.col("bytes_raw").cast("long")),
    )
    # Only parse when a timestamp was actually extracted.
    .withColumn(
        "ts",
        F.when(
            F.col("time_raw") != "",
            F.to_timestamp("time_raw", "dd/MMM/yyyy:HH:mm:ss Z"),
        ),
    )
)

In [0]:
# Helper columns that are no longer needed once the typed fields exist:
# the raw line, the two identity fields, the trailing quoted field, and the
# unparsed timestamp text.
columns_drop = [
    "value",
    "ident",
    "authuser",
    "field12",
    "time_raw",
]

df = df.drop(*columns_drop)

In [0]:
# Preview the parsed frame after the helper columns were removed.
display(df)

In [0]:
# Final column order and target type; None keeps the column's current type.
# NOTE(review): `bytes` is cast to double here although it is a byte count —
# confirm a floating-point type is what downstream consumers expect.
final_types = {
    "ip": "string",
    "ts": None,
    "method": "string",
    "request": "string",
    "http_ver": "string",
    "status": "int",
    "bytes": "double",
    "referer": "string",
    "user_agent": "string",
}

df = df.select(
    [
        col(name) if dtype is None else col(name).cast(dtype)
        for name, dtype in final_types.items()
    ]
)

In [0]:
# Show the final, explicitly typed frame.
display(df)