### Data Reading

In [0]:
# dbutils.fs.ls('/FileStore/')

In [0]:
df=spark.read.format('csv').option('inferSchema',True).option('header',True).load('/Volumes/firstcatalog/firstschema/firstvolume/BigMart Sales.csv')

In [0]:
df.show()

In [0]:
df.display()

### Data Reading JSON

In [0]:
# \ is used for line continuation
df_json = spark.read.format('json') \
    .option('inferSchema', True) \
    .option('header', True) \
    .load('/Volumes/firstcatalog/firstschema/volume2/drivers.json')

df_json.show(5)
df_json.display()
# or
display(df_json)


### Schema Definition

In [0]:
df.printSchema()

### DDL Schema

In [0]:
# changing data type of item weight from double to string
my_ddl_schema= '''
        Item_Identifier string,
        Item_Weight string,
        Item_Fat_Content string,
        Item_Visibility double,
        Item_Type string,
        Item_MRP double,
        Outlet_Identifier string,
        Outlet_Establishment_Year int,
        Outlet_Size string,
        Outlet_Location_Type string,
        Outlet_Type string,
        Item_Outlet_Sales double
'''
df=spark.read.format('csv')\
        .schema(my_ddl_schema)\
        .option('header',True)\
        .load('/Volumes/firstcatalog/firstschema/firstvolume/BigMart Sales.csv')

In [0]:
df.display()
# now data type will be changed everywhere, even if we run df.display() again of before changing data type, that too will show changed data type as we have already ran the changed schema part
# we can see the changed data type in column heading also as well as if we print again the schema
# there is one other way also to change the data type i.e., using structtype, u can explore that...
# at the end we again converted our dt back to double from string as later we need to perform some operations like aggregation,...    For data transformation back to original run again the top cell   df=spark.read.format('csv').option('inferSchema',True).option('header',True).load('/Volumes/firstcatalog/firstschema/firstvolume/BigMart Sales.csv')

### SELECT

In [0]:
# df_sel=df.select('Item_Identifier','Item_Weight','Item_MRP')
# df_sel.display()
# OR
df.select('Item_Identifier','Item_Weight','Item_MRP').display()

In [0]:
# Another way to select, using col
from pyspark.sql.functions import col
df.select(col('Item_Identifier'),col('Item_Weight'),col('Item_MRP')).display()

### ALIAS

In [0]:
df.select(col('Item_Identifier').alias('Item_ID'),col("Item_MRP").alias('Item Price')).display()
# if u want all the columns and some columns with alias then this will add both columns at the end, and if u want the columns woth alias in that column only replacing previous name, then u have to keep writing col for each column name with separating commas and where u want alias there u write alias
# display(df.select("*", col('Item_Identifier').alias('Item_ID'), col("Item_MRP").alias('Item Price')))

### FILTER

#### Scenario - 1

In [0]:
# Filter the data with dat content=Regular
from pyspark.sql.functions import col
df.filter(col('Item_Fat_Content')=='Regular').display()

#### Scenario - 2

In [0]:
# Slice the data with item type= Soft Drinks and item weight <10
from pyspark.sql.functions import col
df.filter( (col('Item_Type')=='Soft Drinks') & (col('Item_Weight')<10) ).display()

#### Scenario - 3

In [0]:
# Fetch the data with tier in (tier1 or tier2) and outlet size is null
from pyspark.sql.functions import col
df.filter( (col('Outlet_Size').isNull()) & (col('Outlet_Location_Type').isin('Tier 1','Tier 2')) ).display()

### withColumnRenamed

In [0]:
df.withColumnRenamed('Item_Weight','Item Wt').display()

### withColumn

#### Scenario - 1

In [0]:
# Creating a new column flag with the constant value new
# from pyspark.sql.functions import lit
df=df.withColumn('flag',lit('new'))
df.display()

In [0]:
# Creating a new column but this time not with constant values, we'll be multiplying item weight and item mrp
from pyspark.sql.functions import col
df.withColumn('multiplyColumn',col('Item_Weight')*col('Item_MRP')).display()

#### Scenario - 2

In [0]:
# Modify the existing column. We'll be modifying itemFatContent column contents: low fat->LF, regular->Reg

from pyspark.sql.functions import col, regexp_replace
df.withColumn('Item_Fat_Content',regexp_replace(col('Item_Fat_Content'),'Low Fat','LF'))\
    .withColumn('Item_Fat_Content',regexp_replace(col('Item_Fat_Content'),'Regular','Reg')).display()

# from pyspark.sql.functions import col,when
# df.withColumn('Item_Fat_Content',when(col('Item_Fat_Content')=='Low Fat','LF').when(col('Item_Fat_Content')=='Regular','Reg').otherwise(col('Item_Fat_Content'))).display()

### Type Casting

In [0]:
df=df.withColumn('Item_Weight', col('Item_Weight').cast("string")) 
# OR
# df=df.withColumn('Item_Weight', col('Item_Weight').cast(StringType()))
df.printSchema()

### sort

#### Scenario - 1

In [0]:
df.sort(col('Item_Weight').desc()).display()

#### Scenario - 2

In [0]:
df.sort(col('Item_Visibility').asc()).display()

#### Scenario - 3

In [0]:
df.sort(['Item_Weight','Item_Visibility'], ascending=[0,1]).display()

### LIMIT

In [0]:
df.limit(10).display()

### DROP

#### Scenario - 1

In [0]:
df.drop('Item_Visibility').display()

#### Scenario - 2

In [0]:
df.drop('Item_Visibility','Item_Type').display()

### Drop_Duplicates

#### Scenario - 1

In [0]:
# removes duplicate rows across all columns.
df.dropDuplicates().display()

#### Scenario - 2

In [0]:
# removes duplicates from a particular column
df.dropDuplicates(['Item_Type']).display()

In [0]:
df.distinct().display()

### UNION and UNION BY NAME

#### Preparing Dataframes

In [0]:
data1 = [('1','kad'),
        ('2','sid')]
schema1 = 'id STRING, name STRING' 

df1 = spark.createDataFrame(data1,schema1)

data2 = [('3','rahul'),
        ('4','jas')]
schema2 = 'id STRING, name STRING' 

df2 = spark.createDataFrame(data2,schema2)

In [0]:
df1.display()
df2.display()

#### Union

In [0]:
df1.union(df2).display()

In [0]:
# Creating df1 again
data1 = [('kad','1',),
        ('sid','2',)]
schema1 = 'name STRING, id STRING' 

df1 = spark.createDataFrame(data1,schema1)
df1.display()
df1.union(df2).display()

### Union by Name

In [0]:
df1.unionByName(df2).display()

### String Functions

#### Initcap(), lower and upper

In [0]:
from pyspark.sql.functions import col, upper
# also try initcap and lower
df.select(upper(col('Item_Type')).alias('upper_Item_Type')).display()

### Date Functions

#### Current_Date, Date_Add(), DateSub(), DateDiff

In [0]:
# from pyspark.sql.functions import current_date
# df = df.withColumn('curr_date',current_date())
# df.display()

from pyspark.sql.functions import current_date, date_add, datediff, col
# add today's date
df = df.withColumn("curr_date", current_date())
# add a new column that is 7 days after curr_date
df = df.withColumn("week_after", date_add(col("curr_date"), 7))
# calculate difference between the two
df = df.withColumn("datediff", datediff(col("week_after"), col("curr_date")))
display(df)   # if in Databricks

# also explore date_format...


### Handling Nulls

#### Dropping NUlls

In [0]:
# Drop rows where all columns are null.
df.dropna('all').display()
# Drop rows where any column has null.
df.dropna('any').display()
# Only check Outlet_Size column for nulls.
df.dropna(subset=['Outlet_Size']).display()

#### Filling Nulls

In [0]:
# Replace all null values in all columns with 'NotAvailable'.
df.fillna('NotAvailable').display()
# Only replace nulls in the Outlet_Size column with 'NotAvailable'. Other columns stay untouched.
df.fillna('NotAvailable',subset=['Outlet_Size']).display()

### SPLIT AND INDEXING

#### SPLIT

In [0]:
# split(col, pattern) → splits a string column into an array of substrings, based on the delimiter. 
# Here, delimiter = ' '
from pyspark.sql.functions import split
df.withColumn('Outlet_Type',split('Outlet_Type',' ')).display()

#### Indexing

In [0]:
df.withColumn('Outlet_Type', split('Outlet_Type', ' ')[1]).display()

#### Explode

In [0]:
from pyspark.sql.functions import split, explode, array_contains

# Outlet_Type is no longer a string column → it’s an array column.
df_exp = df.withColumn('Outlet_Type',split('Outlet_Type',' '))
df_exp.display()
     
# explode() takes each element of the array and creates a new row.
df_exp.withColumn('Outlet_Type',explode('Outlet_Type')).display()
df_exp.display()

# Does the array in Outlet_Type contain the word "Type1"   array_contains → returns a boolean (True/False).
df_exp.withColumn('Type1_flag',array_contains('Outlet_Type','Type1')).display()

### GroupBY

#### Scenario - 1

In [0]:
df.groupBy('Item_Type').agg(sum('Item_MRP')).display()

#### Scenario - 2

In [0]:
df.groupBy('Item_Type').agg(avg('Item_MRP')).display()

#### Scenario - 3

In [0]:
df.groupBy('Item_Type','Outlet_Size').agg(sum('Item_MRP').alias('Total_MRP')).display()

#### Scenario - 4

In [0]:
df.groupBy('Item_Type','Outlet_Size').agg(sum('Item_MRP'),avg('Item_MRP')).display()

### Collect_List

#### Creating the sample DataFrame

In [0]:
data = [('user1','book1'),
        ('user1','book2'),
        ('user2','book2'),
        ('user2','book4'),
        ('user3','book1')]
schema = 'user string, book string'
df_book = spark.createDataFrame(data,schema)
df_book.display()

#### Group and collect books per user

In [0]:
from pyspark.sql.functions import collect_list
df_book.groupBy('user').agg(collect_list('book')).display()

#### Select specific columns

In [0]:
df.select('Item_Type','Outlet_Size','Item_MRP').display()

### PIVOT

In [0]:
# .pivot('Outlet_Size')→ Takes values from Outlet_Size (like Small, Medium, High) and turns them into columns.
df.groupBy('Item_Type').pivot('Outlet_Size').agg(avg('Item_MRP')).display()

### When-Otherwise

In [0]:
from pyspark.sql.functions import when, col
df = df.withColumn('veg_flag',when(col('Item_Type')=='Meat','Non-Veg').otherwise('Veg'))
df.display()

In [0]:
from pyspark.sql.functions import when, col
df.withColumn('veg_exp_flag',when(((col('veg_flag')=='Veg') & (col('Item_MRP')<100)),'Veg_Inexpensive')\
                            .when((col('veg_flag')=='Veg') & (col('Item_MRP')>100),'Veg_Expensive')\
                            .otherwise('Non_Veg')).display() 

### JOINS

In [0]:
dataj1 = [('1','gaur','d01'),
          ('2','kit','d02'),
          ('3','sam','d03'),
          ('4','tim','d03'),
          ('5','aman','d05'),
          ('6','nad','d06')] 
schemaj1 = 'emp_id STRING, emp_name STRING, dept_id STRING' 
df1 = spark.createDataFrame(dataj1,schemaj1)

dataj2 = [('d01','HR'),
          ('d02','Marketing'),
          ('d03','Accounts'),
          ('d04','IT'),
          ('d05','Finance')]
schemaj2 = 'dept_id STRING, department STRING'
df2 = spark.createDataFrame(dataj2,schemaj2)

df1.display()
df2.display()

#### Inner Join

In [0]:
df1.join(df2, df1['dept_id']==df2['dept_id'], 'inner').display()

#### Left Join

In [0]:
df1.join(df2,df1['dept_id']==df2['dept_id'],'left').display()

#### RIGHT JOIN

In [0]:
df1.join(df2,df1['dept_id']==df2['dept_id'],'right').display()

#### ANTI JOIN

In [0]:
# Returns rows from left DataFrame (employees) that don’t have a match in right DataFrame
df1.join(df2,df1['dept_id']==df2['dept_id'],'anti').display()

### WINDOW FUNCTIONS

#### ROW_NUMBER()

In [0]:
df.display()

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

# row_number(): A window function that assigns a unique sequential number (1, 2, 3, …) to each row in the defined order. Think of it as adding a new column like an index, based on ordering.
df.withColumn('rowCol',row_number().over(Window.orderBy('Item_Identifier'))).display()

#### RANK VS DENSE RANK

In [0]:
from pyspark.sql.functions import rank, dense_rank, col
from pyspark.sql.window import Window

df.withColumn('rankCol',rank().over(Window.orderBy(col('Item_Identifier').desc())))\
        .withColumn('denseRankCol',dense_rank().over(Window.orderBy(col('Item_Identifier').desc()))).display()
     

In [0]:
# Cumulative Sum (Running Total)
from pyspark.sql.functions import sum
df.withColumn('sumCol',sum('Item_MRP').over(Window.orderBy('Item_Identifier').rowsBetween(Window.unboundedPreceding,Window.currentRow))).display()

# explore more given below...

# df.withColumn('cumsum',sum('Item_MRP').over(Window.orderBy('Item_Type'))).display()

# df.withColumn('cumsum',sum('Item_MRP').over(Window.orderBy('Item_Type').rowsBetween(Window.unboundedPreceding,Window.currentRow))).display()

# df.withColumn('totalsum',sum('Item_MRP').over(Window.orderBy('Item_Type').rowsBetween(Window.unboundedPreceding,Window.unboundedFollowing))).display()
     

### USER DEFINED FUNCTIONS (UDF)

#### STEP - 1

In [0]:
def my_func(x):
    return x*x

#### STEP - 2

In [0]:
my_udf = udf(my_func)

df.withColumn('mynewcol',my_udf('Item_MRP')).display()

### DATA WRITING

#### CSV

In [0]:
# Now directory named data.csv will be made in the desired location
df.write.format('csv')\
        .save('/Volumes/firstcatalog/firstschema/firstvolume/data.csv')

In [0]:
# Now all the contents of df will be written into this directory
df.write.format('csv')\
        .mode('append')\
        .save('/Volumes/firstcatalog/firstschema/firstvolume/data.csv')

# OR
# df.write.format('csv')\
#         .mode('append')\
#         .option('path','/FileStore/tables/CSV/data.csv')\
#         .save()

#### Overwrite

In [0]:
df.write.format('csv')\
.mode('overwrite')\
.option('path','/Volumes/firstcatalog/firstschema/firstvolume/data.csv')\
.save()

#### Error

In [0]:
# This is the default mode in Spark. It fails if the target path already exists. No data will be written if the folder is already present.
df.write.format('csv')\
.mode('error')\
.option('path','/Volumes/firstcatalog/firstschema/firstvolume/data.csv')\
.save()

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-8217662817346155>, line 4[0m
[1;32m      1[0m df[38;5;241m.[39mwrite[38;5;241m.[39mformat([38;5;124m'[39m[38;5;124mcsv[39m[38;5;124m'[39m)\
[1;32m      2[0m [38;5;241m.[39mmode([38;5;124m'[39m[38;5;124merror[39m[38;5;124m'[39m)\
[1;32m      3[0m [38;5;241m.[39moption([38;5;124m'[39m[38;5;124mpath[39m[38;5;124m'[39m,[38;5;124m'[39m[38;5;124m/Volumes/firstcatalog/firstschema/firstvolume/data.csv[39m[38;5;124m'[39m)\
[0;32m----> 4[0m [38;5;241m.[39msave()

File [0;32m/databricks/python/lib/python3.12/site-packages/pyspark/sql/connect/readwriter.py:679[0m, in [0;36mDataFrameWriter.save[0;34m(self, path, format, mode, partitionBy, **options)[0m
[1;32m    677[0m     [38;5;28mself[39m[38;5;241m.[39mformat([38;5;28mformat[39m)
[1;32m    678[0m 

#### Ignore

In [0]:
# "ignore" mode means: If the path already exists, Spark will do nothing and will not throw an error. If the path does not exist, Spark will create it and write the DataFrame.
df.write.format('csv')\
.mode('ignore')\
.option('path','/Volumes/firstcatalog/firstschema/firstvolume/data.csv')\
.save()

### PARQUET

In [0]:
# Explore about parquet...
df.write.format('parquet')\
.mode('overwrite')\
.option('path','/Volumes/firstcatalog/firstschema/firstvolume/data.csv')\
.save()

### TABLE

In [0]:
df.write.format('delta')\
.mode('overwrite')\
.saveAsTable('my_table')
     
df.display()

### SPARK SQL

#### createTempView

In [0]:
# This registers your DataFrame (df) as a temporary SQL view inside the Spark session.
# if the view named my_view already exists then error. So drop first spark.catalog.dropTempView("my_view")   or   df.createOrReplaceTempView("my_view")
df.createTempView('my_view')

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-5792101506150881>, line 2[0m
[1;32m      1[0m [38;5;66;03m# This registers your DataFrame (df) as a temporary SQL view inside the Spark session.[39;00m
[0;32m----> 2[0m df[38;5;241m.[39mcreateTempView([38;5;124m'[39m[38;5;124mmy_view[39m[38;5;124m'[39m)

File [0;32m/databricks/python/lib/python3.12/site-packages/pyspark/sql/connect/dataframe.py:2180[0m, in [0;36mDataFrame.createTempView[0;34m(self, name)[0m
[1;32m   2176[0m [38;5;28;01mdef[39;00m [38;5;21mcreateTempView[39m([38;5;28mself[39m, name: [38;5;28mstr[39m) [38;5;241m-[39m[38;5;241m>[39m [38;5;28;01mNone[39;00m:
[1;32m   2177[0m     command [38;5;241m=[39m plan[38;5;241m.[39mCreateView(
[1;32m   2178[0m         child[38;5;241m=[39m[38;5;28mself[39m[38;5;241m.[39m_plan, name[38;5;241m=

In [0]:
%sql
select * from my_view where Item_Fat_Content = 'Low Fat'

-- my_table is already created so we can use it
-- select * from my_table where Item_Fat_Content = 'Low Fat'

In [0]:
df_sql = spark.sql("select * from my_view where Item_Fat_Content = 'Regular'")
df_sql.display()