##Reading a csv file

In [0]:
spark

In [0]:
flight_df = spark.read.format("csv")\
            .option("header","false")\
            .option("inferschema","false")\
            .option("mode","FAILFAST")\
            .load("/FileStore/tables/2010_summary.csv")

In [0]:
flight_df.show(5)

+-----------------+-------------------+-----+
|              _c0|                _c1|  _c2|
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
|            Egypt|      United States|   24|
+-----------------+-------------------+-----+
only showing top 5 rows



In [0]:
flight_df1 = spark.read.format("csv")\
            .option("header","true")\
            .option("inferschema","false")\
            .option("mode","FAILFAST")\
            .load("/FileStore/tables/2010_summary.csv")

In [0]:
flight_df1.show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
|            Egypt|      United States|   24|
|Equatorial Guinea|      United States|    1|
+-----------------+-------------------+-----+
only showing top 5 rows



In [0]:
flight_df1.printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: string (nullable = true)



In [0]:
# But count should be in INT type

#so we just put inferschema is true, spark autmatically read the and assign the appropriate data type to our dataset.

flight_df2 = spark.read.format("csv")\
            .option("header","true")\
            .option("inferschema","true")\
            .option("mode","FAILFAST")\
            .load("/FileStore/tables/2010_summary.csv")


In [0]:
flight_df2.printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: integer (nullable = true)



#Reading the file by defining schema 

In [0]:
from pyspark.sql.types import StructType,StructField,IntegerType,StringType

In [0]:
my_schema = StructType([ StructField("DEST_COUNTRY_NAME",StringType(), True),
                         StructField("ORIGIN_COUNTRY_NAME",StringType(), True),
                         StructField("count",IntegerType(), True)
                        ])

In [0]:

flight_df_schema = spark.read.format("csv")\
            .option("header","false")\
            .option("inferschema","false")\
            .schema(my_schema)\
            .option("mode","PERMISSIVE")\
            .load("/FileStore/tables/2010_summary.csv")

flight_df_schema.show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| null|
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
|            Egypt|      United States|   24|
+-----------------+-------------------+-----+
only showing top 5 rows



In [0]:

flight_df_schema = spark.read.format("csv")\
            .option("header","false")\
            .option("inferschema","false")\
            .option("skipRows",1)\
            .schema(my_schema)\
            .option("mode","PERMISSIVE")\
            .load("/FileStore/tables/2010_summary.csv")

flight_df_schema.show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
|            Egypt|      United States|   24|
|Equatorial Guinea|      United States|    1|
+-----------------+-------------------+-----+
only showing top 5 rows



#Handling Corrupted Data

In [0]:
emp_df = spark.read.format("csv")\
            .option("header","true")\
            .option("inferschema","true")\
            .option("mode","FAILFAST")\
            .load("/FileStore/tables/employee.csv")
    
emp_df.show(5)


[0;31m---------------------------------------------------------------------------[0m
[0;31mPy4JJavaError[0m                             Traceback (most recent call last)
File [0;32m<command-170963324258917>:7[0m
[1;32m      1[0m emp_df [38;5;241m=[39m spark[38;5;241m.[39mread[38;5;241m.[39mformat([38;5;124m"[39m[38;5;124mcsv[39m[38;5;124m"[39m)\
[1;32m      2[0m             [38;5;241m.[39moption([38;5;124m"[39m[38;5;124mheader[39m[38;5;124m"[39m,[38;5;124m"[39m[38;5;124mtrue[39m[38;5;124m"[39m)\
[1;32m      3[0m             [38;5;241m.[39moption([38;5;124m"[39m[38;5;124minferschema[39m[38;5;124m"[39m,[38;5;124m"[39m[38;5;124mtrue[39m[38;5;124m"[39m)\
[1;32m      4[0m             [38;5;241m.[39moption([38;5;124m"[39m[38;5;124mmode[39m[38;5;124m"[39m,[38;5;124m"[39m[38;5;124mFAILFAST[39m[38;5;124m"[39m)\
[1;32m      5[0m             [38;5;241m.[39mload([38;5;124m"[39m[38;5;124m/FileStore/tables/employee.csv[39m[

In [0]:
emp_df = spark.read.format("csv")\
            .option("header","true")\
            .option("inferschema","true")\
            .option("mode","DROPMALFORMED")\
            .load("/FileStore/tables/employee.csv")
    
emp_df.show(5)


+---+------+---+------+------------+--------+
| id|  name|age|salary|     address| nominee|
+---+------+---+------+------------+--------+
|  1|Manish| 26| 75000|       bihar|nominee1|
|  2|Nikita| 23|100000|uttarpradesh|nominee2|
|  5|Vikash| 31|300000|        null|nominee5|
+---+------+---+------+------------+--------+



In [0]:
emp_df = spark.read.format("csv")\
            .option("header","true")\
            .option("inferschema","true")\
            .option("mode","PERMISSIVE")\
            .load("/FileStore/tables/employee.csv")
    
emp_df.show(5)

+---+--------+---+------+------------+--------+
| id|    name|age|salary|     address| nominee|
+---+--------+---+------+------------+--------+
|  1|  Manish| 26| 75000|       bihar|nominee1|
|  2|  Nikita| 23|100000|uttarpradesh|nominee2|
|  3|  Pritam| 22|150000|   Bangalore|   India|
|  4|Prantosh| 17|200000|     Kolkata|   India|
|  5|  Vikash| 31|300000|        null|nominee5|
+---+--------+---+------+------------+--------+



In [0]:
%fs ls /FileStore/tables/

path,name,size,modificationTime
dbfs:/FileStore/tables/2010_summary.csv,2010_summary.csv,7121,1716379200000
dbfs:/FileStore/tables/employee.csv,employee.csv,236,1716443570000
dbfs:/FileStore/tables/users_6M0xxK_2024_csv.csv,users_6M0xxK_2024_csv.csv,1738911,1714723094000


In [0]:
emp_schema = StructType([ StructField("id",IntegerType(), True),
                         StructField("name",StringType(), True),
                         StructField("age",IntegerType(), True),
                         StructField("salary",IntegerType(), True),
                         StructField("address",StringType(), True),
                         StructField("nominee",StringType(), True),
                         StructField("Corrupted_record",StringType(), True)


])

In [0]:
employee_df = spark.read.format("csv")\
            .option("header","true")\
            .option("inferschema","false")\
            .option("mode","PERMISSIVE")\
            .schema(emp_schema)\
            .load("/FileStore/tables/employee.csv")
    
employee_df.show()

+---+--------+---+------+------------+--------+----------------+
| id|    name|age|salary|     address| nominee|Corrupted_record|
+---+--------+---+------+------------+--------+----------------+
|  1|  Manish| 26| 75000|       bihar|nominee1|            null|
|  2|  Nikita| 23|100000|uttarpradesh|nominee2|            null|
|  3|  Pritam| 22|150000|   Bangalore|   India|        nominee3|
|  4|Prantosh| 17|200000|     Kolkata|   India|        nominee4|
|  5|  Vikash| 31|300000|        null|nominee5|            null|
+---+--------+---+------+------------+--------+----------------+



this command save the badrecords in the given path:

.option("badRecordsPath", "/FileStore/tables/badRecordsPath")   

In [0]:
employee_df = spark.read.format("csv")\
            .option("header", "true")\
            .option("inferschema", "false")\
            .option("mode", "PERMISSIVE")\
            .option("badRecordsPath", "/FileStore/tables/badRecordsPath")\
            .schema(emp_schema)\
            .load("/FileStore/tables/employee.csv")

#Reading JSON


In [0]:
spark.read.format("json")\
            .option("header","true")\
            .option("inferschema","true")\
            .option("mode","PERMISSIVE")\
            .load("/FileStore/tables/linedelimited.json").show()
    

+---+--------+------+
|age|    name|salary|
+---+--------+------+
| 20|  Manish| 20000|
| 25|  Nikita| 21000|
| 16|  Pritam| 22000|
| 35|Prantosh| 25000|
| 67|  Vikash| 40000|
+---+--------+------+



reading multiline json

In [0]:
spark.read.format("json")\
            .option("header","true")\
            .option("inferschema","true")\
            .option("mode","PERMISSIVE")\
            .load("/FileStore/tables/multilinejson.json").show()

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-3754063399767911>:1[0m
[0;32m----> 1[0m [43mspark[49m[38;5;241;43m.[39;49m[43mread[49m[38;5;241;43m.[39;49m[43mformat[49m[43m([49m[38;5;124;43m"[39;49m[38;5;124;43mjson[39;49m[38;5;124;43m"[39;49m[43m)[49m[43m\[49m
[1;32m      2[0m [43m            [49m[38;5;241;43m.[39;49m[43moption[49m[43m([49m[38;5;124;43m"[39;49m[38;5;124;43mheader[39;49m[38;5;124;43m"[39;49m[43m,[49m[38;5;124;43m"[39;49m[38;5;124;43mtrue[39;49m[38;5;124;43m"[39;49m[43m)[49m[43m\[49m
[1;32m      3[0m [43m            [49m[38;5;241;43m.[39;49m[43moption[49m[43m([49m[38;5;124;43m"[39;49m[38;5;124;43minferschema[39;49m[38;5;124;43m"[39;49m[43m,[49m[38;5;124;43m"[39;49m[38;5;124;43mtrue[39;49m[38;5;124;43m"[39;49m[43m)[49m[43m\[49m
[1;32m      4[0m

We have to enable the option ("MULTILINE", "True")

In [0]:
spark.read.format("json")\
            .option("header","true")\
            .option("inferschema","true")\
            .option("mode","PERMISSIVE")\
            .option("multiline", "true")\
            .load("/FileStore/tables/multilinejson.json").show()

+---+--------+------+
|age|    name|salary|
+---+--------+------+
| 20|  Manish| 20000|
| 25|  Nikita| 21000|
| 16|  Pritam| 22000|
| 35|Prantosh| 25000|
| 67|  Vikash| 40000|
+---+--------+------+



Reading corrupted json

In [0]:
spark.read.format("json")\
            .option("header","true")\
            .option("inferschema","true")\
            .option("mode","PERMISSIVE")\
            .load("/FileStore/tables/Corruptedjson.json").show(truncate = False)
    

+----------------------------------------+----+--------+------+
|_corrupt_record                         |age |name    |salary|
+----------------------------------------+----+--------+------+
|null                                    |20  |Manish  |20000 |
|null                                    |25  |Nikita  |21000 |
|null                                    |16  |Pritam  |22000 |
|null                                    |35  |Prantosh|25000 |
|{"name":"Vikash","age":67,"salary":40000|null|null    |null  |
+----------------------------------------+----+--------+------+



It has created a seperate column of corrupted record, whwn we do truncate = False then it will show the entrie data

#Reading a nested json

In [0]:
json_df = spark.read.format("json")\
            .option("header","true")\
            .option("inferschema","true")\
            .option("mode","PERMISSIVE")\
            .load("/FileStore/tables/file5.json").printSchema()

root
 |-- code: long (nullable = true)
 |-- message: string (nullable = true)
 |-- restaurants: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- restaurant: struct (nullable = true)
 |    |    |    |-- R: struct (nullable = true)
 |    |    |    |    |-- res_id: long (nullable = true)
 |    |    |    |-- apikey: string (nullable = true)
 |    |    |    |-- average_cost_for_two: long (nullable = true)
 |    |    |    |-- cuisines: string (nullable = true)
 |    |    |    |-- currency: string (nullable = true)
 |    |    |    |-- deeplink: string (nullable = true)
 |    |    |    |-- establishment_types: array (nullable = true)
 |    |    |    |    |-- element: string (containsNull = true)
 |    |    |    |-- events_url: string (nullable = true)
 |    |    |    |-- featured_image: string (nullable = true)
 |    |    |    |-- has_online_delivery: long (nullable = true)
 |    |    |    |-- has_table_booking: long (nullable = true)
 |    |    |    |-- i

We have to flatten the json and create a prepare dataframe we generally get json and its widely used when we deal with API.

In [0]:
from pyspark.sql.functions import col, explode
df = spark.read.json("/FileStore/tables/file5.json")

flattened_df = df.select(
    col("code"),
    col("message"),
    col("results_found"),
    col("results_shown"),
    col("results_start"),
    col("status"),
    explode("restaurants").alias("restaurant")
).select(
    "code",
    "message",
    "results_found",
    "results_shown",
    "results_start",
    "status",
    "restaurant.restaurant.*"
).select(
    "has_online_delivery",
    "has_table_booking",
    "id",
    "is_delivering_now",
    "name",
    "offers",
    "price_range",
    "switch_to_order_menu",
    "user_rating.*"
)

# Show the flattened DataFrame
flattened_df.show(truncate=False)

+-------------------+-----------------+--------+-----------------+------------------------------------+------+-----------+--------------------+----------------+------------+-----------+-----+
|has_online_delivery|has_table_booking|id      |is_delivering_now|name                                |offers|price_range|switch_to_order_menu|aggregate_rating|rating_color|rating_text|votes|
+-------------------+-----------------+--------+-----------------+------------------------------------+------+-----------+--------------------+----------------+------------+-----------+-----+
|0                  |0                |17066603|0                |The Coop                            |[]    |2          |0                   |3.6             |9ACD32      |Good       |432  |
|0                  |0                |17059541|0                |Maggiano's Little Italy             |[]    |4          |0                   |4.4             |5BA829      |Very Good  |886  |
|0                  |0                |1

#Reading Parquet File

In [0]:
df = spark.read.parquet("/FileStore/tables/part_r_00000_1a9822ba_b8fb_4d8e_844a_ea30d0801b9e_gz.parquet")

In [0]:
df.show()

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|       United States|            Romania|    1|
|       United States|            Ireland|  264|
|       United States|              India|   69|
|               Egypt|      United States|   24|
|   Equatorial Guinea|      United States|    1|
|       United States|          Singapore|   25|
|       United States|            Grenada|   54|
|          Costa Rica|      United States|  477|
|             Senegal|      United States|   29|
|       United States|   Marshall Islands|   44|
|              Guyana|      United States|   17|
|       United States|       Sint Maarten|   53|
|               Malta|      United States|    1|
|             Bolivia|      United States|   46|
|            Anguilla|      United States|   21|
|Turks and Caicos ...|      United States|  136|
|       United States|        Afghanistan|    2|
|Saint Vincent and..

#Writing dataframe into disk

In [0]:
read_df = spark.read.format("csv")\
          .option("header","true")\
          .option("inferschema","true")\
          .load("/FileStore/tables/File.csv")

read_df.show()

+---+----------+--------+--------+----------+-----------+
| id|      name|     age|  salary|   address|     gender|
+---+----------+--------+--------+----------+-----------+
|  1|    Manish|    26.0| 75000.0|     INDIA|          m|
|  2|    Nikita|    23.0|100000.0|       USA|          f|
|  3|    Pritam|    22.0|150000.0|     INDIA|          m|
|  4|  Prantosh|    17.0|200000.0|     JAPAN|          m|
|  5|    Vikash|    31.0|300000.0|       USA|          m|
|  6|     Rahul|    55.0|300000.0|     INDIA|          m|
|  7|      Raju|    67.0|540000.0|       USA|          m|
|  8|   Praveen|    28.0| 70000.0|     JAPAN|          m|
|  9|       Dev|    32.0|150000.0|     JAPAN|          m|
| 10|    Sherin|    16.0| 25000.0|    RUSSIA|          f|
| 11|      Ragu|    12.0| 35000.0|     INDIA|          f|
| 12|     Sweta|    43.0|200000.0|     INDIA|          f|
| 13|   Raushan|    48.0|650000.0|       USA|          m|
| 14|    Mukesh|    36.0| 95000.0|    RUSSIA|          m|
| 15|   Prakas

Suppose we have done the transformation and now we want to store the clean dataframe into the disk


In [0]:
%fs ls "/FileStore/tables/"

path,name,size,modificationTime
dbfs:/FileStore/tables/2010_summary.csv,2010_summary.csv,7121,1716379200000
dbfs:/FileStore/tables/Corruptedjson.json,Corruptedjson.json,222,1716465677000
dbfs:/FileStore/tables/File.csv,File.csv,741,1716476675000
dbfs:/FileStore/tables/employee.csv,employee.csv,236,1716443570000
dbfs:/FileStore/tables/file5.json,file5.json,669503,1716466304000
dbfs:/FileStore/tables/linedelimited.json,linedelimited.json,219,1716465123000
dbfs:/FileStore/tables/multilinejson.json,multilinejson.json,310,1716465123000
dbfs:/FileStore/tables/part_r_00000_1a9822ba_b8fb_4d8e_844a_ea30d0801b9e_gz.parquet,part_r_00000_1a9822ba_b8fb_4d8e_844a_ea30d0801b9e_gz.parquet,3921,1716469909000
dbfs:/FileStore/tables/users_6M0xxK_2024_csv.csv,users_6M0xxK_2024_csv.csv,1738911,1714723094000


In [0]:
write_df = read_df.write.format("csv")\
            .option("header","true")\
            .mode("overwrite")\
            .option("path","FileStore/tables/write_csv/")\
            .save()


In [0]:
%fs ls "/FileStore/tables/write_csv/"

path,name,size,modificationTime
dbfs:/FileStore/tables/write_csv/_SUCCESS,_SUCCESS,0,1716477292000
dbfs:/FileStore/tables/write_csv/_committed_5259562044498865433,_committed_5259562044498865433,111,1716477292000
dbfs:/FileStore/tables/write_csv/_started_5259562044498865433,_started_5259562044498865433,0,1716477291000
dbfs:/FileStore/tables/write_csv/part-00000-tid-5259562044498865433-a6b6bf97-fd11-4725-8c1c-3bde8a067a16-3-1-c000.csv,part-00000-tid-5259562044498865433-a6b6bf97-fd11-4725-8c1c-3bde8a067a16-3-1-c000.csv,490,1716477292000


We can see only one partition is created here, now we crete 3 partition of our data


In [0]:
write_df = read_df.repartition(3).write.format("csv")\
            .option("header","true")\
            .mode("errorifexists")\
            .option("path","FileStore/tables/write_csv/")\
            .save()

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-4435996615032020>:1[0m
[0;32m----> 1[0m write_df [38;5;241m=[39m [43mread_df[49m[38;5;241;43m.[39;49m[43mrepartition[49m[43m([49m[38;5;241;43m3[39;49m[43m)[49m[38;5;241;43m.[39;49m[43mwrite[49m[38;5;241;43m.[39;49m[43mformat[49m[43m([49m[38;5;124;43m"[39;49m[38;5;124;43mcsv[39;49m[38;5;124;43m"[39;49m[43m)[49m[43m\[49m
[1;32m      2[0m [43m            [49m[38;5;241;43m.[39;49m[43moption[49m[43m([49m[38;5;124;43m"[39;49m[38;5;124;43mheader[39;49m[38;5;124;43m"[39;49m[43m,[49m[38;5;124;43m"[39;49m[38;5;124;43mtrue[39;49m[38;5;124;43m"[39;49m[43m)[49m[43m\[49m
[1;32m      3[0m [43m            [49m[38;5;241;43m.[39;49m[43mmode[49m[43m([49m[38;5;124;43m"[39;49m[38;5;124;43merrorIfExist[39;49m[38;5;124;43m"[39;49m[43m)

We an see it gives error since fil;e already exist

In [0]:
write_df = read_df.repartition(3).write.format("csv")\
            .option("header","true")\
            .mode("overwrite")\
            .option("path","FileStore/tables/write_csv/")\
            .save()

In [0]:
%fs ls "/FileStore/tables/write_csv/"

path,name,size,modificationTime
dbfs:/FileStore/tables/write_csv/_SUCCESS,_SUCCESS,0,1716477483000
dbfs:/FileStore/tables/write_csv/_committed_5259562044498865433,_committed_5259562044498865433,111,1716477292000
dbfs:/FileStore/tables/write_csv/_committed_7458394400099194090,_committed_7458394400099194090,382,1716477482000
dbfs:/FileStore/tables/write_csv/_started_5259562044498865433,_started_5259562044498865433,0,1716477291000
dbfs:/FileStore/tables/write_csv/_started_7458394400099194090,_started_7458394400099194090,0,1716477482000
dbfs:/FileStore/tables/write_csv/part-00000-tid-7458394400099194090-1cb984a4-80c3-4215-81cb-e1e7b708e8a7-6-1-c000.csv,part-00000-tid-7458394400099194090-1cb984a4-80c3-4215-81cb-e1e7b708e8a7-6-1-c000.csv,184,1716477482000
dbfs:/FileStore/tables/write_csv/part-00001-tid-7458394400099194090-1cb984a4-80c3-4215-81cb-e1e7b708e8a7-7-1-c000.csv,part-00001-tid-7458394400099194090-1cb984a4-80c3-4215-81cb-e1e7b708e8a7-7-1-c000.csv,184,1716477482000
dbfs:/FileStore/tables/write_csv/part-00002-tid-7458394400099194090-1cb984a4-80c3-4215-81cb-e1e7b708e8a7-8-1-c000.csv,part-00002-tid-7458394400099194090-1cb984a4-80c3-4215-81cb-e1e7b708e8a7-8-1-c000.csv,190,1716477482000


In [0]:
display(dbutils.fs.ls("FileStore/tables/write_csv/part-00000-tid-7458394400099194090-1cb984a4-80c3-4215-81cb-e1e7b708e8a7-6-1-c000.csv"))

path,name,size,modificationTime
dbfs:/FileStore/tables/write_csv/part-00000-tid-7458394400099194090-1cb984a4-80c3-4215-81cb-e1e7b708e8a7-6-1-c000.csv,part-00000-tid-7458394400099194090-1cb984a4-80c3-4215-81cb-e1e7b708e8a7-6-1-c000.csv,184,1716477482000


In [0]:
df1 = spark.read.format("csv")\
          .option("header","true")\
          .option("inferschema","true")\
          .load("/FileStore/tables/write_csv/part-00000-tid-7458394400099194090-1cb984a4-80c3-4215-81cb-e1e7b708e8a7-6-1-c000.csv")

df1.show()

+---+-------+----+--------+-------+------+
| id|   name| age|  salary|address|gender|
+---+-------+----+--------+-------+------+
|  7|   Raju|67.0|540000.0|    USA|     m|
|  8|Praveen|28.0| 70000.0|  JAPAN|     m|
| 13|Raushan|48.0|650000.0|    USA|     m|
| 15|Prakash|52.0|750000.0|  INDIA|     m|
|  9|    Dev|32.0|150000.0|  JAPAN|     m|
+---+-------+----+--------+-------+------+



In [0]:
df2 = spark.read.format("csv")\
          .option("header","true")\
          .option("inferschema","true")\
          .load("/FileStore/tables/write_csv/part-00001-tid-7458394400099194090-1cb984a4-80c3-4215-81cb-e1e7b708e8a7-7-1-c000.csv")

df2.show()

+---+------+----+--------+-------+------+
| id|  name| age|  salary|address|gender|
+---+------+----+--------+-------+------+
| 12| Sweta|43.0|200000.0|  INDIA|     f|
|  3|Pritam|22.0|150000.0|  INDIA|     m|
|  1|Manish|26.0| 75000.0|  INDIA|     m|
|  2|Nikita|23.0|100000.0|    USA|     f|
|  5|Vikash|31.0|300000.0|    USA|     m|
+---+------+----+--------+-------+------+



In [0]:
df3 = spark.read.format("csv")\
          .option("header","true")\
          .option("inferschema","true")\
          .load("/FileStore/tables/write_csv/part-00002-tid-7458394400099194090-1cb984a4-80c3-4215-81cb-e1e7b708e8a7-8-1-c000.csv")
df3.show()

+---+--------+----+--------+-------+------+
| id|    name| age|  salary|address|gender|
+---+--------+----+--------+-------+------+
| 10|  Sherin|16.0| 25000.0| RUSSIA|     f|
|  6|   Rahul|55.0|300000.0|  INDIA|     m|
|  4|Prantosh|17.0|200000.0|  JAPAN|     m|
| 11|    Ragu|12.0| 35000.0|  INDIA|     f|
| 14|  Mukesh|36.0| 95000.0| RUSSIA|     m|
+---+--------+----+--------+-------+------+



#Partitioning AND Bucketing

In [0]:
df = spark.read.format("csv")\
     .option("header","true")\
     .option("inferschema","true")\
     .load("/FileStore/tables/File.csv")
df.show()

+---+----------+--------+--------+----------+-----------+
| id|      name|     age|  salary|   address|     gender|
+---+----------+--------+--------+----------+-----------+
|  1|    Manish|    26.0| 75000.0|     INDIA|          m|
|  2|    Nikita|    23.0|100000.0|       USA|          f|
|  3|    Pritam|    22.0|150000.0|     INDIA|          m|
|  4|  Prantosh|    17.0|200000.0|     JAPAN|          m|
|  5|    Vikash|    31.0|300000.0|       USA|          m|
|  6|     Rahul|    55.0|300000.0|     INDIA|          m|
|  7|      Raju|    67.0|540000.0|       USA|          m|
|  8|   Praveen|    28.0| 70000.0|     JAPAN|          m|
|  9|       Dev|    32.0|150000.0|     JAPAN|          m|
| 10|    Sherin|    16.0| 25000.0|    RUSSIA|          f|
| 11|      Ragu|    12.0| 35000.0|     INDIA|          f|
| 12|     Sweta|    43.0|200000.0|     INDIA|          f|
| 13|   Raushan|    48.0|650000.0|       USA|          m|
| 14|    Mukesh|    36.0| 95000.0|    RUSSIA|          m|
| 15|   Prakas

Now we are partitionin the data based on address and gender

In [0]:
df.write.format("csv")\
    .option("header","true")\
    .option("inferschema","true")\
    .mode("overwrite")\
    .option("path","/FileStore/tables/partition_by_address_gender")\
    .partitionBy("address","gender")\
    .save()

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-2163292688327470>:1[0m
[0;32m----> 1[0m [43mdf[49m[38;5;241;43m.[39;49m[43mwrite[49m[38;5;241;43m.[39;49m[43mformat[49m[43m([49m[38;5;124;43m"[39;49m[38;5;124;43mcsv[39;49m[38;5;124;43m"[39;49m[43m)[49m[43m\[49m
[1;32m      2[0m [43m    [49m[38;5;241;43m.[39;49m[43moption[49m[43m([49m[38;5;124;43m"[39;49m[38;5;124;43mheader[39;49m[38;5;124;43m"[39;49m[43m,[49m[38;5;124;43m"[39;49m[38;5;124;43mtrue[39;49m[38;5;124;43m"[39;49m[43m)[49m[43m\[49m
[1;32m      3[0m [43m    [49m[38;5;241;43m.[39;49m[43moption[49m[43m([49m[38;5;124;43m"[39;49m[38;5;124;43minferschema[39;49m[38;5;124;43m"[39;49m[43m,[49m[38;5;124;43m"[39;49m[38;5;124;43mtrue[39;49m[38;5;124;43m"[39;49m[43m)[49m[43m\[49m
[1;32m      4[0m [43m    [49m[38

In [0]:
%fs ls "/FileStore/tables/"

path,name,size,modificationTime
dbfs:/FileStore/tables/2010_summary.csv,2010_summary.csv,7121,1716379200000
dbfs:/FileStore/tables/Corruptedjson.json,Corruptedjson.json,222,1716465677000
dbfs:/FileStore/tables/File.csv,File.csv,741,1716476675000
dbfs:/FileStore/tables/employee.csv,employee.csv,236,1716443570000
dbfs:/FileStore/tables/file5.json,file5.json,669503,1716466304000
dbfs:/FileStore/tables/linedelimited.json,linedelimited.json,219,1716465123000
dbfs:/FileStore/tables/multilinejson.json,multilinejson.json,310,1716465123000
dbfs:/FileStore/tables/part_r_00000_1a9822ba_b8fb_4d8e_844a_ea30d0801b9e_gz.parquet,part_r_00000_1a9822ba_b8fb_4d8e_844a_ea30d0801b9e_gz.parquet,3921,1716469909000
dbfs:/FileStore/tables/users_6M0xxK_2024_csv.csv,users_6M0xxK_2024_csv.csv,1738911,1714723094000
dbfs:/FileStore/tables/write_csv/,write_csv/,0,0


In [0]:
dbutils.fs.rm("/FileStore/tables/File.csv", recurse = True)

Out[4]: True

In [0]:
%fs ls "/FileStore/tables/"

path,name,size,modificationTime
dbfs:/FileStore/tables/2010_summary.csv,2010_summary.csv,7121,1716379200000
dbfs:/FileStore/tables/Corruptedjson.json,Corruptedjson.json,222,1716465677000
dbfs:/FileStore/tables/employee.csv,employee.csv,236,1716443570000
dbfs:/FileStore/tables/file5.json,file5.json,669503,1716466304000
dbfs:/FileStore/tables/linedelimited.json,linedelimited.json,219,1716465123000
dbfs:/FileStore/tables/multilinejson.json,multilinejson.json,310,1716465123000
dbfs:/FileStore/tables/part_r_00000_1a9822ba_b8fb_4d8e_844a_ea30d0801b9e_gz.parquet,part_r_00000_1a9822ba_b8fb_4d8e_844a_ea30d0801b9e_gz.parquet,3921,1716469909000
dbfs:/FileStore/tables/users_6M0xxK_2024_csv.csv,users_6M0xxK_2024_csv.csv,1738911,1714723094000
dbfs:/FileStore/tables/write_csv/,write_csv/,0,0


In [0]:
df = spark.read.format("csv")\
     .option("header","true")\
     .option("inferschema","true")\
     .load("/FileStore/tables/File.csv")
df.show()

+---+--------+---+------+-------+------+
| id|    name|age|salary|address|gender|
+---+--------+---+------+-------+------+
|  1|  Manish| 26| 75000|  INDIA|     m|
|  2|  Nikita| 23|100000|    USA|     f|
|  3|  Pritam| 22|150000|  INDIA|     m|
|  4|Prantosh| 17|200000|  JAPAN|     m|
|  5|  Vikash| 31|300000|    USA|     m|
|  6|   Rahul| 55|300000|  INDIA|     m|
|  7|    Raju| 67|540000|    USA|     m|
|  8| Praveen| 28| 70000|  JAPAN|     m|
|  9|     Dev| 32|150000|  JAPAN|     m|
| 10|  Sherin| 16| 25000| RUSSIA|     f|
| 11|    Ragu| 12| 35000|  INDIA|     f|
| 12|   Sweta| 43|200000|  INDIA|     f|
| 13| Raushan| 48|650000|    USA|     m|
| 14|  Mukesh| 36| 95000| RUSSIA|     m|
| 15| Prakash| 52|750000|  INDIA|     m|
+---+--------+---+------+-------+------+



In [0]:
df.write.format("csv")\
    .option("header","true")\
    .option("inferschema","true")\
    .mode("overwrite")\
    .option("path","/FileStore/tables/partition_by_address_gender")\
    .partitionBy("address","gender")\
    .save()

In [0]:
display(dbutils.fs.ls("/FileStore/tables/partition_by_address_gender/"))

path,name,size,modificationTime
dbfs:/FileStore/tables/partition_by_address_gender/_SUCCESS,_SUCCESS,0,1716524396000
dbfs:/FileStore/tables/partition_by_address_gender/address=INDIA/,address=INDIA/,0,0
dbfs:/FileStore/tables/partition_by_address_gender/address=JAPAN/,address=JAPAN/,0,0
dbfs:/FileStore/tables/partition_by_address_gender/address=RUSSIA/,address=RUSSIA/,0,0
dbfs:/FileStore/tables/partition_by_address_gender/address=USA/,address=USA/,0,0


In [0]:
display(dbutils.fs.ls("/FileStore/tables/partition_by_address_gender/address=INDIA/"))

path,name,size,modificationTime
dbfs:/FileStore/tables/partition_by_address_gender/address=INDIA/gender=f/,gender=f/,0,0
dbfs:/FileStore/tables/partition_by_address_gender/address=INDIA/gender=m/,gender=m/,0,0


In [0]:
display(dbutils.fs.ls("/FileStore/tables/partition_by_address_gender/address=INDIA/gender=f"))

path,name,size,modificationTime
dbfs:/FileStore/tables/partition_by_address_gender/address=INDIA/gender=f/_SUCCESS,_SUCCESS,0,1716524395000
dbfs:/FileStore/tables/partition_by_address_gender/address=INDIA/gender=f/_committed_1987066062223935307,_committed_1987066062223935307,111,1716524395000
dbfs:/FileStore/tables/partition_by_address_gender/address=INDIA/gender=f/_started_1987066062223935307,_started_1987066062223935307,0,1716524393000
dbfs:/FileStore/tables/partition_by_address_gender/address=INDIA/gender=f/part-00000-tid-1987066062223935307-b8eaee67-1857-4ba4-9a8d-febda5255bdd-6-1.c000.csv,part-00000-tid-1987066062223935307-b8eaee67-1857-4ba4-9a8d-febda5255bdd-6-1.c000.csv,55,1716524393000


In [0]:
df_gender = spark.read.format("csv")\
     .option("header","true")\
     .option("inferschema","true")\
     .load("/FileStore/tables/partition_by_address_gender/address=INDIA/gender=f/part-00000-tid-1987066062223935307-b8eaee67-1857-4ba4-9a8d-febda5255bdd-6-1.c000.csv")

df_gender.show()

+---+-----+---+------+
| id| name|age|salary|
+---+-----+---+------+
| 11| Ragu| 12| 35000|
| 12|Sweta| 43|200000|
+---+-----+---+------+



When we don't ahve any column in the dataset to partition and we have to divide the data into chunks then in that case we use Bucketing

In [0]:
df.write.format("csv")\
    .option("header","true")\
    .option("inferschema","true")\
    .mode("overwrite")\
    .option("path","/FileStore/tables/Bucketing_by_id")\
    .bucketBy(3,"id")\
    .save()

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-2163292688327481>:1[0m
[0;32m----> 1[0m [43mdf[49m[38;5;241;43m.[39;49m[43mwrite[49m[38;5;241;43m.[39;49m[43mformat[49m[43m([49m[38;5;124;43m"[39;49m[38;5;124;43mcsv[39;49m[38;5;124;43m"[39;49m[43m)[49m[43m\[49m
[1;32m      2[0m [43m    [49m[38;5;241;43m.[39;49m[43moption[49m[43m([49m[38;5;124;43m"[39;49m[38;5;124;43mheader[39;49m[38;5;124;43m"[39;49m[43m,[49m[38;5;124;43m"[39;49m[38;5;124;43mtrue[39;49m[38;5;124;43m"[39;49m[43m)[49m[43m\[49m
[1;32m      3[0m [43m    [49m[38;5;241;43m.[39;49m[43moption[49m[43m([49m[38;5;124;43m"[39;49m[38;5;124;43minferschema[39;49m[38;5;124;43m"[39;49m[43m,[49m[38;5;124;43m"[39;49m[38;5;124;43mtrue[39;49m[38;5;124;43m"[39;49m[43m)[49m[43m\[49m
[1;32m      4[0m [43m    [49m[38

When you use saveAsTable, Spark registers the table with the Hive metastore, which includes details about bucketing. This allows Spark SQL to leverage this metadata in future queries without the need to reprocess or reanalyze the data. The metadata helps in:

Cost-Based Optimization (CBO): The optimizer can make more informed decisions about query plans by using the metadata.
Auto Query Optimization: Spark can automatically optimize queries based on the bucketing information, such as skipping irrelevant buckets and optimizing join strategies.

In [0]:
df.write.format("csv")\
    .option("header","true")\
    .option("inferschema","true")\
    .mode("overwrite")\
    .option("path","/FileStore/tables/Bucketing_by_id")\
    .bucketBy(3,"id")\
    .saveAsTable("bucket_by_id")

In [0]:
display(dbutils.fs.ls("/FileStore/tables/Bucketing_by_id/"))

path,name,size,modificationTime
dbfs:/FileStore/tables/Bucketing_by_id/_SUCCESS,_SUCCESS,0,1716525006000
dbfs:/FileStore/tables/Bucketing_by_id/_committed_6804979900832139106,_committed_6804979900832139106,306,1716525006000
dbfs:/FileStore/tables/Bucketing_by_id/_started_6804979900832139106,_started_6804979900832139106,0,1716525005000
dbfs:/FileStore/tables/Bucketing_by_id/part-00000-tid-6804979900832139106-206184a4-c726-44fc-bac4-cbdf5fb90e65-34-1_00000.c000.csv,part-00000-tid-6804979900832139106-206184a4-c726-44fc-bac4-cbdf5fb90e65-34-1_00000.c000.csv,270,1716525005000
dbfs:/FileStore/tables/Bucketing_by_id/part-00000-tid-6804979900832139106-206184a4-c726-44fc-bac4-cbdf5fb90e65-34-2_00001.c000.csv,part-00000-tid-6804979900832139106-206184a4-c726-44fc-bac4-cbdf5fb90e65-34-2_00001.c000.csv,113,1716525005000
dbfs:/FileStore/tables/Bucketing_by_id/part-00000-tid-6804979900832139106-206184a4-c726-44fc-bac4-cbdf5fb90e65-34-3_00002.c000.csv,part-00000-tid-6804979900832139106-206184a4-c726-44fc-bac4-cbdf5fb90e65-34-3_00002.c000.csv,115,1716525006000


In [0]:
df_bucket1 = spark.read.format("csv")\
     .option("header","true")\
     .option("inferschema","true")\
     .load("/FileStore/tables/Bucketing_by_id/part-00000-tid-6804979900832139106-206184a4-c726-44fc-bac4-cbdf5fb90e65-34-1_00000.c000.csv")

df_bucket1.show()


+---+--------+---+------+-------+------+
| id|    name|age|salary|address|gender|
+---+--------+---+------+-------+------+
|  2|  Nikita| 23|100000|    USA|     f|
|  3|  Pritam| 22|150000|  INDIA|     m|
|  4|Prantosh| 17|200000|  JAPAN|     m|
|  5|  Vikash| 31|300000|    USA|     m|
|  7|    Raju| 67|540000|    USA|     m|
|  8| Praveen| 28| 70000|  JAPAN|     m|
|  9|     Dev| 32|150000|  JAPAN|     m|
| 10|  Sherin| 16| 25000| RUSSIA|     f|
| 14|  Mukesh| 36| 95000| RUSSIA|     m|
+---+--------+---+------+-------+------+



In [0]:
df_bucket2 = spark.read.format("csv")\
     .option("header","true")\
     .option("inferschema","true")\
     .load("/FileStore/tables/Bucketing_by_id/part-00000-tid-6804979900832139106-206184a4-c726-44fc-bac4-cbdf5fb90e65-34-2_00001.c000.csv")

df_bucket2.show()

+---+-------+---+------+-------+------+
| id|   name|age|salary|address|gender|
+---+-------+---+------+-------+------+
|  1| Manish| 26| 75000|  INDIA|     m|
|  6|  Rahul| 55|300000|  INDIA|     m|
| 13|Raushan| 48|650000|    USA|     m|
+---+-------+---+------+-------+------+



In [0]:
df_bucket3 = spark.read.format("csv")\
     .option("header","true")\
     .option("inferschema","true")\
     .load("/FileStore/tables/Bucketing_by_id/part-00000-tid-6804979900832139106-206184a4-c726-44fc-bac4-cbdf5fb90e65-34-3_00002.c000.csv")

df_bucket3.show()

+---+-------+---+------+-------+------+
| id|   name|age|salary|address|gender|
+---+-------+---+------+-------+------+
| 11|   Ragu| 12| 35000|  INDIA|     f|
| 12|  Sweta| 43|200000|  INDIA|     f|
| 15|Prakash| 52|750000|  INDIA|     m|
+---+-------+---+------+-------+------+

