## Python Data Science

> Exercises: Introduction to PySpark

Kuo, Yao-Jen <yaojenkuo@datainpoint.com> from [DATAINPOINT](https://www.datainpoint.com)

In [1]:
import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import concat

## Define a function called `create_spark_session` that is able to return 2 objects: a SparkContext and a SparkSession.

- Expected inputs: None.
- Expected outputs: a SparkContext and a SparkSession.

In [2]:
def create_spark_session():
    """
    >>> sc, spark = create_spark_session()
    >>> print(type(sc))
    <class 'pyspark.context.SparkContext'>
    >>> print(type(spark))
    <class 'pyspark.sql.session.SparkSession'>
    >>> print(sc.version)
    3.0.0
    """
    ### BEGIN SOLUTION
    sc = SparkContext.getOrCreate()
    spark = SparkSession(sc)
    return sc, spark
    ### END SOLUTION
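
`SparkContext.getOrCreate()` returns the already-active context when there is one, so every test below that calls `create_spark_session()` reuses a single context instead of starting a new one. The get-or-create pattern itself can be sketched in plain Python. The `Context` class here is hypothetical and is not Spark's implementation:

```python
import threading


class Context:
    """Hypothetical stand-in for a heavyweight, one-per-process context."""

    _active = None
    _lock = threading.Lock()

    @classmethod
    def get_or_create(cls):
        # Only the first call constructs an instance; later calls return it
        with cls._lock:
            if cls._active is None:
                cls._active = cls()
            return cls._active
```

Calling `Context.get_or_create()` twice returns the same object, which is the behavior the exercise relies on when it calls `SparkContext.getOrCreate()`.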

## Define a function named `create_spark_dataframe` that is able to create a Spark DataFrame from scratch.

- Expected inputs: a SparkSession.
- Expected outputs: a (4, 3) Spark DataFrame.

In [3]:
def create_spark_dataframe(spark):
    """
    >>> sc, spark = create_spark_session()
    >>> spark_dataframe = create_spark_dataframe(spark)
    >>> print(type(spark_dataframe))
    <class 'pyspark.sql.dataframe.DataFrame'>
    >>> spark_dataframe.show()
    +--------------------+------------+-----------+
    |               title|release_year|imdb_rating|
    +--------------------+------------+-----------+
    |        The Avengers|        2012|        8.0|
    |Avengers: Age of ...|        2015|        7.3|
    |Avengers: Infinit...|        2018|        8.4|
    |   Avengers: Endgame|        2019|        8.4|
    +--------------------+------------+-----------+
    >>> print(spark_dataframe.count())
    4
    >>> print(len(spark_dataframe.dtypes))
    3
    """
    ### BEGIN SOLUTION
    columns = ["title", "release_year", "imdb_rating"]
    rows = [
        ("The Avengers", 2012, 8.0),
        ("Avengers: Age of Ultron", 2015, 7.3),
        ("Avengers: Infinity War", 2018, 8.4),
        ("Avengers: Endgame", 2019, 8.4)
    ]
    spark_df = spark.createDataFrame(rows).toDF(*columns)
    return spark_df
    ### END SOLUTION
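
The solution builds the DataFrame from positional tuples and then names the columns with `toDF(*columns)`. Passing the names directly, as in `spark.createDataFrame(rows, columns)`, should give the same result in one step. PySpark DataFrames have no `shape` attribute, so the tests combine `count()` (rows) with `len(dtypes)` (columns). A plain-Python sketch of that bookkeeping follows. It assumes Spark's usual inference from Python values (`str` to `string`, `int` to `bigint`, `float` to `double`). The `SPARK_TYPE_NAMES` mapping is an illustrative assumption, not a Spark API:

```python
columns = ["title", "release_year", "imdb_rating"]
rows = [
    ("The Avengers", 2012, 8.0),
    ("Avengers: Age of Ultron", 2015, 7.3),
    ("Avengers: Infinity War", 2018, 8.4),
    ("Avengers: Endgame", 2019, 8.4),
]

# Pair each positional value with its column name, as toDF(*columns) does
records = [dict(zip(columns, row)) for row in rows]

# Assumed mapping from Python types to the type names Spark reports in dtypes
SPARK_TYPE_NAMES = {str: "string", int: "bigint", float: "double"}
dtypes = [(name, SPARK_TYPE_NAMES[type(value)]) for name, value in zip(columns, rows[0])]

# Rough equivalent of the (count(), len(dtypes)) check used in the tests
shape = (len(records), len(dtypes))
```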

## Define a function named `read_csv_as_spark_dataframe` that is able to create a Spark DataFrame from an external CSV file.

- Expected inputs: a SparkSession and a CSV file.
- Expected outputs: a (51678, 7) Spark DataFrame.

In [4]:
def read_csv_as_spark_dataframe(spark, csv_file):
    """
    >>> sc, spark = create_spark_session()
    >>> spark_dataframe = read_csv_as_spark_dataframe(spark, 'presidential.csv')
    >>> spark_dataframe.show(5)
    +------+------+-------+------+------+-----------+-----+
    |county|  town|village|office|number|  candidate|votes|
    +------+------+-------+------+------+-----------+-----+
    |宜蘭縣|宜蘭市| 民族里|     1|     1|宋楚瑜/余湘|   37|
    |宜蘭縣|宜蘭市| 民族里|     2|     1|宋楚瑜/余湘|   31|
    |宜蘭縣|宜蘭市| 建軍里|     3|     1|宋楚瑜/余湘|   19|
    |宜蘭縣|宜蘭市| 建軍里|     4|     1|宋楚瑜/余湘|   29|
    |宜蘭縣|宜蘭市| 泰山里|     5|     1|宋楚瑜/余湘|   25|
    +------+------+-------+------+------+-----------+-----+
    only showing top 5 rows
    >>> print(spark_dataframe.count())
    51678
    >>> print(len(spark_dataframe.dtypes))
    7
    >>> spark_dataframe.dtypes[0][1]
    'string'
    >>> spark_dataframe.dtypes[-1][1]
    'int'
    """
    ### BEGIN SOLUTION
    spark_df = spark.read.csv(csv_file, header=True, inferSchema=True)
    return spark_df
    ### END SOLUTION
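
With `header=True` the first line supplies the column names. With `inferSchema=True` Spark makes an extra pass over the data to guess each column's type, which is why `votes` comes back as `int` while `county` stays `string`. The following is a simplified plain-Python sketch of per-column inference. It only distinguishes `int`, `double`, and `string`, whereas Spark also handles types such as `bigint` and timestamps. The helper names are hypothetical:

```python
import csv
import io

# Two rows taken from the preview of presidential.csv above
SAMPLE_CSV = """county,town,village,office,number,candidate,votes
宜蘭縣,宜蘭市,民族里,1,1,宋楚瑜/余湘,37
宜蘭縣,宜蘭市,民族里,2,1,宋楚瑜/余湘,31
"""


def parses_as(cast, text):
    """Return True if cast(text) succeeds."""
    try:
        cast(text)
        return True
    except ValueError:
        return False


def infer_column_type(values):
    """Pick the narrowest type that fits every value in the column."""
    if all(parses_as(int, v) for v in values):
        return "int"
    if all(parses_as(float, v) for v in values):
        return "double"
    return "string"


def read_csv_with_header(text):
    rows = list(csv.reader(io.StringIO(text)))
    header, body = rows[0], rows[1:]
    columns = list(zip(*body))  # transpose rows into columns
    dtypes = [(name, infer_column_type(col)) for name, col in zip(header, columns)]
    return dtypes, body
```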

## Define a function named `get_votes_summary` that is able to summarize the total votes by candidate.

- Expected inputs: a SparkSession and a CSV file.
- Expected outputs: a (3, 3) Spark DataFrame.

In [5]:
def get_votes_summary(spark, csv_file):
    """
    >>> sc, spark = create_spark_session()
    >>> votes_summary = get_votes_summary(spark, 'presidential.csv')
    >>> votes_summary.show()
    +------+-------------+----------+
    |number|    candidate|sum(votes)|
    +------+-------------+----------+
    |     1|  宋楚瑜/余湘|    608590|
    |     2|韓國瑜/張善政|   5522119|
    |     3|蔡英文/賴清德|   8170231|
    +------+-------------+----------+
    >>> print(votes_summary.count())
    3
    >>> print(len(votes_summary.dtypes))
    3
    >>> print(votes_summary.head(3)[0][-1])
    608590
    >>> print(votes_summary.head(3)[1][-1])
    5522119
    >>> print(votes_summary.head(3)[2][-1])
    8170231
    """
    ### BEGIN SOLUTION
    spark_df = spark.read.csv(csv_file, header=True, inferSchema=True)
    summary_df = spark_df.groupBy('number', 'candidate').sum('votes').sort('number')
    return summary_df
    ### END SOLUTION
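
`groupBy('number', 'candidate').sum('votes')` adds up `votes` within each distinct (`number`, `candidate`) pair and names the result column `sum(votes)`. `sort('number')` then orders the groups. The same aggregation is shown below in plain Python on hypothetical toy rows with placeholder candidates `A` and `B`, not the real election data:

```python
from collections import defaultdict


def votes_summary(rows):
    """Sum votes per (number, candidate) and sort the groups by number."""
    totals = defaultdict(int)
    for number, candidate, votes in rows:
        totals[(number, candidate)] += votes
    return sorted(
        ((number, candidate, total) for (number, candidate), total in totals.items()),
        key=lambda group: group[0],
    )


# Hypothetical toy rows: (number, candidate, votes)
toy_rows = [(2, "B", 10), (1, "A", 37), (2, "B", 5), (1, "A", 31)]
summary = votes_summary(toy_rows)
```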

## Define a function named `get_combined_key` that is able to concatenate 3 columns (county, town, and village) into 1 combined key column and tell us how many distinct electoral districts there are in Taiwan.

- Expected inputs: a SparkSession and a CSV file.
- Expected outputs: a (7737, 1) Spark DataFrame.

PS: Use the `concat` function that we've imported for you.

In [6]:
def get_combined_key(spark, csv_file):
    """
    >>> sc, spark = create_spark_session()
    >>> combined_key = get_combined_key(spark, 'presidential.csv')
    >>> combined_key.show(5)
    +------------------+
    |      combined_key|
    +------------------+
    |南投縣中寮鄉中寮村|
    |南投縣中寮鄉內城村|
    |南投縣中寮鄉八仙村|
    |南投縣中寮鄉和興村|
    |南投縣中寮鄉崁頂村|
    +------------------+
    only showing top 5 rows
    >>> print(combined_key.count())
    7737
    >>> print(len(combined_key.dtypes))
    1
    """
    ### BEGIN SOLUTION
    spark_df = spark.read.csv(csv_file, header=True, inferSchema=True)
    combined_key = spark_df.select(concat(spark_df.county, 
                                          spark_df.town,
                                          spark_df.village).alias("combined_key"))
    distinct_combined_key = combined_key.distinct().sort('combined_key')
    return distinct_combined_key
    ### END SOLUTION
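
`concat` joins the three strings row by row. `distinct()` then drops duplicates, since a village can appear on several rows, as 民族里 does in the preview of `presidential.csv` above. `sort` makes the output order deterministic. Spark's `concat` returns null if any input is null, whereas `concat_ws` skips nulls. Ascending sorts place nulls first by default. A plain-Python sketch of those semantics follows. The helper names and the row with a missing village are hypothetical:

```python
def concat_or_none(*parts):
    """Mimic Spark's concat: None (null) if any part is None, else the joined string."""
    if any(part is None for part in parts):
        return None
    return "".join(parts)


def distinct_combined_keys(rows):
    # A set keeps one copy of each combined key, like distinct()
    keys = {concat_or_none(county, town, village) for county, town, village in rows}
    # Ascending order with None first, mirroring Spark's default nulls-first sort
    return sorted(keys, key=lambda key: (key is not None, key or ""))


rows = [
    ("宜蘭縣", "宜蘭市", "民族里"),
    ("宜蘭縣", "宜蘭市", "民族里"),
    ("宜蘭縣", "宜蘭市", "建軍里"),
    ("宜蘭縣", "宜蘭市", "泰山里"),
    ("宜蘭縣", "宜蘭市", None),  # hypothetical row with a missing village
]
keys = distinct_combined_keys(rows)
```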

## Run tests!

In [7]:
import unittest

class TestIntroductionToSpark(unittest.TestCase):
    def test_create_spark_session(self):
        sc, spark = create_spark_session()
        self.assertIsInstance(sc, pyspark.context.SparkContext)
        self.assertIsInstance(spark, pyspark.sql.session.SparkSession)
        self.assertIsInstance(sc.version, str)
    def test_create_spark_dataframe(self):
        sc, spark = create_spark_session()
        spark_dataframe = create_spark_dataframe(spark)
        self.assertIsInstance(spark_dataframe, pyspark.sql.dataframe.DataFrame)
        self.assertEqual(spark_dataframe.count(), 4)
        self.assertEqual(len(spark_dataframe.dtypes), 3)
    def test_read_csv_as_spark_dataframe(self):
        sc, spark = create_spark_session()
        spark_dataframe = read_csv_as_spark_dataframe(spark, 'presidential.csv')
        self.assertIsInstance(spark_dataframe, pyspark.sql.dataframe.DataFrame)
        self.assertEqual(spark_dataframe.count(), 51678)
        self.assertEqual(len(spark_dataframe.dtypes), 7)
        self.assertEqual(spark_dataframe.dtypes[0][1], 'string')
        self.assertEqual(spark_dataframe.dtypes[-1][1], 'int')
    def test_get_votes_summary(self):
        sc, spark = create_spark_session()
        votes_summary = get_votes_summary(spark, 'presidential.csv')
        self.assertIsInstance(votes_summary, pyspark.sql.dataframe.DataFrame)
        self.assertEqual(votes_summary.count(), 3)
        self.assertEqual(len(votes_summary.dtypes), 3)
        self.assertEqual(votes_summary.head(3)[0][-1], 608590)
        self.assertEqual(votes_summary.head(3)[1][-1], 5522119)
        self.assertEqual(votes_summary.head(3)[2][-1], 8170231)
    def test_get_combined_key(self):
        sc, spark = create_spark_session()
        combined_key = get_combined_key(spark, 'presidential.csv')
        self.assertIsInstance(combined_key, pyspark.sql.dataframe.DataFrame)
        self.assertEqual(combined_key.count(), 7737)
        self.assertEqual(len(combined_key.dtypes), 1)

suite = unittest.TestLoader().loadTestsFromTestCase(TestIntroductionToSpark)
runner = unittest.TextTestRunner(verbosity=2)
test_results = runner.run(suite)
number_of_failures = len(test_results.failures)
number_of_errors = len(test_results.errors)
number_of_test_runs = test_results.testsRun
number_of_successes = number_of_test_runs - (number_of_failures + number_of_errors)
total_points = number_of_successes * 2

test_create_spark_dataframe (__main__.TestIntroductionToSpark) ... ok
test_create_spark_session (__main__.TestIntroductionToSpark) ... ok
test_get_combined_key (__main__.TestIntroductionToSpark) ... ok
test_get_votes_summary (__main__.TestIntroductionToSpark) ... ok
test_read_csv_as_spark_dataframe (__main__.TestIntroductionToSpark) ... ok

----------------------------------------------------------------------
Ran 5 tests in 35.109s

OK


In [8]:
print("You've got {} successes with {} points.".format(number_of_successes, total_points))

You've got 5 successes with 10 points.
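
The score counts `testsRun - (failures + errors)` as successes, at 2 points each. The sketch below reproduces that arithmetic on a hypothetical toy suite with one passing, one failing, and one erroring test. The `build_toy_suite` and `score` names are illustrative. The test case is defined inside a function so that a test collector scanning the module does not pick it up:

```python
import io
import unittest


def build_toy_suite():
    """Hypothetical suite: one pass, one failure, one error."""

    class ToyTests(unittest.TestCase):
        def test_pass(self):
            self.assertEqual(1 + 1, 2)

        def test_fail(self):
            self.assertEqual(1 + 1, 3)  # recorded in result.failures

        def test_error(self):
            raise RuntimeError("unexpected exception")  # recorded in result.errors

    return unittest.TestLoader().loadTestsFromTestCase(ToyTests)


def score(result, points_per_test=2):
    """Apply the notebook's formula: successes = runs - (failures + errors)."""
    successes = result.testsRun - (len(result.failures) + len(result.errors))
    return successes, successes * points_per_test


# Send the runner's report to an in-memory buffer instead of stderr
result = unittest.TextTestRunner(stream=io.StringIO(), verbosity=0).run(build_toy_suite())
successes, points = score(result)
```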
