## Data Reading

In [0]:
dbutils.fs.ls('/FileStore/tables')

In [0]:
df  = spark.read.format("csv").option('inderschema',True).option('header',True).load( '/FileStore/tables/BigMart_Sales.csv')

In [0]:
df.display()

## Reading JSON data

In [0]:
dbutils.fs.ls('FileStore/tables')

In [0]:
df_json = spark.read.format('json').option('inferschema',True).option('header',True).load( '/FileStore/tables/drivers.json')

In [0]:
df_json.show()

In [0]:
df_json.display()

## Defining a schema: DDL string and StructType

In [0]:
# Print the schema (column names, types, nullability) that `df` was read with above.
df.printSchema()

## DDL schema


In [0]:
# DDL-style schema: one "<column> <type>" pair per line, comma separated, in CSV column order.
# Numeric measures (price, sales, year) are typed as numbers so they can be used in arithmetic
# and aggregations without casts; identifiers and categorical labels stay strings.
ddl_schema = '''
Item_Identifier string,
Item_Weight float,
Item_Fat_Content string,
Item_Visibility float,
Item_Type string,
Item_MRP double,
Outlet_Identifier string,
Outlet_Establishment_Year int,
Outlet_Size string,
Outlet_Location_Type string,
Outlet_Type string,
Item_Outlet_Sales double
'''

In [0]:
dbutils.fs.ls('FileStore/tables')

In [0]:
df_ddlschema = spark.read.format('csv').option('Header',True).schema(ddl_schema).load('/FileStore/tables/BigMart_Sales.csv')

In [0]:
df_ddlschema.display()

## Defining a schema using StructType

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *


In [0]:
# Confirm the column types now come from ddl_schema rather than from inference.
df_ddlschema.printSchema()

In [0]:
# Programmatic equivalent of a DDL schema: StructField(name, type, nullable).
# Every column is read as a string here. The names must match the CSV header exactly
# ('Item_Type', 'Outlet_Location_Type', 'Outlet_Type'): with a user-supplied schema Spark
# takes column names from the schema, not from the header, so any other spelling would
# silently rename the column in the resulting DataFrame.
struct_schema = StructType([
    StructField('Item_Identifier', StringType(), True),
    StructField('Item_Weight', StringType(), True),
    StructField('Item_Fat_Content', StringType(), True),
    StructField('Item_Visibility', StringType(), True),
    StructField('Item_Type', StringType(), True),
    StructField('Item_MRP', StringType(), True),
    StructField('Outlet_Identifier', StringType(), True),
    StructField('Outlet_Establishment_Year', StringType(), True),
    StructField('Outlet_Size', StringType(), True),
    StructField('Outlet_Location_Type', StringType(), True),
    StructField('Outlet_Type', StringType(), True),
    StructField('Item_Outlet_Sales', StringType(), True),
])

In [0]:
df_structSchema = spark.read.format('csv').option('Header',True).schema(struct_schema).load('/FileStore/tables/BigMart_Sales.csv')

In [0]:
df_structSchema.display()

In [0]:
df_structSchema.printSchema()

### selecting columns

In [0]:
df.select('Item_Identifier','Item_Weight','Item_Fat_Content').display()

In [0]:
df.select(col('Item_Identifier'),col('Item_Weight'),col('Item_Fat_Content')).display()

## Aliasing with `col().alias()`

In [0]:
# alias() renames the column in this result only; df itself keeps 'Item_Identifier'.
display(df.select(col('Item_Identifier').alias('ItemID')))

### Slicing and filtering Data

In [0]:
display(df.head(5))

In [0]:
df.filter(col('Item_Fat_Content')=='Regular').display()

In [0]:
df.filter((col('Item_Weight')<20) & (col('Item_Type')=='Frozen Foods')).display()

In [0]:
df.filter(col('Outlet_Location_Type').isin('Tier 1','Tier 2') & (col('Outlet_Size').isNull())).display()

In [0]:
df.withColumnRenamed('Item_Identifier','Item_id')

### With column renamed


In [0]:
# withColumnRenamed(existing, new) returns a renamed copy of df.
display(df.withColumnRenamed('Item_Weight', 'Item_wt'))

### With column

In [0]:
df.withColumn('Country',lit('USA')).display()

In [0]:
df.withColumn('Item_MRP_After_Tax',col('Item_MRP')*1.085).head(5)

In [0]:
df.withColumn('Item_Fat_Content',regexp_replace(col('Item_Fat_Content'),'Regular','reg'))\
    .withColumn('Item_Fat_Content',regexp_replace(col('Item_Fat_Content'),'Low Fat','low'))\
        .withColumn('Item_Fat_Content',regexp_replace(col('Item_Fat_Content'),'low Fat','low'))\
            .withColumn('Item_Fat_Content',regexp_replace(col('Item_Fat_Content'),'LF','low'))\
                .withColumn('Item_Fat_Content',regexp_replace(col('Item_Fat_Content'),'low fat','low')).display()

### Typecasting

In [0]:
df.display()

In [0]:
df = df.withColumn('Item_Weight',col('Item_Weight').cast(dataType=FloatType()))

In [0]:
df.printSchema()

### sort asc()/desc()


In [0]:
df.sort(col('Item_Weight').desc()).display()

In [0]:
df.sort(col('Item_MRP').desc()).display()

In [0]:
df.sort(['Item_MRP','Item_Weight'],ascending=[0,1]).display()

In [0]:
df.sort(['Item_Weight','Item_MRP'],descending=[0,0]).display()

In [0]:
df.sort(col('Item_MRP').desc()).display()

In [0]:
display(df.sort(['Item_Weight','Item_MRP'],desc = [1,0]).limit(100))

In [0]:
df.sort(['Item_Weight','Item_MRP'],desc = [1,0]).display()

### Why does `.sort()` seem to disagree with `orderBy()`?
The Spark UI does not show exactly what happened: it renders an optimized view of the plan, so it is hard to see what is actually happening in the background.
The real cause of the surprising results above is the keyword argument, not `sort()` itself. `sort()` is an alias of `orderBy()`, and both only understand `ascending=` (a single bool, or one flag per column). Unknown keywords such as `desc=[1, 0]` or `descending=[...]` are silently ignored, so every column ends up sorted ascending.

To mix directions, pass `ascending=[False, True]`, or give each column an explicit direction: `orderBy(col('a').desc(), col('b').asc())`.

| Function / Method                                   | Global Sort     | Partition Sort      | Description                                                                                   | UI Reliable?          |
| --------------------------------------------------- | --------------- | ------------------- | --------------------------------------------------------------------------------------------- | --------------------- |
| `df.orderBy(col(...))`                              | ✅ Yes           | ❌ No                | Sorts entire DataFrame globally across all partitions. Shuffles data.                         | ✅ Mostly              |
| `df.sort(col(...))`                                 | ✅ Yes           | ❌ No                | Alias for `orderBy()`. Global sort. Shuffles data.                                            | ✅ Mostly              |
| `df.sort("col1", "col2")`                           | ✅ Yes           | ❌ No                | Global sort on columns. Same as `orderBy()`.                                                  | ✅ Mostly              |
| `df.sort(["col1", "col2"], desc=[1, 0])`            | ⚠️ Yes, but all ascending | ❌ No      | `desc` is not a recognised keyword and is silently ignored, so every column sorts ascending. Use `ascending=[False, True]`. | ❌ Wrong direction     |
| `df.orderBy(col("col1").desc(), col("col2").asc())` | ✅ Yes           | ❌ No                | Explicit direction on each column. Recommended for mixed sort orders.                         | ✅ Yes                 |
| `df.sortWithinPartitions("col1")`                   | ❌ No            | ✅ Yes               | Sorts data **within each partition only**. No shuffle. Useful before saving partitioned data. | ❌ Not globally sorted |
| `df.repartition(...).sortWithinPartitions(...)`     | ❌ No            | ✅ Yes               | Sorts **within newly repartitioned** data. Still not globally sorted.                         | ❌                     |
| `df.orderBy(...).coalesce(1)` or `repartition(1)`   | ✅ Yes           | ✅ Yes (1 partition) | Forces global sort by collapsing all into 1 partition. Expensive.                             | ✅ Accurate            |
| `df.rdd.sortBy(...)`                                | ✅ Yes           | ❌ No                | RDD-level global sort. Use with caution for DataFrames.                                       | ❌ Requires `.toDF()`  |
| `display(df)`                                       | ❓ Maybe         | ❓ Partial           | UI **may sample** a few partitions only. Misleading without `limit()` or `collect()`.         | ⚠️ Often misleading   |



In [0]:
# An explicit direction on each column is the clearest way to mix sort orders.
display(df.orderBy(col('Item_Weight').desc(), col('Item_MRP').asc()))

### Limit 

In [0]:
df.limit(15)

In [0]:
df.limit(20).display()

### Drop
We can drop a single column or several columns from a DataFrame.

In [0]:
df.drop('Item_Visibility').display()

In [0]:
df.drop('Item_Weight','Item_MRP').display()

### Drop Duplicates

In [0]:
df.dropDuplicates().display()

In [0]:
#
df.drop_duplicates(subset=['Item_Type']).display()

### Distinct
`distinct()` always compares entire rows;
it does not accept column names (use `dropDuplicates(subset=[...])` to deduplicate on specific columns).


In [0]:
# distinct() compares whole rows, like dropDuplicates() without a subset.
display(df.distinct())

### Union and Union by name

In [0]:
data2 = [('Abhi',1),('charls',2),('Srikanth',3)]
data3 = [('Chaitanya',4),('gopi',6),('avinash',7)]

schema2 = '''
name string,
person_id int
'''
schema3 = '''
name string,
person_id int
'''

In [0]:
df2 = spark.createDataFrame(data2,schema2)
df2.display()

In [0]:
df3 = spark.createDataFrame(data=data3,schema=schema3)
df3.display()

In [0]:
df2.union(df3).display()


### The problem: column names and data types match, but the column order differs


In [0]:
data2 = [('Abhi',1),('charls',2),('Srikanth',3)]
data3 = [(4,'Chaitanya'),(6,'gopi'),(7,'avinash')]

schema2 = '''
name string,
person_id int
'''
schema3 = '''
person_id int,
name string

'''

In [0]:
df2 = spark.createDataFrame(data=data2,schema=schema2)
df2.display()
df3 = spark.createDataFrame(data=data3,schema=schema3)
df3.display()

In [0]:
df2.union(df3).display()

In [0]:
df2.unionByName(df3).display()

### String Functions
initcap()
upper()
lower()

In [0]:
df.select(initcap(col('Item_Type'))).display()

In [0]:
df.select(upper(col('Item_Type'))).display()

In [0]:
df.select(lower(col('Item_Type'))).display()

### Date Functions
current_date()
date_add()
date_sub()

In [0]:
df = df.withColumn('Todays_date',current_date())
df.display()

In [0]:
df = df.withColumn('Date_plus_week',date_add(col('Todays_date'),7))
df.display()

In [0]:
df = df.withColumn('Date_minus_week',date_sub(col('Todays_date'),7))
df.display()

In [0]:
df = df.withColumn('Difference_in_date',datediff(col('Date_plus_week'),col('Date_minus_week')))
df.display()

In [0]:
df.printSchema()

### If we want to change format of a date column we use date_format()

In [0]:
# date_format renders a date as a string using a pattern: month-day-two-digit-year here.
display(df.withColumn('date_format', date_format(col('Todays_date'), 'MM-dd-yy')))

### Handling Nulls
dropping nulls
filling nulls


In [0]:
df.select('Outlet_Size').dropna().display()

In [0]:
df.dropna(how='any').display()

In [0]:
df.dropna(how='all').display()

In [0]:
df.dropna(subset=('Item_Weight')).display()

In [0]:
df.dropna(subset=('Item_Weight','Outlet_Size')).display()

In [0]:
df.dropna(how = 'any',subset=('Item_Weight','Outlet_Size')).display()

In [0]:
df.dropna(how = 'all',subset=('Item_Weight','Outlet_Size')).display()

In [0]:
df.dropna(subset=('Item_Weight','Outlet_Size')).display()

In [0]:
### Filling NA values
# A string fill value only applies to string columns in `subset`; non-string columns such as
# Item_Weight (cast to float earlier) are left as they are.
display(df.fillna('Not Available', subset=('Item_Weight', 'Outlet_Size')))

### Per the `fillna` docstring, columns whose data type does not match the fill value's type are simply ignored

In [0]:
### Filling NA values using dict
df.fillna({'Outlet_Size':'Not Available','Item_Weight':0}).display()

In [0]:
df.count()

In [0]:
df.fillna('Not for sale or lease', subset=['Outlet_Size']).display()

### split and indexing


In [0]:
df.withColumn('outlet_type_split',split(col('Outlet_Type'),pattern=' ')).display()

In [0]:
### Indexing can be simply done by slice operator 
df.withColumn('outlet_type_no',split(col('Outlet_Type'),pattern=' ')[1]).display()

### Explode function


In [0]:
df_explode = df.withColumn('outlet_type_no',split(col('Outlet_Type'),' '))

In [0]:
df_explode.display(5)

In [0]:
df_explode.withColumn('explode',explode(col('outlet_type_no'))).display()

In [0]:
df_explode.withColumn('explode',explode(col('outlet_type_no'))).count()

#### Array Contains
### `array_contains(arr, x)` returns True if x is in the array, False if it is not, and null if the array itself is null

In [0]:
# True when 'Type1' is one of the split tokens of Outlet_Type, False otherwise.
display(df_explode.withColumn('Type1_flag', array_contains(col('outlet_type_no'), value='Type1')))

#### Group_by

In [0]:
'''find sum of MRP for each item type
'''

df.groupBy(col('Item_Type')).agg(sum('Item_MRP')).display()

In [0]:
df.groupBy('Item_Type').agg(avg("Item_MRP")).display()

In [0]:
df.groupBy('Item_Type').agg(avg(col("Item_MRP"))).display()

In [0]:
df.groupBy(['Item_Type','Outlet_Size']).agg(sum('Item_MRP').alias('Total_sum')).display()

In [0]:
df.groupBy(['Item_Type','Outlet_Size']).agg(sum('Item_MRP').alias('Total_sum'),avg('Item_MRP').alias('Avg_sum')).display()

### collect_list (also an aggregate function)
This function plays the role of GROUP_CONCAT in MySQL.
Just as an aggregate function such as sum/avg combines all the values of a group into one result,
collect_list gathers all the values of each group into a single list (array).

In [0]:
df_explode.display()

In [0]:
df.groupBy('Outlet_Location_Type').agg(collect_list('Item_Type')).orderBy('Outlet_Location_Type').display()

In [0]:
# If you want to collect unique items use a set insted of a list!
df.groupBy(col('Outlet_Location_Type').alias('Tiers')).agg(collect_set('Item_Type').alias('Item_types')).orderBy(col('Tiers').desc()).display()

### Pivot 

In [0]:
df_explode.display()

In [0]:
# Lets say we want to know Tier wise, Fat consumptions and their sales we can simply use a pivot
df_explode.groupBy(['Outlet_Location_Type','Outlet_size']).pivot('Item_Fat_Content').agg(sum('Item_MRP'),avg('Item_MRP')).display()

### `when().otherwise()` is the equivalent of SQL `CASE WHEN cond THEN ... ELSE ... END`

In [0]:
# NOTE(review): despite its name, Veg_flag is True for 'Meat' rows and False for everything
# else; the later cells filter Veg_flag == True to pick the meat items.
display(df.withColumn('Veg_flag', when(col('Item_Type') == 'Meat', True).otherwise(False)))

In [0]:
# Same flag as the previous cell: True for 'Meat' rows, despite the name.
df1 = df.withColumn('Veg_flag', when(col('Item_Type') == 'Meat', True).otherwise(False))
# Average MRP of meat items per outlet tier. Filter BEFORE aggregating: adding a boolean
# 'Avg_meatprice' column via withColumn did not restrict the rows, so the average was
# previously taken over every item type.
# NOTE: a tier with no meat items gets no row here, so an inner join on df2 drops that tier.
df2 = (
    df.filter(col('Item_Type') == 'Meat')
    .groupBy('Outlet_Location_Type')
    .agg(avg('Item_MRP').alias('Avg_meatprice'))
)

In [0]:
# Attach each tier's average meat price to every row of that tier.
df3 = df1.join(df2, on='Outlet_Location_Type')
# Label only the flagged (meat) rows by comparing each price with its tier's average.
display(
    df3.filter(col('Veg_flag') == True)
    .withColumn('category', when(col('Item_MRP') > col('Avg_meatprice'), 'expensive').otherwise('normal'))
)

The transformation above was applied only to the filtered subset of df3 (the meat rows); df3 as a whole is unchanged.
To apply it to every row of df3, chain two or more conditions in the `when` expression instead of filtering first:

In [0]:
# Categorise the whole frame without filtering: non-meat rows get null, and meat rows are
# compared with their tier's average.
display(
    df3.withColumn(
        'category',
        when(col('Item_Type') != 'Meat', None)
        .when((col('Item_Type') == 'Meat') & (col('Item_MRP') > col('Avg_meatprice')), 'Expensive')
        .otherwise('normal'),
    )
)

## Most Imp-- Joins

In [0]:
# Employees. 'nad' points at d06, which has no department row (useful for the anti joins).
dataj1 = [
    ('1', 'gaur', 'd01'),
    ('2', 'kit', 'd02'),
    ('3', 'sam', 'd03'),
    ('4', 'tim', 'd03'),
    ('5', 'aman', 'd05'),
    ('6', 'nad', 'd06'),
]
schemaj1 = 'emp_id STRING, emp_name STRING, dept_id STRING'
df1 = spark.createDataFrame(data=dataj1, schema=schemaj1)

# Departments. d04 (IT) has no employees.
dataj2 = [
    ('d01', 'HR'),
    ('d02', 'Marketing'),
    ('d03', 'Accounts'),
    ('d04', 'IT'),
    ('d05', 'Finance'),
]
schemaj2 = 'dept_id STRING, department STRING'
df2 = spark.createDataFrame(data=dataj2, schema=schemaj2)

In [0]:
df1.display()
df2.display()

In [0]:
# Inner Join A intersection B
df1.join(df2,on=df1['dept_id']==df2['dept_id'],how='inner').display()

In [0]:
# Left Join (A + A intersection B)
df1.join(df2,on=df1['dept_id']==df2['dept_id'],how='left').display()

In [0]:
#Right Join ( B + A intersection B)
df1.join(df2,on=df1['dept_id']==df2['dept_id'],how='Right').display()

In [0]:
# Anti Join (A - (A intersection B))
df1.join(df2,on=df1['dept_id']==df2['dept_id'],how='anti').display()

In [0]:
# Anti Join (B - (B intersection A))
df2.join(df1,on=df1['dept_id']==df2['dept_id'],how='anti').display()

In [0]:
df2.display()
df1.display()

## Window Functions

In [0]:
from pyspark.sql.window import Window

In [0]:
 df.withColumn('rownumber',row_number().over(Window.orderBy(col('Item_Identifier')))).display()

In [0]:
 # with partition by and order by
 df.withColumn('rownumber',row_number().over(Window.partitionBy(col('Item_Identifier')).orderBy(col('Item_Identifier')))).display()

In [0]:
#  rank vs row number
df.withColumn('ranknumber',rank().over(Window.orderBy(col('Item_Identifier')))).display()

In [0]:
# rank vs row number() vs dense rank()
df.withColumn('row_number',row_number().over(Window.orderBy(col('Item_Identifier'))))\
    .withColumn('rank_number',rank().over(Window.orderBy(col('Item_Identifier'))))\
        .withColumn('dense_ranknumber',dense_rank().over(Window.orderBy(col('Item_Identifier')))).display()

In [0]:
df.display()

In [0]:
df.withColumn('row_no', row_number().over(Window.orderBy('Item_Fat_Content'))).display()

In [0]:
from pyspark.sql.window import Window

In [0]:
# The method is Window.orderBy (camel case); Window.orderby raises AttributeError.
df.withColumn('Row_no', row_number().over(Window.orderBy('Item_Fat_Content'))).display()

In [0]:
# cumilative sum of all item type(agg(sum('Item_MRP')))
df.withColumn('Item_agg_sum',sum('Item_MRP').over(Window.orderBy('Item_Type')) ).display()

In [0]:
# cumulative sum(Total running sum)
df.withColumn('Item_cum_sum',sum('Item_MRP').over(Window.orderBy('Item_Type').rowsBetween(Window.unboundedPreceding,Window.currentRow))).display()

In [0]:
df.withColumn('Total_sum', sum('Item_MRP').over(Window.orderBy('Item_Type').rowsBetween(Window.currentRow,Window.unboundedFollowing))).display()

In [0]:
df.withColumn('Item_sum_UP(Running sum)',sum('Item_MRP').over(Window.orderBy('Item_Type').rowsBetween(Window.unboundedPreceding,Window.currentRow)))\
.withColumn('Item_cum_totalsum(All rows considered)',sum('Item_MRP').over(Window.orderBy('Item_Type').rowsBetween(Window.unboundedPreceding,Window.unboundedFollowing)))\
.withColumn('Item_sum_UF(Running sum but away from total sum = Total- current MRP)',sum('Item_MRP').over(Window.orderBy('Item_Type').rowsBetween(Window.currentRow,Window.unboundedFollowing))).display()

### User Defined Functions
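Avoid UDFs unless no built-in function can do the job. Python UDFs are opaque to Spark's Catalyst optimizer, and every row has to be serialized between the JVM and a Python worker, so they are much slower than built-in functions. Their return type also defaults to string unless you declare it.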

In [0]:
from pyspark.sql.functions import hash
def sqrt(value):
    """Return the square root of ``value``; meant to be wrapped as a Python UDF.

    Returns None for None input (how a UDF receives SQL NULL) and for negative numbers,
    where ``value ** 0.5`` would produce a complex number. Zero is valid and returns 0.0;
    the previous truthiness check (``if value:``) wrongly mapped 0 to None.

    NOTE: this name shadows ``pyspark.sql.functions.sqrt`` brought in by the star import.
    """
    if value is None or value < 0:
        return None
    return value ** 0.5


In [0]:
# Declare the return type: udf() defaults to StringType, which would turn every square root
# into a string. DoubleType comes from the pyspark.sql.types star import.
sqrt_udf = udf(sqrt, DoubleType())

In [0]:
# Cast to double, not int: casting to int drops the decimals of the price before the square root.
df.withColumn('MRP_sqrt', sqrt_udf(col('Item_MRP').cast('double'))).display()

### data Writing
CSV

In [0]:
# Spark writes a directory of part files at this path. header=True writes the column names so
# the output can be read back with header=True. With the default save mode ('error'),
# re-running this cell fails once the path exists.
df.write.format('CSV').option('header', True).save('/FileStore/tables/CSV/saved_csv.csv')

## Data Writing modes :
### Append: Adds the new data alongside the data already at the path
### Overwrite: Deletes the existing data, then writes the new data (the old data is lost)
### Error (the default): If data already exists at the path, the write throws an error
### Ignore: If data already exists at the path, the write is silently skipped


In [0]:
df.write.format('CSV').mode('Append').save('/FileStore/tables/CSV/saved_csv1.csv') 

In [0]:
df.write.format('CSV').mode('Append').save('/FileStore/tables/CSV/saved_csv1.csv') 

In [0]:
## We have twice the rows because Data is apeend twice
df_append  = spark.read.format("csv").option('inferschema',True).option('header',True).load( '/FileStore/tables/CSV/saved_csv1.csv')
df_append.display()
df_append.count()

In [0]:
df.count()

In [0]:
df.write.format('CSV').mode('overwrite').save('/FileStore/tables/CSV/saved_csv1.csv')
df.count()

In [0]:

## We have the original dataset after performing a overwrite save 
df_ow  = spark.read.format("csv").option('inferschema',True).option('header',True).load( '/FileStore/tables/CSV/saved_csv1.csv')
df_ow.display()
df_ow.count()

#### Throws an exception if file already exists!

In [0]:
# mode('Error') refuses to write when the target path already exists. The failure IS the
# demonstration, so catch it and show it instead of aborting "Run All".
try:
    df.write.format('CSV').mode('Error').save('/FileStore/tables/CSV/saved_csv1.csv')
except Exception as exc:  # expected: Spark reports that the path already exists
    print(f'{type(exc).__name__}: {exc}')

In [0]:
# 'Ignore': the path already exists, so nothing is written and no error is raised.
(
    df.write
    .format('CSV')
    .mode('Ignore')
    .save('/FileStore/tables/CSV/saved_csv1.csv')
)

## Different types of file formats:
Parquet
Delta lake
ORC


In [0]:
# NOTE(review): this writes Parquet; the '.csv' suffix in the path is only part of the name.
# mode('default') behaves like 'error': the write fails if the path already exists.
(
    df.write
    .format('parquet')
    .mode('default')
    .save('/FileStore/tables/CSV/saved_csv_parquet.csv')
)

In [0]:
# Save as a table registered in the metastore (stored as Parquet). Like the cell above,
# 'default' mode fails if the table already exists, so this cell is not re-runnable as-is.
(
    df.write.format('parquet')
    .mode('default')
    .saveAsTable('my_data_asatable')
)

## Managed vs External tables?

## spark SQL

In [0]:
# Register df as a session-scoped temporary view so SQL can query it. createTempView raises
# if 'temp_view' already exists, which breaks re-running the notebook; createOrReplaceTempView
# simply replaces it.
df.createOrReplaceTempView('temp_view')

In [0]:
%sql
-- Query the temporary view with plain SQL; the magic on the first line switches the cell to SQL.
select * from temp_view

In [0]:
# The same query from Python: spark.sql() returns a DataFrame.
display(spark.sql('select * from temp_view'))