Advanced Commands

# DATA READING

### Data Reading JSON

### Collect_List

In [None]:
# Sample (user, book) pairs; a user may appear on several rows.
data = [
    ('user1', 'book1'),
    ('user1', 'book2'),
    ('user2', 'book2'),
    ('user2', 'book4'),
    ('user3', 'book1'),
]

# DDL-style schema string: both columns are declared as strings.
schema = 'user string, book string'

# Build the DataFrame from the in-memory rows and show it.
df_book = spark.createDataFrame(data, schema)
df_book.display()

In [None]:
# One row per user, with that user's 'book' values gathered into a list.
(
    df_book
    .groupBy('user')
    .agg(collect_list('book'))
    .display()
)

In [None]:
# Preview the three columns that the pivot below works with.
(
    df
    .select('Item_Type', 'Outlet_Size', 'Item_MRP')
    .display()
)

### PIVOT

In [None]:
# Rows: Item_Type. Columns: one per Outlet_Size value. Cells: average Item_MRP.
(
    df
    .groupBy('Item_Type')
    .pivot('Outlet_Size')
    .agg(avg('Item_MRP'))
    .display()
)

### When-Otherwise

#### Scenario - 1

In [None]:
# Add 'veg_flag': 'Non-Veg' when Item_Type is exactly 'Meat', otherwise 'Veg'.
# df is reassigned, so later cells see the new column.
df = df.withColumn(
    'veg_flag',
    when(col('Item_Type') == 'Meat', 'Non-Veg').otherwise('Veg'),
)

In [None]:
# Show df, which now carries the 'veg_flag' column added in the previous cell.
df.display()

In [None]:
# Derive a combined veg/price label and display it (df itself is not reassigned):
#   veg_flag == 'Veg' and Item_MRP <  100 -> 'Veg_Inexpensive'
#   veg_flag == 'Veg' and Item_MRP >= 100 -> 'Veg_Expensive'
#   anything else                         -> 'Non_Veg'
# The upper branch is inclusive (>=) so that a Veg item priced at exactly 100
# is not left unmatched by both conditions and swept into the 'Non_Veg' fallback.
# NOTE(review): a Veg row whose Item_MRP is null still matches neither branch
# and ends up as 'Non_Veg' - confirm whether that is acceptable.
df.withColumn(
    'veg_exp_flag',
    when((col('veg_flag') == 'Veg') & (col('Item_MRP') < 100), 'Veg_Inexpensive')
    .when((col('veg_flag') == 'Veg') & (col('Item_MRP') >= 100), 'Veg_Expensive')
    .otherwise('Non_Veg'),
).display()

### JOINS

In [None]:
# Employee rows: (emp_id, emp_name, dept_id).
# 'd06' has no counterpart in the department rows below.
dataj1 = [
    ('1', 'gaur', 'd01'),
    ('2', 'kit', 'd02'),
    ('3', 'sam', 'd03'),
    ('4', 'tim', 'd03'),
    ('5', 'aman', 'd05'),
    ('6', 'nad', 'd06'),
]
schemaj1 = 'emp_id STRING, emp_name STRING, dept_id STRING'
df1 = spark.createDataFrame(dataj1, schemaj1)

# Department rows: (dept_id, department).
# 'd04' is not referenced by any employee row above.
dataj2 = [
    ('d01', 'HR'),
    ('d02', 'Marketing'),
    ('d03', 'Accounts'),
    ('d04', 'IT'),
    ('d05', 'Finance'),
]
schemaj2 = 'dept_id STRING, department STRING'
df2 = spark.createDataFrame(dataj2, schemaj2)

In [None]:
# Show the employee DataFrame (left side of the joins below).
df1.display()

In [None]:
# Show the department DataFrame (right side of the joins below).
df2.display()

#### Inner Join

In [None]:
# Inner join: keep only the rows whose dept_id is present in both DataFrames.
df1.join(
    df2,
    on=df1['dept_id'] == df2['dept_id'],
    how='inner',
).display()

#### Left Join

In [None]:
# Left join: keep every df1 row; df2 columns are null where no dept_id matches.
df1.join(
    df2,
    on=df1['dept_id'] == df2['dept_id'],
    how='left',
).display()

#### RIGHT JOIN

In [None]:
# Right join: keep every df2 row; df1 columns are null where no dept_id matches.
df1.join(
    df2,
    on=df1['dept_id'] == df2['dept_id'],
    how='right',
).display()

#### ANTI JOIN

In [None]:
# Anti join: keep only the df1 rows that have NO matching dept_id in df2.
df1.join(
    df2,
    on=df1['dept_id'] == df2['dept_id'],
    how='anti',
).display()

### WINDOW FUNCTIONS

#### ROW_NUMBER()

In [None]:
# Show df as it stands before the window-function examples.
df.display()

In [None]:
from pyspark.sql.window import Window

In [None]:
# Attach a sequential row number, counting in Item_Identifier order.
df.withColumn(
    'rowCol',
    row_number().over(Window.orderBy('Item_Identifier')),
).display()

#### RANK VS DENSE RANK

In [None]:
# Compute rank() and dense_rank() side by side over the same ordering
# (Item_Identifier, descending) so the two results can be compared.
(
    df
    .withColumn(
        'rank',
        rank().over(Window.orderBy(col('Item_Identifier').desc())),
    )
    .withColumn(
        'denseRank',
        dense_rank().over(Window.orderBy(col('Item_Identifier').desc())),
    )
    .display()
)

In [None]:
# Running total of Item_MRP in Item_Identifier order: the frame spans from the
# first row up to and including the current row.
df.withColumn(
    'dum',
    sum('Item_MRP').over(
        Window
        .orderBy('Item_Identifier')
        .rowsBetween(Window.unboundedPreceding, Window.currentRow)
    ),
).display()

#### Cumulative Sum

In [None]:
# Sum of Item_MRP over a window ordered by Item_Type with NO explicit frame.
# Spark then applies its default frame for ordered windows (range-based, up to
# the current row), so rows that tie on Item_Type receive the same value.
# Compare with the next cell, which sets a row-based frame explicitly.
df.withColumn(
    'cumsum',
    sum('Item_MRP').over(Window.orderBy('Item_Type')),
).display()

In [None]:
# Row-by-row cumulative sum of Item_MRP in Item_Type order: the frame is set
# explicitly to "first row through current row".
df.withColumn(
    'cumsum',
    sum('Item_MRP').over(
        Window
        .orderBy('Item_Type')
        .rowsBetween(Window.unboundedPreceding, Window.currentRow)
    ),
).display()

In [None]:
# Grand total of Item_MRP repeated on every row: the frame spans from the
# first row to the last row of the window.
df.withColumn(
    'totalsum',
    sum('Item_MRP').over(
        Window
        .orderBy('Item_Type')
        .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
    ),
).display()

### USER DEFINED FUNCTIONS (UDF)

#### STEP - 1

In [None]:
def my_func(x):
    """Return the square of ``x``, or ``None`` when ``x`` is ``None``.

    This function is wrapped as a Spark UDF further down. A null column value
    reaches a Python UDF as ``None``, and ``None * None`` raises ``TypeError``,
    so ``None`` is passed through unchanged instead of failing the job.
    """
    if x is None:
        return None
    return x * x

#### STEP - 2

In [None]:
# Wrap my_func as a Spark UDF. Without an explicit return type, udf() yields a
# string column, which would turn the numeric square into text; declare the
# result as 'double' so the new column stays numeric.
# NOTE(review): assumes the column passed in (Item_MRP below) is a double
# column - confirm against the schema; a mismatched return type yields nulls.
my_udf = udf(my_func, 'double')

In [None]:
# Apply the UDF to Item_MRP and show the result in a new column 'mynewcol'.
df.withColumn(
    'mynewcol',
    my_udf('Item_MRP'),
).display()

### DATA WRITING

#### CSV

In [None]:
# Write df in CSV format to the given path; no save mode is set here.
(
    df.write
    .format('csv')
    .save('/FileStore/tables/CSV/data.csv')
)

#### APPEND

In [None]:
# Append mode: add df's rows to whatever already exists at the path.
(
    df.write
    .format('csv')
    .mode('append')
    .save('/FileStore/tables/CSV/data.csv')
)

In [None]:
# Same append as the previous cell, but the target is supplied through the
# 'path' option rather than as an argument to save().
(
    df.write
    .format('csv')
    .mode('append')
    .option('path', '/FileStore/tables/CSV/data.csv')
    .save()
)

#### Overwrite

In [None]:
# Overwrite mode: replace whatever already exists at the path with df.
(
    df.write
    .format('csv')
    .mode('overwrite')
    .option('path', '/FileStore/tables/CSV/data.csv')
    .save()
)

#### Error

In [None]:
# Error mode: the write is expected to fail when the path already exists
# (as it does after the earlier cells have run).
(
    df.write
    .format('csv')
    .mode('error')
    .option('path', '/FileStore/tables/CSV/data.csv')
    .save()
)

#### Ignore

In [None]:
# Ignore mode: if the path already exists the write is silently skipped.
(
    df.write
    .format('csv')
    .mode('ignore')
    .option('path', '/FileStore/tables/CSV/data.csv')
    .save()
)

#### PARQUET

In [None]:
# Write df as Parquet, replacing any previous output at the target path.
# The target is a dedicated Parquet location: pointing this overwrite at the
# CSV path used by the cells above would wipe that CSV output and leave
# Parquet files under a '.csv' name.
# NOTE(review): the directory/file name below is a suggested convention -
# adjust to the project's layout if a different Parquet location is expected.
(
    df.write
    .format('parquet')
    .mode('overwrite')
    .option('path', '/FileStore/tables/PARQUET/data.parquet')
    .save()
)

#### TABLE

In [None]:
# Save df as a table named 'my_table' in Parquet format, replacing the table
# if it already exists.
(
    df.write
    .format('parquet')
    .mode('overwrite')
    .saveAsTable('my_table')
)

In [None]:
# Show df, the DataFrame that the write cells above persisted.
df.display()

### SPARK SQL

#### createTempView

In [None]:
# Register df as the temporary view 'my_view' so it can be queried with SQL.
# createOrReplaceTempView is used so the cell can be re-run: plain
# createTempView raises an error when 'my_view' already exists in the session.
df.createOrReplaceTempView('my_view')

In [None]:
%sql

-- Return every row of my_view whose Item_Fat_Content is exactly 'Lf'.
select * from my_view where Item_Fat_Content = 'Lf'

In [None]:
# Run the same filter through spark.sql so the result comes back as a
# DataFrame that Python code can keep working with.
df_sql = spark.sql(
    "select * from my_view where Item_Fat_Content = 'Lf'"
)

In [None]:
# Show the rows returned by the SQL query in the previous cell.
df_sql.display()