In [1]:
import os
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import col, length, regexp_replace, lit

def spark_read_csv_from_os(spark, file_path, schema, header=True, num_partitions=30, **options):
    """
    Read a CSV file into a Spark DataFrame using an explicit schema.

    Schema inference is always disabled; the supplied ``schema`` is applied.

    Args:
        spark: The SparkSession object.
        file_path: Path to the CSV file. Can be a local path or any path your
                   Spark environment can access (e.g. HDFS).
        schema: Schema to apply, e.g. the ``StructType`` built by
                ``construct_sql_schema``. With ``None`` every column is read as
                a string, because inference is off.
        header (bool, optional): Whether the CSV file has a header row. Defaults to True.
        num_partitions (int, optional): Partition count passed to ``coalesce`` on the
                   result. Defaults to 30. ``coalesce`` only ever reduces the count.
                   Pass ``None`` to keep the reader's partitioning.
        **options: Additional options forwarded to the Spark CSV reader (for example
                   ``sep``/``delimiter``, ``quote``, ``escape``).

    Returns:
        A Spark DataFrame, or None if reading failed. Errors are printed, not raised.
    """
    try:
        df = spark.read.csv(file_path, header=header, inferSchema=False, schema=schema, **options)
        if num_partitions is not None:
            df = df.coalesce(num_partitions)
        return df
    except FileNotFoundError:
        print(f"Error: File not found at path: {file_path}")
        return None
    except Exception as e:
        # Spark typically reports a missing path as an AnalysisException rather
        # than FileNotFoundError, so such failures are reported here.
        print(f"An error occurred while reading the CSV: {e}")
        return None
    
def pandas_read_csv(file_path, **options):
    """
    Load a (small) CSV file into a pandas DataFrame.

    Intended for configuration and schema-description files, not bulk data.

    Args:
        file_path: Path of the CSV file.
        **options: Forwarded unchanged to ``pandas.read_csv`` (e.g. ``sep="|"``).

    Returns:
        The DataFrame, or None when the file is missing or cannot be parsed
        (the reason is printed).
    """
    try:
        return pd.read_csv(file_path, **options)
    except FileNotFoundError:
        print(f"Error: File not found at path: {file_path}")
    except Exception as e:  # parsing and other read errors
        print(f"An error occurred while reading the CSV: {e}")
    return None
    
def construct_sql_schema(**kwargs):
    """
    Build a Spark ``StructType`` from a DDL-style schema description file.

    The description file is read with ``pandas_read_csv``. It must have a
    ``ColumnName`` column and a ``DataType`` column holding SQL type strings
    such as ``VARCHAR(100)``, ``BIGINT`` or ``DECIMAL(18,2)``. Supplying the
    resulting schema to the CSV reader avoids a schema-inference pass over
    large inputs.

    Keyword Args:
        path (str): Path of the schema description file.
        sep (str): Field delimiter of the schema description file.

    Returns:
        StructType with one nullable StructField per row, in file order. Returns
        None if the file cannot be read or a row cannot be parsed; the error is
        printed.

    Type mapping (case-insensitive substring match on the base type name):
        nvarchar / varchar -> StringType
        decimal            -> DecimalType(), DecimalType(p) or DecimalType(p, s)
        bigint             -> LongType
        date               -> DateType
        int                -> IntegerType

        Any other type falls back to StringType with a printed warning. The CSV
        reader applies a user schema by position, so dropping a field would
        shift every following column.
    """
    # Checked in this order so that "bigint" is tested before its substring
    # "int" (and "nvarchar" before "varchar"); otherwise BIGINT -> IntegerType.
    type_mapping = (
        ("nvarchar", StringType()),
        ("varchar", StringType()),
        ("decimal", DecimalType),
        ("bigint", LongType()),
        ("date", DateType()),
        ("int", IntegerType()),
    )

    df = pandas_read_csv(kwargs["path"], sep=kwargs["sep"])
    if df is None:
        # pandas_read_csv has already printed why the file could not be read.
        return None

    fields = []
    for row in df.itertuples():
        try:
            base_name, _, type_args = row.DataType.partition("(")
            name_lower = base_name.strip().lower()
            # Text between the parentheses, e.g. "18,2" for DECIMAL(18,2).
            type_args = type_args.strip().rstrip(")").strip()

            for keyword, spark_type in type_mapping:
                if keyword in name_lower:
                    if spark_type is DecimalType:
                        # DECIMAL -> DecimalType(); DECIMAL(p) -> scale 0; DECIMAL(p,s).
                        precision_scale = [int(p) for p in type_args.split(",")] if type_args else []
                        data_type = DecimalType(*precision_scale)
                    else:
                        data_type = spark_type
                    break
            else:
                print(f"Warning: unmapped data type '{row.DataType}' for column "
                      f"{row.ColumnName}; using StringType")
                data_type = StringType()

            fields.append(StructField(row.ColumnName, data_type, True))
        except Exception as e:  # malformed DataType values, bad precision/scale, ...
            print(f"Error processing file in construct schema {kwargs['path']}: {e}")
            return None
    return StructType(fields)

def validateDecimal(**kwargs):
    """
    Data-quality check DQ000001: count null values in every decimal column.

    The data is expected to have been read with ``dtypes`` as its schema. In
    that case, values that could not be parsed as decimals are presumably
    read as nulls. Empty/missing source values are counted as well.

    Keyword Args:
        dtypes: Schema used to read ``df_contents`` (an iterable of StructField,
            e.g. a StructType). Fields whose type string contains "decimal"
            are checked.
        df_contents: Spark DataFrame to check.

    Returns:
        tuple ``(has_errors, messages, df_contents, dqcId)``:
            has_errors (bool): True when at least one decimal column contains
                nulls, i.e. True means the check FAILED.
            messages (list[str]): one message per failing column, or a single
                "checks passed" message.
            df_contents: the input DataFrame, unchanged.
            dqcId (str): "DQ000001".
    """
    from pyspark.sql.functions import count, when

    df_contents = kwargs["df_contents"]
    dqcId = "DQ000001"
    errors = []

    decimal_columns = [
        field.name for field in kwargs["dtypes"] if "decimal" in str(field.dataType).lower()
    ]

    if decimal_columns:
        # One aggregation job for all columns instead of a count() per column.
        # regexp_replace(col, "[^0-9.]", "") is null exactly when col is null,
        # so null values are counted directly.
        null_counts = df_contents.select(
            [count(when(col(name).isNull(), 1)) for name in decimal_columns]
        ).first()
        for colName, empty_count in zip(decimal_columns, null_counts):
            if empty_count > 0:
                errors.append(
                    f"Invalid {colName} values (containing only non-numeric characters). Total count: {empty_count}"
                )

    has_errors = bool(errors)
    if not has_errors:
        errors.append("DDL Decimal/Int Data Type Structure Checks Passed.")
    return has_errors, errors, df_contents, dqcId

def writeOptions(df, dataMovement, **kwargs):
    """
    Write ``df`` as CSV into the directory ``dataMovement``, replacing its contents.

    The frame is coalesced to one partition, so the directory receives a single
    part file.

    Args:
        df: Spark DataFrame to write.
        dataMovement: Output directory path.
        **kwargs: CSV writer options. They override the defaults
            ``header="true"``, ``delimiter="|"`` and ``quote='"'``.
    """
    csv_options = {"header": "true", "delimiter": "|", "quote": '"', **kwargs}
    df.coalesce(1).write.format("csv").mode("overwrite").options(**csv_options).save(dataMovement)

def writeToParquet(df, path, num_partitions=50, compression="snappy"):
    """
    Write ``df`` to ``path`` as Parquet, replacing existing output.

    Args:
        df: Spark DataFrame to write.
        path: Output directory.
        num_partitions (int, optional): Passed to ``coalesce``, capping the
            number of output part files. Defaults to 50.
        compression (str, optional): Parquet compression codec. Defaults to "snappy".
    """
    df.coalesce(num_partitions).write.parquet(path, mode="overwrite", compression=compression)

def loadTable(**kwargs):
    """
    (Re)create an external CSV table over ``path`` and return it as a DataFrame.

    Keyword Args:
        tableName (str): Table name. It is created in ``spark_catalog.default``,
            the same table that is dropped first.
        path (str): Location of the delimited files backing the table.
        sep (str, optional): Field delimiter. Defaults to "|".
        spark (SparkSession, optional): Session to use. Defaults to the
            module-level ``spark``.

    Returns:
        DataFrame from ``SELECT *`` on the table. No schema is supplied and
        inference is disabled, so every column is read as a string.
    """
    session = kwargs.get("spark")
    if session is None:
        session = spark  # module-level session created in the __main__ block
    table = f"spark_catalog.default.{kwargs['tableName']}"
    sep = kwargs.get("sep", "|")

    session.sql(f"DROP TABLE IF EXISTS {table}")
    session.sql(f"""
    CREATE EXTERNAL TABLE {table}
    USING CSV
    OPTIONS (
        header 'true',
        inferSchema 'false',
        delimiter '{sep}'
    )
    LOCATION '{kwargs['path']}'
    """)
    return session.sql(f"SELECT * FROM {table}")


if __name__ == "__main__":
    # Job configuration, per-file DDL schema descriptions and output roots.
    path = "/mnt/apps/Files/Config/master_job.csv"
    pathSchema = "/mnt/apps/Files/Schema/"
    outputFile = "/mnt/apps/Files/data-movement/"
    parquetOutput = "/mnt/apps/Files/data-movement/Parquet/"
    jobName = "Renova"  # only configuration rows with this JobName are processed
    dqcOutput = []  # one data-quality record per processed file

    spark = SparkSession.builder.appName("Testing").getOrCreate()
    try:
        jobs = pandas_read_csv(path, sep="|")
        # pandas_read_csv prints the reason and returns None if the config is unreadable.
        jobRows = [] if jobs is None else jobs[jobs["JobName"] == jobName].itertuples()

        for row in jobRows:
            filePath = row.SourceDirectory + '/' + row.FileName + '.' + row.FileType
            filePath = filePath.replace("/gcs", "/Files")
            # Targets of the (currently disabled) CSV / Parquet writes below.
            dataMovement = outputFile + row.JobName + '/'
            dataMovementParquet = parquetOutput + row.JobName
            print(row.JobName)
            FullPathSchema = pathSchema + row.FileName + '.' + row.FileType

            df_dtype = construct_sql_schema(path=FullPathSchema, sep="|")
            df = spark_read_csv_from_os(spark, filePath, schema=df_dtype, quote='"', sep=row.Delimiter)
            if df_dtype is None or df is None:
                dqc_msg = [f"Could not build schema {FullPathSchema} or read {filePath}."]
                print(dqc_msg)
                dqcOutput.append({"JobName": row.JobName, "Path": row.SourceDirectory, "dqID": None,
                                  "CountRecords": 0, "Message": dqc_msg, "Status": "Failed"})
                continue

            result, dqc_msg, df_final, dqcId = validateDecimal(dtypes=df_dtype, df_contents=df)
            df_count = df_final.count()
            print(dqc_msg)
            # validateDecimal returns True when it found invalid values.
            # On success the data would be written with
            # writeOptions(df_final, dataMovement) / writeToParquet(df, dataMovementParquet);
            # those writes are currently disabled.
            dqcOutput.append({"JobName": row.JobName, "Path": row.SourceDirectory, "dqID": dqcId,
                              "CountRecords": df_count, "Message": dqc_msg,
                              "Status": "Failed" if result else "Successful"})
        print(dqcOutput)
    finally:
        spark.stop()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/02/20 17:48:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Renova


                                                                                

['DDL Decimal/Int Data Type Structure Checks Passed.']
[{'JobName': 'Renova', 'Path': '/mnt/apps/gcs/ETL4/LAS', 'dqID': 'DQ000001', 'CountRecords': 2000000, 'Message': ['DDL Decimal/Int Data Type Structure Checks Passed.'], 'Status': 'Successful'}]


In [25]:
path = "/mnt/apps/Files/Config/master_job.csv"
# Keep only the two jobs handled by the threaded Parquet load.
df = pandas_read_csv(path, sep="|")
df = df[df["JobName"].isin(["ACMVPF", "RTRNPF"])]
print(df)

sparkNew = SparkSession.builder.appName("Thread").getOrCreate()

# Source CSV paths, remapped from the /gcs mount to the /Files copy.
tables = [
    (row.SourceDirectory + '/' + row.FileName + '.' + row.FileType).replace("/gcs", "/Files")
    for row in df.itertuples()
]

def loadTable(path, schemaDir="/mnt/apps/Files/Schema/",
              parquetRoot="/mnt/apps/Files/data-movement/Parquet/", sep="|"):
    """
    Convert one source CSV file to Parquet using its own DDL schema.

    The schema description is looked up as ``schemaDir/<file name>``. This
    matches how the main job builds ``FullPathSchema`` from FileName and
    FileType. The Parquet output goes to ``parquetRoot/<parent directory name>``.

    Args:
        path: Source CSV path, e.g. "/mnt/apps/Files/ETL4/ACMVPF/ACMVPF.csv".
        schemaDir: Directory holding the schema description files.
        parquetRoot: Root directory for the Parquet output.
        sep: Field delimiter of the source CSV.
    """
    fileName = os.path.basename(path)
    # Name of the directory containing the file, e.g. "ACMVPF".
    sc = os.path.basename(os.path.dirname(path))
    pathParquet = os.path.join(parquetRoot, sc)
    print(pathParquet)
    # Use this file's own schema, not a FullPathSchema global left over from another cell.
    df_dtype = construct_sql_schema(path=os.path.join(schemaDir, fileName), sep="|")
    if df_dtype is None:
        print(f"Skipping {path}: could not build its schema")
        return
    df = sparkNew.read.csv(path, header=True, inferSchema=False, schema=df_dtype, sep=sep)
    writeToParquet(df, pathParquet)

print(tables)

  JobName            SourceDirectory FileName FileType  Flag Delimiter
1  ACMVPF  /mnt/apps/gcs/ETL4/ACMVPF   ACMVPF      csv     1         |
2  RTRNPF  /mnt/apps/gcs/ETL4/RTRNPF   RTRNPF      csv     1         |
['/mnt/apps/Files/ETL4/ACMVPF/ACMVPF.csv', '/mnt/apps/Files/ETL4/RTRNPF/RTRNPF.csv']


In [26]:
from threading import Thread
from queue import Empty, Queue

# Shared work queue of source paths, drained by workerCount threads.
q = Queue()

workerCount = 2

def run_task(function, q):
    """
    Worker loop: call ``function`` on queued items until the queue is empty.

    ``task_done()`` is called for every item taken, even when ``function``
    raises, so ``q.join()`` always returns. Failures are printed.

    Args:
        function: Callable applied to each queued value.
        q: ``queue.Queue`` of values to process.
    """
    while True:
        try:
            # get_nowait avoids the empty()/get() race: two workers could both see
            # the last item, and the loser would then block forever in get().
            value = q.get_nowait()
        except Empty:
            return
        try:
            function(value)
        except Exception as e:
            # An uncaught error would end this worker without task_done(),
            # leaving q.join() waiting forever.
            print(f"Task failed for {value}: {e}")
        finally:
            q.task_done()

# Enqueue every source file, then start daemon workers that drain the queue.
for tablePath in tables:
    q.put(tablePath)

workers = [Thread(target=run_task, args=(loadTable, q), daemon=True) for _ in range(workerCount)]
for worker in workers:
    worker.start()

print("running load")
q.join()  # returns once task_done() has been called for every queued path
print("running completed")

/mnt/apps/Files/data-movement/Parquet/ACMVPF
/mnt/apps/Files/data-movement/Parquet/RTRNPF
running load


25/02/20 17:54:27 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
25/02/20 17:54:27 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
25/02/20 17:54:27 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 76.00% for 10 writers
25/02/20 17:54:27 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 69.09% for 11 writers
25/02/20 17:54:27 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 63.33% for 12 writers
25/02/20 17:54:27 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 58.46% for 13 writers
25/02/20 17:54:27 WARN MemoryManager: Total allocation exceeds 95.

running completed


In [None]:
import os
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import input_file_name
import dask.dataframe as dd

# Scratch cell comparing a CSV read with a Parquet read of the Renova output.
# NOTE(review): every read/aggregate step below is commented out, so as written
# this cell only starts and stops a SparkSession.
#outputFile = "/mnt/apps/Files/ETL4/LAS/customers.csv"
# Presumably the pipe-delimited CSV part file(s) written for the Renova job.
outputFile = "/mnt/apps/Files/data-movement/Renova/part-00000*"
FullPathSchema = "/mnt/apps/Files/Schema/customers.csv"
ParquetPath = "/mnt/apps/Files/data-movement/Parquet/Renova/part-000*"
PdoutputFile = "/mnt/apps/Files/ETL4/LAS/customers.csv"

# NOTE(review): the CSV/Parquet reads below use df_dtype, which only exists if
# these two lines are uncommented.
#df_dtype = construct_sql_schema(path=FullPathSchema, sep="|")
#print(df_dtype)

spark = SparkSession.builder.appName("TestReadCSV").getOrCreate()

#CSV reader
#df = spark.read.csv(outputFile, sep="|", header=True, schema=df_dtype, inferSchema=False).repartition(10)

# NOTE(review): `col` is not imported in this cell; it relies on the first
# cell's `from pyspark.sql.functions import col`.
# df_with_filename = df.withColumn("filename", input_file_name())
# null_count = df_with_filename.filter(col("Index").isNotNull())
# df_count = null_count.count()
# print(df_count)
# #null_count.limit(10).show(truncate=False)
# null_count.createOrReplaceTempView("readCSV")
# #df_parquet_count = null_count_parquet.count()
# #print(df_parquet_count)
# null_count = spark.sql(
#     """
#     SELECT country, cnt, sum(cnt) over () total_all
#     FROM (
#     SELECT Country, COUNT(1) AS cnt
#         FROM readCSV
#         GROUP BY Country
#     ) Z
# """)
# print(null_count.count())
#df.show(n=5, truncate=False) #default 20

#parquet part
# NOTE(review): this reads `outputFile` (the CSV part-file glob above) with the
# Parquet reader; `ParquetPath` looks like the intended input — confirm.
# NOTE(review): DataFrameReader.parquet takes **options, so `schema=` is likely
# passed as a plain option instead of being applied as the schema — confirm;
# spark.read.schema(df_dtype).parquet(...) is the usual form.
# df_read_parquet = spark.read.parquet(outputFile, schema=df_dtype)
# null_count_parquet = df_read_parquet.filter(col("Index").isNotNull())
# null_count_parquet.createOrReplaceTempView("my_parquet_table")
# #df_parquet_count = null_count_parquet.count()
# #print(df_parquet_count)
# null_count_parquet = spark.sql(
#     """
#     SELECT country, cnt, sum(cnt) over () total_all
#     FROM (
#     SELECT Country, COUNT(1) AS cnt
#         FROM my_parquet_table
#         GROUP BY Country
#     ) Z
# """)
# print(null_count_parquet.count())
# null_count_parquet.show(n=5, truncate=False) #default 20

spark.stop()


In [None]:
# Generate a schema description for a 240-column test table in the layout that
# construct_sql_schema reads: "|"-separated, with ColumnName and DataType columns.
list_of = [(f"Cloned_{i + 1}", "Varchar(100)") for i in range(240)]

df = pd.DataFrame(list_of, columns=["ColumnName", "DataType"])

df.to_csv("/mnt/apps/Files/NewSChema/new_schema_cust.csv", sep="|", header=True, index=False)

In [None]:
path = "/mnt/apps/Files/Config/master_job.csv"
pathSchema = "/mnt/apps/Files/Schema/"

# Job configuration. pandas_read_csv prints the reason and returns None on failure.
jobs = pandas_read_csv(path, sep="|")
jobRows = [] if jobs is None else list(jobs.query("JobName == 'PAT'").itertuples())

for row in jobRows:
    filePath = row.SourceDirectory + '/' + row.FileName + '.' + row.FileType
    FullPathSchema = pathSchema + row.FileName + '.' + row.FileType
    # getOrCreate reuses the session created on the first iteration.
    spark = SparkSession.builder.appName(f"{row.JobName}").getOrCreate()
    df_dtype = construct_sql_schema(path=FullPathSchema, sep="|")
    df = spark_read_csv_from_os(spark, filePath, schema=df_dtype, quote='"', sep="|")
    if df is None:
        # spark_read_csv_from_os has already printed the error; skip this file.
        continue
    df.show()
    # Decimal validation, currently disabled. validateDecimal returns four values:
    # result, dqc_msg, df_final, dqcId = validateDecimal(dtypes=df_dtype, df_contents=df)
    # df_final.show()
    # print(dqc_msg)

# `spark` is only bound here if at least one PAT row was processed.
if jobRows:
    spark.stop()


In [None]:
path = "/mnt/apps/Files/Config/master_job.csv"

# Show the first rows of the active jobs (Flag == 1).
df = pandas_read_csv(path, sep="|")
df = df[df["Flag"] == 1]
df.head()


In [None]:
from pyspark.sql.types import *

path = "/mnt/apps/Files/Schema/etl4pat.csv"
fields = []
# Checked in this order so that "bigint" is tested before its substring "int"
# (and "nvarchar" before "varchar"); otherwise BIGINT maps to IntegerType.
type_mapping = {
    "nvarchar": VarcharType,
    "varchar": VarcharType,
    "decimal": DecimalType,
    "bigint": LongType(),
    "date": DateType(),
    "int": IntegerType(),
}

df = pandas_read_csv(path, sep="|")
print(df)

for row in df.itertuples():
    name, data_type_str = row.DataType.split("(", 1) if "(" in row.DataType else (row.DataType, "")
    name = name.strip()
    data_type_str = data_type_str[:-1].strip()
    parts = data_type_str.split(",")
    name_lower = name.lower()
    print(data_type_str)

    for keyword, spark_type in type_mapping.items():
        if keyword in name_lower:
            if spark_type == VarcharType:
                if data_type_str.upper() == "MAX":
                    data_type = VarcharType(4000)
                elif data_type_str:
                    data_type = VarcharType(int(data_type_str))
                else:
                    # VarcharType requires a length; an unsized VARCHAR becomes StringType.
                    data_type = StringType()
            elif spark_type == DecimalType:
                # DECIMAL -> default; DECIMAL(p) -> scale 0; DECIMAL(p,s).
                if not data_type_str:
                    data_type = DecimalType()
                else:
                    scale = int(parts[1]) if len(parts) > 1 else 0
                    data_type = DecimalType(int(parts[0]), scale)
            else:
                data_type = spark_type
            fields.append(StructField(row.ColumnName, data_type, True))
            break

print(fields)


In [None]:
# Show what star-unpacking a split on ")" yields, with and without parentheses.
for data_type_str in ("Decimal(2,0)", "VARCHAR"):
    data_type, *args = data_type_str.split(")")
    # "Decimal(2,0)" -> data_type "Decimal(2,0", args ['']
    # "VARCHAR"      -> data_type "VARCHAR",     args []
    print(f"data_type: {data_type}")
    print(f"args: {args}")

In [None]:
data = "A,2"
# Character count of the string (commas included).
print(f"{len(data)}")

In [None]:
# Build the schema for etl4pat with construct_sql_schema and display it.
path = "/mnt/apps/Files/Schema/etl4pat.csv"
test = construct_sql_schema(sep="|", path=path)
print(test)

In [None]:
from decimal import Decimal

from pyspark.sql import SparkSession
from pyspark.sql.types import *  # Import data types for clarity

# Create a SparkSession (if you don't have one already)
spark = SparkSession.builder.appName("DataTypeExample").getOrCreate()

# Sample data (replace with your actual data). DecimalType fields must be given
# decimal.Decimal values; PySpark's schema verification rejects Python floats.
data = [
    ("Alice", 25, Decimal("2000.00")),
    ("Bob", 30, Decimal("2000.00")),
    ("Charlie", 22, Decimal("2000.00")),
]

# Define the schema explicitly (best practice). The precision must be large
# enough for the values: 2000.00 needs DecimalType(6, 2); DecimalType(2, 0)
# can hold at most 99.
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("height", DecimalType(6, 2), True)
])

df = spark.createDataFrame(data, schema=schema)

# Display the schema
df.printSchema()

# Stop the SparkSession
spark.stop()

In [None]:
# Glob pattern for the etl4pat config extracts (not consumed in this cell).
data = "/mnt/apps/gcs/ETL4/CONFIG/" + "etl4pat*.csv"



In [None]:
from pyspark.sql import SparkSession

def writeOptions(df, path, **kwargs):
    """
    Write ``df`` to ``path`` using CSV defaults that ``kwargs`` can override.

    Defaults: header="true", delimiter="|", quote='"', mode="overwrite",
    format="csv". ``format`` and ``mode`` are applied through
    ``DataFrameWriter.format`` / ``DataFrameWriter.mode``. Passing them to
    ``options()`` would only set data-source options, leaving Spark's default
    format and its error-if-exists save mode in effect. All other keys are
    passed as writer options.

    Args:
        df: Spark DataFrame to write.
        path: Output directory.
        **kwargs: Overrides for the defaults above, or extra writer options.
    """
    all_options = {
        "header": "true",
        "delimiter": "|",
        "quote": '"',
        "mode": "overwrite",
        "format": "csv",
        **kwargs,
    }
    print(all_options)

    writer_options = dict(all_options)
    fmt = writer_options.pop("format")
    save_mode = writer_options.pop("mode")
    df.write.format(fmt).mode(save_mode).options(**writer_options).save(path)

# Demo: write a two-row DataFrame with writeOptions' defaults.
spark = SparkSession.builder.appName("Example").getOrCreate()

data = [("Alice", 25), ("Bob", 30)]
df = spark.createDataFrame(data, ["name", "age"])

# Relative output directory; replace with a real target.
dataMovement = "path/to/save.csv"
writeOptions(df, dataMovement)

# Release the session's resources.
spark.stop()