# Databricks Basics with PySpark

## Data Reading

We read data with Spark's DataFrameReader API (`spark.read`), which loads files into DataFrames.

In [0]:
# List the files stored in the custom volume before reading them.
volume_dir = "dbfs:/Volumes/workspace/default/custom-volume/"
dbutils.fs.ls(volume_dir)

### From CSV

In [0]:
# Load the BigMart sales CSV: the first row provides the column names, and
# inferSchema makes Spark scan the data to pick each column's type.
df_csv = spark.read.csv(
    "dbfs:/Volumes/workspace/default/custom-volume/BigMart Sales.csv",
    header=True,
    inferSchema=True,
)

In [0]:
display(df_csv)

### From JSON

In [0]:
# Read drivers.json as JSON Lines: multiLine=False (Spark's default, kept explicit
# here) expects one JSON object per line. The JSON source infers its schema from
# the data when none is supplied and has no header row, so the CSV-only
# `inferSchema` and `header` options are not passed (Spark would ignore them).
df_json = (
    spark.read.format("json")
    .option("multiLine", False)
    .load("dbfs:/Volumes/workspace/default/custom-volume/drivers.json")
)


In [0]:
display(df_json)

## Transformations

In [0]:
# Wildcard imports: later cells use these names unqualified (col, lit, when,
# StructType, StringType, DoubleType, ...).
# NOTE: `from pyspark.sql.functions import *` shadows Python built-ins that share a
# name with a Spark function (e.g. sum, min, max, abs, round). Later cells rely on
# this: `sum("Item_MRP")` there is pyspark's aggregate, not Python's built-in sum.
from pyspark.sql.types import *
from pyspark.sql.functions import *

### Schema - DDL and StructType()

Instead of relying on `inferSchema`, we can supply a custom schema that sets each column's data type while the data is loaded.

#### Print Schema

In [0]:
# Tree view of df_csv's columns with their inferred types and nullability.
(
    df_csv
    .printSchema()
)

#### DDL Schema


In [0]:
# DDL-string schema for BigMart Sales, one "<name> <type>" entry per line.
# Item_Weight is declared STRING, so when the CSV is loaded with
# .schema(ddl_schema) this schema, not inferSchema, decides its type.
ddl_schema = "\n" + ",\n".join(
    f"    {column} {sql_type}"
    for column, sql_type in (
        ("Item_Identifier", "STRING"),
        ("Item_Weight", "STRING"),
        ("Item_Fat_Content", "STRING"),
        ("Item_Visibility", "DOUBLE"),
        ("Item_Type", "STRING"),
        ("Item_MRP", "DOUBLE"),
        ("Outlet_Identifier", "STRING"),
        ("Outlet_Establishment_Year", "INTEGER"),
        ("Outlet_Size", "STRING"),
        ("Outlet_Location_Type", "STRING"),
        ("Outlet_Type", "STRING"),
        ("Item_Outlet_Sales", "DOUBLE"),
    )
) + "\n"

##### Update Schema

In [0]:
# Re-read the CSV using the DDL schema instead of inferring types.
display(
    spark.read.schema(ddl_schema)
    .option("header", True)
    .csv("dbfs:/Volumes/workspace/default/custom-volume/BigMart Sales.csv")
)

#### StructType() Schema

In [0]:
# The same column layout as ddl_schema, built programmatically; every field is nullable.
struct_schema = StructType(
    [
        StructField(field_name, field_type, True)
        for field_name, field_type in (
            ("Item_Identifier", StringType()),
            ("Item_Weight", StringType()),
            ("Item_Fat_Content", StringType()),
            ("Item_Visibility", DoubleType()),
            ("Item_Type", StringType()),
            ("Item_MRP", DoubleType()),
            ("Outlet_Identifier", StringType()),
            ("Outlet_Establishment_Year", IntegerType()),
            ("Outlet_Size", StringType()),
            ("Outlet_Location_Type", StringType()),
            ("Outlet_Type", StringType()),
            ("Item_Outlet_Sales", DoubleType()),
        )
    ]
)

##### Update Schema

In [0]:
# Re-read the CSV using the StructType schema instead of inferring types.
display(
    spark.read.schema(struct_schema)
    .option("header", True)
    .csv("dbfs:/Volumes/workspace/default/custom-volume/BigMart Sales.csv")
)

### Select

#### Show Original Data Frame

In [0]:
display(df_csv)

#### Select few columns

##### Method 1

In [0]:
# Columns selected by their string names (passed as one list).
display(df_csv.select(["Item_Identifier", "Item_Weight", "Item_Fat_Content"]))

##### Method 2

In [0]:
# Columns selected as Column objects built with col().
wanted = ("Item_Identifier", "Item_Weight", "Item_Fat_Content")
display(df_csv.select(*[col(name) for name in wanted]))

### Alias

In [0]:
# Select Item_Identifier and expose it under the shorter name Item_Id.
display(df_csv.select(col("Item_Identifier").alias("Item_Id")))

### FILTER/WHERE

#### Scenario - 1

In [0]:
# where() is an alias of filter(): keep only "Regular" fat-content rows.
display(df_csv.where(col("Item_Fat_Content") == "Regular"))

#### Scenario - 2 

In [0]:
# Two chained filters are equivalent to combining both conditions with AND.
display(
    df_csv
    .where(col("Item_Type") == "Soft Drinks")
    .where(col("Item_Weight") < 10)
)

#### Scenario - 3

In [0]:
# Tier 1 / Tier 2 outlets whose Outlet_Size is missing.
display(
    df_csv
    .where(col("Outlet_Location_Type").isin(["Tier 1", "Tier 2"]))
    .where(col("Outlet_Size").isNull())
)

### withColumnRenamed

Renames an existing column. Like every transformation, it returns a new DataFrame and leaves `df_csv` unchanged.

In [0]:
display(df_csv.withColumnRenamed("Item_Weight", "Item_Wt"))

### withColumn
We use this to create a new column, or to replace an existing column when the name already exists.

#### Scenario - 1

In [0]:
# Add a column holding the same literal value on every row.
display(df_csv.withColumn("New_Column_Name", lit("New Column Value")))

#### Scenario - 2  
If the column name does not exist yet, a new column is added.

In [0]:
# New column computed row by row from two existing numeric columns.
# NOTE(review): Item_Outlet_Sales may already be a sales amount, so MRP * sales is
# illustrative only — confirm the intended meaning of "Total_Sales".
total_sales = col("Item_MRP") * col("Item_Outlet_Sales")
display(df_csv.withColumn("Total_Sales", total_sales))

#### Scenario - 3
If we reuse an existing column name, the column's content is replaced. Here the fat-content labels are abbreviated in place.

In [0]:
# Overwrite Item_Fat_Content with abbreviated labels: the inner replace runs first
# ("Regular" -> "Reg"), then the outer one ("Low Fat" -> "LF").
display(
    df_csv.withColumn(
        "Item_Fat_Content",
        regexp_replace(
            regexp_replace(col("Item_Fat_Content"), "Regular", "Reg"),
            "Low Fat",
            "LF",
        ),
    )
)

### Type Casting

In [0]:
# Cast Item_Weight to a string column (the type can be given by its SQL name).
display(df_csv.withColumn("Item_Weight", col("Item_Weight").cast("string")))

### Sort/Order By

#### Scenario - 1

In [0]:
# orderBy() is an alias of sort(); asc() builds an ascending sort expression.
display(df_csv.orderBy(asc("Item_Weight")))

#### Scenario - 2

In [0]:
display(df_csv.orderBy(desc("Item_Visibility")))

#### Scenario - 3

In [0]:
# Most recent establishment year first; within a year, lowest Item_MRP first.
display(
    df_csv.orderBy(
        col("Outlet_Establishment_Year").desc(),
        col("Item_MRP").asc(),
    )
)

### Limit

In [0]:
# Keep only the first 10 rows.
display(df_csv.limit(10))

### Drop

#### Scenario - 1

In [0]:
# Remove a single column, named by string.
display(df_csv.drop("Item_Weight"))

#### Scenario - 2

In [0]:
# Remove several columns at once. String names are used because they are accepted
# for multiple columns on every Spark version.
display(df_csv.drop("Item_Identifier", "Item_Weight"))

### Drop Duplicates

Also known as deduplication (`dedup`).

#### Scenario - 1
Remove rows that are exact duplicates across all columns.

In [0]:
# Drop rows that repeat on every column; df_csv.distinct() gives the same result.
display(df_csv.dropDuplicates())

#### Scenario - 2

In [0]:
# Keep one row per (Item_Type, Outlet_Type) combination.
display(df_csv.drop_duplicates(subset=["Item_Type", "Outlet_Type"]))

### Union and Union By Name

#### Preparing DataFrame

In [0]:
# First frame: columns ordered (name, id).
data1 = [("kad", "1"), ("sid", "2")]
schema1 = "name STRING, id STRING"

df1 = spark.createDataFrame(data=data1, schema=schema1)
display(df1)

In [0]:
# Second frame: same columns in the opposite order, (id, name).
data2 = [("3", "rahul"), ("4", "jas")]
schema2 = "id STRING, name STRING"

df2 = spark.createDataFrame(data=data2, schema=schema2)
display(df2)

#### Union

In [0]:
# union() matches columns by position, not by name: df2's ids land in the `name`
# column and its names in the `id` column.
display(df1.union(df2))

#### Union By Name

In [0]:
# unionByName() matches columns by name, so the values stay in the right columns.
display(df1.unionByName(df2))

### String Functions

#### INITCAP()

In [0]:
# Capitalise the first letter of each word.
display(df_csv.select(initcap(col("Item_Type")).alias("Item_Type")))

#### UPPER

In [0]:
display(df_csv.select(upper(col("Item_Type")).alias("ITEM_TYPE")))

#### LOWER

In [0]:
display(df_csv.select(lower(col("Item_Type")).alias("item_type")))

### Date Functions

#### CURRENT_DATE()

In [0]:
# Stamp every row with the current date.
df_date = df_csv.withColumn("Curr_Date", current_date())
display(df_date)

#### DATE_ADD()

In [0]:
# One week after Curr_Date.
df_date = df_date.withColumn("Week_After", date_add(col("Curr_Date"), 7))
display(df_date)

#### DATE_SUB()

In [0]:
# One week before Curr_Date (date_add with -7 days is equivalent).
df_date = df_date.withColumn("Week_Before", date_sub(col("Curr_Date"), 7))
display(df_date)

#### DATEDIFF()

In [0]:
# Days from Week_Before to Week_After: 14, since they are Curr_Date -7 and +7.
df_date = df_date.withColumn("Date_Diff", datediff(col("Week_After"), col("Week_Before")))
display(df_date)

#### DATE_FORMAT()

In [0]:
# date_format returns a STRING column. Bind the result to a new name so that
# `df_date` keeps Week_After as a DATE. Otherwise, re-running this cell (or the
# DATEDIFF cell above) would operate on already-formatted "dd-MM-yyyy" strings.
df_date_fmt = df_date.withColumn("Week_After", date_format("Week_After", "dd-MM-yyyy"))
df_date_fmt.display()

### Handling Nulls

#### Dropping Nulls

##### Drop records where every column is null

In [0]:
display(df_csv.na.drop(how="all"))

##### Drop records where any column is null

In [0]:
display(df_csv.na.drop(how="any"))

##### Drop records where any column in a chosen subset is null

In [0]:
# Only Outlet_Size is checked; nulls in other columns are kept.
display(df_csv.na.drop(subset=["Outlet_Size"]))

#### Filling Nulls

##### Replace null values in all columns of a matching type
A string fill value only replaces nulls in string columns; nulls in numeric columns are left unchanged.

In [0]:
# With a string value, only string-typed columns are filled.
display(df_csv.na.fill("NA"))

##### Replace null values in a subset of columns

In [0]:
# Column -> replacement mapping: fill nulls in Outlet_Size only.
display(df_csv.na.fill({"Outlet_Size": "NA"}))

### Split and Indexing

##### Split
Splits the column value on a delimiter (interpreted as a regular expression) and stores the result as an array.

In [0]:
# Replace Outlet_Type with the array of its space-separated words.
display(df_csv.withColumn("Outlet_Type", split(col("Outlet_Type"), " ")))

##### Indexing
Returns the element at a given 0-based position of the array produced by the split; `[1]` is the second element.

In [0]:
# getItem(1) is the same as [1]: the second word of Outlet_Type.
display(df_csv.withColumn("Outlet_Type", split(col("Outlet_Type"), " ").getItem(1)))

### Explode
Explodes each array element into its own row; the other column values are repeated for every element.

In [0]:
# df_split keeps Outlet_Type as an array (also used by the array_contains cell).
df_split = df_csv.withColumn("Outlet_Type", split(col("Outlet_Type"), " "))
display(df_split.withColumn("Outlet_Type", explode(col("Outlet_Type"))))

### Array Contains

In [0]:
# Boolean flag: does the Outlet_Type word array contain "Type1"?
display(df_split.withColumn("Type1_Flag", array_contains(col("Outlet_Type"), "Type1")))

### Group By

#### Scenario - 1

In [0]:
# GroupedData.sum produces the same "sum(Item_MRP)" column as agg(sum(...)).
display(df_csv.groupBy("Item_Type").sum("Item_MRP"))

#### Scenario - 2

In [0]:
# GroupedData.avg produces the same "avg(Item_MRP)" column as agg(avg(...)).
display(df_csv.groupBy("Item_Type").avg("Item_MRP"))

#### Scenario - 3

In [0]:
# Total Item_MRP per (Item_Type, Outlet_Size) pair.
display(
    df_csv.groupBy(["Item_Type", "Outlet_Size"])
    .agg(sum(col("Item_MRP")).alias("Total_MRP"))
)

#### Scenario - 4

In [0]:
# Several aggregates computed in one pass over each (Item_Type, Outlet_Size) group.
mrp_aggregations = [
    sum("Item_MRP").alias("Total_MRP"),
    avg("Item_MRP").alias("Avg_MRP"),
]
display(df_csv.groupBy("Item_Type", "Outlet_Size").agg(*mrp_aggregations))

### Collect List

#### Preparing Data Frame

In [0]:
# (user, book) pairs: user1 and user2 have two books each, user3 has one.
data = [
    ("user1", "book1"), ("user1", "book2"),
    ("user2", "book2"), ("user2", "book4"),
    ("user3", "book1"),
]
schema = "user string, book string"

df_collect = spark.createDataFrame(data=data, schema=schema)
display(df_collect)

#### COLLECT_LIST

In [0]:
# Gather each user's books into an array column.
display(df_collect.groupBy("user").agg(collect_list(col("book")).alias("books")))

### Pivot

In [0]:
# One row per Item_Type and one column per distinct Outlet_Size value, each cell
# holding the average Item_MRP.
display(
    df_csv.groupBy("Item_Type")
    .pivot("Outlet_Size")
    .agg(avg("Item_MRP").alias("Avg_Item_MRP"))
)

### When Otherwise
Used for creating conditional columns

#### Scenario - 1

In [0]:
# Only Item_Type == "Meat" is flagged "Non-Veg"; every other type (including
# null) becomes "Veg".
# NOTE(review): other Item_Type values may also be non-vegetarian — confirm.
df_temp = df_csv.withColumn(
    "Veg_Flag",
    when(col("Item_Type") == "Meat", "Non-Veg").otherwise("Veg"),
)
display(df_temp)

#### Scenario - 2

In [0]:
# Split Veg items at an MRP of 100: below 100 is "Inexpensive Veg", 100 and above
# is "Expensive Veg". Using `>=` (not `>`) keeps Veg items priced at exactly 100
# from falling through to the "Non-Veg" default. Veg rows with a null Item_MRP
# still match neither branch and get "Non-Veg".
is_veg = col("Veg_Flag") == "Veg"
df_temp.withColumn(
    "Veg_Expensive_Flag",
    when(is_veg & (col("Item_MRP") < 100), "Inexpensive Veg")
    .when(is_veg & (col("Item_MRP") >= 100), "Expensive Veg")
    .otherwise("Non-Veg"),
).display()

### Joins

#### Preparing Data Frame

In [0]:
# Employees as (emp_id, emp_name, dept_id). Note: this rebinds df1 for the join demos.
dataj1 = [
    ("1", "gaur", "d01"), ("2", "kit", "d02"), ("3", "sam", "d03"),
    ("4", "tim", "d03"), ("5", "aman", "d05"), ("6", "nad", "d06"),
]
schemaj1 = "emp_id STRING, emp_name STRING, dept_id STRING"

df1 = spark.createDataFrame(data=dataj1, schema=schemaj1)
display(df1)

In [0]:
# Departments as (dept_id, department). Note: this rebinds df2 for the join demos.
dataj2 = [
    ("d01", "HR"), ("d02", "Marketing"), ("d03", "Accounts"),
    ("d04", "IT"), ("d05", "Finance"),
]
schemaj2 = "dept_id STRING, department STRING"

df2 = spark.createDataFrame(data=dataj2, schema=schemaj2)
display(df2)

#### INNER JOIN

In [0]:
# Only employees whose dept_id exists in df2; both dept_id columns are kept.
display(df1.join(df2, on=df1["dept_id"] == df2["dept_id"], how="inner"))

#### LEFT JOIN

In [0]:
# Every employee; those without a matching department (dept_id d06) get nulls on the right.
display(df1.join(df2, on=df1["dept_id"] == df2["dept_id"], how="left"))

#### RIGHT JOIN

In [0]:
# Every department; those without employees (d04, IT) get nulls on the left.
display(df1.join(df2, on=df1["dept_id"] == df2["dept_id"], how="right"))

#### ANTI JOIN
Returns the rows of `df1` whose `dept_id` has no match in `df2`; only `df1`'s columns are kept.

In [0]:
# "left_anti" is the same join type as "anti": df1 rows with no match in df2.
display(df1.join(df2, on=df1["dept_id"] == df2["dept_id"], how="left_anti"))

## Window Functions

In [0]:
from pyspark.sql.window import Window

### ROW_NUMBER()

In [0]:
# Sequential row number in Item_Identifier order. With no partitionBy, Spark
# evaluates this window over a single partition.
by_item_id = Window.orderBy(col("Item_Identifier"))
display(df_csv.withColumn("Row_Col", row_number().over(by_item_id)))

### RANK()
It assigns the same rank to rows that have identical values in the ordering columns within a window partition.
When ties occur, it leaves gaps in the ranking sequence. For example, if two rows tie for rank 1, the next distinct rank would be 3 (skipping rank 2).

In [0]:
# rank() over Item_Identifier in descending order; tied identifiers share a rank.
display(df_csv.withColumn("Row_Rank_Col", rank().over(Window.orderBy(desc("Item_Identifier")))))

### DENSE_RANK()
It assigns the same rank to rows with identical values in the ordering columns within a window partition.
It does not leave gaps in the ranking sequence when ties occur. The ranks are consecutive. If two rows tie for rank 1, the next distinct rank would be 2.

In [0]:
# Both functions share one ordering so their handling of ties can be compared:
# rank() leaves gaps after ties, dense_rank() does not.
by_id_desc = Window.orderBy(col("Item_Identifier").desc())
display(
    df_csv.withColumn("Row_Rank_Col", rank().over(by_id_desc))
    .withColumn("Row_Dense_Rank_Col", dense_rank().over(by_id_desc))
)

### Cumulative Sum

In [0]:
# Running total of Item_MRP in Item_Type order. Item_Type values repeat across rows,
# and with a ROWS frame the order of tied rows (and so the running total each one
# gets) is not deterministic. Item_Identifier and Outlet_Identifier are therefore
# added as tie-breakers.
# NOTE(review): assumes that pair is (close to) unique per row — confirm.
cumsum_window = Window.orderBy(
    "Item_Type", "Item_Identifier", "Outlet_Identifier"
).rowsBetween(Window.unboundedPreceding, Window.currentRow)

df_csv.withColumn("CumSum", sum("Item_MRP").over(cumsum_window)).display()

## User Defined Function (UDF)

### Step - 1

In [0]:
def my_fun(x):
    """Return the square of ``x``.

    This is the body of a Spark UDF, which receives ``None`` for SQL NULL inputs.
    ``None`` is passed straight through, so a null input yields a null result
    instead of raising ``TypeError``.
    """
    if x is None:
        return None
    return x * x

### Step - 2

In [0]:
# Without an explicit returnType, `udf` declares its result as StringType, so the
# squared values would come back as strings. Declare DOUBLE instead.
# NOTE(review): assumes the input column (Item_MRP) is floating point; a Python int
# result does not match DoubleType and would come back as null — confirm.
my_udf = udf(my_fun, DoubleType())

### Step - 3

In [0]:
# Apply the Python UDF row by row to Item_MRP.
display(df_csv.withColumn("mynewcol", my_udf(col("Item_MRP"))))

## Data Writing

### CSV

In [0]:
# No mode is set, so Spark's default "errorifexists" applies: this write fails when
# the path already exists (e.g. when the notebook is re-run). The CSV writer also
# omits a header row unless the `header` option is set.
df_csv.write.csv("dbfs:/Volumes/workspace/default/custom-volume/write_data.csv")

### Data Writing Modes

#### APPEND

In [0]:
# "append" adds new part-files next to whatever is already at the path.
df_csv.write.mode("append").csv("dbfs:/Volumes/workspace/default/custom-volume/write_data.csv")

#### OVERWRITE

In [0]:
# "overwrite" replaces any existing data at the path.
df_csv.write.csv(
    "dbfs:/Volumes/workspace/default/custom-volume/write_data.csv", mode="overwrite"
)

#### ERROR

In [0]:
# "error" (alias of "errorifexists", Spark's default mode) refuses to write when the
# target path already exists, which it does here because the cells above wrote to
# the same location. Catch the expected AnalysisException and show its message so
# the notebook keeps running under "Run All".
from pyspark.errors import AnalysisException

try:
    df_csv.write.format("csv").mode("error").save(
        "dbfs:/Volumes/workspace/default/custom-volume/write_data.csv"
    )
except AnalysisException as exc:
    print(f"Write refused as expected: {exc}")

#### IGNORE

In [0]:
# "ignore" does nothing when the path already exists (as it does after the cells above).
df_csv.write.mode("ignore").format("csv").save(
    "dbfs:/Volumes/workspace/default/custom-volume/write_data.csv"
)

### PARQUET

In [0]:
# Write Parquet into a directory named without a ".csv" suffix; that suffix would
# misdescribe the Parquet part-files written inside it.
df_csv.write.format("parquet").mode("overwrite").option(
    "path", "dbfs:/Volumes/workspace/default/custom-volume/write_parquet_data"
).save()

### TABLE

In [0]:
# Persist df_csv as the managed table `my_table`, replacing it if it already exists.
# NOTE(review): Unity Catalog managed tables are normally Delta; confirm a parquet
# managed table is accepted in this workspace.
df_csv.write.mode("overwrite").format("parquet").saveAsTable("my_table")

### SPARK SQL

#### createTempView

In [0]:
# Register df_csv for SQL queries as `my_view`. createOrReplaceTempView, unlike
# createTempView, does not fail when the view already exists, so this cell can be
# re-run.
df_csv.createOrReplaceTempView("my_view")

In [0]:
%sql
-- Rows whose fat-content label is exactly 'Lf'.
-- NOTE(review): another cell abbreviates "Low Fat" as 'LF' (upper case); confirm 'Lf' matches the data.
SELECT *
FROM my_view
WHERE Item_Fat_Content = 'Lf'

In [0]:
# The same query as the %sql cell, run from Python so the result is a DataFrame.
lf_query = "select * from my_view where Item_Fat_Content = 'Lf'"
df_sql = spark.sql(lf_query)
display(df_sql)