In [1]:
import datetime
from dataclasses import dataclass
from typing import Any, Dict, List, Union

import numpy as np
import pandas as pd
import pyspark.sql as ps
import scipy
from pyspark.sql.functions import col, count


In [3]:
# Automatically reload locally edited modules (e.g. `metrics`, imported in later
# cells) before each cell runs, so edits to the metric classes are picked up
# without restarting the kernel.
%load_ext autoreload
%autoreload 2

In [4]:
# Import the PySpark libraries
import os

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, SQLContext

# Location of the daily sales CSV. Set the KE_DAILY_SALES_CSV environment
# variable to point elsewhere instead of editing this machine-specific default.
SALES_CSV_PATH = os.environ.get(
    "KE_DAILY_SALES_CSV",
    "file:///Users/btseitlin/Documents/simulator-ml/junior/data_quality/dq_metrics/ke_daily_sales.csv",
)

# Create (or reuse) a SparkSession -- the single entry point for reading data.
spark = SparkSession.builder.appName("MyApp").getOrCreate()

# Legacy SQLContext, kept only for code that may still reference it.
# Its first parameter is a SparkContext (not a SparkSession), so pass the
# session's context plus the session itself explicitly.
sqlContext = SQLContext(spark.sparkContext, spark)

# Load the CSV with a header row. Without `inferSchema`, every column
# (including qty/price/revenue) is read as a string -- see the printed schema.
df = spark.read.csv(SALES_CSV_PATH, header=True)

# Count the number of rows in the DataFrame
rowCount = df.count()
print("Number of rows:", rowCount)

# Print the schema of the DataFrame
df.printSchema()

# Show the first 10 rows of the DataFrame
df.show(10)


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/10/30 09:45:25 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

Number of rows: 7
root
 |-- day: string (nullable = true)
 |-- item_id: string (nullable = true)
 |-- qty: string (nullable = true)
 |-- price: string (nullable = true)
 |-- revenue: string (nullable = true)

+----------+-------+---+-----+-------+
|       day|item_id|qty|price|revenue|
+----------+-------+---+-----+-------+
|2022-10-24|    100|  5|120.0|  500.0|
|2022-10-24|    100|  6|120.0|  720.0|
|2022-10-24|    200|  2|200.0|  400.0|
|2022-10-24|    300| 10| 85.0|  850.0|
|2022-10-23|    100|  3|110.0|  330.0|
|2022-10-23|    200|  8|200.0| 1600.0|
|2022-10-23|    300|  0| 90.0|    0.0|
+----------+-------+---+-----+-------+



In [8]:
from pyspark.sql.functions import isnan, when, count, col

# Column whose missing values (NULL or NaN) are counted.
NULL_CHECK_COLUMN = 'day'

# NOTE(review): `day` is loaded as a string; `isnan` on it relies on Spark's
# implicit cast to double -- confirm this behaves as intended under ANSI mode.
n = df.count()
k = df.filter(isnan(col(NULL_CHECK_COLUMN)) | col(NULL_CHECK_COLUMN).isNull()).count()

# Guard against an empty DataFrame, which would otherwise raise ZeroDivisionError.
{"total": n, "count": k, "delta": k / n if n else 0.0}

{'total': 7, 'count': 0, 'delta': 0.0}

In [12]:
import pyspark.sql.functions as f

# Number of rows that belong to a group of fully identical rows (every copy is
# counted, including the first occurrence). A global `sum` over zero groups
# yields NULL, so fall back to 0 when the data contains no duplicates.
(
    df.groupBy(df.columns)
    .count()
    .where(f.col('count') > 1)
    .select(f.sum('count').alias('n_duplicated_rows'))
    .first()['n_duplicated_rows']
    or 0
)

In [15]:
# Display up to 20 rows of `df`, truncating long cell values (Spark's defaults).
df.show(n=20, truncate=True)

+----------+-------+---+-----+-------+
|       day|item_id|qty|price|revenue|
+----------+-------+---+-----+-------+
|2022-10-24|    100|  5|120.0|  500.0|
|2022-10-24|    100|  6|120.0|  720.0|
|2022-10-24|    200|  2|200.0|  400.0|
|2022-10-24|    300| 10| 85.0|  850.0|
|2022-10-23|    100|  3|110.0|  330.0|
|2022-10-23|    200|  8|200.0| 1600.0|
|2022-10-23|    300|  0| 90.0|    0.0|
+----------+-------+---+-----+-------+



In [38]:
from metrics import CountNull

# Toy frame: one NaN in column `b`, one NULL in column `a`.
df = spark.createDataFrame([(1, float('nan')), (None, 1.0), (1, 1.0)], ("a", "b"))
# `show()` prints the table itself and returns None, so don't wrap it in print().
df.show()

# Presumably aggregation='all' counts a row only when every listed column is
# NULL/NaN -- TODO confirm against metrics.CountNull.
CountNull(
    columns=['a', 'b'],
    aggregation='all',
)(df)

+----+---+
|   a|  b|
+----+---+
|   1|NaN|
|NULL|1.0|
|   1|1.0|
+----+---+

None


{'total': 3, 'count': 0, 'delta': 0.0}

In [42]:
from metrics import CountDuplicates

# Toy frame: rows 1 and 3 are identical.
df = spark.createDataFrame([(1, 1.0), (None, 1.0), (1, 1.0)], ("a", "b"))
# `show()` prints the table and returns None; no print() wrapper needed.
df.show()

CountDuplicates(
    columns=['a', 'b'],
)(df)

+----+---+
|   a|  b|
+----+---+
|   1|1.0|
|NULL|1.0|
|   1|1.0|
+----+---+

None


{'total': 3, 'count': 2, 'delta': 0.6666666666666666}

In [48]:
from metrics import CountDuplicates

# Toy frame exercising NULLs: checks whether rows containing NULLs are treated
# as duplicates of each other (the recorded run counts 4 of the 5 rows).
df = spark.createDataFrame([(1, 1.0), (None, 1.0), (None, 1.0), (None, None), (None, None)], ("a", "b"))
# `show()` prints the table and returns None; no print() wrapper needed.
df.show()

CountDuplicates(
    columns=['a', 'b'],
)(df)

+----+----+
|   a|   b|
+----+----+
|   1| 1.0|
|NULL| 1.0|
|NULL| 1.0|
|NULL|NULL|
|NULL|NULL|
+----+----+

None


{'total': 5, 'count': 4, 'delta': 0.8}

In [44]:
from metrics import CountBelowValue

df = spark.createDataFrame([(1, 1.0), (5, 1.0), (1, 1.0)], ("a", "b"))
# `show()` prints the table and returns None; no print() wrapper needed.
df.show()

# Two of the three values in `a` (the 1s) are below the threshold of 2.
CountBelowValue(
    column='a',
    value=2,
)(df)

+---+---+
|  a|  b|
+---+---+
|  1|1.0|
|  5|1.0|
|  1|1.0|
+---+---+

None


{'total': 3, 'count': 2, 'delta': 0.6666666666666666}

In [54]:
from metrics import CountRatioBelow

df = spark.createDataFrame([(1, 1.0, 5), (1, 0.0, 5), (None, 1.0, 1), (None, 1.0, 2), (None, None, 3), (None, None, 4)], ("a", "b", "c"))
# `show()` prints the table and returns None; no print() wrapper needed.
df.show()

# NOTE(review): exact comparison performed on columns x/y/z is defined in
# metrics.CountRatioBelow -- confirm how NULL rows are treated there.
CountRatioBelow(
    column_x='a',
    column_y='b',
    column_z='c',
)(df)

+----+----+---+
|   a|   b|  c|
+----+----+---+
|   1| 1.0|  5|
|   1| 0.0|  5|
|NULL| 1.0|  1|
|NULL| 1.0|  2|
|NULL|NULL|  3|
|NULL|NULL|  4|
+----+----+---+

None


{'total': 6, 'count': 1, 'delta': 0.16666666666666666}

In [58]:
from metrics import CountCB

df = spark.createDataFrame([(1, 1.0, 5), (1, 0.0, 5), (None, 1.0, 1), (None, 1.0, 2), (None, None, 3), (None, None, 4)], ("a", "b", "c"))
# `show()` prints the table and returns None; no print() wrapper needed.
df.show()

# Returns lower/upper bounds ('lcb'/'ucb') for column `a`.
CountCB(
    column='a',
)(df)

+----+----+---+
|   a|   b|  c|
+----+----+---+
|   1| 1.0|  5|
|   1| 0.0|  5|
|NULL| 1.0|  1|
|NULL| 1.0|  2|
|NULL|NULL|  3|
|NULL|NULL|  4|
+----+----+---+

None


                                                                                

{'lcb': 1.0, 'ucb': 1.0}

In [61]:
from metrics import CountLag

df = spark.createDataFrame([("2022-10-24", 1, 1)], ("a", "b", "c"))
# `show()` prints the table and returns None; no print() wrapper needed.
df.show()

# NOTE(review): the recorded output reports 'today' as the run date, so
# presumably CountLag compares against the current date -- the 'lag' value
# will differ between runs.
CountLag(
    column='a',
)(df)

+----------+---+---+
|         a|  b|  c|
+----------+---+---+
|2022-10-24|  1|  1|
+----------+---+---+

None


{'today': '2023-10-30', 'last_day': '2022-10-24', 'lag': 371}