
## Overview

This notebook will show you how to create and query a table or DataFrame that you uploaded to DBFS. [DBFS](https://docs.databricks.com/user-guide/dbfs-databricks-file-system.html) is a Databricks File System that allows you to store data for querying inside of Databricks. This notebook assumes that you have a file already inside of DBFS that you would like to read from.

This notebook is written in **Python** so the default cell type is Python. However, you can use different languages by using the `%LANGUAGE` syntax. Python, Scala, SQL, and R are all supported.

In [0]:
# File location and type
file_location = "/FileStore/tables/emp.csv"
file_type = "csv"

# CSV options
infer_schema = "false"
first_row_is_header = "false"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

display(df)

_c0,_c1,_c2
id,name,age
1,Alice,25
2,Bob,30
3,Charlie,not_a_number
4,David,45
invalid_row,,
5,Emma,50


In [0]:
# Create a view or table

temp_table_name = "emp_csv"

df.createOrReplaceTempView(temp_table_name)

In [0]:
%sql

/* Query the created temp table in a SQL cell */

select * from `emp_csv`

_c0,_c1,_c2
id,name,age
1,Alice,25
2,Bob,30
3,Charlie,not_a_number
4,David,45
invalid_row,,
5,Emma,50


In [0]:
# With this registered as a temp view, it will only be available to this particular notebook. If you'd like other users to be able to query this table, you can also create a table from the DataFrame.
# Once saved, this table will persist across cluster restarts as well as allow various users across different notebooks to query this data.
# To do so, choose your table name and uncomment the bottom line.

permanent_table_name = "emp_csv"

# df.write.format("parquet").saveAsTable(permanent_table_name)

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
 
spark = SparkSession.builder.appName("Read").getOrCreate()

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

my_schema = StructType([StructField("id",IntegerType(),True),StructField("name",StringType(),True),StructField("age",IntegerType(),True)])
my_schema2 = StructType([StructField("id",IntegerType(),True),StructField("name",StringType(),True),StructField("age",IntegerType(),True),StructField("_corrupt_record",StringType(),True)])

In [0]:
# read the csv file
#mode = dropmalformed for clearing all the invalid data

df = spark.read.format("csv").schema(my_schema).option("header",True).option("mode","FAILFAST").load('/FileStore/tables/emp.csv')

df.display()
df.printSchema()

Interview Questions

Do you know different reading modes?

What are they?

have you worked with corrupted records?

When do u say a record is corrupted?

What happens to bad data/ corrup data in different read modes?

How can we print bad records?

How you can store bad records and handle it later efficiently?

In [0]:
# read the csv file

df_permissive = spark.read.format("csv").schema(my_schema).option("header",True).option("mode","PERMISSIVE").load('/FileStore/tables/emp.csv')

df_permissive.display()
df_permissive.printSchema()

id,name,age
1.0,Alice,25.0
2.0,Bob,30.0
3.0,Charlie,
4.0,David,45.0
,,
5.0,Emma,50.0


root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)



In [0]:
# read the csv file
#mode = dropmalformed for clearing all the invalid data

df_dropmalformed = spark.read.format("csv").schema(my_schema).option("header",True).option("mode","DROPMALFORMED").load('/FileStore/tables/emp.csv')

df_dropmalformed.display()
df_dropmalformed.printSchema()

id,name,age
1,Alice,25
2,Bob,30
4,David,45
5,Emma,50


root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)



In [0]:
# read the csv file
#mode = failfast will not execute the process if it detects any invalid data

df_failfast = spark.read.format("csv").schema(my_schema).option("header",True).option("mode","FAILFAST").load('/FileStore/tables/emp.csv')

df_failfast.display()
df_failfast.printSchema()

Displaying the corrupt record table

In [0]:
# read the csv file
#mode = dropmalformed for clearing all the invalid data

df = spark.read.format("csv").schema(my_schema2).option("header",True).option("badRecordsPath","/FileStore/tables/bad_records").load('/FileStore/tables/emp.csv')

df.display()
df.printSchema()

id,name,age,_corrupt_record
1,Alice,25,
2,Bob,30,
4,David,45,
5,Emma,50,


root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- _corrupt_record: string (nullable = true)



In [0]:
dbutils.fs.ls("/FileStore/tables/bad_records/20241126T031207/bad_records")

Out[37]: [FileInfo(path='dbfs:/FileStore/tables/bad_records/20241126T031207/bad_records/part-00000-9c265408-34ba-4edc-b441-368038f00e14', name='part-00000-9c265408-34ba-4edc-b441-368038f00e14', size=328, modificationTime=1732590729000)]

In [0]:
df_bad_data_read = spark.read.format("json").load("/FileStore/tables/bad_records/20241126T031207/bad_records")

df_bad_data_read.display()

path,reason,record
dbfs:/FileStore/tables/emp.csv,"java.lang.NumberFormatException: For input string: ""not_a_number""","3,Charlie,not_a_number"
dbfs:/FileStore/tables/emp.csv,org.apache.spark.SparkRuntimeException: [MALFORMED_CSV_RECORD] Malformed CSV record: invalid_row,invalid_row
