# Welcome to CS540 final project

In [1]:
from pyspark.context import SparkContext
from pyspark.sql.context import SQLContext
from pyspark.sql.session import SparkSession


sc = SparkContext()
sqlContext = SQLContext(sc)
spark = SparkSession(sc)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/02/02 06:43:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable




In [2]:
spark

In [3]:
# load up other dependencies
import re
import glob

In [4]:
raw_data_files=glob.glob('large_log.txt')
raw_data_files

['large_log.txt']

### Schema of data frame

In [5]:
base_df=spark.read.text(raw_data_files)
base_df.printSchema()
type(base_df)
print((base_df.count(), len(base_df.columns)))

root
 |-- value: string (nullable = true)



[Stage 0:>                                                          (0 + 8) / 8]

(3727592, 1)


                                                                                

In [11]:
sample_logs=[item['value'] for item in base_df.take(3)]
sample_logs

['',
 '13.66.139.0 - - [19/Dec/2020:13:57:26 +0100] "GET /index.php?option=com_phocagallery&view=category&id=1:almhuette-raith&Itemid=53 HTTP/1.1" 200 32653 "-" "Mozilla/5.0 (compatible; bingbot/2.0; +http://www.bing.com/bingbot.htm)" "-"',
 '157.48.153.185 - - [19/Dec/2020:14:08:06 +0100] "GET /apache-log/access.log HTTP/1.1" 200 233 "-" "Mozilla/5.0 (Windows NT 6.3; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Safari/537.36" "-"']

## Extracting Patterns

In [12]:
host_pattern= r'(^\S+\.[\S+\.]+\S+)\s'
hosts= [re.search(host_pattern, item).group(1)
       if re.search(host_pattern,item)
        else 'no match'
        for item in sample_logs
       ]
hosts

['no match', '13.66.139.0', '157.48.153.185']

In [13]:
#Extracting timestamps
ts_pattern = r"\[([0-9]{2}/[a-zA-Z]{3}/[0-9]{4}:[0-9]{2}:[0-9]{2}:[0-9]{2} [+-][0-9]{4})]"
timestamps = [re.search(ts_pattern, item).group(1)
              if re.search(ts_pattern, item) 
              else 'No TS' for item in sample_logs]
timestamps

['No TS', '19/Dec/2020:13:57:26 +0100', '19/Dec/2020:14:08:06 +0100']

In [14]:
method_uri_protocol_pattern = r'\"(\S+)\s(\S+)\s*(\S*)\"'
method_uri_protocol = [re.search(method_uri_protocol_pattern, item).groups()
if re.search(method_uri_protocol_pattern,item)
else 'No Match'
for item in sample_logs]
method_uri_protocol

['No Match',
 ('GET',
  '/index.php?option=com_phocagallery&view=category&id=1:almhuette-raith&Itemid=53',
  'HTTP/1.1'),
 ('GET', '/apache-log/access.log', 'HTTP/1.1')]

In [15]:
#HTTP/1.1" 200
status_pattern = r" [0-9]{3} "
status= [re.search(status_pattern, item).group(0)[1:-1]
         if re.search(status_pattern,item)
         else "No Match"
        for item in sample_logs]
status

['No Match', '200', '200']

In [16]:
content_size_pattern=r" [0-9]{4}[0-9]* "
content_size = [re.search(content_size_pattern, item).group(0)[1:-1]
                if re.search(content_size_pattern, item)
                else "Null"
               for item in sample_logs]
content_size

['Null', '32653', 'Null']

In [17]:
request_device_pattern=r'((\"http[\S]*\")|(\"-\"))[\S\s]*\"-\"'
request_device=[re.search(request_device_pattern, item).group(0)
if re.search(request_device_pattern,item)
else "Null"
for item in sample_logs]
request_device

['Null',
 '"-" "Mozilla/5.0 (compatible; bingbot/2.0; +http://www.bing.com/bingbot.htm)" "-"',
 '"-" "Mozilla/5.0 (Windows NT 6.3; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Safari/537.36" "-"']

In [21]:
from pyspark.sql.functions import regexp_extract
logs_df = base_df.select(regexp_extract('value',host_pattern,1).alias('Host'),
                         regexp_extract('value',ts_pattern,1).alias('Timestamp'),
                         regexp_extract('value',method_uri_protocol_pattern,1).alias('Method'),
                         regexp_extract('value',method_uri_protocol_pattern,2).alias('Endpoint'),
                         regexp_extract('value',method_uri_protocol_pattern,3).alias('Protocol'),
                         regexp_extract('value',status_pattern,0).cast('integer').alias('Status'),
                         regexp_extract('value',content_size_pattern,0).cast('integer').alias('Content_size'),
                         regexp_extract('value',request_device_pattern,0).alias('Requesting_Device')
                        )                         
logs_df.show(2,truncate=True)
logs_df.describe()

+-----------+--------------------+------+--------------------+--------+------+------------+--------------------+
|       Host|           Timestamp|Method|            Endpoint|Protocol|Status|Content_size|   Requesting_Device|
+-----------+--------------------+------+--------------------+--------+------+------------+--------------------+
|           |                    |      |                    |        |  null|        null|                    |
|13.66.139.0|19/Dec/2020:13:57...|   GET|/index.php?option...|HTTP/1.1|   200|       32653|"-" "Mozilla/5.0 ...|
+-----------+--------------------+------+--------------------+--------+------+------------+--------------------+
only showing top 2 rows



                                                                                

DataFrame[summary: string, Host: string, Timestamp: string, Method: string, Endpoint: string, Protocol: string, Status: string, Content_size: string, Requesting_Device: string]

In [22]:
print((logs_df.count(), len(logs_df.columns)))

(3727592, 8)


In [23]:
logs_df.select('host').show()

+--------------+
|          host|
+--------------+
|              |
|   13.66.139.0|
|157.48.153.185|
|157.48.153.185|
|216.244.66.230|
|  54.36.148.92|
| 92.101.35.224|
|73.166.162.225|
|73.166.162.225|
| 54.36.148.108|
|   54.36.148.1|
|162.158.203.24|
|  35.237.4.214|
| 42.236.10.125|
| 42.236.10.125|
| 42.236.10.125|
| 42.236.10.125|
| 42.236.10.117|
| 42.236.10.114|
| 42.236.10.114|
+--------------+
only showing top 20 rows



## Storing the wrangled data to a csv

In [21]:
logs_df.repartition(1).write.options(header='True'
                                     , delimiter='\t'
                                     , lineSep=';'
                                    ).csv('wrangled_log')

                                                                                

In [36]:
out_df = logs_df.select('Timestamp','Host')
out_df.count()

3727592

In [37]:
out_df.show()

+--------------------+--------------+
|           Timestamp|          Host|
+--------------------+--------------+
|                    |              |
|19/Dec/2020:13:57...|   13.66.139.0|
|19/Dec/2020:14:08...|157.48.153.185|
|19/Dec/2020:14:08...|157.48.153.185|
|19/Dec/2020:14:14...|216.244.66.230|
|19/Dec/2020:14:16...|  54.36.148.92|
|19/Dec/2020:14:29...| 92.101.35.224|
|19/Dec/2020:14:58...|73.166.162.225|
|19/Dec/2020:14:58...|73.166.162.225|
|19/Dec/2020:15:09...| 54.36.148.108|
|19/Dec/2020:15:09...|   54.36.148.1|
|19/Dec/2020:15:16...|162.158.203.24|
|19/Dec/2020:15:22...|  35.237.4.214|
|19/Dec/2020:15:23...| 42.236.10.125|
|19/Dec/2020:15:23...| 42.236.10.125|
|19/Dec/2020:15:23...| 42.236.10.125|
|19/Dec/2020:15:23...| 42.236.10.125|
|19/Dec/2020:15:23...| 42.236.10.117|
|19/Dec/2020:15:23...| 42.236.10.114|
|19/Dec/2020:15:23...| 42.236.10.114|
+--------------------+--------------+
only showing top 20 rows



In [38]:
out_df.repartition(1).write.options(header='True'
                                     , delimiter='\t'
                                    ).csv('wrangled_log/only_host_large/')

                                                                                

In [1]:
!ls -ltarh wrangled_log/only_host_large/

total 322760
drwxr-xr-x  10 anmolpal  staff   320B Dec  7 00:58 [34m..[m[m
-rw-r--r--   1 anmolpal  staff   1.2M Dec  7 00:58 .part-00000-e3a8b0af-89cb-4326-bf86-018fe738bd2a-c000.csv.crc
-rw-r--r--   1 anmolpal  staff   147M Dec  7 00:58 part-00000-e3a8b0af-89cb-4326-bf86-018fe738bd2a-c000.csv
-rw-r--r--   1 anmolpal  staff     0B Dec  7 00:58 _SUCCESS
drwxr-xr-x   6 anmolpal  staff   192B Dec  7 00:58 [34m.[m[m
-rw-r--r--   1 anmolpal  staff     8B Dec  7 00:58 ._SUCCESS.crc
