<a href="https://colab.research.google.com/github/aaubs/ds-master/blob/main/notebooks/M6_Performing_a_Big_Data_workflow_with_Spark_Part3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Advanced Tutorial


### Spark Catalog

In [None]:
# If you have used Spark for a while now, this is a good time to learn about the Spark Catalog.
# You can also skip this section entirely, since it is independent of what follows.

In [None]:
# get all the databases in the catalog.
spark.catalog.listDatabases()

[Database(name='default', description='default database', locationUri='file:/content/spark-warehouse')]

In [None]:
# get the name of the current database
spark.catalog.currentDatabase()

'default'

In [None]:
## lists tables
spark.catalog.listTables()

[Table(name='df_test', database=None, description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='mytable', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]

In [None]:
# add a table to the catalog
df_train.createOrReplaceTempView("df_train")

In [None]:
# list tables
spark.catalog.listTables()

[Table(name='df_test', database=None, description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='df_train', database=None, description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='mytable', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]

In [None]:
# Caching
# cached table "df_train"
spark.catalog.cacheTable("df_train")

In [None]:
# checks if the table is cached
spark.catalog.isCached("df_train")

True

In [None]:
spark.catalog.isCached("df_test")

False

In [None]:
# let's cache df_test as well
spark.catalog.cacheTable("df_test")

In [None]:
spark.catalog.isCached("df_test")

True

In [None]:
# let's uncache df_train
spark.catalog.uncacheTable("df_train")

In [None]:
spark.catalog.isCached("df_train")

False

In [None]:
spark.catalog.isCached("df_test")

True

In [None]:
# How about clearing all cached tables at once. 
spark.catalog.clearCache()

In [None]:
spark.catalog.isCached("df_train")

False

In [None]:
# creating a global temp view
# createOrReplaceGlobalTempView keeps this cell re-runnable: createGlobalTempView
# raises if a global temp view named "df_train" already exists in this Spark application.
df_train.createOrReplaceGlobalTempView("df_train")

In [None]:
# listing all views in global_temp
spark.sql("SHOW VIEWS IN global_temp;").show()

+-----------+--------+-----------+
|  namespace|viewName|isTemporary|
+-----------+--------+-----------+
|global_temp|df_train|       true|
|           | df_test|       true|
|           |df_train|       true|
|           | mytable|       true|
+-----------+--------+-----------+



In [None]:
# dropping the global temp view.
spark.catalog.dropGlobalTempView("df_train")

True

In [None]:
# checking that global temp view is dropped.
spark.sql("SHOW VIEWS IN global_temp;").show()

+---------+--------+-----------+
|namespace|viewName|isTemporary|
+---------+--------+-----------+
|         | df_test|       true|
|         |df_train|       true|
|         | mytable|       true|
+---------+--------+-----------+



In [None]:
spark.catalog.dropTempView("df_train")

True

In [None]:
# checking that the local temp view "df_train" is dropped as well.
spark.sql("SHOW VIEWS IN global_temp;").show()

+---------+--------+-----------+
|namespace|viewName|isTemporary|
+---------+--------+-----------+
|         | df_test|       true|
|         | mytable|       true|
+---------+--------+-----------+



In [None]:
spark.sql("SHOW VIEWS").show()

+---------+--------+-----------+
|namespace|viewName|isTemporary|
+---------+--------+-----------+
|         | df_test|       true|
|         | mytable|       true|
+---------+--------+-----------+



## Dealing with Missing Values
### Cabin

In [None]:
# filling the null values in cabin with "N".
# df.fillna(value, subset=[]);
df_train = df_train.na.fill('N', subset=['Cabin'])
df_test = df_test.na.fill('N', subset=['Cabin'])

### Fare

In [None]:
## how do we find out the rows with missing values?
# we can use .where(condition) with .isNull()
df_test.where(df_test['Fare'].isNull()).show()

+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|PassengerId|Survived|Pclass|Name|Sex|Age|SibSp|Parch|Ticket|Fare|Cabin|Embarked|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+



Here, we could take the average of the whole **Fare** column to fill in the missing value. However, for the sake of learning and practicing, we will try something else: we take the average **Fare** of the rows where **Pclass** is ***3***, **Sex** is ***male*** and **Embarked** is ***S***.

In [None]:
# Reference group for the imputation: third-class male passengers who embarked at 'S'.
missing_value = df_test.where(
    (df_test.Pclass == 3)
    & (df_test['Embarked'] == 'S')
    & (df_test['Sex'] == "male")
)
# Fill the null Fare values with the mean Fare of that reference group.
df_test = df_test.fillna(
    missing_value.agg(mean('Fare')).first()[0],
    subset=['Fare'],
)

In [None]:
# Checking
df_test.where(df_test['Fare'].isNull()).show()

+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|PassengerId|Survived|Pclass|Name|Sex|Age|SibSp|Parch|Ticket|Fare|Cabin|Embarked|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+



### Embarked

In [None]:
df_train.where(df_train['Embarked'].isNull()).show()

+-----------+--------+------+-------------------+------+----+-----+-----+------+----+-----+--------+
|PassengerId|Survived|Pclass|               Name|   Sex| Age|SibSp|Parch|Ticket|Fare|Cabin|Embarked|
+-----------+--------+------+-------------------+------+----+-----+-----+------+----+-----+--------+
|         62|       1|     1|Icard, Miss. Amelie|female|38.0|    0|    0|113572|80.0|  B28|    null|
+-----------+--------+------+-------------------+------+----+-----+-----+------+----+-----+--------+



In [None]:
## Replacing the null values in the Embarked column with the hard-coded port 'C' (TODO: confirm 'C' is actually the mode of Embarked).
df_train = df_train.na.fill('C', subset=['Embarked'])

In [None]:
## checking
df_train.where(df_train['Embarked'].isNull()).show()

+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|PassengerId|Survived|Pclass|Name|Sex|Age|SibSp|Parch|Ticket|Fare|Cabin|Embarked|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+



In [None]:
# Show the test rows whose Embarked value is null.
df_test.where(df_test.Embarked.isNull()).show()
# Impute them with 'C', the same value used for df_train, so that both splits are
# treated consistently and no null Embarked values are carried into later steps.
df_test = df_test.na.fill('C', subset=['Embarked'])

+-----------+--------+------+--------------------+------+----+-----+-----+------+----+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|Ticket|Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+------+----+-----+--------+
|        830|       1|     1|Stone, Mrs. Georg...|female|62.0|    0|    0|113572|80.0|  B28|    null|
+-----------+--------+------+--------------------+------+----+-----+-----+------+----+-----+--------+



## Feature Engineering
### Cabin

In [None]:
## This code creates a decorator so that one function works both on plain Python values and on PySpark columns (as a UDF).
from functools import wraps
from typing import Callable

from pyspark.sql import Column
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType, IntegerType, ArrayType, DataType
class py_or_udf:
    """Decorator factory that lets one function serve plain Python values and Spark columns.

    The decorated function is called directly when none of its arguments is a
    pyspark ``Column``. When at least one positional or keyword argument is a
    ``Column``, the function is wrapped in a Spark UDF with the configured
    return type and applied to the arguments, producing a ``Column``.

    Args:
        returnType: Spark data type declared for the UDF result
            (defaults to ``StringType()``).
    """

    def __init__(self, returnType : DataType=StringType()):
        self.spark_udf_type = returnType

    def __call__(self, func : Callable):
        # Built on the first Column call and reused afterwards, instead of
        # wrapping `func` in a brand-new UDF object on every invocation.
        spark_udf = None

        @wraps(func)  # keep the original name and docstring on the wrapper
        def wrapped_func(*args, **kwargs):
            nonlocal spark_udf
            has_column = any(isinstance(arg, Column) for arg in args) or \
                any(isinstance(vv, Column) for vv in kwargs.values())
            if not has_column:
                return func(*args, **kwargs)
            if spark_udf is None:
                spark_udf = udf(func, self.spark_udf_type)
            return spark_udf(*args, **kwargs)
        return wrapped_func

    
@py_or_udf(returnType=StringType())
def first_char(col):
    """Return the first character of `col`, or None when there is no first character.

    A None or empty value yields None instead of raising, so a null or empty
    cell becomes a null result rather than aborting the whole Spark job.
    """
    # NOTE: the parameter name shadows pyspark's `col` helper inside this function only.
    if not col:
        return None
    return col[0]
    

In [None]:
df_train = df_train.withColumn('Cabin', first_char(df_train['Cabin']))

In [None]:
df_test = df_test.withColumn('Cabin', first_char(df_test['Cabin']))

In [None]:
df_train.limit(5).toPandas()

Unnamed: 0,PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
0,1,0,3,"Braund, Mr. Owen Harris",male,22.0,1,0,A/5 21171,7.25,N,S
1,2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Th...",female,38.0,1,0,PC 17599,71.2833,C,C
2,3,1,3,"Heikkinen, Miss. Laina",female,26.0,0,0,STON/O2. 3101282,7.925,N,S
3,4,1,1,"Futrelle, Mrs. Jacques Heath (Lily May Peel)",female,35.0,1,0,113803,53.1,C,S
4,5,0,3,"Allen, Mr. William Henry",male,35.0,0,0,373450,8.05,N,S


We can use the average of the **Fare** column: PySpark's ***groupBy*** function gives us the mean fare for each cabin letter.

In [None]:
df_train.groupBy('Cabin').mean("Fare").show()

+-----+------------------+
|Cabin|         avg(Fare)|
+-----+------------------+
|    F|20.112033333333333|
|    E|53.944447619047615|
|    T|              35.5|
|    B| 94.72019230769232|
|    D| 59.19123157894737|
|    C|102.14492608695654|
|    A|43.487500000000004|
|    N|19.307918358531317|
|    G|          13.58125|
+-----+------------------+



Now, these means can help us estimate the unknown cabins by comparing the fare of each unknown-cabin row with the means above. Let's write a simple function that assigns a cabin letter based on the fare.

In [None]:
@py_or_udf(returnType=StringType())
def cabin_estimator(i):
    """Estimate a cabin letter from a fare value.

    `i` is checked against ascending, exclusive upper bounds; the letter paired
    with the first bound greater than `i` is returned. Values that are not below
    any bound (116 and above) map to "B".
    """
    upper_bounds = (
        (16, "G"),
        (27, "F"),
        (38, "T"),
        (47, "A"),
        (53, "E"),
        (54, "D"),
        (116, 'C'),
    )
    for bound, letter in upper_bounds:
        if i < bound:
            return letter
    return "B"

In [None]:
## Rows whose Cabin is the 'N' placeholder (used earlier for null) get an
## estimated cabin letter derived from their Fare.
df_withN = (
    df_train
    .where(col('Cabin') == 'N')
    .withColumn('Cabin', cabin_estimator(col('Fare')))
)

## Rows that already carry a real cabin letter are kept unchanged.
df2 = df_train.where(col('Cabin') != 'N')

# Stitch both parts back together, ordered by passenger id.
df_train = df_withN.union(df2).sort('PassengerId')

In [None]:
# Same treatment for the test set: estimate a cabin letter from Fare for the
# rows holding the 'N' placeholder, keep the others, then recombine.
df_testN = (
    df_test
    .where(col('Cabin') == 'N')
    .withColumn('Cabin', cabin_estimator(col('Fare')))
)
df_testNoN = df_test.where(col('Cabin') != 'N')
df_test = df_testN.union(df_testNoN).sort('PassengerId')

### Name

In [None]:
## creating UDF functions
@py_or_udf(returnType=IntegerType())
def name_length(name):
    """Return the number of characters in `name`, or None when `name` is None.

    Spark passes null cells to Python UDFs as None; returning None keeps such
    rows as null instead of raising inside the UDF.
    """
    return None if name is None else len(name)


@py_or_udf(returnType=StringType())
def name_length_group(size):
    """Bucket a name length into 'short', 'medium', 'good' or 'long'.

    The upper bounds are inclusive: up to 20 is 'short', up to 35 is 'medium',
    up to 45 is 'good', and anything else is 'long'.
    """
    if size <= 20:
        return 'short'
    if size <= 35:
        return 'medium'
    if size <= 45:
        return 'good'
    return 'long'

In [None]:
## getting the name length from name. 
df_train = df_train.withColumn("name_length", name_length(df_train['Name']))

## grouping based on name length. 
df_train = df_train.withColumn("nLength_group", name_length_group(df_train['name_length']))

In [None]:
## Let's do the same for test set. 
df_test = df_test.withColumn("name_length", name_length(df_test['Name']))

df_test = df_test.withColumn("nLength_group", name_length_group(df_test['name_length']))

### Title

In [None]:
## this function helps getting the title from the name. 
@py_or_udf(returnType=StringType())
def get_title(name):
    """Extract the title from a name formatted like 'Surname, Title. Given names'.

    The title is the text between the first comma and the first period, with
    surrounding whitespace stripped. Returns None when `name` is None or when
    the part before the first period contains no comma, instead of raising.
    """
    if name is None:
        return None
    # Text before the first '.', split on ','; the title is the second piece.
    pieces = name.split('.')[0].split(',')
    if len(pieces) < 2:
        return None
    return pieces[1].strip()

df_train = df_train.withColumn("title", get_title(df_train['Name']))
df_test = df_test.withColumn('title', get_title(df_test['Name']))

In [None]:
## we are writing a function that can help us modify title column
@py_or_udf(returnType=StringType())
def fuse_title1(feature):
    """Collapse uncommon titles into broader categories.

    A fixed list of infrequent titles becomes 'rare', 'Ms' and 'Mlle' become
    'Miss', and 'Mme' becomes 'Mrs'. Any other value is returned unchanged.
    """
    replacements = (
        (('the Countess', 'Capt', 'Lady', 'Sir', 'Jonkheer', 'Don', 'Major', 'Col', 'Rev', 'Dona', 'Dr'), 'rare'),
        (('Ms', 'Mlle'), 'Miss'),
        (('Mme',), 'Mrs'),
    )
    for titles, merged in replacements:
        if feature in titles:
            return merged
    return feature

In [None]:
df_train = df_train.withColumn("title", fuse_title1(df_train["title"]))

In [None]:
df_test = df_test.withColumn("title", fuse_title1(df_test['title']))

In [None]:
# Compute the distinct titles in Spark so that only those few values are collected
# to the driver, rather than converting every row and column to pandas first.
# The order of the printed values is not guaranteed.
print(df_train.select('title').distinct().toPandas()['title'].unique())
print(df_test.select('title').distinct().toPandas()['title'].unique())

['Mr' 'Mrs' 'Miss' 'Master' 'rare']
['Mrs' 'Mr' 'Miss' 'rare' 'Master']


### family_size

In [None]:
# family_size counts the passenger plus the relatives aboard (SibSp + Parch), so a
# solo traveller has size 1. The thresholds applied to this column later
# (`size <= 1` -> 'loner', `num < 2` -> alone) and the Fare / family_size division
# all expect a size that includes the passenger and is therefore never 0.
df_train = df_train.withColumn("family_size", df_train['SibSp'] + df_train['Parch'] + 1)
df_test = df_test.withColumn("family_size", df_test['SibSp'] + df_test['Parch'] + 1)

In [None]:
## bin the family size. 
@py_or_udf(returnType=StringType())
def family_group(size):
    """Bucket a family size into 'loner', 'small' or 'large'.

    Sizes up to 1 are 'loner', sizes up to 4 are 'small', and everything else
    is 'large'.
    """
    if size <= 1:
        return 'loner'
    return 'small' if size <= 4 else 'large'

In [None]:
df_train = df_train.withColumn("family_group", family_group(df_train['family_size']))
df_test = df_test.withColumn("family_group", family_group(df_test['family_size']))


### is_alone

In [None]:
@py_or_udf(returnType=IntegerType())
def is_alone(num):
    """Return 1 when `num` is below 2, otherwise 0."""
    return 1 if num < 2 else 0

In [None]:
df_train = df_train.withColumn("is_alone", is_alone(df_train['family_size']))
df_test = df_test.withColumn("is_alone", is_alone(df_test["family_size"]))

### ticket

In [None]:
## dropping the Ticket column
# The name is spelled exactly as in the schema ('Ticket'): drop() silently ignores
# names it cannot resolve, so 'ticket' would be a no-op if case-sensitive column
# resolution (spark.sql.caseSensitive) were enabled.
df_train = df_train.drop('Ticket')
df_test = df_test.drop("Ticket")

### calculated_fare

In [None]:
from pyspark.sql.functions import expr, col, when, coalesce, lit

In [None]:
## here I am using something similar to an if/else statement:
# when(condition, value_when_condition_met).otherwise(value_when_not_met)
# Per-person fare: Fare divided by family_size, falling back to the plain Fare when
# family_size is zero or null. The divisor is tested explicitly rather than relying
# on division by zero yielding null (it raises under ANSI SQL mode), and the
# division expression is built only once.
df_train = df_train.withColumn(
    "calculated_fare",
    when(col("family_size") > 0, col("Fare") / col("family_size"))
    .otherwise(col("Fare")))

In [None]:
# Per-person fare: Fare divided by family_size, falling back to the plain Fare when
# family_size is zero or null. The divisor is tested explicitly rather than relying
# on division by zero yielding null (it raises under ANSI SQL mode), and the
# division expression is built only once.
df_test = df_test.withColumn(
    "calculated_fare",
    when(col("family_size") > 0, col("Fare") / col("family_size"))
    .otherwise(col("Fare")))

### fare_group

In [None]:
@py_or_udf(returnType=StringType())
def fare_group(fare):
    """Bucket a fare into a named price band.

    The upper bounds are inclusive and checked in ascending order: up to 4 is
    'Very_low', up to 10 is 'low', up to 20 is 'mid', up to 45 is 'high', and
    anything else is "very_high".
    """
    bands = (
        (4, 'Very_low'),
        (10, 'low'),
        (20, 'mid'),
        (45, 'high'),
    )
    for upper, label in bands:
        if fare <= upper:
            return label
    return "very_high"

In [None]:
df_train = df_train.withColumn("fare_group", fare_group(col("Fare")))
df_test = df_test.withColumn("fare_group", fare_group(col("Fare")))