In [1]:
# from pyspark.context import SparkContext
# from pyspark.sql.context import SQLContext
from pyspark.sql.session import SparkSession
from pyspark.conf import SparkConf

# load up other dependencies
import re
import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.functions import udf
from pyspark.sql.functions import regexp_extract

In [2]:
# Spark configuration for a single-machine run that uses every local core.
# Memory limits are Spark properties, so they are applied with .set();
# setExecutorEnv() would only export OS environment variables carrying these
# names to the executors and leave the real memory settings at their defaults.
# NOTE(review): spark.driver.memory is presumably only honoured when no JVM has
# been started in this kernel yet - restart the kernel after changing it.
conf = (
    SparkConf()
    .setMaster("local[*]")
    .setAppName("LogAnalytics")
    .set("spark.executor.memory", "4g")
    .set("spark.driver.memory", "4g")
)

# Create (or reuse) the SparkSession built from that configuration
spark = SparkSession.builder.config(conf=conf).getOrCreate()

23/06/06 22:42:33 WARN Utils: Your hostname, Tanmays-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 10.0.0.123 instead (on interface en0)
23/06/06 22:42:33 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/06/06 22:42:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/06/06 22:42:34 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [3]:
# Read the raw access log as plain text: each log line becomes one row in a
# single string column named `value`.
# NOTE(review): absolute, machine-specific path - adjust before running elsewhere.
df = spark.read.text("/Users/tanmaysingla/Downloads/access_log_Jul95")

In [4]:
# Print the schema of the freshly loaded frame before any parsing is done.
df.printSchema()

root
 |-- value: string (nullable = true)



In [5]:
# Number of rows loaded, i.e. the number of raw log lines read from the file.
df.count()

                                                                                

1891715

In [6]:
# Regular expressions that pull the individual fields out of one raw log line.
# Each one exposes the wanted text through its capture group(s).

# Host: the first whitespace-delimited token on the line. No dot is required,
# so single-label names such as "localhost" are captured as well.
host_pattern = r'^(\S+)\s'
# Timestamp: the bracketed "dd/Mon/yyyy:hh:mm:ss +zzzz" or "-zzzz" stamp
# (brackets excluded from the capture).
ts_pattern = r'\[(\d{2}/\w{3}/\d{4}:\d{2}:\d{2}:\d{2} [+-]\d{4})\]'
# Request line inside double quotes: group 1 = method, group 2 = URI,
# group 3 = protocol (may be empty when the request omits it).
method_uri_protocol_pattern = r'\"(\S+)\s(\S+)\s*(\S*)\"'
# Status: the first three-digit number surrounded by whitespace.
status_pattern = r'\s(\d{3})\s'
# Content size: the run of digits that ends the line.
content_size_pattern = r'\s(\d+)$'

In [7]:
# Split every raw line into named columns using the patterns defined earlier;
# status and content_size are cast to integers.
# cache() is lazy, so it is requested up front: the full-scan count() below
# then fills the cache, and later cells reuse the parsed rows instead of
# running the regexes over the raw file again.
logs_df = df.select(
    regexp_extract('value', host_pattern, 1).alias('host'),
    regexp_extract('value', ts_pattern, 1).alias('timestamp'),
    regexp_extract('value', method_uri_protocol_pattern, 1).alias('method'),
    regexp_extract('value', method_uri_protocol_pattern, 2).alias('endpoint'),
    regexp_extract('value', method_uri_protocol_pattern, 3).alias('protocol'),
    regexp_extract('value', status_pattern, 1).cast('integer').alias('status'),
    regexp_extract('value', content_size_pattern, 1).cast('integer').alias('content_size'),
).cache()

# Peek at the first parsed rows, then report (row count, column count)
logs_df.show(10, truncate=True)
print((logs_df.count(), len(logs_df.columns)))

# Leave the frame as the last expression so the cell still displays its schema
logs_df

+--------------------+--------------------+------+--------------------+--------+------+------------+
|                host|           timestamp|method|            endpoint|protocol|status|content_size|
+--------------------+--------------------+------+--------------------+--------+------+------------+
|        199.72.81.55|01/Jul/1995:00:00...|   GET|    /history/apollo/|HTTP/1.0|   200|        6245|
|unicomp6.unicomp.net|01/Jul/1995:00:00...|   GET| /shuttle/countdown/|HTTP/1.0|   200|        3985|
|      199.120.110.21|01/Jul/1995:00:00...|   GET|/shuttle/missions...|HTTP/1.0|   200|        4085|
|  burger.letters.com|01/Jul/1995:00:00...|   GET|/shuttle/countdow...|HTTP/1.0|   304|           0|
|      199.120.110.21|01/Jul/1995:00:00...|   GET|/shuttle/missions...|HTTP/1.0|   200|        4179|
|  burger.letters.com|01/Jul/1995:00:00...|   GET|/images/NASA-logo...|HTTP/1.0|   304|           0|
|  burger.letters.com|01/Jul/1995:00:00...|   GET|/shuttle/countdow...|HTTP/1.0|   200|    

DataFrame[host: string, timestamp: string, method: string, endpoint: string, protocol: string, status: int, content_size: int]

In [8]:
# Three-letter English month abbreviations, as they appear in the log, mapped
# to their month number.
month_map = {
  'Jan': 1, 'Feb': 2, 'Mar':3, 'Apr':4, 'May':5, 'Jun':6, 'Jul':7,
  'Aug':8,  'Sep': 9, 'Oct':10, 'Nov': 11, 'Dec': 12
}

def parse_clf_time(text):
    """ Convert a Common Log Format time into a 'yyyy-mm-dd hh:mm:ss' string
    Args:
        text (str): date and time in Apache time format dd/mmm/yyyy:hh:mm:ss (+/-)zzzz
                    (without the surrounding brackets)
    Returns:
        a string suitable for passing to CAST('timestamp'); the empty string
        when text is empty/None or cannot be parsed
    """
    # NOTE: We're ignoring the time zones here, might need to be handled depending on the problem you are solving
    if not text:
        return ""
    try:
        # Fixed character offsets: dd/mmm/yyyy:hh:mm:ss
        return "{0:04d}-{1:02d}-{2:02d} {3:02d}:{4:02d}:{5:02d}".format(
          int(text[7:11]),
          month_map[text[3:6]],
          int(text[0:2]),
          int(text[12:14]),
          int(text[15:17]),
          int(text[18:20])
        )
    except (KeyError, ValueError):
        # Unknown month abbreviation or non-numeric field: answer exactly like
        # the empty-input case instead of raising, so one malformed value does
        # not abort everything that calls this function row by row.
        return ""

In [9]:
# Wrap the plain-Python parser so it can be applied to a DataFrame column
udf_parse_time = udf(parse_clf_time)

# Append the parsed value as a timestamp-typed `time` column, then discard the
# raw text column it was derived from.
logs_df_with_time = (
    logs_df
    .withColumn('time', udf_parse_time(F.col('timestamp')).cast('timestamp'))
    .drop('timestamp')
)
logs_df_with_time.show(10, truncate=True)

[Stage 7:>                                                          (0 + 1) / 1]

+--------------------+------+--------------------+--------+------+------------+-------------------+
|                host|method|            endpoint|protocol|status|content_size|               time|
+--------------------+------+--------------------+--------+------+------------+-------------------+
|        199.72.81.55|   GET|    /history/apollo/|HTTP/1.0|   200|        6245|1995-07-01 00:00:01|
|unicomp6.unicomp.net|   GET| /shuttle/countdown/|HTTP/1.0|   200|        3985|1995-07-01 00:00:06|
|      199.120.110.21|   GET|/shuttle/missions...|HTTP/1.0|   200|        4085|1995-07-01 00:00:09|
|  burger.letters.com|   GET|/shuttle/countdow...|HTTP/1.0|   304|           0|1995-07-01 00:00:11|
|      199.120.110.21|   GET|/shuttle/missions...|HTTP/1.0|   200|        4179|1995-07-01 00:00:11|
|  burger.letters.com|   GET|/images/NASA-logo...|HTTP/1.0|   304|           0|1995-07-01 00:00:12|
|  burger.letters.com|   GET|/shuttle/countdow...|HTTP/1.0|   200|           0|1995-07-01 00:00:12|


                                                                                

In [10]:
# Mark the timestamped frame for caching; it is the input of the analyses in
# the following cells.
logs_df_with_time.cache()

DataFrame[host: string, method: string, endpoint: string, protocol: string, status: int, content_size: int, time: timestamp]

In [11]:
# Day names keyed by day number, numbering from 1 = Sunday through 7 = Saturday
week_map = dict(enumerate(
    ("Sunday", "Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday"),
    start=1,
))

def parse_day_of_week(dayOfWeek):
    """ Translate a day number into its English day name
    Args:
        dayOfWeek (int): day number, 1 (Sunday) to 7 (Saturday)
    Returns:
        the day name, or an empty string when dayOfWeek is falsy (None or 0)
    Raises:
        KeyError: for a truthy value that is not a key of week_map
    """
    return week_map[dayOfWeek] if dayOfWeek else ""

In [12]:
# Wrap the day-name lookup so it can be applied to a DataFrame column
udf_parse_day = udf(parse_day_of_week)

# Reduce the log to two columns: the requested endpoint and the name of the
# weekday on which the request was made.
day_name_col = udf_parse_day(F.dayofweek('time')).alias("dayOfWeek")
enpoint_day_of_week_df = logs_df_with_time.select(F.col("endpoint"), day_name_col)
enpoint_day_of_week_df.show(5, truncate=False)

[Stage 8:>                                                          (0 + 1) / 1]

+-----------------------------------------------+---------+
|endpoint                                       |dayOfWeek|
+-----------------------------------------------+---------+
|/history/apollo/                               |Saturday |
|/shuttle/countdown/                            |Saturday |
|/shuttle/missions/sts-73/mission-sts-73.html   |Saturday |
|/shuttle/countdown/liftoff.html                |Saturday |
|/shuttle/missions/sts-73/sts-73-patch-small.gif|Saturday |
+-----------------------------------------------+---------+
only showing top 5 rows



                                                                                

In [13]:
# Number of requests per (endpoint, weekday) pair, busiest pair first.
# The final select only reorders the columns and gives the weekday column a
# presentation-friendly header.
highest_invocations_df = (
    enpoint_day_of_week_df
    .groupBy("endpoint", "dayOfWeek")
    .count()
    .orderBy(F.desc("count"))
    .select(
        F.col("dayOfWeek").alias("Day in a Week"),
        F.col("endpoint"),
        F.col("count"),
    )
)
highest_invocations_df.show(5, truncate=False)

                                                                                

+-------------+--------------------------+-----+
|Day in a Week|endpoint                  |count|
+-------------+--------------------------+-----+
|Thursday     |/images/NASA-logosmall.gif|25140|
|Wednesday    |/images/NASA-logosmall.gif|20415|
|Thursday     |/images/KSC-logosmall.gif |16941|
|Monday       |/images/NASA-logosmall.gif|16624|
|Tuesday      |/images/NASA-logosmall.gif|16034|
+-------------+--------------------------+-----+
only showing top 5 rows



In [14]:
# The frame is sorted by count in descending order, so the first row is the
# single most-requested (weekday, endpoint) combination.
highest_invocations_df.show(1, truncate=False)

[Stage 12:>                                                         (0 + 8) / 8]

+-------------+--------------------------+-----+
|Day in a Week|endpoint                  |count|
+-------------+--------------------------+-----+
|Thursday     |/images/NASA-logosmall.gif|25140|
+-------------+--------------------------+-----+
only showing top 1 row



                                                                                

In [15]:
# Keep only the requests answered with HTTP 404. The predicate uses an unbound
# column reference, resolved against the frame being filtered, instead of
# borrowing a column handle from the separate `logs_df` frame.
not_found_df = logs_df_with_time.filter(F.col("status") == 404).cache()

# Count the 404 responses per calendar year, smallest count first, keeping at
# most 10 rows.
yearly_404_sorted_df = (not_found_df
                        .select(F.year("time").alias("Year"))
                        .groupBy("Year")
                        .count()
                        .sort("count", ascending=True).limit(10))
yearly_404_sorted_df.show(10, truncate=False)

[Stage 15:>                                                         (0 + 8) / 8]

+----+-----+
|Year|count|
+----+-----+
|1995|10843|
+----+-----+



                                                                                