# Example 1: Parallel Log Parsing with Map and Filter

## Step 1: Data ingest and parsing

```python
from pyspark import SparkConf, SparkContext
import re
```

This dataset is a debug dump from a Lustre filesystem. Dumps like this are typically triggered by code bugs (LBUG), heavy load, hardware problems, or misbehaving application I/O.

Let's analyze some of the log structure to determine what may have caused this debug dump.

```python
# The SparkContext created by the pyspark shell/notebook
sc
```

```
<pyspark.context.SparkContext at 0x7fb1502417d0>
```

```python
partitions = 64
# The second argument is the minimum number of partitions to split the file into.
parlog = sc.textFile("/home/milroy/pyspark/lustre_debug.out", partitions)
```

Let's take a look at the first five lines of the debug log. This log is colon-delimited and, counting fields from 0, roughly corresponds to the following information:

| Field(s) | Contents |
|---|---|
| 0-1 | subsystem ID |
| 2 | |
| 3 | timestamp (seconds.microseconds) |
| 4-6 | PIDs |
| 7 | relevant code module |
| 8 | code line |
| 9 | function and message |
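
The field layout above can be checked with a plain-Python sketch (no Spark needed) that splits one line from this log on colons. The helper name `split_fields` is hypothetical. Because the message text can itself contain colons (e.g. `scratch-MDT0000: recovery ...`), the sketch caps the split at `maxsplit=9`, so everything from field 9 onward stays in one piece.

```python
# Plain-Python sketch of the colon-delimited layout; split_fields is a
# hypothetical helper, not part of any library.

SAMPLE = ("00010000:00080000:2.1F:1433384402.983324:0:0:0:"
          "(ldlm_lib.c:2008:target_recovery_expired()) scratch-MDT0000: "
          "recovery timed out; 2 clients are still in recovery after 300s "
          "(136 clients connected)")


def split_fields(line):
    """Split a debug line into at most 10 fields (indices 0-9).

    maxsplit=9 keeps any colons inside the message (field 9) intact.
    """
    return line.split(":", 9)


fields = split_fields(SAMPLE)
timestamp = float(fields[3])        # field 3: seconds.microseconds
module = fields[7].lstrip("(")      # field 7: source file, e.g. ldlm_lib.c
code_line = int(fields[8])          # field 8: line number in that file
# Field 9 holds "function()) message"; split at the first ") ".
func, _, message = fields[9].partition(") ")

print(module, code_line, func)      # ldlm_lib.c 2008 target_recovery_expired()
print(message)
```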

```python
parlog.take(5)
```

```
[u'00010000:00080000:2.1F:1433384402.983324:0:0:0:(ldlm_lib.c:2008:target_recovery_expired()) scratch-MDT0000: recovery timed out; 2 clients are still in recovery after 300s (136 clients connected)',
 u'00000100:00080000:22.0:1433439189.202419:0:28364:0:(service.c:789:ptlrpc_update_export_timer()) updating export e0b948f3-2c66-de79-82f6-858c54bcf73f at 1433439189 exp ffff8805feae1000',
 u'00000100:00080000:22.0:1433439189.205134:0:28364:0:(service.c:789:ptlrpc_update_export_timer()) updating export 1915dbbb-3d37-10c8-c161-d533b76bcbcb at 1433439189 exp ffff8805cf6dec00',
 u'00000100:00080000:22.0:1433439189.208987:0:28364:0:(service.c:789:ptlrpc_update_export_timer()) updating export 37147439-d85e-5002-0501-0315cbd3a063 at 1433439189 exp ffff8805cf5f7800',
 u'00000100:00080000:22.0:1433439189.213463:0:28364:0:(service.c:789:ptlrpc_update_export_timer()) updating export ed7c79f9-b42b-ce30-ba52-38106df6aaed at 1433439189 exp ffff8805b6a0dc00']
```

Now let's split each line of the RDD into lowercase "words".

Lambda functions are ubiquitous in Spark, I presume because of Scala's functional-programming underpinnings. The function passed to `map` is applied to every line, and Spark runs it on each partition in parallel.
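
As a rough plain-Python picture of what `map` does here, the sketch below cuts two lines from this log into contiguous partitions and applies the same tokenizer to every line of each partition. Spark would process the partitions in parallel on executors; this sketch simply loops over them, and the `chunk` helper is hypothetical. The regex `\W+` splits on runs of non-word characters, so a line ending in `)` yields a trailing empty string, which is where the empty token at the end of the first line's output comes from.

```python
import re


def tokenize(line):
    # Same expression as the Spark lambda; the raw string avoids an
    # invalid-escape warning for \W in recent Python versions.
    return re.split(r"\W+", line.lower().strip())


def chunk(lines, n):
    """Hypothetical helper: cut lines into n contiguous partitions."""
    size = -(-len(lines) // n)  # ceiling division
    return [lines[i:i + size] for i in range(0, len(lines), size)]


LINES = [
    "00010000:00080000:2.1F:1433384402.983324:0:0:0:"
    "(ldlm_lib.c:2008:target_recovery_expired()) scratch-MDT0000: "
    "recovery timed out; 2 clients are still in recovery after 300s "
    "(136 clients connected)",
    "00000100:00080000:22.0:1433439189.202419:0:28364:0:"
    "(service.c:789:ptlrpc_update_export_timer()) updating export "
    "e0b948f3-2c66-de79-82f6-858c54bcf73f at 1433439189 exp ffff8805feae1000",
]

partitions = chunk(LINES, 2)
# Each partition is processed independently (Spark would do this in parallel).
per_partition = [[tokenize(line) for line in part] for part in partitions]
# Concatenate the per-partition results into one list of token lists.
words = [tokens for part in per_partition for tokens in part]

print(words[0][:4])         # ['00010000', '00080000', '2', '1f']
print(words[0][-1] == "")   # True: the line ends in ')', leaving an empty token
```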

```python
# Lowercase each line and split it on runs of non-word characters.
words = parlog.map(lambda line: re.split(r'\W+', line.lower().strip()))
```

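Notice that this `map` returns immediately: no action has been run yet. Spark has only added the transformation to the DAG (Directed Acyclic Graph) of pending work, and nothing is computed until an action such as `take` or `count` needs the result. I like to think of this as analogous to a page fault, but applied to a DAG: the work happens on demand, when something first touches the data.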
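
Python 3's built-in `map` is lazy in a similar way, which gives a small, runnable analogy. The function below records each call, and nothing is recorded until the result is consumed, much as Spark defers a transformation until an action runs. This is only an analogy with made-up input; Spark additionally keeps the lineage so it can plan the job and recompute lost partitions.

```python
calls = []


def tokenize(line):
    calls.append(line)          # record that the function actually ran
    return line.lower().split(":")


lines = ["A:B", "C:D", "E:F"]

lazy = map(tokenize, lines)     # like a transformation: nothing runs yet
print(len(calls))               # 0

first_two = [next(lazy), next(lazy)]  # like take(2): runs only what it needs
print(len(calls))               # 2
print(first_two)                # [['a', 'b'], ['c', 'd']]
```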

```python
words.take(2)
```

```
[[u'00010000',
  u'00080000',
  u'2',
  u'1f',
  u'1433384402',
  u'983324',
  u'0',
  u'0',
  u'0',
  u'ldlm_lib',
  u'c',
  u'2008',
  u'target_recovery_expired',
  u'scratch',
  u'mdt0000',
  u'recovery',
  u'timed',
  u'out',
  u'2',
  u'clients',
  u'are',
  u'still',
  u'in',
  u'recovery',
  u'after',
  u'300s',
  u'136',
  u'clients',
  u'connected',
  u''],
 [u'00000100',
  u'00080000',
  u'22',
  u'0',
  u'1433439189',
  u'202419',
  u'0',
  u'28364',
  u'0',
  u'service',
  u'c',
  u'789',
  u'ptlrpc_update_export_timer',
  u'updating',
  u'export',
  u'e0b948f3',
  u'2c66',
  u'de79',
  u'82f6',
  u'858c54bcf73f',
  u'at',
  u'1433439189',
  u'exp',
  u'ffff8805feae1000']]
```

## Step 2: Counting Occurrences

My experience with Lustre affords me (some) insight here: I know the system has been susceptible to MDS (metadata server) overloading caused by applications creating tons of small files or issuing lots of MDS RPCs. I want to look for all lines that contain mfd changes.

Let's apply a filter to the `words` RDD to create a new RDD that contains only the lines with mfd changes.
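
The predicate needs a separate membership test for each word. Python's `and` binds more loosely than `in`, so `'mfd' and 'change' in x` parses as `'mfd' and ('change' in x)`; since the non-empty string `'mfd'` is truthy, that expression only checks for `'change'`. The plain-Python sketch below, using made-up token lists, shows the difference:

```python
# Made-up token lists standing in for parsed log lines.
only_change = ["mdt_open", "change", "mode"]
both = ["mdt_mfd_set_mode", "change", "mfd"]

# Pitfall: parses as 'mfd' and ('change' in x), i.e. just 'change' in x.
buggy = lambda x: 'mfd' and 'change' in x
# Intended test: both words must be present.
fixed = lambda x: 'mfd' in x and 'change' in x
# Equivalent form that scales to more required words.
fixed_all = lambda x: all(w in x for w in ('mfd', 'change'))

print(buggy(only_change), fixed(only_change), fixed_all(only_change))  # True False False
print(buggy(both), fixed(both), fixed_all(both))                        # True True True
```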

```python
# Keep only token lists that contain both "mfd" and "change".
mfds = words.filter(lambda x: 'mfd' in x and 'change' in x)
```

Did it work?

```python
mfds.take(2)
```

```
[[u'00000004',
  u'00080000',
  u'22',
  u'0',
  u'1433439244',
  u'706760',
  u'0',
  u'1563',
  u'0',
  u'mdt_open',
  u'c',
  u'646',
  u'mdt_mfd_set_mode',
  u'change',
  u'mfd',
  u'ffff88056d872d40',
  u'mode',
  u'0x0',
  u'0x42'],
 [u'00000004',
  u'00080000',
  u'22',
  u'0',
  u'1433439244',
  u'734649',
  u'0',
  u'13592',
  u'0',
  u'mdt_open',
  u'c',
  u'646',
  u'mdt_mfd_set_mode',
  u'change',
  u'mfd',
  u'ffff8805698b3240',
  u'mode',
  u'0x0',
  u'0x42']]
```

Now we call an action on the RDD, which makes Spark execute the lazily recorded transformations in the DAG. In this case, `count()` returns the number of lines in the `mfds` RDD.

```python
mfds.count()
```

```
285051
```

And as a percent of the overall file?

```python
'{0:0.2f}%'.format((mfds.count()/float(parlog.count()))*100)
```

Now let's look at the effect of `flatMap`. It behaves like `map`, but instead of returning one list per line, it flattens the results, so the resulting RDD contains individual words rather than lists of words.
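
The difference is easy to see in plain Python: a list comprehension plays the role of `map` (one output per input line, here a list of tokens), while `itertools.chain.from_iterable` plays the role of `flatMap`'s flattening step. The two sample lines are made up.

```python
import re
from itertools import chain


def tokenize(line):
    return re.split(r"\W+", line.lower().strip())


lines = ["Updating export at 1433439189", "Change mfd mode"]

# Like map: one list of tokens per line.
mapped = [tokenize(line) for line in lines]
# Like flatMap: the per-line lists are concatenated into one sequence.
flat = list(chain.from_iterable(tokenize(line) for line in lines))

print(len(mapped), mapped)
# 2 [['updating', 'export', 'at', '1433439189'], ['change', 'mfd', 'mode']]
print(len(flat), flat)
# 7 ['updating', 'export', 'at', '1433439189', 'change', 'mfd', 'mode']
```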

```python
# Like the map above, but flattened into one RDD of individual words.
flatwords = parlog.flatMap(lambda line: re.split(r'\W+', line.lower().strip()))
```

Now keep only the "words" longer than two characters, discarding short tokens such as `c` and `0`.

```python
longwords = flatwords.filter(lambda x: len(x) > 2)
```

```python
longwords.take(10)
```

```
[u'00010000',
 u'00080000',
 u'1433384402',
 u'983324',
 u'ldlm_lib',
 u'2008',
 u'target_recovery_expired',
 u'scratch',
 u'mdt0000',
 u'recovery']
```

To rank words by number of occurrences, we first map each word to a tuple of the word and 1. A reduction over these tuples then yields the counts.

```python
longwords = longwords.map(lambda word: (word, 1))
```

Next we use `reduceByKey`, which operates on an RDD of (key, value) pairs and merges the values of identical keys with the given function. The key is the first element of each tuple, here the word, so summing the 1s yields a count per word.

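Note that the function passed to `reduceByKey` must be associative and commutative: Spark combines values within each partition before merging the partial results across partitions, so the grouping and order of the combinations are not fixed.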
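
A plain-Python sketch of what `reduceByKey` does may make that requirement concrete. Assuming two made-up partitions of `(word, 1)` pairs, the hypothetical `reduce_by_key` helper first reduces each partition locally and then merges the partial results by key. Addition gives the same totals however the data is partitioned; subtraction, which is neither associative nor commutative, gives answers that depend on the partitioning.

```python
def reduce_by_key(partitions, func):
    """Hypothetical stand-in for RDD.reduceByKey over lists of (k, v) pairs."""
    # Step 1: combine values for each key within every partition.
    partials = []
    for part in partitions:
        local = {}
        for key, value in part:
            local[key] = func(local[key], value) if key in local else value
        partials.append(local)

    # Step 2: merge the per-partition results key by key.
    merged = {}
    for local in partials:
        for key, value in local.items():
            merged[key] = func(merged[key], value) if key in merged else value
    return merged


words = ["mfd", "change", "mfd", "export", "mfd", "change"]
pairs = [(w, 1) for w in words]           # the (word, 1) mapping step

counts = reduce_by_key([pairs[:3], pairs[3:]], lambda a, b: a + b)
print(sorted(counts.items()))
# [('change', 2), ('export', 1), ('mfd', 3)]

# Subtraction breaks the contract: the result depends on the partitioning.
sub = lambda a, b: a - b
print(reduce_by_key([pairs[:3], pairs[3:]], sub)["mfd"])  # -1
print(reduce_by_key([pairs[:2], pairs[2:]], sub)["mfd"])  # 1
```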

```python
longcount = longwords.reduceByKey(lambda a, b: a + b)
```

```python
longcount.take(10)
```

```
[(u'378466', 2),
 (u'145204', 1),
 (u'988531', 3),
 (u'265971', 1),
 (u'611459', 1),
 (u'874193', 1),
 (u'708674099127', 2),
 (u'708674808631', 1),
 (u'012489', 1),
 (u'ffff8805846873c0', 2)]
```

We swap the order of each tuple's contents so that we sort by count rather than by word. Passing `False` as the `ascending` argument of `sortByKey` sorts in descending order.
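
The swap-then-sort step can be mirrored in plain Python, using a few counts copied from the results in this section: swap each `(word, count)` pair to `(count, word)` and sort descending. As an aside, PySpark RDDs also provide `sortBy(keyfunc, ascending=False)`, which sorts by a computed key without an explicit swap; the sketch below sticks to plain Python.

```python
counts = [("export", 155367), ("mfd", 285051), ("reply", 154458), ("0x0", 314189)]

# Swap to (count, word), like longcount.map(lambda x: (x[1], x[0])).
swapped = [(count, word) for word, count in counts]
# Descending sort on the count, like sortByKey(False).
ranked = sorted(swapped, key=lambda pair: pair[0], reverse=True)

print(ranked)
# [(314189, '0x0'), (285051, 'mfd'), (155367, 'export'), (154458, 'reply')]
```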

```python
# Swap to (count, word), then sort by count in descending order.
longwords = longcount.map(lambda x: (x[1], x[0])).sortByKey(ascending=False)
```

```python
longwords.take(20)
```

```
[(697191, u'00080000'),
 (464831, u'00000004'),
 (314189, u'0x0'),
 (285058, u'mdt_open'),
 (285051, u'mdt_mfd_set_mode'),
 (285051, u'mfd'),
 (285051, u'change'),
 (285051, u'mode'),
 (285051, u'646'),
 (223284, u'transno'),
 (155387, u'00000100'),
 (155375, u'exp'),
 (155367, u'export'),
 (155358, u'service'),
 (155357, u'ptlrpc_update_export_timer'),
 (155353, u'789'),
 (155353, u'updating'),
 (154458, u'reply'),
 (150641, u'state'),
 (150641, u'request')]
```