# Spark Interview 1: Date types


Loads PySpark and basic modules

In [2]:
# module `findspark` is useful when running Apache Spark on Windows
import findspark
findspark.init()

# pyspark modules
from pyspark.sql import SparkSession
from pyspark.sql import Row

from datetime import date

# Reuse the active Spark session if there is one, otherwise start a new one.
session = (
    SparkSession.builder
    .appName("Spark Interview 1")
    .getOrCreate()
)

Creates a Spark dataframe with dummy content

In [3]:


# An explicit schema pins the column types instead of relying on inference
# (inference cannot type `end` if every value in that column is null).
df = session.createDataFrame(
    [
        ("Project 1", date(2022, 7, 15), date(2022, 7, 16)),
        ("Project 2", date(2022, 7, 1), None),  # open-ended project (no end date)
        ("Project 3", date(2022, 6, 9), date(2022, 6, 12)),
    ],
    schema="project string, start date, end date",
)

df.show()


+---------+----------+----------+
|  project|     start|       end|
+---------+----------+----------+
|Project 1|2022-07-15|2022-07-16|
|Project 2|2022-07-01|      null|
|Project 3|2022-06-09|2022-06-12|
+---------+----------+----------+



**Problem 1.** Calculate the duration of projects. The end date is inclusive — e.g. a project that starts on 14 July 2022 and ends on 15 July 2022 lasts 2 days. Projects without an end date (`null`) get no duration yet (see Problem 2).

In [4]:
from pyspark.sql.functions import datediff,date_add, when

def calculate_duration(df):
    """
    Adds a `project_length` column holding the number of days from `start`
    to `end`, counting both endpoints (i.e. `end - start + 1`).

    Rows whose `end` is null get a null `project_length`.
    """
    inclusive_days = datediff("end", "start") + 1  # +1 makes the end date inclusive
    return df.withColumn("project_length", inclusive_days)

df_a = calculate_duration(df)
df_a.show()

+---------+----------+----------+--------------+
|  project|     start|       end|project_length|
+---------+----------+----------+--------------+
|Project 1|2022-07-15|2022-07-16|             2|
|Project 2|2022-07-01|      null|          null|
|Project 3|2022-06-09|2022-06-12|             4|
+---------+----------+----------+--------------+



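**Problem 2.** Now let's improve the previous function. When the `end` column is `null`, calculate the project length up to and including today. Sort in descending order of project length (longest duration first). Note that the length of open-ended projects depends on the day the notebook is run (the output below was produced on 15 July 2022).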

In [36]:
from pyspark.sql.functions import datediff,date_add, when, isnull, current_date, desc

def calculate_duration_2(df):
    """
    Adds a `project_length` column (inclusive day count from `start` to `end`)
    and sorts the rows by it, longest project first.

    Unlike `calculate_duration`, a null `end` is treated as "still running":
    Spark's `current_date()` (the date at the start of query evaluation) is
    used as the end date instead.
    """
    # Fall back to today's date for open-ended projects, then apply the
    # inclusive formula once instead of repeating it in each branch.
    effective_end = when(isnull("end"), current_date()).otherwise(df["end"])
    return (
        df.withColumn("project_length", datediff(effective_end, "start") + 1)
        .sort(desc("project_length"))
    )

# Start from the raw `df` (not `df_a`): this function computes `project_length` itself.
df_b = calculate_duration_2(df)
df_b.show()

+---------+----------+----------+--------------+
|  project|     start|       end|project_length|
+---------+----------+----------+--------------+
|Project 2|2022-07-01|      null|            15|
|Project 3|2022-06-09|2022-06-12|             4|
|Project 1|2022-07-15|2022-07-16|             2|
+---------+----------+----------+--------------+



## Test

Create unit test cases for the functions above using `pytest`.

**TODO:** shorten the test cases (they repeat the same DataFrame set-up and comparison code).

In [35]:

import pytest

# needed to run pytest tests in a jupyter notebook
import ipytest
ipytest.autoconfig()

from datetime import date, timedelta

def test_calculate_duration_basic_case():
    """
    Tests basic case of `calculate_duration()`: a 2-day project (end date inclusive).
    """
    df = session.createDataFrame(
        [("Project 1", date(2022, 7, 15), date(2022, 7, 16))],
        schema="project string, start date, end date")

    df_expected = session.createDataFrame(
        [("Project 1", date(2022, 7, 15), date(2022, 7, 16), 2)],
        schema="project string, start date, end date, project_length int")

    df_result = calculate_duration(df)

    assert df_result.collect() == df_expected.collect()
    
def test_calculate_duration_end_null():
    """
    Tests `calculate_duration_2()`: end date is empty, so the length runs until today.
    """
    start_date = date(2022, 7, 15)
    # Take "today" from Spark rather than `date.today()`: the function under test
    # uses Spark's `current_date()`, which follows the Spark session time zone and
    # can differ from the Python process's local date.
    spark_today = session.sql("SELECT current_date()").first()[0]
    expected_length = (spark_today - start_date).days + 1

    df = session.createDataFrame(
        [("Project 1", start_date, None)],
        schema="project string, start date, end date")

    df_expected = session.createDataFrame(
        [("Project 1", start_date, None, expected_length)],
        schema="project string, start date, end date, project_length int")

    df_result = calculate_duration_2(df)

    assert df_result.collect() == df_expected.collect()

    
def test_calculate_duration_with_desc_sort():
    """
    Tests `calculate_duration_2()`: rows come back in descending order of length.
    """
    # Input is deliberately in ascending order of length (3 days, then 5 days).
    df = session.createDataFrame([
        ("Project 1", date(2022, 4, 10), date(2022, 4, 12)),
        ("Project 2", date(2022, 4, 21), date(2022, 4, 25))],
        schema="project string, start date, end date")

    df_expected = session.createDataFrame([
        ("Project 2", date(2022, 4, 21), date(2022, 4, 25), 5),
        ("Project 1", date(2022, 4, 10), date(2022, 4, 12), 3)],
        schema="project string, start date, end date, project_length int")

    df_result = calculate_duration_2(df)

    assert df_result.collect() == df_expected.collect()
    
def test_calculate_duration_with_end_null_and_desc_sort():
    """
    Tests `calculate_duration_2()`: end date is empty and sort in descending order.
    """
    # Today's date varies between runs, so the open-ended project starts two days
    # before Spark's current date (3 days long, always shorter than Project 2's 5).
    # "Today" is read from Spark, not `date.today()`, because `current_date()`
    # follows the Spark session time zone.
    spark_today = session.sql("SELECT current_date()").first()[0]
    start_date = spark_today - timedelta(days=2)
    expected_length = (spark_today - start_date).days + 1

    df = session.createDataFrame([
        ("Project 1", start_date, None),
        ("Project 2", date(2022, 4, 21), date(2022, 4, 25))],
        schema="project string, start date, end date")

    df_expected = session.createDataFrame([
        ("Project 2", date(2022, 4, 21), date(2022, 4, 25), 5),
        ("Project 1", start_date, None, expected_length)],
        schema="project string, start date, end date, project_length int")

    df_result = calculate_duration_2(df)

    assert df_result.collect() == df_expected.collect()

# Runs every `test_*` function currently defined in the notebook namespace, not only
# the ones in this cell. NOTE(review): the recorded "7 passed" (only 4 tests are defined
# here) suggests stale tests from earlier cell runs were collected; restart the kernel
# before running to test only the current definitions.
ipytest.run()

[32m.[0m[32m.[0m[32m.[0m[32m.[0m[32m.[0m[32m.[0m[32m.[0m[32m                                                                                      [100%][0m
[32m[32m[1m7 passed[0m[32m in 124.44s (0:02:04)[0m[0m


<ExitCode.OK: 0>