In [1]:
# Gather the relevant keys from our secret scope
ServicePrincipalID = dbutils.secrets.get(scope = "Analysts", key = "SPID")
ServicePrincipalKey = dbutils.secrets.get(scope = "Analysts", key = "SPKey")
DirectoryID = dbutils.secrets.get(scope = "Analysts", key = "DirectoryID")
DBUser = dbutils.secrets.get(scope = "Analysts", key = "DBUser")
DBPassword = dbutils.secrets.get(scope = "Analysts", key = "DBPword")


# Build the full OAuth token endpoint from the DirectoryID
Directory = "https://login.microsoftonline.com/{}/oauth2/token".format(DirectoryID)

# Configure our ADLS Gen2 connection with the service principal details
spark.conf.set("fs.azure.account.auth.type", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id", ServicePrincipalID)
spark.conf.set("fs.azure.account.oauth2.client.secret", ServicePrincipalKey)
spark.conf.set("fs.azure.account.oauth2.client.endpoint", Directory)
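The `spark.conf.set` calls above apply the service principal to every ADLS Gen2 account the session touches. If a notebook needs different credentials per storage account, the Azure Databricks documentation scopes each key to a single account by appending `.<storage-account>.dfs.core.windows.net`. Below is a minimal sketch. It assumes the storage account is `dblake`, taken from the paths used later. `adls_oauth_conf` is a hypothetical helper that only builds the key/value pairs, so it can be checked without a Spark session.

```python
def adls_oauth_conf(account, client_id, client_secret, tenant_id):
    """Build account-scoped ABFS OAuth settings (hypothetical helper, not a Databricks API)."""
    suffix = f"{account}.dfs.core.windows.net"
    endpoint = f"https://login.microsoftonline.com/{tenant_id}/oauth2/token"
    return {
        f"fs.azure.account.auth.type.{suffix}": "OAuth",
        f"fs.azure.account.oauth.provider.type.{suffix}":
            "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
        f"fs.azure.account.oauth2.client.id.{suffix}": client_id,
        f"fs.azure.account.oauth2.client.secret.{suffix}": client_secret,
        f"fs.azure.account.oauth2.client.endpoint.{suffix}": endpoint,
    }

# In the notebook, each pair would be applied with spark.conf.set, e.g.:
# for key, value in adls_oauth_conf("dblake", ServicePrincipalID,
#                                   ServicePrincipalKey, DirectoryID).items():
#     spark.conf.set(key, value)
```

Scoping the keys this way lets two storage accounts use different service principals in the same session. The unscoped keys above remain a simpler choice when there is only one account.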

### Working with dirty data
In this example we'll work with a file that doesn't entirely fit our expected schema. We'll also look at ways of rejecting bad rows and building resilient pipelines.

We are expecting the data to conform to a specified schema, so we'll use a hardcoded schema and see how this can be applied to our dubious dataset.

There are three modes for handling malformed records when parsing text files such as CSV. You set the mode with the `mode` reader option:
 - PERMISSIVE (the default) - Keep every row, setting fields that cannot be parsed to null
 - FAILFAST - Throw an exception as soon as a row cannot be parsed
 - DROPMALFORMED - Drop any rows that cannot be parsed from the dataset
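To make the three modes concrete, here is a plain-Python simulation. It is not Spark's implementation. It mimics the behaviour visible in this notebook's outputs: rows with extra columns are filled positionally, and a type failure nulls the whole row. The sample lines come from the taxi zone output later in the notebook. `parse_line` and `read` are hypothetical names.

```python
import csv

# (column name, converter) pairs mirroring mySchema; int() rejects "N/A".
SCHEMA = [("LocationID", int), ("Borough", str), ("Zone", str), ("service_zone", str)]

def parse_line(line):
    """Return (row, ok). Columns are filled positionally; ok is False if the line is malformed."""
    tokens = next(csv.reader([line]))
    # Pad short rows with None and discard any extra tokens.
    padded = (tokens + [None] * len(SCHEMA))[:len(SCHEMA)]
    try:
        row = {name: None if tok is None else conv(tok)
               for (name, conv), tok in zip(SCHEMA, padded)}
    except ValueError:
        # A type failure nulls the whole row, as in the output shown in this notebook.
        return {name: None for name, _ in SCHEMA}, False
    return row, len(tokens) == len(SCHEMA)

def read(lines, mode="PERMISSIVE"):
    """Simulate the three parse modes over a list of CSV lines."""
    rows = []
    for line in lines:
        row, ok = parse_line(line)
        if ok:
            rows.append(row)
        elif mode == "FAILFAST":
            raise ValueError(f"Malformed record: {line!r}")
        elif mode == "PERMISSIVE":
            rows.append(row)  # keep the malformed row with null (or partial) values
        # DROPMALFORMED: the malformed row is simply skipped
    return rows

SAMPLE_LINES = [
    "1,EWR,Newark Airport,EWR",
    "12,Manhattan,Battery Park,Yellow, Zone",
    "N/A,N/A,N/A,N/A",
]
print(len(read(SAMPLE_LINES, "PERMISSIVE")))     # 3
print(len(read(SAMPLE_LINES, "DROPMALFORMED")))  # 1
```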

In [3]:
# Bring in the Spark SQL types needed to define our schema
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

mySchema = StructType([
  StructField("LocationID", IntegerType(), True),
  StructField("Borough", StringType(), True),
  StructField("Zone", StringType(), True),
  StructField("service_zone", StringType(), True)])

# Now let's load the data, allowing any malformed records to come through
df = (spark
       .read
       .option("header","true")
       .option("mode", "PERMISSIVE")
       .schema(mySchema)
       .csv("abfss://root@dblake.dfs.core.windows.net/RAW/Public/TaxiZones/V1/taxi+_zone_lookup.csv")
     )

# If we look at the results, everything seems fine - but we might spot a NULL row or two in there
display(df.select("LocationID", "Borough", "Zone", "service_zone"))

LocationID,Borough,Zone,service_zone
1.0,EWR,Newark Airport,EWR
2.0,Queens,Jamaica Bay,Boro Zone
3.0,Bronx,Allerton/Pelham Gardens,Boro Zone
4.0,Manhattan,Alphabet City,Yellow Zone
5.0,Staten Island,Arden Heights,Boro Zone
6.0,Staten Island,Arrochar/Fort Wadsworth,Boro Zone
7.0,Queens,Astoria,Boro Zone
8.0,Queens,Astoria Park,Boro Zone
9.0,Queens,Auburndale,Boro Zone
10.0,Queens,Baisley Park,Boro Zone


In [4]:
# This time load the data in FAILFAST mode, which should fail on the first malformed record
failfastdf = (spark
       .read
       .option("header","true")
       .option("mode", "FAILFAST")
       .schema(mySchema)
       .csv("abfss://root@dblake.dfs.core.windows.net/RAW/Public/TaxiZones/V1/taxi+_zone_lookup.csv")
     )

# This succeeds: all we have done is define a lazy read, not actually read the data, so the schema hasn't been validated yet
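Spark's reader is lazy. Building `failfastdf` only records the plan, and the file is parsed when an action such as `display` or `count` runs. A Python generator makes a rough analogy; it does not show how Spark schedules work. Defining it raises nothing, and the FAILFAST-style error surfaces only when it is consumed. `lazy_failfast` is a hypothetical name, and its comma split is a deliberate simplification.

```python
def lazy_failfast(lines, n_fields=4):
    """Yield parsed rows, raising on the first row with the wrong field count."""
    for line in lines:
        tokens = line.split(",")  # simplified: no quoting support
        if len(tokens) != n_fields:
            raise ValueError(f"Malformed record: {line!r}")
        yield tokens

# Defining the "read" succeeds: nothing has been parsed yet.
rows = lazy_failfast(["1,EWR,Newark Airport,EWR",
                      "12,Manhattan,Battery Park,Yellow, Zone"])

# Consuming it (the equivalent of an action) is what raises.
try:
    list(rows)
except ValueError as exc:
    print(exc)
```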

In [5]:
# If we try to view the results, we'll get an error: as soon as a row cannot be parsed, the read gives up
display(failfastdf.select("LocationID", "Borough", "Zone", "service_zone"))

In [6]:
# In this file some rows don't fit the structure (i.e. they have extra columns), so even a DataFrame-wide aggregation fails:
failfastdf.count()

FAILFAST has its uses, and many ETL developers prefer this approach. Shutting down the whole data pipeline because of a malformed file lets us be very strict about data typing. It also ensures that no bad data ever reaches downstream systems. However, it means we need further systems to handle these errors, perhaps even a team to support the process when it breaks.

These days it is more common to let processing continue but do something with the bad data. This gets data to people as soon as possible, although there may be caveats attached to the data delivered.

In [8]:
# Now load the data in DROPMALFORMED mode, discarding any rows that cannot be parsed
dropMaldf = (spark
       .read
       .option("header","true")
       .option("mode", "DROPMALFORMED")
       .schema(mySchema)
       .csv("abfss://root@dblake.dfs.core.windows.net/RAW/Public/TaxiZones/V1/taxi+_zone_lookup.csv")
     )

# Let's take a look at the data again
display(dropMaldf.select("LocationID", "Borough", "Zone", "service_zone"))

LocationID,Borough,Zone,service_zone
1,EWR,Newark Airport,EWR
2,Queens,Jamaica Bay,Boro Zone
3,Bronx,Allerton/Pelham Gardens,Boro Zone
4,Manhattan,Alphabet City,Yellow Zone
5,Staten Island,Arden Heights,Boro Zone
6,Staten Island,Arrochar/Fort Wadsworth,Boro Zone
7,Queens,Astoria,Boro Zone
8,Queens,Astoria Park,Boro Zone
9,Queens,Auburndale,Boro Zone
10,Queens,Baisley Park,Boro Zone


In [9]:
# A bare count() doesn't need any column values, so Spark doesn't try to conform the rows to the schema and returns the full record count
dropMaldf.count()

# You should see Out[]: 280 as a result

In [10]:
# If we cache first, the count has to materialise every column. Each row is then validated against the schema and malformed rows are dropped, so the overall count goes down
dropMaldf.cache()
dropMaldf.count()

# Out[]: 274
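The two counts differ because of how much parsing each action needs. As described in the comments above, a bare `count()` does not conform rows to the schema, while counting a cached DataFrame materialises every column. Below is a plain-Python sketch of that distinction. Its well-formedness check (four fields and an integer LocationID) is our own simplification, not Spark's parser, and `is_well_formed` and `count_rows` are hypothetical helpers.

```python
import csv

def is_well_formed(line, n_fields=4):
    """Simplified check: right number of fields and an integer LocationID."""
    tokens = next(csv.reader([line]))
    if len(tokens) != n_fields:
        return False
    try:
        int(tokens[0])
    except ValueError:
        return False
    return True

def count_rows(lines, parse_fields):
    """Count records, validating them only when the field values are needed."""
    if not parse_fields:
        return sum(1 for _ in lines)  # like a bare count(): no field parsing
    return sum(1 for line in lines if is_well_formed(line))  # like cache() + count()

sample = [
    "1,EWR,Newark Airport,EWR",
    "12,Manhattan,Battery Park,Yellow, Zone",
    "N/A,N/A,N/A,N/A",
]
print(count_rows(sample, parse_fields=False))  # 3
print(count_rows(sample, parse_fields=True))   # 1
```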

This is fairly simple: our DataFrame is populated and we know the data fits properly... but most data pipelines can't accept losing data, whatever the problem was. So we need to look at siphoning those malformed rows off into a separate dataset.

## Rejecting Failed Rows

The above methods are all well and good - but all have their drawbacks when building an ETL pipeline. A much more common approach is to push data that does not parse correctly down a different path.

Let's try PERMISSIVE mode again, but this time add the system field "_corrupt_record" to our schema as a nullable string. In PERMISSIVE mode Spark uses this column to store the original text of any record that did not conform to the schema. The column name can be changed with the `columnNameOfCorruptRecord` reader option.
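In plain Python, the effect of the corrupt-record column looks roughly like the sketch below. Well-formed lines get `None` in that column, and malformed lines keep their raw text there alongside whatever columns could be filled. This is a simulation of the behaviour shown in the outputs below, not Spark's parser. `parse_permissive` is a hypothetical name.

```python
import csv

# (column name, converter) pairs mirroring the data columns of mySchema.
FIELDS = [("LocationID", int), ("Borough", str), ("Zone", str), ("service_zone", str)]

def parse_permissive(line, corrupt_col="_corrupt_record"):
    """Parse one CSV line; keep the raw text in corrupt_col if the line is malformed."""
    tokens = next(csv.reader([line]))
    # Fill columns positionally: missing tokens become None, extras are discarded.
    padded = (tokens + [None] * len(FIELDS))[:len(FIELDS)]
    try:
        row = {name: None if tok is None else conv(tok)
               for (name, conv), tok in zip(FIELDS, padded)}
        malformed = len(tokens) != len(FIELDS)
    except ValueError:
        # Type failure: every data column becomes null.
        row = {name: None for name, _ in FIELDS}
        malformed = True
    row[corrupt_col] = line if malformed else None
    return row

for line in ["1,EWR,Newark Airport,EWR", "N/A,N/A,N/A,N/A"]:
    print(parse_permissive(line))
```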

In [13]:
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

mySchema = StructType([
  StructField("LocationID", IntegerType(), True),
  StructField("Borough", StringType(), True),
  StructField("Zone", StringType(), True),
  StructField("service_zone", StringType(), True),
  StructField("_corrupt_record", StringType(), True)])

# Now create our DataFrame over the whole fileset, applying our hardcoded schema rather than inferring one
Reddf = (spark
       .read
       .option("header","true")
       .option("mode", "PERMISSIVE")
       .schema(mySchema)
       .csv("abfss://root@dblake.dfs.core.windows.net/RAW/Public/TaxiZones/V1/taxi+_zone_lookup.csv")
     )

display(Reddf.select("*"))

LocationID,Borough,Zone,service_zone,_corrupt_record
1.0,EWR,Newark Airport,EWR,
2.0,Queens,Jamaica Bay,Boro Zone,
3.0,Bronx,Allerton/Pelham Gardens,Boro Zone,
4.0,Manhattan,Alphabet City,Yellow Zone,
5.0,Staten Island,Arden Heights,Boro Zone,
6.0,Staten Island,Arrochar/Fort Wadsworth,Boro Zone,
7.0,Queens,Astoria,Boro Zone,
8.0,Queens,Astoria Park,Boro Zone,
9.0,Queens,Auburndale,Boro Zone,
10.0,Queens,Baisley Park,Boro Zone,


In [14]:
# We can filter using the _corrupt_record column to find only those rows which failed parsing in some manner
display(Reddf.select("*").filter(col("_corrupt_record").isNotNull()))

LocationID,Borough,Zone,service_zone,_corrupt_record
12.0,Manhattan,Battery Park,Yellow,"12,Manhattan,Battery Park,Yellow, Zone"
23.0,Staten Island,Bloomfield/Emerson Hill,Boro,"23,Staten Island,Bloomfield/Emerson Hill,Boro, Zone"
39.0,Brooklyn,Canarsie,Boro,"39,Brooklyn,Canarsie,Boro, Zone"
168.0,Bronx,Mott Haven/Port Morris,Boro,"168,Bronx,Mott Haven/Port Morris,Boro, Zone"
178.0,Brooklyn,Ocean Parkway South,Boro,"178,Brooklyn,Ocean Parkway South,Boro, Zone"
,,,,"N/A,N/A,N/A,N/A"


In this example there are two different types of parse failure.

Some rows have an extra column. In these cases the DataFrame columns are still filled positionally and the additional data is left out. That is only correct if the extra field was appended to the end of the record, which isn't always the case. Here the extra comma in "Yellow, Zone" split one value in two, so service_zone holds "Yellow" rather than "Yellow Zone". Note that DROPMALFORMED removed all of these rows, even though they had some valid data.

The other type is an attribute parsing failure: LocationID expects an integer but received "N/A". In this case PERMISSIVE mode has set ALL columns to null, not just the offending attribute. This is the Spark 2.x behaviour; from Spark 3.0 onwards the row can keep the values that did parse successfully.

We can use this logic in our data preparation pipelines. We could divert these rows to a separate fileset within the Lake for a later process to review. Alternatively, we could let the partially populated rows through and raise an issue for someone to check later. Which option we choose depends on how resilient and fault-tolerant we want our pipeline to be.

In [16]:
# Rows that parsed cleanly continue down the pipeline for further processing
goodRowsdf = Reddf.select("*").filter(col("_corrupt_record").isNull())

# Rejected rows, kept for review at a later date
rejectRowsdf = Reddf.select("*").filter(col("_corrupt_record").isNotNull())
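Diverting the rejected rows to their own fileset for later review could look like the sketch below. It is plain Python over dicts shaped like the rows above, and it writes the raw rejected records to a CSV file in a temporary directory. `split_rows` and `write_rejects` are hypothetical helpers. In the notebook the equivalent step would write `rejectRowsdf` to a separate Lake path with the DataFrame writer (for example `rejectRowsdf.write.mode("append").csv(...)`); any such path is an assumption, not part of the original pipeline.

```python
import csv
import os
import tempfile

def split_rows(rows, corrupt_col="_corrupt_record"):
    """Partition parsed rows into (good, rejected) on the corrupt-record column."""
    good = [r for r in rows if r[corrupt_col] is None]
    rejected = [r for r in rows if r[corrupt_col] is not None]
    return good, rejected

def write_rejects(rejected, directory, corrupt_col="_corrupt_record"):
    """Write the raw text of each rejected record to rejects.csv for later review."""
    path = os.path.join(directory, "rejects.csv")
    with open(path, "w", newline="") as fh:
        writer = csv.writer(fh)
        writer.writerow([corrupt_col])
        for row in rejected:
            writer.writerow([row[corrupt_col]])
    return path

rows = [
    {"LocationID": 1, "Borough": "EWR", "Zone": "Newark Airport",
     "service_zone": "EWR", "_corrupt_record": None},
    {"LocationID": None, "Borough": None, "Zone": None,
     "service_zone": None, "_corrupt_record": "N/A,N/A,N/A,N/A"},
]
good, rejected = split_rows(rows)
with tempfile.TemporaryDirectory() as tmp:
    path = write_rejects(rejected, tmp)
    with open(path, newline="") as fh:
        print(fh.read())
```

Keeping the raw record text means a later process can re-parse the rejects once the cause (here, a stray comma or a placeholder value) has been fixed.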