# Databricks Practice

#### Read CSV file in Spark - 1. Fisrt uploaded data in table. New -> Table -> Uploaded Data

In [0]:
df_flight=spark.read.format("csv")\
        .option("header", "false")\
            .option("inferschema", "false")\
                .option("mode", "FAILFAST")\
                    .load("/FileStore/tables/flight.csv")

df_flight.show(5)

+-----------------+-------------------+-----+
|              _c0|                _c1|  _c2|
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
|            Egypt|      United States|   24|
+-----------------+-------------------+-----+
only showing top 5 rows



In [0]:
df_flight_new=spark.read.format("csv")\
        .option("header", "true")\
            .option("inferschema", "false")\
                .option("mode", "FAILFAST")\
                    .load("/FileStore/tables/flight.csv")

df_flight_new.show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
|            Egypt|      United States|   24|
|Equatorial Guinea|      United States|    1|
+-----------------+-------------------+-----+
only showing top 5 rows



In [0]:
df_flight_new.printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: string (nullable = true)



In [0]:
df_flight_new_schema=spark.read.format("csv")\
        .option("header", "true")\
            .option("inferschema", "true")\
                .option("mode", "FAILFAST")\
                    .load("/FileStore/tables/flight.csv")

df_flight_new_schema.show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
|            Egypt|      United States|   24|
|Equatorial Guinea|      United States|    1|
+-----------------+-------------------+-----+
only showing top 5 rows



In [0]:
df_flight_new_schema.printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: integer (nullable = true)



#### Define Manual Schema

In [0]:
from pyspark.sql.types import StructType,StructField,StringType,IntegerType

In [0]:
my_schema=StructType([
    StructField("DEST_COUNTRY_NAME",StringType(), True),
    StructField("ORIGIN_COUNTRY_NAME",StringType(), True),
    StructField("count",IntegerType(), True)
])

In [0]:
df_flight_new_schema_new=spark.read.format("csv")\
        .option("header", "false")\
            .option("skipRows",1)\
            .option("inferschema", "false")\
                .schema(my_schema)\
                .option("mode", "PERMISSIVE")\
                    .load("/FileStore/tables/flight.csv")

df_flight_new_schema_new.show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
|            Egypt|      United States|   24|
|Equatorial Guinea|      United States|    1|
+-----------------+-------------------+-----+
only showing top 5 rows



In [0]:
%fs
ls /FileStore/tables/

path,name,size,modificationTime
dbfs:/FileStore/tables/flight.csv,flight.csv,7121,1736400722000


#### Corrupted Record Handling - 

In [0]:
df_employee=spark.read.format("csv")\
        .option("header", "true")\
            .option("inferschema", "true")\
                .option("mode", "PERMISSIVE")\
                    .load("/FileStore/tables/employee-1.csv")

df_employee.show(5)

+---+----------+---+------+-----------------+-------+
| id|      name|age|salary|          address|nominee|
+---+----------+---+------+-----------------+-------+
|  1|    Manish| 26| 75000|            Bihar|   nom1|
|  2|    Nikita| 27| 45800|               UP|   nom2|
|  3|    Pritam| 24| 45720|Bangalore,Kolkata|   nom3|
|  4|Prathamesh| 22|789651|    India,Kolkata|   nom4|
|  5|      Roni| 21| 30000|             null|   nom5|
+---+----------+---+------+-----------------+-------+



In [0]:
df_employee=spark.read.format("csv")\
        .option("header", "true")\
            .option("inferschema", "true")\
                .option("mode", "DROPMALFORMED")\
                    .load("/FileStore/tables/employee-1.csv")

df_employee.show(5)

+---+----------+---+------+-----------------+-------+
| id|      name|age|salary|          address|nominee|
+---+----------+---+------+-----------------+-------+
|  1|    Manish| 26| 75000|            Bihar|   nom1|
|  2|    Nikita| 27| 45800|               UP|   nom2|
|  3|    Pritam| 24| 45720|Bangalore,Kolkata|   nom3|
|  4|Prathamesh| 22|789651|    India,Kolkata|   nom4|
|  5|      Roni| 21| 30000|             null|   nom5|
+---+----------+---+------+-----------------+-------+



In [0]:
df_employee=spark.read.format("csv")\
        .option("header", "true")\
            .option("inferschema", "true")\
                .option("mode", "FAILFAST")\
                    .load("/FileStore/tables/employee-1.csv")

df_employee.show(5)

+---+----------+---+------+-----------------+-------+
| id|      name|age|salary|          address|nominee|
+---+----------+---+------+-----------------+-------+
|  1|    Manish| 26| 75000|            Bihar|   nom1|
|  2|    Nikita| 27| 45800|               UP|   nom2|
|  3|    Pritam| 24| 45720|Bangalore,Kolkata|   nom3|
|  4|Prathamesh| 22|789651|    India,Kolkata|   nom4|
|  5|      Roni| 21| 30000|             null|   nom5|
+---+----------+---+------+-----------------+-------+



In [0]:
from pyspark.sql.types import StringType,StructType,StructField,IntegerType

In [0]:
emp_schema=StructType([
    StructField("id",IntegerType(), True),
    StructField("name",StringType(), True),
    StructField("age",IntegerType(), True),
	StructField("salary",IntegerType(), False),
	StructField("address",StringType(), True),
	StructField("_corrupt_record",StringType(), True)
])

In [0]:
df_employee=spark.read.format("csv")\
        .option("header", "true")\
            .option("inferschema", "true")\
                .option("mode", "PERMISSIVE")\
                    .schema(emp_schema)\
                    .load("/FileStore/tables/employee-1.csv")

df_employee.show(truncate=False)

+---+----------+---+------+-----------------+-------------------------------------------+
|id |name      |age|salary|address          |_corrupt_record                            |
+---+----------+---+------+-----------------+-------------------------------------------+
|1  |Manish    |26 |75000 |Bihar            |1,Manish,26,75000,Bihar,nom1               |
|2  |Nikita    |27 |45800 |UP               |2,Nikita,27,45800,UP,nom2                  |
|3  |Pritam    |24 |45720 |Bangalore,Kolkata|3,Pritam,24,45720,"Bangalore,Kolkata",nom3 |
|4  |Prathamesh|22 |789651|India,Kolkata    |4,Prathamesh,22,789651,"India,Kolkata",nom4|
|5  |Roni      |21 |30000 |null             |5,Roni,21,30000,,nom5                      |
+---+----------+---+------+-----------------+-------------------------------------------+



#### Store Corrupted Record -

In [0]:
df_employee=spark.read.format("csv")\
        .option("header", "true")\
            .option("inferschema", "true")\
                    .option("badRecordsPath","/FileStore/tables/bad_records")\
                    .load("/FileStore/tables/employee-1.csv")

df_employee.show()

+---+----------+---+------+-----------------+-------+
| id|      name|age|salary|          address|nominee|
+---+----------+---+------+-----------------+-------+
|  1|    Manish| 26| 75000|            Bihar|   nom1|
|  2|    Nikita| 27| 45800|               UP|   nom2|
|  3|    Pritam| 24| 45720|Bangalore,Kolkata|   nom3|
|  4|Prathamesh| 22|789651|    India,Kolkata|   nom4|
|  5|      Roni| 21| 30000|             null|   nom5|
+---+----------+---+------+-----------------+-------+



#### bad records will store in json format. For this case no cottupted record thats why no directory created.

In [0]:
%fs
ls /FileStore/tables/

path,name,size,modificationTime
dbfs:/FileStore/tables/employee-1.csv,employee-1.csv,208,1736482832000
dbfs:/FileStore/tables/employee.csv,employee.csv,247,1736482562000
dbfs:/FileStore/tables/flight-1.csv,flight-1.csv,7121,1736482562000
dbfs:/FileStore/tables/flight.csv,flight.csv,7121,1736400722000
