### 1. DATA READING

#### Data Reading for CSV file

In [0]:
dbutils.fs.ls('/FileStore/tables/')

In [0]:
# Load the BigMart sales CSV, treating the first row as the header and
# letting Spark infer the column types.
df_csv = (
    spark.read.format('csv')
    .option('inferschema', True)
    .option('header', True)
    .load('/FileStore/tables/BigMart_Sales.csv')
)

In [0]:
df_csv.display()

#### Data Reading for Json file

In [0]:
dbutils.fs.ls('/FileStore/tables')

In [0]:
# Load the drivers JSON file. The JSON source always infers its schema and has
# no header row, so the CSV-only 'inferschema'/'header' options are not set.
# 'multiline' False means each line of the file is parsed as one JSON record.
df_json = (
    spark.read.format('json')
    .option('multiline', False)
    .load('/FileStore/tables/drivers.json')
)

In [0]:
df_json.display()

### 2. SCHEMA DEFINITION

In [0]:
df_csv.printSchema()

#### DDL Schema

In [0]:
# DDL-style schema string: comma-separated `column_name type` pairs, passed to
# DataFrameReader.schema() instead of relying on schema inference.
# NOTE(review): Item_Weight is declared as string here — presumably on purpose,
# to show an explicit schema overriding the inferred type; confirm.
my_ddl_schema = '''
                    Item_Identifier string,
                    Item_Weight string,
                    Item_Fat_Content string,
                    Item_Visibility double,
                    Item_Type string,
                    Item_MRP double,
                    Outlet_Identifier string,
                    Outlet_Establishment_Year integer,
                    Outlet_Size string,
                    Outlet_Location_Type string,
                    Outlet_Type string,
                    Item_Outlet_Sales double
                ''' 

In [0]:
# Read the same CSV, this time applying the DDL schema string instead of
# inferring column types.
df_ddl = (
    spark.read.format('csv')
    .schema(my_ddl_schema)
    .option('header', True)
    .load('/FileStore/tables/BigMart_Sales.csv')
)

In [0]:
df_ddl.printSchema()

#### StructType() Schema

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [0]:
# Programmatic schema: every CSV column is declared as a nullable string,
# in the same order as the columns appear in the file.
_string_columns = [
    'Item_Identifier',
    'Item_Weight',
    'Item_Fat_Content',
    'Item_Visibility',
    'Item_Type',
    'Item_MRP',
    'Outlet_Identifier',
    'Outlet_Establishment_Year',
    'Outlet_Size',
    'Outlet_Location_Type',
    'Outlet_Type',
    'Item_Outlet_Sales',
]
my_struct_schema = StructType(
    [StructField(column_name, StringType(), True) for column_name in _string_columns]
)

In [0]:
# Read the CSV with the StructType schema defined above the reader call.
df_struct = (
    spark.read.format('csv')
    .schema(my_struct_schema)
    .option('header', True)
    .load('/FileStore/tables/BigMart_Sales.csv')
)

In [0]:
df_struct.printSchema()

### 3. SELECT

In [0]:
df_csv.select(col('Item_Identifier'), col('Item_Weight'), col('Item_Fat_Content')).display()

#### Alias

In [0]:
df_csv.select(col('Item_Identifier').alias('Item_Id')).display()

### 4. FILTER/WHERE

#### 1. Scenario

In [0]:
df_csv.filter(col('Item_Fat_Content') == 'Regular').display()

#### 2. Scenario

In [0]:
# Keep rows that are soft drinks AND weigh less than 10.
is_soft_drink = col('Item_Type') == 'Soft Drinks'
is_light = col('Item_Weight') < 10
df_csv.filter(is_soft_drink & is_light).display()

#### 3. Scenario

In [0]:
# Rows from Tier 1 / Tier 2 outlets whose outlet size is missing.
# The column is spelled 'Outlet_Size' exactly as in the data, so the lookup
# does not depend on Spark's case-insensitive column resolution.
df_csv.filter(
    (col('Outlet_Location_Type').isin('Tier 1', 'Tier 2'))
    & (col('Outlet_Size').isNull())
).display()

### 5. WITHCOLUMN

#### 1. withColumnRenamed

In [0]:
df_csv.withColumnRenamed('Item_Weight', 'Item_Wt').display()

#### 2. withColumn

##### 1. Scenario - Create a new column with same value

In [0]:
df = df_csv.withColumn('flag', lit('new'))

In [0]:
df.display()

##### 2. Scenario - Create a new column from an arithmetic expression on existing columns

In [0]:
# Add a 'multiply' column holding Item_Weight * Item_MRP for each row.
# 'Item_Weight' is spelled exactly as in the data so the reference does not
# rely on case-insensitive column resolution.
df_csv.withColumn('multiply', col('Item_Weight') * col('Item_MRP')).display()

##### 3. Scenario - When you need to replace the content in a column

In [0]:
# Abbreviate the fat-content labels in two passes: 'Regular' -> 'Reg',
# then 'Low Fat' -> 'Lf'.
(
    df_csv
    .withColumn('Item_Fat_Content', regexp_replace(col('Item_Fat_Content'), 'Regular', 'Reg'))
    .withColumn('Item_Fat_Content', regexp_replace(col('Item_Fat_Content'), 'Low Fat', 'Lf'))
    .display()
)

#### 3. Type Casting

In [0]:
df = df_csv.withColumn('Item_Weight', col('Item_Weight').cast(StringType()))

In [0]:
df.printSchema()

### 6. SORT/ORDER BY

#### 1. Scenario

In [0]:
df_csv.sort(col('Item_Weight').desc()).display()

#### 2. Scenario

In [0]:
df_csv.sort(col('Item_Visibility').asc()).display()

#### 3. Scenario

In [0]:
# Sort by weight, then by visibility, both in descending order.
df_csv.sort(['Item_Weight', 'Item_Visibility'], ascending=[False, False]).display()

#### 4. Scenario - Multiple Column Sorting

In [0]:
# Sort by weight descending, breaking ties by visibility ascending.
df_csv.sort(['Item_Weight', 'Item_Visibility'], ascending=[False, True]).display()

### 7. LIMIT

In [0]:
df_csv.limit(10).display()

### 8. DROP

#### 1. Scenario

In [0]:
df_csv.drop(col('Item_Visibility')).display()

#### 2. Scenario - Multiple Column Drop

In [0]:
df_csv.drop('Item_Weight', 'Item_Visibility').display()

#### 3. Drop Duplicate

In [0]:
df_csv.dropDuplicates().display()

#### 4. Drop Duplicate from column using subset

In [0]:
df_csv.drop_duplicates(subset = ['Item_Type']).display()

#### 5. Distinct

In [0]:
df_csv.distinct().display()

### 9. UNION

#### Preparing Dataframe

In [0]:
# Two small frames sharing the (id, name) column order, used to demonstrate union().
schema1 = 'id STRING, name STRING'
data1 = (['1', 'Ram'], ['2', 'Sam'])
df1 = spark.createDataFrame(data1, schema1)

schema2 = 'id STRING, name STRING'
data2 = (['3', 'Rom'], ['4', 'Jas'])
df2 = spark.createDataFrame(data2, schema2)

In [0]:
df1.display()

In [0]:
df2.display()

#### 1. Union

In [0]:
df1.union(df2).display()

#### 2. unionByName

In [0]:
# Rebuild df1 with its columns in the opposite order (name, id).
schema1 = 'name STRING, id STRING'
data1 = (['Ram', '1'], ['Sam', '2'])
df1 = spark.createDataFrame(data1, schema1)

df1.display()

In [0]:
df1.unionByName(df2).display()

### 10. STRING FUNCTIONS

#### 1. Scenario - initcap()

In [0]:
df_csv.select(initcap('Item_Type')).display()

#### 2. Scenario - lower()

In [0]:
df_csv.select(lower('Item_Type').alias('item_type')).display()

#### 3. Scenario - upper()

In [0]:
df_csv.select(upper('Item_Type').alias('ITEM_TYPE')).display()

### 11. DATE FUNCTIONS

#### 1. Scenario - current date()

In [0]:
df_csv = df_csv.withColumn('cur_date', current_date())

df_csv.display()

#### 2. Scenario - date_add()

In [0]:
df_csv = df_csv.withColumn('week_after', date_add('cur_date', 7))

df_csv.display()

#### 3. Scenario - date_sub()

In [0]:
df_csv = df_csv.withColumn('week_before', date_sub('cur_date', 7))

df_csv.display()

In [0]:
df_csv = df_csv.withColumn('week_before', date_add('cur_date', -7))

df_csv.display()

#### 4. Scenario - datediff()

In [0]:
df_csv = df_csv.withColumn('date_dif', datediff('week_after', 'cur_date'))

df_csv.display()

#### 5. Scenario - date_format

In [0]:
# Render the three date columns as 'dd-MM-yyyy' strings; df_csv itself is not reassigned.
df_formatted = df_csv
for date_column in ('week_after', 'week_before', 'cur_date'):
    df_formatted = df_formatted.withColumn(
        date_column, date_format(date_column, 'dd-MM-yyyy')
    )
df_formatted.display()

### 12. HANDLING NULLS

#### 1. Scenario - dropna('all')

In [0]:
df_csv.dropna('all').display()

#### 2. Scenario - dropna('any')

In [0]:
df_csv.dropna('any').display()

#### 3. Scenario - dropna by using subset

In [0]:
# Drop only the rows whose Outlet_Size is null. The column name matches the
# data's exact casing and the subset is passed as a list of column names.
df_csv.dropna(subset=['Outlet_Size']).display()

#### 4. Scenario - fillna() - integer value

In [0]:
df_csv.fillna(0, subset = 'Item_Weight').display()

#### 5. Scenario - fillna() - string value

In [0]:
# Replace nulls with a readable placeholder; a string fill value is applied
# to string columns only.
df_csv.fillna('Not Applicable').display()

### 13. SPLIT AND INDEXING

#### 1. Split

In [0]:
df_csv.withColumn('Outlet_Type', split('Outlet_Type', ' ')).display()

#### 2. Indexing

In [0]:
df_csv.withColumn('Outlet_Type', split('Outlet_Type', ' ')[1]).display()

#### 3. Explode

In [0]:
df_exp = df_csv.withColumn('Outlet_Type', split('Outlet_Type', ' '))

df_exp.display()

In [0]:
df_exp.withColumn('Outlet_Type', explode('Outlet_Type')).display()

#### 4. Array Contains

In [0]:
df_exp.withColumn('flag', array_contains('Outlet_Type', 'Type1')).display()

### 14. GROUP BY

#### 1. Scenario - Group by and sum

In [0]:
df_csv.groupBy('Item_Type').agg(sum('Item_MRP')).display()

#### 2. Scenario - Average

In [0]:
df_csv.groupBy('Item_Type').agg(avg('Item_MRP')).display()

#### 3. Scenario

In [0]:
# Total MRP per (Item_Type, Outlet_Size), sorted by both keys descending.
(
    df_csv.groupBy('Item_Type', 'Outlet_Size')
    .agg(sum('Item_MRP').alias('Total'))
    .sort(['Item_Type', 'Outlet_Size'], ascending=[0, 0])
    .display()
)

#### 4. Scenario

In [0]:
# Total and average MRP per (Item_Type, Outlet_Size) in a single aggregation.
(
    df_csv.groupBy('Item_Type', 'Outlet_Size')
    .agg(
        sum('Item_MRP').alias('Total'),
        avg('Item_MRP').alias('Average'),
    )
    .display()
)

#### 5. Collect List

In [0]:
# Small (user, book) frame with repeated users, used to demonstrate collect_list().
schema = 'user STRING, book STRING'
data = (
    ['user1', 'book1'],
    ['user1', 'book2'],
    ['user2', 'book2'],
    ['user2', 'book4'],
    ['user3', 'book3'],
    ['user4', 'book4'],
)

df_book = spark.createDataFrame(data, schema)

df_book.display()

In [0]:
df_book.groupBy('user').agg(collect_list('book')).display()

### 15. PIVOT

In [0]:
df_csv.groupBy('Item_Type').pivot('Outlet_Size').agg(avg('Item_MRP')).display()

### 16. WHEN-OTHERWISE

#### 1. Scenario

In [0]:
df_csv = df_csv.withColumn('Grocery_Type',when(col('Item_Type')=='Meat', 'Non-Veg').otherwise('Veg'))

df_csv.display()

#### 2. Scenario

In [0]:
# Price band for vegetarian items: below 100 is 'In-Expensive', 100 and above
# is 'Expensive'. Using >= closes the gap at exactly 100, which previously fell
# through to the default. Rows that are not 'Veg' (and Veg rows with a null
# Item_MRP) get 'Non-Veg' from otherwise().
df_csv.withColumn(
    'Cost',
    when((col('Grocery_Type') == 'Veg') & (col('Item_MRP') < 100), 'In-Expensive')
    .when((col('Grocery_Type') == 'Veg') & (col('Item_MRP') >= 100), 'Expensive')
    .otherwise('Non-Veg'),
).display()

### 17. JOINS

In [0]:
# Employee frame: dept_id 'd06' has no row in the department frame below.
schemaj1 = 'id STRING, name STRING, dept_id STRING'
dataj1 = (
    ['1', 'Gaur', 'd01'],
    ['2', 'Kit', 'd02'],
    ['3', 'Sam', 'd03'],
    ['4', 'Tim', 'd03'],
    ['5', 'Aman', 'd05'],
    ['6', 'Nad', 'd06'],
)

dfj1 = spark.createDataFrame(dataj1, schemaj1)

# Department frame: dept_id 'd04' is not referenced by any employee above.
schemaj2 = 'dept_id STRING, department STRING'
dataj2 = (
    ['d01', 'HR'],
    ['d02', 'Marketing'],
    ['d03', 'Accounts'],
    ['d04', 'IT'],
    ['d05', 'Finance'],
)

dfj2 = spark.createDataFrame(dataj2, schemaj2)

In [0]:
dfj1.display()

In [0]:
dfj2.display()

#### 1. Inner Join

In [0]:
dfj1.join(dfj2, dfj1['dept_id'] == dfj2['dept_id'],'inner').display()

#### 2. Left Join

In [0]:
dfj1.join(dfj2, dfj1['dept_id']==dfj2['dept_id'], 'left').display()

#### 3. Right Join

In [0]:
dfj2.join(dfj1, dfj2['dept_id']==dfj1['dept_id'], 'right').display()

#### 4. Anti Join - Return only the left-side rows that have no match on the right side

In [0]:
dfj1.join(dfj2, dfj1['dept_id']==dfj2['dept_id'], 'anti').display()

In [0]:
from pyspark.sql.window import Window

### 18. WINDOW FUNCTIONS

#### 1. Row Number()

In [0]:
df_csv.withColumn('row_col', row_number().over(Window.orderBy('Item_Identifier'))).display()

#### 2. Rank()

In [0]:
df_csv.withColumn('rank', rank().over(Window.orderBy('Item_Identifier'))).display()

#### 3. Dense Rank()

In [0]:
df_csv.withColumn('dense_rank', dense_rank().over(Window.orderBy(col('Item_Identifier').desc()))).display()

#### 4. Cumulative SUM

##### 1. Scenario

In [0]:
df_csv.withColumn('Sum', sum('Item_MRP').over(Window.orderBy('Item_Type'))).display()

##### 2. Scenario

In [0]:
# Running total of Item_MRP: the frame spans from the first row up to the
# current row, in Item_Type order.
running_window = (
    Window.orderBy('Item_Type')
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)
df_csv.withColumn('cum_sum', sum('Item_MRP').over(running_window)).display()

##### 3. Scenario

In [0]:
# Grand total of Item_MRP on every row: the frame spans the whole ordered window.
full_window = (
    Window.orderBy('Item_Type')
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
)
df_csv.withColumn('Total_sum', sum('Item_MRP').over(full_window)).display()

### 19. USER DEFINED FUNCTIONS (UDF)

#### Step 1

In [0]:
def my_fun(x):
    """Return the square of *x*, or None when *x* is None.

    Spark hands null cells to a Python UDF as None; multiplying None would
    raise TypeError and fail the job, so nulls are passed through unchanged.
    """
    if x is None:
        return None
    return x * x

#### Step 2

In [0]:
# Register my_fun as a Spark UDF with an explicit return type. Without one,
# udf() defaults to StringType and the squared value comes back as a string.
# NOTE(review): DoubleType assumes the input column (Item_MRP) is a double — confirm.
my_udf = udf(my_fun, DoubleType())

#### Step 3

In [0]:
df_csv.withColumn('UDF_Col', my_udf('Item_MRP')).display()

### 20. DATA WRITING

#### 1. CSV

In [0]:
# Write df_csv as CSV, replacing whatever already exists at the target path.
(
    df_csv.write.format('csv')
    .mode('overwrite')
    .save('/FileStore/tables/CSV/data.csv')
)

#### 2. Parquet - Column based

In [0]:
# Write df_csv as Parquet (columnar format) into its own directory, so the
# overwrite does not replace the CSV output under /FileStore/tables/CSV/ and
# later CSV appends do not mix file formats in one location.
(
    df_csv.write.format('parquet')
    .mode('overwrite')
    .save('/FileStore/tables/Parquet/data.parquet')
)

#### MODES

#### A. Append

In [0]:
# Append mode: add df_csv's rows to the data already stored at the path.
(
    df_csv.write.format('csv')
    .mode('append')
    .save('/FileStore/tables/CSV/data.csv')
)

In [0]:
# Same append, with the destination supplied through the 'path' option
# instead of as an argument to save().
(
    df_csv.write.format('csv')
    .mode('append')
    .option('path', '/FileStore/tables/CSV/data.csv')
    .save()
)

#### B. Overwrite

In [0]:
# Overwrite mode: replace any existing data at the path with df_csv.
(
    df_csv.write.format('csv')
    .mode('overwrite')
    .save('/FileStore/tables/CSV/data.csv')
)

#### C. Error

In [0]:
# Error mode: the write is expected to fail when data already exists at the
# path (the earlier cells write to this same location).
(
    df_csv.write.format('csv')
    .mode('error')
    .save('/FileStore/tables/CSV/data.csv')
)

#### D. Ignore

In [0]:
# Ignore mode: the write is silently skipped when data already exists at the path.
(
    df_csv.write.format('csv')
    .mode('ignore')
    .save('/FileStore/tables/CSV/data.csv')
)

#### 3. Table

In [0]:
# Persist df_csv as the table 'my_table' (CSV-backed), replacing it if present.
(
    df_csv.write.format('csv')
    .mode('overwrite')
    .saveAsTable('my_table')
)

### 21. SPARK SQL - Temp View

#### CreateTempView

In [0]:
# Register df_csv as the temp view 'my_view' for the SQL cells that follow.
# createOrReplaceTempView keeps the cell re-runnable; createTempView raises
# once the view already exists in the session.
df_csv.createOrReplaceTempView('my_view')

In [0]:
%sql

select * from my_view

In [0]:
%sql
select * 
from my_view
where Item_Fat_Content = 'Low Fat'

In [0]:
df_sql = spark.sql("select * from my_view where Item_Fat_Content = 'Low Fat'")

In [0]:
df_sql.display()

In [0]:
dbutils.fs.ls('/FileStore/tables/')

In [0]:
dbutils.fs.rm('/FileStore/tables/BigMart_Sales-4.csv', True)