In [0]:
# Load the employee CSV from the Databricks FileStore and let Spark decide the
# column types on its own.
#   format("csv")  -> parse the file as CSV
#   header         -> the first row supplies the column names
#   inferschema    -> column data types are inferred from the file contents
# NOTE(review): Spark's documentation spells this key "inferSchema"; reader
# option keys are documented as case-insensitive, so the lowercase spelling
# should behave the same — confirm on the target runtime.
df = (
    spark.read
    .format("csv")
    .option("header", "true")
    .option("inferschema", "true")
    .load("/FileStore/files/emp_test-1.csv")
)

# Show the DataFrame in Databricks' interactive tabular view.
display(df)

Month,EmpID,EmpCount,ProductionUnit,Expenses
January,1,78,3245.76,1125.43
February,2,93,4678.24,1458.21
March,3,57,1987.91,892.34
April,4,65,3562.51,1234.78
May,5,82,2894.37,987.65
June,6,74,1245.89,765.43
July,7,68,3987.32,hi
August,8,89,2312.45,1100.56
September,9,73,1789.34,800.21
October,10,96,4321.76,1650.32


In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType # Import necessary PySpark SQL types for defining a schema.

# Hand-written schema for the employee CSV, built field by field.
# Every column is nullable (third argument True).
#   Month            string
#   EmpID            integer
#   EmpCount         integer
#   ProductionUnit   float
#   Expenses         float
#   _corrupt_record  string; extra column (not a CSV field) that holds the raw
#                    text of a row that does not fit the typed columns when the
#                    file is read in PERMISSIVE mode
schema = (
    StructType()
    .add("Month", StringType(), True)
    .add("EmpID", IntegerType(), True)
    .add("EmpCount", IntegerType(), True)
    .add("ProductionUnit", FloatType(), True)
    .add("Expenses", FloatType(), True)
    .add("_corrupt_record", StringType(), True)
)

In [0]:
# Re-read the same CSV, this time applying the explicit `schema` instead of
# inferring column types.
#   mode=PERMISSIVE -> a row that does not fit the schema is kept rather than
#                      failing the read; in the output below the July row has
#                      an empty Expenses value and its raw text appears in
#                      `_corrupt_record`
#   header          -> the first row supplies the column names
df = (
    spark.read
    .format("csv")
    .option("mode", "PERMISSIVE")
    .option("header", "true")
    .schema(schema)
    .load("/FileStore/files/emp_test-1.csv")
)

# Show the DataFrame in Databricks' interactive tabular view.
display(df)

Month,EmpID,EmpCount,ProductionUnit,Expenses,_corrupt_record
January,1,78,3245.76,1125.43,
February,2,93,4678.24,1458.21,
March,3,57,1987.91,892.34,
April,4,65,3562.51,1234.78,
May,5,82,2894.37,987.65,
June,6,74,1245.89,765.43,
July,7,68,3987.32,,"July,7,68,3987.32,hi"
August,8,89,2312.45,1100.56,
September,9,73,1789.34,800.21,
October,10,96,4321.76,1650.32,


In [0]:
# Read the CSV with the explicit `schema` again, but discard bad rows.
#   mode=DROPMALFORMED -> rows that do not match the schema are skipped; the
#                         July row is missing from the output below and
#                         `_corrupt_record` is empty for every remaining row
#   header             -> the first row supplies the column names
# NOTE(review): the captured output below contains a November row that does
# not appear in the earlier cells' outputs — presumably the file changed
# between runs; re-run all cells to confirm.
df = (
    spark.read
    .format("csv")
    .option("mode", "DROPMALFORMED")
    .option("header", "true")
    .schema(schema)
    .load("/FileStore/files/emp_test-1.csv")
)

# Show the DataFrame in Databricks' interactive tabular view.
display(df)

Month,EmpID,EmpCount,ProductionUnit,Expenses,_corrupt_record
January,1,78,3245.76,1125.43,
February,2,93,4678.24,1458.21,
March,3,57,1987.91,892.34,
April,4,65,3562.51,1234.78,
May,5,82,2894.37,987.65,
June,6,74,1245.89,765.43,
August,8,89,2312.45,1100.56,
September,9,73,1789.34,800.21,
October,10,96,4321.76,1650.32,
November,11,53,1345.78,700.43,
