In [0]:
pip install nltk

# Quality Assurance

The main objective here is to ensure that all transformations work as expected. All the pipeline stages here are the same as those found in the Data Engineering notebook.

In [0]:
from pyspark.ml.pipeline import Transformer
from pyspark.ml import Pipeline
from pyspark.sql.functions import col
from pyspark.sql.functions import log10
from pyspark.sql.functions import col, lower, regexp_replace, split
from pyspark.sql.types import *
from nltk.stem.porter import *
from pyspark.sql import functions as f
from pyspark.sql.window import Window
from pyspark.sql.functions import avg, when
global total_lost

class MeanImputation(Transformer):
    """Replace nulls in ``outputCol`` with the mean of that column per group.

    Groups are defined by ``inputCols``, which is handed unchanged to
    ``Window.partitionBy`` (a column name or a list of column names).
    Non-null values are kept as they are and ``outputCol`` is overwritten in
    place.  ``copy`` is inherited from ``Params``.
    """

    def __init__(self, inputCols, outputCol):
        # Initialise the Transformer/Params machinery (uid, param maps) so the
        # stage can be copied and used inside a Pipeline.
        super(MeanImputation, self).__init__()
        self.inputCols = inputCols
        self.outputCol = outputCol

    def _transform(self, df):
        """Return ``df`` with nulls of ``outputCol`` filled by the group average."""
        # Average of the target column over all rows sharing the grouping key(s).
        group_window = Window.partitionBy(self.inputCols)
        target = col(self.outputCol)
        group_mean = avg(target).over(group_window)
        # Only null cells take the windowed average; everything else is untouched.
        return df.withColumn(
            self.outputCol,
            when(target.isNull(), group_mean).otherwise(target),
        )
      
class DropRows(Transformer):
    """Remove the rows that have a missing value in any of ``inputCols``.

    ``inputCols`` is passed as the ``subset`` argument of ``DataFrame.na.drop``;
    columns outside that subset are not inspected.  ``copy`` is inherited from
    ``Params``.
    """

    def __init__(self, inputCols):
        # Initialise the Transformer/Params machinery (uid, param maps) so the
        # stage can be copied and used inside a Pipeline.
        super(DropRows, self).__init__()
        self.inputCols = inputCols

    def _transform(self, df):
        """Return ``df`` without the rows that are null in the configured columns."""
        return df.na.drop(subset=self.inputCols)
      
class LogTransform(Transformer):
    """Add (or overwrite) ``outputCol`` with the base-10 logarithm of ``inputCol``.

    ``copy`` is inherited from ``Params``.
    """

    def __init__(self, inputCol, outputCol):
        # Initialise the Transformer/Params machinery (uid, param maps) so the
        # stage can be copied and used inside a Pipeline.
        super(LogTransform, self).__init__()
        self.inputCol = inputCol
        self.outputCol = outputCol

    def _transform(self, df):
        """Return ``df`` with ``outputCol`` set to ``log10(inputCol)``."""
        return df.withColumn(self.outputCol, log10(col(self.inputCol)))
      
class CleanText(Transformer):
    """Write a normalised copy of the text column ``inputCol`` into ``outputCol``.

    The text is lower-cased, a leading ``"rt "`` is removed, http/https URLs
    are removed, and finally every character that is not a letter, a digit or
    whitespace is removed.  ``copy`` is inherited from ``Params``.
    """

    def __init__(self, inputCol, outputCol):
        # Initialise the Transformer/Params machinery (uid, param maps) so the
        # stage can be copied and used inside a Pipeline.
        super(CleanText, self).__init__()
        self.inputCol = inputCol
        self.outputCol = outputCol

    def _transform(self, df):
        """Return ``df`` with the cleaned text stored in ``outputCol``."""
        text = lower(col(self.inputCol))
        # Order matters: the "rt " prefix and URLs must go before punctuation
        # is stripped, otherwise the URL pattern would no longer match.
        # Raw strings keep the regex backslashes literal for Spark.
        text = regexp_replace(text, r"^rt ", "")
        text = regexp_replace(text, r"(https?\://)\S+", "")
        text = regexp_replace(text, r"[^a-zA-Z0-9\s]", "")
        return df.withColumn(self.outputCol, text)
  
class StemmerTransform(Transformer):
    """Stem an array-of-strings column with NLTK's Porter stemmer.

    Each token of ``inputCol`` is stemmed and only stems longer than two
    characters are kept; the resulting array is written to ``outputCol``.
    A null array yields null.  ``copy`` is inherited from ``Params``.
    """

    def __init__(self, inputCol, outputCol):
        # Initialise the Transformer/Params machinery (uid, param maps) so the
        # stage can be copied and used inside a Pipeline.
        super(StemmerTransform, self).__init__()
        self.inputCol = inputCol
        self.outputCol = outputCol

    def _transform(self, df):
        """Return ``df`` with the stemmed token array stored in ``outputCol``."""
        def stem(tokens):
            # A null array would make the loop below raise inside the UDF.
            if tokens is None:
                return None
            stemmer = PorterStemmer()
            stems = (stemmer.stem(token) for token in tokens if token is not None)
            # Very short stems carry little information and are discarded.
            return [s for s in stems if len(s) > 2]

        # `udf` is reached through the imported `functions` alias; the bare
        # name is not imported anywhere in this notebook.
        stemmer_udf = f.udf(stem, ArrayType(StringType()))
        return df.withColumn(self.outputCol, stemmer_udf(col(self.inputCol)))
  
class CleanOutliers(Transformer):
    """Keep only the rows whose ``inputCol`` value is strictly below ``threshold``.

    ``outputCol`` is accepted and stored but is not used by ``_transform``:
    the stage filters rows and does not create or modify any column.
    ``copy`` is inherited from ``Params``.
    """

    def __init__(self, inputCol, outputCol, threshold):
        # Initialise the Transformer/Params machinery (uid, param maps) so the
        # stage can be copied and used inside a Pipeline.
        super(CleanOutliers, self).__init__()
        self.inputCol = inputCol
        self.outputCol = outputCol
        self.threshold = threshold

    def _transform(self, df):
        """Return the rows of ``df`` with ``inputCol < threshold``."""
        return df.where(f.col(self.inputCol) < self.threshold)
      
      
class EnsuretestSetQuality(Transformer):
    """Drop rows whose categorical values do not occur in a reference DataFrame.

    For every column in ``inputCols``, a value that is present in the
    DataFrame being transformed but absent from ``df_comparative`` is treated
    as an unseen category, and the rows carrying it are removed.  Null values
    are not treated as categories and rows that are null in a column are kept.
    ``copy`` is inherited from ``Params``.
    """

    def __init__(self, df_comparative, inputCols):
        # Initialise the Transformer/Params machinery (uid, param maps) so the
        # stage can be copied and used inside a Pipeline.
        super(EnsuretestSetQuality, self).__init__()
        self.inputCols = inputCols
        self.df_comparative = df_comparative

    @staticmethod
    def _distinct_values(frame, name):
        """Return the set of distinct non-null values of column ``name``."""
        # Only the distinct values of one column are brought to the driver,
        # not the whole DataFrame.
        rows = frame.select(name).distinct().collect()
        return {row[0] for row in rows if row[0] is not None}

    def _transform(self, df):
        """Return ``df`` restricted to categories known to ``df_comparative``."""
        for name in self.inputCols:
            known = self._distinct_values(self.df_comparative, name)
            # Values that exist in df but not in the reference are the ones
            # that can actually be filtered out of df.
            unseen = self._distinct_values(df, name) - known
            if unseen:
                column = f.col(name)
                # `~isin` evaluates to NULL for null cells and filter() would
                # drop them silently, so null rows are kept explicitly.
                df = df.filter(column.isNull() | ~column.isin(list(unseen)))
        return df

# Unit Tests

We are unit testing the following customized pipeline stages:
* LogTransform 
* MeanImputation
* DropRows
* CleanOutliers
* CleanText
* EnsuretestSetQuality

All the tests are methods of a single class that inherits from `unittest.TestCase`.

In [0]:
import unittest

class Tests(unittest.TestCase):
    """Unit tests for the custom pipeline stages.

    Every test builds a tiny DataFrame through the notebook-provided
    ``sqlContext``, runs one stage on it and compares the result, converted
    to pandas, with hand-computed expectations.
    """

    def testLogTransform(self):
        """LogTransform keeps the input columns and appends the log10 column."""
        lt = LogTransform(inputCol='price', outputCol='priceLog')
        df = sqlContext.createDataFrame([(2, 3), (3, 2)], ["price", "any"])
        temp = lt.transform(df).toPandas()
        self.assertEqual(list(temp.columns), ['price', 'any', 'priceLog'])
        # log10(2) ~= 0.301 and log10(3) ~= 0.477; compare at two decimals.
        rounded = [round(value, 2) for value in temp.priceLog.values.tolist()]
        self.assertEqual(rounded, [0.30, 0.48])

    def testMeanImputation(self):
        """MeanImputation fills null prices with the mean price of the same city."""
        mi = MeanImputation(inputCols='city', outputCol='price')
        columns = ['city', 'price']
        rows = [('NY', 10), ('CG', 10), ('SP', 15), ('SP', 20),
                ('CG', None), ('NY', None), ('CG', 15), ('SP', None)]
        df = sqlContext.createDataFrame(rows, columns)
        temp = mi.transform(df).toPandas()
        self.assertEqual(list(temp.columns), ['city', 'price'])
        # The window shuffles rows, so compare (city, price) pairs regardless
        # of the order in which Spark returns them.
        result = sorted((city, float(price))
                        for city, price in zip(temp.city, temp.price))
        expected = sorted([
            ('CG', 10.0), ('CG', 12.5), ('CG', 15.0),
            ('NY', 10.0), ('NY', 10.0),
            ('SP', 15.0), ('SP', 17.5), ('SP', 20.0),
        ])
        self.assertEqual(result, expected)

    def testDropRows(self):
        """DropRows removes rows with a null in any of the listed columns."""
        dr = DropRows(inputCols=['room_type', 'availability_365'])
        columns = ['room_type', 'availability_365']
        rows = [('private room', 300), (None, 365), ('full', None), ('shared', 120)]
        df = sqlContext.createDataFrame(rows, columns)
        temp = dr.transform(df).toPandas()
        self.assertEqual(list(temp.columns), ['room_type', 'availability_365'])
        self.assertEqual(list(temp.room_type.values), ['private room', 'shared'])
        self.assertEqual(list(temp.availability_365.values), [300, 120])

    def testCleanOutliers(self):
        """CleanOutliers keeps prices strictly below the threshold."""
        co = CleanOutliers(inputCol='price', outputCol='price', threshold=300)
        columns = ['room_type', 'price']
        rows = [('private room', 299), ('private room', 300), ('full', 305), ('shared', 120)]
        df = sqlContext.createDataFrame(rows, columns)
        temp = co.transform(df).toPandas()
        self.assertEqual(list(temp.columns), ['room_type', 'price'])
        # 300 itself is excluded because the comparison is strict.
        self.assertEqual(list(temp.price.values), [299, 120])

    def testcleanText(self):
        """CleanText strips punctuation and symbols but keeps whitespace."""
        ct = CleanText(inputCol='room_type', outputCol='room_type_clean')
        columns = ['room_type', 'price']
        # Raw string: the first value contains a literal backslash.
        rows = [(r'p$rivate \ room', 299), ('?:// private room ', 300),
                ('full', 305), ('sh^ared', 120)]
        df = sqlContext.createDataFrame(rows, columns)
        temp = ct.transform(df).toPandas()
        self.assertEqual(list(temp.columns), ['room_type', 'price', 'room_type_clean'])
        # Removing the backslash leaves the two spaces that surrounded it.
        self.assertEqual(list(temp.room_type_clean),
                         ['private  room', ' private room ', 'full', 'shared'])

    def testEnsuretestSetQuality(self):
        """A row whose categories all occur in the reference frame is kept."""
        columns = ['neighbourhood_group', 'neighbourhood', 'room_type']
        # Frame that gets transformed: a single row.
        df_transformed = sqlContext.createDataFrame(
            [('Manhattan', 'Bronx', 'Shared')], columns)
        # Reference frame: contains every category of the row above, and more.
        df_reference = sqlContext.createDataFrame(
            [('Manhattan', 'Bronx', 'Shared'), ('Staten', 'Zepa', 'Private'),
             ('Manhattan', 'Bronx', 'Private'), ('Queens', 'Bronx', 'Full')],
            columns)
        tq = EnsuretestSetQuality(df_comparative=df_reference, inputCols=columns)
        result = tq.transform(df_transformed).toPandas()
        self.assertEqual(len(result), 1)
        self.assertEqual(result.iloc[0].values.tolist(), ['Manhattan', 'Bronx', 'Shared'])

As one can see, all unit tests for the custom pipeline stages were successful.

In [0]:
# Gather every test* method of Tests into one suite and run it with
# per-test (verbose) reporting; the returned TestResult is the cell output.
loader = unittest.TestLoader()
suite = loader.loadTestsFromTestCase(Tests)
runner = unittest.TextTestRunner(verbosity=2)
runner.run(suite)