In [1]:
import os
import sys
import spark_utils as sut
import pandas as pd

# Make PySpark's driver and worker processes use the same interpreter as this
# kernel, so Python versions and installed packages match on both sides.
os.environ.update(
    PYSPARK_PYTHON=sys.executable,
    PYSPARK_DRIVER_PYTHON=sys.executable,
)

## Demo dataframe

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql.functions import to_date

# Start (or reuse) the Spark session shared by the rest of the notebook.
spark = SparkSession.builder.appName("SparkDemo").getOrCreate()

# All fields are nullable; DOB arrives as a string and is parsed to a date below.
schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in [
        ("ID1", IntegerType()),
        ("ID2", StringType()),
        ("Name", StringType()),
        ("DOB", StringType()),
        ("City", StringType()),
    ]
])

# Toy records: the (101, A), (102, B) and (105, B) combinations each appear
# twice with a differing City or Name, so (ID1, ID2) is deliberately not a key.
data = [
    (101, 'A', 'Alice', '2000-01-01', 'New York'),
    (102, 'B', 'Bob', '1990-01-01', 'Los Angeles'),
    (103, 'E', 'Elly', '1982-01-01', 'San Francisco'),
    (104, 'J', 'Jesse', '1995-01-01', 'Chicago'),
    (105, 'B', 'Bingo', '1987-01-01', 'Los Angeles'),
    (101, 'A', 'Alice', '2000-01-01', 'NY'),
    (102, 'B', 'Bob', '1990-01-01', 'LA'),
    (105, 'B', 'Binggy', '1987-01-01', 'Los Angeles'),
]

# Build, type and sort the demo frame in a single chain.
df = (
    spark.createDataFrame(data, schema=schema)
    .withColumn("DOB", to_date("DOB", "yyyy-MM-dd"))  # string -> DateType
    .orderBy('ID1', 'ID2')
)

df.show()


+---+---+------+----------+-------------+
|ID1|ID2|  Name|       DOB|         City|
+---+---+------+----------+-------------+
|101|  A| Alice|2000-01-01|     New York|
|101|  A| Alice|2000-01-01|           NY|
|102|  B|   Bob|1990-01-01|           LA|
|102|  B|   Bob|1990-01-01|  Los Angeles|
|103|  E|  Elly|1982-01-01|San Francisco|
|104|  J| Jesse|1995-01-01|      Chicago|
|105|  B| Bingo|1987-01-01|  Los Angeles|
|105|  B|Binggy|1987-01-01|  Los Angeles|
+---+---+------+----------+-------------+



# Custom PySpark functions

## shape

In [8]:
# Replicate the 8-row demo frame N_COPIES times to get a ~10M-row frame for the
# helpers below. The cross join adds a helper column named "id", which is dropped.
N_COPIES = 1234566  # same count the original spark.range(1, 1234567) produced
df_ = df.crossJoin(spark.range(N_COPIES)).drop("id")
sut.shape(df_)

Number of rows: 9,876,528
Number of columns: 5


## value counts with percent

In [9]:
# Count and percentage share of each Name in the enlarged frame.
# The helper prints its table and also returns a DataFrame; the trailing
# semicolon suppresses the redundant `DataFrame[...]` repr in the cell output.
sut.value_counts_with_pct(df_, 'Name');

+------+---------+----+
|  Name|    count| pct|
+------+---------+----+
| Alice|2,469,132|25.0|
|   Bob|2,469,132|25.0|
|  Elly|1,234,566|12.5|
| Jesse|1,234,566|12.5|
| Bingo|1,234,566|12.5|
|Binggy|1,234,566|12.5|
+------+---------+----+



DataFrame[Name: string, count: bigint, pct: double]

## print schema alphabetically

In [4]:
# Print df's schema with its fields sorted by column name.
sut.print_schema_alphabetically(df)

root
 |-- City: string (nullable = true)
 |-- DOB: date (nullable = true)
 |-- ID1: integer (nullable = true)
 |-- ID2: string (nullable = true)
 |-- Name: string (nullable = true)



## verify primary key

In [5]:
# Candidate key: each (ID1, ID2) pair should identify exactly one row.
id_cols = ['ID1', 'ID2']
# Prints total vs. unique row counts plus a verdict, and evaluates to a bool.
sut.is_primary_key(df, id_cols)

Total row count after filtering out missings: 8
Unique row count after filtering out missings: 5
The column(s) ID1, ID2 does not form a primary key.


False

## find duplicates

In [6]:
# Only look for duplicate rows when (ID1, ID2) fails the key check.
# Note: is_primary_key prints its diagnostics again on every call.
if not sut.is_primary_key(df, id_cols):
    # Rows whose (ID1, ID2) combination occurs more than once.
    dups = sut.find_duplicates(df, id_cols)
    dups.show()

Total row count after filtering out missings: 8
Unique row count after filtering out missings: 5
The column(s) ID1, ID2 does not form a primary key.
+-----+---+---+------+----------+-----------+
|count|ID1|ID2|  Name|       DOB|       City|
+-----+---+---+------+----------+-----------+
|    2|101|  A| Alice|2000-01-01|   New York|
|    2|101|  A| Alice|2000-01-01|         NY|
|    2|102|  B|   Bob|1990-01-01|Los Angeles|
|    2|102|  B|   Bob|1990-01-01|         LA|
|    2|105|  B| Bingo|1987-01-01|Los Angeles|
|    2|105|  B|Binggy|1987-01-01|Los Angeles|
+-----+---+---+------+----------+-----------+



## identify columns responsible for dups

With our simple dummy table, it is relatively easy to tell which columns are responsible for the dups. The `City` column is responsible for the differences in 101-A and 102-B ID combinations, while the `Name` column is responsible for the difference in the 105-B combination. 

Identifying the source of the duplicates is important for choosing the right fix. But when the table is very wide and there are many more duplicates, this task becomes much trickier. 

The `cols_responsible_for_id_dups` function comes to the rescue by summarizing the `difference_counts` for each column based on the primary key(s) provided. For example, in the output below we can easily see that the `City` field is responsible for the differences in two unique ID combinations, while `Name` is responsible for one. 

In [7]:
# Only summarise the offending columns when (ID1, ID2) fails the key check.
# Note: is_primary_key prints its diagnostics again on every call.
if not sut.is_primary_key(df, id_cols):
    # One row per non-key column: in how many duplicated key combinations
    # that column's values differ.
    dup_cols = sut.cols_responsible_for_id_dups(df, id_cols)
    dup_cols.show()

Total row count after filtering out missings: 8
Unique row count after filtering out missings: 5
The column(s) ID1, ID2 does not form a primary key.
+--------+-----------------+
|col_name|difference_counts|
+--------+-----------------+
|    City|                2|
|    Name|                1|
|     DOB|                0|
+--------+-----------------+



## filter by string

In [8]:
# Both spellings of New York that appear in the City column.
search_strings = ['NY', 'New York']
# Keep rows whose City matches one of search_strings
# (exact vs. substring matching is decided by sut — TODO confirm).
check = sut.filter_df_by_strings(df, 'City', search_strings)

In [9]:
# Display the rows returned by the City filter.
check.show()

+---+---+-----+----------+--------+
|ID1|ID2| Name|       DOB|    City|
+---+---+-----+----------+--------+
|101|  A|Alice|2000-01-01|      NY|
|101|  A|Alice|2000-01-01|New York|
+---+---+-----+----------+--------+

