### I. READING

To find a file's path when you can't copy it from the UI, list the folder's contents with `dbutils.fs.ls()` (similar to `ls` in Linux):

```dbutils.fs.ls('/FileStore/tables')```

#### Reading json file

In [0]:
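# JSON schemas are inferred automatically; the 'header' option only applies to CSV, so it is not needed here
# multiline=False (the default) expects one JSON object per line
dfj = spark.read.format('json').option('multiline', False).load('/FileStore/tables/drivers.json')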

dfj.display()

#### Reading csv file

In [0]:
df = spark.read.format('csv').option('inferschema', True).option('header', True).load('/FileStore/tables/BigMart_Sales.csv')

# df.show()      # also works, but prints plain text; display() renders a neater interactive table in Databricks

df.display()

### Schema Definition

In [0]:
# print the column names and the data types Spark inferred

df.printSchema()

### DDL Schema

- Here we change the data type of Item_Weight from DOUBLE (as inferred above) to STRING by supplying the schema ourselves.

In [0]:

my_ddl_schema = '''
                    Item_Identifier STRING,
                    Item_Weight STRING,       
                    Item_Fat_Content STRING, 
                    Item_Visibility DOUBLE,
                    Item_Type STRING,
                    Item_MRP DOUBLE,
                    Outlet_Identifier STRING,
                    Outlet_Establishment_Year INT,
                    Outlet_Size STRING,
                    Outlet_Location_Type STRING, 
                    Outlet_Type STRING,
                    Item_Outlet_Sales DOUBLE 

                ''' 

In [0]:

df = spark.read.format('csv')\
            .schema(my_ddl_schema)\
            .option('header',True)\
            .load('/FileStore/tables/BigMart_Sales.csv') 

df.display()

### StructType() schema

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Field order must match the CSV columns: with an explicit schema the columns are matched by position
mystruct = StructType([
    StructField('Item_Identifier', StringType(), True),
    StructField('Item_Weight', StringType(), True),            # read as string, as in the DDL schema
    StructField('Item_Fat_Content', StringType(), True),
    StructField('Item_Visibility', DoubleType(), True),
    StructField('Item_Type', StringType(), True),              # was missing: every later field would get the wrong column
    StructField('Item_MRP', DoubleType(), True),
    StructField('Outlet_Identifier', StringType(), True),
    StructField('Outlet_Establishment_Year', IntegerType(), True),
    StructField('Outlet_Size', StringType(), True),
    StructField('Outlet_Location_Type', StringType(), True),
    StructField('Outlet_Type', StringType(), True),
    StructField('Item_Outlet_Sales', DoubleType(), True)
])

In [0]:
df = spark.read.format('csv').schema(mystruct).option('header',True).load('/FileStore/tables/BigMart_Sales.csv')

df.display()

### II. TRANSFORMATION

#### Select 

In [0]:
df.select('Item_Identifier','Item_Weight','Item_Fat_Content').display()

#or

df.select(col('Item_Identifier'),col('Item_Weight'),col('Item_Fat_Content')).display()   # col() returns a Column object, so alias(), cast(), arithmetic, etc. can be chained on it



#### Alias

- Rename a column

In [0]:
df.select(col('Item_Identifier').alias('Item_ID')).display() 

#### Filter 

1) Filter data where the fat content is regular

In [0]:
df.filter(col('Item_Fat_Content') == 'Regular').display()

2) Keep the rows where the item type is Soft Drinks and the weight is less than 10

In [0]:
df.filter((col('Item_Type') == 'Soft Drinks') & (col('Item_Weight') < 10)).display()

3) Fetch the rows where the outlet location type is Tier 1 or Tier 2 and the outlet size is null

In [0]:
df.filter((col('Outlet_Size').isNull()) & (col('Outlet_Location_Type').isin('Tier 1','Tier 2'))).display()
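Why `isNull()` rather than `== None`? In Spark SQL a comparison involving null evaluates to null (not False), and `filter()` keeps only rows where the condition is true. The sketch below is a plain-Python model of that three-valued logic, not Spark code: `None` stands in for SQL NULL, and the helpers `sql_eq`, `sql_and` and `spark_filter` are hypothetical names.

```python
# Plain-Python model (not Spark code) of SQL three-valued logic; None plays SQL NULL.

def sql_eq(a, b):
    """Model of `col == value`: NULL on either side gives NULL, not False."""
    if a is None or b is None:
        return None
    return a == b


def sql_and(a, b):
    """Model of `&` on nullable booleans (SQL AND)."""
    if a is False or b is False:
        return False
    if a is None or b is None:
        return None
    return True


def spark_filter(rows, predicate):
    """Model of df.filter(): only rows where the predicate is exactly True survive."""
    return [r for r in rows if predicate(r) is True]


rows = [
    {"Outlet_Size": None, "Outlet_Location_Type": "Tier 1"},
    {"Outlet_Size": "Small", "Outlet_Location_Type": "Tier 2"},
    {"Outlet_Size": None, "Outlet_Location_Type": "Tier 3"},
]

# `== None` never matches: NULL == NULL evaluates to NULL, which filter() treats as not true
by_eq = spark_filter(rows, lambda r: sql_eq(r["Outlet_Size"], None))
# isNull() really tests for NULL, so it matches
by_isnull = spark_filter(
    rows,
    lambda r: sql_and(r["Outlet_Size"] is None,
                      r["Outlet_Location_Type"] in ("Tier 1", "Tier 2")),
)
print(len(by_eq), len(by_isnull))  # 0 1
```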

#### withColumnRenamed

In [0]:
df.withColumnRenamed('Item_Weight','Item_Wt').display()

#### withColumn

1) Create a new column with a constant value

- lit() (from the pyspark.sql.functions module) wraps a literal value in a Column, so withColumn() can add it as a new column holding that constant in every row.

In [0]:
df = df.withColumn('flag',lit("new"))
df.display()

Create a new column from a transformation of existing columns

- Here we create a new column holding the product Item_Weight * Item_MRP

In [0]:
df.withColumn('Item_Product', col('Item_Weight') * col('Item_MRP')).display()

2) Make a transformation on the existing column

- regexp_replace() replaces every substring that matches a regular expression pattern with a replacement string.

In [0]:

df = df.withColumn('Item_Fat_Content',regexp_replace(col('Item_Fat_Content'),"Regular","Reg"))\
    .withColumn('Item_Fat_Content',regexp_replace(col('Item_Fat_Content'),"Low Fat","Lf"))

df.display()

#### Type Casting


In [0]:
# Item_Weight was read as STRING by the explicit schemas above; cast it back to a numeric type
df = df.withColumn('Item_Weight', col('Item_Weight').cast(DoubleType()))

df.printSchema()

#### Sort

1) Based on descending item weight

In [0]:
df.sort(col('Item_Weight').desc()).display()

2) Ascending order of item visibility

In [0]:

df.sort(col('Item_Visibility').asc()).display()
     

3) Sorting based on multiple columns

- Item_Weight in ascending order, then Item_Visibility in descending order

In [0]:

df.sort(['Item_Weight','Item_Visibility'],ascending = [1,0]).display()
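How a multi-key sort with mixed directions works can be sketched in plain Python. This is a model, not Spark's implementation; `sort_rows` and the sample values are made up. It also mimics Spark's default null placement (nulls first for ascending, nulls last for descending).

```python
def sort_rows(rows, keys):
    """Plain-Python model of df.sort() with several keys.

    keys: list of (column, ascending) pairs, highest priority first.
    Nulls go first when ascending and last when descending.
    """
    result = list(rows)
    # Python's sort is stable, so sorting by the lowest-priority key first and
    # the highest-priority key last yields the correct multi-key order.
    for column, ascending in reversed(keys):
        present = [r for r in result if r[column] is not None]
        missing = [r for r in result if r[column] is None]
        present.sort(key=lambda r: r[column], reverse=not ascending)
        result = missing + present if ascending else present + missing
    return result


rows = [
    {"Item_Weight": 9.3, "Item_Visibility": 0.01},
    {"Item_Weight": 5.9, "Item_Visibility": 0.02},
    {"Item_Weight": 9.3, "Item_Visibility": 0.05},
    {"Item_Weight": None, "Item_Visibility": 0.03},
]

# Same intent as ascending=[1, 0]: weight ascending, then visibility descending
for r in sort_rows(rows, [("Item_Weight", True), ("Item_Visibility", False)]):
    print(r)
```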
     

#### Limit

In [0]:
df.limit(10).display()

#### Drop

1) Drop one column

In [0]:
df.drop('Item_Visibility').display()

2) Drop multiple columns

In [0]:
df.drop('Item_Visibility','Item_Type').display()

#### Drop_Duplicates

- similar to pandas drop_duplicates(); Spark does not guarantee which of the duplicate rows is kept

1) drop duplicates across all columns

In [0]:
df.drop_duplicates().display()

2) Drop duplicates based on a subset of columns (keeps one row per distinct Item_Type)

In [0]:
df.drop_duplicates(subset = ['Item_Type']).display()
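The difference between deduplicating on all columns and on a subset can be modelled in a few lines of plain Python. This sketch keeps the first row seen per key; Spark makes no such promise, so only which keys survive is comparable. The helper and the sample values are hypothetical.

```python
def drop_duplicates(rows, subset=None):
    """Plain-Python model of df.drop_duplicates(subset=...).

    Keeps the first row seen for each distinct key. Spark itself does not
    promise WHICH duplicate survives, only that each key appears once.
    """
    seen = set()
    kept = []
    for r in rows:
        columns = subset if subset is not None else sorted(r)
        key = tuple(r[c] for c in columns)
        if key not in seen:
            seen.add(key)
            kept.append(r)
    return kept


rows = [  # made-up sample rows
    {"Item_Type": "Dairy", "Item_MRP": 249.8},
    {"Item_Type": "Dairy", "Item_MRP": 249.8},
    {"Item_Type": "Dairy", "Item_MRP": 141.6},
    {"Item_Type": "Meat", "Item_MRP": 182.1},
]

print(len(drop_duplicates(rows)))                                           # 3
print([r["Item_Type"] for r in drop_duplicates(rows, subset=["Item_Type"])])  # ['Dairy', 'Meat']
```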

#### Union and Union by Name

##### Making Dataframes

In [0]:
# spark.createDataFrame() plays the role of pd.DataFrame() in pandas
data1 = [('1','kad'),
        ('2','sid')]
schema1 = 'id STRING, name STRING' 

df1 = spark.createDataFrame(data1,schema1)

data2 = [('3','rahul'),
        ('4','jas')]
schema2 = 'id STRING, name STRING' 

df2 = spark.createDataFrame(data2,schema2)


#### Union

union() stacks the rows of df2 below those of df1, matching columns by position (not by name).

In [0]:
df1.union(df2).display()

In [0]:
data1 = [('kad','1',),
        ('sid','2',)]
schema1 = 'name STRING, id STRING' 

df1 = spark.createDataFrame(data1,schema1)

df1.display()
     

#### Union by name

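- unionByName() matches columns by name rather than by position. df1 has just been recreated with its columns in the order (name, id), while df2 is still (id, name): a plain union() would put df2's ids under df1's name column, whereas unionByName() lines them up correctly.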

In [0]:

df1.unionByName(df2).display()
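A plain-Python model makes the position-versus-name difference concrete. DataFrames are represented as a list of column names plus a list of row tuples; `union` and `union_by_name` are hypothetical helpers that mirror the behaviour described above, using the same rows as df1 and df2.

```python
def union(cols1, rows1, cols2, rows2):
    """Model of DataFrame.union(): columns are matched by POSITION.
    The result keeps the first DataFrame's column names."""
    if len(cols1) != len(cols2):
        raise ValueError("union() needs the same number of columns")
    return cols1, rows1 + rows2


def union_by_name(cols1, rows1, cols2, rows2):
    """Model of DataFrame.unionByName(): columns are matched by NAME."""
    if set(cols1) != set(cols2):
        raise ValueError("unionByName() needs the same column names")
    order = [cols2.index(c) for c in cols1]  # where each of df1's columns sits in df2
    return cols1, rows1 + [tuple(row[i] for i in order) for row in rows2]


# df1 was recreated as (name, id); df2 is still (id, name)
cols1, rows1 = ["name", "id"], [("kad", "1"), ("sid", "2")]
cols2, rows2 = ["id", "name"], [("3", "rahul"), ("4", "jas")]

print(union(cols1, rows1, cols2, rows2)[1][2])          # ('3', 'rahul'): the id landed under "name"
print(union_by_name(cols1, rows1, cols2, rows2)[1][2])  # ('rahul', '3')
```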

### String Functions

#### initcap(), upper(), lower()

In [0]:
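# initcap(): first letter of each word upper-cased; upper()/lower(): the whole string
df.select(initcap('Item_Type'), upper('Item_Type'), lower('Item_Type')).display()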

### Date Functions

#### Current_Date

In [0]:
df = df.withColumn('curr_date', current_date())
df.display()

#### Date_add()

In [0]:
df = df.withColumn('week_after', date_add('curr_date',7))
df.display()

#### Date_sub

In [0]:
df = df.withColumn('week_before', date_sub('curr_date', 7))   # a negative value (-7) would move forward a week instead
df.display()

#### datediff

In [0]:
# datediff(end, start) returns end minus start in days, so this gives 7
df = df.withColumn('datediff', datediff('week_after', 'curr_date'))
df.display()

#### date_format

In [0]:
# MM is the month; lowercase mm means minutes (always 00 for a plain date)
df = df.withColumn('week_before', date_format('week_before', 'dd-MM-yyyy'))
df.display()
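The date arithmetic above can be checked with Python's `datetime` as a stand-in. The helpers below only borrow Spark's function names; they are plain Python, and a fixed date replaces `current_date()` so the results are repeatable. Python's `%m`/`%M` codes are not Spark pattern letters, but they show the same month-versus-minute trap as Spark's `MM`/`mm`.

```python
from datetime import date, datetime, timedelta


def date_add(d, days):
    return d + timedelta(days=days)


def date_sub(d, days):
    return d - timedelta(days=days)


def datediff(end, start):
    """Like Spark's datediff(end, start): end minus start, in days."""
    return (end - start).days


curr = date(2024, 3, 15)  # fixed stand-in for current_date()
week_after = date_add(curr, 7)
week_before = date_sub(curr, 7)

print(week_after, week_before)     # 2024-03-22 2024-03-08
print(date_sub(curr, -7))          # 2024-03-22: a negative offset moves forward
print(datediff(week_after, curr))  # 7
print(datediff(curr, week_after))  # -7

# Month vs minute: a plain date has no time part, so "minutes" are always 00
midnight = datetime.combine(week_before, datetime.min.time())
print(midnight.strftime("%d-%m-%Y"))  # 08-03-2024
print(midnight.strftime("%d-%M-%Y"))  # 08-00-2024
```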

### III. HANDLING NULLS

#### Dropping Nulls

##### General null value dropping

In [0]:
df.dropna(how = 'all').display()   # drops a row only if every column is null

In [0]:
df.dropna(how = 'any').display()   # drops a row if any column is null

#### Null Value Dropping for Specific Column

In [0]:
df.dropna(subset=['Outlet_Size']).display()

#### Filling Nulls

##### Fillna

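1) Fill nulls across all columns whose type matches the fill value (a string fills only string columns; numeric columns are left unchanged)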

In [0]:
df.fillna('NotAvailable').display()

2) Fill nulls only in the specified columns

In [0]:
df.fillna('NotAvailable',subset=['Outlet_Size']).display()
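The rules for `how='any'`, `how='all'`, `subset` and type-matched filling can be summarised in a small plain-Python model. It is simplified (the schema is just a column-to-Python-type map, and only exact type matches are filled); `dropna`, `fillna` and the sample rows are hypothetical.

```python
def dropna(rows, how="any", subset=None):
    """Model of df.dropna(how=..., subset=...)."""
    def keep(r):
        cols = subset if subset is not None else list(r)
        is_null = [r[c] is None for c in cols]
        # how='any': drop if any checked column is null; how='all': only if all are
        return not any(is_null) if how == "any" else not all(is_null)
    return [r for r in rows if keep(r)]


def fillna(rows, value, schema, subset=None):
    """Simplified model of df.fillna(value, subset=...).

    schema maps column -> Python type; only columns whose type matches the
    fill value are filled, so a string never lands in a numeric column.
    """
    filled = []
    for r in rows:
        new = dict(r)  # leave the input rows untouched
        for c in (subset if subset is not None else list(r)):
            if new[c] is None and isinstance(value, schema[c]):
                new[c] = value
        filled.append(new)
    return filled


schema = {"Outlet_Size": str, "Item_Weight": float}
rows = [
    {"Outlet_Size": None, "Item_Weight": None},
    {"Outlet_Size": "Small", "Item_Weight": None},
    {"Outlet_Size": "High", "Item_Weight": 9.3},
]

print(len(dropna(rows, how="all")), len(dropna(rows, how="any")))  # 2 1
print(fillna(rows, "NotAvailable", schema)[0])  # {'Outlet_Size': 'NotAvailable', 'Item_Weight': None}
```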

### IV. SPLITTING AND INDEXING

#### Split Function

In [0]:
df.withColumn('Outlet_Type',split('Outlet_Type',' ')).display()

#### Indexing 

In [0]:
df.withColumn('Outlet_Type',split('Outlet_Type',' ')[1]).display()
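Two details are easy to miss here: the second argument of `split()` is a regular expression, and indexing past the end of the array gives null when ANSI mode is disabled (with ANSI mode enabled Spark raises an error instead). A plain-Python sketch of that behaviour, with hypothetical helper names:

```python
import re


def spark_split(value, pattern):
    """Model of split(col, pattern): the pattern is a REGULAR EXPRESSION."""
    if value is None:
        return None
    return re.split(pattern, value)


def get_item(arr, i):
    """Model of split(...)[i] with ANSI mode off: an out-of-range index gives null."""
    if arr is None or i < 0 or i >= len(arr):
        return None
    return arr[i]


parts = spark_split("Supermarket Type1", " ")
print(parts)               # ['Supermarket', 'Type1']
print(get_item(parts, 1))  # Type1
print(get_item(parts, 5))  # None

# To split on a literal dot, escape it; a bare "." matches every character
print(spark_split("a.b", r"\."))  # ['a', 'b']
```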

#### Explode

- Used to transform a column containing list-like entries (arrays) into multiple rows.
- Each element of the array in a cell becomes a separate row.
- The values of the other columns in the original row are repeated on every new row.
- Rows whose array is null or empty are dropped; explode_outer() keeps them, with a null in the exploded column.
- Pandas has a similar DataFrame.explode().

So a row holding the array [a, b] becomes two rows: one with a and one with b.

In [0]:
dfex = df.withColumn('Outlet_Type',split('Outlet_Type',' '))
dfex.display()

In [0]:
dfex.withColumn('Outlet_Type', explode('Outlet_Type')).display()
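Here is a plain-Python model of `explode()` and `explode_outer()` on rows stored as dicts. It is a sketch for intuition, not Spark's implementation; the helper name and the outlet identifiers are made up.

```python
def explode(rows, column, outer=False):
    """Model of explode() / explode_outer() on an array column.

    Each array element becomes its own row and the other columns are copied.
    Null or empty arrays produce no rows with explode(), but one row with a
    null element with explode_outer() (outer=True).
    """
    out = []
    for r in rows:
        items = r[column]
        if not items:  # None or []
            if outer:
                out.append({**r, column: None})
            continue
        for item in items:
            out.append({**r, column: item})
    return out


rows = [
    {"Outlet_Identifier": "OUT_A", "Outlet_Type": ["Supermarket", "Type1"]},
    {"Outlet_Identifier": "OUT_B", "Outlet_Type": None},
]

for r in explode(rows, "Outlet_Type"):
    print(r)
print(len(explode(rows, "Outlet_Type", outer=True)))  # 3
```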

#### Array_contains

- This function is used to determine if a specific value is present within an ArrayType column in a PySpark DataFrame. 
- It returns a boolean value (True if the array contains the value, False otherwise, and Null if the array itself is Null).

In [0]:
dfex.withColumn('Type1 Flag', array_contains('Outlet_Type','Type1')).display()

### GROUPBY AND PIVOT

#### Groupby

1) Sum of MRP for each Item Type

In [0]:
df.groupBy('Item_Type').agg(sum("Item_MRP")).display()

2) Find the average MRP per Item_Type

In [0]:
df.groupBy('Item_Type').agg(avg("Item_MRP")).display()

3) Sum of Item_MRP grouped by Item_Type and Outlet_Size, with the aggregated column renamed to Total_MRP.

In [0]:
df.groupBy('Item_Type','Outlet_Size').agg(sum('Item_MRP').alias('Total_MRP')).display()

4) Group by two columns and compute two aggregations (total and average MRP)

In [0]:
df.groupBy('Item_Type','Outlet_Size').agg(sum('Item_MRP').alias('Total_MRP'),avg('Item_MRP').alias('Avg_MRP')).display()
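What grouping on two keys with two aggregates computes, including how nulls are treated, can be modelled in plain Python. Like SQL aggregates, the sketch skips null values (so the average divides by the non-null count) and treats a null key as its own group. The helper and the sample rows are hypothetical.

```python
from collections import defaultdict


def group_sum_avg(rows, keys, value_col):
    """Model of df.groupBy(*keys).agg(sum(value_col), avg(value_col)).

    Nulls are skipped; a group with no non-null values gets null for both.
    Null keys form their own group.
    """
    groups = defaultdict(list)
    for r in rows:
        groups[tuple(r[k] for k in keys)].append(r[value_col])
    result = {}
    for key, values in groups.items():
        present = [v for v in values if v is not None]
        total = sum(present) if present else None
        result[key] = {
            "Total_MRP": total,
            "Avg_MRP": total / len(present) if present else None,
        }
    return result


rows = [  # made-up sample rows
    {"Item_Type": "Dairy", "Outlet_Size": "Small", "Item_MRP": 100.0},
    {"Item_Type": "Dairy", "Outlet_Size": "Small", "Item_MRP": 50.0},
    {"Item_Type": "Dairy", "Outlet_Size": "Small", "Item_MRP": None},
    {"Item_Type": "Meat", "Outlet_Size": None, "Item_MRP": 80.0},
]

res = group_sum_avg(rows, ["Item_Type", "Outlet_Size"], "Item_MRP")
print(res[("Dairy", "Small")])  # {'Total_MRP': 150.0, 'Avg_MRP': 75.0}
```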

#### Collect_List

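- Similar to GROUP_CONCAT() in MySQL, except that it returns an array rather than a concatenated string. Nulls are skipped and duplicates are kept (collect_set() removes duplicates).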

In [0]:
data = [('user1','book1'),
        ('user1','book2'),
        ('user2','book2'),
        ('user2','book4'),
        ('user3','book1')]

schema = 'user string, book string'

df_book = spark.createDataFrame(data,schema)

df_book.display()

In [0]:
df_book.groupBy('user').agg(collect_list('book')).display()
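A plain-Python model of `collect_list()` on the same book data is sketched below; `collect_list` here is a hypothetical helper. Spark does not guarantee the element order inside each list after a shuffle, whereas this model keeps input order. For a GROUP_CONCAT-style string in Spark, the list can be wrapped in `concat_ws(',', collect_list('book'))`; the last line shows the equivalent join in Python.

```python
def collect_list(rows, key, value):
    """Model of groupBy(key).agg(collect_list(value)): nulls skipped, duplicates kept."""
    out = {}
    for r in rows:
        bucket = out.setdefault(r[key], [])
        if r[value] is not None:
            bucket.append(r[value])
    return out


data = [("user1", "book1"), ("user1", "book2"), ("user2", "book2"),
        ("user2", "book4"), ("user3", "book1")]
rows = [{"user": u, "book": b} for u, b in data]

lists = collect_list(rows, "user", "book")
print(lists)  # {'user1': ['book1', 'book2'], 'user2': ['book2', 'book4'], 'user3': ['book1']}

# GROUP_CONCAT-style strings, for comparison with MySQL
print({u: ",".join(books) for u, books in lists.items()})
```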

#### Pivot

```dfname.groupBy(<columns that become the rows>).pivot(<column whose distinct values become the new column names>).agg(<aggregation that fills each cell, optionally with .alias()>)```

In [0]:
df.groupBy('Item_Type').pivot('Outlet_Size').agg(avg('Item_MRP').alias('Avg_MRP')).display()
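The pivot above can be modelled in plain Python to show what lands in each cell: every distinct Outlet_Size becomes a column, each cell is the average MRP for that (Item_Type, Outlet_Size) pair, and pairs with no rows become null. In Spark, a null Outlet_Size typically shows up as a column named `null`. `pivot_avg` and the sample rows are hypothetical.

```python
from collections import defaultdict


def pivot_avg(rows, group_col, pivot_col, value_col):
    """Plain-Python model of
    df.groupBy(group_col).pivot(pivot_col).agg(avg(value_col)).
    Returns {group: {pivot_value: average or None}}."""
    acc = defaultdict(lambda: [0.0, 0])  # (group, pivot value) -> [sum, count]
    groups, pivot_values = set(), set()
    for r in rows:
        g, p, v = r[group_col], r[pivot_col], r[value_col]
        groups.add(g)
        pivot_values.add(p)
        if v is not None:  # avg() skips nulls
            acc[(g, p)][0] += v
            acc[(g, p)][1] += 1
    table = {}
    for g in groups:
        table[g] = {}
        for p in pivot_values:
            total, count = acc.get((g, p), (0.0, 0))
            table[g][p] = total / count if count else None  # no rows -> null cell
    return table


rows = [  # made-up sample rows
    {"Item_Type": "Dairy", "Outlet_Size": "Small", "Item_MRP": 100.0},
    {"Item_Type": "Dairy", "Outlet_Size": "Small", "Item_MRP": 50.0},
    {"Item_Type": "Dairy", "Outlet_Size": "High", "Item_MRP": 80.0},
    {"Item_Type": "Meat", "Outlet_Size": "Medium", "Item_MRP": 40.0},
]

table = pivot_avg(rows, "Item_Type", "Outlet_Size", "Item_MRP")
for item_type in sorted(table, key=str):
    print(item_type, {size: table[item_type][size] for size in sorted(table[item_type], key=str)})
```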

### V. WHEN - OTHERWISE

- Equivalent to CASE WHEN ... THEN ... ELSE ... END in SQL

1) Differentiate between Veg and Non-Veg items based on Item_Type

In [0]:
df = df.withColumn('foodflag', when(col('Item_Type') == 'Meat','Non-Veg').otherwise('Veg'))
df.display()

2) If the item is Veg and its MRP is 100 or more -> Veg_Expensive, else Veg_Inexpensive.
Same for Non-Veg.

In [0]:
df.withColumn('veg_exp_flag', when((col('foodflag') == 'Veg') & (col('Item_MRP') < 100), 'Veg_Inexpensive')
                            .when((col('foodflag') == 'Veg') & (col('Item_MRP') >= 100), 'Veg_Expensive')   # >= so that exactly 100 is covered
                            .when(col('Item_MRP') < 100, 'Non_Veg_Inexpensive')
                            .otherwise('Non_Veg_Expensive')).display()
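`when()` branches are checked in order, the first branch whose condition is true wins, and any row that matches no branch (including rows where the condition is null) gets the `otherwise()` value. The plain-Python model below, with hypothetical helpers, shows how a pair of strict comparisons such as `< 100` and `> 100` leaves a gap at exactly 100.

```python
def case_when(row, branches, otherwise):
    """Model of when(...).when(...).otherwise(...): the first branch whose
    condition is exactly True wins; None (SQL NULL) counts as not matching."""
    for condition, value in branches:
        if condition(row) is True:
            return value
    return otherwise


def lt(a, b):
    return None if a is None or b is None else a < b


def gt(a, b):
    return None if a is None or b is None else a > b


# Strict < 100 and > 100: nothing matches an MRP of exactly 100
branches = [
    (lambda r: r["foodflag"] == "Veg" and lt(r["Item_MRP"], 100), "Veg_Inexpensive"),
    (lambda r: r["foodflag"] == "Veg" and gt(r["Item_MRP"], 100), "Veg_Expensive"),
]

for mrp in (99.0, 150.0, 100.0, None):
    print(mrp, case_when({"foodflag": "Veg", "Item_MRP": mrp}, branches, "Non_Veg"))
```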

### VI. JOINS

In [0]:
dataj1 = [('1','gaur','d01'),
          ('2','kit','d02'),
          ('3','sam','d03'),
          ('4','tim','d03'),
          ('5','aman','d05'),
          ('6','nad','d06')] 

schemaj1 = 'emp_id STRING, emp_name STRING, dept_id STRING' 

df1 = spark.createDataFrame(dataj1,schemaj1)

dataj2 = [('d01','HR'),
          ('d02','Marketing'),
          ('d03','Accounts'),
          ('d04','IT'),
          ('d05','Finance')]

schemaj2 = 'dept_id STRING, department STRING'

df2 = spark.createDataFrame(dataj2,schemaj2)
     

#### Inner Join

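- Returns only the rows whose join key (dept_id) appears in both DataFrames.
- df1's dept_id values are d01, d02, d03, d03, d05, d06 and df2's are d01, d02, d03, d04, d05, so the output has five rows: d01, d02, d03, d03 and d05.
- d04 (only in df2) and d06 (only in df1) are not common to both, so they are dropped.
- Joining on an expression such as df1['dept_id'] == df2['dept_id'] keeps both dept_id columns; passing on='dept_id' instead keeps a single one.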

In [0]:
df1.join(df2, df1['dept_id'] == df2['dept_id'], 'inner').display()

#### Left Join
- Returns every row of the left DataFrame (df1); where df2 has no matching dept_id (here d06), df2's columns are null.

In [0]:
df1.join(df2, df1['dept_id'] == df2['dept_id'], 'left').display()

#### Right Join

- Returns every row of the right DataFrame (df2); where df1 has no matching dept_id (here d04, IT), df1's columns are null.

In [0]:
df1.join(df2, df1['dept_id'] == df2['dept_id'], 'right').display()

#### Anti Join

- Left anti join: returns the rows of df1 that have no match in df2 (here the employee in d06). Only df1's columns are returned.

In [0]:
df1.join(df2, df1['dept_id'] == df2['dept_id'], 'anti').display()
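All four join types can be reproduced on the same employee and department rows with a small plain-Python model. It is a sketch, not Spark's join algorithm: `join` is a hypothetical helper that emits the key column once, as `on='dept_id'` would, rather than twice as the expression join above does.

```python
def join(left, right, key, how="inner"):
    """Plain-Python model of df1.join(df2, on=key, how=how) for
    how in {'inner', 'left', 'right', 'anti'}. Rows are dicts."""
    left_cols = [c for c in left[0] if c != key] if left else []
    right_cols = [c for c in right[0] if c != key] if right else []
    out, matched_right = [], set()
    for row in left:
        hits = [i for i, r in enumerate(right)
                if row[key] is not None and row[key] == r[key]]  # null keys never match
        if how == "anti":
            if not hits:
                out.append(dict(row))  # only the left side's columns
            continue
        for i in hits:
            matched_right.add(i)
            out.append({**row, **right[i]})
        if not hits and how == "left":
            out.append({**row, **{c: None for c in right_cols}})
    if how == "right":
        for i, r in enumerate(right):
            if i not in matched_right:
                out.append({**{c: None for c in left_cols}, **r})
    return out


emps = [{"emp_id": e, "emp_name": n, "dept_id": d} for e, n, d in
        [("1", "gaur", "d01"), ("2", "kit", "d02"), ("3", "sam", "d03"),
         ("4", "tim", "d03"), ("5", "aman", "d05"), ("6", "nad", "d06")]]
depts = [{"dept_id": d, "department": n} for d, n in
         [("d01", "HR"), ("d02", "Marketing"), ("d03", "Accounts"),
          ("d04", "IT"), ("d05", "Finance")]]

for how in ("inner", "left", "right", "anti"):
    print(how, len(join(emps, depts, "dept_id", how)))
# row counts: inner 5, left 6, right 6, anti 1
```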

### VII. WINDOW FUNCTIONS

In [0]:
df = spark.read.format('csv').option('inferschema', True).option('header', True).load('/FileStore/tables/BigMart_Sales.csv')

# df.show()      # also works, but prints plain text; display() renders a neater interactive table in Databricks

df.display()

In [0]:
df = df.dropna(how = 'any')   # dropna() returns a new DataFrame, so assign the result

#### Row Number

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, col, rank, dense_rank

In [0]:
df.withColumn('rows', row_number().over(Window.orderBy('Item_Identifier'))).display()

#### Rank

- If arr = [aa,bb,bb,cc,dd], then rank will make it [1,2,2,4,5]

In [0]:
df.withColumn('rows', rank().over(Window.orderBy('Item_Identifier'))).display()

##### Let's make it descending

In [0]:
df.withColumn('rows', rank().over(Window.orderBy(col('Item_Identifier').desc()))).display()

#### Dense rank

- If arr = [aa,bb,bb,cc,dd], then dense_rank will make it [1,2,2,3,4] (no gaps after ties)

In [0]:
df.withColumn('rows', rank().over(Window.orderBy(col('Item_Identifier').desc())))\
    .withColumn('dense', dense_rank().over(Window.orderBy(col('Item_Identifier').desc()))).display()
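The numbering rules of `row_number()`, `rank()` and `dense_rank()` can be reproduced in plain Python for values that are already in window order. This is a model for intuition; `window_numbers` is a hypothetical helper, and it uses the same [aa, bb, bb, cc, dd] example as the bullets above.

```python
def window_numbers(ordered_values):
    """Return (row_number, rank, dense_rank) for values already in window order."""
    out = []
    rank = dense = 0
    previous = object()  # sentinel that equals nothing
    for position, value in enumerate(ordered_values, start=1):
        if value != previous:  # a new distinct value starts here
            rank = position    # rank jumps to the row's position, leaving gaps after ties
            dense += 1         # dense_rank just moves to the next integer
            previous = value
        out.append((position, rank, dense))
    return out


print(window_numbers(["aa", "bb", "bb", "cc", "dd"]))
# [(1, 1, 1), (2, 2, 2), (3, 2, 2), (4, 4, 3), (5, 5, 4)]
```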

#### Cumulative Sum

- A running total: each row holds the sum of its own value and all values before it in the window order.

##### unboundedPreceding, currentRow 

- This defines a window that includes the current row and all rows before it in the partition.
- The window starts at the first row and ends at the current row being processed.
- Used for : Cumulative or running calculations, such as a cumulative sum or running average, as the window expands with each successive row.

In [0]:
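from pyspark.sql import Window
from pyspark.sql.functions import sum as spark_sum, col

# Running sum of Item_MRP ordered by Item_Type.
# Rows that tie on Item_Type have no guaranteed order; add a unique column to orderBy() for a repeatable result.
# Without partitionBy(), Spark moves all rows into a single partition to compute this.
window_spec = Window.orderBy('Item_Type').rowsBetween(Window.unboundedPreceding, Window.currentRow)

df_with_cumsum = df.withColumn(
    'cumsum',
    spark_sum(col('Item_MRP').cast('double')).over(window_spec)  # cast to double to keep full precision
)
df_with_cumsum.display()  # or df_with_cumsum.show()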


##### unboundedPreceding, unboundedFollowing

- This defines a window that includes all rows in the partition.
- The window starts at the very first row and ends at the very last row, encompassing the entire partition.
- Used for: Calculating the total sum or average over the entire dataset or within each partitioned group.

In [0]:
from pyspark.sql import Window
from pyspark.sql.functions import sum as spark_sum, col

window_spec = Window.orderBy('Item_Type').rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

df_with_totalsum = df.withColumn(
    'totalsum',
    spark_sum(col('Item_MRP').cast('float')).over(window_spec)
)
df_with_totalsum.display()
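The two frames differ only in how far each row can "see". A plain-Python model over values already in window order makes that concrete; both helpers are hypothetical, and like Spark's `sum()` they skip nulls and return null when nothing has been summed yet.

```python
def running_sum(values):
    """Frame rowsBetween(unboundedPreceding, currentRow): row i sees rows 0..i."""
    out, total = [], None
    for v in values:
        if v is not None:  # sum() skips nulls
            total = v if total is None else total + v
        out.append(total)
    return out


def whole_frame_sum(values):
    """Frame rowsBetween(unboundedPreceding, unboundedFollowing): every row sees every row."""
    present = [v for v in values if v is not None]
    total = sum(present) if present else None
    return [total] * len(values)


mrp = [10.0, 20.0, None, 5.0]  # already in window order
print(running_sum(mrp))      # [10.0, 30.0, 30.0, 35.0]
print(whole_frame_sum(mrp))  # [35.0, 35.0, 35.0, 35.0]
```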


### VIII. USER DEFINED FUNCTIONS

#### 1. Create a function

In [0]:
def myf(x):
    # nulls arrive as None; return None instead of raising a TypeError
    return x * x if x is not None else None

#### 2. Create Pyspark function

In [0]:
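from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

# declare the return type; without it udf() assumes StringType
myu = udf(myf, DoubleType())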

#### 3. Usage in a spark job

In [0]:
df.withColumn('news',myu('Item_MRP')).display()

### IX. DATA WRITING

- csv, parquet and json are most commonly used

#### A. Writing to CSV Format

In [0]:
# Spark writes a folder named data.csv containing part-*.csv files, not a single file
df.write.format('csv') \
    .save('/FileStore/tables/CSV/data.csv')

print("CSV Saved to Filestore!")

##### DATA WRITING MODES

- Append => Adds the new data alongside whatever is already at the path. Existing data is kept.
- Overwrite => Deletes the existing data at the path and writes the new data in its place.
- Error (the default, also spelled errorifexists) => Throws an error if data already exists at the path. Useful as a safety check.
- Ignore => Silently skips the write if data already exists at the path.
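The four modes can be mimicked on a local temporary folder to see their effect on existing data. This is a hypothetical plain-Python model, not Spark's writer: `save()` writes one named file per call, whereas Spark writes its own part files.

```python
import os
import shutil
import tempfile


def save(path, part_name, text, mode="error"):
    """Hypothetical model of df.write.mode(mode).save(path) on a local folder."""
    exists = os.path.exists(path)
    if exists and mode in ("error", "errorifexists"):
        raise FileExistsError(f"{path} already exists")
    if exists and mode == "ignore":
        return  # leave existing data untouched, write nothing
    if exists and mode == "overwrite":
        shutil.rmtree(path)  # delete the old data first
    os.makedirs(path, exist_ok=True)  # append (or a fresh path) just adds files
    with open(os.path.join(path, part_name), "w") as f:
        f.write(text)


base = tempfile.mkdtemp()
target = os.path.join(base, "data.csv")  # a folder, despite the .csv name

save(target, "part-0.csv", "a,b\n")                  # first write: the path is new
save(target, "part-1.csv", "c,d\n", mode="append")
after_append = sorted(os.listdir(target))
save(target, "part-2.csv", "e,f\n", mode="ignore")
after_ignore = sorted(os.listdir(target))
try:
    save(target, "part-3.csv", "g,h\n", mode="error")
    error_raised = False
except FileExistsError:
    error_raised = True
save(target, "part-4.csv", "i,j\n", mode="overwrite")
after_overwrite = sorted(os.listdir(target))
shutil.rmtree(base)

print(after_append, after_ignore, error_raised, after_overwrite)
```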

##### Append

In [0]:
df.write.format('csv').mode('append').save('/FileStore/tables/CSV/data.csv')

##### Another way of writing this

In [0]:
df.write.format('csv').mode('append').option('path','/FileStore/tables/CSV/data.csv').save()

##### Overwrite

In [0]:
df.write.format('csv').mode('overwrite').option('path','/FileStore/tables/CSV/data.csv').save()

##### Error

In [0]:
df.write.format('csv').mode('error').option('path','/FileStore/tables/CSV/data.csv').save()

##### Ignore

In [0]:
df.write.format('csv').mode('ignore').option('path','/FileStore/tables/CSV/data.csv').save()

#### B. Writing to Parquet format

- It is a columnar file format: values are stored column by column rather than row by row.
- Efficient compression: Similar data types are stored together, making compression more effective.

- Faster reads: When querying only specific columns, only those columns are read from disk.

- Optimized for analytical queries: Aggregations and column-specific operations perform faster.

- OLAP suitable

- Metadata (the schema and per-column statistics) is stored in the file footer.

How does this contrast with row-based formats?

- Formats like CSV, JSON and Avro store data row by row.

- Row-based formats are more suitable for OLTP, where entire rows are frequently read or written.
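An in-memory analogy (not the Parquet format itself, which adds encodings, compression and row groups) shows why a column-only query favours the columnar layout. The rows are made-up values.

```python
rows = [  # made-up rows: (Item_Identifier, Item_Weight, Item_MRP)
    ("A1", 9.0, 100.0),
    ("B2", 5.0, 50.0),
    ("C3", 7.0, 75.0),
]
columns = ["Item_Identifier", "Item_Weight", "Item_MRP"]

# Row layout (CSV-like): each record's values sit together
row_layout = [dict(zip(columns, r)) for r in rows]

# Columnar layout (Parquet-like): each column's values sit together
column_layout = {name: [r[i] for r in rows] for i, name in enumerate(columns)}

# An aggregate over one column only needs that one list in the columnar layout ...
avg_mrp = sum(column_layout["Item_MRP"]) / len(column_layout["Item_MRP"])
# ... while the row layout walks every full record to pick out the same field
avg_mrp_rows = sum(r["Item_MRP"] for r in row_layout) / len(row_layout)
print(avg_mrp, avg_mrp_rows)  # 75.0 75.0
```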

In [0]:
df.write.format('parquet').mode('overwrite').option('path','/FileStore/tables/CSV/data.csv').save()

We use overwrite mode because this path already holds the CSV output written above; overwrite replaces those CSV files with Parquet files.

#### C. Writing to Delta Lake / Delta File Format

- Built on top of Parquet: the data files are ordinary Parquet files (the extension is still .parquet).
- Table metadata isn't stored inside the data files at all; Delta keeps it separately.
- That metadata lives in a transaction log, specifically in a folder named ```_delta_log/```.


##### Components of the delta log folder

- JSON files (.json) — for operation logs (e.g. schema changes, file additions/deletions)

- Parquet files (.parquet) — for checkpoints, which summarize the metadata for performance


#### D. Writing the file as a table

In [0]:
df.write.format('parquet').mode('overwrite').saveAsTable('my_table')

#### Managed VS External Tables

- Managed: Spark (here, Databricks) manages both the table's metadata and its data files, which are stored in the default warehouse location. Dropping the table deletes the data files as well, because their lifecycle is tied to the table.

- External: The data files live at a custom location you specify (for example with .option('path', ...) before saveAsTable()), and the metastore only tracks the table definition. Dropping the table removes only the metadata (the schema and table entry); the data files stay intact.

### X. SPARK SQL

#### Converting a dataframe to a temporary view

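- It exists only for the current Spark session and is dropped when the session ends.

- Using createOrReplaceTempView(); unlike createTempView(), it does not raise an error if the view already exists (for example when the cell is re-run).

In [0]:
df.createOrReplaceTempView('my_view')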

In [0]:
%sql
SELECT * FROM my_view
WHERE Item_Weight = 9.3;

#### Convert back to a df to write it into storage

In [0]:
df_sql = spark.sql("SELECT * FROM my_view WHERE Item_Weight = 9.3")

In [0]:
df_sql.display()

####### THE END ########
