In [6]:
# Import the required libraries (they must already be installed in the kernel's environment).

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType
import pyspark.sql.functions as F 
from sqlalchemy import create_engine
import psycopg2
import logging
import avro.schema
import avro.io
import avro.datafile
from avro.datafile import DataFileWriter
from datetime import datetime
import os
from sqlalchemy import create_engine, MetaData, Table
import json

In [2]:
# Start the Spark session for this notebook, or reuse the one already running.
spark = (
    SparkSession.builder
    .appName("Ingestion of CSV Files")
    .getOrCreate()
)

In [None]:
# Validation rule: split a DataFrame into rows that pass / fail a NULL check on the given columns.
def nullCount(df, inputColumns):
    """Split ``df`` into valid and invalid rows based on NULLs in ``inputColumns``.

    Args:
        df: DataFrame to validate (used through ``filter``, ``withColumn``
            and ``union``).
        inputColumns: A column name, or a list of column names, that must not
            be NULL. The names are interpolated as-is into a SQL filter
            expression. When empty or ``None`` no validation is applied.

    Returns:
        A tuple ``(df_valid, df_invalid)``:
        * ``df_valid`` - rows where every listed column is NOT NULL
          (``df`` itself when no columns are given).
        * ``df_invalid`` - rows where at least one listed column IS NULL, with
          an extra ``validationMessage`` column (empty when no columns are given).
    """
    # Accept a single name as well as a list: str.join() on a bare string
    # would otherwise build the filter from its individual characters.
    if isinstance(inputColumns, str):
        columns = [inputColumns]
    else:
        columns = list(inputColumns or [])

    # Empty frame with the input schema plus the validation message column.
    # Note: no df.count() here - the row count was never used and counting
    # forces a full evaluation of the DataFrame.
    df_invalid = df.filter("1=0").withColumn("validationMessage", F.lit(""))

    if not columns:
        return df, df_invalid

    # Build the complementary "any NULL" / "none NULL" conditions.
    query_null = " IS NULL OR ".join(columns) + " IS NULL"
    query_not_null = " IS NOT NULL AND ".join(columns) + " IS NOT NULL"

    df_nullCount_invalid = df.filter(query_null).withColumn(
        "validationMessage",
        F.lit("this record was removed by NullCount validation"),
    )
    df_valid = df.filter(query_not_null)
    df_invalid = df_invalid.union(df_nullCount_invalid)

    return df_valid, df_invalid

In [None]:
# Persist a DataFrame as a table in a PostgreSQL database
def write_to_PostgreSQL (df,table_name,db_schema,engine):
    """Write ``df`` to ``db_schema.table_name`` through ``engine``.

    The frame is first converted with ``df.toPandas()`` and then written with
    ``to_sql``. ``if_exists='replace'`` means an existing table of the same
    name is replaced, and the index is not written as a column.

    Args:
        df: Object exposing ``toPandas()`` (presumably a Spark DataFrame).
        table_name: Name of the target table.
        db_schema: Database schema that holds the target table.
        engine: Connection/engine handed to ``to_sql`` as ``con``.
    """
    df.toPandas().to_sql(
        name=table_name,
        con=engine,
        schema=db_schema,
        if_exists='replace',
        index=False,
    )


In [None]:
# Function to retrieve table schema from PostgreSQL
def get_table_schema(engine, table_name, db_schema=None):
    """Reflect a database table and describe it as an Avro record schema.

    Args:
        engine: SQLAlchemy engine used to reflect the table.
        table_name: Name of the table; also used as the Avro record name.
        db_schema: Optional database schema that holds the table. ``None``
            (the default) reflects from the connection's default schema.

    Returns:
        dict with keys ``"type"`` (``"record"``), ``"name"`` and ``"fields"``.
        Each field is ``{"name": <column name>, "type": <avro type>}``, where
        the type of a nullable column is the union ``["null", <avro type>]``.
    """
    # MetaData(bind=...) no longer exists in SQLAlchemy 2.0; reflection only
    # needs the engine passed via autoload_with.
    metadata = MetaData()
    table = Table(table_name, metadata, schema=db_schema, autoload_with=engine)

    fields = []
    for column in table.columns:
        avro_type = map_postgresql_to_avro(column.type)
        # Nullable columns must be declared as a union with "null", otherwise
        # records containing None are rejected when written.
        avro_field_type = ["null", avro_type] if column.nullable else avro_type
        fields.append({"name": column.name, "type": avro_field_type})

    return {
        "type": "record",
        "name": table_name,
        "fields": fields,
    }

In [None]:
# Function to map PostgreSQL data types to AVRO data types
def map_postgresql_to_avro(data_type):
    """Return the Avro primitive type name for a PostgreSQL column type.

    Args:
        data_type: A column type object or string; only ``str(data_type)`` is
            used. Type arguments (``VARCHAR(50)``, ``NUMERIC(10, 2)``) and
            letter case are ignored when matching.

    Returns:
        One of ``'int'``, ``'long'``, ``'boolean'``, ``'string'``, ``'float'``
        or ``'double'``. Unknown types fall back to ``'string'``.
    """
    type_map = {
        'INTEGER': 'int',
        'BIGINT': 'long',
        'SMALLINT': 'int',
        'BOOLEAN': 'boolean',
        'TEXT': 'string',
        'VARCHAR': 'string',
        'DATE': 'string',
        'TIMESTAMP': 'string',
        # PostgreSQL FLOAT without precision (and FLOAT(25..53)) is double
        # precision, so a 32-bit Avro float would silently lose precision.
        'FLOAT': 'double',
        'DOUBLE PRECISION': 'double',
        'REAL': 'float',
        'NUMERIC': 'double',
        'DECIMAL': 'double'
    }
    # Drop type arguments such as "(50)" / "(10, 2)" and normalise the case so
    # that parameterised types hit the table instead of the fallback.
    type_name = str(data_type).split('(', 1)[0].strip().upper()
    return type_map.get(type_name, 'string')  # Default to string if type not found

In [5]:
#Create backup based on Avro Files
def avro_backup(engine,table,schema,db_schema):
    """Dump a database table to a timestamped Avro file.

    The file is written to
    ``<parent of cwd>/backup/<table>/<timestamp>/<table>_<timestamp>.avro``.
    Any failure is logged and swallowed, so the function never raises.

    Args:
        engine: Connection/engine passed to ``pd.read_sql_table``.
        table: Name of the table to back up.
        schema: Avro schema as a JSON-serialisable dict.
        db_schema: Database schema that holds ``table``.

    Returns:
        None.
    """
    try:
        # Read the table
        df = pd.read_sql_table(table, engine, schema=db_schema)

        # Generate a timestamp
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

        # Generate backup folder; os.path.join picks the right separator for
        # the platform instead of hard-coding backslashes.
        parentdirectory = os.path.dirname(os.getcwd())
        full_folder_path = os.path.join(parentdirectory, 'backup', table, timestamp)
        os.makedirs(full_folder_path, exist_ok=True)

        # Generate Avro file path
        avro_path = os.path.join(full_folder_path, f'{table}_{timestamp}.avro')

        # Write one Avro datum per table row
        # NOTE(review): values are written as pandas returns them (e.g. NaN /
        # Timestamp objects) - confirm they validate against `schema`.
        records = df.to_dict(orient='records')
        parsed_schema = avro.schema.parse(json.dumps(schema))
        with open(avro_path, 'wb') as out:
            writer = avro.datafile.DataFileWriter(out, avro.io.DatumWriter(), parsed_schema)
            try:
                for record in records:
                    writer.append(record)
            finally:
                # Always close the writer, even when a record fails to append.
                writer.close()
        print('Backup process completed successfully')
        logging.info(f"Backup for table {table} completed successfully. Saved to {avro_path}")
    except Exception as e:
        # Best-effort backup: report the failure (with traceback) but do not raise.
        logging.exception(f"Error during backup of table {table}: {e}")