You should remove the `raise` statements below and insert your code in their place. The cells which say `DO NOT CHANGE THE CONTENT OF THIS CELL` are there to help you: if they fail, your code is probably wrong. You should not change their content. If you change them to match what your program is producing, you will still not get the marks.

If you encounter an error while running your notebook that doesn't appear to be connected to RDDs (such as missing `imp`), check that you have run the initialization cells since you started your latest cluster.

Before you turn your solution in, make sure everything runs as expected. With an attached cluster, you should **Clear State and Results** (under the **Clear** dropdown menu) and then click on the **Run all** icon. This runs all cells in the notebook from scratch. You should only submit this notebook if all cells run.

This homework is to be completed on your own. By following these instructions and handing your work in, you are deemed to have read and understood the rules on plagiarism as written in your student handbook.

# Scrubbing data

A common part of the ETL process is data scrubbing. This homework asks you to process data in order to get it into a standardized format for later processing.

The file `devicestatus.txt.zip` is available from Blackboard. This file contains data collected from mobile devices on a network, including device ID, current status, location and so on. Because the company previously acquired other mobile providers' networks, the data from different subnetworks has different formats. Note that the records in this file have different field delimiters: some use commas, some use pipes (|), and so on.

This notebook will guide you through the steps of scrubbing this dataset. Follow the instructions carefully. In general, every time you execute a step, there is a check that tests whether you carried out that step correctly. You should not move on until the step passes its test. All your processing should be done in this notebook. It is assumed that you have uncompressed the `devicestatus.txt` file and have placed it in `/FileStore/` on DBFS using the steps described in the labs this week.

The first two cells perform some checks: the first checks that you are running this notebook on Databricks, and the second ensures that your file is located at `/FileStore/devicestatus.txt`. You should not (try to) change either of these cells.

In [0]:
# DO NOT CHANGE THE CONTENT OF THIS CELL
import sys
if not 'dbruntime.dbutils' in sys.modules.keys():
    import pyspark
    sc = pyspark.SparkContext()
    print("Unless you're grading this homework, you should be running this on Databricks.")

In [0]:
# DO NOT CHANGE THE CONTENT OF THIS CELL
if 'dbruntime.dbutils' in sys.modules.keys():
    try:
        dbutils.fs.ls("/FileStore/devicestatus.txt")
    except:
        assert False, "It is assumed that you've put your (unzipped) devicestatus.txt file in the DBFS /FileStore"
else:    
    import os
    assert os.path.exists("/FileStore/devicestatus.txt") == True, "It is assumed that you've put your (unziped) devicestatus.txt file in the DBFS /FileStore"

Load the `devicestatus.txt` dataset into a variable called `myRDD`.

In [0]:
# Read the devicestatus.txt file into an RDD. Your answer should have the following format (without comment tag)
myRDD = sc.textFile("/FileStore/devicestatus.txt")

In [0]:
# DO NOT CHANGE THE CONTENT OF THIS CELL
assert myRDD.count() == 459540, "It doesn't look like you've read in your data correctly"

Determine which delimiter(s) to use and divide up each line of the RDD into a list of its fields. Your result should be in a variable called `splitRDD`.
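One way to find out which delimiters occur is to tally the character that follows the timestamp in each record. The sketch below does this in plain Python on three made-up lines. The names `sample_lines`, `TIMESTAMP_LENGTH` and `delimiter_counts` are hypothetical, and the sketch assumes that every record begins with a 19-character timestamp such as `2014-03-15:10:10:20`.

```python
from collections import Counter

# Hypothetical sample records, one per delimiter style (not taken from the real file)
sample_lines = [
    "2014-03-15:10:10:20,Sorrento F41L,id-1,7,24",
    "2014-03-15:10:10:21|MeeToo 1.0|id-2|0|31",
    "2014-03-15:10:10:22/Ronin S2/id-3/5/12",
]

# Assumption: each line starts with a timestamp of this length
TIMESTAMP_LENGTH = len("2014-03-15:10:10:20")

# Tally the character immediately after the timestamp in each line
delimiter_counts = Counter(line[TIMESTAMP_LENGTH] for line in sample_lines)
print(delimiter_counts)
```

On the RDD itself, a similar tally could probably be obtained with `myRDD.map(lambda line: line[19]).countByValue()`, under the same assumption about the timestamp length.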

In [0]:
# The format of your answer should be
# splitRDD = ...
# YOUR CODE HERE
# Normalize the pipe and slash delimiters to commas, then split each line into its fields
normalizedRDD = myRDD.map(lambda line: line.replace("|", ",").replace("/", ","))
splitRDD = normalizedRDD.map(lambda line: line.split(","))

In [0]:
# DO NOT CHANGE THE CONTENT OF THIS CELL
assert splitRDD.sortBy(lambda x: x[0]).take(2) == [[u'2014-03-15:10:10:20',
  u'Sorrento F41L',
  u'8cc3b47e-bd01-4482-b500-28f2342679af',
  u'7',
  u'24',
  u'39',
  u'enabled',
  u'disabled',
  u'connected',
  u'55',
  u'67',
  u'12',
  u'33.6894754264',
  u'-117.543308253'],
 [u'2014-03-15:10:10:20',
  u'MeeToo 1.0',
  u'ef8c7564-0a1a-4650-a655-c8bbd5f8f943',
  u'0',
  u'31',
  u'63',
  u'70',
  u'39',
  u'27',
  u'enabled',
  u'enabled',
  u'enabled',
  u'37.4321088904',
  u'-121.485029632']], "Unexpected entries in the first couple of lines"

Filter out any records which do not parse correctly. The records which do have the correct number of fields should be placed in an RDD named `filteredRDD`.
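Before filtering, it can help to see how many records have each field count. The following is a minimal sketch in plain Python on toy data. The names `sample_records`, `EXPECTED_FIELDS`, `field_count_distribution` and `well_formed` are hypothetical, and the toy records use 5 fields rather than the 14 seen in the check above.

```python
from collections import Counter

# Hypothetical, already-split records; the second one is too short
sample_records = [
    ["2014-03-15:10:10:20", "Sorrento F41L", "id-1", "33.68", "-117.54"],
    ["2014-03-15:10:10:21", "MeeToo 1.0"],
    ["2014-03-15:10:10:22", "Ronin S2", "id-3", "37.43", "-121.48"],
]

# Assumption for this toy data only; the real records have 14 fields
EXPECTED_FIELDS = 5

# How many records have each field count?
field_count_distribution = Counter(len(record) for record in sample_records)

# Keep only the records with the expected number of fields
well_formed = [r for r in sample_records if len(r) == EXPECTED_FIELDS]
print(field_count_distribution, len(well_formed))
```

A distribution with more than one key suggests that some delimiter has not been handled yet, or that some lines are malformed.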

In [0]:
# The format of your answer should be
# filteredRDD = ...
# YOUR CODE HERE
filteredRDD = splitRDD.filter(lambda line: len(line) == 14)

In [0]:
# DO NOT CHANGE THE CONTENT OF THIS CELL
assert filteredRDD.count() == 459540, "You may have forgotten to account for some delimiters"

Extract the date (first field), model (second field), device ID (third field), latitude and longitude (13th and 14th fields respectively). Each record's list should therefore be reduced to a list of these 5 fields only. The output should be placed in a variable named `extractedRDD`.
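Because Python lists are zero-based, the 1st, 2nd, 3rd, 13th and 14th fields live at indices 0, 1, 2, 12 and 13. The sketch below illustrates that mapping with `operator.itemgetter` on a placeholder record. The names `record`, `pick_fields` and `extracted` are hypothetical, and this is only one possible way to select the fields.

```python
from operator import itemgetter

# Hypothetical 14-field record; each placeholder is named after its 1-based position
record = ["f{}".format(i) for i in range(1, 15)]

# Zero-based indices of date, model, device ID, latitude and longitude
pick_fields = itemgetter(0, 1, 2, 12, 13)

# itemgetter returns a tuple, so convert it back to a list
extracted = list(pick_fields(record))
print(extracted)
```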

In [0]:
# The format of your solution should be
# extractedRDD = ...
# YOUR CODE HERE
# Build on filteredRDD (not splitRDD) so that only well-formed records are carried forward
extractedRDD = filteredRDD.map(lambda fields: [fields[0], fields[1], fields[2], fields[12], fields[13]])

In [0]:
# DO NOT CHANGE THE CONTENT OF THIS CELL
assert extractedRDD.sortBy(lambda x: x[0]).take(2) == [[u'2014-03-15:10:10:20',
  u'Sorrento F41L',
  u'8cc3b47e-bd01-4482-b500-28f2342679af',
  u'33.6894754264',
  u'-117.543308253'],
 [u'2014-03-15:10:10:20',
  u'MeeToo 1.0',
  u'ef8c7564-0a1a-4650-a655-c8bbd5f8f943',
  u'37.4321088904',
  u'-121.485029632']], "There seems to be some discrepancy between your answer and the expected answer"

The second field contains the device manufacturer and model name (e.g. Ronin S2). Split this field by spaces to separate the manufacturer from the model (i.e. manufacturer Ronin, model S2). Each resulting record should therefore now contain a list containing: date, manufacturer, model, device ID, latitude and longitude. The new RDD should be placed in a variable named `separatedRDD`.
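As an illustration, the helper below separates the manufacturer from the model at the first space only. The function name `split_make_and_model` is hypothetical, and the multi-word model name in the usage lines is an invented input, not a value confirmed to be in the dataset.

```python
def split_make_and_model(name):
    """Split 'Ronin S2' into ['Ronin', 'S2'] at the first space only."""
    # partition() never raises: if there is no space, model is an empty string
    manufacturer, _, model = name.partition(" ")
    return [manufacturer, model]


print(split_make_and_model("Ronin S2"))
print(split_make_and_model("MeeToo 1.0"))
# Invented input: a model name that itself contains spaces stays in one piece
print(split_make_and_model("Acme Big Phone 2"))
```

Splitting at the first space only is a design choice: it keeps any remaining words together as the model, whereas a plain `split()` would break them into separate tokens.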

In [0]:
# The format of your solution should be
# separatedRDD = ...
# YOUR CODE HERE
# Split the "manufacturer model" field on whitespace and keep its first two tokens
separatedRDD = extractedRDD.map(lambda line: [line[0], line[1].split()[0], line[1].split()[1], line[2], line[3], line[4]])
separatedRDD.take(3)

Out[29]: [['2014-03-15:10:10:20',
  'Sorrento',
  'F41L',
  '8cc3b47e-bd01-4482-b500-28f2342679af',
  '33.6894754264',
  '-117.543308253'],
 ['2014-03-15:10:10:20',
  'MeeToo',
  '1.0',
  'ef8c7564-0a1a-4650-a655-c8bbd5f8f943',
  '37.4321088904',
  '-121.485029632'],
 ['2014-03-15:10:10:20',
  'MeeToo',
  '1.0',
  '23eba027-b95a-4729-9a4b-a3cca51c5548',
  '39.4378908349',
  '-120.938978486']]

In [0]:
# DO NOT CHANGE THE CONTENT OF THIS CELL
assert separatedRDD.sortBy(lambda x: x[0]).take(2) == [[u'2014-03-15:10:10:20',
  u'Sorrento',
  u'F41L',
  u'8cc3b47e-bd01-4482-b500-28f2342679af',
  u'33.6894754264',
  u'-117.543308253'],
 [u'2014-03-15:10:10:20',
  u'MeeToo',
  u'1.0',
  u'ef8c7564-0a1a-4650-a655-c8bbd5f8f943',
  u'37.4321088904',
  u'-121.485029632']], "There is some discrepancy between your answer and the expected answer"

# Remove the output directory for the following section if it already exists
if 'dbruntime.dbutils' in sys.modules.keys():
    try:
        dbutils.fs.ls("/FileStore")
        dbutils.fs.ls("/FileStore/devicestatus_etl/")
        dbutils.fs.rm("/FileStore/devicestatus_etl", True)
    except:
        # Directory is not there yet
        pass
else:
    if os.path.exists("/FileStore/devicestatus_etl/"):
        import shutil
        shutil.rmtree("/FileStore/devicestatus_etl/")

Save the extracted data to comma delimited text files in the `/FileStore/devicestatus_etl` directory on DBFS.
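Each record is a list of strings, so it has to be turned into a single comma-delimited line before it is written out. A minimal sketch of that conversion in plain Python follows, using the first record from the earlier check. The names `record` and `csv_line` are hypothetical.

```python
# First record from the earlier check: date, manufacturer, model, device ID, latitude, longitude
record = [
    "2014-03-15:10:10:20",
    "Sorrento",
    "F41L",
    "8cc3b47e-bd01-4482-b500-28f2342679af",
    "33.6894754264",
    "-117.543308253",
]

# Join the fields with commas to form one line of output
csv_line = ",".join(record)
print(csv_line)
print(len(csv_line))
```

Note that `saveAsTextFile` writes a directory containing one `part-NNNNN` file per partition rather than a single file.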

In [0]:
# YOUR CODE HERE
# Every field is already a string, so each record can be joined directly into one CSV line
csvLinesRDD = separatedRDD.map(lambda fields: ",".join(fields))
csvLinesRDD.saveAsTextFile("/FileStore/devicestatus_etl")



In [0]:
# DO NOT CHANGE THE CONTENT OF THIS CELL
if 'dbruntime.dbutils' in sys.modules.keys():
    assert dbutils.fs.ls("/FileStore/devicestatus_etl"), "You don't appear to have created the required output directory"
    assert dbutils.fs.ls("/FileStore/devicestatus_etl/part-00000"), "The output doesn't appear to be as expected"
    assert dbutils.fs.head("/FileStore/devicestatus_etl/part-00000", 99) == "2014-03-15:10:10:20,Sorrento,F41L,8cc3b47e-bd01-4482-b500-28f2342679af,33.6894754264,-117.543308253", "Expecting different output to that produced"
else:
    assert os.path.exists("/FileStore/devicestatus_etl"), "You don't appear to have created the required output directory"
    assert os.path.exists("/FileStore/devicestatus_etl/part-00000"), "The output doesn't appear to be as expected"
    
    # Check contents of the first line
    with open("/FileStore/devicestatus_etl/part-00000") as f:
        first_line = f.readline().strip()
        assert first_line == "2014-03-15:10:10:20,Sorrento,F41L,8cc3b47e-bd01-4482-b500-28f2342679af,33.6894754264,-117.543308253", "Expecting different output to that produced"
    
# Count the number of lines in the second part
myRDD = sc.textFile("/FileStore/devicestatus_etl/part-00001")
assert myRDD.count() == 229802, "Unexpected number of lines in part-00001"

[Truncated to first 99 bytes]
