In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [0]:
def read_csv(filepath,options):
    df_read=spark.read.csv(filepath,**options)
    return df_read


options={
    'header':True,
    'delimeter':',',
    'inferschema':True
}
#**options means it is a kwargs that means we are passing more num of values as key value pairs
#when we give header as true means it give the column names or else it shows like col1,col2...
#delimiter means separation of data by special characters
#inferschema means it defines the schema of the file when we given it as true

filepath='dbfs:/FileStore/fish_data.csv'

In [0]:
#It will works only for json files
df_json = spark.read.format("json").options("multiline"=True).load("dbfs:/FileStore/sample__1_.json")

In [0]:
#Here is the function to read json file formats
def read_json(filepath,option):
    df_readfile=spark.read.json(filepath,option)
    return df_readfile


option={
    'multiline':True      #Multiline indicates that a piece of text or content can span multiple lines. 
}
filepath='dbfs:/FileStore/sample__1_.json'

In [0]:
#Here is the function to read parquet file formats
def read_parquet(filepath,option):
    df_pqfile=spark.read.parquet(filepath,option)
    return df_pqfile


option={
    'header':True  
}
filepath='dbfs:/FileStore/mt_cars.parquet'

In [0]:
data = [
    (1, 10.5, 3.2, "Forest", 7.0, "Brown", True, 15.5),
    (2, 8.0, 2.5, "River", 6.5, "Blue", False, 10.0),
    (3, 12.2, 5.0, "Desert", 8.2, "Green", True, 20.0)
]

schema = StructType([
    StructField("id", IntegerType(), True), #nullable = true, means it allows null values also
    StructField("average_length_inches", DoubleType(), True),
    StructField("average_weight_inches", DoubleType(), True),
    StructField("habitat", StringType(), True),
    StructField("ph_of_water", DoubleType(), True),
    StructField("color", StringType(), True),
    StructField("Gender", BooleanType(), True),
    StructField("life_span", DoubleType(), True)
])
#structtype means it collection of all structfields
#structfield means collection of columnnames and which datatype it indicates


df=spark.createDataFrame(data=data, schema=schema)


In [0]:
df.display()

id,average_length_inches,average_weight_inches,habitat,ph_of_water,color,Gender,life_span
1,10.5,3.2,Forest,7.0,Brown,True,15.5
2,8.0,2.5,River,6.5,Blue,False,10.0
3,12.2,5.0,Desert,8.2,Green,True,20.0


In [0]:
data = [
    (1, 10.5, 3.2, "Forest", 7.0, "Brown", True, 15.5),
    (2, 8.0, 2.5, "River", 6.5, "Blue", False, 10.0),
    (3, 12.2, 5.0, "Desert", None, "Green", True, 20.0)
]

schema = StructType([
    StructField("id", IntegerType(), True), #nullable = true, means it allows null values also
    StructField("average_length_inches", DoubleType(), True),
    StructField("average_weight_inches", DoubleType(), True),
    StructField("habitat", StringType(), True),
    StructField("ph_of_water", DoubleType(), False),
    StructField("color", StringType(), True),
    StructField("Gender", BooleanType(), True),
    StructField("life_span", DoubleType(), True)
])
#structtype means it collection of all structfields
#structfield means collection of columnnames and which datatype it indicates


df=spark.createDataFrame(data=data, schema=schema)

#if give nullable as flase means it wont accept null...

[0;31m---------------------------------------------------------------------------[0m
[0;31mValueError[0m                                Traceback (most recent call last)
File [0;32m<command-4106982960200150>:21[0m
[1;32m      7[0m schema [38;5;241m=[39m StructType([
[1;32m      8[0m     StructField([38;5;124m"[39m[38;5;124mid[39m[38;5;124m"[39m, IntegerType(), [38;5;28;01mTrue[39;00m), [38;5;66;03m#nullable = true, means it allows null values also[39;00m
[1;32m      9[0m     StructField([38;5;124m"[39m[38;5;124maverage_length_inches[39m[38;5;124m"[39m, DoubleType(), [38;5;28;01mTrue[39;00m),
[0;32m   (...)[0m
[1;32m     15[0m     StructField([38;5;124m"[39m[38;5;124mlife_span[39m[38;5;124m"[39m, DoubleType(), [38;5;28;01mTrue[39;00m)
[1;32m     16[0m ])
[1;32m     17[0m [38;5;66;03m#structtype means it collection of all structfields[39;00m
[1;32m     18[0m [38;5;66;03m#structfield means collection of columnnames and which datatype it

In [0]:
#Function for writing modes of append and overwrite
def write_function(df,file_format,mode,path,tblname):
    return df.write.format(file_format).mode(mode).option('path',path).saveAsTable(tblname)

In [0]:
write_function(df,"csv","append","dbfs:/FileStore/practice","appendtable")

In [0]:
data = [
    (1, 10.5, 3.2, "Forest", 7.0, "Brown", True, 15.5)
]
df = spark.createDataFrame(data,schema)

In [0]:
write_function(df,"csv","overwrite","dbfs:/FileStore/practice","appendtable")

In [0]:
# Previously we have a dataframe with 3 records now we are overwriting so we had only one record in df
df_overwrite = spark.read.format("csv").option("header","True").load('dbfs:/FileStore/practice')
df_overwrite.display()

1,10.5,3.2,Forest,7.0,Brown,true,15.5


In [0]:
#This is common function to read csv file
def read_file(formats,path,**options):
    rd_file=spark.read.format(formats).options(**options).load(path)  
    #formats is a parameter which helps in passing different format files,options are incidate the parameter which is useful for each different format options like for csv and json etc,path means passing the format path which we want for reading a file.
    return rd_file

In [0]:
options={
    'header': True,
    'delimiter': ',',
    'inferschema' : True
}
#when we give header as true means it give the column names or else it shows like col1,col2...
#delimiter means separation of data by special characters
#inferschema means it defines the schema of the file when we given it as true
df_csv=read_file('csv',options,'dbfs:/FileStore/fish_data.csv')
display(df_csv)

In [0]:
path = 'dbfs:/Pract/remo'
def merge(df,path,dbname,tablename,mergeCol):
    silverpath = f"{path}/{dbname}/{tablename}"
    mappedCol = " AND ".join(list(map((lambda x: f"source.{x} = target.{x}"),mergeCol)))
    if not DeltaTable.isDeltaTable(spark,silverpath):
        spark.sql(f'create database if not exists {dbname}') 
        df.write.mode("overwrite").format("delta").option("path",silverpath).saveAsTable(f"{dbname}.{tablename}")
    else:
        delta = DeltaTable.forPath(spark, silverpath)
        delta.alias("target").merge(
          source = df.alias("source"),
          condition=mappedCol).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
        

#here we are passing arguments like dataframe,path and databasename ,tablename and mergecol that means comparing common column in both the tables
#silver path means we are giving a path to store the data ,for that we are doing string formatting to create a path by using databasename and tablename
#mappedcol means here we are comparing common column by string formating using lambda function and stored it in mergeCol
#by using that silverpath we are checking it stores like deltatable. For the first time the path checks for if else condition if not delta table it will create the delta table by using path if there is no data base it will create the database and first time it will overwrite.  
#When we are tryting to write it for second time it will go to else block and it will update when there is a match in the primary key and if the values are not present it will insert them.
#If duplicates are present in the source  if source is containing only one record then it will update if the source is having multipe duplicate records then it will fail.