### DATA READING (JSON)

In [0]:
df_json = spark.read.format('json').option('inferSchema',True).option('header',True).option('multiline',False).load('/Volumes/databrickstutorial/dataset/drivers/drivers.json')

In [0]:
df_json.display()

### DATA READING (CSV)

In [0]:
df = spark.read.format("csv").option('inferSchema',True).option('header',True).load('/Volumes/databrickstutorial/dataset/bigmartsales/BigMart Sales.csv')

In [0]:
df.display()

### SCHEMA DEFINITION

In [0]:
df.printSchema()

### DDL SCHEMA

In [0]:
my_ddl_schema = '''
  Item_Identifier string,
  Item_Weight string,
  Item_Fat_Content string,
  Item_Visibility double,
  Item_Type string,
  Item_MRP double,
  Outlet_Identifier string,
  Outlet_Establishment_Year integer,
  Outlet_Size string,
  Outlet_Location_Type string,
  Outlet_Type string,
  Item_Outlet_Sales double'''

In [0]:
df = spark.read.format("csv")\
    .schema(my_ddl_schema)\
    .option('header',True)\
    .load('/Volumes/databrickstutorial/dataset/bigmartsales/BigMart Sales.csv')

In [0]:
df.display()

In [0]:
df.printSchema()

### STRUCTTYPE SCHEMA

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [0]:
my_struct_schema = StructType([
    StructField('Item_identifier', StringType(),True),
    StructField('Item_Weight', StringType(),True),
    StructField('Item_Fat_Content', StringType(),True),
    StructField('Item_Visibility', DoubleType(),True),
    StructField('Item_Type', StringType(),True),
    StructField('Item_MRP', DoubleType(),True),
    StructField('Outlet_Identifier', StringType(),True),
    StructField('Outlet_Establishment_Year', IntegerType(),True),
    StructField('Outlet_Size', StringType(),True),
    StructField('Outlet_Location_Type', StringType(),True),
    StructField('Outlet_Type', StringType(),True),
    StructField('Item_Outlet_Sales', DoubleType(),True)
])

In [0]:
df = spark.read.format('csv')\
    .schema(my_struct_schema)\
    .option('header',True)\
    .load('/Volumes/databrickstutorial/dataset/bigmartsales/BigMart Sales.csv')

In [0]:
df.printSchema()

### SELECT

In [0]:
df_select = df.select('Item_identifier','Item_Weight','Item_Fat_Content').display()

### COLUMN

In [0]:
df.select(col('Item_identifier'),col('Item_Weight'),col('Item_Fat_Content')).display()

### ALIAS

In [0]:
df.select(col('Item_identifier').alias('Item_ID')).display()

### FILTER/WHERE

In [0]:
# Filter the data with fat content = regular
df.filter(col('Item_Fat_Content') == 'Regular').display()

In [0]:
# Filter item type soft drinks where item weight is less than 10
df.filter(
    (col('Item_Type') == 'Soft Drinks') & (col('Item_Weight')<10.0)).display()

In [0]:
# Fetch the data with tier in (tier1 or tier2) and outlet size is null
df.filter((col('Outlet_Size').isNull()) & (col('Outlet_Location_Type').isin(['Tier 1','Tier 2']))).display()

### WITH COLUMN RENAMED

In [0]:
df.withColumnRenamed('Item_Weight','Item_Wt').display()

### WITHCOLUMN

In [0]:
# Creating a New Column
df = df.withColumn('flag', lit("new"))
df.display()

In [0]:
df.withColumn('multiply',col("Item_Weight")*col("Item_MRP")).display()

In [0]:
from pyspark.sql.functions import regexp_replace
df.withColumn("Item_Fat_Content", regexp_replace(col("Item_Fat_Content"),"Regular","Reg"))\
    .withColumn("Item_Fat_Content", regexp_replace(col("Item_Fat_Content"),"Low Fat","LF")).display()


### TYPE CASTING

In [0]:
df = df.withColumn("Item_Weight",col("Item_Weight").cast(StringType()))

In [0]:
df.printSchema()

### SORT/ORDERBY

In [0]:
df.sort(col("Item_Weight").desc()).display()

In [0]:
df.sort(col("Item_Visibility").asc()).display()

In [0]:
# Sortinf based on multiple columns
df.sort(["Item_Weight","Item_Visibility"],ascending = [0,0]).display()

In [0]:
# One column asc and other column desc
df.sort(["Item_Weight","Item_Visibility"],ascending = [0,1]).display()

### LIMIT

In [0]:
df.limit(10).display()

### DROP

In [0]:
#Droping one column
df.drop("Item_Visibility").display()

In [0]:
# Droping multiple columns
df.drop("Item_Visibility","Item_Type").display()

### DROP DUPLICATES

In [0]:
df.dropDuplicates().display()

In [0]:
df.drop_duplicates(subset = ["Item_Type"]).display()

In [0]:
df.distinct().display()

### UNION AND UNIONBYNAME

In [0]:
data1 = [('1','Kad'),
         ('2','sid')]
schema1 = 'id STRING , name STRING'

df1 = spark.createDataFrame(data1,schema1)

data2= [('3','rahul'),
        ('4','jas')]
schema2 = 'id STRING , name STRING'

df2 = spark.createDataFrame(data2,schema2)

In [0]:
df1.display()

In [0]:
df2.display()

### UNION

In [0]:
df1.union(df2).display()

In [0]:
data1 = [('Kad','1'),
         ('sid','2')]
schema1 = 'name STRING , id STRING'

df1 = spark.createDataFrame(data1,schema1)
df1.display()

In [0]:
df1.union(df2).display()

### UNION BYNAME

In [0]:
df1.unionByName(df2).display()

### STRING FUNCTIONS

### INITCAP()

In [0]:
df.select(initcap("Item_Type").alias("Initcap_Item_Type")).display()

### UPPER

In [0]:
df.select(upper("Item_Type").alias("upper_Item_Type")).display()

### LOWER

In [0]:
df.select(lower("Item_Type").alias("lower_Item_Type")).display()

### DATE FUNCTIONS

### CURRENT DATE

In [0]:
df = df.withColumn("curr_date", current_date())
df.display() 

### DATE ADD FUNCTION

In [0]:
df = df.withColumn('week_after', date_add('curr_date', 7))
df.display()

### DATE SUB

In [0]:
df.withColumn('week_before', date_sub('curr_date', 7)).display()

In [0]:
df = df.withColumn('week_before',date_add('curr_date', -7))
df.display()

### DATE DIFF

In [0]:
df =df.withColumn('datediff', datediff('week_after', 'curr_date'))
df.display()

### DATE FORMAT

In [0]:
df = df.withColumn('week_before', date_format('week_before','dd-MM-yyyy'))
df.display()

### HANDLING NULLS

### DROPPING NULLS

In [0]:
df.dropna('all').display()

In [0]:
df.dropna('any').display()

In [0]:
df.dropna(subset=['Outlet_Size']).display()

### FILLING NULLS

In [0]:
df.fillna('NotAvailable').display()

In [0]:
df.fillna('NotAvailable',subset=['Outlet_Size']).display()

### SPLIT AND INDEXING

### SPLIT

In [0]:
df.withColumn('Outlet_Type',split('Outlet_Type',' ')).display()

### INDEXING

In [0]:
df.withColumn('Outlet_Type',split('Outlet_Type',' ')[1]).display()

### EXPLODE

In [0]:
df_exp = df.withColumn('Outlet_Type',split('Outlet_Type',' '))
df_exp.display()

In [0]:
df_exp.withColumn('Outlet_Type',explode('Outlet_Type')).display()

### ARRAY_CONTAINS

In [0]:
df_exp.withColumn('Type1_flag',array_contains('Outlet_Type','Type1')).display()

### GROUP BY

In [0]:
df.groupBy('Item_Type').agg(sum("Item_MRP")).display()

In [0]:
df.groupBy('Item_Type').agg(avg("Item_MRP")).display()

In [0]:
df.groupBy('Item_Type','Outlet_Size').agg(sum('Item_MRP').alias('Total_MRP')).display()

In [0]:
df.groupBy('Item_Type','Outlet_Size').agg(sum('Item_MRP'),avg('Item_MRP')).display()

### COLLECT LIST

In [0]:
data = [('user1','book1'),
        ('user1','book2'),
        ('user2','book2'),
        ('user2','book4'),
        ('user3','book1')]

schema = 'user string , book string'

df_book = spark.createDataFrame(data,schema)
df_book.display()

In [0]:
df_book.groupBy('user').agg(collect_list("book")).display()

### PIVOT

In [0]:
df.groupBy("Item_Type").pivot("Outlet_Size").agg(avg("Item_MRP")).display()

### WHEN OTHERWISE

In [0]:
df = df.withColumn('Veg_flag', 
    when(col("Item_Type") == "Meat", "Non-Veg")
    .otherwise("Veg")
)
df.display()

In [0]:
from pyspark.sql.functions import col, when

df.withColumn(
    "Veg_exp_flag",
    when(
        (col("Veg_flag") == "Veg") & (col("Item_MRP") < 100),
        "Veg_Inexpensive"
    ).when(
        (col("Veg_flag") == "Veg") & (col("Item_MRP") >= 100),
        "Veg_Expensive"
    ).otherwise("Non_Veg")
).display()


### JOINS

In [0]:
# 1. Define Data and Schema for First DataFrame
dataj1 = [('1','gaur','d01'),
          ('2','sita','d02'),
          ('3','ram','d03'),
          ('4','shyam','d03'),
          ('5','aman','d05')]
schemaj1 = 'emp_id string, name string, dept_id string'

# Corrected function name: createDataFrame
df1 = spark.createDataFrame(dataj1, schemaj1)

# 2. Define Data and Schema for Second DataFrame
dataj2 = [('d01','hr'),
          ('d02','Marketing'),
          ('d03','Accounts'),
          ('d04','IT'),
          ('d05','Finance')]
schemaj2 = 'dept_id string, dept_name string'

# Corrected function name: createDataFrame
df2 = spark.createDataFrame(dataj2, schemaj2)
        


In [0]:
df1.display()

In [0]:
df2.display()

### INNER JOIN

In [0]:
df1.join(df2,df1["dept_id"]==df2["dept_id"]).display()

### LEFT JOIN

In [0]:
df1.join(df2,df1["dept_id"]==df2["dept_id"],"left").display()

### RIGHT JOIN

In [0]:
df1.join(df2,df1["dept_id"]==df2["dept_id"],"right").display()

### ANTI JOIN

In [0]:
df1.join(df2,df1["dept_id"]==df2["dept_id"],"anti").display()

### WINDOW FUNCTIONS

### ROW NUMBER

In [0]:
df.display()

In [0]:
from pyspark.sql.window import Window

In [0]:
df.withColumn("rowCol",row_number().over(Window.orderBy('Item_Identifier'))).display()

### RANK

In [0]:
df.withColumn("rank",rank().over(Window.orderBy("Item_Identifier"))).display()

### DENSE RANK

In [0]:
df.withColumn("rank",rank().over(Window.orderBy(col("Item_Identifier").desc()))).display()

In [0]:
df.withColumn("rank",rank().over(Window.orderBy(col("Item_Identifier").desc())))\
    .withColumn("denserank",dense_rank().over(Window.orderBy(col("Item_Identifier").desc()))).display()


### CUMULATIVE SUM

In [0]:
df.withColumn('cumsum',sum("Item_MRP").over(Window.orderBy('Item_Type'))).display()

In [0]:
  df.withColumn('cumsum',sum("Item_MRP").over(Window.orderBy('Item_Type').rowsBetween(Window.unboundedPreceding,Window.currentRow))).display()

In [0]:
  df.withColumn('cumsum',sum("Item_MRP").over(Window.orderBy('Item_Type').rowsBetween(Window.unboundedPreceding,Window.unboundedFollowing))).display()

### USER DEFINED FUNCTIONS

## STEP 1: DEFINE THE PYTHON FUNCTION

In [0]:
def my_func(x):
    """Return the square of *x*, or None when *x* is None.

    This is the body of a Spark UDF, and SQL NULL reaches it as None.
    Without the guard, ``None * None`` would raise TypeError and fail the task.
    """
    if x is None:
        return None
    return x * x

## STEP 2: WRAP THE FUNCTION AS A SPARK UDF

In [0]:
my_udf = udf(my_func)

In [0]:
df.withColumn("mynewcol",my_udf("Item_MRP")).display()

### DATA WRITING

## CSV

In [0]:
# df.write.format('csv')\
#     .save('/FileStore/tables/CSV/data.csv')

## APPEND

In [0]:
# df.write.format("csv")\
#     .mode('append')\
#         .option('path','/FileStore/tables/CSV/data.csv')\
#             .save()

## OVERWRITE

In [0]:
# df.write.format('csv')\
#     .mode('overwrite')\
#         .option('path','/FileStore/tables/CSV/data.csv')\
#             .save()

## IGNORE

In [0]:
# df.write.format('csv')\
#     .mode('ignore')\
#         .option('path','/FileStore/tables/CSV/data.csv')\
#             .save()

### PARQUET

In [0]:
# df.write.format('parquet')\
#     .mode('overwrite')\
#         .option('path','/FileStore/tables/parquet/data.parquet')\
#             .save()

## TABLE

In [0]:
# df.write.format('parquet')\
#     .mode('overwrite')\
#         .saveAsTable('my_table')

### SPARK SQL

## createTempView

In [0]:
# createTempView raises if 'my_view1' already exists (e.g. when the cell is
# re-run). createOrReplaceTempView re-registers the view over the current df.
df.createOrReplaceTempView('my_view1')

In [0]:
%sql
-- Rows whose fat-content code is 'lf' in any letter case; LOWER() folds 'LF', 'Lf', etc.
SELECT *
FROM my_view1
WHERE LOWER(Item_Fat_Content) = 'lf';

In [0]:

df_sql = spark.sql("select * from my_view1 where Item_Fat_Content = 'Lf'")

In [0]:
df_sql.display()