# Applied Data Science (MAST30034) Tutorial 1

- Installing `pyspark` and Pre-Req Notebook (15-30 minutes)
- Basic intro to Apache Spark (30-60 minutes)
- Project 1 Tips (yes, it's already out and we **strongly recommend you start today**) (remainder of time)
_________________

# Working with Larger Datasets with a Scalable Solution!
Consider the size of the datasets you have worked with at Uni: probably a few hundred megabytes or a couple of gigabytes. Whilst `pandas` and `Excel` do have their use cases, it is not feasible to use them for larger datasets spanning several gigabytes.

For example:
1. 20k rows would be hard for Excel, but easy for `pandas`.
2. A few million records would be doable for `pandas` depending on RAM (let's say 16GB or 32GB to be generous).
3. Now, consider 100 million rows over several gigabytes. `pandas` **is not your solution**.

Why?

`pandas` works in-memory, so you are limited by your machine's RAM, and large amounts of RAM are not something the average person has. Even with 32GB or 64GB of memory, it is better to use Apache Spark, which is designed to work with large datasets.

![image.png](https://spark.apache.org/images/spark-logo-trademark.png)


**Disclaimer:**
- Windows 10 or 11 users are required to install `WSL` or `WSL2` for `pyspark`. This is something that you should take the time to learn how to use and install now for a future career in the tech industry. If you have yet to install it, please visit https://github.com/akiratwang/COMP20003
- MacOS (Intel) or Linux is all good. If you are using an M1 or M2 chip, you will need to follow some specific instructions.


If you really don't want to use Spark, you can use `pandas` as usual. Just be aware of its memory limitations, and that you'll miss an opportunity to upskill yourself for a job! To go down this route, run `pip3 install pandas fastparquet` and read the data in like this:

In [None]:
import pandas as pd

df = pd.read_parquet('../../data/tlc_data/2022-01.parquet')
df.tail()

You can then follow the tutorial using the alternative `pandas` syntax.

**Steps:**

0. (Pre-Req) Install WSL2 if you are on Windows 10 or 11. MacOS users, please ensure your terminal is set to `bash`.
1. We recommend a fresh environment for this subject as there can be package conflicts, but all good if you are lazy. If you are getting errors, please refresh your environment before coming to us for help.
2. Install `Java` and `PySpark`:  
    - Linux (WSL or WSL2 or native)
        ```bash
        # Update apt formula
        sudo apt update
        # install java
        sudo apt install openjdk-8-jdk -y
        # add to path
        echo 'JAVA_HOME="/usr/lib/jvm/java-8-openjdk-amd64"' | sudo tee -a /etc/environment
        # apply to the current shell; `set -a` exports the variable so Python/Jupyter can see it
        set -a; source /etc/environment; set +a
        # install spark
        pip3 install pyspark pyarrow pandas
        ```
    - MacOS
        1. Install [Homebrew](https://brew.sh/)
            - If your shell prompts to set `zsh` as default shell with `chsh -s /bin/zsh`, run that first!!
        2. Install Java and set up `JAVA_HOME` (Spark runs on the Java Virtual Machine)
            ```
            # For Intel CPU
            # install java 8 and link to system java wrapper
            brew install openjdk@8 
            # For newer version of brew, try the command below if brew install doesn't work
            #brew install --cask homebrew/cask-versions/adoptopenjdk8
            sudo ln -sfn /usr/local/opt/openjdk@8/libexec/openjdk.jdk /Library/Java/JavaVirtualMachines/openjdk-8.jdk
            # set JAVA_HOME in both shell configs (older macOS versions default to bash, newer ones to zsh)
            echo 'export JAVA_HOME="$(/usr/libexec/java_home -v1.8)"' | tee -a $HOME/.bashrc $HOME/.zshrc
            ```
            (If you are using MacOS (M1 or M2 chip), follow [this guide](https://code2care.org/q/install-native-java-jdk-jre-on-apple-silicon-m1-mac) for Java JDK.)
        3. Install python packages/spark
            ```bash
            # reload java path
            source $HOME/.bashrc ; source $HOME/.zshrc
            # install spark. Note: if you are using anaconda/conda environments, you need to make sure the pip3 is the correct pip3!
            # Or you should install with conda directly!
            #conda install pyspark pyarrow pandas
            pip3 install pyspark pyarrow pandas
            ```
3. If you are using VS Code on Windows, you also need to follow these [instructions](https://code.visualstudio.com/docs/remote/wsl-tutorial).
4. After installing everything, restart your notebook kernel.

Run the code below to check your installation. As long as it runs without errors (red warning messages are fine), you're ready to go!

Troubleshooting guide:
![ELPERS](https://cdn.betterttv.net/emote/5ec42fc6c752192ee9603b94/1x)
1. Help! The module is still not found after I installed everything!
    - Run `which pip` and `which python` in your terminal and compare the results with `import sys; sys.executable` run from your Jupyter notebook. They have to point to the same environment (why?).
    - If they don't, change your notebook's kernel to that Python interpreter.
2. HELP!! The java instance stopped when executing the cell
    - Ensure Java is installed (e.g. `java -version` runs without error)
    - Make sure `echo $JAVA_HOME` prints the proper location (i.e. it points to where Java is installed)
3. hELppp! `conda`, `pip`, `apt`, `brew`... not found!
    - Install the required software and make sure the directory containing each command appears in the output of `echo $PATH`
4. elPppppp! I am using windows and I don't want to use WSL
    - NO.
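The checks in items 1 and 2 above can also be run from inside the notebook. The sketch below uses only the standard library; the helper names `module_location` and `environment_report` are our own, not part of any library. Because it runs in the kernel, it shows exactly what the kernel sees: which interpreter it is, whether `pyspark`, `pyarrow` and `pandas` can be imported by *that* interpreter, and what `JAVA_HOME` and `java` resolve to.

```python
import importlib.util
import os
import shutil
import sys


def module_location(name):
    """Return the file `name` would be imported from, or None if this interpreter cannot find it."""
    spec = importlib.util.find_spec(name)
    return spec.origin if spec is not None else None


def environment_report(modules=("pyspark", "pyarrow", "pandas")):
    """Collect the details the troubleshooting steps ask you to compare."""
    report = {
        "notebook_python": sys.executable,
        # Spark's launcher uses $JAVA_HOME/bin/java if JAVA_HOME is set, otherwise `java` on PATH
        "JAVA_HOME": os.environ.get("JAVA_HOME"),
        "java_on_PATH": shutil.which("java"),
    }
    for module in modules:
        report[module] = module_location(module)
    return report


for key, value in environment_report().items():
    print(f"{key:16} {value or 'NOT FOUND'}")
```

If a package shows `NOT FOUND`, install it with the interpreter printed on the first line, e.g. `<that path> -m pip install pyspark`, so the package lands in the environment your notebook actually uses.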

In [2]:
import sys
sys.executable

'C:\\Users\\Phikho\\anaconda3\\envs\\MAST30034\\python.exe'

In [3]:
from pyspark.sql import SparkSession

# Create a spark session (which will run spark jobs)
spark = (
    SparkSession.builder.appName("MAST30034 Tutorial 1")
    .config("spark.sql.repl.eagerEval.enabled", True) 
    .config("spark.sql.parquet.cacheMetadata", "true")
    .getOrCreate()
)

## Starting a Spark Session
To begin with Spark, we need to create a `SparkSession` (see the cell above).
- `appName`: Name of the Spark app
- `config`: Configuration options to initialise with. Here we set `'spark.sql.repl.eagerEval.enabled'`, which enables a nicer HTML display (similar to `pandas`) for DataFrame outputs.
- `.getOrCreate()`: Returns the existing Spark session if there is one, otherwise creates a new one.
    
A general note: Spark DataFrames are **immutable**, i.e. every transformation returns a new DataFrame rather than modifying the existing one. We'll discuss this further down the track, but for now, just remember it!

Documentation is also going to be your saving grace. If you have tried your best **and have read the documentation and researched on Stack Overflow** but still can't get it working, then you can ask your tutor for help. This is how it works in Industry.

## Reading in the Parquet
As of 2022, TLC has made a **great decision** to drop `csv` and adopt `parquet` formats instead. So, what's a `parquet`? 

Related materials:
1. [What if you could get the simplicity, convenience, interoperability, and storage niceties of an old-fashioned CSV with the speed of a NoSQL database and the storage requirements of a gzipped file? Enter Parquet.](https://databricks.com/session/spark-parquet-in-depth)
2. [The Parquet Format and Performance Optimization Opportunities](https://databricks.com/session_eu19/the-parquet-format-and-performance-optimization-opportunities)

CSV:
- `csv` files store tabular data as plain text, read line by line with values separated by a `,` delimiter.
- That is, these are stored by rows.
- They consume a lot of disk space and are one of the **most inefficient** ways of storing data.
- However, they are widely used and easy to use for smaller datasets.

Parquet:
- `parquet`, on the other hand, stores data by column and (ELI5) encodes it very efficiently.
- For example, a single row in a `csv` can contain several different data types.
- `parquet` stores a single data type per column, which allows compression algorithms to be applied to reduce disk space and improve read efficiency.
- If you need a row-based alternative to `csv`, take a look at `avro`.
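To make the row-versus-column distinction concrete, here is a toy, standard-library-only sketch. It is **not** the real Parquet format (which also uses per-column encodings, row groups and metadata); it just lays the same hypothetical trip records out row by row (like a `csv`) and column by column, then compares how many bytes you must touch to read a single column and how well each layout compresses with `zlib`. The exact numbers depend on the data, so treat them as illustrative only.

```python
import csv
import io
import random
import zlib

# Hypothetical toy trips: (passenger_count, payment_type, trip_distance)
rng = random.Random(0)
trips = [
    (rng.randint(1, 6), rng.randint(1, 4), round(rng.uniform(0.1, 20.0), 2))
    for _ in range(10_000)
]
column_names = ["passenger_count", "payment_type", "trip_distance"]

# Row-oriented (like csv): each line stores one whole record of mixed types
row_buffer = io.StringIO()
csv.writer(row_buffer).writerows(trips)
row_bytes = row_buffer.getvalue().encode()

# Column-oriented (a very simplified stand-in for parquet): one chunk per column,
# so each chunk only holds values of a single type
column_chunks = {
    name: "\n".join(str(trip[i]) for trip in trips).encode()
    for i, name in enumerate(column_names)
}

# Reading only trip_distance: the row layout has to scan every record,
# while the column layout only needs one chunk
bytes_scanned_rows = len(row_bytes)
bytes_scanned_columns = len(column_chunks["trip_distance"])
print("bytes scanned for one column:", bytes_scanned_rows, "vs", bytes_scanned_columns)

# Compare compressed sizes of the two layouts
print("compressed rows:   ", len(zlib.compress(row_bytes)))
print("compressed columns:", sum(len(zlib.compress(c)) for c in column_chunks.values()))
```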

![Divisions of storage format](../../media/storageformat.png)

Cost Analysis from Amazon Web Services (AWS): ![image.png](https://miro.medium.com/max/1400/1*vdasMxTjInhBXIRA8K1XYQ.png)

Spark Docs
- https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.parquet.html?highlight=read%20parquet
- https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.show.html?highlight=show#pyspark.sql.DataFrame.show

In [4]:
# sdf = spark df = spark data frame
sdf = spark.read.parquet('../../data/tlc_data/2022-01.parquet')
sdf.show(1, vertical=True, truncate=100)


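The default `.show()` output is quite ugly at times, so if you miss `pandas` and want the "nice" display, set `spark.sql.repl.eagerEval.enabled` to `True` in the config (as we did above). To see the nice format, use `.limit()` and let the notebook display the returned DataFrame.

`pyspark`'s `.show()`, `.head()`, `.limit()`, etc. are all alternatives to `pandas`'s `.head()` (`.tail()` exists in both `pandas` and `pyspark`). Note that `pyspark`'s `.head(n)` returns a list of `Row` objects rather than a DataFrame.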

In [None]:
sdf.head(5)

Spark can also read in whole directories! We won't be using this for the tutorial, but feel free to use it for your project.

In [None]:
# here, we give it the directory, rather than a specific parquet
sdf_all = spark.read.parquet('../../data/tlc_data/')

To count the number of records, use the `.count()` method. The `pandas` equivalent would be `len(df)` or `df.shape[0]`.

In [None]:
sdf.count(), sdf_all.count()

To view the data types of our `sdf`, we can use:
- `.printSchema()` to print it nicely.
- `.schema` for the actual schema object

The `pandas` alternative is `df.dtypes`

In [None]:
sdf.printSchema()

In [None]:
sdf.schema

See here for the available data types: https://spark.apache.org/docs/latest/sql-ref-datatypes.html

## Basic Operations
### Selection
To show a specific column, we will use `sdf.select(col).limit(5)`. 
- The equivalent in `pandas` is `df[col].head()`.

To _access_ a specific column, use the `sdf[col]` syntax (equivalent to `df[col]`). Avoid using `sdf.col` or `df.col`, as it is **not** robust (it cannot handle column names with spaces, or names that clash with existing methods such as `count`) or future-proof.

For multiple columns, pass them as a list as usual.

Please note that `select` only picks columns: it is good for looking at bits and pieces of the data, not for filtering rows.
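Here is a plain-Python sketch of why attribute access is fragile. `ToyFrame` is a hypothetical stand-in, not a Spark class, but `pyspark`'s DataFrame resolves `sdf.name` in a similar way: a real attribute or method with that name wins over a column, and names with spaces cannot be written with dot syntax at all.

```python
class ToyFrame:
    """Hypothetical stand-in for a DataFrame that supports both access styles."""

    def __init__(self, columns):
        self._columns = dict(columns)

    def __getitem__(self, name):
        # frame["any name"] always looks up the column
        return self._columns[name]

    def __getattr__(self, name):
        # frame.name only reaches here if no real attribute/method is called `name`
        try:
            return self._columns[name]
        except KeyError:
            raise AttributeError(name) from None

    def count(self):
        """Number of rows (a method whose name clashes with a column below)."""
        return len(next(iter(self._columns.values()), []))


frame = ToyFrame({"count": [1, 2], "trip distance": [3.0, 4.0]})
print(frame["count"])          # the column: [1, 2]
print(frame.count)             # the method, not the column!
print(frame["trip distance"])  # works; `frame.trip distance` is not even valid Python
```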

In [None]:
sdf.select('passenger_count').limit(5)

_Students to write code to select the first 10 records for `passenger_count` and `trip_distance`_

In [None]:
# write code here to select the first 10 records for `passenger_count` and `trip_distance`
sdf.select(["passenger_count", "trip_distance"]).limit(10)

### Filtering
For filtering data, we use `sdf.filter(condition)` or `sdf.where(condition)` (they are aliases of each other).
- The equivalent in `pandas` is `df.loc[condition].head()`.
- When using multiple conditions, wrap each condition in parentheses and combine them with `&` (AND) / `|` (OR).

To do so, we will use `pyspark.sql.functions.col` to specify the column we are working with.

In [None]:
from pyspark.sql import functions as F

In [None]:
F.col("passenger_count")

As you can see, this is just a `Column` expression and doesn't do anything on its own. We'll come back to this in the next tutorial; for now, take our word for it.

In [None]:
sdf.filter(F.col('passenger_count') == 5).limit(5)

_Students to write code to retrieve all records with non-zero passenger counts and non-zero trip distances using `.where()`_

In [None]:
# write code here to retrieve all non-zero passenger counts and all non-zero trip distances using where
sdf.where((F.col('passenger_count') > 0) & (F.col('trip_distance') > 0)).limit(5)
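Why the parentheses matter: in Python, `&` and `|` bind more tightly than comparison operators such as `>` and `==`. The plain-integer sketch below (hypothetical values) shows how the unparenthesised condition is actually parsed. With PySpark columns, the same mis-parse typically ends in a `ValueError` complaining that a `Column` cannot be converted to `bool`, rather than running the filter you intended.

```python
passenger_count = 2
trip_distance = 3  # an int here, because Python's `&` is not defined for plain floats

# `&` is evaluated first, so this becomes the chained comparison
# passenger_count > (0 & trip_distance) > 0, i.e. 2 > 0 > 0
without_parens = passenger_count > 0 & trip_distance > 0
parsed_as = passenger_count > (0 & trip_distance) > 0

# What we actually meant: both comparisons first, then AND them together
with_parens = (passenger_count > 0) & (trip_distance > 0)

print(without_parens, parsed_as, with_parens)  # False False True
```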

### GroupBy (Aggregation)
To group the data and aggregate it (e.g. take the mean), we can use `sdf.groupby(col).mean(aggregated columns).limit(5)`
- The equivalent in `pandas` is `df.groupby(col)[aggregated columns].mean().head()`

In [None]:
sdf.groupby('passenger_count').mean('trip_distance').limit(5)

We can also apply multiple different aggregations and change their output names using `.agg()` and `.alias()`! To see the list of all SQL functions, visit https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/functions.html

We'll also use `.orderBy()` to display the results nicely.

In [None]:
aggregated_results = sdf \
                    .groupBy("passenger_count") \
                    .agg(
                        F.mean("total_amount").alias("avg_trip_amount_usd"),
                        F.max("trip_distance").alias("max_trip_distance_miles")
                    ) \
                    .orderBy("passenger_count")

aggregated_results.show()
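If it helps to see what `groupBy().agg()` computes, here is a small pure-Python equivalent of the `aggregated_results` cell above, run on a handful of hypothetical records (no Spark required). Spark performs the same grouping in parallel across partitions. Also note that Spark's aggregate functions such as `mean` and `max` skip nulls, which this toy version does not need to handle.

```python
from collections import defaultdict

# Hypothetical toy records standing in for rows of `sdf`
trips = [
    {"passenger_count": 1, "total_amount": 12.5, "trip_distance": 2.0},
    {"passenger_count": 2, "total_amount": 30.0, "trip_distance": 8.4},
    {"passenger_count": 1, "total_amount": 7.5, "trip_distance": 1.1},
    {"passenger_count": 2, "total_amount": 18.0, "trip_distance": 3.9},
    {"passenger_count": 3, "total_amount": 22.0, "trip_distance": 5.0},
]


def aggregate_by_passenger_count(records):
    """Mimic groupBy('passenger_count').agg(mean, max).orderBy('passenger_count')."""
    groups = defaultdict(list)
    for record in records:
        groups[record["passenger_count"]].append(record)

    results = []
    for passenger_count in sorted(groups):  # plays the role of .orderBy("passenger_count")
        rows = groups[passenger_count]
        results.append({
            "passenger_count": passenger_count,
            "avg_trip_amount_usd": sum(r["total_amount"] for r in rows) / len(rows),
            "max_trip_distance_miles": max(r["trip_distance"] for r in rows),
        })
    return results


for row in aggregate_by_passenger_count(trips):
    print(row)
```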

## Saving Data
By default, Spark saves data in `parquet` format (highly recommended). If you wish to take a smaller sample and save it as a `csv` to load into `pandas`, that is also fine.

In [None]:
aggregated_results.write.mode('overwrite').parquet('../../data/tute_data/aggregated_results')

Spark writes a directory rather than a single file, so yours may look a bit funky, like this:

![image.png](../../media/aggregated_results_dir.png)

Don't worry, just leave it as is (unfortunately, we don't have time to cover everything about Spark); you can read the whole directory back in directly.
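For the curious: Spark writes one `part-*` file per partition, plus, by default, a `_SUCCESS` marker once the write completes (and, on a local disk, hidden `.crc` checksum files). As a sketch, the hypothetical helper below uses `pathlib` to separate the data files from the bookkeeping files in an output directory such as `aggregated_results`.

```python
from pathlib import Path


def list_spark_output(directory):
    """Split a Spark output directory into (data part files, bookkeeping files)."""
    directory = Path(directory)
    part_files = sorted(p.name for p in directory.glob("part-*"))
    other_files = sorted(
        p.name for p in directory.iterdir() if not p.name.startswith("part-")
    )
    return part_files, other_files


# e.g. list_spark_output('../../data/tute_data/aggregated_results')
```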

In [None]:
temp_results = spark.read.parquet('../../data/tute_data/aggregated_results')
temp_results.show()

---

# Summary (and Break)
Cool, we've covered the very very basics of Spark and will now cover the basics of plotting.

Rest assured, we will cover more intricate transformations in the next tutorial (feel free to work ahead, of course).

---

## Sampling Data for Plotting

Whilst Spark is amazing at handling big data sets, it isn't a great idea to plot all of it. We suggest taking a maximum of 5% of records for the tutorial. 

You can increase this to suit your requirements, but we recommend sticking to fewer than 1 million records per month for visualization purposes.
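To turn a row cap like "fewer than 1 million records per month" into a fraction for `.sample()`, divide the cap by the number of rows. A minimal sketch (the helper name `sample_fraction` is ours, not a Spark function):

```python
def sample_fraction(total_rows, max_rows=1_000_000):
    """Largest fraction in [0, 1] whose expected sample size stays within max_rows."""
    if total_rows <= 0:
        raise ValueError("total_rows must be positive")
    return min(1.0, max_rows / total_rows)


print(sample_fraction(20_000_000))  # 0.05
print(sample_fraction(500_000))     # 1.0 (small enough to keep everything)
```

For example, `sdf.sample(sample_fraction(sdf.count()), seed=0)`; keep in mind that `.count()` itself requires a full pass over the data.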

**Project 1 Checklist:**
- You have justified your sample size (e.g. due to runtime, distribution of data, etc.)
- You have justified your sampling method (e.g. random, stratified, etc.)
- You have detailed in your report that you have sampled for visualization purposes BUT your analysis still uses the full distribution of data
- You mention any issues that can potentially be caused by sampling (e.g. biased visualisation if using random)

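To sample your data and convert it into a `pandas` DataFrame, use `.sample()` to take a fraction of `sdf` and `.toPandas()` to collect the result (this pulls the data into your machine's memory, so keep the sample small). You can then save the sample and read it back in later. We will also fix the random seed to `0` for consistency.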
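Note that `.sample(fraction)` does not return exactly `fraction` of the rows: as the PySpark documentation points out, the result is only approximately that size, because (without replacement) each row is kept or dropped independently. The standard-library sketch below imitates that idea (Bernoulli sampling) with Python's own `random` module, so the specific rows it picks will not match Spark's; it only shows why a fixed seed makes the sample reproducible while its size still varies around the target.

```python
import random


def bernoulli_sample(records, fraction, seed=0):
    """Keep each record independently with probability `fraction` (fixed seed => reproducible)."""
    rng = random.Random(seed)
    return [record for record in records if rng.random() < fraction]


rows = range(100_000)
sample = bernoulli_sample(rows, 0.05, seed=0)
print(len(sample))  # close to, but usually not exactly, 5,000
```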

In [None]:
SAMPLE_SIZE = 0.05

In [None]:
df = sdf.sample(SAMPLE_SIZE, seed=0).toPandas()
df.to_csv('../../data/tute_data/sample_data.csv', index=False)

In [None]:
df.to_parquet('../../data/tute_data/sample_data.parquet')

Take a moment to look at the disk space the `csv` takes for the 5% sample (13.3MB). Compare that to the `parquet` sample, which isn't even 3MB; even the full month of data in `parquet` format takes only 37MB of disk space.

Let that sink in, and give thanks to the developers behind Parquet.

In [None]:
%%time
df_csv = pd.read_csv('../../data/tute_data/sample_data.csv')

In [None]:
%%time
df_parquet = pd.read_parquet('../../data/tute_data/sample_data.parquet')

We recommend you save every dataframe or aggregation as `parquet` so you don't keep running your notebook from top to bottom waiting 20 years for a result, or have so many variables and dataframes defined that you run out of memory for small transformations.
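Here is a minimal sketch of that "save once, reuse later" habit. `load_or_compute` is a hypothetical helper, not a Spark API: it assumes `path` is on your local disk and relies on the `_SUCCESS` marker Spark writes by default after a successful write, so a half-finished write from a crashed run gets recomputed rather than reused.

```python
from pathlib import Path


def load_or_compute(spark, path, compute):
    """Return the DataFrame saved at `path`, computing and saving it first if needed.

    `compute` is a zero-argument function returning a Spark DataFrame.
    Assumes `path` is on the local filesystem.
    """
    if not (Path(path) / "_SUCCESS").exists():  # no finished earlier write
        compute().write.mode("overwrite").parquet(path)
    # Always read back from disk so later steps start from the saved parquet
    return spark.read.parquet(path)


# Hypothetical usage in a preprocessing notebook:
# aggregated_results = load_or_compute(
#     spark,
#     '../../data/tute_data/aggregated_results',
#     lambda: sdf.groupBy("passenger_count").agg(F.mean("total_amount").alias("avg_trip_amount_usd")),
# )
```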

We strongly suggest you have a `code` folder in your Project 1 directory with the following structure:
- `preprocessing_notebook_part_1.ipynb`: outputs a structured parquet format and saves it.
- `preprocessing_notebook_part_2.ipynb`: reads in the output from above and does some aggregations and sampling before saving it.
- `data_analysis_xyz.ipynb`: conducts analysis on a single sample or aggregation from the output above.
- `data_analysis_abc.ipynb`: conducts analysis on another single sample or aggregation from the output above.
- `...`

This is a very basic version of what you call a "data pipeline" (or ETL pipeline, etc).

_________________


# Project 1 Tips and Questions

### IMPORTANT PLEASE READ THIS
First and foremost, you want to be familiar with the homepage https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page

Read through the relevant data dictionaries:
- **MUST READ:** https://www1.nyc.gov/assets/tlc/downloads/pdf/trip_record_user_guide.pdf
- https://www1.nyc.gov/assets/tlc/downloads/pdf/data_dictionary_trip_records_yellow.pdf
- https://www1.nyc.gov/assets/tlc/downloads/pdf/data_dictionary_trip_records_green.pdf
- https://www1.nyc.gov/assets/tlc/downloads/pdf/data_dictionary_trip_records_fhv.pdf
- https://www1.nyc.gov/assets/tlc/downloads/pdf/data_dictionary_trip_records_hvfhs.pdf

Why? Treat your tutors as the "experts" in this field. To prepare you for the Industry Project, we need to assess how well students adhere to requirements and business rules.

The tutor team should know this dataset inside out. If you filter records incorrectly, without sufficient justification, you will lose marks as per the requirements.

### An Incorrect Example
- Scenario: A student analyses `tip_amount`, finds several `NULL` values, and either drops them or includes them in the analysis. Later on, they use a regression model to predict this value.

- Result: According to the data dictionary, `tip_amount` is automatically populated for credit card tips (`payment_type` is `1`); cash tips are not included. This means the student's analysis included all payment types, even though the data dictionary clearly specifies this rule.

- Penalty: The student will lose marks on the analysis section. The modelling section will be marked _assuming_ they got this filtering method correct. However, if another issue pops up due to this, there will be another penalty applied. Please get this right!

- Solution: The student should filter for `payment_type=1` only; then they can (hopefully) conduct a correct analysis of `tip_amount`.
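The effect of that filter can be seen with a few hypothetical records in plain Python. In this toy data, cash trips (`payment_type` 2 in the data dictionary) show a `tip_amount` of 0 because cash tips are not recorded, which drags the naive average down. In Spark, the corresponding filter would be `sdf.filter(F.col('payment_type') == 1)`.

```python
CREDIT_CARD = 1  # payment_type code for credit card trips in the TLC data dictionary

# Hypothetical toy trips: cash tips are not recorded, so they show up as 0 here
trips = [
    {"payment_type": 1, "tip_amount": 3.0},
    {"payment_type": 1, "tip_amount": 5.0},
    {"payment_type": 2, "tip_amount": 0.0},
    {"payment_type": 2, "tip_amount": 0.0},
]


def mean_tip(records):
    """Average tip_amount over the given records."""
    return sum(r["tip_amount"] for r in records) / len(records)


naive_mean = mean_tip(trips)  # biased: mixes in trips whose tips were never recorded
card_only_mean = mean_tip([r for r in trips if r["payment_type"] == CREDIT_CARD])
print(naive_mean, card_only_mean)  # 2.0 4.0
```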

Several students over the past few years have lost many marks for simple rules like this (especially `tip_amount`).

### Readable Code
- We will be assessing the quality of your code and how you present it in your notebooks. 
- This is because there is no point writing code that cannot be easily interpreted. At the end of the day, employers and clients are not only paying for your analysis, but also the corresponding code. 
- If your code is confusing or difficult to read, there is little chance your client will come back to you.

**Variable Names:**  
Either convention is fine as long as you are consistent. For example, commit to either using:
- Snake Case: words are separated by underscores, such as `variable_name`
- Camel Case: words after the first are capitalised, such as `variableName`

Your variables should be contextual and describe the code. That is, try to name your variables to be understandable **without comments**.

**Comments and Docstrings (w.r.t. Jupyter Notebook Cells):**  
Cells in Jupyter Notebook should aim to do one "block of logic" at a time (e.g. importing libraries, defining functions, filtering rows).
- If it takes a reader more than a few seconds to understand your cell, you need comments.
- Your functions need to have docstrings describing what they do. If you forgot, search it online or go visit your COMP10001 Grok course.
- Use markdown cells for longer comments or explaining logic, inline comments in code for short descriptions of hard-to-understand code.
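As an illustration of these guidelines, here is a small, hypothetical function written in snake case, with a descriptive name and a docstring. The Google-style `Args`/`Returns` layout is one common convention, not a requirement of this subject.

```python
def filter_positive_distance(trips):
    """Return only the trips whose trip_distance is strictly positive.

    Args:
        trips: list of dicts, each with a numeric "trip_distance" key.

    Returns:
        A new list containing the trips with trip_distance > 0.
    """
    return [trip for trip in trips if trip["trip_distance"] > 0]


print(filter_positive_distance([{"trip_distance": 0.0}, {"trip_distance": 1.5}]))
```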

We won't ask you to run `flake8` or `pylint` on your notebooks. We just ask for good comments in the code and markdown cells, reasonable variable names, and clean directories.