# PySpark Toolkit

In [42]:
from pyspark.sql import SparkSession

# Obtain the notebook's SparkSession (getOrCreate returns an existing one if present).
spark = (
    SparkSession
    .builder
    .getOrCreate()
)

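### Use case: Data Profiling

Split a DataFrame into clean rows and rejected rows, where a row is rejected when its primary key (`PK`) contains a null or is shared with another row.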

In [112]:
from datetime import datetime, date
import pandas as pd
from pyspark.sql import Row

# Toy input covering each profiling case; the primary key is (a, b, c).
df = spark.createDataFrame(
    [
        Row(a=1, b=2.0, c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
        Row(a=2, b=3.0, c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
        Row(a=4, b=5.0, c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0)),
        # exact copy of the previous row
        Row(a=4, b=5.0, c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0)),
        # same (a, b, c) as the two rows above, only d differs -> PK duplicate
        Row(a=4, b=5.0, c='string3', d=date(2000, 5, 1), e=datetime(2000, 1, 3, 12, 0)),
        # PK column `a` is null
        Row(a=None, b=5.0, c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0)),
        # PK column `b` is null
        Row(a=6, b=None, c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0)),
    ]
)
df.show()

+----+----+-------+----------+-------------------+
|   a|   b|      c|         d|                  e|
+----+----+-------+----------+-------------------+
|   1| 2.0|string1|2000-01-01|2000-01-01 12:00:00|
|   2| 3.0|string2|2000-02-01|2000-01-02 12:00:00|
|   4| 5.0|string3|2000-03-01|2000-01-03 12:00:00|
|   4| 5.0|string3|2000-03-01|2000-01-03 12:00:00|
|   4| 5.0|string3|2000-05-01|2000-01-03 12:00:00|
|null| 5.0|string3|2000-03-01|2000-01-03 12:00:00|
|   6|null|string3|2000-03-01|2000-01-03 12:00:00|
+----+----+-------+----------+-------------------+



In [52]:
# Primary-key columns of the sample DataFrame; every check below is keyed on them.
PK = ['a', 'b', 'c']

In [6]:
# PK null check
from pyspark.sql.functions import lit
# One "<col> is null" clause per PK column, OR-ed together: a row matches when any key part is missing.
null_check_str = ' or '.join(pk_col + ' is null' for pk_col in PK)
df_null = df.filter(null_check_str).withColumn('desc', lit('PK Contains Null'))
df_null.show()

+----+----+-------+----------+-------------------+----------------+
|   a|   b|      c|         d|                  e|            desc|
+----+----+-------+----------+-------------------+----------------+
|null| 5.0|string3|2000-03-01|2000-01-03 12:00:00|PK Contains Null|
|   6|null|string3|2000-03-01|2000-01-03 12:00:00|PK Contains Null|
+----+----+-------+----------+-------------------+----------------+



In [7]:
from pyspark.sql.functions import count


In [104]:
from pyspark.sql.functions import col
# Duplicates are searched for only among rows whose PK is fully populated;
# rows with a null PK are reported by the null check above.
not_null_check_str = ' is not null and '.join(PK) + ' is not null'

# Filter into a new variable instead of overwriting `df`: reassigning `df` here
# silently removed the null-PK rows for every later cell, so the profiling run
# below would no longer see them on a fresh Restart & Run All.
df_pk_not_null = df.where(not_null_check_str)
df_dups = (
    df_pk_not_null.groupby(PK).count()
    .filter('count>1')
    .select(PK)
    .withColumn('desc', lit('Duplicate row'))
)
# Inner join keeps exactly the rows whose key is duplicated (same result as the
# former left join followed by `desc is not null`, since `desc` is a non-null literal).
df_pk_not_null.join(df_dups, on=PK, how='inner').show()



a is not null and b is not null and c is not null
+---+---+-------+----------+-------------------+-------------+
|  a|  b|      c|         d|                  e|         desc|
+---+---+-------+----------+-------------------+-------------+
|  4|5.0|string3|2000-03-01|2000-01-03 12:00:00|Duplicate row|
|  4|5.0|string3|2000-03-01|2000-01-03 12:00:00|Duplicate row|
|  4|5.0|string3|2000-05-01|2000-01-03 12:00:00|Duplicate row|
+---+---+-------+----------+-------------------+-------------+



In [68]:
# Keys of `df` that are NOT duplicated. `df_dups` has no `count` column (it was
# projected away by `.select(PK)`), so filtering on `count is null` fails; a
# left-anti join keeps the rows of `df` that have no matching key in `df_dups`.
df.join(df_dups, on=PK, how='left_anti').select(PK).show()


+----+----+-------+
|   a|   b|      c|
+----+----+-------+
|   1| 2.0|string1|
|   2| 3.0|string2|
|null| 5.0|string3|
|   6|null|string3|
+----+----+-------+



In [152]:
def pk_null_check(df, cols=()):
    """Return the rows of ``df`` in which at least one primary-key column is null.

    Each returned row is tagged with a ``desc`` column set to ``'PK Contains Null'``.

    Args:
        df: Input Spark DataFrame.
        cols: Names of the primary-key columns.

    Returns:
        A DataFrame with the columns of ``df`` followed by ``desc``.

    Raises:
        ValueError: if ``cols`` is empty (there is no key to check).
    """
    from functools import reduce
    from operator import or_

    from pyspark.sql.functions import col, lit

    cols = list(cols)
    if not cols:
        # Previously this built the invalid SQL string ' is null' and failed to parse.
        raise ValueError('pk_null_check needs at least one primary-key column')

    # Build the predicate from Column objects instead of a SQL string so column
    # names that are not valid bare SQL identifiers are handled correctly.
    any_pk_null = reduce(or_, (col(c).isNull() for c in cols))
    return df.filter(any_pk_null).withColumn('desc', lit('PK Contains Null'))

def pk_dups_check(df, cols=()):
    """Return every row of ``df`` whose fully populated primary key occurs more than once.

    Rows where any key column is null are not considered here. Each returned row is
    tagged with a ``desc`` column set to ``'Duplicate row'``.

    Args:
        df: Input Spark DataFrame.
        cols: Names of the primary-key columns.

    Returns:
        A DataFrame with the columns of ``df`` (in their original order) followed by ``desc``.

    Raises:
        ValueError: if ``cols`` is empty (there is no key to check).
    """
    from functools import reduce
    from operator import and_

    from pyspark.sql.functions import col, lit

    cols = list(cols)
    if not cols:
        raise ValueError('pk_dups_check needs at least one primary-key column')

    all_pk_present = reduce(and_, (col(c).isNotNull() for c in cols))
    # Use the `cols` argument (the original grouped on the global `PK`).
    dup_keys = (
        df.where(all_pk_present)
        .groupby(cols).count()
        .filter(col('count') > 1)
        .select(cols)
        .withColumn('desc', lit('Duplicate row'))
    )
    # 'sleft' is not a Spark join type; an inner join keeps exactly the rows whose
    # key is in dup_keys. Joining on a list of names moves the key columns first,
    # so restore df's column order to line up with pk_null_check's output.
    return df.join(dup_keys, on=cols, how='inner').select(df.columns + ['desc'])


def profiling_data(df, PK=()):
    """Split ``df`` into clean rows and rejected rows based on primary-key quality.

    A row is rejected when any PK column is null (via ``pk_null_check``) or when
    its PK value is shared with another row (via ``pk_dups_check``).

    Args:
        df: Input Spark DataFrame.
        PK: Names of the primary-key columns.

    Returns:
        ``(df_clean, df_reject)``: ``df_clean`` has the columns of ``df``;
        ``df_reject`` has the columns returned by the two checks (``df``'s columns plus ``desc``).

    Raises:
        ValueError: if ``PK`` is empty.
    """
    from functools import reduce
    from operator import and_

    from pyspark.sql.functions import col

    PK = list(PK)
    if not PK:
        raise ValueError('profiling_data needs at least one primary-key column')

    df_null = pk_null_check(df, PK)
    df_dups = pk_dups_check(df, PK)

    # unionByName matches columns by name; plain union is positional and silently
    # misaligns columns when the two frames list them in a different order.
    df_reject = df_null.unionByName(df_dups)

    # Clean rows: PK fully populated and PK not among the duplicated keys. The
    # left-anti join replaces the former groupby().count() + 'count is null' trick,
    # which clashes with any real column named `count`.
    all_pk_present = reduce(and_, (col(c).isNotNull() for c in PK))
    df_clean = (
        df.filter(all_pk_present)
        .join(df_dups.select(PK), on=PK, how='left_anti')
        .select(df.columns)
    )
    return df_clean, df_reject
    

In [158]:
clean,reject = profiling_data(df,PK)
clean.show()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|
+---+---+-------+----------+-------------------+



In [159]:
reject.show()

+----+----+-------+----------+-------------------+----------------+
|   a|   b|      c|         d|                  e|            desc|
+----+----+-------+----------+-------------------+----------------+
|null| 5.0|string3|2000-03-01|2000-01-03 12:00:00|PK Contains Null|
|   6|null|string3|2000-03-01|2000-01-03 12:00:00|PK Contains Null|
|   4| 5.0|string3|2000-03-01|2000-01-03 12:00:00|   Duplicate row|
|   4| 5.0|string3|2000-03-01|2000-01-03 12:00:00|   Duplicate row|
|   4| 5.0|string3|2000-05-01|2000-01-03 12:00:00|   Duplicate row|
+----+----+-------+----------+-------------------+----------------+



### Use case: Data Sampling

### Use case: Row to Column