### Read CSV data


In [0]:
df = spark.read.csv("dbfs:/Volumes/workspace/default/my_volume/BigMart Sales.csv", header=True, inferSchema=True)
df.show()

# spark: the SparkSession, created automatically by the notebook environment
# header=True: treat the first line of the file as the column names (otherwise columns are named _c0, _c1, ...)
# inferSchema=True: scan the data to detect each column's type; otherwise every column is read as a string

In [0]:
display(df)  # interactive table view of the CSV data

### Read JSON data

In [0]:
df = spark.read.json("dbfs:/Volumes/workspace/default/my_volume/drivers.json")
df.display()
# The JSON reader infers the schema from the data itself, so there is no
# inferSchema option to pass here (use .schema(...) to pin the types instead).
# By default Spark expects JSON Lines: one complete JSON object per line.
# Pass multiLine=True when a single object spans several lines, e.g.:
#
# {
#   "name": "Ali",
#   "age": 30
# }
#
# i.e. spark.read.json(path, multiLine=True)

### Schema definition

In [0]:
# Print the column names, types and nullability of df.
# NOTE(review): df was last assigned from drivers.json in the cell above, so this
# prints the JSON schema, not the BigMart CSV schema — confirm this is intended.
df.printSchema()

In [0]:
# DDL-style schema string ("column TYPE, ...") passed to spark.read.schema(...) in the
# next cell, so column types are fixed explicitly instead of relying on inferSchema.
schema = """
    Item_Identifier STRING,
    Item_Weight STRING,
    Item_Fat_Content STRING,
    Item_Visibility DOUBLE,
    Item_Type STRING,
    Item_MRP DOUBLE,
    Outlet_Identifier STRING,
    Outlet_Establishment_Year INT,
    Outlet_Size STRING,
    Outlet_Location_Type STRING,
    Outlet_Type STRING,
    Item_Outlet_Sales DOUBLE
"""

# Item_Weight is deliberately declared as STRING here; later cells cast it back with .cast("double").

In [0]:
# Re-read the CSV using the explicit schema defined above (no inferSchema pass).
df = (
    spark.read
    .schema(schema)
    .option("header", True)
    .csv("dbfs:/Volumes/workspace/default/my_volume/BigMart Sales.csv")
)
display(df)

### SELECT STATEMENT


In [0]:
from pyspark.sql.functions import col

# Project two columns, referenced here by plain name.
df.select("Item_Identifier", "Item_Type").display()

In [0]:
df.selectExpr("Item_Outlet_Sales AS sales").display()  # rename on the fly via a SQL alias

In [0]:
price_with_tax = (col("Item_MRP") * 1.1).alias("price with tax")  # MRP + 10% tax
df.select(price_with_tax).display()

### Filter/Where

In [0]:
from pyspark.sql.functions import col

# where() is an alias of filter(). Item_Weight is a STRING column here; Spark
# compares it against the number 10 through an implicit cast.
df.where(col("Item_Weight") > 10).display()


In [0]:
# chaining: project one column, then filter on it
df.select("Item_Weight").where(col("Item_Weight") > 10).display()


In [0]:
# and condition: both predicates must hold
is_heavy = col("Item_Weight") > 10
is_dairy = col("Item_Type") == "Dairy"
df.filter(is_heavy & is_dairy).display()

In [0]:
# or condition: at least one predicate must hold
is_heavy = col("Item_Weight") > 10
is_dairy = col("Item_Type") == "Dairy"
df.filter(is_heavy | is_dairy).display()

In [0]:
# not condition: != is shorthand for ~(... == ...); rows whose weight is null are excluded either way
df.filter(col("Item_Weight") != 10).display()

In [0]:
# filter by membership: isin() also accepts a single list of values
df.filter(col("Outlet_Size").isin(["Small", "Medium"])).display()

In [0]:
df.filter((col("Item_Weight") > 10) & (col("Item_MRP") < 250)).show()  # same rows as two chained filters


In [0]:
missing_size = col("Outlet_Size").isNull()
tier_1_or_2 = col("Outlet_Location_Type").isin("Tier 1", "Tier 2")
df.filter(missing_size & tier_1_or_2).display()

In [0]:
df.select("Outlet_Location_Type", "Outlet_Size").dropDuplicates().display()  # every unique (location, size) pair


### Rename one or more columns in your DataFrame 

In [0]:
# withColumnRenamed returns a NEW DataFrame (DataFrames are immutable), so the result is reassigned.
df = df.withColumnRenamed(existing="Item_Identifier", new="id")

In [0]:
display(df)  # "Item_Identifier" now appears as "id"

In [0]:
# Rename several columns by chaining calls inside parentheses (no backslash continuation needed).
df = (
    df.withColumnRenamed("Item_Fat_Content", "Item_Fat")
      .withColumnRenamed("Item_Type", "type")
)
display(df)
       
    

### withColumn()

#### It's very flexible — you can:

- Add computed columns (e.g., tax = price × 0.1)
- Replace the values of an existing column
- Convert data types
- Extract or transform strings, dates, etc.

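#### Basic syntax

```python
df2 = df.withColumn("new_column_name", expression_or_transformation)
```

If `new_column_name` already exists, that column is replaced. Like most PySpark methods, `withColumn()` returns a new DataFrame rather than modifying `df`.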

In [0]:
# 1. Add a new column: tax = 10% of MRP
tax_amount = col("Item_MRP") * 0.1
df = df.withColumn("tax", tax_amount)
display(df)

In [0]:
# 2. Modify an existing column: reusing its name in withColumn replaces it (here: double the weight).
# NOTE: re-running this cell doubles the weight again.
df = df.withColumn("Item_Weight", 2 * col("Item_Weight"))
display(df)

In [0]:
# Convert Column Type (string → double).
# Reference the column by its exact name; Spark matches names case-insensitively by
# default, but relying on that breaks if spark.sql.caseSensitive is enabled.
df = df.withColumn("Item_Weight", col("Item_Weight").cast("double"))
df.printSchema()

In [0]:
# Add a constant column: lit() wraps a Python literal as a Column.
from pyspark.sql.functions import lit

df = df.withColumn("Country", lit("Palestine"))

In [0]:
display(df)  # shows the new Country column

In [0]:
from pyspark.sql.functions import regexp_replace, col

# Shorten every "Regular" occurrence in Item_Fat to "Reg".
fat_shortened = regexp_replace(col("Item_Fat"), "Regular", "Reg")
df = df.withColumn("Item_Fat", fat_shortened)
df.display()

In [0]:
# Item_Weight was already cast to double above, so this cast changes nothing; the result
# is not assigned, so df is unchanged — this only prints the schema of the derived frame.
df.withColumn("Item_Weight", col("Item_Weight").cast("double")).printSchema()

### SORT


#### Basic syntax

```python
df.sort("column_name").show()
```

This sorts the DataFrame by the specified column in ascending order (the default); use `col("column_name").desc()` for descending order.

In [0]:
df.orderBy(col("Item_Weight").desc()).display()  # orderBy is an alias of sort; heaviest first

In [0]:
df.orderBy("Item_Weight").display()  # ascending is the default order

In [0]:
df.orderBy(["Item_Weight", "Item_MRP"], ascending=[False, True]).display()  # weight desc, then MRP asc

In [0]:
top_eight = df.orderBy(col("Item_Weight").desc(), col("Item_MRP").asc()).limit(8)
display(top_eight)

In [0]:
display(df.limit(10))  # first 10 rows only

### DROP

#### `drop()` removes one or more columns from a DataFrame. To remove rows that contain null values, use `dropna()` (equivalently `df.na.drop()`).

In [0]:
# drop one column, then several at once (drop() accepts any number of column names)
df = df.drop("Country").drop("Item_Visibility", "tax")

In [0]:
display(df)  # Country, Item_Visibility and tax are gone

#### Drop Rows with Nulls

In [0]:
# Drop a row only when specific columns are null. This is shown on df *before* the full
# dropna() below; after dropna() no nulls remain, so a subset drop would be a no-op.
df.dropna(subset=["Item_Weight", "Item_MRP"]).display()

# Remove every row that has a null in any column.
df = df.dropna()
df.display()
    

### dropDuplicates()

#### `dropDuplicates()` removes duplicate rows based on:

- all columns (the default), or
- a subset of columns you specify.

In [0]:
# distinct() is equivalent to dropDuplicates() with no arguments:
# it removes rows that are identical across all columns.
df = df.distinct()

In [0]:
display(df)  # fully-identical rows removed

In [0]:
df2 = df.dropDuplicates(["id"])
df2.display()
# Keeps one row per unique "id" (the renamed Item_Identifier) and removes the others,
# even if their other columns differ. Which of the duplicate rows survives is not guaranteed.

### UNION


In [0]:
df1 = df.filter(col("Item_Weight") > 5)
df2 = df.filter(col("Item_Weight") > 10)
# union() pairs columns by POSITION: df1 and df2 need the same number of columns, in the
# same order, with compatible types (names are not checked). Duplicates are kept (UNION ALL).
df3 = df1.union(df2)
# df3 = df3.dropDuplicates()  # uncomment for SQL UNION (distinct) semantics
df3.display()

### Union by name


In [0]:
df3 = df1.unionByName(df2)
df3.display()
# unionByName matches columns by NAME rather than position, so the inputs may list their
# columns in different orders; the result follows df1's column order.

In [0]:
# Two tiny DataFrames that share the same DDL schema.
schema1 = 'id int, name string '
df1 = spark.createDataFrame([(1, 'nsr'), (2, 'ali')], schema1)
df2 = spark.createDataFrame([(3, 'mo'), (4, 'bro')], schema1)


In [0]:
df3 = df1.union(df2)  # same column order in both inputs, so positional union lines up
display(df3)

In [0]:
# Same columns, different order: df2 lists name before id.
schema1 = 'id int, name string '
schema2 = 'name string, id int'
df1 = spark.createDataFrame([(1, 'nsr'), (2, 'ali')], schema1)
df2 = spark.createDataFrame([('mo', 3), ('bro', 4)], schema2)


In [0]:
# df1.union(df2) / df1.unionAll(df2) would pair columns by position (id with name), which
# either misaligns the data or fails on the type mismatch — so it is left commented out.
# df1.unionAll(df2).display()
df1.unionByName(df2).display()

### String functions

#### These functions come from `pyspark.sql.functions` (imported below with `from pyspark.sql.functions import *`).
They are usually combined with `col("column_name")` inside `select()` or `withColumn()`.

In [0]:
from pyspark.sql.functions import initcap, col
# "Item_Type" was renamed to "type" earlier, so the original name no longer resolves.
df.select(initcap(col("type"))).display()

In [0]:
# Star import kept because later cells use its names (current_date, date_add, date_diff, avg, split, explode, ...).
# NOTE: it can shadow Python builtins such as sum, min, max, abs and round.
from pyspark.sql.functions import *
# "Item_Type" was renamed to "type" earlier, so the original name no longer resolves.
df.select(upper(col("type"))).display()

In [0]:
df.select(col("type"), length(col("type"))).display()  # "Item_Type" was renamed to "type" earlier

### DATE

In [0]:
# Today's date and the date one week later, as two new columns.
df = (
    df.withColumn("current date", current_date())
      .withColumn("after week", date_add(col("current date"), 7))
)
display(df)

In [0]:
# Days between the two date columns (end date first, start date second).
days_between = date_diff(col("after week"), col("current date"))
df = df.withColumn("dates_diff", days_between)
display(df)

### Dealing with nulls


In [0]:
# drop rows with a null in any column. df already went through dropna() earlier,
# so in this notebook this is effectively a no-op.
df = df.dropna(how="any")


In [0]:
display(df)  # state after dropping null rows

In [0]:
# Fill categorical gaps with a placeholder. Item_Weight is intentionally NOT filled with 0.0
# here: zeros would drag the column mean down, and the next cells fill it with the mean instead.
df = df.fillna({
    "Outlet_Size": "Unknown"
})

df.display()

In [0]:
# Mean of the non-null Item_Weight values (avg ignores nulls); None if there are none.
average = df.agg(avg(col("Item_Weight"))).collect()[0][0]
average

In [0]:
# Impute missing Item_Weight values with the column mean computed above.
# avg() yields None when the column has no non-null values, and fillna() cannot use
# None as a fill value, so only fill when a mean exists.
if average is not None:
    df = df.fillna({
        "Item_Weight": average
    })

In [0]:
display(df)  # after null handling

### Split & indexing

In [0]:
df.withColumn("Outlet_Type", split("Outlet_Type", " ")).display()  # string -> array of words

In [0]:
second_word = split(col("Outlet_Type"), " ").getItem(1)  # 0-based: the second word
display(df.withColumn("Outlet_Type", second_word))

### EXPLODE

In [0]:

# Split Outlet_Type into an array of words, then explode it: one output row per word.
df2 = df.withColumn("Outlet_Type", split(col("Outlet_Type"), " "))
exploded = df2.withColumn("Outlet_Type", explode(col("Outlet_Type")))
display(exploded)