# Data Engineering tasks
This file contains the solutions, each with a short description, for all three tasks.

## SparkSession initialization
The entry point to all Spark functionality is defined by creating a `SparkSession` instance.

In [1]:
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
            .appName("Data Engineering Test")
            .master("local[*]")
            .getOrCreate()

spark = org.apache.spark.sql.SparkSession@26c472c9


## 1. Sales and rentals broadcast rights
In task 1 the two datasets, `whatson` and `started_streams`, are joined on two conditions. The first requires `house_number` to be identical in both datasets; the second requires the countries to match. The requested columns are then selected from the joined dataset, and the result is filtered so that the date from `started_streams` falls within the broadcast right range of the `whatson` data.

### 1.1 DataFrame and extraction of requested data
Two `DataFrame`s are created from the contents of `whatson.csv` and `started_streams.csv`, and named `whatson_df` and `streams_df` respectively.
To join them on `house_number` and `country_code` (`broadcast_right_region` in `whatson_df`), the values in the data have to be comparable (e.g. Sweden should map to `se`).
To achieve that, `countryCodeMap` maps each country name to its corresponding code, so the countries of the two datasets can be compared.
The dataframes are filtered on `broadcast_right_vod_type` for `whatson_df` and `product_type` for `streams_df`, so that only rows with `svod` or `tvod` are kept.

In [2]:
import org.apache.spark.sql.functions.lower
import org.apache.spark.sql.functions.typedLit
import org.apache.spark.sql.Column

val countryCodeMap: Column = typedLit(Map(
  "Norway" -> "no",
  "Sweden"-> "se",
  "Denmark" -> "dk",
  "Finland" -> "fi"
))

// Input
val whatson_df = spark.read.format("csv")
                .option("header", "true")
                .load("data/whatson.csv")
                .withColumn("broadcast_right_vod_type",lower($"broadcast_right_vod_type"))
                .filter($"broadcast_right_vod_type" === "tvod" || $"broadcast_right_vod_type" === "svod")
                .withColumn("broadcast_right_region", countryCodeMap($"broadcast_right_region"))
//Input
val streams_df = spark.read.format("csv")
                .option("delimiter",";")
                .option("header", "true")
                .load("data/started_streams.csv")
                .filter($"product_type" === "tvod" || $"product_type" === "svod")


99829
875224


countryCodeMap = keys: [Norway,Sweden,Denmark,Finland], values: [no,se,dk,fi]
whatson_df = [dt: string, house_number: string ... 6 more fields]
streams_df = [dt: string, time: string ... 9 more fields]


[dt: string, time: string ... 9 more fields]
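
As a small illustration of the lookup that `countryCodeMap` performs, the sketch below applies the same kind of map column to a few hypothetical region names (the sample rows and the names `regionSample`, `codes` and `mapped` are made up for this example and do not come from `whatson.csv`). Under Spark's non-ANSI SQL settings, a region that is missing from the map is expected to come back as `null`, and such rows would then never match in the inner join.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.typedLit

// Reuses the notebook's session if one already exists
val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Same mapping as countryCodeMap in the cell above
val codes = typedLit(Map(
  "Norway" -> "no",
  "Sweden" -> "se",
  "Denmark" -> "dk",
  "Finland" -> "fi"
))

// Hypothetical sample rows; "Iceland" is deliberately absent from the map
val regionSample = Seq("Sweden", "Norway", "Iceland").toDF("broadcast_right_region")

// Applying the map column to a key column looks up the value for each row
val mapped = regionSample.withColumn("country_code", codes($"broadcast_right_region"))
mapped.show()
```

Checking `mapped.filter($"country_code".isNull)` on the real data would be one way to see whether any region names fall outside the four mapped countries.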

### 1.2 Join 
The inner join is performed on `house_number` and country.
After that, most of the columns from `streams_df` are selected, while only the `broadcast_right` date columns are kept from `whatson_df`, given the desired outcome of the task.
For simplicity, `whatson_df` and `streams_df` are given the aliases `df1` and `df2` respectively.

In [3]:
val joined_df = whatson_df.as("df1").join(streams_df.as("df2"),
                               $"df1.house_number"===$"df2.house_number"&&
                               $"df1.broadcast_right_region"===$"df2.country_code", "inner")
                            .select("df2.dt",
                                    "df2.time",
                                    "df2.device_name",
                                    "df2.house_number",
                                    "df2.user_id",
                                    "df2.country_code",
                                    "df2.program_title",
                                    "df2.season",
                                    "df2.season_episode",
                                    "df2.genre",
                                    "df2.product_type",
                                    "df1.broadcast_right_start_date",
                                    "df1.broadcast_right_end_date")
                            


1571021


joined_df = [dt: string, time: string ... 11 more fields]


[dt: string, time: string ... 11 more fields]

### 1.3 Filtering out old dates 
The final dataframe is obtained by filtering out the rows whose `dt` (the date from `started_streams`) falls outside the range from `broadcast_right_start_date` to `broadcast_right_end_date`.
Lastly, the results are saved locally.

In [8]:
import org.apache.spark.sql.functions._
val final_broadcast_df = joined_df.filter($"dt".between($"broadcast_right_start_date",
                                                        $"broadcast_right_end_date"))
                                  .distinct() // Removes duplicates, 931 268 -> 109 642

final_broadcast_df.write
                  .csv("task1/output")


final_broadcast_df = [dt: string, time: string ... 11 more fields]


[dt: string, time: string ... 11 more fields]
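
The `dt` and broadcast right columns are read as strings, so `between` compares them as text. That gives the right answer when all three columns share the same zero-padded ISO format (`yyyy-MM-dd`), which is assumed here rather than confirmed by the data. A minimal sketch of a more defensive variant, using hypothetical sample rows and the made-up names `dateSample` and `inRange`, casts the columns with `to_date` before comparing:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.to_date

// Reuses the notebook's session if one already exists
val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Hypothetical rows: one stream inside the rights window, one after it
val dateSample = Seq(
  ("2020-03-15", "2020-01-01", "2020-12-31"),
  ("2021-02-01", "2020-01-01", "2020-12-31")
).toDF("dt", "broadcast_right_start_date", "broadcast_right_end_date")

// between() is inclusive on both ends; to_date() turns the strings into DateType
val inRange = dateSample.filter(
  to_date($"dt").between(
    to_date($"broadcast_right_start_date"),
    to_date($"broadcast_right_end_date")))

inRange.show()
```

If the source files use a different date layout, `to_date` also accepts a format string as a second argument.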

## 2. Product and user count
In task 2, each product gets its total number of views in the column `content_count` and its number of unique users in the column `unique_users`.
In both cases, `groupBy` groups the rows by `program_title`. For `content_count`, a simple call to `count()` is enough to calculate how many views each `program_title` has. Calculating the unique users per `program_title`, on the other hand, requires a call to `agg()` with `countDistinct`, so that only the distinct users watching the respective program are counted.
Those two results are then joined with the `streams_df` dataframe to get the final table with all columns included.

In [9]:
import org.apache.spark.sql.functions._

//Input
val streams_df = spark.read.format("csv")
                .option("delimiter",";")
                .option("header", "true")
                .load("data/started_streams.csv")

val watches_count = streams_df.groupBy("program_title")
                              .count()
                              .withColumnRenamed("count", "content_count")

val unique_users = streams_df.groupBy("program_title")
                             .agg('program_title, countDistinct('user_id).as("unique_users"))

val streams_counts_df =  streams_df.join(watches_count, Seq("program_title"))
val product_user_df =  streams_counts_df.join(unique_users,Seq("program_title"))
                                        .select("dt",
                                                "program_title",
                                                "device_name",
                                                "country_code",
                                                "product_type",
                                                "unique_users",
                                                "content_count")
                                        .distinct() // removes duplicates, 100 000 -> 18 454


product_user_df.write
               .csv("task2/output")


streams_df = [dt: string, time: string ... 9 more fields]
watches_count = [program_title: string, content_count: bigint]
unique_users = [program_title: string, program_title: string ... 1 more field]
streams_counts_df = [program_title: string, dt: string ... 10 more fields]
product_user_df = [dt: string, program_title: string ... 5 more fields]


[dt: string, program_title: string ... 5 more fields]
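
One possible simplification, shown here only as a sketch on hypothetical data: both counts can be computed in a single `groupBy`, and the grouping column does not need to be listed again inside `agg()` (doing so is what produces the second `program_title` column in the `unique_users` schema above). The names `viewSample` and `counts` are made up for this example.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{count, countDistinct, lit}

// Reuses the notebook's session if one already exists
val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Hypothetical streams: "Show A" is started three times by two different users
val viewSample = Seq(
  ("Show A", "u1"),
  ("Show A", "u1"),
  ("Show A", "u2"),
  ("Show B", "u3")
).toDF("program_title", "user_id")

// One pass: total views and distinct users per program
val counts = viewSample.groupBy("program_title")
  .agg(
    count(lit(1)).as("content_count"),
    countDistinct($"user_id").as("unique_users"))

counts.show()
```

A single join of `counts` back onto the streams dataframe would then replace the two separate joins.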

## 3. Genre and time of day
In the final task, the goal is to calculate the number of unique users per genre and watched hour, and then rank the results by that count.
First, the data is grouped on `genre` and `watched_hour`, and then `agg()` is used to calculate the number of unique users for each pair of `genre` and `watched_hour`. The outcome is later joined with `streams_df` (the dataframe for the `started_streams` data) and the desired columns are selected for the final output.


In [10]:
import org.apache.spark.sql.functions._

// Input
val streams_df = spark.read.format("csv")
                .option("delimiter",";")
                .option("header", "true")
                .load("data/started_streams.csv")
                .withColumn("time", hour($"time"))
                .withColumnRenamed("time", "watched_hour")
    
// Grouping the data based on numbers of distinct users per watched hour and genre
val unique_users_hour_genre = streams_df.groupBy("genre","watched_hour")
                             .agg('genre, countDistinct('user_id).as("unique_users"))

val users_per_genre_hour =  streams_df.join(unique_users_hour_genre, Seq("genre","watched_hour"))
          .select("watched_hour",
                  "genre",
                  "unique_users")
          .distinct() //Removes duplicates, 100 000 -> 192
          .orderBy(desc("unique_users")) // Sort last: distinct() shuffles the rows and does not guarantee an earlier ordering
         

users_per_genre_hour.write
              .csv("task3/output")



streams_df = [dt: string, watched_hour: int ... 9 more fields]
unique_users_hour_genre = [genre: string, watched_hour: int ... 2 more fields]
users_per_genre_hour = [watched_hour: int, genre: string ... 1 more field]


[watched_hour: int, genre: string ... 1 more field]
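
Since the grouped result already holds one row per `genre` and `watched_hour` pair, a more direct variant is conceivable: aggregate once, skip the join back to the streams data, and sort as the last step so that the ranking carries through to the output. The sketch below uses hypothetical rows and the made-up names `hourSample` and `ranked`; it is not the notebook's recorded run.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{countDistinct, desc}

// Reuses the notebook's session if one already exists
val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Hypothetical streams: two distinct users watch Drama at 20, one watches Sport at 9
val hourSample = Seq(
  ("Drama", 20, "u1"),
  ("Drama", 20, "u2"),
  ("Drama", 20, "u1"),
  ("Sport", 9, "u3")
).toDF("genre", "watched_hour", "user_id")

// One row per (genre, watched_hour); the grouping columns are kept automatically
val ranked = hourSample.groupBy("genre", "watched_hour")
  .agg(countDistinct($"user_id").as("unique_users"))
  .orderBy(desc("unique_users"))

ranked.show()
```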