## Data Reading

### Reading csv files

In [0]:
# Load the BigMart sales CSV from the bronze volume. The first row supplies the
# column names and Spark infers each column's type from the data.
df_sales = (
    spark.read.format("csv")
    .options(header="true", inferSchema="true")
    .load("/Volumes/catalog-dbk/bronze/bronze-volume-raw/BigMart Sales.csv")
)

In [0]:
# Show the sales DataFrame that was just loaded.
df_sales.display()

In [0]:
# Print the column names and the types Spark inferred for the sales data.
df_sales.printSchema()

### Reading JSON files

In [0]:
# Read the JSON file with multiLine enabled, so a single record may span several lines.
df_movie_json = (
    spark.read.format("json")
    .option("multiLine", "true")
    .load("/Volumes/catalog-dbk/bronze/bronze-volume-raw/large-file.json")
)



In [0]:
# List the raw bronze volume and print the name of every entry in it.
l1 = dbutils.fs.ls("/Volumes/catalog-dbk/bronze/bronze-volume-raw")
for entry in l1:
    print(entry.name)

In [0]:

# Read drivers.json with multiLine disabled, i.e. one JSON record per line.
# The "header" and "inferSchema" options were dropped: they belong to the CSV
# reader, and the JSON reader infers the schema on its own unless one is supplied.
df_json = (
    spark.read.format("json")
    .option("multiLine", False)
    .load("/Volumes/catalog-dbk/bronze/bronze-volume-raw/drivers.json")
)


### Explore data

In [0]:
# Look at the sales rows before applying any transformation.
df_sales.display()

### Schema definition

In [0]:
# Inferred schema, for comparison with the explicit schemas defined in the following cells.
df_sales.printSchema()

## DDL Schema

In [0]:
# Explicit schema for the BigMart sales CSV, written as a DDL string
# ("column_name TYPE" pairs separated by commas).
# NOTE(review): Item_Weight is declared STRING here, while other cells compare
# Item_Weight with a number and multiply it by Item_MRP — confirm STRING is intended.
my_ddl_schema = """
                Item_Identifier STRING,
                Item_Weight STRING,
                Item_Fat_Content STRING,
                Item_Visibility DOUBLE,
                Item_Type STRING,
                Item_MRP DOUBLE,
                Outlet_Identifier STRING,
                Outlet_Establishment_Year INT,
                Outlet_Size STRING,
                Outlet_Location_Type STRING,
                Outlet_Type STRING,
                Item_Outlet_Sales DOUBLE
"""




In [0]:
# Re-read the sales CSV, applying the DDL schema instead of letting Spark infer types,
# then show both the data and the resulting schema.
df = (
    spark.read.format("csv")
    .schema(my_ddl_schema)
    .option("header", True)
    .load("/Volumes/catalog-dbk/bronze/bronze-volume-raw/BigMart Sales.csv")
)
df.display()
df.printSchema()

## StructType and StructField

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [0]:

# The same sales schema expressed programmatically: one nullable StructField per
# column, built from (column name, data type) pairs.
my_struct_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in [
        ("Item_Identifier", StringType()),
        ("Item_Weight", StringType()),
        ("Item_Fat_Content", StringType()),
        ("Item_Visibility", DoubleType()),
        ("Item_Type", StringType()),
        ("Item_MRP", DoubleType()),
        ("Outlet_Identifier", StringType()),
        ("Outlet_Establishment_Year", IntegerType()),
        ("Outlet_Size", StringType()),
        ("Outlet_Location_Type", StringType()),
        ("Outlet_Type", StringType()),
        ("Item_Outlet_Sales", DoubleType()),
    ]
])


In [0]:
# Read the sales CSV once more, this time with the StructType schema, and show the
# data together with the schema that was applied.
df = (
    spark.read.format("csv")
    .schema(my_struct_schema)
    .option("header", True)
    .load("/Volumes/catalog-dbk/bronze/bronze-volume-raw/BigMart Sales.csv")
)
df.display()
df.printSchema()

## Select columns

In [0]:
# First way to select columns: pass the column names as plain strings.
df_sel = df_sales.select(["Item_Identifier", "Item_Type", "Item_Fat_Content"])

In [0]:
# Second way to select columns: pass Column objects built with col().
df_sales.select(
    col("Item_Identifier"),
    col("Item_Type"),
    col("Item_Fat_Content"),
).display()


## ALIAS

In [0]:
# Rename columns inside the projection itself with Column.alias().
df_sales.select(
    col("Item_Identifier").alias("Item_id"),
    col("Outlet_Establishment_Year").alias("Item_Esta_Year"),
).display()

### FILTER/WHERE Transformations

### 1. Filter data with fat content = Regular

In [0]:

# Keep only the rows whose Item_Fat_Content is exactly "Regular".
df_sales.where(col("Item_Fat_Content") == "Regular").display()


### 2. Slice the data with item type = Soft Drinks and weight < 10

In [0]:
# Preview the two columns used by the next filter:
#   Item_Type   == "Soft Drinks"
#   Item_Weight <  10
df_sales.select("Item_Type", "Item_Weight").display()



In [0]:
# Project four columns, then keep soft drinks that weigh less than 10.
(
    df_sales
    .select("Item_Type", "Item_Fat_Content", "Item_Identifier", "Item_Weight")
    .where((col("Item_Type") == "Soft Drinks") & (col("Item_Weight") < 10))
    .display()
)

### 3. Fetch the data with location type in (Tier 1, Tier 2) and outlet size null


In [0]:
# Both conditions must hold:
#   Outlet_Location_Type is "Tier 1" or "Tier 2"
#   Outlet_Size is null
df_sales.where(
    col("Outlet_Location_Type").isin("Tier 1", "Tier 2")
    & col("Outlet_Size").isNull()
).display()

## withColumnRenamed

In [0]:
# Rename two columns for display only; df_sales itself is not reassigned.
(
    df_sales
    .withColumnRenamed("Item_Weight", "Item_wgt")
    .withColumnRenamed("Item_Identifier", "Item_ID")
    .display()
)

## withColumn
### 1. Add a new column
### 2. Modify the data of an existing column

In [0]:
# Add a constant column "Flag" whose value is the literal "New" on every row.
df_sales = df_sales.withColumn("Flag",lit("New"))

In [0]:
# Show the DataFrame with the new Flag column.
df_sales.display()

In [0]:
# Add a derived column "multiply" = Item_Weight * Item_MRP, computed row by row.
df_sales = df_sales.withColumn("multiply", col("Item_Weight")* col("Item_MRP"))

In [0]:
# Show the DataFrame with the new multiply column.
df_sales.display()

### Change the content of a column
### regexp_replace

In [0]:
# Shorten the fat-content labels in place: "Regular" becomes "Reg", then
# "Low Fat" becomes "LF". Values matching neither pattern are left as they are.
df_sales = (
    df_sales
    .withColumn("Item_Fat_Content", regexp_replace("Item_Fat_Content", "Regular", "Reg"))
    .withColumn("Item_Fat_Content", regexp_replace("Item_Fat_Content", "Low Fat", "LF"))
)

## Type Casting

In [0]:
# Schema before the cast, for comparison.
df_sales.printSchema()

In [0]:
# Cast the weight column to a string type.
# NOTE(review): the target name "Item_weight" differs only in case from the source
# column "Item_Weight"; whether this replaces the column or adds a second one depends
# on Spark's case-sensitivity setting — confirm which is intended.
df_sales = df_sales.withColumn(
    "Item_weight",
    col("Item_Weight").cast("string"),
)

In [0]:
# Schema after the cast, to verify the type change.
df_sales.printSchema()

# sort/orderBy

Sort rows in ascending (asc) or descending (desc) order.

In [0]:
# Sort by weight, heaviest first.
# Item_weight was cast to a string in the type-casting cell, and strings sort
# lexicographically ("9.3" would rank above "21.35"). The sort key is therefore cast
# back to double so the order is numeric; the column shown in the output is unchanged.
df_sales.sort(col("Item_weight").cast("double").desc()).display()

In [0]:
# Sort by visibility, smallest value first (orderBy is an alias of sort).
df_sales.orderBy(col("Item_Visibility").asc()).display()

In [0]:
# Sort on two keys: weight descending, then visibility ascending.
# Item_weight was cast to a string in the type-casting cell, so its sort key is cast
# back to double; otherwise the order would be lexicographic instead of numeric.
df_sales.sort(
    col("Item_weight").cast("double").desc(),
    col("Item_Visibility").asc(),
).display()

## Limit

In [0]:
# Show at most 10 rows.
df_sales.limit(10).display()

# DROP - COLUMNS

In [0]:
# Drop a single column, referenced as a Column object; df_sales is not reassigned.
df_sales.drop(col("Flag")).display()

In [0]:
# Drop several columns at once, referenced by name; df_sales is not reassigned.
df_sales.drop("Item_Visibility","Item_Type").display()

## Drop-Duplicates

### Remove rows that are exact duplicates
### Remove duplicates based on specific columns

In [0]:
# Remove rows that are duplicated across all columns.
df_sales.dropDuplicates().display()

In [0]:
# distinct() also returns only the unique rows.
df_sales.distinct().display()

In [0]:
# De-duplicate on Item_Type only: one row is kept per distinct Item_Type value.
df_sales.dropDuplicates(["Item_Type"]).display()

## UNION and UNION BYNAME

PREPARING DataFrames

In [0]:
# Three tiny DataFrames for the union examples. A and B list their columns as
# (id, name); C has the same two columns in the opposite order (name, id).

# -------- DataFrame A --------
schema_a = StructType().add("id", IntegerType(), True).add("name", StringType(), True)
data_a = [(1, "Alice"), (2, "Bob")]
df_a = spark.createDataFrame(data_a, schema_a)

# -------- DataFrame B --------
schema_b = StructType().add("id", IntegerType(), True).add("name", StringType(), True)
data_b = [(3, "Charlie"), (4, "David")]
df_b = spark.createDataFrame(data_b, schema_b)

# -------- DataFrame C (columns swapped) --------
schema_c = StructType().add("name", StringType(), True).add("id", IntegerType(), True)
data_c = [("Charlie", 3), ("David", 4)]
df_c = spark.createDataFrame(data_c, schema_c)



In [0]:
# Show the two inputs of the union.
df_a.display()
df_b.display()

In [0]:
# union() matches columns by position; A and B share the same column order.
df_a.union(df_b).display()

## unionByName

In [0]:
# unionByName() matches columns by name, so C's swapped column order (name, id) is handled.
df_a.unionByName(df_c).display()

# String Functions

## INITCAP()
## UPPER()
## LOWER()

In [0]:
# Item_Type values as they are, before applying any string function.
df_sales.select("Item_Type").display()

initcap()

In [0]:
# initcap(): first letter of each word upper-cased, the remaining letters lower-cased.
df_sales.select(initcap("Item_Type")).display()

In [0]:
# lower(): the whole string in lower case.
df_sales.select(lower("Item_Type")).display()

In [0]:
# upper(): the whole string in upper case.
df_sales.select(upper("Item_Type")).display()

In [0]:

from pyspark.sql.functions import initcap, lower, upper

# Apply initcap() to Item_Type and give the result column an explicit name.
# NOTE(review): the alias says "upped" but the function applied is initcap() —
# confirm which of the two was intended.
df_sales.select(
    initcap("Item_Type").alias("upped_item_type")
).display()


## DATE FUNCTIONS
  * CURRENT_DATE()
  * DATE_ADD()
  * DATE_SUB() — or, as a workaround, DATE_ADD() with a negative number of days

In [0]:
# Add a cur_date column holding the current date.
df_sales = df_sales.withColumn("cur_date", current_date())

Date_add()

One week (7 days) after the current date.

In [0]:
# week_after = cur_date plus 7 days.
df_sales=df_sales.withColumn("week_after",date_add("cur_date",7))

Date_sub()

In [0]:
# week_before = cur_date minus 7 days, using date_sub().
df_sales=df_sales.withColumn("week_before", date_sub("cur_date",7))

In [0]:
# Workaround without date_sub(): date_add() with a negative offset.
# This overwrites week_before with week_after minus 7 days.
df_sales = df_sales.withColumn("week_before", date_add("week_after",-7))

## DATE_DIFF



In [0]:
# datediff = number of days between week_after and cur_date.
df_sales = df_sales.withColumn("datediff", date_diff("week_after","cur_date"))

## DATE_FORMAT()

In [0]:
# Store week_before as a "dd-MM-yyyy" formatted string.
# The date is derived from cur_date rather than from week_before itself, so re-running
# this cell never feeds an already formatted string back into date_format(). On a
# fresh run the result is the same, because week_before is cur_date minus 7 days.
df_sales = df_sales.withColumn(
    "week_before",
    date_format(date_sub("cur_date", 7), "dd-MM-yyyy"),
)

## HANDLING NULLS

  * dropping nulls
  * filling nulls

### Dropping nulls

In [0]:
# how="all": drop a row only when every column in it is null.
df_sales.dropna(how="all").display()

In [0]:
# how="any": drop a row as soon as at least one of its columns is null.
df_sales.dropna(how="any").display()

In [0]:
# Only consider Item_weight: drop the rows where that column is null.
df_sales.dropna(subset = ["Item_weight"]).display()

## Filling nulls with a placeholder value

In [0]:
# Replace nulls with a placeholder. A string value only fills string-typed columns.
# NOTE(review): the first placeholder is spelled "notavalable" — confirm whether
# "Not available" was meant, as in the second call.
df_sales.fillna(value="notavalable").display()

# Same idea, restricted to the Outlet_Size column.
df_sales.fillna(value="Not available", subset=["Outlet_Size"]).display()

## SPLIT AND INDEXING