### Data Reading in JSON

In [0]:
%python
# List the entries under /FileStore/tables/ to find the uploaded data files.
dbutils.fs.ls('/FileStore/tables/')

In [0]:
%python
# Read the drivers JSON file into a DataFrame and render it.
# The JSON source always infers the schema and has no header row, so the
# CSV-only 'inferSchema' and 'header' options are not passed here.
# multiLine=False means each line of the file is parsed as one JSON record.
df_json = (
    spark.read.format('json')
    .option('multiLine', False)
    .load('/FileStore/tables/drivers-1.json')
)
df_json.display()

### Data Reading

In [0]:
%python
# List the entries under /FileStore/tables to confirm the CSV file is present.
dbutils.fs.ls('/FileStore/tables')

In [0]:
%python
# Load the BigMart sales CSV: the first row supplies the column names and
# Spark infers each column's type from the data.
df = spark.read.csv(
    '/FileStore/tables/BigMart_Sales.csv',
    header=True,
    inferSchema=True,
)
df.display()

### SCHEMA - DDL (DEFINITION) and StructType

In [0]:
%python
# Print the column names and the types Spark inferred for the CSV.
df.printSchema()

In [0]:
%python
# Explicit schema for the BigMart sales CSV, written as a DDL string
# ("column_name TYPE" pairs separated by commas).
# NOTE(review): Item_Weight is declared STRING here while other cells treat it
# as a number — presumably intentional to show overriding a type; confirm.
my_ddl_schema = '''
                Item_Identifier STRING,
                Item_Weight STRING,
                Item_Fat_Content STRING,
                Item_Visibility DOUBLE,
                Item_Type STRING,
                Item_MRP DOUBLE,
                Outlet_Identifier STRING,
                Outlet_Establishment_Year INT,
                Outlet_Size STRING,
                Outlet_Location_Type STRING,
                Outlet_Type STRING,
                Item_Outlet_Sales DOUBLE '''

In [0]:
%python
# Alternative schema string using the "name: type" form.
# Types follow how the columns are used in this notebook: Item_Fat_Content
# holds text labels ('Regular', 'Low Fat'), while Item_Weight and
# Item_Visibility are fractional numbers, so none of them can be integers.
my_ddl_schema1 = '''
                Item_Identifier: string,
                Item_Weight: double,
                Item_Fat_Content: string,
                Item_Visibility: double,
                Item_Type: string,
                Item_MRP: double,
                Outlet_Identifier: string,
                Outlet_Establishment_Year: integer,
                Outlet_Size: string,
                Outlet_Location_Type: string,
                Outlet_Type: string,
                Item_Outlet_Sales: double '''

In [0]:
# Re-read the CSV with the explicit DDL schema instead of schema inference.
df_new = spark.read.csv(
    '/FileStore/tables/BigMart_Sales.csv',
    schema=my_ddl_schema,
    header=True,
)

In [0]:
# Render the DataFrame that was read with the explicit schema.
df_new.display()

In [0]:
# Print the schema of df_new to check it matches the DDL string.
df_new.printSchema()

root
 |-- Item_Identifier: string (nullable = true)
 |-- Item_Weight: string (nullable = true)
 |-- Item_Fat_Content: string (nullable = true)
 |-- Item_Visibility: double (nullable = true)
 |-- Item_Type: string (nullable = true)
 |-- Item_MRP: double (nullable = true)
 |-- Outlet_Identifier: string (nullable = true)
 |-- Outlet_Establishment_Year: integer (nullable = true)
 |-- Outlet_Size: string (nullable = true)
 |-- Outlet_Location_Type: string (nullable = true)
 |-- Outlet_Type: string (nullable = true)
 |-- Item_Outlet_Sales: double (nullable = true)



### Second Method - StructType schema

In [0]:
from pyspark.sql.types import*
from pyspark.sql.functions import*

In [0]:
# Schema built programmatically from StructField objects.
# NOTE(review): only Item_Identifier is declared here; the remaining CSV
# columns would need their own StructField entries — confirm intent.
my_struct_schema = StructType(
    fields=[
        StructField(name='Item_Identifier', dataType=StringType(), nullable=True),
    ]
)

In [0]:
# Project three columns; select() also accepts the plain column names
# directly instead of col() objects.
selected = ['Item_Identifier', 'Item_Weight', 'Item_Fat_Content']
df.select([col(name) for name in selected]).display()

### ALIAS

In [0]:
# Select Item_Identifier under the shorter output name Item_Id.
item_id = col('Item_Identifier').alias('Item_Id')
df.select(item_id).display()

In [0]:
# Render df again; the select/alias calls above returned new DataFrames and
# were not assigned back to df.
df.display()

### FILTER/WHERE
# 1-Filter the data 

In [0]:
# Keep only the rows whose fat content is exactly 'Regular'.
is_regular = col('Item_Fat_Content') == 'Regular'
df.where(is_regular).display()

### scenario

In [0]:
# Soft drinks that weigh less than 10: both conditions must hold.
is_soft_drink = col('Item_Type') == 'Soft Drinks'
is_light = col('Item_Weight') < 10
df.filter(is_soft_drink & is_light).display()

In [0]:
# Rows with a missing outlet size whose location type is Tier 1 or Tier 2.
size_missing = col('Outlet_Size').isNull()
in_tier_1_or_2 = col('Outlet_Location_Type').isin('Tier 1', 'Tier 2')
df.filter(size_missing & in_tier_1_or_2).display()

### withColumnRenamed

In [0]:
# Show the data with Item_Weight renamed to Item_Wt; df itself is unchanged
# because the result is not assigned back.
renamed = df.withColumnRenamed('Item_Weight', 'Item_Wt')
renamed.display()

### withColumn
### 1. new col - giving col value with lit() function

In [0]:
# Add a constant column 'flag' holding the literal 'new' on every row and
# keep the result as the new df.
flag_value = lit('new')
df = df.withColumn('flag', flag_value)

In [0]:
# Render df, which now includes the 'flag' column.
df.display()

#### 2. we are creating a new column based on transformation with withColumn() function

In [0]:
# Derive 'multiply' as Item_Weight * Item_MRP; the result is only displayed
# and is not assigned back to df.
weight_times_mrp = col('Item_Weight') * col('Item_MRP')
df.withColumn('multiply', weight_times_mrp).display()

In [0]:
# Render df; it has no 'multiply' column because the previous result was not
# assigned back.
df.display()

### Scenario 2 - replacing values in a col with new values

In [0]:
# Abbreviate the fat-content labels in place of the original column; the
# result is only displayed and df is left unchanged.
# NOTE(review): 'LP' as the replacement for 'Low Fat' looks like a typo for
# 'LF' — confirm before relying on the output.
fat_content = regexp_replace(col('Item_Fat_Content'), "Regular", "Reg")
fat_content = regexp_replace(fat_content, "Low Fat", "LP")
df.withColumn('Item_Fat_Content', fat_content).display()

### Type casting

In [0]:
# Convert Item_Weight to a string column and keep the result as the new df.
weight_as_text = col('Item_Weight').cast('string')
df = df.withColumn('Item_Weight', weight_as_text)

In [0]:
# Print the schema to confirm Item_Weight is now a string column.
df.printSchema()

### Sort - scenario 1

In [0]:
# Sort by weight, heaviest first.
# Item_Weight was cast to string earlier in this notebook, and strings sort
# lexicographically ('9.3' > '20.75'), so cast back to double for the sort
# key to get numeric ordering. The displayed column itself is left as is.
df.sort(col('Item_Weight').cast('double').desc()).display()

### Scenario 2 -

In [0]:
# Sort by visibility, smallest first.
df.sort('Item_Visibility', ascending=True).display()

### Scenario 3 - sort based on multiple columns

In [0]:
# Sort by weight, then visibility, both descending.
# Item_Weight was cast to string earlier in this notebook; cast it back to
# double for the sort key so the order is numeric rather than lexicographic.
df.sort(
    col('Item_Weight').cast('double'),
    col('Item_Visibility'),
    ascending=[False, False],
).display()

### Scenario 4 

In [0]:
# Sort by weight descending, breaking ties by visibility ascending.
# Item_Weight was cast to string earlier in this notebook; cast it back to
# double for the sort key so the order is numeric rather than lexicographic.
df.sort(
    col('Item_Weight').cast('double'),
    col('Item_Visibility'),
    ascending=[False, True],
).display()