
## Overview

This notebook will show you how to create and query a table or DataFrame that you uploaded to DBFS. [DBFS](https://docs.databricks.com/user-guide/dbfs-databricks-file-system.html) is a Databricks File System that allows you to store data for querying inside of Databricks. This notebook assumes that you have a file already inside of DBFS that you would like to read from.

This notebook is written in **Python** so the default cell type is Python. However, you can use different languages by using the `%LANGUAGE` syntax. Python, Scala, SQL, and R are all supported.

In [0]:
# File location and type
file_location = "/FileStore/tables/flight_data.csv"
file_type = "csv"

# CSV options
infer_schema = "false"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

display(df)

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
United States,Romania,1
United States,Ireland,264
United States,India,69
Egypt,United States,24
Equatorial Guinea,United States,1
United States,Singapore,25
United States,Grenada,54
Costa Rica,United States,477
Senegal,United States,29
United States,Marshall Islands,44


In [0]:
df1 = spark.read.format("csv")\
    .option("inferSchema", "false") \
  .option("header", "false") \
  .option("mode", "FAILFAST") \
  .load("/FileStore/tables/flight_data.csv")

df1.show(2)

+-----------------+-------------------+-----+
|              _c0|                _c1|  _c2|
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
|    United States|            Romania|    1|
+-----------------+-------------------+-----+
only showing top 2 rows



In [0]:
# Create a view or table

temp_table_name = "flight_data_csv"

df.createOrReplaceTempView(temp_table_name)

In [0]:
%sql

/* Query the created temp table in a SQL cell */

select * from `flight_data_csv`

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
United States,Romania,1
United States,Ireland,264
United States,India,69
Egypt,United States,24
Equatorial Guinea,United States,1
United States,Singapore,25
United States,Grenada,54
Costa Rica,United States,477
Senegal,United States,29
United States,Marshall Islands,44


In [0]:
# With this registered as a temp view, it will only be available to this particular notebook. If you'd like other users to be able to query this table, you can also create a table from the DataFrame.
# Once saved, this table will persist across cluster restarts as well as allow various users across different notebooks to query this data.
# To do so, choose your table name and uncomment the bottom line.

permanent_table_name = "flight_data_csv"

# df.write.format("parquet").saveAsTable(permanent_table_name)

In [0]:
df.printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: string (nullable = true)



In [0]:
df1 = spark.read.format(file_type) \
  .option("inferSchema", "true") \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

display(df1)

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
United States,Romania,1
United States,Ireland,264
United States,India,69
Egypt,United States,24
Equatorial Guinea,United States,1
United States,Singapore,25
United States,Grenada,54
Costa Rica,United States,477
Senegal,United States,29
United States,Marshall Islands,44


In [0]:
df1.printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: integer (nullable = true)



In [0]:
import pyspark

In [0]:
from pyspark.sql.types import StructType, StructField, StringType,IntegerType 

In [0]:
#manual schema

my_schema = StructType([
    StructField("DEST_COUNTRY_NAME",StringType(),True),
    StructField("ORIGIN_COUNTRY_NAME",StringType(),True),
    StructField("count",IntegerType(),True)
])

In [0]:

df1 = spark.read.format(file_type) \
  .option("inferSchema", "false") \
  .schema(my_schema)\
  .option("header", "false") \
  .option('skipRows',1)\
  .option("mode", "PERMISSIVE") \
  .load(file_location)
#changed from 'FAILFAST' #coz its giving error to read count column null as integer
df1.show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
|            Egypt|      United States|   24|
|Equatorial Guinea|      United States|    1|
+-----------------+-------------------+-----+
only showing top 5 rows



In [0]:
# CURRUPTED RECORDS
emp_df = spark.read.format("csv")\
    .option("header","true")\
    .option("inferschema","true")\
    .option("mode","PERMISSIVE")\
    .load("/FileStore/tables/employee.csv")

emp_df.show()
# in permissive it show all records

+---+--------+---+------+------------+--------+
| id|    name|age|salary|     address| nominee|
+---+--------+---+------+------------+--------+
|  1|  Manish| 26| 75000|       bihar|nominee1|
|  2|  Nikita| 23|100000|uttarpradesh|nominee2|
|  3|  Pritam| 22|150000|   Bangalore|   India|
|  4|Prantosh| 17|200000|     Kolkata|   India|
|  5|  Vikash| 31|300000|        null|nominee5|
+---+--------+---+------+------------+--------+



In [0]:
# CURRUPTED RECORDS
emp_df = spark.read.format("csv")\
    .option("header","true")\
    .option("inferschema","true")\
    .option("mode","DROPMALFORMED")\
    .load("/FileStore/tables/employee.csv")

emp_df.show()

+---+------+---+------+------------+--------+
| id|  name|age|salary|     address| nominee|
+---+------+---+------+------------+--------+
|  1|Manish| 26| 75000|       bihar|nominee1|
|  2|Nikita| 23|100000|uttarpradesh|nominee2|
|  5|Vikash| 31|300000|        null|nominee5|
+---+------+---+------+------------+--------+



In [0]:
# # CURRUPTED RECORDS
# emp_df = spark.read.format("csv")\
#     .option("header","true")\
#     .option("inferschema","true")\
#     .option("mode","FAILFAST")\
#     .load("/FileStore/tables/employee.csv")

# emp_df.show()
# #in failfast shows error

In [0]:
# How to print bad records

from pyspark.sql.types import StringType, StructField, StructType, IntegerType


In [0]:
emp_schema = StructType(
    [
        StructField("id",IntegerType(),True),
        StructField("name",StringType(),True),
        StructField("age",IntegerType(),True),
        StructField("salary",IntegerType(),True),
        StructField("address",StringType(),True),
        StructField("nominee",StringType(),True),
        StructField("_currupt_record",StringType(),True)
    ]
)

In [0]:
emp_df1 = spark.read.format("csv")\
    .option("header","true")\
    .option("inferschema","true")\
    .option("mode","PERMISSIVE")\
    .schema(emp_schema)\
    .load("/FileStore/tables/employee.csv")

emp_df1.show(truncate = False)

+---+--------+---+------+------------+--------+---------------+
|id |name    |age|salary|address     |nominee |_currupt_record|
+---+--------+---+------+------------+--------+---------------+
|1  |Manish  |26 |75000 |bihar       |nominee1|null           |
|2  |Nikita  |23 |100000|uttarpradesh|nominee2|null           |
|3  |Pritam  |22 |150000|Bangalore   |India   |nominee3       |
|4  |Prantosh|17 |200000|Kolkata     |India   |nominee4       |
|5  |Vikash  |31 |300000|null        |nominee5|null           |
+---+--------+---+------+------------+--------+---------------+



In [0]:
%fs
ls /FileStore/tables/

path,name,size,modificationTime
dbfs:/FileStore/tables/Bad_records/,Bad_records/,0,0
dbfs:/FileStore/tables/employee.csv/,employee.csv/,0,0
dbfs:/FileStore/tables/flight_data.csv,flight_data.csv,7121,1696146457000


In [0]:
#to create new file of bad records
emp_df1 = spark.read.format("csv")\
    .option("header","true")\
    .option("inferschema","true")\
    .schema(emp_schema)\
    .option("badRecordsPath","/FileStore/tables/Bad_records")\
    .load("/FileStore/tables/employee.csv")

emp_df1.show(truncate = False)

+---+--------+---+------+---------+-------+---------------+
|id |name    |age|salary|address  |nominee|_currupt_record|
+---+--------+---+------+---------+-------+---------------+
|3  |Pritam  |22 |150000|Bangalore|India  |nominee3       |
|4  |Prantosh|17 |200000|Kolkata  |India  |nominee4       |
+---+--------+---+------+---------+-------+---------------+



In [0]:
%fs
ls /FileStore/tables/Bad_records/20231001T144109/bad_records/

path,name,size,modificationTime
dbfs:/FileStore/tables/Bad_records/20231001T144109/bad_records/part-00000-29fdc237-0ca0-4063-bba4-58f2e5f6f49a,part-00000-29fdc237-0ca0-4063-bba4-58f2e5f6f49a,707,1696171272000


In [0]:
# to read that new stored record file

bd = spark.read.format("json")\
    .load("/FileStore/tables/Bad_records/20231001T144109/bad_records/")

bd.show()

+--------------------+--------------------+--------------------+
|                path|              reason|              record|
+--------------------+--------------------+--------------------+
|dbfs:/FileStore/t...|org.apache.spark....|1,Manish,26,75000...|
|dbfs:/FileStore/t...|org.apache.spark....|2,Nikita,23,10000...|
|dbfs:/FileStore/t...|org.apache.spark....|5,Vikash,31,30000...|
+--------------------+--------------------+--------------------+



In [0]:
# line delimited json file
spark.read.format("json")\
    .option("inferSchema","true")\
    .option("mode","PERMISSIVE")\
    .load("/FileStore/tables/line_delimited_json.json")\
    .show()

+---+--------+------+
|age|    name|salary|
+---+--------+------+
| 20|  Manish| 20000|
| 25|  Nikita| 21000|
| 16|  Pritam| 22000|
| 35|Prantosh| 25000|
| 67|  Vikash| 40000|
+---+--------+------+



In [0]:
# line delimited json file with extra field
spark.read.format("json")\
    .option("inferSchema","true")\
    .option("mode","PERMISSIVE")\
    .load("/FileStore/tables/single_file_json_with_extra_fields.json")\
    .show()

+---+------+--------+------+
|age|gender|    name|salary|
+---+------+--------+------+
| 20|  null|  Manish| 20000|
| 25|  null|  Nikita| 21000|
| 16|  null|  Pritam| 22000|
| 35|  null|Prantosh| 25000|
| 67|     M|  Vikash| 40000|
+---+------+--------+------+

