# **From Databricks Notebook (sourced as .py, converted to .ipynb)**

In [None]:
dbutils.fs.ls('/FileStore/')  # peeking into '/FileStore/'

In [None]:
dbutils.fs.ls('/FileStore/tables/')   # peeking into '/FileStore/tables/'

##### **The output above shows where the required file is stored. Copy that path and use it as shown in the next cell:**

In [None]:
# Load the BigMart CSV: the first row is the header and column types are inferred
bigmart_csv_path = '/FileStore/tables/BigMart_Sales.csv'
df = (
    spark.read.format('csv')
    .option('inferSchema', True)
    .option('header', True)
    .load(bigmart_csv_path)
)
df.display()

#### **Reading JSON**

In [None]:
# Read a JSON file. The JSON reader infers the schema from the data itself and
# has no header row, so the CSV-only options 'inferSchema' and 'header' are
# dropped. multiLine = False means one JSON object per line (JSON Lines).
df_json = (
    spark.read.format('json')
    .option('multiLine', False)
    .load('/FileStore/tables/drivers.json')
)

df_json.display()

#### **Schema Definition**

In [None]:
df.printSchema()

#### **Creating a DDL Schema to apply to our frame 'df'**

In [None]:
my_ddl_schema = '''
                    Item_Identifier STRING,
                    Item_Weight STRING,
                    Item_Fat_Content STRING,
                    Item_Visibility DOUBLE,
                    Item_Type STRING,
                    Item_MRP DOUBLE,
                    Outlet_Identifier STRING,
                    Outlet_Establishment_Year INT,
                    Outlet_Size STRING,
                    Outlet_Location_Type STRING,
                    Outlet_Type STRING,
                    Item_Outlet_Sales DOUBLE
                '''

# Re-read the CSV, applying the DDL schema instead of inferring column types
ddl_csv_reader = spark.read.format('csv').schema(my_ddl_schema).option('header', True)
df = ddl_csv_reader.load('/FileStore/tables/BigMart_Sales.csv')

In [None]:
df.display()

In [None]:
df.printSchema()   # 'Item_Weight' was DOUBLE earlier, now it is STRING

#### **Creating a StructType() Schema**

In [None]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [None]:
# Every column is declared as a nullable string
# (third StructField argument True --> NULLs allowed)
my_struct_schema = StructType([
    StructField(column_name, StringType(), True)
    for column_name in (
        'Item_Identifier',
        'Item_Weight',
        'Item_Fat_Content',
        'Item_Visibility',
        'Item_Type',
        'Item_MRP',
        'Outlet_Identifier',
        'Outlet_Establishment_Year',
        'Outlet_Size',
        'Outlet_Location_Type',
        'Outlet_Type',
        'Item_Outlet_Sales',
    )
])

In [None]:
# Applying above 'my_struct_schema' to our df

df = spark.read.format('csv').schema(my_struct_schema).option('header', True).load('/FileStore/tables/BigMart_Sales.csv')
df.printSchema()

##### **So that was about data loading, custom schema creation and application on df. Let's reload the CSV file for our further tasks.**

In [None]:
df = spark.read.format('csv').option('inferSchema', True).option('header', True).load('/FileStore/tables/BigMart_Sales.csv')
df.printSchema()

##### **Now that we have the original file's content again, we proceed to SELECT.**

#### **SELECT**

In [None]:
df.columns  # glancing at all cols

In [None]:
df.select('Item_Identifier', 'Item_Weight', 'Item_Fat_Content').display()

In [None]:
# ANOTHER WAY to write above cell's code

df.select(col('Item_Identifier'), col('Item_Weight'), col('Item_Fat_Content')).display()

#### **ALIAS**

In [None]:
df.select(col('Item_Identifier').alias('Item_ID')).display()

#### **FILTER**

##### **Scenario-1**

In [None]:
df.filter(col('Item_Fat_Content') == 'Regular').display()

##### **Scenario - 2**

In [None]:
# Pay attention to the syntax

df.filter((col('Item_Type') == 'Soft Drinks') & (col('Item_Weight') < 10)).display()

##### **Scenario - 3**

In [None]:
df.filter((col('Outlet_Size').isNull()) & (col('Outlet_Location_Type').isin('Tier 1', 'Tier 2'))).display()

In [None]:
# Selecting only first 3 columns from above output  (just an exercise)

# Same filter as the previous cell, with each predicate named before combining
size_is_missing = col('Outlet_Size').isNull()
in_tier_1_or_2 = col('Outlet_Location_Type').isin('Tier 1', 'Tier 2')
(
    df.filter(size_is_missing & in_tier_1_or_2)
    .select(col('Item_Identifier'), col('Item_Weight'), col('Item_Fat_Content'))
    .display()
)

#### **withColumnRenamed()** -- Changing column name at frame level

In [None]:
df.withColumnRenamed('Item_Weight', 'Item_Wt').display()

#### **withColumn()** -- Adding new column

##### **Scenario - 1**

In [None]:
# New col's name is 'Flag' and it will contain the literal 'new' as values

df = df.withColumn('Flag', lit("new"))
df.display()

##### Creating a new col based on some calculation

In [None]:
df.withColumn('Multiply', col('Item_Weight') * col('Item_MRP')).display()

##### **Scenario - 2**

In [None]:
# Using a frame's col name as it is (e.g. 'Item_Fat_Content' below) modifies the existing col of that name
# If a different col name is specified, then withColumn() creates a new col of that different name
# regexp_replace() finds a string pattern and replaces it with new string

# Two replacements applied one after the other to the same column:
# first "Regular" -> "Reg", then any casing of "low fat" -> "LF"
regular_shortened = df.withColumn(
    'Item_Fat_Content', regexp_replace(col('Item_Fat_Content'), "Regular", "Reg")
)
regular_shortened.withColumn(
    'Item_Fat_Content', regexp_replace(col('Item_Fat_Content'), "[Ll]ow [Ff]at", "LF")
).display()

#### **Type Casting**

In [None]:
# Cast 'Item_Weight' to string and overwrite `df` with the result.
# NOTE: from this cell on, 'Item_Weight' in `df` is a string column, so sorting
# on it is lexicographic unless it is cast back to a numeric type first.
df = df.withColumn('Item_Weight', col('Item_Weight').cast(StringType()))

In [None]:
df.printSchema()

#### **Sorting**

##### **Scenario - 1**

In [None]:
# 'Item_Weight' was cast to string in the Type Casting cell, so it is cast back
# to double for sorting; a string sort would be lexicographic ('9.8' > '21.35').
df.sort(col('Item_Weight').cast('double').desc()).display()

##### **Scenario - 2**

In [None]:
df.sort(col('Item_Visibility').asc()).display()

##### **Scenario - 3** : Sorting on multiple columns



In [None]:
# Sort descending on both columns. 'Item_Weight' was cast to string in the Type
# Casting cell, so it is cast back to double here to get a numeric ordering
# rather than a lexicographic one.
df.sort(
    [col('Item_Weight').cast('double'), col('Item_Visibility')],
    ascending=[False, False],   # one flag per sort column
).display()

##### **Scenario - 4** : Sorting on multiple columns (one asc, one desc)


In [None]:
# O/p is 1st sorted desc on Item_Weight & then based on that, 'Item_Visibility' is sorted asc;
# that's why, 0 is not coming up in 'Item_Visibility' at the top

# 'Item_Weight' descending, then 'Item_Visibility' ascending within equal weights.
# 'Item_Weight' was cast to string in the Type Casting cell, so it is cast back
# to double here to get a numeric ordering rather than a lexicographic one.
df.sort(
    [col('Item_Weight').cast('double'), col('Item_Visibility')],
    ascending=[False, True],
).display()

#### **LIMIT**

In [None]:
df.limit(5).display()

## **INTERMEDIATE TRANSFORMATIONS**

#### **DROP**

In [None]:
df.drop('Item_Visibility').display()  # multi col drop --> df.drop('col_name', 'col_name', ...)

#### **DROP_DUPLICATES**

##### **Scenario - 1**

In [None]:
df.dropDuplicates().display()   # 'drop_duplicates()' is also valid

##### **Scenario - 2**

In [None]:
# Dropping duplicates based on one or more columns

df.drop_duplicates(subset = ['Item_Type']).display()

#### **UNION and UNIONBYNAME**

In [None]:
data1 = [('1','kad'), ('2','sid')]
schema1 = 'id STRING, name STRING'
df1 = spark.createDataFrame(data1,schema1)

data2 = [('3','rahul'), ('4','jas')]
schema2 = 'id STRING, name STRING'
df2 = spark.createDataFrame(data2,schema2)

In [None]:
df1.display()

In [None]:
df2.display()

In [None]:
df1.union(df2).display()     # UNION

In [None]:
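# UNION matches columns by POSITION, not by name. It still runs when the column
# order differs (only the number of columns has to match), but the values then
# land under the wrong column names -- unionByName() matches by name instead.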

data1 = [('kad','1',), ('sid','2',)]
schema1 = 'name STRING, id STRING'

df1 = spark.createDataFrame(data1, schema1)
df1.display()

In [None]:
df1.union(df2).display()

In [None]:
df1.unionByName(df2).display()     # UNIONBYNAME

#### **String Functions**

In [None]:
df.select(initcap('Item_Type')).display()  # upper(), lower() fns.

In [None]:
df.select(initcap('Item_Type').alias('Initials_Capitalized')).display()

#### **DATE FUNCTIONS**

In [None]:
df = df.withColumn('Curr_Date', current_date())    # CURRENT_DATE()
df.limit(5).display()

In [None]:
df = df.withColumn('Week_After', date_add('Curr_Date', 7))   # DATE_ADD()
df.limit(5).display()

In [None]:
df = df.withColumn('Week_Before', date_sub('Curr_Date', 7))    # DATE_SUB()
df.limit(5).display()

In [None]:
# Another way to do the above job

df = df.withColumn('Week_Before', date_add('Curr_Date', -7))     #  specifying -7
df.limit(5).display()

In [None]:
df = df.withColumn('DateDiff', datediff('Week_After', 'Curr_Date'))     # DATEDIFF()
df.limit(5).display()

In [None]:
df = df.withColumn('Week_Before', date_format('Week_Before', 'dd-MM-yyyy'))   # DATE_FORMAT()
df.limit(5).display()

#### **Handling NULLs**

In [None]:

# Each count is printed explicitly: a notebook cell only echoes the value of its
# last expression, so the first two results would otherwise be computed and lost.
print(df.dropna(how='all').count())   # dropping records where EVERY col is NULL

print(df.dropna(how='any').count())   # dropping records having NULL in any col (the default)

print(df.dropna(subset=['Outlet_Size']).count())  # dropping records with Outlet_Size = NULL

##### **Filling NA with custom value**

In [None]:
# Replace NULLs with 'NotAvailable' in every string column of the frame
# (a string fill value is ignored for non-string columns)

df.fillna('NotAvailable').limit(10).display()

In [None]:
# Replace NULLs with 'NotAvailable' only in column 'Outlet_Size'; NULLs in other columns remain unaffected

df.fillna('NotAvailable', subset = 'Outlet_Size').limit(10).display()

#### **SPLIT** -- Splitting column vals (within the col itself) and accessing them via indexing

In [None]:
df.withColumn('Outlet_Type', split('Outlet_Type', ' ')).limit(5).display()

In [None]:
# Accessing & displaying the 2nd of the split vals (index 1) in col 'Outlet_Type'
df.withColumn('Outlet_Type', split('Outlet_Type', ' ')[1]).limit(5).display()

#### **EXPLODE** -- multiple vals in a col's cell are exploded over separate rows

In [None]:
df_explode = df.withColumn('Outlet_Type', split('Outlet_Type', ' '))
df_explode.limit(5).display()

In [None]:
df_explode.withColumn('Outlet_Type', explode('Outlet_Type')).display()

#### **ARRAY_CONTAINS(col_name, search_string)**  -- returns true/false if col_name contains search_string

In [None]:
df_explode.limit(5).display()

In [None]:
df_explode.withColumn('Type1_Flag', array_contains('Outlet_Type', 'Type1')).limit(5).display()

#### **groupBy()**

##### **Scenario - 1**

In [None]:

df.groupBy('Item_Type').agg(sum('Item_MRP')).display()



##### **Scenario - 2**

In [None]:
df.groupBy('Item_Type').agg(avg('Item_MRP')).display()



##### **Scenario - 3**

In [None]:

# Group on two columns and total 'Item_MRP' within each (Item_Type, Outlet_Size) pair.
# Assuming no earlier cell rebinds it, `sum` here is pyspark.sql.functions.sum
# (from the wildcard import), not the Python builtin.

df.groupBy('Item_Type', 'Outlet_Size').agg(sum('Item_MRP').alias('Total_MRP')).display()



##### **Scenario - 4**

In [None]:
df.groupBy('Item_Type', 'Outlet_Size').agg(sum('Item_MRP').alias('Total_MRP'), avg('Item_MRP').alias('Avg_MRP')).display()

## **ADVANCED TRANSFORMATIONS**

#### **COLLECT_LIST()    -- Similar to GROUP_CONCAT() in MySQL, but returns an array instead of a delimited string**

In [None]:
data = [('user1','book1'),
        ('user1','book2'),
        ('user2','book2'),
        ('user2','book4'),
        ('user3','book1')]

schema = 'user STRING, book STRING'
df_book = spark.createDataFrame(data, schema)

df_book.display()

In [None]:
df_book.groupBy('user').agg(collect_list('book')).display()

#### **PIVOT**

In [None]:
df.groupBy('Item_Type').pivot('Outlet_Size').agg(avg('Item_MRP')).display()

#### **WHEN-OTHERWISE**

In [None]:
df = df.withColumn('Veg_Flag', when(col('Item_Type') == "Meat", "Non-Veg").otherwise('Veg'))
df.display()

In [None]:
# Label each row by 'Veg_Flag' and price. The second branch uses >= so that a
# Veg item priced at exactly 100 is labelled 'Veg_Expensive' instead of falling
# through to the 'Non_Veg' default.
df.withColumn(
    'Veg_Exp_Flag',
    when((col('Veg_Flag') == 'Veg') & (col('Item_MRP') < 100), 'Veg_Inexpensive')
    .when((col('Veg_Flag') == 'Veg') & (col('Item_MRP') >= 100), 'Veg_Expensive')
    .otherwise('Non_Veg'),
).display()

### **JOINS**

In [None]:
dataj1 = [('1','gaur','d01'),
          ('2','kit','d02'),
          ('3','sam','d03'),
          ('4','tim','d03'),
          ('5','aman','d05'),
          ('6','nad','d06')]
schemaj1 = 'emp_id STRING, emp_name STRING, dept_id STRING'
df1 = spark.createDataFrame(dataj1,schemaj1)

dataj2 = [('d01','HR'),
          ('d02','Marketing'),
          ('d03','Accounts'),
          ('d04','IT'),
          ('d05','Finance')]
schemaj2 = 'dept_id STRING, department STRING'
df2 = spark.createDataFrame(dataj2,schemaj2)

In [None]:
df1.display()

In [None]:
df2.display()

#### **INNER, LEFT, RIGHT, ANTI**

In [None]:
df1.join(df2, df1['dept_id'] == df2['dept_id'],'inner').display()

In [None]:
df1.join(df2, df1['dept_id'] == df2['dept_id'], 'left').display()

In [None]:
df1.join(df2, df1['dept_id'] == df2['dept_id'], 'right').display()

In [None]:
# (Left) anti join returns the rows of the LEFT frame (df1) that have NO match in the right frame (df2)

df1.join(df2, df1['dept_id'] == df2['dept_id'], 'anti').display()

In [None]:
# Frames swapped: now returns the rows of df2 (left) that have NO match in df1 (right)

df2.join(df1, df1['dept_id'] == df2['dept_id'], 'anti').display()

### **WINDOW FUNCTIONS**

In [None]:
from pyspark.sql.window import Window

In [None]:
df.withColumn('Row_Col', row_number().over(Window.orderBy('Item_Identifier'))).display()

#### **RANK() and DENSE_RANK()**

In [None]:
# Both ranking functions use the same window: 'Item_Identifier' descending
id_desc_window = Window.orderBy(col('Item_Identifier').desc())
ranked = df.withColumn('Rank', rank().over(id_desc_window))
ranked.withColumn('DenseRank', dense_rank().over(id_desc_window)).display()

#### **Cumulative Sum**

In [None]:
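# Notice the CumSum column in the output; it's NOT the desired cumulative sum.
# With orderBy() and no explicit frame, the default frame is RANGE between unbounded
# preceding and current row, so all rows with the same 'Item_Type' get the same total.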

df.withColumn('CumSum', sum('Item_MRP').over(Window.orderBy('Item_Type'))).display()

In [None]:
# This is the correct cumulative sum

df.withColumn('CumSum',sum('Item_MRP').over(Window.orderBy('Item_Type').rowsBetween(Window.unboundedPreceding,Window.currentRow))).display()

In [None]:
# TOTAL SUM

df.withColumn('TotalSum',sum('Item_MRP').over(Window.orderBy('Item_Type').rowsBetween(Window.unboundedPreceding,Window.unboundedFollowing))).display()

#### **User Defined Functions**

In [None]:
def my_func(x):
    """Return the square of ``x``.

    ``None`` is returned unchanged so that a NULL in the input column produces
    a NULL in the output instead of raising ``TypeError`` inside the UDF.
    """
    if x is None:
        return None
    return x * x

# Wrap the Python function as a Spark UDF. The return type is declared explicitly
# because udf() defaults to StringType, which would turn the squared values into strings.
# NOTE(review): assumes 'Item_MRP' is a double column (as in the schemas shown earlier);
# with DoubleType, a Python int result would come back as NULL -- confirm.
my_udf = udf(my_func, DoubleType())

df.withColumn('MyNewCol', my_udf('Item_MRP')).display()

#### **DATA WRITING**

In [None]:
df.write.format('csv').save('/FileStore/tables/CSV/data.csv')

##### **Data Writing Modes -- Append, Overwrite, Error, Ignore**

In [None]:
df.write.format('csv')\
        .mode('append')\
        .save('/FileStore/tables/CSV/data.csv')

df.write.format('csv')\
        .mode('append')\
        .option('path','/FileStore/tables/CSV/data.csv')\
        .save()

df.write.format('csv')\
    .mode('overwrite')\
    .option('path','/FileStore/tables/CSV/data.csv')\
    .save()

##### **Error mode (Following code SHOULD throw error)**

In [None]:
df.write.format('csv')\
    .mode('error')\
    .option('path','/FileStore/tables/CSV/data.csv')\
    .save()

##### **Ignore mode (silently skips the write if the path already exists)**

In [None]:
df.write.format('csv')\
  .mode('ignore')\
  .option('path','/FileStore/tables/CSV/data.csv')\
  .save()

##### **PARQUET format**



In [None]:
# Write parquet to its own location. The CSV path used by the cells above was
# copy-pasted here originally, which replaced the CSV output with parquet files
# under a directory named 'data.csv'.
df.write.format('parquet')\
    .mode('overwrite')\
    .option('path','/FileStore/tables/PARQUET/data.parquet')\
    .save()

##### **Table format**

In [None]:
df.write.format('parquet')\
    .mode('overwrite')\
    .saveAsTable('my_table')

#### **Spark SQL**

In [None]:
df.display()

In [None]:
# createOrReplaceTempView() keeps this cell re-runnable: createTempView() raises
# an error when 'my_view' already exists in the session.
df.createOrReplaceTempView('my_view')  # temporary views are purged as session ends

In [None]:
%sql
-- %sql must be the first line of the cell; it runs the rest of the cell as SQL
select * from my_view

In [None]:
%sql
-- %sql must be the first line of the cell; it runs the rest of the cell as SQL
select * from my_view where Item_Fat_Content = 'Low Fat'

In [None]:
df_sql = spark.sql("select * from my_view where Item_Fat_Content = 'Low Fat'")

In [None]:
df_sql.display()