#### Names of people in the group

Please write the names of the people in your group in the next cell.

Name of person Oda Colquhoun

Name of person Emil Bjørlykke Berglund

In [0]:
# Deleting tables left from previous runs in case they still exist after deleting an inactive cluster
dbutils.fs.rm("/user", recurse=True)


In [0]:
# We need to install 'ipython_unittest' to run unittests in a Jupyter notebook
!pip install -q ipython_unittest

In [0]:
# Loading the modules that we need (unittest, Counter, and PySpark SQL types)
import unittest
from collections import Counter
from pyspark.sql import DataFrame
from pyspark.sql.types import *

#### Subtask 1: defining the schema for the data
Typically, the first thing to do before loading the data into a Spark cluster is to define the schema for the data. Look at the schema for 'badges' and try to define the schema for other tables similarly.

In [0]:
# Defining a schema for 'badges' table
badges_schema = StructType([StructField('UserId', IntegerType(), False),
                            StructField('Name', StringType(), False),
                            StructField('Date', TimestampType(), False),
                            StructField('Class', IntegerType(), False)])

# Defining a schema for 'comments' table
comments_schema = StructType([StructField('PostId', IntegerType(), False),
                              StructField('Score', IntegerType(), False),
                              StructField('Text', StringType(), False),
                              StructField('CreationDate', TimestampType(), False),
                              StructField('UserId', IntegerType(), False)])

# Defining a schema for 'posts' table
# Columns that are empty for some posts (e.g. ParentId for questions,
# Title and Tags for answers, ClosedDate for posts that are still open) are nullable
posts_schema = StructType([StructField('Id', IntegerType(), False),
                           StructField('ParentId', IntegerType(), True),
                           StructField('PostTypeId', IntegerType(), False),
                           StructField('CreationDate', TimestampType(), False),
                           StructField('Score', IntegerType(), False),
                           StructField('ViewCount', IntegerType(), True),
                           StructField('Body', StringType(), False),
                           StructField('OwnerUserId', IntegerType(), True),
                           StructField('LastActivityDate', TimestampType(), False),
                           StructField('Title', StringType(), True),
                           StructField('Tags', StringType(), True),
                           StructField('AnswerCount', IntegerType(), True),
                           StructField('CommentCount', IntegerType(), False),
                           StructField('FavoriteCount', IntegerType(), True),
                           StructField('ClosedDate', TimestampType(), True)])

# Defining a schema for 'users' table
users_schema = StructType([StructField('Id', IntegerType(), False),
                           StructField('Reputation', IntegerType(), False),
                           StructField('CreationDate', TimestampType(), False),
                           StructField('DisplayName', StringType(), False),
                           StructField('LastAccessDate', TimestampType(), False),
                           StructField('AboutMe', StringType(), False),
                           StructField('Views', IntegerType(), False),
                           StructField('UpVotes', IntegerType(), False),
                           StructField('DownVotes', IntegerType(), False)])



#### Subtask 2: implementing two helper functions
Next, we need to implement two helper functions:
1. 'load_csv' receives the path of a CSV file and a schema, loads the file into a Spark DataFrame using that schema, and returns the DataFrame;
2. 'save_df' receives a Spark DataFrame and a table name, and saves the DataFrame as a Parquet table on DBFS.

Note that the column separator in the CSV files is the TAB character ('\t') and that the first row contains the column names.

BTW, DBFS is the name of the distributed filesystem used by Databricks Community Edition to store and access data.
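Before uploading, it can help to peek at one of the gzipped files locally and confirm that its header matches the schema defined above. Column order matters here: with a user-supplied schema, Spark's CSV reader by default (`enforceSchema` enabled) applies the schema by position and ignores the names in the header row. The sketch below uses only the Python standard library; the function name `peek_tsv` is a hypothetical helper, not part of the project.

```python
import csv
import gzip


def peek_tsv(path, expected_columns):
    """Read a gzipped, TAB-separated file whose first row holds the column names.

    Returns (header, number_of_data_rows). Raises ValueError if the header does
    not match expected_columns in the same order (compared case-insensitively,
    like the unit tests below).
    """
    with gzip.open(path, "rt", encoding="utf-8", newline="") as f:
        reader = csv.reader(f, delimiter="\t")
        header = next(reader)
        if [c.lower() for c in header] != [c.lower() for c in expected_columns]:
            raise ValueError(f"header mismatch: {header}")
        # Count the remaining rows without keeping them in memory
        n_rows = sum(1 for _ in reader)
    return header, n_rows
```

For example, `peek_tsv("badges_csv.gz", ["UserId", "Name", "Date", "Class"])` would return the header and the row count. The row count may differ from Spark's `count()` if fields contain quoted line breaks, because Spark's CSV reader treats each line as one record unless `multiLine` is enabled.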

In [0]:
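def load_csv(source_file: "path for the CSV file to load", schema: "schema for the CSV file being loaded as a DataFrame") -> DataFrame:
    # Read a TAB-separated CSV file whose first row holds the column names,
    # applying the given schema instead of inferring one.
    # Spark decompresses '.gz' files automatically based on the file extension.
    df = spark.read.load(source_file,
                         format="csv",
                         header="true",
                         sep="\t",
                         schema=schema)
    return df

def save_df(df: "DataFrame to be saved", table_name: "name under which the DataFrame will be saved") -> None:
    # Save the DataFrame as a managed table stored in Parquet format,
    # overwriting any table with the same name left from a previous run
    df.write.format("parquet") \
        .option("parquet.enable.dictionary", "true") \
        .option("parquet.page.write-checksum.enabled", "false") \
        .mode("overwrite") \
        .saveAsTable(table_name)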

In [0]:
# Loading 'ipython_unittest' so we can use '%%unittest_main' magic command
%load_ext ipython_unittest

#### Subtask 3: validating the implementation by running the tests

Run the cell below and make sure that all the tests pass. Afterwards, there should be four Parquet tables named 'badges', 'comments', 'posts', and 'users' in '/user/hive/warehouse'.

Note that we assume the project data has already been uploaded to DBFS under the '/FileStore/tables/' path (as 'badges_csv.gz', 'comments_csv.gz', 'posts_csv.gz', and 'users_csv.gz'). The test cell reads files named like '/FileStore/tables/badges__1__csv.gz'; adjust the paths if your uploaded files are named differently.

In [0]:
%%unittest_main
class TestTask1(unittest.TestCase):
   
    # test 1
    def test_load_badges(self):
        result = load_csv(source_file="/FileStore/tables/badges__1__csv.gz", schema=badges_schema)
        self.assertIsNotNone(result, "Badges dataframe did not load successfully")
        self.assertIsInstance(result, DataFrame, "Result type is not of pyspark.sql.DataFrame")
        self.assertEqual(result.count(), 105640, "Number of records is not correct")

        column_names = Counter(map(str.lower, ['UserId', 'Name', 'Date', 'Class']))
        self.assertCountEqual(column_names, Counter(map(str.lower, result.columns)),
                              "Missing column(s) or column name mismatch")
    
    # test 2
    def test_load_posts(self):
        result = load_csv(source_file="/FileStore/tables/posts__1__csv.gz", schema=posts_schema)
        self.assertIsNotNone(result, "Posts dataframe did not load successfully")
        self.assertIsInstance(result, DataFrame, "Result type is not of pyspark.sql.DataFrame")
        self.assertEqual(result.count(), 61432, "Number of records is not correct")

        column_names = Counter(map(str.lower,
                                   ['Id', 'ParentId', 'PostTypeId', 'CreationDate', 'Score', 'ViewCount', 'Body', 'OwnerUserId',
                                    'LastActivityDate', 'Title', 'Tags', 'AnswerCount', 'CommentCount', 'FavoriteCount',
                                    'ClosedDate']))
        self.assertCountEqual(column_names, Counter(map(str.lower, result.columns)),
                              "Missing column(s) or column name mismatch")
    
    # test 3
    def test_load_comments(self):
        result = load_csv(source_file="/FileStore/tables/comments__1__csv.gz", schema=comments_schema)
        self.assertIsNotNone(result, "Comments dataframe did not load successfully")
        self.assertIsInstance(result, DataFrame, "Result type is not of pyspark.sql.DataFrame")
        self.assertEqual(result.count(), 58735, "Number of records is not correct")

        column_names = Counter(map(str.lower, ['PostId', 'Score', 'Text', 'CreationDate', 'UserId']))
        self.assertCountEqual(column_names, Counter(map(str.lower, result.columns)),
                              "Missing column(s) or column name mismatch")
    
    # test 4
    def test_load_users(self):
        result = load_csv(source_file="/FileStore/tables/users__1__csv.gz", schema=users_schema)
        self.assertIsNotNone(result, "Users dataframe did not load successfully")
        self.assertIsInstance(result, DataFrame, "Result type is not of pyspark.sql.DataFrame")
        self.assertEqual(result.count(), 91616, "Number of records is not correct")

        column_names = Counter(map(str.lower,
                                   ['Id', 'Reputation', 'CreationDate', 'DisplayName', 'LastAccessDate', 'AboutMe',
                                    'Views', 'UpVotes', 'DownVotes']))
        self.assertCountEqual(column_names, Counter(map(str.lower, result.columns)),
                              "Missing column(s) or column name mismatch")

    # test 5
    def test_save_dfs(self):
        dfs = [("/FileStore/tables/users__1__csv.gz", users_schema, "users"),
               ("/FileStore/tables/badges__1__csv.gz", badges_schema, "badges"),
               ("/FileStore/tables/comments__1__csv.gz", comments_schema, "comments"),
               ("/FileStore/tables/posts__1__csv.gz", posts_schema, "posts")]

        # Load each CSV file and save it as a Parquet-backed table
        for source_file, schema, table_name in dfs:
            df = load_csv(source_file=source_file, schema=schema)
            save_df(df, table_name)

        # Check that every table is now registered in the metastore
        saved_tables = [t.name for t in spark.catalog.listTables()]
        for _, _, table_name in dfs:
            self.assertIn(table_name, saved_tables, f"Table '{table_name}' was not saved")

#### Subtask 4: answering questions about Spark-related concepts

Please write a short description of each term below, one to two short paragraphs per term. Don't copy-paste; instead, write your own understanding.

1. What do the terms 'Spark Application', 'SparkSession', 'Transformations', 'Action', and 'Lazy Evaluation' mean in the context of Spark?

Write your descriptions in the next cell.

###### Spark Application: 
A Spark application is a self-contained program built on Apache Spark. It consists of a driver process, which runs the program's main function, and a set of executor processes running on the worker nodes of a cluster. The driver creates a SparkSession (which wraps a SparkContext), turns the program's operations into jobs, splits each job into stages and tasks, and schedules the tasks on the executors. Each task processes one partition of the data, so large datasets are processed in parallel across the cluster. Spark applications can be written in Scala, Java, Python, and R.
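The division of labour between the driver and its tasks can be illustrated with a single-machine analogy: the driver splits the data into partitions, runs one task per partition on a pool of workers, and combines the partial results. This is only a sketch using Python's `concurrent.futures`; the names `split_into_partitions`, `count_even`, and `run_job` are hypothetical, and Spark's real scheduler also handles stages, shuffles, data locality, and task retries.

```python
from concurrent.futures import ThreadPoolExecutor


def split_into_partitions(data, num_partitions):
    """Driver side: cut the input list into at most num_partitions chunks."""
    size = max(1, -(-len(data) // num_partitions))  # ceiling division, at least 1
    return [data[i:i + size] for i in range(0, len(data), size)]


def count_even(partition):
    """One task: process a single partition independently of the others."""
    return sum(1 for x in partition if x % 2 == 0)


def run_job(data, num_partitions=4, num_workers=2):
    """Hypothetical driver: schedule one task per partition, then combine."""
    partitions = split_into_partitions(data, num_partitions)
    # The thread pool stands in for the executors; each call is one task.
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        partial_counts = list(pool.map(count_even, partitions))
    # Back on the driver: merge the per-partition results.
    return sum(partial_counts)


print(run_job(list(range(10))))  # 5
```

Because each task only sees its own partition, the tasks can run concurrently; only the small per-partition results travel back to the driver.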
 
###### SparkSession: 
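A SparkSession is the entry point to Spark SQL and, since Spark 2.0, the single entry point for interacting with Spark's data processing capabilities. It is used to read data (through `spark.read`), create DataFrames and Datasets, and run SQL queries; RDDs are created through the SparkContext that the session wraps (`spark.sparkContext`). When a SparkSession is built, configuration options can be set that control Spark's behaviour, such as the number of cores and the amount of memory per executor (`spark.executor.cores`, `spark.executor.memory`). An application usually obtains one SparkSession with `SparkSession.builder.getOrCreate()` and uses it for its whole lifetime; in Databricks notebooks it is already available as the variable `spark`.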
 
###### Transformations: 
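In Spark, transformations are operations that produce a new dataset (an RDD, DataFrame, or Dataset) from an existing one; the input is never modified, because these datasets are immutable. Transformations are lazily evaluated: calling one does not run any computation. Instead, Spark adds the step to a directed acyclic graph (DAG) of operations, which is executed only when an action is called. Transformations are either narrow, where each output partition depends on a single input partition (e.g. map(), filter(), flatMap()), or wide, where data must be shuffled between partitions (e.g. reduceByKey(), groupByKey(), join()); wide transformations mark the boundaries between stages. On DataFrames, select(), where(), and groupBy().agg() are transformations as well.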
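The idea that a transformation only records a step, and that nothing runs until an action asks for a result, can be sketched with a small pure-Python toy. `ToyRDD` and its methods are hypothetical names chosen to mirror `map()`, `filter()`, and `collect()`; this is not Spark's API or implementation, and its lineage is a simple chain rather than a full DAG.

```python
class ToyRDD:
    """Toy stand-in for an RDD that records transformations instead of running them.

    Hypothetical illustration only: neither Spark's API nor its implementation.
    """

    def __init__(self, source, steps=()):
        self._source = source       # callable that produces the input records
        self._steps = tuple(steps)  # recorded lineage as (name, function) pairs

    # Transformations: return a NEW ToyRDD; self is left unchanged.
    def map(self, f):
        return ToyRDD(self._source, self._steps + (("map", f),))

    def filter(self, pred):
        return ToyRDD(self._source, self._steps + (("filter", pred),))

    def lineage(self):
        return [name for name, _ in self._steps]

    # Action: only now is the source read and the recorded steps applied.
    def collect(self):
        records = list(self._source())
        for name, f in self._steps:
            if name == "map":
                records = [f(x) for x in records]
            else:
                records = [x for x in records if f(x)]
        return records


reads = []

def source():
    reads.append(1)  # remember every time the source is actually read
    return [1, 2, 3, 4, 5]

base = ToyRDD(source)
result = base.map(lambda x: x * 10).filter(lambda x: x > 20)
print(reads)             # [] : no computation has happened yet
print(result.lineage())  # ['map', 'filter']
print(result.collect())  # [30, 40, 50]
print(len(reads))        # 1
```

Note that `base` keeps an empty lineage: like Spark datasets, each toy dataset is immutable, and every transformation returns a new one.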
 
###### Action: 
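Actions are operations that trigger the execution of the computation recorded so far and either return a result to the driver or write data to external storage. When an action is called, Spark turns the DAG of pending transformations into one or more jobs and runs them immediately. Examples of actions on RDDs include collect(), count(), reduce(), take(), and saveAsTextFile(); on DataFrames, show(), count(), collect(), and writes such as df.write.save() or saveAsTable() are actions. Unless a dataset has been cached with cache() or persist(), each action recomputes it from the beginning of its lineage.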
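The behaviour that every action re-runs the recorded lineage, unless the data has been cached, can be sketched with a pure-Python toy. `ToyPipeline` and its methods are hypothetical names that only mimic Spark's `count()`, `collect()`, `reduce()`, and `cache()`; the real methods operate on distributed partitions, and Spark may evict cached partitions when memory runs short and recompute them later.

```python
import functools


class ToyPipeline:
    """Hypothetical toy: transformations are recorded, actions execute them.

    Each action re-runs the whole lineage from the source unless the data
    has been marked for caching.
    """

    def __init__(self, source, steps=()):
        self._source = source
        self._steps = tuple(steps)
        self._use_cache = False
        self._cache = None

    def map(self, f):
        # Transformation: extend the lineage, compute nothing.
        return ToyPipeline(self._source, self._steps + (f,))

    def cache(self):
        # Like Spark's cache(), this only marks the data; nothing runs yet.
        self._use_cache = True
        return self

    def _compute(self):
        if self._cache is not None:
            return self._cache
        records = list(self._source())
        for f in self._steps:
            records = [f(x) for x in records]
        if self._use_cache:
            self._cache = records  # kept for later actions
        return records

    # Actions: each one triggers _compute().
    def count(self):
        return len(self._compute())

    def collect(self):
        return list(self._compute())

    def reduce(self, f):
        return functools.reduce(f, self._compute())


reads = []

def source():
    reads.append(1)  # count how often the source is read
    return [1, 2, 3]

plain = ToyPipeline(source).map(lambda x: x + 1)
print(plain.count(), plain.collect(), len(reads))  # 3 [2, 3, 4] 2

cached = ToyPipeline(source).map(lambda x: x + 1).cache()
print(cached.reduce(lambda a, b: a + b), cached.count(), len(reads))  # 9 3 3
```

Without `cache()`, two actions read the source twice; with it, the second action is served from the stored result.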
 
###### Lazy Evaluation: 
Lazy evaluation means that Spark postpones executing transformations until an action needs their result; until then it only records the lineage, i.e. the DAG of transformations that leads to each dataset. Because Spark sees the whole chain before running anything, it can optimise the plan as a whole: it pipelines consecutive narrow transformations such as map() and filter() into a single stage, and for DataFrames the Catalyst optimizer can, for example, push filters down to the data source and read only the columns that are needed. It also never computes results that no action uses. The recorded lineage has a second benefit: if a partition is lost, Spark can recompute it from its lineage.
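Python generators are lazy in a similar spirit, which makes them a handy analogy for the benefits described above. In the sketch below, the two generator expressions only describe the work, each element flows through both steps before the next one is read (the steps are fused into one pass), and asking for three results reads only as many inputs as necessary. This is an analogy, not Spark code; the function names are hypothetical. Spark applies a related idea in `take(n)`, which starts by scanning a small number of partitions and only scans more if needed.

```python
from itertools import islice


def numbers(log):
    """Source that logs every element it actually produces."""
    for i in range(1_000_000):
        log.append(i)
        yield i


def build_pipeline(log):
    # Like transformations, these generator expressions only describe the work;
    # nothing is computed until someone asks for results.
    squares = (x * x for x in numbers(log))
    even_squares = (x for x in squares if x % 2 == 0)
    return even_squares


def take(iterable, n):
    """Plays the role of an action: pull only the first n results."""
    return list(islice(iterable, n))


log = []
pipeline = build_pipeline(log)
print(len(log))           # 0 : building the pipeline read nothing
print(take(pipeline, 3))  # [0, 4, 16]
print(log)                # [0, 1, 2, 3, 4] : only 5 of 1,000,000 inputs were read
```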