In [2]:
import scala.util.matching
import org.apache.spark.rdd.RDD

case class Cal(year: Int, month: Int, day: Int, hour: Int, minute: Int, second: Int)                

case class Row(host: String,
               client_identd: String,
               user_id: String,
               date_time: Cal,
               method: String,
               endpoint: String,
               protocol: String,
               response_code: Int,
               content_size: Long)
                

val month_map = Map("Jan" -> 1, "Feb" -> 2, 
                    "Mar" -> 3, "Apr" -> 4, 
                    "May" -> 5, "Jun" -> 6, 
                    "Jul" -> 7, "Aug" -> 8,  
                    "Sep" -> 9, "Oct" -> 10, 
                    "Nov" -> 11, "Dec" -> 12)

def parse_apache_time(s: String): Cal = {
    // Convert Apache time format into a Python datetime object
    // Args:
    //    s (str): date and time in Apache time format
    // Returns:
    //   datetime: datetime object (ignore timezone for now)

    return Cal(s.substring(7, 11).toInt,
            month_map(s.substring(3, 6)),
            s.substring(0, 2).toInt,
            s.substring(12, 14).toInt,
            s.substring(15, 17).toInt,
            s.substring(18, 20).toInt)
}

def parseApacheLogLine(logline: String): (Either[Row, String], Int) = {
    // Parse a line in the Apache Common Log format
    // Args:
    //    logline (str): a line of text in the Apache Common Log format
    // Returns:
    //    tuple: either a dictionary containing the parts of the Apache Access Log and 1,
    //           or the original invalid log line and 0
    
    val ret = APACHE_ACCESS_LOG_PATTERN.findAllIn(logline).matchData.toList
    if (ret.isEmpty)
        return (Right(logline), 0)

    val r = ret(0)
    val size_field = r.group(9)

    var size: Long = 0
    if (size_field != "-")
        size = size_field.toLong

    return (Left(Row(
            r.group(1),
            r.group(2),
            r.group(3),
            parse_apache_time(r.group(4)),
            r.group(5),
            r.group(6),
            r.group(7),
            r.group(8).toInt,
            size)), 1)
}

def parseLogs(): (RDD[(Either[Row, String], Int)], RDD[Row], RDD[String]) = {
    val fileName = "data/apache/apache.log"
    
    val parsed_logs = sc.textFile(fileName).map(parseApacheLogLine).cache()
    val access_logs = parsed_logs.filter(x => x._2 == 1).map(x => x._1.left.get)
    val failed_logs = parsed_logs.filter(x => x._2 == 0).map(x => x._1.right.get)

    val failed_logs_count = failed_logs.count()
    
    if (failed_logs_count > 0) {
        println(s"Number of invalid logline: $failed_logs.count()")
        failed_logs.take(20).foreach(println)
    }
    
    println(s"Read $parsed_logs.count() lines, successfully parsed $access_logs.count() lines, and failed to parse $failed_logs.count()")
    
    return (parsed_logs, access_logs, failed_logs)
}

// A regular expression pattern to extract fields from the log line
val APACHE_ACCESS_LOG_PATTERN = """^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+)\s*(\S*)\s*" (\d{3}) (\S+)""".r

val (parsed_logs, access_logs, failed_logs) = parseLogs()
access_logs.take(20).foreach(println)

Read MapPartitionsRDD[9] at map at <console>:108.count() lines, successfully parsed MapPartitionsRDD[11] at map at <console>:109.count() lines, and failed to parse MapPartitionsRDD[13] at map at <console>:110.count()
Row(in24.inetnebr.com,-,-,Cal(1995,8,1,0,0,1),GET,/shuttle/missions/sts-68/news/sts-68-mcc-05.txt,HTTP/1.0,200,1839)
Row(uplherc.upl.com,-,-,Cal(1995,8,1,0,0,7),GET,/,HTTP/1.0,304,0)
Row(uplherc.upl.com,-,-,Cal(1995,8,1,0,0,8),GET,/images/ksclogo-medium.gif,HTTP/1.0,304,0)
Row(uplherc.upl.com,-,-,Cal(1995,8,1,0,0,8),GET,/images/MOSAIC-logosmall.gif,HTTP/1.0,304,0)
Row(uplherc.upl.com,-,-,Cal(1995,8,1,0,0,8),GET,/images/USA-logosmall.gif,HTTP/1.0,304,0)
Row(ix-esc-ca2-07.ix.netcom.com,-,-,Cal(1995,8,1,0,0,9),GET,/images/launch-logo.gif,HTTP/1.0,200,1713)
Row(uplherc.upl.com,-,-,Cal(1995,8,1,0,0,10),GET,/images/WORLD-logosmall.gif,HTTP/1.0,304,0)
Row(slppp6.intermind.net,-,-,Cal(1995,8,1,0,0,10),GET,/history/skylab/skylab.html,HTTP/1.0,200,1687)
Row(piweba4y.prodigy.com,-,-,



In [3]:
val content_sizes = access_logs.map(log => log.content_size).cache()





MapPartitionsRDD[14] at map at <console>:26

In [5]:
println("Content Size Avg: " + (content_sizes.sum / content_sizes.count()) +
         ", Min: " + content_sizes.min() +
         ", Max: " + content_sizes.max())

Content Size Avg: 17528.068382978723, Min: 0, Max: 3421948




In [6]:
val responseCodeToCount = access_logs.map(log => (log.response_code, 1)).reduceByKey(_ + _).cache()





ShuffledRDD[17] at reduceByKey at <console>:26

In [12]:
val responseCodeToCountList = responseCodeToCount.take(100)

println("Found " + responseCodeToCountList.length + " response codes")
print("Response Code Counts: ")
responseCodeToCountList.foreach(x => print(x + " "))

Found 7 response codes
Response Code Counts: (404,3137) (200,426183) (302,6994) (304,33660) (500,2) (403,21) (501,3) 



In [20]:
val hostCountPairTuple = access_logs.map(log => (log.host, 1))
val hostSum = hostCountPairTuple.reduceByKey(_ + _).cache()
val hostMoreThan10 = hostSum.filter(_._2 > 10)

val hostsPick20 = hostMoreThan10.map(x => x._1).take(20)

print("Any 20 hosts that have accessed more then 10 times: ")
hostsPick20.foreach(x => print(x + " "))



Any 20 hosts that have accessed more then 10 times: n1043347.ksc.nasa.gov 193.74.242.28 d02.as1.nisiq.net jcday.nccts.drenet.dnd.ca ip-pdx2-56.teleport.com 192.112.22.82 anx3p4.trib.com 198.77.113.34 204.235.86.107 s9.its.bldrdoc.gov crc182.cac.washington.edu 204.255.92.30 161.243.222.10 telford-107.salford.ac.uk universe6.barint.on.ca gatekeeper.homecare.com kaifmv.tksc.nasda.go.jp unknown.edsa.co.za onyx.southwind.net sunspot.eds.ecip.nagoya-u.ac.jp 



In [23]:
val endpointCounts = access_logs.map(log => (log.endpoint, 1)).reduceByKey(_ + _)
val topEndpoints = endpointCounts.takeOrdered(10)(Ordering[Int].reverse.on(_._2))

print("Top Ten Endpoints: ")
topEndpoints.foreach(x => print(x + " "))

//topEndpoints.take(100).foreach(println)

Top Ten Endpoints: (/images/NASA-logosmall.gif,27035) (/images/KSC-logosmall.gif,21458) (/images/MOSAIC-logosmall.gif,20254) (/images/USA-logosmall.gif,20172) (/images/WORLD-logosmall.gif,20004) (/images/ksclogo-medium.gif,19300) (/ksc.html,13508) (/history/apollo/images/apollo-logo1.gif,11074) (/images/launch-logo.gif,10120) (/,9481) 



In [31]:
val not200 = access_logs.filter(log => log.response_code != 200)
val endpointCountPairTuple = not200.map(log => (log.endpoint, 1))
val endpointSum = endpointCountPairTuple.reduceByKey(_ + _)
val topTenErrURLs = endpointSum.takeOrdered(10)(Ordering[Int].reverse.on(_._2))

print("Top Ten failed URLs: ")
topTenErrURLs.foreach(x => print(x + " "))


Top Ten failed URLs: (/images/NASA-logosmall.gif,3914) (/images/KSC-logosmall.gif,2906) (/images/MOSAIC-logosmall.gif,2107) (/images/USA-logosmall.gif,2079) (/images/WORLD-logosmall.gif,2006) (/images/ksclogo-medium.gif,1929) (/history/apollo/images/apollo-logo1.gif,1067) (/images/launch-logo.gif,1022) (/,871) (/images/ksclogosmall.gif,663) 



In [35]:
val hosts = access_logs.map(log => log.host)
val uniqueHosts = hosts.distinct
val uniqueHostCount = uniqueHosts.count()

println("Unique hosts: " + uniqueHostCount)

Unique hosts: 26672




In [87]:
val dayToHostPairTuple = access_logs.map(log => (log.date_time.year + "-" + 
                                                    log.date_time.month + "-" + 
                                                    log.date_time.day, log.host))
val dayGroupedHosts = dayToHostPairTuple.groupByKey() 
val dayHostCount = dayGroupedHosts.map(gh => (gh._1, gh._2.size))
val dailyHosts = dayGroupedHosts.mapValues(_.toSeq.distinct).cache
val dailyHostsList = dailyHosts.take(30)

//print("Unique hosts per day: ")
//dailyHostsList.foreach(x => print(x + " "))
//dayHostCount.take(100).foreach(println)





Array((1995-8-6,List(www-c2.proxy.aol.com, 206.24.43.12, brother.cc.monash.edu.au, benzene.mit.edu, ts2-004.jaxnet.com, ad11-032.compuserve.com, dd13-037.compuserve.com, p27.s501.c41.k12.wv.us, tia1.eskimo.com, ix-sea4-05.ix.netcom.com, ip022.phx.primenet.com, dialup-10-a-5.gw.umn.edu, ppploc228.asahi-net.or.jp, www-c3.proxy.aol.com, freenet.edmonton.ab.ca, ppp-dc-1-48.ios.com, www-b4.proxy.aol.com, ts3-10.slip.uwo.ca, ppp0d-15.rns.tamu.edu, 202.245.247.11, 204.62.245.32, dd07-064.compuserve.com, piweba4y.prodigy.com, kommpark.edvina.se, ix-wc1-04.ix.netcom.com, piweba5y.prodigy.com, pm4_8.digital.net, pm4_0.digital.net, modem02.homeshopping.com.br, ppp211.interealm.com, cntgatew.cnt.com, ix-sea4-02.ix.netcom.com, netcom20.netcom.com, slip5-68.fl.us.ibm.net, rawnspc.triumf.ca, dd06-008.compuserve.com, pm4_28.digital.net, drjo015a121.embratel.net.br, drjo015a114.embratel.net.br, slip7.island.net, unlinfo2.unl.edu, osk0119.bekkoame.or.jp, ix-nyc14-07.ix.netcom.com, h98-140.ccnet.com, 204

In [97]:
val dayAndHostTuple = access_logs.map(log => (log.date_time.year + "-" + 
                                                    log.date_time.month + "-" + 
                                                    log.date_time.day, log.host))
val groupedByDay = dayAndHostTuple.groupByKey()
val sortedByDay = groupedByDay.sortByKey()
val avgDailyReqPerHost = sortedByDay.map(gh => (gh._1, gh._2.toSeq.distinct.size.toFloat / gh._2.size))
val avgDailyReqPerHostList = avgDailyReqPerHost.take(30)

print("Average number of daily requests per Hosts is ")
avgDailyReqPerHostList.foreach(x => print(x + " "))

Average number of daily requests per Hosts is (1995-8-1,0.07595011) (1995-8-10,0.07385093) (1995-8-11,0.0756654) (1995-8-3,0.077850536) (1995-8-4,0.07035632) (1995-8-5,0.07846212) (1995-8-6,0.07826382) (1995-8-7,0.071589224) (1995-8-8,0.07325995) (1995-8-9,0.071406126) 



In [99]:
val badRecords = access_logs.filter(log => log.response_code == 404).cache()

println("Found " + badRecords.count() + " 404 URLs.")



Found 3137 404 URLs.


In [103]:
val badEndpoints = badRecords.map(r => r.endpoint)
val badUniqueEndpoints = badEndpoints.distinct
val badUniqueEndpointsPick40 = badUniqueEndpoints.take(40)

print("404 URLS: ")
badUniqueEndpointsPick40.foreach(x => println(x + " "))

404 URLS: /PERSONS/NASA-CM. 
/shuttle/missions/sts-1/sts-1-mission.html 
/history/apollo/sa-1/sa-1-patch-small.gif 
/public.win3/winvn 
/shuttle/technology/STS_newsref/spacelab.html 
/shutttle/missions/sts-70/images/KSC-95EC-1059.jpg 
/shuttle/missions/sts-70/images/KSC-95EC-o667.gif" 
/%3Aspacelink.msfc.nasa.gov 
/history/apollo/sa-1/images/ 
/:/spacelink.msfc.nasa.gov 
/sts-71/visitor/ 
/history/apollo/sa-10/sa-10-patch-small.gif 
/elv/updated.gif 
/shuttle/missions/mission.html 
/enterprise 
/space/pub/gif 
/netpro/mlm/index.htm 
/shuttle/missions/sts-83/mission-sts-83.html 
/shuttle/technology/missions/missions.html 
/software/winvn/winvn/html 
/news/sci.space.shuttle/archive/sci-space-shuttle-22-apr-1995-40.txt 
/shuttle/missions/sts-69/mission-sts-69.htlm 
/history/gemini/gemini-12.html 
/shuttle/missions/sts-67/images/k95p0383.txt 
/wwwicons/red.gif 
/shuttle/missions/sts-69/mission_sts-69.htlm 
/ksc.shtml 
/history/apollo/a-004/a-004-patch-small.gif 
/shuttle/html 
/www/shuttle



In [109]:
object ErrOrdering extends Ordering[(String, Int)] {
  def compare(a: (String, Int), b: (String, Int)) = a._2 compare b._2
}

val badEndpointsCountPairTuple = badRecords.map(r => (r.endpoint, 1))
val badEndpointsSum = badEndpointsCountPairTuple.reduceByKey(_ + _)
val badEndpointsTop20 = badEndpointsSum.takeOrdered(20)(ErrOrdering.reverse)

print("Top Twenty 404 URLs: ")
badEndpointsTop20.foreach(x => println(x + " "))



Top Twenty 404 URLs: (/images/nasa-logo.gif,319) 
(/pub/winvn/readme.txt,257) 
(/pub/winvn/release.txt,199) 
(/shuttle/missions/STS-69/mission-STS-69.html,181) 
(/elv/DELTA/uncons.htm,106) 
(/images/Nasa-logo.gif,85) 
(/shuttle/missions/sts-68/ksc-upclose.gif,80) 
(/history/apollo/sa-1/sa-1-patch-small.gif,79) 
(/images/crawlerway-logo.gif,63) 
(/://spacelink.msfc.nasa.gov,55) 
(/history/apollo/a-001/a-001-patch-small.gif,49) 
(/shuttle/resources/orbiters/atlantis.gif,39) 
(/history/apollo/images/little-joe.jpg,36) 
(/history/apollo/pad-abort-test-1/pad-abort-test-1-patch-small.gif,36) 
(/shuttle/countdown/count69.gif,31) 
(/images/lf-logo.gif,27) 
(/history/apollo/sa-5/sa-5-patch-small.gif,24) 
(/shuttle/resources/orbiters/challenger.gif,24) 
(/robots.txt,23) 
(/shuttle/resources/orbiters/discovery.gif,20) 


In [110]:
val errHostsCountPairTuple = badRecords.map(r => (r.host, 1))
val errHostsSum = errHostsCountPairTuple.reduceByKey(_ + _)
val errHostsTop25 = errHostsSum.takeOrdered(25)(Ordering[Int].reverse.on(x => x._2))

print("Top 25 hosts that generated errors: ")
errHostsTop25.foreach(x => println(x + " "))



Top 25 hosts that generated errors: (maz3.maz.net,39) 
(ts8-1.westwood.ts.ucla.edu,37) 
(nexus.mlckew.edu.au,37) 
(piweba3y.prodigy.com,34) 
(spica.sci.isas.ac.jp,27) 
(203.13.168.24,25) 
(203.13.168.17,25) 
(www-c4.proxy.aol.com,23) 
(scooter.pa-x.dec.com,23) 
(onramp2-9.onr.com,22) 
(crl5.crl.com,22) 
(198.40.25.102.sap2.artic.edu,21) 
(msp1-16.nas.mr.net,20) 
(gn2.getnet.com,20) 
(isou24.vilspa.esa.es,19) 
(tigger.nashscene.com,19) 
(dial055.mbnet.mb.ca,19) 
(dialup551.chicago.mci.net,18) 
(cougar.oro.net,17) 
(utsi057.utsi.com,17) 
(quadra_alpha.rollins.edu,17) 
(ix-atl10-08.ix.netcom.com,16) 
(micromatix.jagunet.com,16) 
(redx3.cac.washington.edu,14) 
(gw2.att.com,14) 




In [114]:
val errDateCountPairTuple = badRecords.map(r => ((r.date_time.day), 1))
val errDateSum = errDateCountPairTuple.reduceByKey(_ + _)
val errDateSorted = errDateSum.cache()
val errByDate = errDateSorted.takeOrdered(30)

print("404 Errors by day: ")
errByDate.foreach(x => println(x + " "))




404 Errors by day: (1,243) 
(3,303) 
(4,346) 
(5,234) 
(6,372) 
(7,532) 
(8,381) 
(9,279) 
(10,314) 
(11,133) 


In [115]:
object DateOrdering extends Ordering[(Int, Int)] {
  def compare(a: (Int, Int), b: (Int, Int)) = a._2 compare b._2
}

val topErrDate = errDateSorted.takeOrdered(5)(DateOrdering.reverse)

print("Top Five dates for 404 requests: ")
topErrDate.foreach(x => println(x + " "))

Top Five dates for 404 requests: (7,532) 
(8,381) 
(6,372) 
(4,346) 
(10,314) 




In [118]:
val hourCountPairTuple = badRecords.map(r => ((r.date_time.hour), 1))
val hourRecordsSum = hourCountPairTuple.reduceByKey(_ + _)
val hourRecordsSorted = hourRecordsSum.sortByKey ().cache()
val errHourList = hourRecordsSorted.collect()

print("Top hours for 404 requests: ")
errHourList.foreach(x => println(x + " "))





Top hours for 404 requests: (0,101) 
(1,97) 
(2,343) 
(3,197) 
(4,36) 
(5,27) 
(6,45) 
(7,61) 
(8,121) 
(9,91) 
(10,162) 
(11,136) 
(12,239) 
(13,195) 
(14,104) 
(15,164) 
(16,202) 
(17,134) 
(18,130) 
(19,102) 
(20,157) 
(21,105) 
(22,87) 
(23,101) 
