# Complex processing and data pipelines
Learn how to process complex real-world data using Spark and the basics of pipelines.

## Quick pipeline
Before you parse some more complex data, your manager would like to see a simple pipeline example including the basic steps. For this example, you'll want to ingest a data file, filter a few rows, add an ID column to it, then write it out as JSON data.

In [2]:
import pyspark.sql.functions as F

# File Path
file_path = ".../data/datacamp/"

# Load the CSV file
departures_df = spark.read.csv(file_path + "flights_csv/2015-departures.csv", header=True).repartition(100)

# Remove any duration of 0
departures_df = departures_df.filter(departures_df["Actual elapsed time (Minutes)"] > 0)

# Add an ID column
departures_df = departures_df.withColumn('id', F.monotonically_increasing_id())

# Write the file out to JSON format
departures_df.write.json(file_path + "flights_csv/output.json", mode='overwrite')

## Removing commented lines
Your boss would like you to perform some complex parsing on a new dataset. The data represents annotation data for the ImageNet dataset, but focusing specifically on dog breeds and identifying them in images. Before any actual analysis can occur, you'll need to clear out several components of invalid / incorrect data. The general schema of the document is unknown so you'd like to import the rows into a single column, allowing for quick analysis.

To start, you need to remove all commented rows in the dataset.

In [3]:
from pyspark.sql.functions import col

# Import the file to a DataFrame and perform a row count
annotations_df = spark.read.csv(file_path + 'imagenet/annotations.csv', sep='|')
full_count = annotations_df.count()

# Count the number of rows beginning with '#'
comment_count = annotations_df.where(col('_c0').startswith('#')).count()

# Import the file to a new DataFrame, without commented rows
no_comments_df = spark.read.csv(file_path + 'imagenet/annotations.csv', sep='|', comment='#')

# Count the new DataFrame and verify the difference is as expected
no_comments_count = no_comments_df.count()
print("Full count: %d\nComment count: %d\nRemaining count: %d" % (full_count, comment_count, no_comments_count))

Full count: 32794
Comment count: 1416
Remaining count: 31378


## Removing invalid rows
Now that you've successfully removed the commented rows, you have received some information about the general format of the data. There should be at minimum 5 tab separated columns in the DataFrame. Remember that your original DataFrame only has a single column, so you'll need to split the data on the tab (\t) characters.

In [4]:
annotations_df = no_comments_df
initial_count = no_comments_count

# Split _c0 on the tab character and store the list in a variable
tmp_fields = F.split(annotations_df['_c0'], '\t')

# Create the colcount column on the DataFrame
annotations_df = annotations_df.withColumn('colcount', F.size(tmp_fields))

# Remove any rows containing fewer than 5 fields
annotations_df_filtered = annotations_df.filter(~ (annotations_df.colcount < 5))

# Count the number of rows
final_count = annotations_df_filtered.count()
print("Initial count: %d\nFinal count: %d" % (initial_count, final_count))

Initial count: 31378
Final count: 20580


A big portion of data cleaning is removing data that simply doesn't fit the format needed. You will often have several filter steps within a given data pipeline to mold the data as needed.

## Splitting into columns
You've cleaned up your data considerably by removing the invalid rows from the DataFrame. Now you want to perform some further transformations by generating specific meaningful columns based on the DataFrame content.

In [5]:
# Split the content of _c0 on the tab character (aka, '\t')
split_cols = F.split(annotations_df['_c0'], '\t')

# Add the columns folder, filename, width, and height
split_df = annotations_df.withColumn('folder', split_cols.getItem(0))
split_df = split_df.withColumn('filename', split_cols.getItem(1))
split_df = split_df.withColumn('width', split_cols.getItem(2))
split_df = split_df.withColumn('height', split_cols.getItem(3))

# Add split_cols as a column
split_df = split_df.withColumn('split_cols', split_cols)

You may be wondering why we're not using a schema instead to define the content layout. Spark's CSV parser can't handle advanced types (Arrays or Maps) so it wouldn't process correctly. In our example, we bypass using the types.

## Further parsing
You've molded this dataset into a significantly different format than it was before, but there are still a few things left to do. You need to prep the column data for use in later analysis and remove a few intermediary columns.

In [6]:
from pyspark.sql.types import *

def retriever(cols, colcount):
  # Return a list of dog data
  return cols[4:colcount]

# Define the method as a UDF
udfRetriever = F.udf(retriever, ArrayType(StringType()))

# Create a new column using your UDF
split_df = split_df.withColumn('dog_list', udfRetriever(split_df.split_cols, split_df.colcount))

# Remove the original column, split_cols, and the colcount
split_df = split_df.drop('_c0').drop('split_cols').drop('colcount')

In [7]:
split_df.show(5)

+---------+---------------+-----+------+--------------------+
|   folder|       filename|width|height|            dog_list|
+---------+---------------+-----+------+--------------------+
|025865917| n023521131_781| null|  null|                  []|
|022684404|n029380957_9768| null|  null|                  []|
|021267273|n022910760_9023| null|  null|                  []|
| 02110627|n02110627_12938|  200|   300|[affenpinscher,0,...|
| 02093754| n02093754_1148|  500|   378|[Border_terrier,7...|
+---------+---------------+-----+------+--------------------+
only showing top 5 rows



## Validate rows via join
Another example of filtering data is using joins to remove invalid entries. You'll need to verify the folder names are as expected based on a given DataFrame named valid_folders_df. The DataFrame split_df is as you last left it with a group of split columns.

In [8]:
# Load data
valid_folders_df = spark.read.csv(file_path + 'imagenet/validation.csv').repartition(100)

# Rename the column in valid_folders_df
valid_folders_df = valid_folders_df.withColumnRenamed('_c0', 'folder')

# Count the number of rows in split_df
split_count = split_df.count()

# Join the DataFrames
joined_df = split_df.join(F.broadcast(valid_folders_df), "folder")

# Compare the number of rows remaining
joined_count = joined_df.count()
print("Before: %d\nAfter: %d" % (split_count, joined_count))

Before: 31378
After: 20945


Nicely done - using joins in this fashion drastically simplifies a validation task if your data permits it. The validation data doesn't necessarily need to be loaded from a file - it could be calculated on the fly, or based on a previous dataset. Optimizing these tasks will improve your overall data cleaning process. Note: There are multiple ways to define the join statement. As both DataFrames have a column with the name 'folder', Spark handles this for us automatically.

## Examining invalid rows
You've successfully filtered out the rows using a join, but sometimes you'd like to examine the data that is invalid. This data can be stored for later processing or for troubleshooting your data sources.

You want to find the difference between two DataFrames and store the invalid rows.

In [9]:
# Determine the row counts for each DataFrame
split_count = split_df.count()
joined_count = joined_df.count()

# Create a DataFrame containing the invalid rows
invalid_df = split_df.join(F.broadcast(joined_df), 'folder', 'left')

# Validate the count of the new DataFrame is as expected
invalid_count = invalid_df.count()
print(" split_df:\t%d\n joined_df:\t%d\n invalid_df: \t%d" % (split_count, joined_count, invalid_count))

# Determine the number of distinct folder rows removed
invalid_folder_count = invalid_df.select('folder').distinct().count()
print("%d distinct invalid folders found" % invalid_folder_count)

 split_df:	31378
 joined_df:	20945
 invalid_df: 	3507678
10912 distinct invalid folders found


## Dog parsing
You've done a considerable amount of cleanup on the initial dataset, but now need to analyze the data a bit deeper. There are several questions that have now come up about the type of dogs seen in an image and some details regarding the images. You realize that to answer these questions, you need to process the data into a specific type. Before you can use it, you'll need to create a schema / type to represent the dog details.

In [10]:
# Select the dog details and show 10 untruncated rows
print(joined_df.select('dog_list').show(5, truncate=False))

# Define a schema type for the details in the dog list
DogType = StructType([
	StructField("breed", StringType(), False),
    StructField("start_x", IntegerType(), False),
    StructField("start_y", IntegerType(), False),
    StructField("end_x", IntegerType(), False),
    StructField("end_y", IntegerType(),False)
])

+--------------------------------+
|dog_list                        |
+--------------------------------+
|[affenpinscher,0,9,173,298]     |
|[Border_terrier,73,127,341,335] |
|[kuvasz,0,0,499,327]            |
|[Great_Pyrenees,124,225,403,374]|
|[schipperke,146,29,416,309]     |
+--------------------------------+
only showing top 5 rows

None


Nicely done - you'll use this schema soon to determine some details about the dogs in the data. As you've just seen, schemas can be used for importing data, but they can also be used to simplify accessing information within pre-parsed data. If you're wondering why we didn't just define a full schema for the import, the Spark CSV parser is not capable of using complex schema types using lists.

## Per image count
Your next task in building a data pipeline for this dataset is to create a few analysis oriented columns. You've been asked to calculate the number of dogs found in each image based on your dog_list column created earlier. You have also created the DogType which will allow better parsing of the data within some of the data columns.

In [11]:
# Create a function to return the number and type of dogs as a tuple
def dogParse(doglist):
  dogs = []
  for dog in doglist:
    (breed, start_x, start_y, end_x, end_y) = dog.split(',')
    dogs.append((breed, int(start_x), int(start_y), int(end_x), int(end_y)))
  return dogs

# Create a UDF
udfDogParse = F.udf(dogParse, ArrayType(DogType))

# Use the UDF to list of dogs and drop the old column
joined_df = joined_df.withColumn('dogs', udfDogParse('dog_list')).drop('dog_list')

# Show the number of dogs in the first 10 rows
joined_df.select(F.size('dogs')).show(5)

+----------+
|size(dogs)|
+----------+
|         1|
|         1|
|         1|
|         1|
|         1|
+----------+
only showing top 5 rows



## Percentage dog pixels
The final task for parsing the dog annotation data is to determine the percentage of pixels in each image that represents a dog (or dogs). You'll need to use the various techniques you've learned in this course to help calculate this information and add it as columns for later analysis.

To calculate the percentage of pixels, first calculate the total number of pixels representing each dog then sum them for the image. You can calculate the bounding box with the formula:

(Xend - Xstart) * (Yend - Ystart)

NOTE: You can ignore the possibility of overlapping bounding boxes in this instance.

For the percentage, calculate the total number of "dog" pixels divided by the total size of the image, multiplied by 100.

In [12]:
# Define a UDF to determine the number of pixels per image
def dogPixelCount(doglist):
  totalpixels = 0
  for dog in doglist:
    totalpixels += (dog[3] - dog[1]) * (dog[4] - dog[2])
  return totalpixels

# Define a UDF for the pixel count
udfDogPixelCount = F.udf(dogPixelCount, IntegerType())
joined_df = joined_df.withColumn('dog_pixels', udfDogPixelCount('dogs'))

# Create a column representing the percentage of pixels
joined_df = joined_df.withColumn('dog_percent', (joined_df.dog_pixels / (joined_df.width * joined_df.height)) * 100)

# Show the first 10 annotations with more than 60% dog
joined_df.where('dog_percent > 60').show(10)

+--------+---------------+-----+------+--------------------+----------+-----------------+
|  folder|       filename|width|height|                dogs|dog_pixels|      dog_percent|
+--------+---------------+-----+------+--------------------+----------+-----------------+
|02110627|n02110627_12938|  200|   300|[[affenpinscher, ...|     49997|83.32833333333333|
|02104029|   n02104029_63|  500|   375|[[kuvasz, 0, 0, 4...|    163173|          87.0256|
|02105056| n02105056_2834|  500|   375|[[groenendael, 16...|    112574|60.03946666666666|
|02093647|  n02093647_541|  500|   333|[[Bedlington_terr...|    144640|86.87087087087087|
|02098413| n02098413_1355|  500|   375|[[Lhasa, 39, 1, 4...|    171120|           91.264|
|02093859| n02093859_2309|  330|   500|[[Kerry_blue_terr...|    131878|79.92606060606062|
|02109961| n02109961_1017|  475|   500|[[Eskimo_dog, 43,...|    189189|79.65852631578947|
|02108000| n02108000_3491|  600|   450|[[EntleBucher, 30...|    168667|62.46925925925926|
|02085782|