# Introduction to Apache Beam API

This notebook contains interactive exercises that introduce the parts of the [**Apache Beam**](https://beam.apache.org/) API needed for Differentially Private (`DP`) computations. 

In this notebook, we will first familiarise ourselves with Apache Beam and its [Python SDK](https://beam.apache.org/documentation/sdks/python/) by walking through multiple exercises of increasing complexity. 

**Don't worry**: if you are not yet familiar with Apache Beam and its Python APIs, each exercise will include a _Hints_ subsection that will point you in the right direction, suggesting what you should be looking at in the documentation to solve the exercise. Then, if you are still struggling with the exercise - or simply to compare your results - you can look at the attached _Solution_.

In the next notebook, we will use Apache Beam to implement a fully _differentially private_ data analysis pipeline.

## Instructions

We recommend opening and running this notebook in **Google Colab** by simply clicking on the badge below.

[![Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/OpenMined/PipelineDP/blob/main/docs/tutorial_1/1_beam_introduction.ipynb)

[![nbviewer](https://raw.githubusercontent.com/jupyter/design/master/logos/Badges/nbviewer_badge.svg)](https://nbviewer.jupyter.org/github/OpenMined/PipelineDP/blob/main/docs/tutorial_1/1_beam_introduction.ipynb)

[![GitHub](https://img.shields.io/badge/view%20on-GitHub-blue)](https://github.com/OpenMined/PipelineDP/blob/main/docs/tutorial_1/1_beam_introduction.ipynb)

Here is what you should do: 

1. Click the Colab badge above to open the notebook in Google Colab
2. Make a copy in Drive (File $\mapsto$ Save a copy in Drive)
3. Run all the cells in the **Setup** area
4. Enjoy the exercises :)

In [1]:
#@title Setup: Install and import `apache-beam` 
from IPython.display import clear_output
try:
    import apache_beam as beam
except ImportError:
    !pip install "apache-beam[interactive]"    
    clear_output()
finally:
    import apache_beam as beam
    print("apache_beam package available and imported correctly ✅")
    print(f"Apache Beam Python SDK version: {beam.__version__}")

apache_beam package available and imported correctly ✅
Apache Beam Python SDK version: 2.28.0


## Dataset

In this notebook we will be using the [**Netflix Prize Dataset**](https://en.wikipedia.org/wiki/Netflix_Prize) as a running example.

> The Netflix Prize was an open competition for the best collaborative filtering algorithm to predict user ratings for films, based on previous ratings without any other information about the users or films, i.e. without the users or the films being identified except by numbers assigned for the contest.

(_Source_: Wikipedia)

The choice of this dataset is particularly interesting from our perspective of _differentially private_ data pipelines, since this dataset was used as the reference example to present a new class of statistical de-anonymisation attacks against high-dimensional data. 
More details on these techniques, and their application to the `Netflix Prize` dataset are available in this [article](https://www.schneier.com/blog/archives/2007/12/anonymity_and_t_2.html), and in the original paper $\Rightarrow$ [**Robust De-anonymization of Large Sparse Datasets**](https://www.cs.utexas.edu/~shmat/shmat_oak08netflix.pdf).

### The `Netflix-Prize` Dataset

The Netflix Prize dataset (`Netflix-prize`) is available on [Kaggle](https://www.kaggle.com/netflix-inc/netflix-prize-data), so to download it we need to configure the official Python [`kaggle`](https://pypi.org/project/kaggle/) API.

**(A)** Let's first set up the `DATASET_FOLDER` where the dataset will be downloaded and saved:

In [2]:
#@title Run to set up Data and Output folders { display-mode: "form" }
import os
from pathlib import Path


NETFLIX_PRIZE_FOLDER = Path("netflix_prize_dataset")
DATASET_FOLDER = NETFLIX_PRIZE_FOLDER / "data"
OUTPUT_FOLDER = NETFLIX_PRIZE_FOLDER  / "outputs"

DATA_FILE = str(DATASET_FOLDER / "movie_views.txt")
OUTFILE_TEMPLATE = str(OUTPUT_FOLDER / '{}.txt')

os.makedirs(DATASET_FOLDER, exist_ok=True)
os.makedirs(OUTPUT_FOLDER, exist_ok=True)

**(B)** Download the `Netflix-prize` dataset from Kaggle

In [3]:
#@title Run to setup `kaggle` Python API { display-mode: "form" }
try:
    import kaggle
except ImportError:
    !pip install kaggle
    clear_output()
except OSError:
    print("Kaggle package is available, but credentials need to be set up.")
else:
    print("kaggle package imported correctly ✅")

kaggle package imported correctly ✅


**Create and place your API token**

In order to download datasets using the Kaggle API, please create and download your API token from the Kaggle website:

1. Log in to the [Kaggle](https://www.kaggle.com/) website.
2. Go to your **Account** page (locate your profile avatar in the top-right corner of the page)

![kaggle-account](images/kaggle_account.png)

3. Scroll down your account page until you find the **API** section.
4. Click the "Create new API token" button.

![kaggle-api-key](images/api_key.png)

This will download a file called `kaggle.json`. This file contains the credentials required to use the Kaggle APIs. 

5. Open this file, and enter its contents in the form below:

In [4]:
#@title Setup Kaggle API credentials { display-mode: "form" }
from ipywidgets import Password
from ipywidgets import widgets
from IPython.display import display
from IPython.display import Image
import os
import stat
import json

kaggle_api_credentials = os.path.expanduser('~/.kaggle/kaggle.json')

API_CREDENTIALS_EXIST = os.path.exists(kaggle_api_credentials) 

if not API_CREDENTIALS_EXIST:
    user = widgets.Text(
        placeholder='Insert your Kaggle User Name!',
        description='User Name',
        disabled=False
    )

    pwd = Password(
        description='Key',
        placeholder='Insert key for user {}'.format(user.value)
    )

    def store_kaggle_credentials(wdgt):
        folder = os.path.expanduser('~/.kaggle')
        os.makedirs(folder, exist_ok=True)
        file = '{}/kaggle.json'.format(folder)
        with open(file, 'w') as kaggle_file:
            json.dump({
                'username': user.value,
                'key': wdgt.value
            }, kaggle_file)

            print("Credentials created in {} ✅".format(folder))
        os.chmod(file, stat.S_IREAD | stat.S_IWRITE)

    user.on_submit(lambda wdg: display(pwd))
    pwd.on_submit(store_kaggle_credentials)

    display(user)
else:
    print("Kaggle API credentials found ✅ - All set, Ready to start!")

Kaggle API credentials found ✅ - All set, Ready to start!


In [5]:
#@title Authenticate on Kaggle via APIs { display-mode: "form" }
import kaggle

kaggle.api.authenticate()

In [6]:
#@title **Search** on Kaggle for available Netflix-Prize datasets { display-mode: "form" }
!kaggle datasets list -s "netflix-prize"

ref                                                    title                                             size  lastUpdated          downloadCount  voteCount  usabilityRating  
-----------------------------------------------------  -----------------------------------------------  -----  -------------------  -------------  ---------  ---------------  
netflix-inc/netflix-prize-data                         Netflix Prize data                               683MB  2019-11-13 18:39:37          45080        878  0.7647059        
stephanerappeneau/350-000-movies-from-themoviedborg    350 000+ movies from themoviedb.org               67MB  2017-10-12 19:49:17           4592        147  0.7647059        
gspmoreira/articles-sharing-reading-from-cit-deskdrop  Articles sharing and reading from CI&T DeskDrop    8MB  2017-08-27 21:33:01           9319        127  0.8235294        
arashnic/book-recommendation-dataset                   Book Recommendation Dataset                       24MB  2020

The first entry in the list is the dataset we are looking for: `netflix-inc/netflix-prize-data`.

The size of the archive is `683 MB`, so downloading and uncompressing it may take some time, depending on your internet connection (⚠️). Please keep this in mind when you run the next cell.

In [7]:
#@title Run this cell to initiate the download { display-mode: "form" }
if not len(os.listdir(DATASET_FOLDER)) or not os.path.exists(DATA_FILE):
    # Checking here to avoid download multiple times in case of notebook re-run
    kaggle.api.dataset_download_files("netflix-inc/netflix-prize-data", 
                                      path=DATASET_FOLDER, unzip=True, quiet=False)

  0%|          | 1.00M/683M [00:00<01:24, 8.46MB/s]

Downloading netflix-prize-data.zip to netflix_prize_dataset/data


100%|██████████| 683M/683M [01:24<00:00, 8.50MB/s] 





In [8]:
#@title **Check** that everything was downloaded ok { display-mode: "form" }
os.listdir(DATASET_FOLDER)

['movie_titles.csv',
 'qualifying.txt',
 'combined_data_2.txt',
 'combined_data_3.txt',
 'combined_data_1.txt',
 'combined_data_4.txt',
 'README',
 'probe.txt']

#### `Netflix-prize` data files

Before getting into the actual processing of this dataset with **Apache Beam**, let's briefly describe the actual data files included in the Kaggle archive. 

The dataset is organised into multiple files, accounting for over `100` million ratings from `480` thousand
randomly-chosen, anonymous Netflix customers, over `17` thousand movie titles. 

- The `probe.txt` and `qualifying.txt` files are needed only for the submission on Kaggle, so we discard them as not relevant here.

- Movie information (i.e. _metadata_) is contained in `movie_titles.csv`, in the following format:

```
MovieID,YearOfRelease,Title

- MovieID is a general (randomised) ID for movies, ranging from 1 to 17,770.
- YearOfRelease can range from 1890 to 2005.
- Title (in English) is the Netflix title.
```

- The various `combined_data_*.txt` contain the actual data in the following format:

```
MovieID:
CustomerID,Rating,Date
CustomerID,Rating,Date
....

- MovieIDs match those in the list of "movie_titles.csv".
- CustomerIDs range from 1 to 2,649,429, with gaps. There are 480,189 users.
- Ratings are on a five star (integral) scale from 1 to 5.
- Dates have the format YYYY-MM-DD.
```
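
The layout described above can be read with a small stateful loop. The following is a minimal plain-Python sketch (no Beam involved), assuming the lines are visited in file order; the helper name `iter_ratings` and the sample lines are illustrative only, and the rating under movie `2` is made up.

```python
from typing import Iterable, Iterator, Tuple

# Illustrative sample in the combined_data_*.txt layout.
# The line under "2:" is made up for this sketch.
SAMPLE_LINES = [
    "1:",
    "1488844,3,2005-09-06",
    "822109,5,2005-05-13",
    "2:",
    "2059652,4,2005-09-05",
]


def iter_ratings(lines: Iterable[str]) -> Iterator[Tuple[int, int, int, str]]:
    """Yield (movie_id, customer_id, rating, date) from lines read in order."""
    movie_id = None
    for raw in lines:
        line = raw.strip()
        if not line:
            continue  # skip empty lines
        if line.endswith(":"):
            # A "MovieID:" header applies to every rating line that follows it.
            movie_id = int(line[:-1])
            continue
        customer_id, rating, date = line.split(",")
        yield movie_id, int(customer_id), int(rating), date


records = list(iter_ratings(SAMPLE_LINES))
for record in records:
    print(record)
```

Note that the movie ID is carried over from the most recent header line, so the result depends on reading the lines in their original order.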

A more thorough description of each of those files is available in the [`README`](netflix_prize_dataset/data/README) file. 


Considering all the `combined_data_*.txt` data files together, the total size of the dataset is around `2GB`, and computing times are prohibitive when executed interactively in Colab. 

Therefore, without loss of generality, we will use only `combined_data_1.txt` as the source of the reference data file (i.e. `movie_views.txt`).

#### Preparing our dataset 

At around `2GB`, the full dataset is too large to process as quickly and interactively as we would hope while running a notebook on a laptop, even with the parallel execution that we will set up with `beam`. 

Therefore we will limit our analysis to a reduced version of the dataset. 

However, you get to choose how big this dataset will be, depending on the resources available on your computer and the amount of time you'd like to wait for the computation to finish :)

In [36]:
#@title Let's see how many lines each data file contains { display-mode: "form" }
data_files = filter(lambda f: f.startswith("combined_data_"), os.listdir(DATASET_FOLDER))
lines_count = 0
for data_file in sorted(data_files):
    file_path = DATASET_FOLDER / data_file
    wc_l = !wc -l $file_path
    lines, _ = wc_l[0].strip().split()
    lines_count += int(lines)
    print(f"{data_file}:  {int(lines):,}")
print(f"Total Lines Count  : {lines_count:,}")

combined_data_1.txt:  24,058,263
combined_data_2.txt:  26,982,302
combined_data_3.txt:  22,605,786
combined_data_4.txt:  26,851,926
Total Lines Count  : 100,498,277


From these statistics, we can immediately see that each data file contains roughly the same number of lines, which means that the amount of time required to process each of them will be more or less the same.

However, even limiting our analysis to a single file (i.e. `>20M` lines) might be prohibitive, and is indeed beyond the scope of this tutorial. The **main goal** of the tutorial is to demonstrate the ability of `apache_beam` to scale up computation on large datasets, but real scalability also requires decent computational capacity. 

Therefore, let's decide how many lines we want our _reduced_ dataset to contain (**default**: `1M` lines). 

➡️ If you are up for processing a much larger dataset, please feel free to choose a different number of lines.

In [69]:
#@title Choose the size (in lines) of the Netflix-prize Dataset used {display-mode: "form"}

# Note: this piece of code looks a bit odd when read in a plain notebook, but it makes much
# more sense when run in Google Colab, where it uses the Forms feature
DATASET_SIZE = "1M lines" #@param ["10K lines", "100K lines", "500K lines", "1M lines", "5M lines", "10M lines", "15M lines", "20M lines", "Full Dataset"] {type: "string"}

if DATASET_SIZE == "Full Dataset":
    DS_LINES = 24058263  # whole combined_data_1.txt file
else:
    DS_LINES = int(DATASET_SIZE.replace(" lines", "").replace("K", "000").replace("M", "000000"))

print("Selected Number of Lines: {:,}".format(DS_LINES))

Selected Number of Lines: 1,000,000
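
The form cell above converts a label into a line count through text replacement. As a sketch of the same conversion with explicit multipliers, the hypothetical helper `lines_from_label` below (not part of the notebook) returns the same values for the labels offered in the form.

```python
def lines_from_label(label: str, full_size: int = 24_058_263) -> int:
    """Convert a label such as '10K lines' or '1M lines' into a line count.

    `full_size` defaults to the number of lines in combined_data_1.txt.
    """
    if label == "Full Dataset":
        return full_size
    number = label.replace(" lines", "")
    multipliers = {"K": 1_000, "M": 1_000_000}
    suffix = number[-1]
    if suffix in multipliers:
        return int(number[:-1]) * multipliers[suffix]
    return int(number)


print(f"{lines_from_label('500K lines'):,}")
```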


In [70]:
#@title **Create** the reference data file `movie_views.txt` { display-mode: "form" }

!head -n $DS_LINES $DATASET_FOLDER/combined_data_1.txt > $DATA_FILE 

In [71]:
!du -h $DATA_FILE

 20M	netflix_prize_dataset/data/movie_views.txt


✅ Well done! 

The setup of the `Netflix-prize` dataset is now complete! Now it is time to start setting up our execution framework based on **Apache Beam**.

---

## The Execution Framework

In this section we will define the core components that will be used throughout the exercises. These components are based on **Apache Beam**, which constitutes the reference computational framework and provides the building blocks to define our data workflows.  

### `apache_beam` in a Nutshell

Apache Beam adheres to a programming model which is centred around a few key elements:

- `Pipeline`: represents the **main** _data processing workflow_, defined as a _sequence_ of **operations** on the data. This sequence is internally encoded as a directed acyclic graph in which nodes and arcs correspond to instances of `PCollection` and `PTransform`, respectively. 


- `PCollection`: represents a generic (**parallel**, `P`) data collection that can be either _bounded_ or _unbounded_ in size.


- `PTransform`: encodes a single (transform) operation. Each `PTransform` accepts a `PCollection` as input, and is expected to return a new `PCollection`. Beam provides some [**core**](https://beam.apache.org/documentation/programming-guide/#core-beam-transforms) transforms (e.g. [`ParDo`](https://beam.apache.org/documentation/programming-guide/#pardo), [`Flatten`](https://beam.apache.org/documentation/programming-guide/#flatten), and [`Partition`](https://beam.apache.org/documentation/programming-guide/#partition)) which correspond to specific processing paradigms.


- `Runner`: (or `PipelineRunner`) determines the computational environment in which the whole data processing will be executed. 

Finally, pipeline I/O operations are supported by the `apache_beam` framework as core primitives (i.e. `PTransform`) to `Read` from and `Write` to external sources.

$\Rightarrow$ More information on the Basics of [Beam Model](https://beam.apache.org/documentation/basics/) and on [Pipeline I/O](https://beam.apache.org/documentation/programming-guide/#pipeline-io) are available in the online documentation.

### Core Execution Modules on `Netflix-Prize` dataset

Despite its simplicity, the reference programming model in Apache Beam has one very important feature: **composability**. In fact, it is possible either to re-use (part of) a `Pipeline` within other, more complex workflows, or simply to chain multiple pipelines together. 

With specific reference to the `Netflix-prize` dataset, the general default workflow is composed of the following steps:

1. **[S1]** Read from the `DATA_FILE`, and generate the reference `PCollection`
    - `S1.1` A proper **data abstraction** is necessary to encapsulate the information for each _movie view_;
    - `S1.2` Data I/O will be performed in a parallel fashion (i.e. `ParDo`), leveraging one of the most important features of Apache Beam: _Data Parallelization_. 


2. **[S2]** Pass on the generated `PCollection` to the core (_abstract_) processing module 
    - `S2.1` Each core module is expected to be a _callable_ (implemented in the following exercises) that returns the final `PCollection` to store and save (see `S3`).

3. **[S3]** Write the results to the corresponding `OUTPUT_FILE`.

#### **`S1.1`** Data Abstraction

To encapsulate the information for each _movie view_, we will make use of Python 3.7+ `dataclass` (see [doc](https://docs.python.org/3.8/library/dataclasses.html)). 

Our reference `MovieView` dataclass model is very simple, and will represent the atomic **data object** included in the `netflix-prize` `PCollection`. 

To simplify object instantiation, the `date_stamp` init-only attribute (i.e. [`InitVar`](https://docs.python.org/3.8/library/dataclasses.html#init-only-variables)) is passed as a string, and is then converted to the proper data type and stored in the `date` field.

In [72]:
from dataclasses import dataclass, InitVar
from datetime import datetime

@dataclass
class MovieView:
    user_id: int
    movie_id: int
    rating: int
    date_stamp: InitVar[str]
    date: datetime = None  # filled in by __post_init__ from date_stamp
    
    def __post_init__(self, date_stamp):
        if self.date is None and date_stamp is not None:
            self.date = datetime.strptime(date_stamp, '%Y-%m-%d')
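
To see how the init-only variable behaves, here is a small self-contained sketch. `ViewSketch` is a hypothetical stand-in that mirrors the fields of `MovieView`, so that running it does not overwrite the class defined above. It shows that `date_stamp` is consumed by `__post_init__` and is not stored as a field.

```python
from dataclasses import InitVar, dataclass, fields
from datetime import datetime


@dataclass
class ViewSketch:
    """Hypothetical stand-in that mirrors the fields of MovieView."""
    user_id: int
    movie_id: int
    rating: int
    date_stamp: InitVar[str]
    date: datetime = None

    def __post_init__(self, date_stamp):
        # The init-only string is parsed once and kept as a datetime.
        if self.date is None and date_stamp is not None:
            self.date = datetime.strptime(date_stamp, "%Y-%m-%d")


view = ViewSketch(1488844, 1, 3, "2005-09-06")
field_names = [f.name for f in fields(view)]

print(view.date.date())
print(field_names)  # date_stamp is not among the stored fields
```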

##### Python Type Safety and Custom objects

The Apache Beam SDK for Python uses type hints during pipeline construction and runtime to try to emulate the correctness guarantees achieved by true static typing. Additionally, using type hints lays some groundwork that allows the backend service to perform efficient type deduction and registration of `Coder` objects.

Since in this tutorial we want to handle our own custom **data object**, we also need to implement our custom (_yet simple_) `beam.coders.Coder` to allow for a correct object _serialisation_.

**Note**: This step is particularly crucial during the (_parallel_) execution of the Pipeline, to prevent `apache_beam` from falling back to the default `Serializer` (i.e. `Coder`) based on `pickle`.
More information on **Python Type Safety** in Apache Beam pipelines is available in the official [documentation](https://beam.apache.org/documentation/sdks/python-type-safety/).

In [73]:
class MovieViewCoder(beam.coders.Coder):
    """Beam Coder Serialiser for our custom MovieView Data Object"""
    def encode(self, view):
        return (f"{view.user_id}:{view.movie_id}:{view.rating}:{str(view.date.date())}").encode("utf-8")

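    def decode(self, s):
        # split() returns strings, so cast the numeric fields back to int
        user_id, movie_id, rating, date_stamp = s.decode("utf-8").split(':')
        return MovieView(int(user_id), int(movie_id), int(rating), date_stamp)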

    def is_deterministic(self):
        return True

# Once defined, we can register the new Coder
beam.coders.registry.register_coder(MovieView, MovieViewCoder)
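
A coder is only useful if decoding returns exactly what was encoded. The sketch below checks that round trip in plain Python, using a tuple in place of `MovieView`; the function names `encode_record` and `decode_record` are hypothetical and only mirror the `user_id:movie_id:rating:date` wire format used above.

```python
from datetime import date
from typing import Tuple

Record = Tuple[int, int, int, date]  # (user_id, movie_id, rating, date)


def encode_record(record: Record) -> bytes:
    """Serialise a record as 'user_id:movie_id:rating:YYYY-MM-DD' in UTF-8."""
    user_id, movie_id, rating, day = record
    return f"{user_id}:{movie_id}:{rating}:{day.isoformat()}".encode("utf-8")


def decode_record(payload: bytes) -> Record:
    """Rebuild the record; str.split yields strings, so cast explicitly."""
    user_id, movie_id, rating, day = payload.decode("utf-8").split(":")
    return int(user_id), int(movie_id), int(rating), date.fromisoformat(day)


original = (1488844, 1, 3, date(2005, 9, 6))
payload = encode_record(original)
restored = decode_record(payload)

print(payload)
print(restored == original)
```

The `YYYY-MM-DD` date format contains no `:` character, which is what makes `:` a safe separator here.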

##### Working around `multiprocessing` namespace issues

⚠️ **Note**: As a workaround to enable _parallel computation_ in the notebook via Python `multiprocessing` and `beam.DoFn` (see next section), the dataclass and its coder will be saved into an external module (i.e. `movie_view.py`) for later use via **direct import**. This will guarantee a proper object serialisation, while also adhering to **Apache Beam** requirements. See [Requirements for writing user code for beam transforms](https://beam.apache.org/documentation/programming-guide/#requirements-for-writing-user-code-for-beam-transforms) for more information.

In [74]:
%%bash 

echo -e "
from dataclasses import dataclass, InitVar
from datetime import datetime
import apache_beam as beam


@dataclass
class MovieView:
    user_id: int
    movie_id: int
    rating: int
    date_stamp: InitVar[str]
    date: datetime = None
    
    def __post_init__(self, date_stamp):
        if self.date is None and date_stamp is not None:
            self.date = datetime.strptime(date_stamp, '%Y-%m-%d')
            

class MovieViewCoder(beam.coders.Coder):
    \"\"\"Beam Coder Serialiser for our custom MovieView Data Object\"\"\"
    def encode(self, view):
        return (f'{view.user_id}:{view.movie_id}:{view.rating}:{str(view.date.date())}').encode('utf-8')

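    def decode(self, s):
        # split() returns strings, so cast the numeric fields back to int
        user_id, movie_id, rating, date_stamp = s.decode('utf-8').split(':')
        return MovieView(int(user_id), int(movie_id), int(rating), date_stamp)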

    def is_deterministic(self):
        return True

# Once defined, we can register the new Coder
beam.coders.registry.register_coder(MovieView, MovieViewCoder)
" > movie_view.py

In [75]:
#@title **Verify** that import from movie_view module works properly { display-mode: "form" } 
try: 
    from movie_view import MovieView
except ImportError:
    print("movie_view module cannot be imported. Please check!")
else:
    print("MovieView dataclass has been successfully imported.")

MovieView dataclass has been successfully imported.


#### **`S1.2`** `beam.ParDo` handler

The [`beam.ParDo`](https://beam.apache.org/releases/pydoc/2.28.0/apache_beam.transforms.core.html#apache_beam.transforms.core.ParDo) processing paradigm is very similar to the `Map` phase of a `Map`/`Shuffle`/`Reduce`-style algorithm. 

The [`ParDo`](https://beam.apache.org/documentation/transforms/python/elementwise/pardo/) transform is applied in parallel to each element of the `PCollection`, via a custom [`beam.DoFn`](https://beam.apache.org/releases/pydoc/2.28.0/apache_beam.transforms.core.html#apache_beam.transforms.core.DoFn) callable.

In [76]:
@beam.typehints.with_output_types(MovieView)  # type-hint annotation for the output
class ParseMovieViews(beam.DoFn):
    """ParDo main execution core. Each line of the DATA_FILE will be processed, 
    and an instance of MovieView will be created and returned, accordingly."""
    
    def __init__(self):
        self.movie_id: int = -1
    
    def process(self, line: str):
        from movie_view import MovieView
        line = line.strip()  # strip leading and trailing whitespace
        if not line:
            return  # Skip Line
        if line.endswith(":"):
            movie_id, _ = line.split(":")
            self.movie_id = int(movie_id)
            return  # Automatically skip this line
        user_id, rating, date_stamp, *_ = line.split(',')
        user_id, rating = int(user_id), int(rating)
        yield MovieView(user_id, self.movie_id, rating, date_stamp)

The `ParseMovieViews` callable will be used to generate the initial reference `PCollection` for the `Netflix-prize` dataset.

In [77]:
def netflix_movie_views_collection(p: beam.Pipeline, data_file: str = DATA_FILE) -> beam.PCollection:
    """Generate the initial (Parallel) Collection of 
    `MovieView` objects as extracted from the input `data_file`"""
    return (
        p
        | beam.io.ReadFromText(data_file)
        | beam.ParDo(ParseMovieViews()).with_output_types(MovieView)
    )

#### **`S2.1`** The core `beam.Pipeline`

Now it is finally time to create the whole `beam.Pipeline` infrastructure, which will allow pluggable callables to be used (as implemented in the following exercises).
The Pipeline will also be configured to leverage parallel execution as much as possible, so that we can circumvent the Python [**GIL**](https://wiki.python.org/moin/GlobalInterpreterLock).

In [78]:
#@title **Check** Python multiprocessing settings { display-mode: "form" }
import multiprocessing as mp

print("Python Multiprocessing Start method: ", mp.get_start_method())
print("Multiprocessing available Cores: ", mp.cpu_count())

Python Multiprocessing Start method:  spawn
Multiprocessing available Cores:  8


**Define `PipelineModule` type for accepted callables**

(The type hint is mainly for design and documentation purposes)

In [79]:
from typing import Callable

# A PipelineModule is defined as a Callable that accepts a `beam.PCollection`
# and returns a `beam.PCollection` instance
PipelineModule = Callable[[beam.PCollection], beam.PCollection]

**Run Pipeline**

In [93]:
import time
from apache_beam.runners import DirectRunner
from apache_beam.options.pipeline_options import PipelineOptions

def run_pipeline(pipeline_fn: PipelineModule, mode: str = "multi_processing", 
                 data_file: str = DATA_FILE):
    start_time = time.time()
    fn_name = pipeline_fn.__name__
    outfile = str(OUTFILE_TEMPLATE).format(fn_name)  # output logfile

    # Pipeline Options
    pipeline_options = PipelineOptions.from_dictionary(
        {
            "direct_num_workers": 0,  # setting parallelism to CPU count
            "direct_running_mode": mode,  # work around GIL
            "job-server-timeout": 1048576,  # used to avoid potential timeout interruptions of workers
        }
    )
    with beam.Pipeline(runner=DirectRunner(), options=pipeline_options) as p:
        views_coll = netflix_movie_views_collection(p, data_file=data_file)
        pipeline_fn(views_coll) | "Write result" >> beam.io.WriteToText(
            outfile, shard_name_template=""
        )
    # Timing
    elapsed_time = time.time() - start_time
    hours, remainder = divmod(elapsed_time, 3600)
    minutes, seconds = divmod(remainder, 60)
    print(f"Elapsed time in {fn_name} is {hours:2.0f}H {minutes:2.0f}m {seconds:2.0f}s")
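
The timing report at the end of `run_pipeline` splits the elapsed seconds with two `divmod` calls. As a small sketch of that arithmetic in isolation, the hypothetical helper `format_elapsed` below applies the same format string to a number of seconds.

```python
def format_elapsed(elapsed_seconds: float) -> str:
    """Format a duration in seconds as 'H m s', as run_pipeline does."""
    hours, remainder = divmod(elapsed_seconds, 3600)  # whole hours, leftover seconds
    minutes, seconds = divmod(remainder, 60)          # whole minutes, leftover seconds
    return f"{hours:2.0f}H {minutes:2.0f}m {seconds:2.0f}s"


print(format_elapsed(82.0))
print(format_elapsed(3725))
```

Each component is padded to a width of two characters, which explains the double spaces in the reported timings.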

In [94]:
from apache_beam.transforms import combiners

In [82]:
!tail -n 10 $DATA_FILE

428688,3,2005-01-31
2225578,2,2005-01-31
1656194,2,2005-02-01
1768021,3,2005-02-08
747879,2,2005-02-08
1196927,3,2005-02-15
528854,5,2005-02-21
962705,3,2005-02-22
1299323,2,2005-02-24
2026970,4,2005-02-27


In [None]:
# Example: number of records in the dataset
# The exercise framework calls this function with |views| representing all MovieViews in the dataset.
# |views| has type PCollection, which is a parallel collection.
# Beam pipelines consist of applying PTransforms (i.e. parallel transformations) on PCollections
def count_all_views(views: beam.PCollection): 
    # This is an example of applying a PTransform
    # Beam uses | (pipe) syntax:
    # new_p_collection = p_collection | "Optional label" >> PTransform
    return views | "Count all views" >> combiners.Count.Globally().with_input_types(MovieView)

In [86]:
!head -n 10000000 $DATASET_FOLDER/combined_data_1.txt > $DATASET_FOLDER/tenmillionlines.txt 

In [87]:
!du -h $DATASET_FOLDER/tenmillionlines.txt 

201M	netflix_prize_dataset/data/tenmillionlines.txt


In [90]:
run_pipeline(count_all_views, mode="multi_processing", data_file=str(DATASET_FOLDER/"tenmillionlines.txt"))

Elapsed time in count_all_views is  0H  1m 22s


In [91]:
run_pipeline(count_all_views, mode="in_memory", data_file=str(DATASET_FOLDER/"tenmillionlines.txt"))



Elapsed time in count_all_views is  0H  5m 57s


In [95]:
run_pipeline(count_all_views, mode="multi_threading", data_file=str(DATASET_FOLDER/"tenmillionlines.txt"))



Elapsed time in count_all_views is  0H  6m 15s
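
To compare the three running modes, the elapsed times reported above can be turned into ratios. This is only a worked calculation on the timings printed in this notebook (measured on the author's machine with 8 cores), so the figures will differ on other hardware.

```python
# Elapsed times reported above for the 10M-line file, converted to seconds.
timings_s = {
    "multi_processing": 1 * 60 + 22,
    "in_memory": 5 * 60 + 57,
    "multi_threading": 6 * 60 + 15,
}

baseline = timings_s["multi_processing"]
# How many times slower each mode was, relative to multi_processing.
slowdown = {mode: round(t / baseline, 2) for mode, t in timings_s.items()}

for mode, ratio in slowdown.items():
    print(f"{mode:>16}: {timings_s[mode]:>4}s  ({ratio}x)")
```

In this run, `multi_processing` was a bit more than four times faster than the other two modes.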


In [96]:
print("\nFirst lines of the output file:")
!head -1 $OUTPUT_FOLDER/"count_all_views.txt"


First lines of the output file:
9998038


---

## Exercises

Each exercise consists of writing an Apache Beam pipeline that applies some transformations to a part of the Netflix Prize dataset (the first 10K lines). 

Completing the exercises requires **additional research** on the usage of the Beam API, either with a search engine or by looking at the official [API reference](https://beam.apache.org/documentation/sdks/python/). 

Let's start by looking at the first lines of the dataset. 

In [106]:
# There are 2 types of lines '<movie_id>:' and '<user_id>, <rating>, <date>'. 
# '<movie_id>:' says that the following views have movie ids <movie_id>
# Rating is in 1..5
!head -5 $DATA_FILE

1:
1488844,3,2005-09-06
822109,5,2005-05-13
885013,4,2005-10-19
30878,4,2005-12-26


In [107]:
from apache_beam.transforms import combiners

In [108]:
mp.get_start_method()

'spawn'

The exercise framework function **run_pipeline** executes a user-defined pipeline function.
The output of the pipeline is written to the file `netflix_prize_dataset/outputs/<pipeline_fn name>.txt`.

Each exercise has a solution. 

The exercises don't assume prior knowledge of Apache Beam. This Colab covers basically all of the Beam API needed for working on Privacy Beam. It assumes that you will search the Internet for the necessary API.

In all exercises, the elements of the input collection are `MovieView` objects.

Let's start with an example: the goal is to write a pipeline function for computing the number of records in the dataset. The example is the `count_all_views` function defined above.

## Exercise 1

Write a pipeline function for computing the distinct movie IDs in the dataset.



In [42]:
def distinct_movies_ids(views):
    # TODO: write your code here
    pass

run_pipeline(distinct_movies_ids)

print("\nContent of the output file:")
!cat $OUTPUT_FOLDER/"distinct_movies_ids.txt"

IndentationError: expected an indented block (<ipython-input-42-8486879f73cd>, line 4)

In [None]:
#@title Hint (double click to open)
# Search for beam.Map and beam.Distinct

In [42]:
#@title Solution (click on the arrow button to run, double click to open)
def distinct_movies_ids(views):
  movie_ids = views | beam.Map(lambda mv: mv.movie_id)
  return movie_ids | beam.Distinct()

run_pipeline(distinct_movies_ids)

print("\nContent of the output file:")
!cat $OUTPUT_FOLDER/"distinct_movies_ids.txt"

Elapsed time in distinct_movies_ids is  0H  0m  3s

Content of the output file:
8
7
5
2
4
6
1
3


## Exercise 2

Write a pipeline function for computing the **number** of distinct movie IDs in the dataset.

In [None]:
def number_distinct_movies_ids(views):
  # TODO: write your code here
  # Hint: you can use the function from the previous exercise
  pass

run_pipeline(number_distinct_movies_ids)

print("\nContent of the output file:")
!cat $OUTPUT_FOLDER/"number_distinct_movies_ids.txt"

In [None]:
#@title Solution (click on the arrow button to run, double click to open)
def number_distinct_movies_ids(views):
  return distinct_movies_ids(views) | combiners.Count.Globally()

run_pipeline(number_distinct_movies_ids)

print("\nContent of the output file:")
!cat $OUTPUT_FOLDER/"number_distinct_movies_ids.txt"

## Exercise 3

Write a pipeline which computes the number of views per movie, i.e. it outputs a PCollection of (movie_id, number of views) pairs.

In [None]:
def views_count_per_movie(views):
   # TODO: write your code here
   pass

run_pipeline(views_count_per_movie)

print("\nContent of the output file:")
!cat $OUTPUT_FOLDER/"views_count_per_movie.txt"

In [None]:
#@title Hint (double click to open)
# Search for beam combiners

In [None]:
#@title Solution
def views_count_per_movie(views):
  movie_ids = views | beam.Map(lambda mv: mv.movie_id)
  return movie_ids | "Count movies" >> combiners.Count.PerElement()

run_pipeline(views_count_per_movie)

print("\nContent of the output file:")
!cat $OUTPUT_FOLDER/"views_count_per_movie.txt"

## Exercise 4

Write a pipeline which computes the average rating per movie, i.e. it outputs a PCollection of (movie_id, average rating) pairs.

In [None]:
def mean_rating_per_movie(views):
  # TODO: write your code here
  pass

run_pipeline(mean_rating_per_movie)

print("\nContent of the output file:")
!cat $OUTPUT_FOLDER/"mean_rating_per_movie.txt"

In [None]:
#@title Solution

def mean_rating_per_movie(views):
  # Beam treats 2-element tuples in a PCollection as (key, value) pairs
  movie_rating = views | beam.Map(lambda mv: (mv.movie_id, mv.rating))
  return movie_rating | combiners.Mean.PerKey()

run_pipeline(mean_rating_per_movie)

print("\nContent of the output file:")
!cat "outputs/mean_rating_per_movie"

## Exercise 5

Write a pipeline that bounds the number of views per user by `max_views`: it outputs a PCollection of MovieView records such that each user_id has at most `max_views` records.
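
The intended semantics can be sketched in plain Python (not Beam). The helper `bound_per_user`, its `seed` parameter, and the toy records are hypothetical names introduced only for this illustration.

```python
import random
from collections import defaultdict


def bound_per_user(records, max_views, seed=0):
    """Keep at most max_views records per user_id, chosen uniformly at random."""
    rng = random.Random(seed)
    by_user = defaultdict(list)
    for user_id, record in records:
        by_user[user_id].append(record)

    bounded = []
    for user_records in by_user.values():
        # Users with fewer than max_views records keep all of them.
        k = min(max_views, len(user_records))
        bounded.extend(rng.sample(user_records, k))
    return bounded


# User 1 has three records, user 2 has one.
toy_records = [(1, "a"), (1, "b"), (1, "c"), (2, "d")]
bounded = bound_per_user(toy_records, max_views=2)
print(len(bounded))  # 3
```

Which records survive for a user with too many views is random; only the per-user upper bound is guaranteed.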

In [None]:
def bound_number_of_views_per_user(views, max_views=2):
  # TODO: write your code here

run_pipeline(bound_number_of_views_per_user)

print("\nContent of the output file:")
!head -5 "outputs/bound_number_of_views_per_user"

print("\nNumber of lines (the correct number is 9930 for max_views=2):")
!wc -l "outputs/bound_number_of_views_per_user"

In [None]:
#@title Hint
# Search for combiners.Sample and beam.FlatMap

In [None]:
#@title Solution
def bound_number_of_views_per_user(views,
                                   max_views=2):
  user_view = views | beam.Map(lambda mv: (mv.user_id, mv))
  # user_id -> list of MovieView
  user_bounded_views = user_view | combiners.Sample.FixedSizePerKey(max_views)
  bounded_views = user_bounded_views | "Remove keys" >> beam.Values()
  return bounded_views | "Unnest lists" >> beam.FlatMap(lambda x: x)

run_pipeline(bound_number_of_views_per_user)

print("\nContent of the output file:")
!head -5 "outputs/bound_number_of_views_per_user"
print("\nNumber of lines:")
!wc -l "outputs/bound_number_of_views_per_user"

## Exercise 6
Write a pipeline that computes a rating histogram per movie, i.e.
it outputs a PCollection of (movie_id, histogram_tuple) pairs, where histogram_tuple is
a 5-element tuple whose i-th entry is the number of views with rating i (ratings range from 1 to 5).

The goal of this exercise is to learn how to write custom combiners (look for beam.CombineFn).

Note: this exercise is larger than the previous ones.
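
Before writing a real combiner, it may help to see how the four methods of a `beam.CombineFn` fit together. The sketch below is plain Python with no Beam dependency: `ToySumCombiner` only mirrors the method names, and `simulate_combine` is a hypothetical driver that imitates one way a runner could process the input in separate bundles. Real runners decide the bundling and the call order themselves.

```python
class ToySumCombiner:
    """Has the four methods of beam.CombineFn, but does not depend on Beam."""

    def create_accumulator(self):
        # Empty partial result.
        return 0

    def add_input(self, accumulator, element):
        # Fold one element into a partial result.
        return accumulator + element

    def merge_accumulators(self, accumulators):
        # Combine partial results computed independently.
        return sum(accumulators)

    def extract_output(self, accumulator):
        # Convert the final accumulator into the output value.
        return accumulator


def simulate_combine(combiner, bundles):
    """Hypothetical driver: one accumulator per bundle, then a single merge."""
    partials = []
    for bundle in bundles:
        acc = combiner.create_accumulator()
        for element in bundle:
            acc = combiner.add_input(acc, element)
        partials.append(acc)
    return combiner.extract_output(combiner.merge_accumulators(partials))


total = simulate_combine(ToySumCombiner(), bundles=[[1, 2], [3], [4, 5]])
print(total)  # 15
```

Because partial results are merged in no particular order, the combining operation should be associative and commutative, as summation is here.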

In [None]:
def movie_rating_histogram(views):
  # TODO: write your code here

run_pipeline(movie_rating_histogram)

print("\nContent of the output file:")
!head -10 "outputs/movie_rating_histogram"

In [None]:
#@title Solution
class SumTuplesCombiner(beam.CombineFn):
  """Sums tuples of length ndim element-wise."""

  def __init__(self, ndim):
    self.ndim = ndim

  def create_accumulator(self):
    return [0] * self.ndim

  def add_input(self, accumulator, element):
    # Add the element to the accumulator coordinate by coordinate.
    for i, b in enumerate(element):
      accumulator[i] += b
    return accumulator

  def merge_accumulators(self, accumulators):
    # Accumulators have the same shape as elements, so add_input can be reused.
    res = self.create_accumulator()
    for a in accumulators:
      self.add_input(res, a)
    return res

  def extract_output(self, accumulator):
    return accumulator

def movie_rating_histogram(views):
  def one_hot(rating):
    return (rating == 1, rating == 2, rating == 3, rating == 4, rating == 5)

  movie_rating_vector = views | beam.Map(lambda mv: (mv.movie_id, one_hot(mv.rating)))
  return movie_rating_vector | beam.CombinePerKey(SumTuplesCombiner(ndim=5))

run_pipeline(movie_rating_histogram)

print("\nContent of the output file:")
!head -10 "outputs/movie_rating_histogram"

## Exercise 7

This exercise has two parts: the first is about reading from files, the second is about using a join.

In this exercise we will work with MovieTitle. The next code cell contains the data class definition and the code for parsing a MovieTitle from a string.


In [None]:
# MovieTitle class and parsing of Movie titles

@dataclass
class MovieTitle:
  movie_id: int
  year: int
  name: str

def parse_movie_title(line):
  if line.strip() == "": return []
  spl = line.split(',', 2)  # maxsplit=2 keeps commas that are part of the title
  movie_id = int(spl[0])
  year = -1 if spl[1] == "NULL" else int(spl[1])
  title = spl[2]
  return MovieTitle(movie_id, year, title)


### Exercise 7a
Read and parse the movie titles file and output the movie names. The result should be a PCollection of str.

The constant MOVIE_TITLES_FILE holds the path to the file.

Hint: use MovieTitle and parse_movie_title from the previous cell.
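
As a quick check of the parsing step, the sketch below parses two made-up lines, assuming each line has the form `movie_id,year,title` that `parse_movie_title` implies. `ToyMovieTitle` and `parse_title_line` are hypothetical copies so that the block runs on its own, and the sample lines are not taken from the real file.

```python
from dataclasses import dataclass


@dataclass
class ToyMovieTitle:
    # Hypothetical copy of MovieTitle so that this block is self-contained.
    movie_id: int
    year: int
    name: str


def parse_title_line(line):
    # maxsplit=2 keeps commas that belong to the title itself.
    movie_id, year, name = line.split(",", 2)
    return ToyMovieTitle(int(movie_id), -1 if year == "NULL" else int(year), name)


plain = parse_title_line("1,2003,Some Movie")
tricky = parse_title_line("2,NULL,Title, With a Comma")
print(plain.name)   # Some Movie
print(tricky.year)  # -1
print(tricky.name)  # Title, With a Comma
```

In the pipeline, the same per-line function is applied to every line that the read transform produces.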

In [None]:
def read_movie_names(_):  # _ is actually the PCollection of views, because the framework only passes views
  pipeline = _.pipeline  # a pipeline object is needed as the "input" of a read operation
  # TODO: write your code here

run_pipeline(read_movie_names)

print("\nContent of the output file:")
!head -10 "outputs/read_movie_names"

In [None]:
#@title Solution
def read_movie_names(_):
  pipeline = _.pipeline
  return (pipeline
          | "Read movie titles" >> beam.io.ReadFromText(MOVIE_TITLES_FILE)
          | "Parse" >> beam.Map(parse_movie_title)
          | "Extract name" >> beam.Map(lambda title: title.name))

run_pipeline(read_movie_names)

print("\nContent of the output file:")
!head -10 "outputs/read_movie_names"

### Exercise 7b

The goal of this part is to learn how to use a join (called CoGroupByKey in Beam).

Output a PCollection of (movie_name, number of views) pairs, only for movies that have views.

Hint: reuse the function that computes the number of views per movie from Exercise 3.
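
The shape of a CoGroupByKey result is easier to handle once you have seen it. The sketch below is a plain-Python model (not Beam code): `toy_co_group_by_key`, the tags `"counts"` and `"titles"`, and the sample pairs are all hypothetical. In Beam the grouped values are iterables rather than necessarily lists, and each output element is a (key, dict) tuple.

```python
from collections import defaultdict


def toy_co_group_by_key(tagged_inputs):
    """Plain-Python model: {tag: [(key, value), ...]} -> {key: {tag: [values]}}."""
    # Every key gets an entry for every tag, even if that entry stays empty.
    grouped = defaultdict(lambda: {tag: [] for tag in tagged_inputs})
    for tag, pairs in tagged_inputs.items():
        for key, value in pairs:
            grouped[key][tag].append(value)
    return dict(grouped)


counts = [(10, 2), (20, 1)]                 # (movie_id, number of views)
titles = [(10, "A"), (20, "B"), (30, "C")]  # (movie_id, title)
joined = toy_co_group_by_key({"counts": counts, "titles": titles})
print(joined[10])  # {'counts': [2], 'titles': ['A']}
print(joined[30])  # {'counts': [], 'titles': ['C']}
```

Movie 30 has a title but no views, so its `counts` list is empty; this is the case to filter out when only movies with views are wanted.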

In [None]:
def movie_name_count_views(views):
  # TODO: 1. read the movie titles as in 7a

  # TODO: 2. compute movie_id_counts

  # TODO: 3. join the results of 1 and 2 (search for CoGroupByKey)

run_pipeline(movie_name_count_views)

print("\nContent of the output file:")
!head -10 "outputs/movie_name_count_views"

In [None]:
#@title Solution
def movie_name_count_views(views):
  movie_id_count = views_count_per_movie(views)
  pipeline = views.pipeline
  movie_titles = (pipeline
                  | "Read movie titles" >> beam.io.ReadFromText(MOVIE_TITLES_FILE)
                  | "Parse" >> beam.Map(parse_movie_title))

  movie_id_title = movie_titles | "To (movie_id, title)" >> beam.Map(lambda t: (t.movie_id, t))

  # Join movie_id_count and movie_id_title
  COUNT_KEY = 0
  TITLES_KEY = 1
  # Element Format: (movie_id, {COUNT_KEY: [], TITLES_KEY: []})
  join_res = {COUNT_KEY: movie_id_count, TITLES_KEY: movie_id_title} | beam.CoGroupByKey()

  def process_join(movie_id_dict):
    _, grouped = movie_id_dict
    # No views of this movie: this happens because only part of the views
    # dataset is processed.
    if len(grouped[COUNT_KEY]) == 0:
      return
    yield (grouped[TITLES_KEY][0].name, grouped[COUNT_KEY][0])

  return join_res | beam.ParDo(process_join)

run_pipeline(movie_name_count_views)

print("\nContent of the output file:")
!head -10 "outputs/movie_name_count_views"

## Next
The next [colab](https://github.com/OpenMined/PipelineDP/blob/main/docs/tutorial_1/2_beam_laplace_mechansim.ipynb) introduces the Laplace mechanism and shows how to use it.