### Data Reading JSON

In [0]:
# Read the drivers file as JSON Lines (one JSON object per line).
# 'inferSchema' and 'header' were removed: they are CSV reader options, and the
# JSON reader ignores them (it infers the schema from the data unless one is given).
# NOTE(review): the directory is spelled 'stramingjson' — confirm it matches the volume.
df_json = (
    spark.read.format('json')
    .option('multiLine', False)  # False is the default: every line is one complete record
    .load('/Volumes/workspace/data/stramingjson/drivers.json')
)

### Data Reading

In [0]:
df = spark.read.format('csv').option('inferSchema', 'True').option('header', 'True').load('/Volumes/workspace/data/straming/BigMart Sales.csv')

In [0]:
 df.show()

In [0]:
df.display()

In [0]:
df.printSchema()

### DDL SCHEMA

In [0]:
my_ddl_schema = '''
                    Item_Identifier STRING,
                    It
                    em_Weight STRING,
                    Item_Fat_Content STRING,
                    Item_Visibility DOUBLE,
                    Item_Type ST
                    RING, 
                    Item_MRP DOUBLE,
                    Outlet_Identifier STRING,
                    Outlet_Establishment_Year INT,
                    Outlet_Size STRING,
                    Outlet_Location_Type STRING,
                    Outlet_Type STRING,
                    Item_Outlet_Sales DOUBLE
'''

In [0]:
df = spark.read.format('csv').schema(my_ddl_schema).option('header', 'True').load('/Volumes/workspace/data/straming/BigMart Sales.csv')

In [0]:
df.display()

In [0]:
df.printSchema()

### SELECT

In [0]:
df.display()

In [0]:
df.select('Item_Identifier','Item_Weight','Item_Fat_Content','Item_Visibility').display()

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.window import Window


In [0]:
df.select(col('Item_Identifier'),col('Item_Weight')).display()

In [0]:
df.select(col('Item_Identifier').alias('Item_id')).display()

In [0]:
df.display()

####Scenario 1

In [0]:
df.filter(col('Item_Fat_Content') == 'Regular').display()

####Scenario 2

In [0]:
df.filter((col('Item_Type')=='Soft Drinks') & (col('Item_Weight')<10)).display()

####Scenario 3

In [0]:
df.filter((col('Outlet_Size').isNull()) & (col('Outlet_Location_Type').isin('Tier 1','Tier 2'))).display()

###WithColumnRenamed

In [0]:
# Show a renamed copy; df itself still has the column 'Item_Weight'.
display(df.withColumnRenamed('Item_Weight', 'Item_wt'))

###withColumn

####Scenario1

In [0]:
df = df.withColumn('flag',lit('new'))
df.display()

In [0]:
df.withColumn('multiply',col("Item_Weight")*col("Item_MRP")).display()

####Scenario 2

In [0]:
df.withColumn('Item_Fat_Content',regexp_replace(col('Item_Fat_Content'),'Regular','Reg'))\
    .withColumn('Item_Fat_Content',regexp_replace(col('Item_Fat_Content'),'Low Fat','LF')).display()

###TypeCasting

In [0]:
df=df.withColumn("Item_Weight",col('Item_weight').cast(StringType()))

In [0]:
df.printSchema()

###sort

####scenario 1

In [0]:
df.sort(col('Item_Weight').desc()).display()

####Scenario2

In [0]:
df.sort(col('Item_Visibility').asc()).display()

####Scenario 3

In [0]:
df.sort(['Item_Weight','Item_Visibility'],ascending=[0,0]).display()

####scenario 4


In [0]:
df.sort(['Item_Weight','Item_Visibility'],descending=[0,1]).display()

###Limit

In [0]:
# First 10 rows; with no ordering, which 10 rows appear is not guaranteed.
display(df.limit(10))

###Drop

####scenario1

In [0]:
df.drop('Item_Visibility').display()

####Scenario 2

In [0]:
df.drop('Item_Visibility','Item_Type').display()

###Drop Duplicates

In [0]:
df.dropDuplicates().display()


####Scenario 2

In [0]:
df.drop_duplicates(subset=["Item_Type"]).display()

In [0]:
df.distinct().display()

###UNION and UNION BY NAME

In [0]:
data1 = [('1','kad'),('2','sid')]
schema1 = 'id STRING, name STRING'
df1 = spark.createDataFrame(data1, schema1)

data2 = [('3','rahul'),('4','jas')]
schema2 = 'id STRING, name STRING'
df2 = spark.createDataFrame(data2, schema2)

 

In [0]:
df1.display()

In [0]:
df2.display()

###Union

In [0]:
df1.union(df2).display()


In [0]:
data1 = [('kad','1'),('sai','2')]
schema1 = 'name STRING, id STRING'
df1 = spark.createDataFrame(data1, schema1)

In [0]:
df1.union(df2).display()

###union by name

In [0]:
# Columns are matched by name, so the differing column order of df1 and df2 is reconciled.
display(df1.unionByName(df2))

###String Function

####Initcap()


In [0]:
df.select(initcap('Item_Type')).display()

In [0]:
df.select(lower('Item_Type')).display()

In [0]:
df.display()

In [0]:
df.select(upper('Item_Type')).display()

###Date Functions

####current_date

In [0]:
df=df.withColumn('current_date',current_date())
df.display()

####date_add()

In [0]:
df=df.withColumn('week_after',date_add('current_date',7))
df.display()


###Date_sub()

In [0]:
df.withColumn('week_before',date_sub('current_date',7)).display()


In [0]:
df=df.withColumn('week_before',date_add('current_date',-7))
df.display()

###Date_Diff()

In [0]:
# datediff(end, start) counts the days from start to end. Here end is current_date
# and start is week_after (current_date + 7), so each row gets -7.
df = df.withColumn('datediff', datediff(col('current_date'), col('week_after')))
display(df)

###Date_format()

In [0]:
# Replace week_before with its dd-MM-yyyy text form (the column becomes a string).
df = df.withColumn('week_before', date_format(col('week_before'), 'dd-MM-yyyy'))
display(df)

###Handling NULLS

####Dropping Nulls

In [0]:
df.dropna('all').display()

In [0]:
df.dropna('any').display()

In [0]:
df.dropna(subset=['Outlet_Size']).display()

####Filling NULLS

In [0]:
df.fillna('Not Available').display()

In [0]:
df.fillna('NA',subset=['Outlet_Size']).display()

In [0]:
df.display()

###Split & Indexing


In [0]:
# Turn Outlet_Type into an array of the words separated by spaces.
display(df.withColumn('Outlet_Type', split(col('Outlet_Type'), ' ')))


###Indexing

In [0]:
# Keep only the second word (zero-based index 1) of Outlet_Type.
display(df.withColumn('Outlet_Type', split(col('Outlet_Type'), ' ').getItem(1)))

###Explode

In [0]:
df_exp = df.withColumn('Outlet_Type',split('Outlet_Type',' '))
df_exp.display()


In [0]:
df_exp.withColumn('Outlet_Type',explode('Outlet_type')).display()

###Array_Contains

In [0]:
df_exp.display()

In [0]:
df_exp.withColumn('Type1_flag',array_contains('Outlet_Type','Type1')).display()

###GroupBy

####scenario 1

In [0]:
df.display()

In [0]:
df.groupBy('Item_Type').agg(sum('Item_MRP')).display()

####Scenario2

In [0]:
df.groupBy('Item_Type').agg(avg('Item_MRP')).display()

####Scenario 3

In [0]:
df.groupBy('Item_Type','Outlet_Size').agg(sum('Item_MRP').alias('Toatal_mrp')).display()

####scenario 4

In [0]:
df.groupBy('Item_Type','Outlet_Size').agg(sum('Item_MRP'),avg('Item_MRP')).display()

####Scenario 5 (COLLECT_LIST)

In [0]:
data = [('user1','book1'), ('user1','book2'), ('user2','book2'), ('user3','book4'),('user3','book1')]
schema = 'user string, book string'
df_book = spark.createDataFrame(data, schema)
df_book.display()

In [0]:
df_book.groupBy('user').agg(collect_list('book')).display()

###Pivot


In [0]:
df.select('Item_Type','Outlet_Size','Item_MRP').display()

In [0]:
df.groupBy('Item_Type').pivot('Outlet_Size').agg(avg('Item_MRP')).display()

In [0]:
df.select('Item_Type','Item_MRP','Outlet_Establishment_Year','Outlet_Size','Outlet_Location_Type').display()

###When-Otherwise

####Scenario 1

In [0]:
df = df.withColumn('veg_flag',when(col('Item_Type')=='Meat','Non-veg').otherwise('veg'))

####Scenario 2

In [0]:
df.withColumn('veg_exp_flag',when((col('veg_flag')=='veg') & (col('Item_MRP')<100),'veg_Inexpensive')\
  .when((col('veg_flag')=='veg') & (col('Item_MRP')>100),'veg_expensive')\
    .otherwise('non-veg')).display()


###JOINS




In [0]:
dataj1 = [('1','gaur','d01'),
          ('2','kit','d02'),
          ('3','sam','d03'),
          ('4','tim','d03'),
          ('5','aman','d05'),
          ('6','nad','d06')] 

schemaj1 = 'emp_id STRING, emp_name STRING, dept_id STRING' 

df1 = spark.createDataFrame(dataj1,schemaj1)

dataj2 = [('d01','HR'),
          ('d02','Marketing'),
          ('d03','Accounts'),
          ('d04','IT'),
          ('d05','Finance')]

schemaj2 = 'dept_id STRING, department STRING'

df2 = spark.createDataFrame(dataj2,schemaj2)

In [0]:
df1.display()

In [0]:
df2.display()

####Inner join

In [0]:
df1.join(df2,df1['dept_id']==df2['dept_id'],'inner').display()

####left join

In [0]:
df1.join(df2,df1['dept_id']==df2['dept_id'],'left').display()

####Right join

In [0]:
df1.join(df2,df1['dept_id']==df2['dept_id'],'right').display()

In [0]:
df1.join(df2,df1['dept_id']==df2['dept_id'],'outer').display()

####Anti join

In [0]:
df1.join(df2,df1['dept_id']==df2['dept_id'],'anti').display()

###Window Functions

####ROW_NUMBER()

In [0]:
df.withColumn('rowCol',row_number().over(Window.orderBy('Item_Identifier'))).display()

####RANK() and DENSE_RANK()

In [0]:
df.withColumn('rank',rank().over(Window.orderBy('Item_Identifier'))).display()

In [0]:
df.withColumn('rank',rank().over(Window.orderBy(col('Item_Identifier').desc())))\
    .withColumn('dense_rank',dense_rank().over(Window.orderBy(col('Item_Identifier').desc())))


####CUMULATIVE_SUM

In [0]:
df.withColumn('cum_sum',sum('Item_MRP').over(Window.orderBy('Item_Type').rowsBetween(Window.unboundedPreceding,Window.currentRow))).display()

In [0]:
df.withColumn('total_sum',sum('Item_MRP').over(Window.orderBy('Item_Type').rowsBetween(Window.unboundedPreceding,Window.unboundedFollowing))).display()

###User Defined Functions (UDF)

####STEP-1

In [0]:
def my_func(x):
    """Return ``x * x``, passing ``None`` through unchanged.

    This is the body of a Python UDF. Spark passes SQL NULL values in as ``None``,
    and ``None * None`` would raise TypeError and fail the task. Returning
    ``None`` keeps the result NULL instead.
    """
    if x is None:
        return None
    return x * x

####Step2

In [0]:
my_udf = udf(my_func)

In [0]:
df.withColumn('myNewCol',my_udf('Item_MRP')).display()

[0;31m---------------------------------------------------------------------------[0m
[0;31mParseException[0m                            Traceback (most recent call last)
File [0;32m<command-7939590620976869>, line 1[0m
[0;32m----> 1[0m df[38;5;241m.[39mwithColumn([38;5;124m'[39m[38;5;124mmyNewCol[39m[38;5;124m'[39m,my_udf([38;5;124m'[39m[38;5;124mItem_MRP[39m[38;5;124m'[39m))[38;5;241m.[39mdisplay()

File [0;32m/databricks/python_shell/lib/dbruntime/monkey_patches.py:72[0m, in [0;36mapply_dataframe_display_patch.<locals>.df_display[0;34m(df, *args, **kwargs)[0m
[1;32m     68[0m [38;5;28;01mdef[39;00m [38;5;21mdf_display[39m(df, [38;5;241m*[39margs, [38;5;241m*[39m[38;5;241m*[39mkwargs):
[1;32m     69[0m [38;5;250m    [39m[38;5;124;03m"""[39;00m
[1;32m     70[0m [38;5;124;03m    df.display() is an alias for display(df). Run help(display) for more information.[39;00m
[1;32m     71[0m [38;5;124;03m    """[39;00m
[0;32m---> 72[0m    

###DATA_WRITING()

####csv

In [0]:
df.write.format('csv')\
  .save('/Volumes/workspace/data/straming/data1.csv')

####Append

In [0]:
df.write.format('csv')\
  .mode('append')\
    .save('/Volumes/workspace/data/straming/data1.csv')

In [0]:
df.write.format('csv')\
  .mode('append')\
    .option('path','/Volumes/workspace/data/straming/data1.csv')\
        .save()

####Overwrite

In [0]:
df.write.format('csv')\
  .mode('overwrite')\
    .option('path','/Volumes/workspace/data/straming/data1.csv')\
        .save()

####ERROR

In [0]:
df.write.format('csv')\
  .mode('error')\
    .option('path','/Volumes/workspace/data/straming/data1.csv')\
        .save()

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-6334381942296986>, line 4[0m
[1;32m      1[0m df[38;5;241m.[39mwrite[38;5;241m.[39mformat([38;5;124m'[39m[38;5;124mcsv[39m[38;5;124m'[39m)\
[1;32m      2[0m   [38;5;241m.[39mmode([38;5;124m'[39m[38;5;124merror[39m[38;5;124m'[39m)\
[1;32m      3[0m     [38;5;241m.[39moption([38;5;124m'[39m[38;5;124mpath[39m[38;5;124m'[39m,[38;5;124m'[39m[38;5;124m/Volumes/workspace/data/straming/data1.csv[39m[38;5;124m'[39m)\
[0;32m----> 4[0m         [38;5;241m.[39msave()

File [0;32m/databricks/python/lib/python3.12/site-packages/pyspark/sql/connect/readwriter.py:703[0m, in [0;36mDataFrameWriter.save[0;34m(self, path, format, mode, partitionBy, **options)[0m
[1;32m    701[0m     [38;5;28mself[39m[38;5;241m.[39mformat([38;5;28mformat[39m)
[1;32m    702[0m

####IGNORE

In [0]:
# 'ignore' skips the write without error when data already exists at the path.
(
    df.write.format('csv')
    .mode('ignore')
    .option('path', '/Volumes/workspace/data/straming/data1.csv')
    .save()
)

### PARQUET

In [0]:
# Write df as Parquet, replacing anything already at the path.
(
    df.write.format('parquet')
    .mode('overwrite')
    .option('path', '/Volumes/workspace/data/straming/data1.parquet')
    .save()
)

###Table

In [0]:
# Save df as a catalog table named 'my_Table' using the Parquet format.
# NOTE(review): this cell's recorded AnalysisException is truncated. One plausible
# cause is that the catalog only accepts Delta for managed tables — confirm before
# changing the format.
(
    df.write.format('parquet')
    .mode('overwrite')
    .saveAsTable('my_Table')
)

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-4815279975263165>, line 3[0m
[1;32m      1[0m df[38;5;241m.[39mwrite[38;5;241m.[39mformat([38;5;124m'[39m[38;5;124mparquet[39m[38;5;124m'[39m)\
[1;32m      2[0m   [38;5;241m.[39mmode([38;5;124m'[39m[38;5;124moverwrite[39m[38;5;124m'[39m)\
[0;32m----> 3[0m      [38;5;241m.[39msaveAsTable([38;5;124m'[39m[38;5;124mmy_Table[39m[38;5;124m'[39m)

File [0;32m/databricks/python/lib/python3.12/site-packages/pyspark/sql/connect/readwriter.py:737[0m, in [0;36mDataFrameWriter.saveAsTable[0;34m(self, name, format, mode, partitionBy, **options)[0m
[1;32m    735[0m [38;5;28mself[39m[38;5;241m.[39m_write[38;5;241m.[39mtable_name [38;5;241m=[39m name
[1;32m    736[0m [38;5;28mself[39m[38;5;241m.[39m_write[38;5;241m.[39mtable_save_method [38;5;241m=[39m 

In [0]:
# Render the current df in the notebook's table view.
display(df)


###spark sql

####create temp view

In [0]:
df.createTempView('my_view')

In [0]:
%sql
select * from my_view

In [0]:
%sql
select * from my_view where Item_Fat_Content = 'Regular'

In [0]:
df_sql = spark.sql("select * from my_view where Item_Fat_Content = 'Regular'")

In [0]:
df_sql.display()