<<< Importing main libreries >>>

Note:
Before start, make sure you have your csv files into "dataPruebaDataEngineer" directory.
The main idea about this ingestion pipeline is identify the .csv files into a specific directory, then, clean data and save it into a PostgreSQL database (localhost in this case), showing some stadistics outcomes like total rows saved into a database, average, minimum value and maximum value of price field for each file charged.  

In [1]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql.functions import col
import os
import shutil
from pyspark.sql.types import StructType, StructField, StringType
import numpy as np
import pandas as pd

<<< Creating read schemas for .csv files >>>

In [2]:
read_csv_schema = StructType([
    StructField("timestamp", StringType(), False),
    StructField("price", StringType(), True),
    StructField("user_id", StringType(), False)
])

<<< Starting SparkSession and configurating PostgreSQL Connection >>>

In [3]:
SparkContext.getOrCreate().stop()

spark = SparkSession.builder \
    .appName("[PragmaTest] CSV Ingestion") \
    .config("spark.jars.packages", "org.postgresql:postgresql:42.7.3") \
    .getOrCreate()

postgres_url = "jdbc:postgresql://localhost:5432/pragma_assessment"
properties = {
    "user": "postgres",
    "password": "pragma1234",
    "driver": "org.postgresql.Driver"
}

#print(spark._jvm.System.getProperty("java.class.path"))

<<< Creating function with ingestion logic >>>

In [42]:
# source directory where new csv files are coming
src_path = 'dataPruebaDataEngineer'

# destination path to move the processed files from source directory
# the main idea is processing each csv file and move it to keep clean source directorory
dest_path = 'processed_files'

# Ensuring that destination directoru exists or create if it isn't exist.
os.makedirs(dest_path, exist_ok=True)

# function to clean data
def cleaning_data(df_to_clean):
    # 1. cleaning data: if price value is null, we set it "0"
    # 2. delete header row
    df_cleaned = df_to_clean.fillna({"price":0})
    row_header = df_cleaned.first()
    df_cleaned = df_cleaned.filter(df_cleaned[0] != row_header[0])

    return df_cleaned

# function to process csv files
def processing_file(file_path):
    # reading csv file and creating a dataframe
    csv_df = spark.read.schema(read_csv_schema).csv(file_path)

    # cleaning data through function "cleaing_data()"
    csv_cleaned = cleaning_data(csv_df)

    # saving df into postgresql database
    csv_cleaned.write.jdbc(url=postgres_url, table="pragma_prices", 
                      mode="append", properties=properties)
    
    return csv_cleaned

# function to show total rows or records
def total_rows(df_to_process):
    # >>> total count about rows of dataframe
    total_rows_count = df_to_process.count()

    return total_rows_count

# function to calculate price avarage
def price_avg(df_to_process):
    # price avg
    df_to_process = df_to_process.toPandas()
    price_avg = df_to_process["price"].astype(int).mean()
    
    return price_avg

# function to show the minimum value of price field
def price_min(df_to_process):
    # price minimum
    df_to_process = df_to_process.toPandas()
    price_min = df_to_process["price"].astype(int).min()
    
    return price_min

# function to show the maximum value of price field and moving file
def price_max(df_to_process):
    # price maximum
    df_to_process = df_to_process.toPandas()
    price_max = df_to_process["price"].astype(int).max()

    return price_max

def moving_file_processed(file_path):
    # moving processed csv file to processed directory
    shutil.move(file_path, os.path.join(dest_path, os.path.basename(file_path)))
    

<<< Executing ingestion process >>>

In [43]:
# list names of source path and process each file (one by one)
name_files = os.listdir(src_path)

# for each file name found in name files list, we validate if 
# that one is csv file.
# Then, creating file path and executing function to process each file found.
file_processed = 2
total_count = 0
all_avg = []
all_min = []
all_max = []

for file in name_files:
    if file.endswith(".csv"):
        file_path = os.path.join(src_path, file)
        df_saved = processing_file(file_path)
        
        # Showing Stadistics 
        # >>> count
        total_count = total_count + total_rows(df_saved)
        print(f">>> Total rows processed and saved: {total_count}")

        # price avg
        all_avg.append(price_avg(df_saved))
        new_avg = sum(all_avg) / len(all_avg)
        print(f">>> Average of 'Price' field: {new_avg}")

        # min value of 'price'
        all_min.append(price_min(df_saved))
        min_value = min(all_min)
        print(f">>> Minumum value of 'Price' field: {min_value}")
        
        # max value of 'price'
        all_max.append(price_max(df_saved))
        max_value = max(all_max)
        print(f">>> Maximum value of 'Price' field: {max_value}")

        # moving file to processed_files directory
        moving_file_processed(file_path)
        
        file_processed += 1


>>> Total rows processed and saved: 22
>>> Average of 'Price' field: 54.22727272727273
>>> Minumum value of 'Price' field: 0
>>> Maximum value of 'Price' field: 97
>>> Total rows processed and saved: 51
>>> Average of 'Price' field: 54.527429467084644
>>> Minumum value of 'Price' field: 0
>>> Maximum value of 'Price' field: 100
>>> Total rows processed and saved: 82
>>> Average of 'Price' field: 56.24409276300267
>>> Minumum value of 'Price' field: 0
>>> Maximum value of 'Price' field: 100
>>> Total rows processed and saved: 112
>>> Average of 'Price' field: 55.57473623891867
>>> Minumum value of 'Price' field: 0
>>> Maximum value of 'Price' field: 100
>>> Total rows processed and saved: 143
>>> Average of 'Price' field: 56.111401894360746
>>> Minumum value of 'Price' field: 0
>>> Maximum value of 'Price' field: 100
>>> Total rows processed and saved: 151
>>> Average of 'Price' field: 53.71783491196729
>>> Minumum value of 'Price' field: 0
>>> Maximum value of 'Price' field: 100
