<a href="https://colab.research.google.com/github/Rakeshkrishnamurthy/Rocky_Help/blob/main/Data_preprocess_validation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Data Preprocessing

In [None]:
# Step 1: Data preprocessing - remove duplicate rows and rows containing nulls
def data_preprocessing(raw_csv_path, clean_path):
    """
    Clean a raw CSV extract and persist it as Parquet.

    Reads ``raw_csv_path`` (header row, inferred column types) using the
    notebook-level ``spark`` session, removes exact duplicate rows, drops every
    row that contains a null in any column, and overwrites ``clean_path`` with
    the result.

    Args:
        raw_csv_path: Location of the raw CSV input.
        clean_path: Destination directory for the cleaned Parquet output.
    """
    raw_df = spark.read.csv(raw_csv_path, header=True, inferSchema=True)

    deduplicated_df = raw_df.dropDuplicates()
    non_null_df = deduplicated_df.na.drop()

    writer = non_null_df.write.mode("overwrite")
    writer.parquet(clean_path)

Data Validation and Schema Validation

In [None]:
# Step 2: Store cleaned data in staging layer and perform data validation
def store_and_validate(staging_path, target_path, clean_path=None):
    """
    Copy cleaned Parquet data to staging, validate the copy, then promote it to target.

    The cleaned data is read from ``clean_path`` and written to ``staging_path``.
    The staged copy is then read back and compared with the cleaned data on row
    count and schema. Only if both checks pass is it written to ``target_path``.

    Args:
        staging_path: Directory for the staging-layer Parquet copy (overwritten).
        target_path: Directory for the target-layer Parquet copy (overwritten).
        clean_path: Directory holding the cleaned Parquet data. Optional for
            backward compatibility: when omitted, a module/notebook-level
            ``clean_path`` variable is used, as the original code did implicitly.

    Raises:
        ValueError: If no ``clean_path`` is passed and none is defined globally.
        AssertionError: If the staged data's row count or schema differs from
            the cleaned data.
    """
    if clean_path is None:
        # The original signature had no clean_path parameter and silently relied
        # on a global of that name; keep that working for existing callers.
        clean_path = globals().get("clean_path")
    if clean_path is None:
        raise ValueError(
            "clean_path was not provided and no global 'clean_path' is defined"
        )

    clean_data = spark.read.parquet(clean_path)  # Read cleaned data
    # Save data to staging layer
    clean_data.write.mode("overwrite").parquet(staging_path)

    # Data validation - count and schema must survive the staging round-trip.
    # Raise explicitly instead of using `assert` (stripped under `python -O`);
    # AssertionError is kept so existing handlers still catch it.
    staging_data = spark.read.parquet(staging_path)
    if clean_data.count() != staging_data.count():
        raise AssertionError("Data count mismatch")
    if clean_data.schema != staging_data.schema:
        raise AssertionError("Schema mismatch")

    # Save data to target layer
    staging_data.write.mode("overwrite").parquet(target_path)

Incremental Load

In [None]:
# Get the latest `last_updated` value already present in the target (Parquet) data.

def get_last_update_time(target_path, spark_session=None):
    """
    Return the maximum ``last_updated`` value stored at ``target_path``.

    Args:
        target_path: Directory of the target Parquet dataset.
        spark_session: Session used to read the data. Optional for backward
            compatibility; defaults to the notebook-level ``spark`` session.

    Returns:
        The largest ``last_updated`` value, or ``None`` when it cannot be
        determined. That happens when the read or aggregation fails, or when
        the column holds no non-null values.
    """
    # Use Spark's column aggregate explicitly: a bare `max` is Python's builtin
    # (which would return the character 'u' here) unless pyspark's `max` was
    # star-imported over it.
    from pyspark.sql import functions as F

    try:
        session = spark_session if spark_session is not None else spark
        # Parquet carries its own schema; the CSV-only "header" option is not needed.
        target_df = session.read.parquet(target_path)
        return target_df.agg(F.max("last_updated")).collect()[0][0]
    except Exception as e:
        # Best effort: failures (e.g. missing path or column) are reported and
        # mapped to None so the caller decides what an unknown watermark means.
        print(f"Error getting last update time: {str(e)}")
        return None

# Incremental load: append source rows newer than the target's latest `last_updated`.

def incremental_load(spark, source_path, target_path):
    """
    Append to ``target_path`` the source CSV rows newer than the target's watermark.

    A row is new when its ``last_updated`` is greater than the latest value
    already in the target, as returned by ``get_last_update_time``.

    Args:
        spark: Active SparkSession used to read the source CSV.
        source_path: Source CSV file/directory (first line is the header).
        target_path: Target Parquet dataset; new rows are appended to it.

    The function prints one of three outcomes: new rows were loaded, no new rows
    were found, or the target's last update time could not be determined. In the
    last case nothing is loaded.
    """
    from pyspark.sql import functions as F

    # Watermark: the newest `last_updated` already present in the target.
    last_update_time = get_last_update_time(target_path)

    if last_update_time is None:
        print("Unable to determine last update time. Exiting.")
        return

    # inferSchema (as data_preprocessing uses) types the columns instead of
    # leaving everything as strings. Appending all-string columns to a Parquet
    # dataset written with typed columns can leave files with conflicting schemas.
    source_df = (
        spark.read.option("header", "true")
        .option("inferSchema", "true")
        .csv(source_path)
    )
    incremental_data = source_df.filter(F.col("last_updated") > last_update_time)

    # head(1) stops at the first matching row instead of counting every match.
    if incremental_data.head(1):
        incremental_data.write.mode("append").parquet(target_path)
        print("Incremental load successful.")
    else:
        print("No new data to load.")

if __name__ == "__main__":
    # Create (or reuse, via getOrCreate) the Spark session for this run.
    spark = SparkSession.builder.appName("IncrementalLoad").getOrCreate()

    # Placeholder locations: replace with the real source CSV and target Parquet paths.
    source_path = "source/path"
    target_path = "target/path"

    try:
        # Execute incremental load
        incremental_load(spark, source_path, target_path)
    finally:
        # Stop the session even if the load raises.
        # NOTE(review): in a notebook this cell runs with __name__ == "__main__",
        # and getOrCreate returns the shared session, so stopping it here also
        # stops the `spark` session other cells use. Confirm this is intended.
        spark.stop()

Copying Files to and from HDFS

In [None]:
# Copy a single local file into an HDFS directory.
# NOTE(review): these are shell commands. Inside a Python notebook cell they need a
# leading `!` on each line (or a `%%bash` cell magic) to run.
# local_file.txt, local_directory, /path/in/hdfs and local_destination are placeholders.

hadoop fs -copyFromLocal local_file.txt /path/in/hdfs

# Copy a local directory into HDFS.
hadoop fs -copyFromLocal local_directory /path/in/hdfs

# Copy a file from HDFS back to the local file system.

hadoop fs -copyToLocal /path/in/hdfs/local_file.txt local_destination



Skipping the First 2 Lines in Hive


In [None]:
-- HiveQL template: replace the table name, column names and datatypes with real ones.
-- `...` stands for any further columns and must be expanded or removed before running.
-- The table is stored as comma-delimited text. The 'skip.header.line.count'='2'
-- property makes Hive ignore the first 2 lines of the data file(s) when reading,
-- e.g. a header plus one banner line.
CREATE TABLE your_table_name (
  column1 datatype1,
  column2 datatype2,
  ...
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE
TBLPROPERTIES ('skip.header.line.count'='2');


Skipping the First 2 Lines in PySpark

In [None]:
# Apache Spark's CSV reader has no "skipRows" option (it exists only on Databricks)
# and silently ignores unknown options, so the first 2 lines are dropped explicitly.
# Each line is numbered, only lines with index >= 2 are kept, and the rest is parsed
# as CSV. With header=true, the first remaining line (the file's 3rd line) becomes
# the header, which matches Databricks' skipRows + header behavior.
# NOTE: zipWithIndex numbers lines across all matched files, so if file_path
# matches several files only the first 2 lines overall are skipped. Quoted fields
# spanning multiple lines are not supported by this line-based approach.
lines_to_skip = 2
remaining_lines = (
    spark.sparkContext.textFile(file_path)
    .zipWithIndex()
    .filter(lambda line_and_index: line_and_index[1] >= lines_to_skip)
    .keys()
)
df = spark.read.option("header", "true").csv(remaining_lines)