### Data Reading


In [0]:
# DBFS root access (e.g., /workspace/default/big_mart_sales) is disabled for security, so you cannot read data using file paths.
# Unity Catalog tables must be accessed using spark.table("catalog.schema.table").
# You cannot specify a custom schema when reading Unity Catalog tables; use DataFrame transformations to change column types.
# For external files (e.g., S3), you can use spark.read.format with a custom schema.

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [0]:
# Demonstration: a CSV read with no path cannot reach a Unity Catalog table, so
# Spark raises. Catch and show the error so "Run All" continues; the next cell
# reads the table the supported way (spark.table).
try:
    df = spark.read.format('csv').option('header', 'True').option('inferSchema', True).load()
except Exception as err:  # exception class differs across PySpark versions
    print(f"Expected failure (no path given to the CSV reader): {err}")

In [0]:
# Unity Catalog tables are read by their three-part name (catalog.schema.table).
df = spark.read.table("workspace.default.big_mart_sales")


In [0]:
df.display()  # render the loaded table

In [0]:
# Print the table's column names, types and nullability as a tree.
df.printSchema()

### **DDL SCHEMA**


In [0]:
# DDL-style schema string: one "column_name type" pair per line.
# Item_Weight is declared as string here; the numeric columns use double/int.
my_ddl_schema = '''
  Item_Identifier string,
  Item_Weight string,
  Item_Fat_Content string,
  Item_Visibility double,
  Item_Type string,
  Item_MRP double,
  Outlet_Identifier string,
  Outlet_Establishment_Year int,
  Outlet_Size string,
  Outlet_Location_Type string,
  Outlet_Type string,
  Item_Outlet_Sales double
'''

# This approach applies when reading files (e.g., from S3) with spark.read.schema(...); it cannot be applied to Unity Catalog tables.

In [0]:
# Demonstration: attaching the DDL schema to a file-path read fails here because
# path-based access to this data is disabled (see the first cell). Catch and
# show the error so the notebook still runs top to bottom.
try:
    df = (spark.read.format('csv')
          .schema(my_ddl_schema)
          .option('header', True)
          .load('/workspace/default/big_mart_sales'))
except Exception as err:  # exception class differs across PySpark versions
    print(f"Expected failure (path-based read is not allowed here): {err}")

In [0]:
# Unity Catalog tables must be accessed using spark.table("catalog.schema.table").
# A custom schema cannot be attached on read, so column types are changed afterwards
# with DataFrame transformations: here Item_Weight becomes a string column.
df = (
    spark.table("workspace.default.big_mart_sales")
         .withColumn("Item_Weight", col("Item_Weight").cast(StringType()))
)
display(df)

### StructType() **Schema**

In [0]:
# Explicit StructType schema for file reads: spark.read.format(...).schema(my_struct_schema).
# It mirrors my_ddl_schema. Item_Weight is kept as a string, and
# Outlet_Establishment_Year is an integer (it was StringType here, which
# disagreed with the DDL schema's "int").
my_struct_schema = StructType([
    StructField("Item_Identifier", StringType(), True),
    StructField("Item_Weight", StringType(), True),  # string, matching my_ddl_schema
    StructField("Item_Fat_Content", StringType(), True),
    StructField("Item_Visibility", DoubleType(), True),
    StructField("Item_Type", StringType(), True),
    StructField("Item_MRP", DoubleType(), True),
    StructField("Outlet_Identifier", StringType(), True),
    StructField("Outlet_Establishment_Year", IntegerType(), True),
    StructField("Outlet_Size", StringType(), True),
    StructField("Outlet_Location_Type", StringType(), True),
    StructField("Outlet_Type", StringType(), True),
    StructField("Item_Outlet_Sales", DoubleType(), True)
])

In [0]:
# You cannot apply a custom schema directly when reading a Unity Catalog table; read the table and cast columns as needed instead.
# A StructType schema is only used when reading external files (e.g., from S3) with spark.read.format(...).schema(...).
# For Unity Catalog tables, use DataFrame transformations, e.g. df.withColumn("Item_Weight", col("Item_Weight").cast("string")).



### SELECT

In [0]:
display(df)  # full table before column selection


In [0]:
# Method 1: select columns by plain string names (a list works the same as varargs).
df.select(["Item_Identifier", "Item_Weight", "Item_Fat_Content"]).display()

In [0]:
# Method 2: build Column objects with col() and select those.
df.select(*(col(c) for c in ("Item_Identifier", "Item_Weight", "Item_Fat_Content"))).display()


### USING ALIAS

In [0]:

display(df.select(col("Item_Identifier").name("Item_id")))  # name() is an alias of alias()

### Transformations

### Scenario-1

In [0]:
df.where(col("Item_Fat_Content") == "Regular").display()  # where() is an alias of filter()

### Scenario-2

In [0]:
# Soft drinks with weight below 10. Item_Weight was cast to string when the table
# was loaded, so cast it back to double for an explicit numeric comparison
# rather than relying on implicit string/number coercion.
df.filter(
    (col("Item_Type") == "Soft Drinks") & (col("Item_Weight").cast("double") < 10)
).display()

### Scenario-3

In [0]:
# Tier 1 / Tier 2 outlets whose size is unknown (null).
df.where(
    col("Outlet_Location_Type").isin("Tier 1", "Tier 2") & col("Outlet_Size").isNull()
).display()

### withColumnRenamed()

In [0]:
# Show the rename without persisting it. Later cells (type casting, sort, drop)
# still refer to the original "Item_Weight" column and would fail if df were reassigned.
df.withColumnRenamed("Item_Weight", "item_wt").display()

In [0]:
df.display()

### withColumn()

In [0]:
# Add a constant column. display() returns None, so assign first and display
# separately; chaining .display() onto the assignment would set df to None.
df = df.withColumn("flag", lit('new'))
df.display()

In [0]:
# Scenario 2: derived column = Item_MRP * Item_Outlet_Sales.
# Assign first, then display (display() returns None).
df = df.withColumn("total sales amount", col("Item_MRP") * col("Item_Outlet_Sales"))
df.display()

In [0]:
# Scenario 3: abbreviate fat-content labels, applied in order:
# 'Regular' -> 'Reg', then 'Low Fat' -> 'LF'.
for fat_label, short_label in [('Regular', 'Reg'), ('Low Fat', 'LF')]:
    df = df.withColumn('Item_Fat_Content',
                       regexp_replace(col('Item_Fat_Content'), fat_label, short_label))

display(df)




### Type Casting

In [0]:
df = df.withColumn('Item_Weight', col('Item_Weight').cast('string'))  # same as StringType()

In [0]:
df.display()

### Sort

In [0]:
# Heaviest items first. Item_Weight is stored as a string, so sort on its numeric
# value; a plain string sort would rank "9.8" above "21.35".
df = df.sort(col("Item_Weight").cast("double").desc())

In [0]:
df.display()

In [0]:
# Two-key sort, both descending, with weight compared numerically (it is stored as a string).
# Assign first, then display: chaining .display() would set df to None.
df = df.sort(
    [col("Item_Weight").cast("double"), "Item_Visibility"],
    ascending=[False, False],
)
df.display()


### Limit

In [0]:
display(df.limit(10))  # first 10 rows only

### Drop

In [0]:
display(df.drop("Item_Weight"))  # returns a new DataFrame; df is unchanged

In [0]:
display(df.drop(*["Item_Fat_Content", "Item_Type"]))

### Drop Duplicates

In [0]:
df.drop_duplicates().display()  # drop_duplicates is an alias of dropDuplicates

In [0]:
# Keep one row per Item_Type, for display only. df is not reassigned: .display()
# returns None, and the later cells need the full DataFrame.
df.dropDuplicates(subset=["Item_Type"]).display()

In [0]:
display(df.distinct())

### UNION and UNION BY NAME
Preparing DataFrames

In [0]:
# Two small DataFrames with identical schemas for the union examples.
schema1 = 'id STRING, name STRING'
schema2 = 'id STRING, name STRING'

data1 = [('1', 'kad'), ('2', 'sid')]
data2 = [('3', 'rahul'), ('4', 'jas')]

df1 = spark.createDataFrame(data=data1, schema=schema1)
df2 = spark.createDataFrame(data=data2, schema=schema2)



In [0]:
display(df1)

In [0]:
display(df2)

In [0]:
display(df1.union(df2))  # matches columns by position

### Union By Name

In [0]:
# Matches columns by name (default: both frames must have the same columns).
df1.unionByName(df2, allowMissingColumns=False).display()

### Initcap()

In [0]:
# initcap() capitalises the first letter of each word; it does not upper-case the
# whole string, so name the output column accordingly.
df.select(initcap("Item_Type").alias("initcap_item_type")).display()

### Current Date

In [0]:
# Stamp every row with today's date.
df = df.withColumn("date_column", current_date())
display(df)

### Date Add

In [0]:
# Date seven days after date_column. Assign first and display separately,
# since .display() returns None.
df = df.withColumn("week after", date_add("date_column", 7))
df.display()

### Date Sub

In [0]:

# Date seven days before date_column.
df = df.withColumn('week_before', date_sub(col('date_column'), 7))
df.display()

### Date Diff

In [0]:
# Days between "week after" and date_column. datediff(end, start) returns end - start,
# so the later date goes first to get a positive 7 (the old order gave -7).
df = df.withColumn("date_diff", datediff("week after", "date_column"))

df.display()

### Date Format


In [0]:

# Re-render week_before as a dd-MM-yyyy string (the column becomes a string).
df = df.withColumn('week_before', date_format(col('week_before'), 'dd-MM-yyyy'))
display(df)

### Handling Nulls

In [0]:
df.dropna(how='all').display()  # drop rows where every column is null

In [0]:
df.na.drop(subset=['Outlet_Size']).display()  # rows with a null Outlet_Size removed

### Filling Nulls

In [0]:
# Replace nulls in string columns with a placeholder. The call parentheses on
# display were missing, so the cell only showed a method reference.
df.fillna('NOT available').display()

In [0]:
df.na.fill('Not Available', subset=['Outlet_Size']).display()  # only Outlet_Size is filled

### Split and Indexing


## split

In [0]:
display(df.withColumn('Outlet_Type', split(col('Outlet_Type'), ' ')))  # string -> array of words

### Indexing

In [0]:
# Second word (0-based index 1) of Outlet_Type.
df.withColumn('Outlet_Type', split('Outlet_Type', ' ').getItem(1)).display()

### EXPLODE

In [0]:

# Keep the split (array) version of Outlet_Type for the explode / array_contains examples.
df_exp = df.withColumn('Outlet_Type', split(col('Outlet_Type'), ' '))
display(df_exp)
     

In [0]:
display(df_exp.withColumn('Outlet_Type', explode(col('Outlet_Type'))))  # one row per array element

### Array Contains

In [0]:
display(df_exp.withColumn('Type1_flag', array_contains(col('Outlet_Type'), 'Type1')))

### Group By

Scenario - 1

In [0]:
df.groupBy('Item_Type').agg(sum(col('Item_MRP'))).display()  # pyspark sum (star import)

Scenario - 2

In [0]:
display(df.groupBy(col('Item_Type')).agg(avg(col('Item_MRP'))))

Scenario - 3

In [0]:
df.groupBy(['Item_Type', 'Outlet_Size']).agg(sum('Item_MRP').alias('Total_MRP')).display()

Scenario - 4

In [0]:
display(df.groupBy('Item_Type', 'Outlet_Size').agg(sum('Item_MRP'), avg('Item_MRP')))

### Collect_List

In [0]:

# Toy (user, book) pairs for the collect_list example.
schema = 'user string, book string'
data = [
    ('user1', 'book1'), ('user1', 'book2'),
    ('user2', 'book2'), ('user2', 'book4'),
    ('user3', 'book1'),
]

df_book = spark.createDataFrame(data=data, schema=schema)

display(df_book)

In [0]:
display(df_book.groupBy('user').agg(collect_list(col('book'))))  # list of books per user

In [0]:
df.select(['Item_Type', 'Outlet_Size', 'Item_MRP']).display()

### PIVOT

In [0]:
# Rows: Item_Type; columns: one per Outlet_Size value; cells: average Item_MRP.
display(df.groupBy('Item_Type').pivot('Outlet_Size').agg(avg('Item_MRP')))

     

### When-Otherwise

Scenario - 1

In [0]:
# Scenario 1: 'Non veg' when Item_Type is Meat, otherwise 'Veg'.
df = df.withColumn(
    'Veg_FLag',
    when(col('Item_Type') == 'Meat', 'Non veg').otherwise('Veg'),
)
display(df)


     

In [0]:
# Scenario 2: price band for vegetarian rows; everything else is labelled non-veg.
# Fixed: the second branch tested 'Non veg' while labelling rows 'Veg_expensive',
# and an Item_MRP of exactly 100 fell through to the fallback label. The fallback
# is spelled 'Non veg' to match the Veg_FLag values.
df = df.withColumn(
    'Veg_exp_Flag',
    when((col('Veg_FLag') == 'Veg') & (col('Item_MRP') < 100), 'veg_inexpensive')
    .when((col('Veg_FLag') == 'Veg') & (col('Item_MRP') >= 100), 'Veg_expensive')
    .otherwise('Non veg')
)

In [0]:
display(df)

### JOINS

In [0]:

# Employees and departments for the join examples. Dept 'd06' has no department
# row and department 'd04' has no employees, so the join types give different results.
schemaj1 = 'emp_id STRING, emp_name STRING, dept_id STRING'
dataj1 = [
    ('1', 'gaur', 'd01'),
    ('2', 'kit', 'd02'),
    ('3', 'sam', 'd03'),
    ('4', 'tim', 'd03'),
    ('5', 'aman', 'd05'),
    ('6', 'nad', 'd06'),
]
df1 = spark.createDataFrame(data=dataj1, schema=schemaj1)

schemaj2 = 'dept_id STRING, department STRING'
dataj2 = [
    ('d01', 'HR'),
    ('d02', 'Marketing'),
    ('d03', 'Accounts'),
    ('d04', 'IT'),
    ('d05', 'Finance'),
]
df2 = spark.createDataFrame(data=dataj2, schema=schemaj2)

In [0]:
display(df1)

In [0]:
display(df2)

In [0]:
display(df1.join(df2, on=df1['dept_id'] == df2['dept_id'], how='inner'))

### Left Join

In [0]:
df1.join(df2, on=df1.dept_id == df2.dept_id, how='left').display()  # keep every employee

In [0]:
display(df1.join(df2, on=df1['dept_id'] == df2['dept_id'], how='right'))  # keep every department

### Anti Join

In [0]:
# Employees whose dept_id has no matching department row.
df1.join(df2, on=df1['dept_id'] == df2['dept_id'], how='anti').display()


### Window Functions

In [0]:
# Start the window-function section from a fresh copy of the table.
df = spark.read.table("workspace.default.big_mart_sales")
display(df)

In [0]:
from pyspark.sql.window import *
from pyspark.sql.functions import *

In [0]:
by_item_id = Window.orderBy('Item_Identifier')
df.withColumn('row_col', row_number().over(by_item_id)).display()

### RANK VS DENSE RANK

In [0]:

# rank() leaves gaps after ties; dense_rank() does not. Both use the same ordering.
id_desc = Window.orderBy(col('Item_Identifier').desc())
(df.withColumn('rank', rank().over(id_desc))
   .withColumn('denseRank', dense_rank().over(id_desc))
   .display())
 

In [0]:

# Row-based running total: every row up to and including the current one.
running_rows = Window.orderBy('Item_Identifier').rowsBetween(Window.unboundedPreceding, Window.currentRow)
display(df.withColumn('dum', sum('Item_MRP').over(running_rows)))
     


### Cumulative Sum

In [0]:
# With orderBy and no explicit frame, Spark's default frame is RANGE BETWEEN
# UNBOUNDED PRECEDING AND CURRENT ROW, so rows sharing an Item_Identifier get the
# same cumsum. A rowsBetween frame gives a strict row-by-row running total instead.
range_frame = Window.orderBy('Item_Identifier').rangeBetween(Window.unboundedPreceding, Window.currentRow)
df.withColumn('cumsum', sum('Item_MRP').over(range_frame)).display()

In [0]:
# Frame spans the whole window, so every row receives the grand total of Item_MRP.
whole_window = Window.orderBy('Item_Type').rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
display(df.withColumn('totalsum', sum('Item_MRP').over(whole_window)))


### User Defined Functions

### Step-1

In [0]:
def my_func(x):
    """Return the square of ``x``.

    Spark passes SQL NULL to a Python UDF as ``None``; that value is returned
    unchanged instead of raising ``TypeError`` on ``None * None``.
    """
    if x is None:
        return None
    return x * x

In [0]:
# Wrap my_func as a Spark UDF. Without an explicit returnType, udf() defaults to
# StringType, which would turn the squared value into text; declare DoubleType.
# NOTE(review): intended for floating-point inputs such as Item_MRP. Confirm before
# applying it to integer columns.
my_udf = udf(my_func, DoubleType())

In [0]:
display(df.withColumn('mynewcol', my_udf(col('Item_MRP'))))


### DATA WRITING

### CSV 

In [0]:

# First CSV write, using the default save mode (fails if the path already exists).
# NOTE(review): /FileStore is on the DBFS root, which the first cell says is disabled
# here. Confirm the path is writable, or switch to a Unity Catalog volume.
(df.write
   .format('csv')
   .save('/FileStore/tables/CSV/data.csv'))

### APPEND

In [0]:
# Append mode adds new part files next to the existing ones at the same path.
(df.write
   .mode('append')
   .format('csv')
   .save('/FileStore/tables/CSV/data.csv'))
     


In [0]:
# Same append as the previous cell: the rows are written to the path a second time.
(df.write
   .mode('append')
   .format('csv')
   .save('/FileStore/tables/CSV/data.csv'))
     


### Overwrite

In [0]:
# Overwrite replaces whatever is at the path (passing the path to save() is
# equivalent to option('path', ...)).
df.write.format('csv').mode('overwrite').save('/FileStore/tables/CSV/data.csv')

### Error

In [0]:

# Mode 'error' refuses to write to an existing path. After the writes above the path
# exists, so this is expected to fail. Catch and show the error so the remaining
# cells still run.
try:
    (df.write.format('csv')
       .mode('error')
       .option('path', '/FileStore/tables/CSV/data.csv')
       .save())
except Exception as err:  # AnalysisException; its import path varies across PySpark versions
    print(f"Expected failure (path already exists): {err}")

### Ignore

In [0]:
# Mode 'ignore' silently skips the write when the path already exists.
df.write.format('csv').mode('ignore').save('/FileStore/tables/CSV/data.csv')

### PARQUET

In [0]:

# Write the data as Parquet into its own directory. The old target reused the CSV
# folder 'data.csv', so overwrite replaced the CSV output with Parquet files under a
# misleading .csv name.
# NOTE(review): like the CSV paths, this sits on /FileStore; confirm it is writable.
(df.write.format('parquet')
   .mode('overwrite')
   .option('path', '/FileStore/tables/PARQUET/data.parquet')
   .save())

### TABLE

In [0]:

# Save df as a managed table named 'my_table' in the current catalog/schema.
# Unity Catalog managed tables are Delta tables, so write 'delta' rather than 'parquet'.
(df.write.format('delta')
   .mode('overwrite')
   .saveAsTable('my_table'))

In [0]:
display(df)


### SPARK SQL

createTempView

In [0]:

# Register df as a session-scoped temporary view for Spark SQL. createOrReplaceTempView
# keeps the cell re-runnable; createTempView raises if 'my_view' already exists.
df.createOrReplaceTempView('my_view')

In [0]:

%sql

-- String comparison is case-sensitive. The low-fat abbreviation used in this notebook
-- is 'LF'; 'Lf' matches no row. NOTE(review): confirm against the distinct values.
select * from my_view where Item_Fat_Content = 'LF'

In [0]:

# Same query through the Python API. Comparison is case-sensitive, so use the
# notebook's low-fat abbreviation 'LF' ('Lf' matches no row).
df_sql = spark.sql("select * from my_view where Item_Fat_Content = 'LF'")


In [0]:

display(df_sql)