<a href="https://colab.research.google.com/github/goff1999/SimpleLoyalty/blob/main/in-class-assignments/ica02/How_to_Read_and_Represent_Data.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

< [Hello World](../ica01/hello_world.ipynb) | Contents (TODO) | [Data Mining](../ica03/Data_Mining.ipynb) >

<a href="https://colab.research.google.com/github/stephenbaek/bigdata/blob/master/in-class-assignments/ica02/How_to_Read_and_Represent_Data.ipynb"><img align="left" src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open in Colab" title="Open and Execute in Google Colaboratory"></a>

# How to Read and Represent Data

## 1. Introduction to Spark DataFrames

Spark SQL is a Spark module for structured data processing. The interfaces provided by Spark SQL provide Spark with more information about the structure of both the data and the computation being performed. Internally, Spark SQL uses this extra information to perform extra optimizations. There are several ways to interact with Spark SQL including SQL and the Dataset API. When computing a result, the same execution engine is used, independent of which API/language you are using to express the computation. This unification means that developers can easily switch back and forth between different APIs based on which provides the most natural way to express a given transformation.

One use of Spark SQL is to execute SQL queries. Spark SQL can also be used to read data from an existing Hive installation; for how to configure this feature, see the Hive Tables section of the Spark SQL guide cited below. When running SQL from within another programming language, the results are returned as a Dataset/DataFrame. You can also interact with the SQL interface using the command line or over JDBC/ODBC.

A **Dataset** is a distributed collection of data. The Dataset API is available in Scala and Java. Python does not support the Dataset API, but due to Python's dynamic nature, many of its benefits are already available (e.g., you can naturally access a field of a row by name, as in `row.columnName`). A **DataFrame** is a Dataset organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources such as structured data files, tables in Hive, external databases, or existing RDDs. The DataFrame API is available in Scala, Java, Python, and R.

(source: https://spark.apache.org/docs/latest/sql-programming-guide.html)

### 1.1. Getting Started with PySpark in Google Colab

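PySpark can be installed on Google Colab by running the cells below. Note that these cells install Spark 2.4.7, which may not work with the Python version that Colab currently ships; if you run into errors, use the Spark 3.5.1 setup cell in Section 2 instead, which is the version the rest of this notebook was run with.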

In [2]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# Use the archive.apache.org link for older versions
!wget -q https://archive.apache.org/dist/spark/spark-2.4.7/spark-2.4.7-bin-hadoop2.7.tgz
!tar xf spark-2.4.7-bin-hadoop2.7.tgz
!pip install -q findspark


In [3]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.7-bin-hadoop2.7"

In [4]:
import findspark
findspark.init()

### 1.2. Getting Started with PySpark on your local machine
(This section is not available yet.)

## 2. Your first PySpark DataFrame

Now, let us define our first PySpark DataFrame. A DataFrame in PySpark is like a spreadsheet that holds your data. If you are already familiar with Pandas, you can think of a PySpark DataFrame as roughly analogous to a Pandas DataFrame, but designed for data distributed across many machines. If you are not so familiar with Pandas, or have no idea what that even is (cf: it is certainly not the animal...), don't worry. I'll walk you through what a PySpark DataFrame is; prior knowledge of Pandas, although useful, is not required.

First off, before working with DataFrames, you need to create a PySpark session (`SparkSession`), the entry point to PySpark. The cell below installs Java and Spark 3.5.1 in Colab, sets the required environment variables, and then creates the session:

In [1]:
# 1. Install Java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# 2. Download Spark 3.5.1 (This version works with modern Python)
!wget -q https://archive.apache.org/dist/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz

# 3. Unzip
!tar xf spark-3.5.1-bin-hadoop3.tgz

# 4. Install findspark
!pip install -q findspark

# 5. Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.1-bin-hadoop3"

# 6. Initialize and Test
import findspark
findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").appName("Test").getOrCreate()

print(f"Success! Spark version {spark.version} is running.")

Success! Spark version 3.5.1 is running.


Just as a note, we will run the cell above almost every time we use PySpark. Hence, it is a good idea to memorize (or at least get familiar with) these lines.

Now, let's read a data set from a comma-separated values (CSV) file. PySpark comes with a convenient set of functions (called 'methods' when, as here, they belong to an object) for reading and parsing common data file formats, such as `spark.read.csv()` (below) or `spark.read.json()`. Once the contents of the file are parsed, the values are returned as a PySpark DataFrame, which, as mentioned earlier, is essentially a spreadsheet, or a data container, if you will. The actual usage looks as follows.

In [3]:
from google.colab import files

# 1. This triggers the "Choose Files" button
uploaded = files.upload()

# 2. Automatically get the name of the file you just uploaded
filename = next(iter(uploaded))

# 3. Read the uploaded file into Spark
df = spark.read.csv(filename, header=True, inferSchema=True)

# 4. Show the first few rows to verify
df.show(5)

Saving customers-100.csv to customers-100.csv
+-----+---------------+----------+---------+--------------------+-----------------+--------------------+--------------------+--------------------+--------------------+-----------------+--------------------+
|Index|    Customer Id|First Name|Last Name|             Company|             City|             Country|             Phone 1|             Phone 2|               Email|Subscription Date|             Website|
+-----+---------------+----------+---------+--------------------+-----------------+--------------------+--------------------+--------------------+--------------------+-----------------+--------------------+
|    1|DD37Cf93aecA6Dc|    Sheryl|   Baxter|     Rasmussen Group|     East Leonard|               Chile|        229.077.5154|    397.884.0519x718|zunigavanessa@smi...|       2020-08-24|http://www.stephe...|
|    2|1Ef7b82A4CAAD10|   Preston|   Lozano|         Vega-Gentry|East Jimmychester|            Djibouti|          5153435776| 

In the above, we uploaded a CSV file from the local machine (here, `customers-100.csv`) using `files.upload()`, and parsed it into a PySpark DataFrame named `df`.

Note that the `header=True` argument indicates that the first row of the CSV file contains the names of the columns. If you set it to `False`, which is the default, PySpark assigns generic column names (`_c0`, `_c1`, ...) and treats the first row of the CSV file as an ordinary data entry.

For now, let's not worry about `inferSchema=True` here. I'll explain this later in this tutorial.

TIP: As in the cell above, the simplest way to upload a data file from your local drive into Colab is:
```python
from google.colab import files
files.upload()
```

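To see the contents of a DataFrame, the `DataFrame.show()` method provides a simple way to check the raw data. By default, it prints the first 20 rows and truncates string values longer than 20 characters, which is why some e-mail addresses and URLs above end with `...`.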

In [4]:
df.show()

+-----+---------------+----------+---------+--------------------+-----------------+--------------------+--------------------+--------------------+--------------------+-----------------+--------------------+
|Index|    Customer Id|First Name|Last Name|             Company|             City|             Country|             Phone 1|             Phone 2|               Email|Subscription Date|             Website|
+-----+---------------+----------+---------+--------------------+-----------------+--------------------+--------------------+--------------------+--------------------+-----------------+--------------------+
|    1|DD37Cf93aecA6Dc|    Sheryl|   Baxter|     Rasmussen Group|     East Leonard|               Chile|        229.077.5154|    397.884.0519x718|zunigavanessa@smi...|       2020-08-24|http://www.stephe...|
|    2|1Ef7b82A4CAAD10|   Preston|   Lozano|         Vega-Gentry|East Jimmychester|            Djibouti|          5153435776|    686-620-1820x944|     vmata@colon.com|     

Another way to see the structure of the data is to print what is called its *schema*. While the details will be explained later, the schema defines the column names, data types, and *nullability* of the data. For now, let's just check what happens when we run the `DataFrame.printSchema()` method.

In [5]:
df.printSchema()   # nullable = true means the column may contain null (missing) values

root
 |-- Index: integer (nullable = true)
 |-- Customer Id: string (nullable = true)
 |-- First Name: string (nullable = true)
 |-- Last Name: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Phone 1: string (nullable = true)
 |-- Phone 2: string (nullable = true)
 |-- Email: string (nullable = true)
 |-- Subscription Date: date (nullable = true)
 |-- Website: string (nullable = true)



Another way of listing all the column names in the data set is `DataFrame.columns`. Note that it is an attribute rather than a method (no parentheses), and it returns a Python list of strings containing the column names.

In [6]:
df.columns

['Index',
 'Customer Id',
 'First Name',
 'Last Name',
 'Company',
 'City',
 'Country',
 'Phone 1',
 'Phone 2',
 'Email',
 'Subscription Date',
 'Website']

Another useful method is `DataFrame.describe()`. It computes basic summary statistics for each column: count (the number of non-null values), mean, standard deviation, minimum, and maximum. The summary is returned as another DataFrame object, so you can call `show()` on it to display its contents.

In [7]:
df.describe().show()

+-------+------------------+---------------+----------+---------+--------------------+-------------+--------+--------------------+-------------------+--------------------+--------------------+
|summary|             Index|    Customer Id|First Name|Last Name|             Company|         City| Country|             Phone 1|            Phone 2|               Email|             Website|
+-------+------------------+---------------+----------+---------+--------------------+-------------+--------+--------------------+-------------------+--------------------+--------------------+
|  count|               100|            100|       100|      100|                 100|          100|     100|                 100|                100|                 100|                 100|
|   mean|              50.5|           NULL|      NULL|     NULL|                NULL|         NULL|    NULL|      5.1711689515E9|    6.22990414175E9|                NULL|                NULL|
| stddev|29.011491975882016|       

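Observe that some of the summary values are `NULL`. This happens when a column contains non-numeric values (e.g., text columns such as `Customer Id`), for which a mean or standard deviation is not defined. Conversely, text columns such as `Phone 1` and `Phone 2` still get a mean, computed from the entries that happen to look like numbers, so these statistics are only meaningful for genuinely numeric columns.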

## 3. Schema

One concept that deserves attention, but that we have not yet examined properly, is the *schema*. The term schema is database jargon referring to the organization/structure of data.

In PySpark, schema is a list of *column names*, *data types*, and *nullabilities*. A column name is literally just the name of a column and there's nothing to worry about. A data type indicates the type of values in the corresponding column and is typically one of the following:

- BooleanType – Boolean values.
- IntegerType – An integer value (a signed 32-bit integer).
- FloatType – A single-precision floating-point number.
- DoubleType – A double-precision floating-point number.
- StringType – A text string.
- DateType – A calendar date (Python `datetime.date`).
- TimestampType – A date-and-time value (Python `datetime.datetime`), stored internally as microseconds since 1970-01-01 00:00:00 UTC.
- NullType – The data type representing None, used for the types that cannot be inferred.

In some situations, the following could also be used as a data type:
- BinaryType – Binary (byte array) data.
- ByteType – A byte value (a signed integer stored in a single byte).
- LongType – A long integer value (a signed 64-bit integer).
- ShortType – A short integer value (a signed 16-bit integer).
- DecimalType – A fixed-precision decimal value. DecimalType(5,2) means the maximum total number of digits is 5 and the number of digits after the decimal point (the dot) is 2. Therefore, numbers in the range $[-999.99, 999.99]$ can be represented with DecimalType(5,2).

For more information, see https://spark.apache.org/docs/2.3.1/api/python/pyspark.sql.html#module-pyspark.sql.types

Lastly, nullability indicates whether or not the column can contain `null` values. That is, if `nullable` is set to `True`, entries in the column can be `null`. As a quick note, unless you are fully certain about the data set, it's always safer to set it to `True`, because real-world data sets almost always contain `null` values (e.g., empty cells, missing values).

You may recall that, when we called `spark.read.csv`, we passed the argument `inferSchema=True`. Now it may start to make sense to you: it lets PySpark automatically infer each column's data type from what the values look like, at the cost of an extra pass over the data.

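By default, the `inferSchema` option is disabled. Below, we read the MLB player data set (`People.csv`) without it. Compare its schema with the one printed earlier, where columns such as `Index` and `Subscription Date` were inferred as `integer` and `date`: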

In [None]:
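# Download the MLB player data set (People.csv) from the same repository as Batting.csv below
!wget -q https://github.com/chadwickbureau/baseballdatabank/raw/master/core/People.csv
df = spark.read.csv('People.csv', header=True)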
df.printSchema()

Notice that when `inferSchema` option is turned off, Spark just assumes everything is `StringType`, which is the safest assumption.

Sometimes, you may want to force your own data types. Below is one way of doing so.

In [None]:
from pyspark.sql.types import StructField, StringType, IntegerType, LongType, StructType

In [None]:
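# A minimal two-field schema, shown for illustration only;
# the next cell defines the full schema for all columns of People.csv.
data_schema = [StructField('birthYear', IntegerType(), True),
               StructField('nameLast', StringType(), True)]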

In [None]:
data_schema = [StructField('playerID', StringType(), True),
 StructField('birthYear', IntegerType(), True),
 StructField('birthMonth', StringType(), True),
 StructField('birthDay', StringType(), True),
 StructField('birthCountry', StringType(), True),
 StructField('birthState', StringType(), True),
 StructField('birthCity', StringType(), True),
 StructField('deathYear', StringType(), True),
 StructField('deathMonth', StringType(), True),
 StructField('deathDay', StringType(), True),
 StructField('deathCountry', StringType(), True),
 StructField('deathState', StringType(), True),
 StructField('deathCity', StringType(), True),
 StructField('nameFirst', StringType(), True),
 StructField('nameLast', StringType(), True),
 StructField('nameGiven', StringType(), True),
 StructField('weight', StringType(), True),
 StructField('height', StringType(), True),
 StructField('bats', StringType(), True),
 StructField('throws', StringType(), True),
 StructField('debut', StringType(), True),
 StructField('finalGame', StringType(), True),
 StructField('retroID', StringType(), True),
 StructField('bbrefID', StringType(), True)]

In [None]:
schema_struct = StructType(fields=data_schema)

In [None]:
# With an explicit schema, Spark skips type inference, so inferSchema is not needed here
df = spark.read.csv('People.csv', header=True, schema=schema_struct)
df.printSchema()

## 4. Some basic operations

Now that we understand the basic structure of Spark DataFrames, let us take a look at some basic operations that allow us to interact with data.

In [None]:
!wget -q https://github.com/chadwickbureau/baseballdatabank/raw/master/core/Batting.csv
df = spark.read.csv('Batting.csv', header=True, inferSchema=True)
df.show()

### 4.1. Selecting Columns

As you analyze data, you will frequently select a subset of columns from a data set and process them. In PySpark, selecting a column is as simple as `df['column_name']`.

In [None]:
df['playerID']

Note that the cell above displays a Spark column object, not the data in that column. To see this more explicitly, you can call Python's `type()` function to check the object's type.

In [None]:
type(df['playerID'])

An important thing to remember here is that column objects are **not** the same as DataFrames. For example, the `show()` method that is available for DataFrames will **NOT work** with a column object. Hence, the following cell is **supposed to raise an error** (a `TypeError` saying that a `'Column' object is not callable`):

In [None]:
df['playerID'].show()

Then why did people bother to create a column data type? Well, as we will see later, one reason is to use it as an efficient reference (sort of like a pointer) to a column. For now though, it should be enough to understand column objects are not DataFrames.

What should we do then, if we want to handle columns like the usual DataFrames? In other words, what should we do if we want to call `show()` method to print the contents of a specific column? For this purpose, there is a `select()` method you can call from a DataFrame. See the example below.

In [None]:
df.select('playerID')

Now the type is DataFrame. Or more explicitly:

In [None]:
type(df.select('playerID'))

And of course, since now the column is selected as a DataFrame, we can call any DataFrame method we would like.

In [None]:
df.select('playerID').show()

Column selection is not limited to a single column; multiple columns can be selected at once. In this case, the column names to be selected can be passed as a Python list. For example:


In [None]:
df.select( ['playerID', 'yearID'] ).show()

### 4.2. Selecting Rows

Just as you need to select columns, you will need to select rows too. However, selecting rows is not as straightforward as selecting columns. In fact, if you think about it, rows are basically individual data entries that differ case by case, sample by sample. Furthermore, when your data is big, we are not talking about just a handful of rows, but thousands or millions of them (or even more).

So rows are usually selected by specifying some particular conditions. For example, you may want to select the first 10 rows of a DataFrame that has already been sorted by some criterion. You may also want to select rows that satisfy a certain condition. It is uncommon (especially in big data scenarios) to pick out individual rows directly, the way we picked columns by name.

That said, here is our simplest example of selecting some rows. Let's say you are interested in extracting the first 10 rows of a DataFrame. `head()` method provides that exact functionality.

In [None]:
df.head(10)

Notice that the row elements are returned as a Python list (look at the square bracket!). This means that you can access the selected row elements as if you are accessing a value in a list.

In [None]:
df.head(10)[0]  # zero-th element

The data type of each element is `Row`, or more explicitly, `pyspark.sql.types.Row`.

In [None]:
type(df.head(10)[0])

Just to reemphasize, what `head()` method returns is a Python list.

In [None]:
type(df.head(10))

Quite evidently, a list object is not the same as a DataFrame object. So, for example, you cannot call `show()` on what you retrieved using the `head()` method. The following cell **will raise an error** (an `AttributeError`, since a Python list has no `show` attribute).

In [None]:
df.head(10).show()

Now, what if we want to get the top n rows as a DataFrame? Like we have `select` function for selecting columns as a DataFrame, we have `limit` function for selecting rows as a DataFrame. The usage is quite trivial at this point, so let's just take a look at the following example.

In [None]:
df.limit(10).show()

Now, being able to select top rows can already be quite useful on its own. As we will see shortly, we can sort tables quite easily based on some criteria using PySpark. Therefore, we can extract rows that satisfy particular conditions by sorting rows first and then selecting the top n rows. However, sorting the entire table can actually be quite expensive, and you will soon notice that there's got to be a better way.

In such cases, my favorite option is to use the `where()` method (an alias of `filter()`), which selects rows that satisfy certain conditions. For example, the line below extracts the batting records from the seasons between 2000 and 2010 (inclusive), one row per player-season record.

In [None]:
df.where(df['yearID'].between(2000,2010)).show()

One thing you must pay attention to is that `show()` method was called directly from the output of `where()` method. This indicates that the output of `where()` is a DataFrame (as opposed to a Python list of Row elements).

Another thing you must know is that we used a column object obtained with `df['column-name']`. This is, in fact, another reason why column objects exist: they provide an efficient way of defining search rules on different columns, without having to pass an entire DataFrame as a whole.

As a matter of fact, if you look at the output of the `between()` method, you will notice that it is itself a column object: sort of like a pointer to a column, combined with a condition to be evaluated later.

In [None]:
df['yearID'].between(2000,2010)

Of course, many methods other than `between()` are available in PySpark. In fact, you don't even have to rely on a predefined method. Instead, the usual comparison operators (`<`, `<=`, `==`, `!=`, `>=`, `>`) work nicely with PySpark column objects. For example, the line below is a perfectly legitimate condition for selecting rows whose `yearID` is less than 1900.

In [None]:
df['yearID'] < 1900

In [None]:
df.where(df['yearID'] < 1900).show()

When you have multiple conditions, you can combine them with the `&` (and) or `|` (or) operators, and negate a condition with `~` (not). Just don't forget to put parentheses around each condition, because in Python `&` and `|` bind more tightly than comparison operators. For example, the code below finds MLB hitters who recorded more than 20 home runs and more than 20 stolen bases in a single season since 2015.

In [None]:
(df['yearID'] >= 2015) & (df['HR'] > 20) & (df['SB'] > 20)

In [None]:
df.where((df['yearID'] >= 2015) & (df['HR'] > 20) & (df['SB'] > 20)).show()

### 4.3. Adding Columns

Another type of operation that can be quite useful is adding new columns. For example, let's say you invented a new feature (or a baseball metric) called 'base steal success rate (SBSR)', defined as the number of stolen bases (SB) divided by the number of times caught stealing (CS). (Of course, there are, for example, many players who never even attempted a single steal, so CS is zero and the ratio is undefined; with Spark 3.5's default (non-ANSI) settings, dividing by zero produces `null`. But let's not get too serious about this new metric.) For this, the following line can be quite useful.

In [None]:
df.withColumn('SBSR', df['SB']/df['CS']).show()

Okay, I know there is a lot to unpack there. First of all, `withColumn(<column-name>, <value>)` is a method that creates a new column. More precisely, `df.withColumn()` returns a new DataFrame that has the same contents as the original DataFrame `df`, plus the added column `<column-name>` (if a column with that name already exists, it is replaced instead). Note that it doesn't change the original DataFrame `df`.

For the value, you pass a column expression, typically built from columns of the same DataFrame (for a constant value, use `pyspark.sql.functions.lit()`). A column from a different DataFrame cannot be attached this way. An interesting thing here is that algebraic operations can be performed directly on columns.

In [None]:
df['SB'] + df['CS']

In [None]:
df['SB'] * df['CS']

In [None]:
df['SB'] / df['CS']

At this point, you may have already noticed that column objects have something special about them. Earlier in this lab tutorial, I emphasized that column objects must be distinguished from DataFrames. I also mentioned that column objects are sort of like pointers or references to the corresponding column in the DataFrame.

Well, to be more precise, column objects are essentially containers for queries. For example, a few cells ago, we saw that a boolean operation like `(df['yearID'] >= 2015) & (df['HR'] > 20) & (df['SB'] > 20)` creates a column object like `Column<'(((yearID >= 2015) AND (HR > 20)) AND (SB > 20))'>`. Right above, we also saw that columns can be added, subtracted, multiplied, and divided, and the result is something like `Column<'(SB / CS)'>`.

In fact, what's happening here is that as we run arithmetic/boolean operations on columns, PySpark records those operations as an expression stored in a column object. It doesn't run any actual numerical operations at this point, so nothing is really added, multiplied, or compared. Instead, when the column object is passed to a DataFrame method (e.g., `where()`, `withColumn()`), the expression becomes part of that DataFrame's query plan. Spark converts the plan into optimized code and executes it only when an *action*, such as `show()`, `count()`, or `collect()`, requests actual results.

This is an important design principle of Spark, known as *lazy evaluation*, that you must understand. The reasoning behind it is straightforward. It is not a good idea to run calculations each time a user enters a line of code, because a big data problem can involve an enormous number of data entries, typically distributed across multiple machines (e.g., HDFS DataNodes). Instead, it is more beneficial to collect all the queries and run them only once, when the user requests actual results. Further, when this happens, Spark's query optimizer (Catalyst) rewrites the collected queries into an efficient execution plan, for example by filtering rows and dropping unused columns as early as possible, which is a secret recipe behind efficient distributed computing.

By understanding how PySpark handles queries, we have just opened up a treasure chest with many tools. One of them is a large library of predefined functions, including math functions, that operate on columns. These functions can be loaded by importing the `pyspark.sql.functions` module.

In [None]:
import pyspark.sql.functions as F

Using this, new columns with custom processed values can be defined quite easily. While I won't list every function here, the following example will give you the gist of how those functions can be used. (For a full list of functions, see https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.functions)

In [None]:
F.exp(df['HR']) + F.cos(df['SB'])

In [None]:
df.withColumn('exp(HR) + cos(SB)', F.exp(df['HR']) + F.cos(df['SB'])).show()

## 5. Sorting

Once it becomes clear how queries work in PySpark, the rest should be easy peasy. For example, the DataFrame method that sorts rows according to given criteria is `sort()` (or its alias `orderBy()`), and the column methods that define ascending/descending order are `asc()`/`desc()`. With these, we can create a list of the top 10 single-season home run totals by MLB hitters.



In [None]:
df.sort(df['hr'].desc()).limit(10).show()

You can also specify multiple sorting conditions. For example, the last two rows in the top 10 list above are two players tied at 59 home runs (HR). Let's assume that we want to sort them further by `RBI`, from low to high (ascending order). The code below will do the job.

In [None]:
df.sort(df['hr'].desc(), df['rbi'].asc()).limit(10).show()

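Notice that the ordering has now changed so that, when `HR` is the same, the entries are sorted in ascending order of `RBI`. (Also note that we wrote `df['hr']` and `df['rbi']` in lowercase: Spark resolves column names case-insensitively by default.)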

## 6. Joining DataFrames

It is quite normal to store big data in different tables. For example, an online shopping mall may have separate tables of customer data, product data, inventory data, etc. In our baseball example as well, there are multiple database files such as player data, batting data, pitching data, salary data, etc. Knowing how to retrieve information from multiple DataFrames, therefore, is of critical importance.

To see how this can be done in PySpark, let us now read two different csv files, `Batting.csv` and `People.csv`. We have in fact downloaded these two files already from the previous examples, so they can be accessed directly.

In [None]:
df_batting = spark.read.csv('Batting.csv', header=True, inferSchema=True)
df_people = spark.read.csv('People.csv', header=True, inferSchema=True)

Now, let's join `df_batting` with `df_people`. In fact, joining two DataFrames is as simple as just `df1.join(df2)`.

In [None]:
df_batting.join(df_people)

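In older versions of Spark (2.x), fetching the actual result of this join (remember, PySpark doesn't actually execute anything but just accumulates queries, until you request the actual result) raised an error about an implicit *cartesian product*. Spark 3.x, which we installed above, allows such implicit cross joins by default, so the cell below may run without an error, but the result is almost certainly not what you want:

In [None]:
df_batting.join(df_people).show()

Without a join condition, the joining operation is ambiguous: `df_batting` and `df_people` have different numbers of rows and different entries, so Spark has no way of knowing which rows belong together. Instead, it pairs every batting row with every person row, producing an enormous, meaningless table (with two `playerID` columns, one from each side).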

To fix this, you need to specify the key column(s) used to match the rows of the two tables. In our case, `playerID` is the column that connects the two databases.

In [None]:
df_batting.join(df_people, on=['playerID']).show()

Now you can see that the two tables have been successfully joined together using `playerID` as a key connecting them.

## 7. Aggregation

Now, the last topic for this lab tutorial is data aggregation. For example, the batting statistics we have are recorded per season. In other words, the same player may have multiple rows of records if they played more than one season. Now let's assume that we are interested in career home run records (i.e., how many home runs a player hit throughout their entire career), rather than season records. To do this, we need to be able to aggregate all the rows corresponding to each player by summing them up.

So far, we've learned how to sort values in a DataFrame:

In [None]:
df_batting.sort(df_batting['hr'].desc()).limit(10).show()

Note that Sammy Sosa appears three times in the top 10 list, because of the number of home runs he recorded in the 1998, 1999, and 2001 seasons. This is clearly not what we want. Instead, we want to aggregate individual season records into career home run records.

To do this, we are going to first group rows by `playerID`.

In [None]:
df_batting.groupby('playerid')

This creates a `GroupedData` object, in which the rows are ready to be aggregated by `playerID`. (Note that `groupby()` is an alias of `groupBy()`, which we use below.) Since we want the career total home runs, the aggregation method of our choice is `sum()`.

In [None]:
df_career = df_batting.groupBy('playerid').sum()
df_career.show()

As can be seen above, the `sum()` method creates a new DataFrame comprised of the aggregated values of every numeric column (hence column names like `sum(HR)`). There are other aggregation methods such as `avg()`, `max()`, `min()`, etc., whose functions are quite self-explanatory. (For an exhaustive list, see https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.GroupedData)

Now let's join this aggregated DataFrame with `df_people` so that we can see player names on the side.

In [None]:
df_homerun = df_career.select(['playerid', 'sum(hr)']).join(df_people.select(['playerid', 'namefirst', 'namelast']), on=['playerid'])
df_homerun.show()

Finally, we can sort the new DataFrame (`df_homerun`) in descending order of `sum(HR)` to generate a top 10 career home run ranking.

In [None]:
df_homerun.sort(df_homerun['sum(hr)'].desc()).limit(10).show()

## 8. SQL (Optional)

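If you are already familiar with SQL, Spark also allows you to write queries in SQL. While this is beyond the scope of this class, I think it may be useful for some of you, and hence I provide an example below. The `createOrReplaceTempView()` method registers a DataFrame under a table name that SQL queries can refer to.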

In [None]:
df_people.createOrReplaceTempView('people')

In [None]:
results = spark.sql("SELECT * FROM people")

In [None]:
results.show()

In [None]:
spark.sql("SELECT * FROM people WHERE birthYear>1990").show()
