# Data Engineer - Technical Assessment

In this section of the interview at Beyond Finance, you will be assessed on your ability to perform several Data Engineering tasks. To perform well on this task, you will demonstate competence in the following areas:

* preprocessing data to prepare for a database load
* understanding entity relationships in a database
* merging data from different tables
* filtering data to relevant subsets
* calculating aggregations and descriptive statistics

It will be pretty difficult to complete all questions in the allotted time. Your goal is not to speed through the answers, but to come up with answers that demonstrate your knowledge. It's more about your thought process and logic than getting the right answer or your code.


## Getting Started

This exercise will be broken into 2 parts
1. Data Processing
2. Data Wrangling

### Data Processing
In this section you will take files from the ./raw_data/ subfolders, combine them into a single newline-delimited `json.gz` file per subfolder, and place that CSV file in a ./processed_data/ directory. You may have to do some light investigation into the data files to understand their file formats and delimiters

**Example**

Files
- ./raw_data/tracks/tracks_0.csv
- ./raw_data/tracks/tracks_1.json
- ./raw_data/tracks/tracks_2.csv
- etc... 

should be combined into a single file ./processed_data/tracks.json.gz

**What we look for**

- Can you handle all subfolders in a single pass over the raw data files?
- How can you limit memory consumption? (hint `chunksize`)

### Data Wrangling
For this section, we'll pretend you loaded the raw data plus additional tables into a small SQLite database containing roughly a dozen tables. **We've provided this database for you so don't worry about loading it yourself**. If you are not familiar with the SQLite database, it uses a fairly complete and standard SQL syntax, though does not many advanced analytics functions. Consider it just a remote datastore for storing and retrieving data from. 

![](db-diagram.png)

## Data Processing (40 minutes)

In [1]:
import pandas as pd 
import os

#!pip install memory_profiler
%load_ext memory_profiler

In [12]:
%%memit
# ... your code here

## Create Processed Data directory 
if not os.path.exists('processed_data'):
    os.makedirs('processed_data')

## List of Folders
raw_data_dir = './raw_data/'
read_folders = os.listdir(raw_data_dir)



## Read  folder data and write to processed data folder with final output
def processed_data(folder):
    print("function started")
    final_df = pd.DataFrame()
    path = raw_data_dir + folder + '/'
    files = os.listdir(path)
    print(files)
    for j in range(len(files)):
        file = path + files[j]
        if file.endswith(".json"):
            df = pd.read_json(file,lines=True, chunksize=1000)
        elif file.endswith(".csv"):
            df = pd.read_csv(file,iterator=True, chunksize=1000)
        final_df = final_df.append(df)
    final_df.to_json(f'processed_data/{folder}.json.gz', orient='records', lines=True,compression='gzip')

## Multi Processing to run all sub folders at same time (number of processer is based on cpu count)
import multiprocessing

if __name__=="__main__":

    folders = []
    n = int(multiprocessing.cpu_count()/2)
    for i in range(0, len(read_folders), n):
        folder = read_folders[i:i + n]
        pool_objects = []
        for j in range(len(folder)):
            p = multiprocessing.Process(target=processed_data, args=(folder[j], ))
            p.start()
            pool_objects.append(p)

        for p in pool_objects:
            p.join()

    # All processes finished
    print("Done!")

Done!
Done!
peak memory: 87.26 MiB, increment: 0.00 MiB


## Data Wrangling (20 minutes)

In [3]:
%load_ext sql 
%sql sqlite:///db/sqlite/chinook.db

In [4]:
import sqlite3

con = sqlite3.connect("db/sqlite/chinook.db")

### 1. How many different customers are there?

In [5]:
%%sql
select count(distinct customerid) as cnt from customers

 * sqlite:///db/sqlite/chinook.db
Done.


cnt
59


### 2. How long is the longest track in minutes?

In [6]:
%%sql
select max(milliseconds)/60000 as minutes from tracks

 * sqlite:///db/sqlite/chinook.db
Done.


minutes
88


### 3. Which genre has the shortest average track length?

In [7]:
%%sql
select b.name as genre_name from 
(select t1.genreid,min(average)  from 
(select genreid,avg(milliseconds) as average from tracks group by genreid)t1 )a 
join genres b on a.genreid = b.genreid

 * sqlite:///db/sqlite/chinook.db
Done.


genre_name
Rock And Roll


### 4. Which artist shows up in the most playlists?

In [8]:
%%sql
select artist_name from (select t1.artist_name,max(t1.cnt) from (Select a.name as artist_name,count(e.name) as cnt
From artists a
Join albums b 
On a.artistid = b.artistid
join tracks c
on b.albumid = c.albumid
join playlist_track d 
on c.trackid = d.trackid
join playlists e 
on d.playlistid = e.playlistid
Group By a.Name)t1)

 * sqlite:///db/sqlite/chinook.db
Done.


artist_name
Iron Maiden


### 5. What was the most popular album among these customers?

In [9]:
%%sql
select title from (select a.title,count(c.quantity) as cnt from albums a 
join tracks b 
on a.albumid = b.albumid
join invoice_items c 
on b.trackid = c.trackid group by a.title ) order by cnt desc limit 1

 * sqlite:///db/sqlite/chinook.db
Done.


title
Minha Historia
