In [14]:
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import count, isnan, when, col, sum, avg, lit
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

from pathlib import Path
import logging

# Notebook-wide logging: INFO and above, formatted as "time-level-message".
logging.basicConfig(
    format="%(asctime)s-%(levelname)s-%(message)s",
    level=logging.INFO,
)

# Named logger used by the validation helpers in this notebook.
logger = logging.getLogger(__name__)

In [2]:
# Reuse an existing session if one is running, otherwise start "datavalidation".
spark = (
    SparkSession.builder
    .appName("datavalidation")
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/02/19 15:57:34 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
# Load the raw credit-card fraud CSV; the first row holds column names and
# column types are inferred from the data.
df = spark.read.options(header=True, inferSchema=True).csv("../inputs/finance/credit_card_fraud.csv")

In [6]:
# Preview the first 5 rows without truncating long cell values.
df.show(n=5, truncate=False)

+-------------+------+-----------------+-------------------+---------------+-----------------------------+--------------+----------+-------+
|TransactionID|UserID|TransactionAmount|TransactionDate    |TransactionType|Merchant                     |Location      |CardType  |IsFraud|
+-------------+------+-----------------+-------------------+---------------+-----------------------------+--------------+----------+-------+
|1            |4174  |3240.11          |2024-01-01 00:00:00|ATM Withdrawal |Griffin-Walters              |Calvinfurt    |Visa      |1      |
|2            |4507  |2261.45          |2024-01-01 00:01:00|NULL           |Gamble, Garcia and Montgomery|North Kristen |Amex      |0      |
|3            |1860  |NULL             |2024-01-01 00:02:00|POS            |Copeland, Adams and Parker   |New Kathyburgh|MasterCard|0      |
|4            |2294  |423.81           |2024-01-01 00:03:00|Online         |Bennett Inc                  |Fordbury      |Visa      |0      |
|5           

In [9]:
# Column names of the loaded frame, as a plain list.
columns = list(df.columns)
columns

['TransactionID',
 'UserID',
 'TransactionAmount',
 'TransactionDate',
 'TransactionType',
 'Merchant',
 'Location',
 'CardType',
 'IsFraud']

In [None]:
num_columns = len(df.columns)
num_columns

9

25/02/19 16:24:23 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 293669 ms exceeds timeout 120000 ms
25/02/19 16:24:23 WARN SparkContext: Killing executors is not supported by current scheduler.
25/02/19 16:24:28 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$

In [16]:
# Columns the credit-card fraud dataset is expected to contain. The check must
# compare against an independent list: iterating over `df.columns` and asking
# whether each name is in `df.columns` can never report a missing column.
EXPECTED_COLUMNS = [
    "TransactionID",
    "UserID",
    "TransactionAmount",
    "TransactionDate",
    "TransactionType",
    "Merchant",
    "Location",
    "CardType",
    "IsFraud",
]

schema = StructType(
    fields=[
        StructField("ColumnName", StringType(), False),
        StructField("ColumnPresentCheck", StringType(), False),
    ]
)

# Set membership for the lookup; flags stay "True"/"False" strings to match the
# StringType schema above.
present_columns = set(df.columns)
data_dict = {column: str(column in present_columns) for column in EXPECTED_COLUMNS}

# One row per expected column, plus a human-readable ✅/❌ marker.
final_df = spark.createDataFrame(list(data_dict.items()), schema).withColumn(
    "IsPresent",
    when(col("ColumnPresentCheck") == "True", lit("✅")).otherwise(lit("❌")),
)
final_df.show()

+-----------------+------------------+---------+
|       ColumnName|ColumnPresentCheck|IsPresent|
+-----------------+------------------+---------+
|    TransactionID|              True|        ✅|
|           UserID|              True|        ✅|
|TransactionAmount|              True|        ✅|
|  TransactionDate|              True|        ✅|
|  TransactionType|              True|        ✅|
|         Merchant|              True|        ✅|
|         Location|              True|        ✅|
|         CardType|              True|        ✅|
|          IsFraud|              True|        ✅|
+-----------------+------------------+---------+



In [None]:
def missing_column_check(self) -> DataFrame:
    """Report, for every configured column, whether it is present in ``self.df``.

    The expected column names are read from the client config key
    ``subjects.credit_card.column_sequence``.

    Returns:
        A DataFrame with one row per configured column and two string columns:
        ``ColumnName`` and ``ColumnPresentCheck`` (``"True"`` when the column is
        in ``self.df.columns``, otherwise ``"False"``). Filter on
        ``ColumnPresentCheck == "False"`` to get only the missing columns.
    """
    logger.info("Scanning for required columns")
    columns = self.client.getConfig("subjects.credit_card.column_sequence")
    schema = StructType(
        fields=[
            StructField("ColumnName", StringType(), False),
            StructField("ColumnPresentCheck", StringType(), False),
        ]
    )

    present_columns = set(self.df.columns)
    # createDataFrame expects a list of rows: a bare (name, flag) tuple would be
    # read as one row per string and fail schema verification. Building all rows
    # first and creating the frame once also avoids a per-column union chain.
    rows = [(column, str(column in present_columns)) for column in columns]
    return self.spark.createDataFrame(rows, schema)

In [None]:
class DataIngestion:
    """Holds the path of an input file and sanity-checks it on construction.

    The checks only log warnings; they do not raise.
    """

    def __init__(self, filepath: str):
        """Store ``filepath`` and warn if it is missing or not a ``.csv`` file.

        Args:
            filepath: Path to the input file.
        """
        self.filepath = filepath
        path = Path(self.filepath)

        # Logged at WARNING so the message is visible under an INFO-level config.
        if not path.exists():
            logger.warning("%s does not exist.", self.filepath)

        # Compare the file suffix (case-insensitively); splitting on "." yields a
        # list, which can never equal the string "csv".
        if path.suffix.lower() != ".csv":
            logger.warning("Please use only csv format. %s", self.filepath)

    
            
            