In [None]:
from typing import Final, Literal, Optional, Callable, Union, List

In [None]:
# Deleting tables left from previous runs in case they still exist after deleting an inactive cluster
dbutils.fs.rm("/user", recurse=True)

Out[80]: True

In [None]:
# We need to install 'ipython_unittest' to run unittests in a Jupyter notebook
!pip install -q ipython_unittest



In [None]:
# Loading PySpark modules that we need
import unittest
from collections import Counter
from pyspark.sql import DataFrame, Column
from pyspark.sql.types import *

#### Subtask 1: defining the schema for the data
Typically, the first thing to do before loading the data into a Spark cluster is to define the schema for the data. Look at the schema for 'badges' and try to define the schema for other tables similarly.

[Data Types Documentation](https://spark.apache.org/docs/3.3.1/api/python/reference/pyspark.sql/data_types.html)

In [None]:
# Defining a schema for 'badges' table
badges_schema = StructType([
    StructField('UserId', IntegerType(), False),
    StructField('Name', StringType(), False),
    StructField('Date', TimestampType(), False),
    StructField('Class', IntegerType(), False)
])

# Defining a schema for 'posts' table
posts_schema = StructType([
    StructField("Id", IntegerType(), False),
    StructField("ParentId", IntegerType(), False),
    StructField("PostTypeId", IntegerType(), False),
    StructField("CreationDate", TimestampType(), False),
    StructField("Score", IntegerType(), False),
    StructField("ViewCount", IntegerType(), False),
    StructField("Body", StringType(), False), 
    StructField("OwnerUserId", IntegerType(), False),
    StructField("LastActivityDate", TimestampType(), False),
    StructField("Title", StringType(), False),
    StructField("Tags", StringType(), False), # transform to ArrayType(StringType()) later
    StructField("AnswerCount", IntegerType(), False),
    StructField("CommentCount", IntegerType(), False),
    StructField("FavoriteCount", IntegerType(), False),
    StructField("CloseDate", TimestampType(), False)
])

# Defining a schema for 'users' table
users_schema = StructType([
    StructField("Id", IntegerType(), False),
    StructField("Reputation", IntegerType(), False),
    StructField("CreationDate", TimestampType(), False),
    StructField("DisplayName", StringType(), False),
    StructField("LastAccessDate", TimestampType(), False),
    StructField("AboutMe", StringType(), False),
    StructField("Views", IntegerType(), False),
    StructField("UpVotes", IntegerType(), False),
    StructField("DownVotes", IntegerType(), False),
])

# Defining a schema for 'comments' table
comments_schema = StructType([
    StructField("PostId", IntegerType(), False),
    StructField("Score", IntegerType(), False),
    StructField("Text", StringType(), False),
    StructField("CreationDate", TimestampType(), False),
    StructField("UserId", IntegerType(), False)
])

#### Subtask 2: implementing two helper functions
Next, we need to implement two helper functions:
1. 'load_csv' receives the path to a CSV file and a schema, loads the CSV file at that path into a Spark DataFrame, and returns the DataFrame;
2. 'save_df' receives a Spark DataFrame and saves it as a Parquet file on DBFS.

Note that the column separator in the CSV files is the TAB character ('\t') and the first row contains the column names.

BTW, DBFS (Databricks File System) is the distributed filesystem that Databricks Community Edition uses to store and access data.

[Data Sources Documentation](https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html)

In [None]:
TABLE_ROOT_PATH: Final[Literal["/FileStore/tables"]] = "/FileStore/tables"
PARQUET_ROOT_PATH: Final[Literal["/user/hive/warehouse"]] = "/user/hive/warehouse"

def load_csv(source_file: str, schema: StructType) -> DataFrame:
    """
    Load a TAB-separated CSV file with a header row into a DataFrame.

    Arguments:
        source_file: path for the CSV file to load
        schema: schema for the CSV file being loaded as a DataFrame
    Returns:
        the loaded DataFrame
    """
    df = spark.read.option("delimiter", "\t").option("header", True).schema(schema).csv(source_file)
    return df

def save_df(df: DataFrame, table_name: str) -> None:
    """
    Arguments:
        df: DataFrame to be saved
        table_name: name under which the DataFrame will be saved
    """
    df.write.save(f"{PARQUET_ROOT_PATH}/{table_name}", format="parquet", mode="overwrite")


In [None]:
# Loading 'ipython_unittest' so we can use '%%unittest_main' magic command
%load_ext ipython_unittest

The ipython_unittest extension is already loaded. To reload it, use:
  %reload_ext ipython_unittest


#### Subtask 3: validating the implementation by running the tests

Run the cell below and make sure that all the tests run successfully. Moreover, at the end there should be four Parquet files named 'badges', 'comments', 'posts', and 'users' in '/user/hive/warehouse'.

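Note that we assume the data for the project has already been stored on DBFS under the '/FileStore/tables/' path, originally as 'badges_csv.gz', 'comments_csv.gz', 'posts_csv.gz', and 'users_csv.gz'. The tests below read the files as 'badges.csv', 'comments.csv', 'posts.csv', and 'users.csv', as noted in the test comments.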

In [None]:
%%unittest_main
class TestTask1(unittest.TestCase):
   
    # test 1
    def test_load_badges(self):
        # changed file path to badges.csv
        result = load_csv(source_file="/FileStore/tables/badges.csv", schema=badges_schema)
        self.assertIsNotNone(result, "Badges dataframe did not load successfully")
        self.assertIsInstance(result, DataFrame, "Result type is not of spark.sql.DataFrame")
        self.assertEqual(result.count(), 105640, "Number of records is not correct")

        column_names = Counter(map(str.lower, ['UserId', 'Name', 'Date', 'Class']))
        self.assertCountEqual(column_names, Counter(map(str.lower, result.columns)),
                              "Missing column(s) or column name mismatch")
    
    # test 2
    def test_load_posts(self):
        # changed file path to posts.csv
        result = load_csv(source_file="/FileStore/tables/posts.csv", schema=posts_schema)
        self.assertIsNotNone(result, "Posts dataframe did not load successfully")
        self.assertIsInstance(result, DataFrame, "Result type is not of spark.sql.DataFrame")
        self.assertEqual(result.count(), 61432, "Number of records is not correct")

        column_names = Counter(map(str.lower,
                                   ['Id', 'ParentId', 'PostTypeId', 'CreationDate', 'Score', 'ViewCount', 'Body', 'OwnerUserId',
                                    'LastActivityDate', 'Title', 'Tags', 'AnswerCount', 'CommentCount', 'FavoriteCount',
                                    'CloseDate']))
        self.assertCountEqual(column_names, Counter(map(str.lower, result.columns)),
                              "Missing column(s) or column name mismatch")
    
    # test 3
    def test_load_comments(self):
        # changed file path to comments.csv
        result = load_csv(source_file="/FileStore/tables/comments.csv", schema=comments_schema)
        self.assertIsNotNone(result, "Comments dataframe did not load successfully")
        self.assertIsInstance(result, DataFrame, "Result type is not of spark.sql.DataFrame")
        self.assertEqual(result.count(), 58735, "Number of records is not correct")

        column_names = Counter(map(str.lower, ['PostId', 'Score', 'Text', 'CreationDate', 'UserId']))
        self.assertCountEqual(column_names, Counter(map(str.lower, result.columns)),
                              "Missing column(s) or column name mismatch")
    
    # test 4
    def test_load_users(self):
        # changed file path to users.csv
        result = load_csv(source_file="/FileStore/tables/users.csv", schema=users_schema)
        self.assertIsNotNone(result, "Users dataframe did not load successfully")
        self.assertIsInstance(result, DataFrame, "Result type is not of spark.sql.DataFrame")
        self.assertEqual(result.count(), 91616, "Number of records is not correct")

        column_names = Counter(map(str.lower,
                                   ['Id', 'Reputation', 'CreationDate', 'DisplayName', 'LastAccessDate', 'AboutMe',
                                    'Views', 'UpVotes', 'DownVotes']))
        self.assertCountEqual(column_names, Counter(map(str.lower, result.columns)),
                              "Missing column(s) or column name mismatch")
    # test 5
    def test_save_dfs(self):
        dfs = [("/FileStore/tables/users.csv", users_schema, "users"),
               ("/FileStore/tables/badges.csv", badges_schema, "badges"),
               ("/FileStore/tables/comments.csv", comments_schema, "comments"),
               ("/FileStore/tables/posts.csv", posts_schema, "posts")
               ]

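        # Unpack each (path, schema, table name) tuple instead of indexing into it
        for source_file, schema, table_name in dfs:
            df = load_csv(source_file=source_file, schema=schema)
            save_df(df, table_name)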



Success

.....
----------------------------------------------------------------------
Ran 5 tests in 22.952s

OK
Out[86]: <unittest.runner.TextTestResult run=5 errors=0 failures=0>

##### Conversions, decoding, and post-processing prior to saving
Some of the columns need additional processing prior to being used in task 2. The following processing steps are performed here, and the resulting tables are saved.
We weren't quite sure where to put this, so we just put it here 😇
###### Posts
- Convert the `Tags` column from a string to an array of strings
- Base64 decode `Body` and `Title`
###### Comments
- Base64 decode `Text`
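
To make the conversions above concrete, here is a minimal plain-Python sketch of what they do to a single value. It does not use Spark: `parse_tags` and `decode_base64` are hypothetical helper names, the sample values are made up rather than taken from the dataset, and the sketch assumes the decoded bytes are UTF-8 text.

```python
import base64
from typing import List, Optional


def parse_tags(raw: Optional[str]) -> Optional[List[str]]:
    """Split '<a><b><c>' into ['a', 'b', 'c']; pass None through unchanged."""
    if raw is None:
        return None
    # Drop the outer angle brackets, then split on the '><' between tags
    return raw.strip("<>").split("><")


def decode_base64(encoded: Optional[str]) -> Optional[str]:
    """Decode a Base64 string to text (assumed UTF-8); pass None through."""
    if encoded is None:
        return None
    return base64.b64decode(encoded).decode("utf-8")


# Made-up sample values for illustration only
sample_tags = "<python><apache-spark><pyspark>"
sample_title = base64.b64encode("How do I read a TSV file?".encode("utf-8")).decode("ascii")

tags = parse_tags(sample_tags)
title = decode_base64(sample_title)
print(tags)
print(title)
```

Rows with a missing value stay missing (`None`) in this sketch, which is why both helpers check for `None` before doing any work.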

In [None]:
from pyspark.sql.functions import split, col, transform, regexp_replace, udf, unbase64

@udf(returnType=ArrayType(StringType()))
def to_array(string: Optional[str]) -> Optional[List[str]]:
  """
  Convert a string of format <item-1><item-2><item-3> to a list
  ["item-1", "item-2", "item-3"]. Returns None for a null input.
  """
  if string is None:
    return None
  return string.strip("<>").split("><")

In [None]:
# Handle the tag array fields after loading the CSVs
posts_df = load_csv(source_file="/FileStore/tables/posts.csv", schema=posts_schema)
posts_df = posts_df.withColumn("Tags", to_array("Tags"))
posts_df = posts_df.withColumn("Body", unbase64("Body").cast("string"))
posts_df = posts_df.withColumn("Title", unbase64("Title").cast("string"))
save_df(posts_df, "posts")

In [None]:
# Base64-decode the comment text after loading the CSV
comments_df = load_csv(source_file="/FileStore/tables/comments.csv", schema=comments_schema)
comments_df = comments_df.withColumn("Text", unbase64("Text").cast("string"))
save_df(comments_df, "comments")

#### Subtask 4: answering questions about Spark-related concepts

Please write a short description for the terms below---one to two short paragraphs for each term. Don't copy-paste; instead, write your own understanding.

1. What do the terms 'Spark Application', 'SparkSession', 'Transformations', 'Action', and 'Lazy Evaluation' mean in the context of Spark?

Write your descriptions in the next cell.

#### Spark Application

A **Spark application** is a user-defined program that uses Spark, typically in the form of JAR files (from Scala/Java) or Python files. These files are **submitted** to run the application in a **cluster** of nodes. At runtime, an application consists of a **driver**, which coordinates the program, and **executors**, which run computations on data. There are two main deployment modes:
- **Client mode** runs the driver locally, which allows interacting with the cluster through a Spark shell
- **Cluster mode** deploys the driver in the cluster - typically preferred when shell interaction is not required, to minimize network latency between the driver and the executors

In both of these modes, the driver assigns work to executors, which run on **worker nodes** in the cluster. They all communicate with a **cluster manager** to manage resources.

#### SparkSession

**SparkSession** is essentially the entry point to Spark's API in an application. It allows users to create data structures like **Datasets** and **DataFrames**, and use **Spark SQL**. It also contains a **SparkContext**, which represents the connection to the cluster in the deployment model described above. Together, these provide an abstraction layer that gives the user the power of the cluster through a relatively simple API.

#### Resilient Distributed Datasets (extra)

In order to properly explain transformations and actions below, we felt the need to first explain **Resilient Distributed Datasets (RDDs)**. An RDD is an immutable, partitioned collection of objects, which may derive from several different data sources - most typical is a data store such as HDFS or a database. It is a central data structure that Spark operates on.

The **resilient** part of the name comes from the fact that a partition of an RDD that is corrupted or otherwise lost can be recovered by "replaying" the transformations (explained below) on the datasets it derives from. To make this possible, each RDD keeps a record of the transformations that produce it, called its **lineage**.
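
The idea of recovering data by replaying lineage can be sketched in plain Python. This is a toy illustration only, not how Spark implements RDDs: `ToyDataset` and its methods are hypothetical names, and the "source" is just a function that returns a list.

```python
from typing import Any, Callable, List

Step = Callable[[List[Any]], List[Any]]


class ToyDataset:
    """Hypothetical stand-in for an RDD: a data source plus a lineage of steps."""

    def __init__(self, load_source: Callable[[], List[Any]], lineage: List[Step] = None) -> None:
        self._load_source = load_source      # how to re-read the original data
        self._lineage = list(lineage or [])  # ordered transformations to replay

    def map(self, fn: Callable[[Any], Any]) -> "ToyDataset":
        # Return a new dataset; the existing one is never modified (immutability)
        return ToyDataset(self._load_source, self._lineage + [lambda rows: [fn(r) for r in rows]])

    def filter(self, predicate: Callable[[Any], bool]) -> "ToyDataset":
        return ToyDataset(self._load_source, self._lineage + [lambda rows: [r for r in rows if predicate(r)]])

    def compute(self) -> List[Any]:
        # Replay every recorded step, starting again from the source
        rows = self._load_source()
        for step in self._lineage:
            rows = step(rows)
        return rows


squares_of_odds = ToyDataset(lambda: [1, 2, 3, 4, 5]).filter(lambda x: x % 2 == 1).map(lambda x: x * x)

first_result = squares_of_odds.compute()
# Pretend the computed data was lost: the lineage is enough to rebuild it
recovered_result = squares_of_odds.compute()
print(first_result, recovered_result)
```

Because the dataset stores only the source and the steps, losing a computed result costs a recomputation rather than the data itself.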

#### Transformations

A **transformation** is an operation on an RDD that produces a *new, transformed* RDD. Examples of transformations are `map`, `filter` and `flatMap`, all of which take a function as an argument and apply it to every element of the RDD.

There are also transformations that work on 2 RDDs to **combine** them into a transformed RDD. Examples of this are `union`, `intersection` and `subtract`, all of which operate on one RDD and take another RDD as an argument. In addition, there are transformations that work specifically for RDDs of `(key, value)` pairs, such as `reduceByKey`, `groupByKey` and `mapValues`. Finally, there are transformations that work on 2 `(key, value)`-pair RDDs, such as `subtractByKey` and `join`.
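
As a rough illustration of what the `(key, value)` transformations compute, the sketch below mimics the results of `reduceByKey` and `join` on ordinary Python lists. The functions `reduce_by_key` and `join_pairs` are hypothetical plain-Python stand-ins, not Spark APIs, and the sample data is made up. Unlike Spark, they run eagerly on a single machine.

```python
from collections import defaultdict
from typing import Any, Callable, Dict, List, Tuple

Pair = Tuple[Any, Any]


def reduce_by_key(pairs: List[Pair], fn: Callable[[Any, Any], Any]) -> List[Pair]:
    """Merge all values that share a key using fn, like reduceByKey."""
    merged: Dict[Any, Any] = {}
    for key, value in pairs:
        merged[key] = fn(merged[key], value) if key in merged else value
    return list(merged.items())


def join_pairs(left: List[Pair], right: List[Pair]) -> List[Pair]:
    """Inner join on key: emit (key, (left_value, right_value)) per match."""
    right_by_key = defaultdict(list)
    for key, value in right:
        right_by_key[key].append(value)
    return [(key, (lv, rv)) for key, lv in left for rv in right_by_key.get(key, [])]


# Made-up sample data: (user, score) and (user, badge) pairs
scores = [("alice", 3), ("bob", 5), ("alice", 4)]
badges = [("alice", "Teacher"), ("carol", "Student")]

totals = reduce_by_key(scores, lambda a, b: a + b)
joined = join_pairs(totals, badges)
print(sorted(totals))
print(joined)
```

Keys that appear on only one side ("bob" and "carol" here) are dropped by the join, which matches the inner-join behavior of `join` on pair RDDs.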

#### Action

An **action** is an operation on an RDD that produces a **result**. This result can vary in type depending on the action, but the key difference from a transformation is that the result is not a new RDD. Examples of actions are `collect`, `count` and `reduce`.

Similarly to transformations, there are also actions that work specifically on `(key, value)` pairs. Examples of this are `countByKey`, `collectAsMap` and `lookup`.

#### Lazy Evaluation

**Lazy evaluation** in the context of Spark refers to how transformations and actions are evaluated. Since an RDD doesn't really "matter" until it is used for some result, the evaluation of _transformations_ on the RDD can be delayed until an _action_ is called on it. This is what is meant by "lazy": postponing actually _doing_ the transformation until it is needed. This is similar to the concept of **iterators** in many programming languages.
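
The comparison with iterators can be made concrete using Python's built-in `map` and `filter`, which are lazy in Python 3. This is only an analogy in plain Python, not Spark code: building the pipeline plays the role of the transformations, and `list(...)` plays the role of the action. The `evaluated` list is a hypothetical log added to show when the work actually happens.

```python
evaluated = []  # records each element at the moment it is actually processed


def double(x: int) -> int:
    evaluated.append(x)
    return x * 2


numbers = [1, 2, 3, 4]

# "Transformations": these only describe the work, nothing is computed yet
doubled = map(double, numbers)
large = filter(lambda x: x > 4, doubled)
calls_before_action = len(evaluated)

# "Action": consuming the iterator forces the whole pipeline to run
result = list(large)

print(calls_before_action)  # 0
print(result)               # [6, 8]
print(evaluated)            # [1, 2, 3, 4]
```

No element is doubled until `list` asks for results, just as Spark runs no transformation until an action requests a result.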