In [None]:
import io, os, requests, zipfile
from pyspark.sql import Row
from datetime import datetime

In [None]:
date = "20031127"
year = date[:4]
Qtr = (int(date[4:6])-1)//3+1; print('Qtr', Qtr)
#
data_directory = './'
csv_file = os.path.join(data_directory, f'log{date}.csv')
print(csv_file)

Qtr 4
./log20031127.csv


In [None]:
if not os.path.exists(csv_file):
    #zip_file_url = f'http://www.sec.gov/dera/data/Public-EDGAR-log-file-data/{year}/Qtr{Qtr:d}/log{date}.zip'
    zip_file_url = "https://www.sec.gov/dera/data/Public-EDGAR-log-file-data/2024/Qtr3/log20240930.zip"
    print(zip_file_url)
    r = requests.get(zip_file_url)
    z = zipfile.ZipFile(io.BytesIO(r.content))
    z.extractall(data_directory)
else:
    pass  # The file is already there

https://www.sec.gov/dera/data/Public-EDGAR-log-file-data/2024/Qtr3/log20240930.zip


BadZipFile: File is not a zip file

In [None]:
# Specify the file path
zip_file_path = 'log20031127.zip'
# Specify the directory to extract to (current directory in this case)
extract_to = './'

# Check if the zip file exists
if os.path.exists(zip_file_path):
    # Open the zip file
    with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
        # Extract all the contents
        zip_ref.extractall(extract_to)
    print(f"Unzipped {zip_file_path} successfully.")
else:
    print(f"{zip_file_path} does not exist.")

Unzipped log20031127.zip successfully.


### Read the file into RDD

In Jupyter notebook the SparkSession `spark` and the SparkContext `sc` are created for us

In [None]:
pip install pyspark



In [None]:
sc

''

We create "logFileRDD" of the file by reading it as a collection of lines. For a start we have a look at the first several lines

In [None]:
from pyspark.sql import SparkSession
import os

# Initialize Spark session (if not already initialized)
spark = SparkSession.builder \
    .appName("LogFileProcessing") \
    .getOrCreate()

# Define the path to the CSV file
# csv_file = "path/to/your/file.csv"  # Replace this with the actual file path

# Assert that the file exists
assert os.path.exists(csv_file), f"{csv_file} does not exist. Please check the file path."

# Load the CSV file as an RDD
logFileRDD = spark.sparkContext.textFile(csv_file).cache()

# Show the first 5 rows of the RDD
print("First 5 lines of the RDD:")
print(logFileRDD.take(5))

# Convert the RDD to a DataFrame (if you want to perform DataFrame operations)
logFileDF = spark.read.option("header", "true").csv(csv_file)

# Show the first 5 rows of the DataFrame
print("First 5 rows of the DataFrame:")
logFileDF.show(5)

# Stop the Spark session when done (optional in Colab)
# spark.stop()


First 5 lines of the RDD:
['ip,date,time,zone,cik,accession,extention,code,size,idx,norefer,noagent,find,crawler,browser', '69.41.134.beg,2003-11-27,00:00:44,500.0,1023139.0,0001157523-03-006384,.txt,200.0,36131.0,0.0,0.0,0.0,4.0,0.0,mac', '141.149.186.iee,2003-11-27,00:00:45,500.0,98338.0,0001072613-03-001600,-index.htm,200.0,3214.0,1.0,0.0,0.0,1.0,0.0,win', '141.149.186.iee,2003-11-27,00:00:47,500.0,98338.0,0001072613-03-001600,form10q_12237.txt,200.0,51204.0,0.0,0.0,0.0,9.0,0.0,win', '209.111.89.ggf,2003-11-27,00:00:50,500.0,839430.0,0001019687-03-002390,.txt,304.0,,0.0,0.0,0.0,1.0,0.0,mie']
First 5 rows of the DataFrame:
+---------------+----------+--------+-----+---------+--------------------+-----------------+-----+-------+---+-------+-------+----+-------+-------+
|             ip|      date|    time| zone|      cik|           accession|        extention| code|   size|idx|norefer|noagent|find|crawler|browser|
+---------------+----------+--------+-----+---------+------------------

The file has a header now, which should be stripped. The format of the file is diffrent with 15 fields (variables).

In [None]:
header = logFileRDD.first() #extract header
logFileRDD = logFileRDD.filter(lambda row: row != header)

We briefly summarize the content of the fields
1. ip
2. date - Apache log file date (yyyy-mm-dd)
3. time - Apache log file time (hh:mm:ss)
4. zone - Apache log file zone
5. cik - SEC Central Index Key (CIK) associated with the document requested
6. accession - accession SEC document accession number associated with the document requested
7. extention - filename
8. code - Apache log file status code for the request
9. size - document file size
10. idx - takes on a value of 1 if the requester landed on the index page of a set of documents
11. norefer
12. noagent
13. find
14. crawler
15. browser

Let us parse each line of our RDD accordingly

### Construct regular expression for parsing

the common regexp patterns can be found here [https://regexpattern.com/date-time/](https://regexpattern.com/date-time/)

In [None]:
import re
# A regular expression pattern to extract fields from the log line
re_ip = '\d{1,3}\.\d{1,3}\.\d{1,3}\.\w{3}'
re_date = '\d{4}\-\d{2}\-\d{2}'
re_time = '\d{2}:\d{2}:\d{2}'
re_zone = '\d+\.?\d*'
re_cik = '\d+\.?\d*'
re_accession = '[\w\-]+'
re_filename = '[\w\-\.]+'
re_code = '\d+\.?\d*'
re_size = '\d+\.?\d*'
re_idx = '[01]\.?0?'
re_norefer = '[01]\.?0?'
re_noagent = '[01]\.?0?'
re_find = '1?[0-9]\.?0?'
re_crawler = '[01]\.?0?'
re_browser = '[a-z]{3}'
#
LOG_PATTERN_EDGAR = f'^({re_ip:s}),({re_date:s}),({re_time:s}),({re_zone:s}),({re_cik:s}),\
({re_accession:s}),({re_filename:s}),({re_code:s}),({re_size:s}),({re_idx:s}),\
({re_norefer:s}),({re_noagent:s}),({re_find:s}),({re_crawler:s}),({re_browser:s})$'
print(LOG_PATTERN_EDGAR)
pattern=re.compile(LOG_PATTERN_EDGAR)

^(\d{1,3}\.\d{1,3}\.\d{1,3}\.\w{3}),(\d{4}\-\d{2}\-\d{2}),(\d{2}:\d{2}:\d{2}),(\d+\.?\d*),(\d+\.?\d*),([\w\-]+),([\w\-\.]+),(\d+\.?\d*),(\d+\.?\d*),([01]\.?0?),([01]\.?0?),([01]\.?0?),(1?[0-9]\.?0?),([01]\.?0?),([a-z]{3})$


In [None]:
# answer from regex101.com
# ^(\d{1,3}\.\d{1,3}\.\d{1,3}\.\w{3}),(\d{4}\-\d{2}\-\d{2}),(\d{2}:\d{2}:\d{2}),(\d+\.?\d*),(\d+\.?\d*),([\w\-]+),([\w\-\.]+),(\d+\.?\d*),(\d+\.?\d*),([01]\.?0?),([01]\.?0?),([01]\.?0?),(1?[0-9]\.?0?),([01]\.?0?),([a-z]{3})$

In [None]:
# check correct pattern for datetime - we neglect the time zone shift for the moment
#import datetime
#stri = "24/Oct/1994:13:41:41 -0600"
#dt = datetime.datetime.strptime(stri[:20], "%d/%b/%Y:%H:%M:%S")
#print(dt)
from datetime import datetime
datetime.strptime('2003-10-03', "%Y-%m-%d").date()
datetime.strptime('14:02:13', "%H:%M:%S").time()

datetime.time(14, 2, 13)

We will parse each text line into `pyspark.sql.Row`

In [None]:
def parseApacheLogLine(logline):
    """ Parse a line in the Apache Common Log format
    Inputs:
        logline (str): a line of text in the Apache Common Log format
    Outputs:
        tuple: either a dictionary containing the parts of the Apache Access Log and 1,
               or the original invalid log line and 0
    """
    match = re.search(pattern, logline)
    if match is None:   # failed match
        #print('failed  ', logline)
        return (logline, 0)

    #size_field = match.group(9)
    #if size_field == '-':
    #    size = float(0)
    #else:
    #    size = float(match.group(9))

    parsed_row = Row(
        ip          = match.group(1),
        date = datetime.strptime(match.group(2), "%Y-%m-%d").date(),
        time       = datetime.strptime(match.group(3), "%H:%M:%S").time(),
        zone        = match.group(4),
        cik      = match.group(5),
        accesion      = match.group(6),
        filename = match.group(7),
        response_code = int(float(match.group(8))),
        content_size  = match.group(9), # size,
        idx = bool(match.group(10)),
        browser = match.group(15)
    )
    #print('parsed   ', logline)

    return (parsed_row, 1)

We will keep track of correctly parsed and failed logs are returned those as two RDDs

In [None]:
def access_fail_logs(parsed_logs):
    """ Read and parse log file, print a 20-sample of failed log-lines
    Inputs:
        parsed_logs (RDD): an RDD obtained via parseApacheLogLine(...)
    Outputs:
        tuple of RDDs: access_logs, failed_logs
    """
    access_logs = (parsed_logs
                   .filter(lambda s: s[1] == 1)
                   .map(lambda s: s[0])
                   .cache())

    failed_logs = (parsed_logs
                   .filter(lambda s: s[1] == 0)
                   .map(lambda s: s[0]))
    failed_logs_count = failed_logs.count()
    if failed_logs_count > 0:
        print(f'Number of invalid logline: {failed_logs.count():d}')
        for line in failed_logs.take(20):
            print(f'Invalid logline: {line}')

    print(f'Read {parsed_logs.count():d} lines, successfully parsed { access_logs.count():d} lines, \
           failed to parse {failed_logs.count():d} lines')
    return access_logs, failed_logs

In [None]:
logFileRDD.take(3)

['69.41.134.beg,2003-11-27,00:00:44,500.0,1023139.0,0001157523-03-006384,.txt,200.0,36131.0,0.0,0.0,0.0,4.0,0.0,mac',
 '141.149.186.iee,2003-11-27,00:00:45,500.0,98338.0,0001072613-03-001600,-index.htm,200.0,3214.0,1.0,0.0,0.0,1.0,0.0,win',
 '141.149.186.iee,2003-11-27,00:00:47,500.0,98338.0,0001072613-03-001600,form10q_12237.txt,200.0,51204.0,0.0,0.0,0.0,9.0,0.0,win']

In [None]:
parseApacheLogLine('69.41.134.beg,2003-11-27,00:00:44,500.0,1023139.0,0001157523-03-006384,.txt,200.0,36131.0,0.0,0.0,0.0,4.0,0.0,mac')

(Row(ip='69.41.134.beg', date=datetime.date(2003, 11, 27), time=datetime.time(0, 0, 44), zone='500.0', cik='1023139.0', accesion='0001157523-03-006384', filename='.txt', response_code=200, content_size='36131.0', idx=True, browser='mac'),
 1)

In [None]:
parsedLogsRDD = logFileRDD.map(parseApacheLogLine).cache()

In [None]:
parsedLogsRDD.take(1)

[(Row(ip='69.41.134.beg', date=datetime.date(2003, 11, 27), time=datetime.time(0, 0, 44), zone='500.0', cik='1023139.0', accesion='0001157523-03-006384', filename='.txt', response_code=200, content_size='36131.0', idx=True, browser='mac'),
  1)]

In [None]:
accessLogsRDD, failedLogsRDD = access_fail_logs(parsedLogsRDD)

Number of invalid logline: 11100
Invalid logline: 209.111.89.ggf,2003-11-27,00:00:50,500.0,839430.0,0001019687-03-002390,.txt,304.0,,0.0,0.0,0.0,1.0,0.0,mie
Invalid logline: 151.191.175.fdf,2003-11-27,00:01:08,500.0,874255.0,0000950130-00-000268,-index.html,304.0,,1.0,0.0,0.0,1.0,0.0,win
Invalid logline: 61.115.76.jbf,2003-11-27,00:01:11,500.0,1271300.0,9999999997-03-042782,.txt,200.0,1436.0,0.0,1.0,0.0,0.0,0.0,
Invalid logline: 67.118.98.jfc,2003-11-27,00:01:11,500.0,1115143.0,0001266275-03-000002,primary_doc.xml,304.0,,0.0,1.0,0.0,0.0,0.0,win
Invalid logline: 61.115.76.jbf,2003-11-27,00:01:12,500.0,901430.0,0000885725-03-000128,.txt,200.0,8352.0,0.0,1.0,0.0,0.0,0.0,
Invalid logline: 61.115.76.jbf,2003-11-27,00:01:13,500.0,790500.0,9999999997-03-042788,.txt,200.0,1815.0,0.0,1.0,0.0,0.0,0.0,
Invalid logline: 61.115.76.jbf,2003-11-27,00:01:14,500.0,779336.0,9999999997-03-042786,.txt,200.0,1698.0,0.0,1.0,0.0,0.0,0.0,
Invalid logline: 61.115.76.jbf,2003-11-27,00:01:15,500.0,1052837.0,0001

### Explore parsed data

How can we be sure that our parsing delivered meaningful results? Let us have a look at the unique values of the "method", "response code", and "protocol".

In [None]:
def distinct_responsecodes_browsers(accessLogsRDD):
    """
    Prints distinct values for  response codes and browsers
    Inputs:
        accessLogsRDD
    """
    ResponseCodesRDD = accessLogsRDD.map(lambda log: log.response_code)
    uniqueResponseCodesRDD = ResponseCodesRDD.distinct()
    print("Response codes are", sorted(uniqueResponseCodesRDD.collect()))
    BrowserRDD = accessLogsRDD.map(lambda log: log.browser)
    uniqueBrowserRDD = BrowserRDD.distinct()
    print("Browsers are", sorted(uniqueBrowserRDD.collect()))
    #return MethodsRDD, ResponseCodesRDD, ProtocolsRDD
    return None

distinct_responsecodes_browsers(accessLogsRDD)

Response codes are [200, 206, 302, 404]
Browsers are ['lin', 'mac', 'mie', 'opr', 'win']


## Adjust parsing


In [None]:
# A regular expression pattern to extract fields from the log line
re_ip = '\d{1,3}\.\d{1,3}\.\d{1,3}\.\w{3}'
re_date = '\d{4}\-\d{2}\-\d{2}'
re_time = '\d{2}:\d{2}:\d{2}'
re_zone = '\d+\.?\d*'
re_cik = '\d+\.?\d*'
re_accession = '[\w\-]+'
re_filename = '[\w\-\.]+'
re_code = '\d+\.?\d*'
re_size = '\d*\.?\d*'
re_idx = '[01]\.?0?'
re_norefer = '[01]\.?0?'
re_noagent = '[01]\.?0?'
re_find = '1?[0-9]\.?0?'
re_crawler = '[01]\.?0?'
re_browser = '[a-z]{0,3}'
#
LOG_PATTERN_EDGAR = f'^({re_ip:s}),({re_date:s}),({re_time:s}),({re_zone:s}),({re_cik:s}),\
({re_accession:s}),({re_filename:s}),({re_code:s}),({re_size:s}),({re_idx:s}),\
({re_norefer:s}),({re_noagent:s}),({re_find:s}),({re_crawler:s}),({re_browser:s})$'
print(LOG_PATTERN_EDGAR)
pattern=re.compile(LOG_PATTERN_EDGAR)

^(\d{1,3}\.\d{1,3}\.\d{1,3}\.\w{3}),(\d{4}\-\d{2}\-\d{2}),(\d{2}:\d{2}:\d{2}),(\d+\.?\d*),(\d+\.?\d*),([\w\-]+),([\w\-\.]+),(\d+\.?\d*),(\d*\.?\d*),([01]\.?0?),([01]\.?0?),([01]\.?0?),(1?[0-9]\.?0?),([01]\.?0?),([a-z]{0,3})$


In [None]:
def parseApacheLogLine(logline):
    """ Parse a line in the Apache Common Log format
    Inputs:S
        logline (str): a line of text in the Apache Common Log format
    Outputs:
        tuple: either a dictionary containing the parts of the Apache Access Log and 1,
               or the original invalid log line and 0
    """
    match = re.search(pattern, logline)
    if match is None:   # failed match
        print('failed  ', logline)
        return (logline, 0)

    size_field = match.group(9)
    if size_field:
        size = float(match.group(9))
    else:
        size = float(0)

    browser_field = match.group(15)
    if browser_field:
        browser = browser_field
    else:
        browser = 'not_found'

    parsed_row = Row(
        ip          = match.group(1),
        date = datetime.strptime(match.group(2), "%Y-%m-%d").date(),
        time       = datetime.strptime(match.group(3), "%H:%M:%S").time(),
        zone        = match.group(4),
        cik      = match.group(5),
        accesion      = match.group(6),
        filename = match.group(7),
        response_code = int(float(match.group(8))),
        content_size  = size,
        idx = bool(match.group(10)),
        browser = browser
    )
    print('parsed   ', logline)

    return (parsed_row, 1)

In [None]:
#parseApacheLogLine('208.252.214.jbf,2003-03-03,00:00:00,500.0,919642.0,0000891836-02-000291,-index.htm,304.0,,1.0,0.0,0.0,1.0,0.0,mie')

parsed    208.252.214.jbf,2003-03-03,00:00:00,500.0,919642.0,0000891836-02-000291,-index.htm,304.0,,1.0,0.0,0.0,1.0,0.0,mie


In [None]:
#parseApacheLogLine('209.172.247.haf,2003-03-03,00:00:04,500.0,78003.0,0000914121-02-001461,pf121702-8k.txt,200.0,2527.0,0.0,0.0,0.0,9.0,0.0,win')

parsed    209.172.247.haf,2003-03-03,00:00:04,500.0,78003.0,0000914121-02-001461,pf121702-8k.txt,200.0,2527.0,0.0,0.0,0.0,9.0,0.0,win


In [None]:
#parseApacheLogLine('66.48.138.ach,2003-03-03,00:01:52,500.0,1037949.0,0001047469-03-006343,.txt,200.0,13224.0,0.0,1.0,0.0,0.0,0.0,')

parsed    66.48.138.ach,2003-03-03,00:01:52,500.0,1037949.0,0001047469-03-006343,.txt,200.0,13224.0,0.0,1.0,0.0,0.0,0.0,


In [None]:
parsedLogsRDD1 = logFileRDD.map(parseApacheLogLine).cache()

In [None]:
parsedLogsRDD1.take(1)

[(Row(ip='69.41.134.beg', date=datetime.date(2003, 11, 27), time=datetime.time(0, 0, 44), zone='500.0', cik='1023139.0', accesion='0001157523-03-006384', filename='.txt', response_code=200, content_size=36131.0, idx=True, browser='mac'),
  1)]

In [None]:
accessLogsRDD1, failedLogsRDD1 = access_fail_logs(parsedLogsRDD1)

Number of invalid logline: 687
Invalid logline: 67.118.98.jfc,2003-11-27,00:02:31,500.0,1115143.0,0001266275-03-000002,xslF345X02/primary_doc.xml,200.0,34998.0,0.0,1.0,0.0,0.0,0.0,win
Invalid logline: 164.164.89.djf,2003-11-27,00:03:44,500.0,45599.0,0001209191-03-027965,xslF345X02/doc4.xml,200.0,15391.0,0.0,0.0,0.0,9.0,0.0,win
Invalid logline: 32.103.193.egd,2003-11-27,00:04:07,500.0,1078271.0,0001241404-03-000021,xslF345X02/edgar.xml,200.0,25594.0,0.0,0.0,0.0,10.0,0.0,win
Invalid logline: 202.71.146.jdd,2003-11-27,00:05:16,500.0,851968.0,0000925177-03-000001,xslF345X02/edgardoc.xml,200.0,17754.0,0.0,0.0,0.0,9.0,0.0,win
Invalid logline: 64.85.248.aed,2003-11-27,00:07:04,500.0,1023731.0,0001054779-03-000003,xslF345X02/primary_doc.xml,200.0,16985.0,0.0,0.0,0.0,9.0,0.0,win
Invalid logline: 164.164.89.djf,2003-11-27,00:07:19,500.0,1086319.0,0000919567-03-000072,xslF345X02/gasco-form3_ex.xml,200.0,25160.0,0.0,0.0,0.0,9.0,0.0,win
Invalid logline: 164.164.89.djf,2003-11-27,00:11:25,500.0,1258

Checking again that results are meaningful

In [None]:
distinct_responsecodes_browsers(accessLogsRDD1)

Response codes are [200, 206, 302, 304, 400, 404]
Browsers are ['lin', 'mac', 'mie', 'not_found', 'opr', 'win']


Again we check that the results are reasonable

In [None]:
import importlib
import pattern_EDGAR as pE; importlib.reload(pE)
import utils; importlib.reload(utils)

<module 'utils' from '/content/utils.py'>

In [None]:
parsedLogsRDD1 = logFileRDD.map(pE.parseApacheLogLine).cache()

In [None]:
parsedLogsRDD1.take(3)

[(Row(ip='69.41.134.beg', date=datetime.date(2003, 11, 27), time=datetime.time(0, 0, 44), zone='500.0', cik='1023139.0', accesion='0001157523-03-006384', filename='.txt', response_code=200, content_size=36131.0, idx=True, browser='mac'),
  1),
 (Row(ip='141.149.186.iee', date=datetime.date(2003, 11, 27), time=datetime.time(0, 0, 45), zone='500.0', cik='98338.0', accesion='0001072613-03-001600', filename='-index.htm', response_code=200, content_size=3214.0, idx=True, browser='win'),
  1),
 (Row(ip='141.149.186.iee', date=datetime.date(2003, 11, 27), time=datetime.time(0, 0, 47), zone='500.0', cik='98338.0', accesion='0001072613-03-001600', filename='form10q_12237.txt', response_code=200, content_size=51204.0, idx=True, browser='win'),
  1)]

In [None]:
accessLogsRDD1, failedLogsRDD1 = utils.access_fail_logs(parsedLogsRDD1)

In [None]:
failedLogsRDD1.count()

687

In [None]:
uniqueResponseCodes, uniqueBrowsers = pE.distinct_responsecodes_browsers(accessLogsRDD)

In [None]:
print('uniqueResponseCodes', uniqueResponseCodes)
print('uniqueBrowsers', uniqueBrowsers)

uniqueResponseCodes [200, 206, 302, 404]
uniqueBrowsers ['lin', 'mac', 'mie', 'opr', 'win']


## Spark Streaming

In [None]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

import pattern_EDGAR as pE  #; importlib.reload(pE)
import utils;

In [None]:
#sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)

In [None]:
lines = ssc.socketTextStream("localhost", 8890)

In [None]:
parsedLogsRDD1 = logFileRDD.map(pE.parseApacheLogLine).cache()

In [None]:
parsedLogsRDD1.pprint()

In [None]:
ssc.start()             # Start the computation
ssc.awaitTermination()  # Wait for the computation to terminate

In [None]:
# run Netcat as a data server
%sh
cat ./data/log20030303.csv | nc -u localhost 8890 -w0