### Spark Course : Assignment2

#### Compulsory Part :

Implement a Spark application that performs the following operations:
1. Loads the provided input log file for this assignment without specifying the number of partitions.
2. Counts the number of occurrences of the word “info” in the log.
3. Counts the number of occurrences of the word “error” in the log.
4. Saves each of the results from steps 2 and 3 to a text file (one output file for errors, another for info).

From the Spark Web UI answer the following questions:

1. How many jobs were created for your Spark app?
2. How many stages was each job split into?
3. How many tasks was each stage split into?
4. How many RDD blocks are present in your application?

#### Bonus Part

Implement a Spark application that performs the following operations:

1. How many distinct IP addresses appear in this log?
2. How many entries of each distinct IP can be found in the log?
3. Which is the latest entry in the log for ip = 64.242.88.10?

## Compulsory Part

In [44]:
from __future__ import print_function
import sys
from operator import add
from pyspark.sql import SparkSession

In [45]:
# Compulsory Part : 
# 1 : Loads the provided input log file for this assignment without specifying the num of partitions.

In [46]:
# sc is the SparkContext that the PySpark shell/notebook provides; no partition count is passed
lines = sc.textFile('app.log')

In [47]:
# Compulsory Part : 
# 2 : Counts the number of occurrences of the word “info” in the log

In [48]:
infos = lines.flatMap(lambda x: x.split(' ')) \
        .filter(lambda x : "info" in x) \
        .map(lambda word : (word, 1)) \
        .reduceByKey(add)

infos.collect()

[('[info]', 96)]

In [49]:
# Compulsory Part : 
# 3 : Counts the number of occurrences of the word “error” in the log

In [50]:
errors = lines.flatMap(lambda x: x.split(' ')) \
        .filter(lambda x : "error" in x) \
        .map(lambda word : (word, 1)) \
        .reduceByKey(add)

errors.collect()

[('[error]', 5)]
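
Note that the filter `"info" in x` is a substring test, so any token that merely contains "info" is counted under its own key. The sketch below mirrors the flatMap / filter / map / reduceByKey pipeline in plain Python to show the difference between substring and exact matching. The function name `count_tokens` and the sample lines are illustrative assumptions, not part of the assignment or taken from app.log.

```python
from collections import Counter


def count_tokens(lines, needle, exact=False):
    """Local mirror of the Spark pipeline: split on spaces, filter, count per token."""
    counts = Counter()
    for line in lines:
        for token in line.split(' '):
            # exact=False reproduces the notebook's substring filter
            matched = (token == needle) if exact else (needle in token)
            if matched:
                counts[token] += 1
    return dict(counts)


# Hypothetical sample lines, not taken from app.log
sample = [
    "[Sun Mar  7 16:02:00 2004] [info] Server built: Feb 27 2004",
    "[Sun Mar  7 16:05:49 2004] [info] [client 64.242.88.10] more information here",
    "[Sun Mar  7 16:45:56 2004] [error] [client 64.242.88.10] File does not exist",
]

print(count_tokens(sample, "info"))                # substring match
print(count_tokens(sample, "[info]", exact=True))  # exact token match
```

In the log used here the substring filter only ever produced the `[info]` key, so both variants give the same count.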

In [51]:
# Compulsory Part :
# 4 : Saves each of the results from steps 2 and 3 to a text file (one output file for errors, another for info)

In [52]:
# infos and errors are RDDs, so saveAsTextFile writes one part file per partition.
# Each output path is therefore a directory of part files, not a single file.
infos.saveAsTextFile('infos_part')
errors.saveAsTextFile('errors_part')
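
Because each output is a directory of part files, reading a result back means reading every `part-*` file in it. The sketch below is one way to do that locally. It assumes each line is the string form of a `(word, count)` tuple, which is what saving an RDD of tuples as text produces. The helper name `read_saved_pairs` is hypothetical.

```python
import ast
import glob
import os


def read_saved_pairs(output_dir):
    """Collect the (word, count) tuples stored in the part files of output_dir."""
    pairs = []
    # Marker files such as _SUCCESS do not match the part-* pattern
    for path in sorted(glob.glob(os.path.join(output_dir, "part-*"))):
        with open(path) as handle:
            for line in handle:
                line = line.strip()
                if line:
                    # Each line looks like "('[info]', 96)"
                    pairs.append(ast.literal_eval(line))
    return pairs


# Only read the directory if the save step above has been run
if os.path.isdir("infos_part"):
    print(read_saved_pairs("infos_part"))
```

Empty part files are expected: with a single key, only one partition holds the result.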

In [53]:
# From the Spark Web UI answer the following questions:
# 1. How many jobs were created for your Spark app?
# 2. How many stages was each job split into?
# 3. How many tasks was each stage split into?
# 4. How many RDD blocks are present in your application?

ANSWERS

1. A total of 4 Jobs have been created :

    - 1 job for the collect action applied to obtain the counts of 'info' occurrences
    - 1 job for the collect action applied to obtain the counts of 'error' occurrences
    - 1 job to save the rdd contents of infos
    - 1 job to save the rdd contents of errors

2. A total of 6 stages were executed:

    - Each collect job was split into 2 stages (2 jobs x 2 stages/job = 4).
    
      This is due to the shuffle boundary introduced by the reduceByKey transformation.
    
    - Each saveAsTextFile job executed only 1 stage (2 jobs x 1 stage/job = 2).
    
      Note: in each of these jobs 1 stage is reported as skipped, for the same reason. A shuffle is
      expensive, so Spark keeps the shuffle files written by the map side of reduceByKey on local disk
      until the RDD is no longer referenced, to avoid recomputing them if a new action is triggered.
      
      That is exactly what happens here. The collect() action had already produced the shuffle output,
      so when saveAsTextFile() was called on the same RDD, Spark reused those files and skipped the
      map stage instead of recomputing it.
      
      See : https://spark.apache.org/docs/1.5.0/programming-guide.html#performance-impact
      
      
3. A total of 12 tasks were executed:

     - Each collect job ran 4 tasks, 2 per stage (2 jobs x 4 tasks/job = 8)
     - Each saveAsTextFile job ran 2 tasks in its single executed stage (2 jobs x 2 tasks/job = 4)
     
4. A total of 1 RDD block is reported in the Executors tab (http://localhost:4040/executors/).
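
The totals above can be re-derived with a few lines of arithmetic. This is only a sanity check under one assumption: every executed stage ran 2 tasks (one per partition), which is the value the totals of 6 stages and 12 tasks imply. The job labels are descriptive strings, not names taken from the Web UI.

```python
# Assumption: the input was read into 2 partitions, so each executed stage runs 2 tasks
TASKS_PER_STAGE = 2

# (job, number of stages actually executed); skipped stages are not counted
jobs = [
    ("infos.collect()", 2),
    ("errors.collect()", 2),
    ("infos.saveAsTextFile()", 1),
    ("errors.saveAsTextFile()", 1),
]

total_jobs = len(jobs)
total_stages = sum(stages for _, stages in jobs)
total_tasks = total_stages * TASKS_PER_STAGE

print(total_jobs, total_stages, total_tasks)
```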
 

## Bonus Part

In [54]:
# Bonus Part:
# 1 : How many distinct ip addresses appear in this log

In [55]:
# a 'quick and dirty' way
ips = lines.flatMap(lambda x: x.split("[")) \
        .flatMap(lambda x: x.split("]")) \
        .filter(lambda x : "client" in x) \
        .filter(lambda x : "." in x) \
        .distinct()

In [56]:
ips.collect()

['client 64.242.88.10',
 'client 24.70.56.49',
 'client 24.71.236.129',
 'client 200.174.151.3',
 'client 61.9.4.61',
 'client 81.226.63.194',
 'client 140.113.179.131']

In [57]:
ips.count()

7

In [58]:
# 2. How many entries of each distinct ip can be found in the log
entries = lines.flatMap(lambda x: x.split("[")) \
        .flatMap(lambda x: x.split("]")) \
        .filter(lambda x : "client" in x) \
        .filter(lambda x : "." in x) \
        .map(lambda x : x.split()) \
        .map(lambda ip : (ip[1],1)) \
        .reduceByKey(add)

In [59]:
entries.collect()

[('61.9.4.61', 2),
 ('81.226.63.194', 1),
 ('200.174.151.3', 1),
 ('140.113.179.131', 1),
 ('64.242.88.10', 93),
 ('24.70.56.49', 1),
 ('24.71.236.129', 1)]
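
The split-on-brackets approach above works for this log but relies on every IP containing a "." and every bracketed field containing "client". A slightly stricter alternative is to extract the address with a regular expression. The sketch below shows the idea in plain Python. The pattern `CLIENT_IP`, the function `count_client_ips`, and the sample lines are assumptions made for illustration.

```python
import re
from collections import Counter

# Matches "[client a.b.c.d]" and captures the dotted address
CLIENT_IP = re.compile(r"\[client (\d{1,3}(?:\.\d{1,3}){3})\]")


def count_client_ips(lines):
    """Count how many lines mention each client IP; lines without one are ignored."""
    counts = Counter()
    for line in lines:
        match = CLIENT_IP.search(line)
        if match:
            counts[match.group(1)] += 1
    return counts


# Hypothetical sample lines, not taken from app.log
sample = [
    "[Sun Mar  7 16:05:49 2004] [info] [client 64.242.88.10] (104)Connection reset by peer",
    "[Sun Mar  7 16:45:56 2004] [info] [client 64.242.88.10] (104)Connection reset by peer",
    "[Sun Mar  7 21:16:17 2004] [error] [client 24.70.56.49] File does not exist",
    "[Sun Mar  7 16:02:00 2004] [notice] a line without any client field",
]

ip_counts = count_client_ips(sample)
print(len(ip_counts))   # number of distinct IPs
print(dict(ip_counts))  # entries per IP
```

The same extraction function could be applied per line inside a Spark `map` followed by `reduceByKey`, as in the cell above.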

In [74]:
# 3. Which is the latest entry in the log for ip = 64.242.88.10

# This part requires some non-trivial coding

import re
import datetime
from pyspark.sql import Row

def parseTime(s):
    """ Create a Datetime object
    
    Typically: datetime.datetime(2003, 8, 4, 12, 30, 45)
    
    Args:
        s (str): date and time (example : "Sun Mar  7 16:02:00 2004")
    Returns:
        datetime: datetime object
    """
    # split() with no argument collapses runs of spaces, so both
    # "Mar  7" and "Mar 17" yield the same five fields
    time_parts = s.split()
    
    months_map = {
        "Jan": 1, "Feb": 2, "Mar": 3,
        "Apr": 4, "May": 5, "Jun": 6,
        "Jul": 7, "Aug": 8, "Sep": 9,
        "Oct": 10, "Nov": 11, "Dec": 12,
    }
    
    hour, minute, second = time_parts[3].split(":")
        
    return datetime.datetime(int(time_parts[4]),
                             months_map[time_parts[1]],
                             int(time_parts[2]),
                             int(hour),
                             int(minute),
                             int(second))

def parseLogLine(line):
    """ Parse a line from the log
    Args:
        line (str): a line of text in the log
        example : 
        [Sun Mar  7 17:27:37 2004] [info] [client 64.242.88.10] (104)Connection reset by peer: client stopped connection before send body completed
        ====> !! But some lines will not contain all the fields
    Returns:
        Tuple: 
          success : containing the parsed elements from the parsing and a 1
            error : containing the original line and 0
    """
    
    # Raw string so that the backslashes reach the regex engine unchanged
    LOG_PATTERN = r"\[([^]]+)\]"
         
    parts=re.split(LOG_PATTERN, line)
   
    try:
    
        time = parts[1]
        info = parts[3]
        rest = parts[5]
       
        return (
            Row(
                datetime = parseTime(time),
                infofield= info,
                ipaddress= rest 
           ), 1)
    
    except IndexError:
        # A field is missing: return the original line flagged with 0
        return (line,0)
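
To see why the parser reads indices 1, 3 and 5, it helps to look at what `re.split` returns when the pattern contains a capturing group: the captured fields are interleaved with the text between them. The sketch below shows this for the example line from the docstring and for a shorter, hypothetical line with no client field. It also shows `datetime.strptime` as a possible alternative to the manual month lookup, assuming an English locale for the `%a` and `%b` directives. The name `parse_time_strptime` is an assumption.

```python
import re
from datetime import datetime

LOG_PATTERN = r"\[([^]]+)\]"

full = "[Sun Mar  7 17:27:37 2004] [info] [client 64.242.88.10] (104)Connection reset by peer"
# Hypothetical line with only two bracketed fields
short = "[Sun Mar  7 16:02:00 2004] [notice] some message without a client field"

# Captured groups land at the odd indices: 1 (time), 3 (level), 5 (client)
full_parts = re.split(LOG_PATTERN, full)
short_parts = re.split(LOG_PATTERN, short)

print(full_parts)
print(short_parts)  # too short for index 5, hence the IndexError branch


def parse_time_strptime(s):
    """Alternative parser; whitespace in the format matches runs of spaces in the input."""
    return datetime.strptime(s, "%a %b %d %H:%M:%S %Y")


print(parse_time_strptime(full_parts[1]))
```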

In [85]:
# parseLogLine returns a tuple flagged 1 for a successful parse and 0 for a failed one
# (1) filter on the successful ones
# (2) retrieve only the first entry of the tuple (our data)
rdd = lines \
        .map(lambda line: parseLogLine(line)) \
        .filter(lambda x : x[1]==1) \
        .map(lambda x : x[0])

In [86]:
rdd.collect()

[Row(datetime=datetime.datetime(2004, 3, 7, 16, 5, 49), infofield='info', ipaddress='client 64.242.88.10'),
 Row(datetime=datetime.datetime(2004, 3, 7, 16, 45, 56), infofield='info', ipaddress='client 64.242.88.10'),
 Row(datetime=datetime.datetime(2004, 3, 7, 17, 13, 50), infofield='info', ipaddress='client 64.242.88.10'),
 Row(datetime=datetime.datetime(2004, 3, 7, 17, 21, 44), infofield='info', ipaddress='client 64.242.88.10'),
 Row(datetime=datetime.datetime(2004, 3, 7, 17, 27, 37), infofield='info', ipaddress='client 64.242.88.10'),
 Row(datetime=datetime.datetime(2004, 3, 7, 17, 58), infofield='info', ipaddress='client 64.242.88.10'),
 Row(datetime=datetime.datetime(2004, 3, 7, 18, 0, 9), infofield='info', ipaddress='client 64.242.88.10'),
 Row(datetime=datetime.datetime(2004, 3, 7, 18, 10, 9), infofield='info', ipaddress='client 64.242.88.10'),
 Row(datetime=datetime.datetime(2004, 3, 7, 18, 19, 1), infofield='info', ipaddress='client 64.242.88.10'),
 ... (output truncated)

In [87]:
# Get a DataFrame from this RDD so that we are able to query it
# That's why parseLogLine returns Row objects
df = rdd.toDF()

In [88]:
df.columns

['datetime', 'infofield', 'ipaddress']

In [92]:
# Now answering question 3 becomes much easier
times=df.filter(df.ipaddress=='client 64.242.88.10').select('datetime')

In [97]:
times.show()

+-------------------+
|           datetime|
+-------------------+
|2004-03-07 16:05:49|
|2004-03-07 16:45:56|
|2004-03-07 17:13:50|
|2004-03-07 17:21:44|
|2004-03-07 17:27:37|
|2004-03-07 17:58:00|
|2004-03-07 18:00:09|
|2004-03-07 18:10:09|
|2004-03-07 18:19:01|
|2004-03-07 18:42:29|
|2004-03-07 18:52:30|
|2004-03-07 18:58:52|
|2004-03-07 19:08:55|
|2004-03-07 19:22:11|
|2004-03-07 19:31:25|
|2004-03-07 19:39:40|
|2004-03-07 19:41:33|
|2004-03-07 19:42:45|
|2004-03-07 20:02:13|
|2004-03-07 20:04:35|
+-------------------+
only showing top 20 rows



In [137]:
reverse_sorted=times.sort(['datetime'],ascending=False)

In [141]:
print(str(reverse_sorted.head()['datetime']))

2004-03-08 14:54:56
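
Sorting the whole column works, but only the maximum is needed, so a single pass is enough. The sketch below shows that idea on a small local list. The function name `latest_entry` and the sample pairs are illustrative assumptions (one pair is invented and marked as such). On the Spark side the same idea would be a max aggregation, for example `RDD.max` or a DataFrame `max` aggregate, rather than a sort.

```python
from datetime import datetime


def latest_entry(entries, ip):
    """Return the most recent timestamp recorded for ip, or None if it never appears."""
    times = [when for (address, when) in entries if address == ip]
    return max(times) if times else None


# A handful of illustrative (ip, timestamp) pairs, not the full log
entries = [
    ("64.242.88.10", datetime(2004, 3, 7, 16, 5, 49)),
    ("64.242.88.10", datetime(2004, 3, 8, 14, 54, 56)),
    ("61.9.4.61", datetime(2004, 3, 8, 1, 0, 0)),  # hypothetical timestamp
    ("64.242.88.10", datetime(2004, 3, 7, 20, 4, 35)),
]

print(latest_entry(entries, "64.242.88.10"))
```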
