In [0]:
type(spark)

pyspark.sql.connect.session.SparkSession

In [0]:
dfr = spark.read
print(type(dfr))

<class 'pyspark.sql.connect.readwriter.DataFrameReader'>


## Create DataFrame From CSV

In [0]:
dfr = spark.read
df = dfr.csv("/Volumes/quickstart_catalog/quickstart_schema/sandbox/dataset/sample_dataset/users_001.csv")
print(type(df))

In [0]:
df = spark.read.csv("/Volumes/quickstart_catalog/quickstart_schema/sandbox/dataset/sample_dataset/users_001.csv")
df.display()

_c0,_c1,_c2,_c3,_c4
id,age,gen,designation,salary
1,26,M,Technician,85711
2,53,F,Other,94043
3,23,M,Writer,32067
4,26,M,technician,43537
5,33,F,Other,15213
6,42,M,Cheif Executive Officer,98101
7,57,M,Administrator,91344
8,36,M,Administrator,05201
9,29,M,Student,01002


In [0]:
df = spark.read.csv(
    path="/Volumes/quickstart_catalog/quickstart_schema/sandbox/dataset/sample_dataset/users_001.csv",
    header=True,
    inferschmena= True
)
# df.display()  // This is used to display.
df.printSchema() # This is used to print Datatype of each column

root
 |-- id: string (nullable = true)
 |-- age: string (nullable = true)
 |-- gen: string (nullable = true)
 |-- designation: string (nullable = true)
 |-- salary: string (nullable = true)



## Create data from Custom Delimiter

In [0]:
spark.read.csv(
    path="/Volumes/quickstart_catalog/quickstart_schema/sandbox/dataset/sample_dataset/users_001.dat",
    header=True,
    inferSchema=True,
    sep="|",
).display()


id,age,gen,designation,salary
1,26,M,Technician,85711
2,53,F,Other,94043
3,23,M,Writer,32067
4,26,M,technician,43537
5,33,F,Other,15213
6,42,M,Cheif Executive Officer,98101
7,57,M,Administrator,91344
8,36,M,Administrator,5201
9,29,M,Student,1002
10,53,M,Lawyer,90703


## Create data from JSON

In [0]:
spark.read.json(
    path="/Volumes/quickstart_catalog/quickstart_schema/sandbox/dataset/sample_dataset/users_004.json",
    multiLine=True,
).display()

age,designation,gender,id,salary
0,technician,M,1,85711
53,other,F,2,94043


## Create data from Parquet -- This is a columunar format

In [0]:
spark.read.parquet(
    "/Volumes/quickstart_catalog/quickstart_schema/sandbox/dataset/sample_dataset/users.parquet"
).display()

id,age,gender,designation,salary
1,0,M,technician,85711
2,53,F,other,94043
3,23,M,writer,32067
4,0,M,technician,43537
5,33,F,other,15213
6,42,M,Cheif Executive Officer,98101
7,57,M,administrator,91344
8,36,M,administrator,5201
9,29,M,student,1002
10,53,M,lawyer,90703


## Create DataFrame Cloud Storage (ADLS) using access key

In [0]:
storage_account_name = "synechron25092025"
container_name = "users"
azure_access_keys = "okaX3mprGoyrhfFCB9gBSivZz3UZjMiBwR7XHBzU9OCzU/wJfSI7LfmgSAFRqLuVOExoG+AMLVbJ+AStrHi6Ug=="
 
file_path = f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/users_001.csv"
 
spark_conf_key_for_access = (
    f"fs.azure.account.key.{storage_account_name}.dfs.core.windows.net"
)
spark.read.option(spark_conf_key_for_access, azure_access_keys).csv(
    path=file_path, header=True, inferSchema=True
).display()

[0;31m---------------------------------------------------------------------------[0m
[0;31mSparkConnectGrpcException[0m                 Traceback (most recent call last)
File [0;32m<command-5729732916150092>, line 12[0m
[1;32m      5[0m file_path [38;5;241m=[39m [38;5;124mf[39m[38;5;124m"[39m[38;5;124mabfss://[39m[38;5;132;01m{[39;00mcontainer_name[38;5;132;01m}[39;00m[38;5;124m@[39m[38;5;132;01m{[39;00mstorage_account_name[38;5;132;01m}[39;00m[38;5;124m.dfs.core.windows.net/users_001.csv[39m[38;5;124m"[39m
[1;32m      7[0m spark_conf_key_for_access [38;5;241m=[39m (
[1;32m      8[0m     [38;5;124mf[39m[38;5;124m"[39m[38;5;124mfs.azure.account.key.[39m[38;5;132;01m{[39;00mstorage_account_name[38;5;132;01m}[39;00m[38;5;124m.dfs.core.windows.net[39m[38;5;124m"[39m
[1;32m      9[0m )
[1;32m     10[0m spark[38;5;241m.[39mread[38;5;241m.[39moption(spark_conf_key_for_access, azure_access_keys)[38;5;241m.[39mcsv(
[1;32m     11[0m  

## Copy the data in AWS S3, Google Cloud Storage (Assignment)

## Custom Schema

In [0]:
from pyspark.sql.types import *
 
CUSTOM_SCHEMA = StructType(
    [
        StructField("id", IntegerType()),
        StructField("age", IntegerType()),
        StructField("gen", StringType()),
        StructField("desig", StringType()),
        StructField("salary", IntegerType()),
    ]
)
 
spark.read.csv(
    path="/Volumes/quickstart_catalog/quickstart_schema/sandbox/dataset/sample_dataset/users_001.csv",
    header=True,
    schema=CUSTOM_SCHEMA,
).display()

id,age,gen,desig,salary
1,26,M,Technician,85711
2,53,F,Other,94043
3,23,M,Writer,32067
4,26,M,technician,43537
5,33,F,Other,15213
6,42,M,Cheif Executive Officer,98101
7,57,M,Administrator,91344
8,36,M,Administrator,5201
9,29,M,Student,1002
10,53,M,Lawyer,90703


# Handling Bad Records

### PERMISSIVE (Default) - Bad Record in _courrupt_record  column however we can specify the name of the column

In [0]:
spark.read.option("columnNameOfCorruptRecord", "bad_record").json(
    path="/Volumes/quickstart_catalog/quickstart_schema/sandbox/dataset/access_logs.json",
    mode="PERMISSIVE",
).display()

### DROPMALFORMED - Rejected the bad record

In [0]:
spark.read.json(
    path="/Volumes/quickstart_catalog/quickstart_schema/sandbox/dataset/access_logs.json",
    mode="DROPMALFORMED",
).display()

### FAILFAST - Does not allow to perform the function

In [0]:
spark.read.json(
    path="/Volumes/quickstart_catalog/quickstart_schema/sandbox/dataset/access_logs.json",
    mode="FAILFAST",
).display()

## Dataframe Writer API

In [0]:
df = spark.read.csv(
    path="/Volumes/quickstart_catalog/quickstart_schema/sandbox/dataset/sample_dataset/users_001.csv",
    header=True,
    inferSchema=True
)
df.write.format("parquet").save("/Volumes/quickstart_catalog/quickstart_schema/sandbox/output_parquet_new")

## Output Modes

### errorifExist

In [0]:
df = spark.read.csv(
    path="/Volumes/quickstart_catalog/quickstart_schema/sandbox/dataset/sample_dataset/users_001.csv",
    header=True,
    inferSchema=True,
)
df.write.format("parquet").save(
    "/Volumes/quickstart_catalog/quickstart_schema/sandbox/output_parquet_new"
)

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-4523288295027597>, line 6[0m
[1;32m      1[0m df [38;5;241m=[39m spark[38;5;241m.[39mread[38;5;241m.[39mcsv(
[1;32m      2[0m     path[38;5;241m=[39m[38;5;124m"[39m[38;5;124m/Volumes/quickstart_catalog/quickstart_schema/sandbox/dataset/sample_dataset/users_001.csv[39m[38;5;124m"[39m,
[1;32m      3[0m     header[38;5;241m=[39m[38;5;28;01mTrue[39;00m,
[1;32m      4[0m     inferSchema[38;5;241m=[39m[38;5;28;01mTrue[39;00m,
[1;32m      5[0m )
[0;32m----> 6[0m df[38;5;241m.[39mwrite[38;5;241m.[39mformat([38;5;124m"[39m[38;5;124mparquet[39m[38;5;124m"[39m)[38;5;241m.[39msave(
[1;32m      7[0m     [38;5;124m"[39m[38;5;124m/Volumes/quickstart_catalog/quickstart_schema/sandbox/output_parquet_new[39m[38;5;124m"[39m
[1;32m      8[0m )

File [0;32m/

### overwrite

In [0]:
df = spark.read.csv(
    path="/Volumes/quickstart_catalog/quickstart_schema/sandbox/dataset/sample_dataset/users_001.csv",
    header=True,
    inferSchema=True,
)
df.write.format("parquet").save(
    "/Volumes/quickstart_catalog/quickstart_schema/sandbox/output_parquet_new",
    mode="overwrite",
)

### append

In [0]:
df = spark.read.csv(
    path="/Volumes/quickstart_catalog/quickstart_schema/sandbox/dataset/sample_dataset/users_001.csv",
    header=True,
    inferSchema=True,
)
df.write.format("parquet").save(
    "/Volumes/quickstart_catalog/quickstart_schema/sandbox/output_parquet_new",
    mode="append",
)

### Ignore

In [0]:
df = spark.read.csv(
    path="/Volumes/quickstart_catalog/quickstart_schema/sandbox/dataset/sample_dataset/users_001.csv",
    header=True,
    inferSchema=True,
)
df.write.format("parquet").save(
    "/Volumes/quickstart_catalog/quickstart_schema/sandbox/output_parquet_new",
    mode="ignore",
)

In [0]:
import pyarrow.parquet as pq

user_table = pq.read_table("/Volumes/quickstart_catalog/quickstart_schema/sandbox/output_parquet_new")
user_table.to_pandas().Head()