### Data Reading in Databricks


In [0]:
from pyspark.sql import SparkSession
# Obtain (or reuse) the SparkSession; the bare name on the last line renders it as cell output.
spark = (
    SparkSession.builder
    .appName("Spark DataFrames")
    .getOrCreate()
)
spark

In [0]:
# Shorthand CSV reader: the first row is treated as the header and column types are inferred.
df = (spark.read
      .option('header', True)
      .option('inferSchema', True)
      .csv('/Volumes/workspace/default/data/addresses.csv'))
# Generic reader via format()/load(). No 'header' option is set, so Spark's default
# (header=false) applies: the first row is read as data and columns are named _c0, _c1, ...
df2 = (spark.read
       .format('csv')
       .option('inferSchema', 'true')
       .load('/Volumes/workspace/default/data/addresses.csv'))
df.show()
display(df2)
df.printSchema()


In [0]:
# Checking files: list the contents of the data volume (the value of the last
# expression in the cell is rendered as output).
dbutils.fs.ls('/Volumes/workspace/default/data')

In [0]:
display(df)

### Reading JSON

In [0]:
# Read the drivers JSON file. The JSON source always infers its schema from the
# records, so the CSV-only options 'inferSchema' and 'header' (which the JSON
# reader ignores) are not set here. multiline=False expects one JSON object per line.
df_json = (
    spark.read.format('json')
    .option('multiline', False)
    .load('/Volumes/workspace/default/data/drivers.json')
)

In [0]:
display(df_json)

### Schema Definition


In [0]:
# Show the schema of `df` (the addresses CSV read with header + inferSchema above).
df.printSchema()

In [0]:
# BigMart sales data: header row plus inferred column types.
df_new = (spark.read
          .option('header', True)
          .option('inferSchema', True)
          .csv('/Volumes/workspace/default/data/BigMart Sales.csv'))
display(df_new)
# Compare the inferred schemas of the CSV and JSON sources.
df_new.printSchema()
df_json.printSchema()

In [0]:
# Explicit schema as a DDL string ("name type" pairs, comma separated).
# NOTE(review): Item_Weight is declared string here, overriding whatever type
# inferSchema picks for it — presumably intentional to demo the override; confirm.
my_ddl_schema = '''
                   Item_Identifier string,
                   Item_Weight string,
                   Item_Fat_Content string,
                   Item_Visibility double,
                   Item_Type string,
                   Item_MRP double,
                   Outlet_Identifier string,
                   Outlet_Establishment_Year integer,
                   Outlet_Size string,
                   Outlet_Location_Type string,
                   Outlet_Type string,
                   Item_Outlet_Sales double
                '''
# Apply the DDL schema instead of inferring types.
df_new2 = (spark.read
           .schema(my_ddl_schema)
           .option('header', True)
           .csv('/Volumes/workspace/default/data/BigMart Sales.csv'))
display(df_new2)
df_new2.select(['Item_Weight']).show()

In [0]:
# Confirm the DDL schema was applied (e.g. Item_Weight is reported as string).
df_new2.printSchema()

### StructType() Schema

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [0]:
# Programmatic equivalent of my_ddl_schema: same column names and types, all nullable.
my_strct_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ('Item_Identifier', StringType()),
        ('Item_Weight', StringType()),
        ('Item_Fat_Content', StringType()),
        ('Item_Visibility', DoubleType()),
        ('Item_Type', StringType()),
        ('Item_MRP', DoubleType()),
        ('Outlet_Identifier', StringType()),
        ('Outlet_Establishment_Year', IntegerType()),
        ('Outlet_Size', StringType()),
        ('Outlet_Location_Type', StringType()),
        ('Outlet_Type', StringType()),
        ('Item_Outlet_Sales', DoubleType()),
    )
])

In [0]:
# Re-read BigMart with the StructType schema (replaces the addresses DataFrame in `df`).
df = spark.read.csv('/Volumes/workspace/default/data/BigMart Sales.csv',
                    header=True,
                    schema=my_strct_schema)

In [0]:
# Verify the schema of `df` now matches my_strct_schema.
df.printSchema()

### Transformations in PySpark (SELECT)

In [0]:
# Project three columns by name (a list is accepted as well as varargs).
df_sel = df_new.select(['Item_Identifier', 'Item_Weight', 'Item_Fat_Content'])
display(df_sel)


### COL

In [0]:
# Same projection, but with Column objects built by col().
df_sel = df_new.select(*[col(name) for name in ('Item_Identifier', 'Item_Weight', 'Item_Fat_Content')])
display(df_sel)

### ALIAS -- renaming

In [0]:
# describe() is lazy and returns a new DataFrame of summary statistics; the bare
# call discarded that result and showed nothing, so display it explicitly.
df_new.describe().display()
# alias() renames the column only in this projection; df_new itself is unchanged.
df_new.select(col('Item_Identifier').alias('Item_ID')).display()

### FILTER/WHERE

In [0]:
display(df_new)

### Scenario - 1
#### Filter the Data with fat content = Regular

In [0]:
# where() is an alias of filter().
display(df_new.where(col('Item_Fat_Content') == 'Regular'))

#### Scenario - 2
##### Slice the data with item type = Soft Drinks and weight < 10

In [0]:
# Combine conditions with & — each side must be parenthesised because & binds tighter than ==/<.
display(
    df_new.where(
        (col('Item_Type') == 'Soft Drinks') & (col('Item_Weight') < 10)
    )
)

### Scenario - 3
#### Fetch the rows where the outlet location is Tier 1 or Tier 2 and Outlet Size is null

In [0]:
# Exclusion form: keeps every location type other than 'Tier 3', so it equals the
# isin('Tier 1', 'Tier 2') version only if those three tiers are the only values.
display(
    df_new.where(
        (col('Outlet_Location_Type') != 'Tier 3') & col('Outlet_Size').isNull()
    )
)

In [0]:
# Inclusion form: explicitly Tier 1 or Tier 2, with Outlet_Size missing.
display(
    df_new.where(
        col('Outlet_Location_Type').isin('Tier 1', 'Tier 2') & col('Outlet_Size').isNull()
    )
)

## withColumnRenamed

In [0]:
# Returns a renamed copy; df_new still has Item_Weight because it is not reassigned.
display(df_new.withColumnRenamed('Item_Weight', 'Item_Wt'))

### withColumn
#### Adding a new column or modifying an existing one

In [0]:
# lit() turns a Python constant into a Column, so every row gets flag = "new".
df_new = df_new.withColumn(
    'flag',
    lit("new"),
)

In [0]:
display(df_new)

In [0]:
# Row-wise product of two columns (null whenever either operand is null).
df_new = df_new.withColumn(
    'multiply',
    col('Item_Weight') * col('Item_MRP'),
)
display(df_new)

#### Modifying Column content

In [0]:
# Abbreviate fat-content labels. The replacements are applied in order
# ("Regular" -> "Reg", then "Low Fat" -> "LF", then "low fat" -> "LF"),
# nested into a single expression instead of three chained withColumn calls.
df_new = df_new.withColumn(
    'Item_Fat_Content',
    regexp_replace(
        regexp_replace(
            regexp_replace(col('Item_Fat_Content'), "Regular", "Reg"),
            "Low Fat", "LF",
        ),
        "low fat", "LF",
    ),
)
display(df_new)


### Type Casting

In [0]:
# Type casting demo: show Item_Weight cast to string WITHOUT overwriting df_new.
# Reassigning df_new here would make the following sort cells on Item_Weight
# compare strings lexicographically (e.g. '9.3' > '21.35') instead of numerically.
# cast('string') is equivalent to cast(StringType()).
display(df_new.withColumn('Item_Weight', col('Item_Weight').cast('string')))

### Sort/ orderBy

### Scenario - 1

In [0]:
# orderBy() is an alias of sort(); desc() gives descending order.
display(df_new.orderBy(desc("Item_Weight")))

In [0]:
# Drop rows with a missing weight, then sort ascending.
display(
    df_new.where(col('Item_Weight').isNotNull())
    .orderBy(asc("Item_Weight"))
)

### Ordering by two columns

In [0]:
# `ascending` takes one flag per sort column: [False, False] sorts BOTH
# Item_Weight and Item_Visibility in DESCENDING order.
df_new.sort(['Item_Weight', 'Item_Visibility'], ascending=[False, False]).display()

In [0]:
# [False, True]: Item_Weight descending, ties broken by Item_Visibility ascending.
df_new.sort(['Item_Weight', 'Item_Visibility'], ascending=[False, True]).display()

#### Limit
##### Showing a few records

In [0]:
# Only the first 10 rows.
display(df_new.limit(10))

### Drop

In [0]:
# drop() returns a new DataFrame; df_new keeps all of its columns.
# One column:
display(df_new.drop('Item_Visibility'))
# Several columns in one call:
display(df_new.drop(*('Item_Visibility', 'Item_Type')))

### Dropping Duplicates

In [0]:
# drop_duplicates() is an alias of dropDuplicates(); with no subset, whole rows are compared.
display(df_new.drop_duplicates())

In [0]:
# Keep one row per distinct Item_Type.
display(df_new.dropDuplicates(['Item_Type']))

In [0]:
# distinct() drops fully duplicated rows of the StructType-read BigMart DataFrame.
display(df.distinct())

### UNION AND UNIONBYNAME

In [0]:
# Two small DataFrames with the SAME column order (id, name).
schema1 = 'id STRING,name STRING'
data1 = [('1', 'kad'), ('2', 'peter')]
df1 = spark.createDataFrame(data1, schema1)
display(df1)

schema2 = 'id STRING,name STRING'
data2 = [('3', 'john'), ('4', 'jean')]
df2 = spark.createDataFrame(data2, schema2)
display(df2)

#### Union

In [0]:
# Columns line up by position, which is fine here because both schemas are (id, name).
display(df1.union(df2))

### Union by name

In [0]:
# Same data, but df1's columns are in the opposite order: (name, id) vs (id, name).
schema1 = 'name STRING,id STRING'
data1 = [('kad', '1'), ('peter', '2')]
df1 = spark.createDataFrame(data1, schema1)
display(df1)

schema2 = 'id STRING,name STRING'
data2 = [('3', 'john'), ('4', 'jean')]
df2 = spark.createDataFrame(data2, schema2)
display(df2)

In [0]:
# union() matches columns by POSITION: df1 is (name, id) and df2 is (id, name),
# so ids end up under 'name' and names under 'id' — the data gets mixed up.
display(df1.union(df2))

# unionByName() matches columns by NAME, so the rows line up correctly.
display(df1.unionByName(df2))

### STRING FUNCTIONS 
#### INITCAP()
#### UPPER()
#### LOWER()

In [0]:
# Title case, lower case and upper case of Item_Type (first 5 rows each).
display(df_new.select(initcap(col('Item_Type')).alias('Caps_Item_Type')).limit(5))
display(df_new.select(lower(col('Item_Type')).alias('Lower_Item_Type')).limit(5))
display(df_new.select(upper(col('Item_Type')).alias('Upper_Item_Type')).limit(5))

#### Date Functions
#### DATE_ADD()
#### DATE_SUB()
#### CURRENT_DATE()

In [0]:
from pyspark.sql.functions import current_date

# Re-run safety: remove a 'curr_date' column left over from a previous run
# before adding it again.
if 'curr_date' in df_new.columns:
    print("Column 'curr_date' already exists. Dropping it first.")
    df_new = df_new.drop('curr_date')
df_new = df_new.withColumn('curr_date', current_date())

df_new.limit(5).display()

### Adding days to a date (DATE_ADD)

In [0]:
# week_after = curr_date + 7 days.
df_new = df_new.withColumn('week_after', date_add(col('curr_date'), 7))
display(df_new)

### Date_Sub()

In [0]:
# Preview only (df_new is not reassigned): curr_date - 7 days.
display(df_new.withColumn('week_before', date_sub(col('curr_date'), 7)))

In [0]:
# Persisted: subtracting 7 days with date_sub is the same as date_add(..., -7).
df_new = df_new.withColumn('week_before', date_sub('curr_date', 7))
display(df_new)

### DATEDIFF

In [0]:
# datediff(end, start) counts days from start to end:
# week_after (curr_date + 7) minus week_before (curr_date - 7) -> 14.
df_new = df_new.withColumn('datediff', datediff(col('week_after'), col('week_before')))
display(df_new)
# Then overwrite the same column with curr_date minus week_before -> 7 (not displayed here).
df_new = df_new.withColumn('datediff', datediff(col('curr_date'), col('week_before')))

### DATE_FORMAT


In [0]:
# date_format returns a string, so week_before is no longer a date column after this.
df_new = df_new.withColumn('week_before', date_format(col('week_before'), 'dd-MM-yyyy'))
display(df_new)

### HANDLING NULLS
### dropping nulls
### filling nulls

In [0]:
# Drop a row only if every column in it is null.
display(df_new.na.drop(how='all'))

In [0]:
# Drop a row if any column in it is null.
display(df.na.drop(how='any'))

In [0]:
# Drop rows where Outlet_Size is null; nulls elsewhere are kept.
display(df_new.na.drop(subset=['Outlet_Size']))

### Filling the nulls in columns

In [0]:
# A string fill value only applies to string columns; numeric nulls are left as-is.
display(df_new.na.fill('NotAvailable'))

In [0]:
# Fill nulls in Outlet_Size only.
display(df_new.na.fill('NotAvailable', subset=['Outlet_Size']))

### SPLIT AND INDEXING



#### Split Function

In [0]:
# split() turns the string into an array of tokens separated by a space.
display(df_new.withColumn('Outlet_Type', split(col('Outlet_Type'), ' ')))

In [0]:
# Keep only the second token (array indexing is 0-based).
display(df_new.withColumn('Outlet_Type', split(col('Outlet_Type'), ' ').getItem(1)))

### EXPLODE: turn each element of the split array into its own row

In [0]:
# Array-valued Outlet_Type built from `df`, reused by the explode/array_contains cells.
df_exp = df.withColumn('Outlet_Type', split(col('Outlet_Type'), ' '))
display(df_exp)

In [0]:
# explode() emits one output row per array element.
display(df_exp.withColumn('Outlet_Type', explode(col('Outlet_Type'))))

In [0]:
# List the built-in sample datasets. Uses dbutils.fs.ls (the function behind the
# %fs ls magic) so the cell is plain Python, consistent with the file-listing cell above.
display(dbutils.fs.ls('/databricks-datasets'))

### ARRAY_CONTAINS

In [0]:
# Preview the split Outlet_Type arrays before flagging rows whose array
# contains 'Type1' (array_contains yields true/false).
display(df_exp.limit(5))

In [0]:
# Boolean flag: true when the Outlet_Type array has a 'Type1' element.
display(df_exp.withColumn('Type1_flag', array_contains(col('Outlet_Type'), 'Type1')))

### GROUP_BY

In [0]:
# GroupedData.sum produces the same 'sum(Item_MRP)' column as agg(sum(...)).
display(df.groupBy("Item_Type").sum("Item_MRP"))

In [0]:
# GroupedData.avg produces the same 'avg(Item_MRP)' column as agg(avg(...)).
display(df.groupBy("Item_Type").avg("Item_MRP"))

In [0]:
# Total MRP per (Item_Type, Outlet_Size), ordered by Item_Type.
display(
    df.groupBy("Item_Type", "Outlet_Size")
    .agg(sum("Item_MRP").alias('Total_MRP'))
    .sort("Item_Type")
)

In [0]:
# Total and average MRP per (Item_Type, Outlet_Size), ordered by Item_Type.
display(
    df.groupBy("Item_Type", "Outlet_Size")
    .agg(
        sum("Item_MRP").alias('Total_MRP'),
        avg('Item_MRP').alias('Avg_MRP'),
    )
    .sort("Item_Type")
)

### COLLECT_LIST