In [1]:
import glob
import os
import re
import sys

import pandas as pd
from pyspark.sql import SparkSession

# Point both the Spark driver and its Python workers at this kernel's own
# interpreter, so they run the same Python as the notebook. This is set here,
# before the SparkSession is created below.
os.environ.update(
    PYSPARK_PYTHON=sys.executable,
    PYSPARK_DRIVER_PYTHON=sys.executable,
)

<h1>Loading and Viewing the NASA Log Dataset</h1>

In [2]:
# Start the Spark session (getOrCreate reuses one already running in this
# kernel) and keep a handle on its SparkContext for the RDD API.
spark = (
    SparkSession.builder
    .appName("MyApp")
    .getOrCreate()
)
sc = spark.sparkContext

In [3]:
# Raw NASA HTTP access logs for July and August 1995, looked up relative to the
# notebook's working directory.
LOG_FILE_PATTERNS = ['access_log_Jul95', 'access_log_Aug95']

raw_data_files = []
for pattern in LOG_FILE_PATTERNS:
    matches = glob.glob(pattern)
    # glob.glob() silently returns [] when nothing matches, which would only
    # surface later as an opaque Spark read error -- fail fast here instead.
    assert matches, f"No log file matches {pattern!r} in {os.getcwd()}"
    raw_data_files.extend(matches)

print(raw_data_files)

['access_log_Jul95', 'access_log_Aug95']


<h3>Inspecting the schema of our DataFrame</h3>

In [4]:
# Every line of every log file becomes one row in a single string column.
base_df = spark.read.text(paths=raw_data_files)
base_df.printSchema()

root
 |-- value: string (nullable = true)


In [5]:
# Confirm that spark.read.text() handed back a Spark DataFrame.
base_df.__class__

pyspark.sql.dataframe.DataFrame

*Below is the conversion of the DataFrame to an RDD.*

In [6]:
# DataFrame.rdd exposes the same rows through the lower-level RDD API
# (as Row objects), without copying the data up front.
base_df_rdd = base_df.rdd
base_df_rdd.__class__

pyspark.rdd.RDD

<h3>Viewing sample data in our dataframe</h3>

In [7]:
# Print the first 10 rows; truncate=False keeps long log lines from being cut off.
base_df.show(n=10, truncate=False)

+-----------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                  |
+-----------------------------------------------------------------------------------------------------------------------+
|199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] "GET /history/apollo/ HTTP/1.0" 200 6245                                 |
|unicomp6.unicomp.net - - [01/Jul/1995:00:00:06 -0400] "GET /shuttle/countdown/ HTTP/1.0" 200 3985                      |
|199.120.110.21 - - [01/Jul/1995:00:00:09 -0400] "GET /shuttle/missions/sts-73/mission-sts-73.html HTTP/1.0" 200 4085   |
|burger.letters.com - - [01/Jul/1995:00:00:11 -0400] "GET /shuttle/countdown/liftoff.html HTTP/1.0" 304 0               |
|199.120.110.21 - - [01/Jul/1995:00:00:11 -0400] "GET /shuttle/missions/sts-73/sts-73-patch-small.gif HTTP/1.0" 200 4179|
|burger.letters.com - - 

*As seen above, the raw data still needs to be wrangled and parsed.*

*Accessing data through an RDD works differently. The following cell shows how the same rows are represented as an RDD.*

In [8]:
# Calling base_df_rdd.take(10) on the full RDD crashed with a Py4JJavaError
# ("java.net.SocketException: Connection reset by peer" inside
# PythonRDD.runJob): the JVM was still streaming the partition's rows to the
# Python worker when that worker closed the socket -- presumably because it
# stopped reading once it had its 10 rows.
# Limiting on the JVM side first means only 10 rows are ever sent to Python.
base_df.limit(10).rdd.collect()


In [9]:
# Draw a 10% random sample of the log lines and view it through the RDD API.
# sample() is nondeterministic without a seed, so fix one to keep this cell's
# output reproducible under Restart & Run All.
SAMPLE_FRACTION = 0.1
SAMPLE_SEED = 42

base_df_sample = base_df.sample(fraction=SAMPLE_FRACTION, seed=SAMPLE_SEED)
base_df_sample.rdd.take(10)

[Row(value='burger.letters.com - - [01/Jul/1995:00:00:12 -0400] "GET /images/NASA-logosmall.gif HTTP/1.0" 304 0'),
 Row(value='d104.aa.net - - [01/Jul/1995:00:00:15 -0400] "GET /images/NASA-logosmall.gif HTTP/1.0" 200 786'),
 Row(value='smyth-pc.moorecap.com - - [01/Jul/1995:00:00:38 -0400] "GET /history/apollo/apollo-13/images/70HC314.GIF HTTP/1.0" 200 101267'),
 Row(value='205.189.154.54 - - [01/Jul/1995:00:00:40 -0400] "GET /images/NASA-logosmall.gif HTTP/1.0" 200 786'),
 Row(value='ppp-mia-30.shadow.net - - [01/Jul/1995:00:00:41 -0400] "GET /images/USA-logosmall.gif HTTP/1.0" 200 234'),
 Row(value='www-a1.proxy.aol.com - - [01/Jul/1995:00:01:09 -0400] "GET /shuttle/countdown/ HTTP/1.0" 200 3985'),
 Row(value='port26.annex2.nwlink.com - - [01/Jul/1995:00:01:27 -0400] "GET /images/MOSAIC-logosmall.gif HTTP/1.0" 200 363'),
 Row(value='slip1.yab.com - - [01/Jul/1995:00:01:29 -0400] "GET /shuttle/resources/orbiters/endeavour.gif HTTP/1.0" 200 16991'),
 Row(value='unicomp6.unicomp.net - 

<h1>Data Wrangling</h1>

<h3>Data Understanding</h3>

*The data shown above follows the <u>Common Log Format</u>.*

*The fields are: <b>remotehost&nbsp;&nbsp;&nbsp;&nbsp;rfc931&nbsp;&nbsp;&nbsp;&nbsp;authuser&nbsp;&nbsp;&nbsp;&nbsp;[date]&nbsp;&nbsp;&nbsp;&nbsp;"request"&nbsp;&nbsp;&nbsp;&nbsp;status&nbsp;&nbsp;&nbsp;&nbsp;bytes</b>*


| Field      | Meaning                                                                                  |
|------------|------------------------------------------------------------------------------------------|
| remotehost | Remote hostname (or IP number if DNS hostname is not available or if DNSLookup is off). |
| rfc931     | The remote logname of the user, if present.                                             |
| authuser   | The username of the remote user after authentication by the HTTP server.                |
| [date]     | Date and time of the request.                                                           |
| "request"  | The request, exactly as it came from the browser or client.                             |
| status     | The HTTP status code the server sent back to the client.                                |
| bytes      | The number of bytes (Content-Length) transferred to the client.                         |
