### Data reading in json format

In [0]:
# Read the drivers JSON file into a DataFrame.
# The JSON source infers its schema on its own and 'header' is a CSV-only
# option, so the misspelt 'inferschmea' key and the 'header' option were
# ignored no-ops and have been dropped.
# multiLine=False: every line of the file is parsed as one JSON record.
df_json = spark.read.format('json')\
                    .option('multiLine', False)\
                    .load('/FileStore/tables/drivers.json')

In [0]:
df_json.display()

### Data Reading

In [0]:
dbutils.fs.ls('/FileStore/tables')

In [0]:
# Load the BigMart sales CSV: the first row is used as the header and the
# column types are inferred from the data.
df = (
    spark.read.format('csv')
    .option("inferschema", True)
    .option("header", True)
    .load('/FileStore/tables/BigMart_Sales.csv')
)

In [0]:
# df.show() also works, but its output is not formatted as nicely as display()

In [0]:
df.display()

### Schema Definition

In [0]:
df.printSchema()

### DDL Schema

In [0]:
my_ddl_schema = '''
                      Item_Identifier string,
                      Item_Weight string,
                      Item_Fat_Content string,
                      Item_Visibility double,
                      Item_Type string,
                      Item_MRP double,
                      Outlet_Identifier string,
                      Outlet_Establishment_Year integer,
                      Outlet_Size string,
                      Outlet_Location_Type string,
                      Outlet_Type string,
                      Item_Outlet_Sales double
                '''

In [0]:
# Re-read the CSV, this time applying the DDL schema string instead of
# inferring the column types.
csv_reader = spark.read.format('csv').schema(my_ddl_schema).option('header', True)
df = csv_reader.load('/FileStore/tables/BigMart_Sales.csv')

In [0]:
df.display()

In [0]:
df.printSchema()

### StructType() Schema

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [0]:
# StructType equivalent of my_ddl_schema: same columns, same order, same types.
# Previously every field was StringType, so the numeric columns were loaded as
# text (and, for example, sorted lexicographically instead of by value).
# Item_Weight stays a string here, exactly as in my_ddl_schema.
my_strct_schema = StructType([
                                StructField('Item_Identifier', StringType(), True),
                                StructField('Item_Weight', StringType(), True),
                                StructField('Item_Fat_Content', StringType(), True),
                                StructField('Item_Visibility', DoubleType(), True),
                                StructField('Item_Type', StringType(), True),
                                StructField('Item_MRP', DoubleType(), True),
                                StructField('Outlet_Identifier', StringType(), True),
                                StructField('Outlet_Establishment_Year', IntegerType(), True),
                                StructField('Outlet_Size', StringType(), True),
                                StructField('Outlet_Location_Type', StringType(), True),
                                StructField('Outlet_Type', StringType(), True),
                                StructField('Item_Outlet_Sales', DoubleType(), True)
])

In [0]:
# Re-read the CSV with the StructType schema; the first row is the header.
df = (
    spark.read.format('csv')
    .schema(my_strct_schema)
    .option('header', True)
    .load('/FileStore/tables/BigMart_Sales.csv')
)

In [0]:
df.printSchema()

### SELECT

In [0]:
# Keep the projected DataFrame in df_select and display it separately:
# display() returns None, so chaining it into the assignment left
# df_select set to None instead of a DataFrame.
# col() converts a column name into a Column object.
df_select = df.select(col('Item_Identifier'), col('Item_Weight'), col('Item_Fat_Content'))
df_select.display()

### ALIAS

In [0]:
df.select(col('Item_Identifier').alias('Item_ID')).display()

### FILTER / WHERE

#### Scenario 1 -> Filter data with fat content = Regular

In [0]:
df.filter(col('Item_Fat_Content') == 'Regular').display()

#### Scenario 2 -> slice the data with item type = soft drinks and weight < 10

In [0]:
# Soft drinks lighter than 10.
# - The column is spelt 'Item_Weight' in the schema; 'Item_weight' only resolved
#   because Spark matches column names case-insensitively by default.
# - Item_Weight is declared as a string in the schema, so cast it explicitly
#   to make the comparison numeric rather than relying on implicit coercion.
df.filter((col('Item_Type') == 'Soft Drinks') & (col('Item_Weight').cast('double') < 10)).display()

#### Scenario 3 -> Fetch data with tier in (tier 1 or tier 2) and outlet size is null

In [0]:
df.filter((col('Outlet_Size').isNull()) & (col('Outlet_Location_Type').isin('Tier 1', 'Tier 2'))).display()

### withColumnRenamed

In [0]:
df.withColumnRenamed('Item_Weight', 'Item_Wt').display()

# withColumn

### Scenario 1 -> creating col with val new

In [0]:
df.withColumn('FLAG',lit('new')).display()

### Scenario 1 -> create a col with value multiplication of 2 col ****

In [0]:
df.withColumn('multiply', col('Item_Weight')*col('Item_MRP')).display()

### Scenario 2 -> replacing value in existing col

In [0]:
df.withColumn('Item_Fat_Content',regexp_replace(col('Item_Fat_Content'), 'Regular', 'Reg'))\
   .withColumn('Item_Fat_Content',regexp_replace(col('Item_Fat_Content'), 'Low Fat', 'Lf')).display()

# Type Casting

In [0]:
# Item_Weight is loaded as a string (see my_strct_schema), so casting it to
# StringType changed nothing. Cast it to DoubleType instead so the column is
# numeric and sorting on it orders by value rather than lexicographically.
df = df.withColumn('Item_Weight', col('Item_Weight').cast(DoubleType()))
df.printSchema()


# Sort/Order by

### Scenario 1 - desc

In [0]:
df.sort(col('Item_Weight').desc()).display()

### Scenario 2 - asc

In [0]:
df.sort(col('Item_Visibility').asc()).display()

### Scenario 3 - sort multiple cols in one

In [0]:
# Sort by weight, then by visibility - both in descending order.
sort_columns = ['Item_Weight', 'Item_Visibility']
df.sort(sort_columns, ascending=[False, False]).display()

### Scenario 4 - sort multiple cols in one but different order

In [0]:
# Sort by weight descending, then by visibility ascending.
sort_columns = ['Item_Weight', 'Item_Visibility']
df.sort(sort_columns, ascending=[False, True]).display()

# Limit

In [0]:
df.limit(10).display()

# Drop 

### Scenario 1 -> drop col 

In [0]:
df.drop('Item_Visibility').display()

### Scenario 2 -> drop multiple col at once

In [0]:
df.drop('Item_Visibility','Item_Type').display()

# drop_duplicates

### Scenario 1 -> simple drop duplicates

In [0]:
df.dropDuplicates().display()

### Scenario 2 -> drop duplicates from particular col

In [0]:
df.dropDuplicates(subset = ['Item_Type']).display()

### drop duplicates using distinct

In [0]:
df.distinct().display()

### Preparing dataframe

In [0]:
data1 = [('1','kad'),
         ('2','sid')]
schema1 = 'id STRING, name STRING'

df1 = spark.createDataFrame(data1,schema1)

data2 = [('3','rahul'),
         ('4','sid')]
schema2 = 'id STRING, name STRING'

df2 = spark.createDataFrame(data2,schema2)

In [0]:
df1.printSchema()

In [0]:
df1.display()
df2.display()

### union 

In [0]:
df1.union(df2).display()

In [0]:
data1 = [('kad','1'),
         ('sid','2')]
schema1 = 'name STRING, id STRING'

df1 = spark.createDataFrame(data1,schema1)


In [0]:
# union() lines columns up by position, not by name: df1 is now (name, id)
# while df2 is still (id, name), so names and ids get mixed in the same column.
df1.union(df2).display()

### union by name 

In [0]:
df1.unionByName(df2).display()

# String functions

### INIT CAP

In [0]:
df.select(initcap('Item_Type')).display()

### Lower

In [0]:
df.select(lower('Item_Type')).display()

### Upper

In [0]:
df.select(upper('Item_Type').alias('upper_item_type')).display()

# Date Functions

### current date

In [0]:
df = df.withColumn('curr_date',current_date())
df.display()

### date add

In [0]:
df = df.withColumn('week_after',date_add('curr_date',7))
df.display()

### date sub

In [0]:
# Add a column holding the date 7 days before curr_date.
# (Removed the stray trailing semicolon after display().)
df = df.withColumn('week_before', date_sub('curr_date', 7))
df.display()

### 2nd way for date sub (date_add with a negative number of days)

In [0]:
df = df.withColumn('week_before',date_add('curr_date',-7))
df.display()

# Date Diff

In [0]:
# Number of days between week_after and curr_date.
# The column is named 'date_diff' (underscore, no space) so that it remains a
# valid column name when df is later written as Parquet / saved as a table.
df = df.withColumn('date_diff', datediff('week_after', 'curr_date'))
df.display()

# date format

In [0]:
df = df.withColumn('week_before',date_format('week_before','dd-MM-yyyy'))
df.display()

# Handling nulls

### dropping nulls

In [0]:
df.dropna('all').display() # del records which has all col null

In [0]:
df.dropna('any').display() # drop record which has any col null

In [0]:
df.dropna(subset=['Item_Type']).display() #helps to delete in a particular cols

# filling nulls

In [0]:
df.fillna('NotAvailable').display()

In [0]:
df.fillna('NotAvailable',subset=['Outlet_Size']).display()

# Split and indexing

### split

In [0]:
df.withColumn('Outlet_Type',split('Outlet_Type',' ')).display()

### Indexing

In [0]:
df.withColumn('Outlet_Type',split('Outlet_Type',' ')[1]).display()

# Explode

In [0]:
df_exp = df.withColumn('Outlet_Type',split('Outlet_Type',' '))
df_exp.display()

In [0]:
df_exp.withColumn('Outlet_Type',explode('Outlet_Type')).display()

# Array Contains

In [0]:
df_exp.withColumn('Type1_flag',array_contains('Outlet_Type','Type1')).display()

# group by

### Scenario 1

In [0]:
df.groupBy('Item_Type').agg(sum('Item_MRP')).display()

### scenario 2

In [0]:
df.groupBy('Item_Type').agg(avg('Item_MRP')).display()

### Scenario 3

In [0]:
df.groupBy('Item_Type','Outlet_Size').agg(sum('Item_MRP').alias('Total_MRP')).display()

### scenario 4

In [0]:
df.groupBy('Item_Type','Outlet_Size').agg(sum('Item_MRP'),avg('Item_MRP')).display()

# collect list

In [0]:
data = [('user1','book1'),
        ('user1','book2'),
        ('user2','book2'),
        ('user2','book4'),
        ('user3','book1')]

schema = 'user string, book string'

df_book = spark.createDataFrame(data,schema)

df_book.display()

In [0]:
df_book.groupBy('user').agg(collect_list('book')).display()

# Pivot

In [0]:
df.groupBy('Item_Type').pivot('Outlet_Size').agg(avg('Item_MRP')).display()

# When Otherwise

### scenario 1

In [0]:
df = df.withColumn('Veg_flag',when(col('Item_Type') == 'Meat', 'Non-Veg').otherwise('Veg'))
df.display()

### scenario 2

In [0]:
# Classify each row by Veg_flag and price:
#   Veg and Item_MRP <  100 -> 'Veg_Inexpensive'
#   Veg and Item_MRP >= 100 -> 'Veg_Expensive'
#   everything else         -> 'Non_Veg'
# The second branch uses >= so that a Veg item priced at exactly 100 is no
# longer labelled 'Non_Veg' by falling through both conditions.
# The column is referenced as 'Veg_flag', matching the name it was created with.
is_veg = col('Veg_flag') == 'Veg'
df.withColumn('veg_exp_flag',
              when(is_veg & (col('Item_MRP') < 100), 'Veg_Inexpensive')
              .when(is_veg & (col('Item_MRP') >= 100), 'Veg_Expensive')
              .otherwise('Non_Veg')).display()

# Joins

In [0]:
dataj1 = [('1','gaur','d01'),
          ('2','kit','d02'),
          ('3','sam','d03'),
          ('4','tim','d03'),
          ('5','aman','d05'),
          ('6','nad','d06')] 

schemaj1 = 'emp_id STRING, emp_name STRING, dept_id STRING' 

df1 = spark.createDataFrame(dataj1,schemaj1)

dataj2 = [('d01','HR'),
          ('d02','Marketing'),
          ('d03','Accounts'),
          ('d04','IT'),
          ('d05','Finance')]

schemaj2 = 'dept_id STRING, department STRING'

df2 = spark.createDataFrame(dataj2,schemaj2)

In [0]:
df1.display()
df2.display()

### Inner Join

In [0]:
df1.join(df2, df1['dept_id'] == df2['dept_id'], 'inner').display()

### Left Join

In [0]:
df1.join(df2, df1['dept_id'] == df2['dept_id'], 'left').display()

### Right Join

In [0]:
df1.join(df2, df1['dept_id'] == df2['dept_id'], 'right').display()

### Anti Join

In [0]:
df1.join(df2, df1['dept_id'] == df2['dept_id'], 'anti').display()

# Window function

### Row Number

In [0]:
df.display()

In [0]:
from pyspark.sql.window import Window

In [0]:
df.withColumn('rowCol',row_number().over(Window.orderBy('Item_Identifier'))).display()

### RANK VS DENSE RANK

In [0]:
df.withColumn('rank',rank().over(Window.orderBy(col('Item_Identifier').desc())))\
        .withColumn('denseRank',dense_rank().over(Window.orderBy(col('Item_Identifier').desc()))).display()

In [0]:
df.withColumn('sum',sum('Item_MRP').over(Window.orderBy('Item_Identifier').rowsBetween(Window.unboundedPreceding,Window.currentRow))).display()

In [0]:
### Cumulative Sum

In [0]:
# No explicit frame is given here. With an ordering defined, Spark's default
# window frame is a range frame from unboundedPreceding to currentRow, so all
# rows sharing the same Item_Type receive the same running total.
# Compare with the rowsBetween(...) version in the next cell.
df.withColumn('cum_sum',sum('Item_MRP').over(Window.orderBy('Item_Type'))).display()

In [0]:
df.withColumn('cum_sum',sum('Item_MRP').over(Window.orderBy('Item_Type').rowsBetween(Window.unboundedPreceding,Window.currentRow))).display()

In [0]:
df.withColumn('totalsum',sum('Item_MRP').over(Window.orderBy('Item_Type').rowsBetween(Window.unboundedPreceding,Window.unboundedFollowing))).display()


# User Defined Function (UDF)

### Step - 1

In [0]:
def my_func(x):
    """Return the square of x.

    Args:
        x: A number, a numeric string, or None. Columns read with a string
            schema reach a UDF as ``str``, and null cells arrive as ``None``.

    Returns:
        ``x * x`` for numbers (type preserved), the square as a float for
        numeric strings, and ``None`` when x is ``None``.

    Raises:
        ValueError: If x is a string that cannot be parsed as a number.
    """
    if x is None:
        # Propagate nulls instead of raising TypeError on None * None.
        return None
    if isinstance(x, str):
        # str * str is a TypeError; convert textual numbers first.
        x = float(x)
    return x * x

### Step 2

In [0]:
# Wrap my_func as a Spark UDF so it can be applied to DataFrame columns.
# NOTE: no returnType is passed, so udf() falls back to its default
# (StringType) and the resulting column holds the squared value as text.
my_udf = udf(my_func)

In [0]:
df.withColumn('mynewcol',my_udf('Item_MRP')).display()

# Data writing

### CSV

In [0]:
df.write.format('csv')\
  .save('/FileStore/tables/CSV/data.csv')

# Writing Modes

### Append

In [0]:
df.write.format('csv')\
  .mode('append')\
  .save('/FileStore/tables/CSV/data.csv')

In [0]:

df.write.format('csv')\
        .mode('append')\
        .option('path','/FileStore/tables/CSV/data.csv')\
        .save()

### Overwrite

In [0]:
df.write.format('csv')\
        .mode('overwrite')\
        .option('path','/FileStore/tables/CSV/data.csv')\
        .save()

### Error

In [0]:
df.write.format('csv')\
.mode('error')\
.option('path','/FileStore/tables/CSV/data.csv')\
.save()

### Ignore

In [0]:

df.write.format('csv')\
.mode('ignore')\
.option('path','/FileStore/tables/CSV/data.csv')\
.save()

### PARQUET

In [0]:
# Write df in Parquet format to its own directory.
# The previous target was '/FileStore/tables/CSV/data.csv', i.e. the CSV output
# path used by the cells above, so this overwrite replaced the CSV files with
# Parquet files under a misleading '.csv' name.
df.write.format('parquet')\
.mode('overwrite')\
.option('path','/FileStore/tables/PARQUET/data.parquet')\
.save()

# TABLE

In [0]:
df.write.format('parquet')\
.mode('overwrite')\
.saveAsTable('my_table')

In [0]:
df.display()

# Spark SQL

In [0]:
# Expose df to Spark SQL as the temporary view 'my_view'.
# createOrReplaceTempView keeps the cell re-runnable: createTempView raises
# if a view with this name already exists in the session.
df.createOrReplaceTempView('my_view')

In [0]:
%sql
select * from my_view where Item_Fat_Content = 'Low Fat'

In [0]:
df_sql = spark.sql("select * from my_view where Item_Fat_Content = 'Low Fat'")

In [0]:
df_sql.display()