In [1]:
import findspark
findspark.init()  # must run before the pyspark imports below

from pyspark import SparkConf, SparkContext

# Reuse an already-running context if there is one.  Constructing
# SparkContext(...) directly raises "Cannot run multiple SparkContexts at once"
# when this cell is executed a second time in the same kernel.
sc = SparkContext.getOrCreate(
    SparkConf().setMaster("local").setAppName("pyspark-shell")
)

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

# Complex processing and data pipelines

## Introduction to data pipelines 

A data pipeline is a set of steps that processes data from its sources to a final output. It can consist of any number of steps or components and can span many systems. A data pipeline consists of the inputs, transformations and outputs of those steps. There are also validation and analysis steps.


### Quick pipeline


In [2]:
import pyspark.sql.functions as F

# Load the departures file, using its first line as the column names.
departures_df = spark.read.csv("AA_DFW_2015_Departures_Short.csv", header=True)

# Keep rows whose fourth column (position 3) is greater than zero, then tag
# every remaining row with a generated id.
departures_df = (
    departures_df
    .filter(departures_df[3] > 0)
    .withColumn("id", F.monotonically_increasing_id())
)
# departures_df.write.json("output.json", mode="overwrite")

## Data handling techniques

Spark's CSV parser can automatically remove blank lines, can remove comments using an optional argument (comment=...), and can handle header fields.

### Removing commented lines

In [36]:
# Read the same file twice: once as-is, once letting the CSV reader drop
# lines that start with '#'.
annotations_df = spark.read.csv('annotations.csv', sep="|", header=True)
no_comments_df = spark.read.csv('annotations.csv', sep="|", comment='#', header=True)

# Row totals for the raw load, the rows that look like comments, and the
# comment-free load.
full_count = annotations_df.count()
comment_count = (
    annotations_df
    .where(F.col('_c0').startswith('#'))
    .count()
)
no_comments_count = no_comments_df.count()

print("Full count: %d\nComment count: %d\nRemaining count: %d" % (full_count, comment_count, no_comments_count))

Full count: 32794
Comment count: 1416
Remaining count: 31378


### Removing invalid rows

In [43]:
# Row total before any filtering.
initial_count = no_comments_df.count()

# Count the space-separated fields in each line and keep only the rows that
# have at least five of them.
tmp_fields = F.split(no_comments_df["_c0"], " ")
no_comments_df = no_comments_df.withColumn("colcount", F.size(tmp_fields))
annotations_df_filtered = no_comments_df.filter(no_comments_df["colcount"] >= 5)

final_count = annotations_df_filtered.count()
print("Initial count: %d\nFinal count: %d" % (initial_count, final_count))

Initial count: 31378
Final count: 20580


### Splitting into columns

In [46]:
# Preview the filtered rows; truncate=False prints each value at full width.
annotations_df_filtered.show(truncate=False)

+-------------------------------------------------------------------------------------+--------+
|_c0                                                                                  |colcount|
+-------------------------------------------------------------------------------------+--------+
|02110627 n02110627_12938 200 300 affenpinscher,0,9,173,298                           |5       |
|02093754 n02093754_1148 500 378 Border_terrier,73,127,341,335                        |5       |
|%s %s 800 600 Shetland_sheepdog,124,87,576,514                                       |5       |
|02104029 n02104029_63 500 375 kuvasz,0,0,499,327                                     |5       |
|02111500 n02111500_5137 500 375 Great_Pyrenees,124,225,403,374                       |5       |
|02104365 n02104365_7518 500 333 schipperke,146,29,416,309                            |5       |
|02105056 n02105056_2834 500 375 groenendael,168,0,469,374                            |5       |
|02093647 n02093647_541 500 33

In [63]:
# Column expression holding the space-separated pieces of each raw line.
split_cols = F.split(annotations_df_filtered["_c0"], " ")

# The first four pieces become named columns; the full array is kept as well
# under "split_cols".
split_df = (
    annotations_df_filtered
    .withColumn("folder", split_cols[0])
    .withColumn("filename", split_cols[1])
    .withColumn("width", split_cols[2])
    .withColumn("height", split_cols[3])
    .withColumn("split_cols", split_cols)
)

split_df.show(truncate=False)

+-------------------------------------------------------------------------------------+--------+--------+---------------+-----+------+--------------------------------------------------------------------------------------------+
|_c0                                                                                  |colcount|folder  |filename       |width|height|split_cols                                                                                  |
+-------------------------------------------------------------------------------------+--------+--------+---------------+-----+------+--------------------------------------------------------------------------------------------+
|02110627 n02110627_12938 200 300 affenpinscher,0,9,173,298                           |5       |02110627|n02110627_12938|200  |300   |[02110627, n02110627_12938, 200, 300, affenpinscher,0,9,173,298]                            |
|02093754 n02093754_1148 500 378 Border_terrier,73,127,341,335                        |5

In [64]:
# Convert split_df to a pandas DataFrame so it is displayed as the cell's output.
split_df.toPandas()

Unnamed: 0,_c0,colcount,folder,filename,width,height,split_cols
0,02110627 n02110627_12938 200 300 affenpinscher...,5,02110627,n02110627_12938,200,300,"[02110627, n02110627_12938, 200, 300, affenpin..."
1,02093754 n02093754_1148 500 378 Border_terrier...,5,02093754,n02093754_1148,500,378,"[02093754, n02093754_1148, 500, 378, Border_te..."
2,"%s %s 800 600 Shetland_sheepdog,124,87,576,514",5,%s,%s,800,600,"[%s, %s, 800, 600, Shetland_sheepdog,124,87,57..."
3,"02104029 n02104029_63 500 375 kuvasz,0,0,499,327",5,02104029,n02104029_63,500,375,"[02104029, n02104029_63, 500, 375, kuvasz,0,0,..."
4,02111500 n02111500_5137 500 375 Great_Pyrenees...,5,02111500,n02111500_5137,500,375,"[02111500, n02111500_5137, 500, 375, Great_Pyr..."
...,...,...,...,...,...,...,...
20575,"02096585 n02096585_12716 500 335 Boston_bull,1...",5,02096585,n02096585_12716,500,335,"[02096585, n02096585_12716, 500, 335, Boston_b..."
20576,02097047 n02097047_1495 375 500 miniature_schn...,5,02097047,n02097047_1495,375,500,"[02097047, n02097047_1495, 375, 500, miniature..."
20577,"02098413 n02098413_11467 390 390 Lhasa,7,51,27...",5,02098413,n02098413_11467,390,390,"[02098413, n02098413_11467, 390, 390, Lhasa,7,..."
20578,02112706 n02112706_726 560 615 Brabancon_griff...,5,02112706,n02112706_726,560,615,"[02112706, n02112706_726, 560, 615, Brabancon_..."


### Further parsing

In [115]:
from pyspark.sql.types import ArrayType, StringType

def retriever(cols, colcount):
    """Return the items of ``cols`` from index 4 up to, but excluding, ``colcount``."""
    wanted = slice(4, colcount)
    return cols[wanted]

# Wrap retriever as a Spark UDF that yields an array of strings.
udfRetriever = F.udf(retriever, ArrayType(StringType()))

# Add the "dog_list" column, then discard the helper columns in one call.
split_df_parsed = (
    split_df
    .withColumn("dog_list", udfRetriever(split_df["split_cols"], split_df["colcount"]))
    .drop("_c0", "split_cols", "colcount")
)
split_df_parsed.show()

+--------+---------------+-----+------+--------------------+
|  folder|       filename|width|height|            dog_list|
+--------+---------------+-----+------+--------------------+
|02110627|n02110627_12938|  200|   300|[affenpinscher,0,...|
|02093754| n02093754_1148|  500|   378|[Border_terrier,7...|
|      %s|             %s|  800|   600|[Shetland_sheepdo...|
|02104029|   n02104029_63|  500|   375|[kuvasz,0,0,499,327]|
|02111500| n02111500_5137|  500|   375|[Great_Pyrenees,1...|
|02104365| n02104365_7518|  500|   333|[schipperke,146,2...|
|02105056| n02105056_2834|  500|   375|[groenendael,168,...|
|02093647|  n02093647_541|  500|   333|[Bedlington_terri...|
|02098413| n02098413_1355|  500|   375|[Lhasa,39,1,499,373]|
|02093859| n02093859_2309|  330|   500|[Kerry_blue_terri...|
|02100583|  n02100583_702|  500|   333|[vizsla,112,93,27...|
|02109961| n02109961_1017|  475|   500|[Eskimo_dog,43,20...|
|02096177|n02096177_11642|  500|   375|[cairn,71,2,319,302]|
|02108000| n02108000_349

## Data validation

Validation is verifying that a dataset complies with the expected format, including the number of rows and columns.

One technique to validate data in Spark is using joins to verify that the content of a DataFrame matches a known set. This is fast compared with validating individual rows against a long list of entries.

### Validate rows via join

In [119]:
# Load the known-good folder list and bring it into pandas for a quick clean-up.
valid_folders_df = spark.read.csv("valid_folders.csv", header=True)
valid_folders_df = valid_folders_df.toPandas()

# Each raw "_c0" value is split on whitespace and its second token (index 1)
# is kept.  This is done for the whole column at once, so it works for any
# number of rows instead of relying on a hard-coded row count.
# NOTE(review): assumes "_c0" is the only column in valid_folders.csv - confirm.
valid_folders_df["_c0"] = valid_folders_df["_c0"].str.split().str[1]
valid_folders_df = spark.createDataFrame(valid_folders_df)

valid_folders_df = valid_folders_df.withColumnRenamed("_c0", "folder")
split_count = split_df_parsed.count()

# Inner join on "folder": only rows whose folder appears in the valid list
# survive.  The valid-folder frame is wrapped in a broadcast hint.
joined_df = split_df_parsed.join(F.broadcast(valid_folders_df), "folder")
joined_count = joined_df.count()
print("Before: %d\nAfter: %d" % (split_count, joined_count))

Before: 20580
After: 19956


### Examining invalid rows

In [122]:
# Row totals before and after the validation join.
split_count, joined_count = split_df_parsed.count(), joined_df.count()

# A left-anti join keeps the rows of split_df_parsed whose folder has no match
# in joined_df.
invalid_df = split_df_parsed.join(
    F.broadcast(joined_df),
    on="folder",
    how="left_anti",
)
invalid_count = invalid_df.count()
print(" split_df:\t%d\n joined_df:\t%d\n invalid_df: \t%d" % (split_count, joined_count, invalid_count))

# Number of different folder values among the rejected rows.
invalid_folders = invalid_df.select("folder").distinct()
invalid_folder_count = invalid_folders.count()
print("%d distinct invalid folders found" % invalid_folder_count)

 split_df:	20580
 joined_df:	19956
 invalid_df: 	624
1 distinct invalid folders found


## Final analysis and delivery

Analysis calculations are the process of using the columns of data in a DataFrame to compute some useful value using Spark's functionality.

### Dog parsing

In [129]:
# show() prints the table itself and returns None, so wrapping it in print()
# only appends a stray "None" line to the cell output.
joined_df.select("dog_list").show(10, truncate=False)

from pyspark.sql.types import StructType, StructField, IntegerType

# Schema for one parsed dog entry: a breed string followed by four integer
# fields.  Every field is declared with nullable=False.
DogType = StructType([
    StructField("breed", StringType(), False),
    StructField("start_x", IntegerType(), False),
    StructField("start_y", IntegerType(), False),
    StructField("end_x", IntegerType(), False),
    StructField("end_y", IntegerType(), False)
])

+----------------------------------+
|dog_list                          |
+----------------------------------+
|[affenpinscher,0,9,173,298]       |
|[Border_terrier,73,127,341,335]   |
|[kuvasz,0,0,499,327]              |
|[Great_Pyrenees,124,225,403,374]  |
|[schipperke,146,29,416,309]       |
|[groenendael,168,0,469,374]       |
|[Bedlington_terrier,10,12,462,332]|
|[Lhasa,39,1,499,373]              |
|[Kerry_blue_terrier,17,16,300,482]|
|[vizsla,112,93,276,236]           |
+----------------------------------+
only showing top 10 rows

None


### Per image count

In [130]:
def dogParse(doglist):
    """Turn comma-separated dog strings into ``(breed, start_x, start_y, end_x, end_y)`` tuples.

    Each item of ``doglist`` must split on "," into exactly five parts; the
    last four are converted to ``int``.  The result is a list with one tuple
    per input item, in the same order.
    """
    return [
        (breed, int(start_x), int(start_y), int(end_x), int(end_y))
        for breed, start_x, start_y, end_x, end_y in (
            entry.split(",") for entry in doglist
        )
    ]

# Wrap dogParse as a Spark UDF that yields an array of DogType structs.
udfDogParse = F.udf(dogParse, ArrayType(DogType))

# Replace the raw "dog_list" strings with the parsed "dogs" structs.
joined_df = (
    joined_df
    .withColumn("dogs", udfDogParse(F.col("dog_list")))
    .drop("dog_list")
)

# Number of parsed dogs per row, first ten rows.
joined_df.select(F.size("dogs")).show(10)

+----------+
|size(dogs)|
+----------+
|         1|
|         1|
|         1|
|         1|
|         1|
|         1|
|         1|
|         1|
|         1|
|         1|
+----------+
only showing top 10 rows



### Percentage dog pixels

In [134]:
def dogPixelCount(doglist):
    """Sum ``(item[3] - item[1]) * (item[4] - item[2])`` over every item of ``doglist``.

    Returns 0 when ``doglist`` is empty.
    """
    box_areas = (
        (box[3] - box[1]) * (box[4] - box[2])
        for box in doglist
    )
    return sum(box_areas)

# Wrap dogPixelCount as a Spark UDF that yields an integer.
udfDogPixelCount = F.udf(dogPixelCount, IntegerType())

# Add the per-row pixel total, then express it as a percentage of
# width * height.
joined_df = (
    joined_df
    .withColumn("dog_pixels", udfDogPixelCount("dogs"))
    .withColumn(
        "dog_percent",
        (F.col("dog_pixels") / (F.col("width") * F.col("height"))) * 100,
    )
)

# First ten rows where the percentage exceeds 60.
joined_df.filter("dog_percent > 60").show(10)

+--------+---------------+-----+------+--------------------+----------+-----------------+
|  folder|       filename|width|height|                dogs|dog_pixels|      dog_percent|
+--------+---------------+-----+------+--------------------+----------+-----------------+
|02110627|n02110627_12938|  200|   300|[{affenpinscher, ...|     49997|83.32833333333333|
|02104029|   n02104029_63|  500|   375|[{kuvasz, 0, 0, 4...|    163173|          87.0256|
|02105056| n02105056_2834|  500|   375|[{groenendael, 16...|    112574|60.03946666666666|
|02093647|  n02093647_541|  500|   333|[{Bedlington_terr...|    144640|86.87087087087087|
|02098413| n02098413_1355|  500|   375|[{Lhasa, 39, 1, 4...|    171120|           91.264|
|02093859| n02093859_2309|  330|   500|[{Kerry_blue_terr...|    131878|79.92606060606062|
|02109961| n02109961_1017|  475|   500|[{Eskimo_dog, 43,...|    189189|79.65852631578947|
|02108000| n02108000_3491|  600|   450|[{EntleBucher, 30...|    168667|62.46925925925926|
|02085782|