# Data Tech Lead Homework

In this document you will find a couple of exercises we would like you to solve. Please read each task carefully and create your solutions as you see fit.

You may consult documentation for Python libraries, API endpoints, and database dialects; using any other resource to solve the coding exercises is strictly prohibited.

Start with the prerequisites!


## Prerequisites

You should already have Python 3.11, Docker and a Database IDE (e.g. DataGrip, DBeaver) installed. Please make sure that the Docker daemon is running.

Open a terminal in VS Code (MacOS: `Cmd + Shift + P` -> Toggle Terminal, Windows: `Ctrl + Shift + P` -> Toggle Terminal) and type the command below.

❗️ You should keep using the Terminal in VS Code during your homework, to make sure everything is working as intended.

```bash
docker ps
```

If Docker is installed, but not running, you will see the following error message and you should start Docker Desktop:

`Cannot connect to the Docker daemon at unix:///Users/<username>/.docker/run/docker.sock. Is the docker daemon running?`

Otherwise you will see the headers of containers, e.g.:

`CONTAINER ID   IMAGE     COMMAND   CREATED   STATUS    PORTS     NAMES`


Create a virtual environment for Python, activate it, and install the packages outlined in requirements.txt:

```bash
python -m venv .venv

source .venv/bin/activate

pip install -r requirements.txt
```

You should select the virtual environment created as your Python kernel for this notebook in the top right corner.

Some of the most common libraries are already included in `requirements.txt`. If at any point you need an additional library to solve an exercise, you can install it with `pip` (while the venv is activated).

Start the Postgres Server that is needed throughout your homework:
```bash
cd scripts
python -m run_postgres_server
```

Now everything is set up, let's start with the exercises!

In the database you will see tables with relevant information about movies, provided by TMDb.

❗️ If you have trouble with an exercise at any point, feel free to move on; the exercises do not depend on each other.

### Exercise 1 (0.5 point)

Write an SQL query that returns the number of films produced in Hungary.

In [None]:
SELECT p.country_name, COUNT(DISTINCT mp.movie_id) AS number_movie
FROM staging.production_countries p
INNER JOIN staging.movie_production_countries mp ON p.country_iso = mp.country_iso
WHERE p.country_name = 'Hungary'
GROUP BY p.country_name;

### Exercise 2 (0.5 point)

Write an SQL query that returns the title and release year of movies that grossed over 1 billion USD.

In [None]:
SELECT m.title, EXTRACT(YEAR FROM m.release_date::date) AS release_year
FROM staging.movies m INNER JOIN staging.financials f ON m.id = f.movie_id
WHERE f.revenue > 1000000000;

### Exercise 3 (1 point)

Write an SQL query that returns the top 3 movies that achieved the most revenue by country.

In [None]:
SELECT country_name, title, revenue
FROM (
    SELECT m.title, f.revenue, p.country_name,
           -- NULLS LAST: PostgreSQL sorts NULL revenue first under DESC by default
           ROW_NUMBER() OVER (PARTITION BY c.country_iso ORDER BY f.revenue DESC NULLS LAST) AS revenue_rank
    FROM staging.movies m
    INNER JOIN staging.financials f ON m.id = f.movie_id
    INNER JOIN staging.movie_production_countries c ON m.id = c.movie_id
    INNER JOIN staging.production_countries p ON c.country_iso = p.country_iso
) ranked_movies
WHERE revenue_rank <= 3
ORDER BY country_name, revenue_rank;

### Exercise 4 (1 point)

Write an SQL query that returns the top 5 countries based on the average rating of movies produced there.

In [None]:
SELECT country_name, AVG(v.vote_average) AS average_rating
FROM staging.votes v
INNER JOIN staging.movie_production_countries c ON v.movie_id = c.movie_id
inner join staging.production_countries p on c.country_iso = p.country_iso
GROUP BY country_name
HAVING AVG(v.vote_average) IS NOT NULL
ORDER BY average_rating DESC
LIMIT 5;

### Exercise 5 (1 point)

It seems like data in `staging.production_companies` table is duplicated. Write an SQL query that fixes it without risking any data that is currently in the table.

In [None]:
WITH deduplicated_companies AS (
    SELECT
        company_id,
        company_name,
        ROW_NUMBER() OVER (PARTITION BY company_id, company_name ORDER BY company_id) AS row_num
    FROM staging.production_companies
)
SELECT
    company_id,
    company_name
FROM
    deduplicated_companies
WHERE
    row_num = 1;
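
One non-destructive way to apply this deduplication is to write the distinct rows to a new table and leave the original untouched until the row counts have been checked. The sketch below illustrates that pattern with Python's built-in `sqlite3` module and an in-memory stand-in table. The table name `production_companies_dedup` and the sample rows are assumptions made for illustration; on the homework database the same statements would target the `staging` schema in PostgreSQL.

```python
import sqlite3

# In-memory stand-in for staging.production_companies (sample rows are made up).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE production_companies (company_id INTEGER, company_name TEXT)")
conn.executemany(
    "INSERT INTO production_companies VALUES (?, ?)",
    [(1, "Alpha Films"), (1, "Alpha Films"), (2, "Beta Pictures"),
     (2, "Beta Pictures"), (3, "Gamma Studio")],
)

# Step 1: copy the distinct rows into a new table; the original is not modified.
conn.execute(
    "CREATE TABLE production_companies_dedup AS "
    "SELECT DISTINCT company_id, company_name FROM production_companies"
)

# Step 2: verify the copy before swapping or dropping anything.
original_count = conn.execute("SELECT COUNT(*) FROM production_companies").fetchone()[0]
original_distinct = conn.execute(
    "SELECT COUNT(*) FROM (SELECT DISTINCT company_id, company_name FROM production_companies)"
).fetchone()[0]
dedup_count = conn.execute("SELECT COUNT(*) FROM production_companies_dedup").fetchone()[0]

print(original_count, dedup_count)  # 5 3
```

Only once `dedup_count` matches `original_distinct` would the original table be renamed to a backup and the deduplicated copy moved into its place.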


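What would have prevented this duplication from occurring? How would you fix it? (0.5 point)

A primary key or unique constraint on `company_id` would have prevented the duplication. With that constraint in place, `INSERT ... ON CONFLICT` in PostgreSQL handles duplicate records gracefully, either ignoring them (`DO NOTHING`) or updating the existing row (`DO UPDATE`).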
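
As a minimal illustration of that idea, the sketch below uses Python's built-in `sqlite3` module, which accepts the same `ON CONFLICT (...) DO NOTHING` clause for this simple case. The in-memory table and the sample rows are assumptions; the point is only that the conflict clause needs a primary key or unique constraint to act on.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# The primary key is what turns a repeated insert into a detectable conflict.
conn.execute(
    "CREATE TABLE production_companies (company_id INTEGER PRIMARY KEY, company_name TEXT)"
)

# The third row repeats company_id 1 and is skipped instead of being stored twice.
rows = [(1, "Alpha Films"), (2, "Beta Pictures"), (1, "Alpha Films")]
conn.executemany(
    "INSERT INTO production_companies (company_id, company_name) VALUES (?, ?) "
    "ON CONFLICT (company_id) DO NOTHING",
    rows,
)

stored = conn.execute(
    "SELECT company_id, company_name FROM production_companies ORDER BY company_id"
).fetchall()
print(stored)  # [(1, 'Alpha Films'), (2, 'Beta Pictures')]
```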

### Exercise 6 (3 points)

It seems that the data in the tables is incomplete. You will find a couple of `.csv` files with the `staging` prefix in the data folder.

Load these files into the DB. Make sure that rows are not duplicated.

🩼 *You can find the SQLAlchemy connection URL in your terminal window after you started the Postgres server.*

In [7]:
import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy.sql import text
from sqlalchemy.exc import SQLAlchemyError

# Replace with your actual connection string
connection_string = 'postgresql+psycopg2://de_homework_user:u3Hg3tOy59uG@localhost:58563/homework_db'
engine = create_engine(connection_string, pool_pre_ping=True)

# Function to load a CSV into a DataFrame and remove duplicates
def load_csv(file_path):
    df = pd.read_csv(file_path)
    df.drop_duplicates(inplace=True)
    return df

# Function to convert NumPy/pandas scalars to native Python types
def convert_to_native_type(value):
    if pd.isnull(value):
        return None  # NaN/NaT/None become SQL NULL
    if hasattr(value, 'item'):
        return value.item()  # NumPy scalar -> native Python type
    return value

# Function to insert data into the database
def insert_data(engine, table_name, schema_name, dataframe, insert_query, params_mapping):
    # engine.begin() commits on exit; a bare engine.connect() would leave the
    # inserts uncommitted under SQLAlchemy 2.x
    with engine.begin() as conn:
        for _, row in dataframe.iterrows():
            # Convert row values to native Python types
            params = {key: convert_to_native_type(row.get(key)) for key in params_mapping}
            try:
                # A savepoint per row keeps one failed insert from aborting the whole transaction
                with conn.begin_nested():
                    conn.execute(insert_query, params)
            except SQLAlchemyError as e:
                print(f"Error inserting row into {schema_name}.{table_name}: {e}")

# Data configurations
filepath = 'C:/Users/omogn/Documents/HomeWORK/data-tech-lead-homework/data'  # no trailing slash: the f-strings below add one
data_configs = [
    {
        'file': f"{filepath}/staging_movies.csv",
        'table': 'movies',
        'schema': 'staging',
        'insert_query': text("""
            INSERT INTO staging.movies (id, title, original_title, status, release_date, runtime, original_language, homepage)
            VALUES (:id, :title, :original_title, :status, :release_date, :runtime, :original_language, :homepage)
            ON CONFLICT (id) DO NOTHING;
        """),
        'params': ['id', 'title', 'original_title', 'status', 'release_date', 'runtime', 'original_language', 'homepage']
    },
    {
        'file': f"{filepath}/staging_financials.csv",
        'table': 'financials',
        'schema': 'staging',
        'insert_query': text("""
            INSERT INTO staging.financials (movie_id, budget, revenue)
            VALUES (:movie_id, :budget, :revenue)
            ON CONFLICT (movie_id) DO NOTHING;
        """),
        'params': ['movie_id', 'budget', 'revenue']
    },
    {
        'file': f"{filepath}/staging_movie_genres.csv",
        'table': 'movie_genres',
        'schema': 'staging',
        'insert_query': text("""
            INSERT INTO staging.movie_genres (movie_id, genre_id)
            VALUES (:movie_id, :genre_id)
            ON CONFLICT (movie_id, genre_id) DO NOTHING;
        """),
        'params': ['movie_id', 'genre_id']
    },
    {
        'file': f"{filepath}/staging_movie_cast.csv",
        'table': 'movie_cast',
        'schema': 'staging',
        'insert_query': text("""
            INSERT INTO staging.movie_cast (movie_id, credit_id, cast_id, character_id)
            VALUES (:movie_id, :credit_id, :cast_id, :character_id)
            ON CONFLICT (movie_id, credit_id) DO NOTHING;
        """),
        'params': ['movie_id', 'credit_id', 'cast_id', 'character_id']
    },
    {
        'file': f"{filepath}/staging_votes.csv",
        'table': 'votes',
        'schema': 'staging',
        'insert_query': text("""
            INSERT INTO staging.votes (movie_id, vote_average, vote_count)
            VALUES (:movie_id, :vote_average, :vote_count)
            ON CONFLICT (movie_id) DO NOTHING;
        """),
        'params': ['movie_id', 'vote_average', 'vote_count']
    },
    {
        'file': f"{filepath}/staging_movie_crew.csv",
        'table': 'movie_crew',
        'schema': 'staging',
        'insert_query': text("""
            INSERT INTO staging.movie_crew (movie_id, credit_id, crew_id, job_id)
            VALUES (:movie_id, :credit_id, :crew_id, :job_id)
            ON CONFLICT (movie_id, credit_id) DO NOTHING;
        """),
        'params': ['movie_id', 'credit_id', 'crew_id', 'job_id']
    },
    {
        'file': f"{filepath}/staging_movie_production_companies.csv",
        'table': 'movie_production_companies',
        'schema': 'staging',
        'insert_query': text("""
            INSERT INTO staging.movie_production_companies (movie_id, company_id)
            VALUES (:movie_id, :company_id)
            ON CONFLICT (movie_id, company_id) DO NOTHING;
        """),
        'params': ['movie_id', 'company_id']
    },
    {
        'file': f"{filepath}/staging_movie_spoken_languages.csv",
        'table': 'movie_spoken_languages',
        'schema': 'staging',
        'insert_query': text("""
            INSERT INTO staging.movie_spoken_languages (movie_id, language_iso)
            VALUES (:movie_id, :language_iso)
            ON CONFLICT (movie_id, language_iso) DO NOTHING;
        """),
        'params': ['movie_id', 'language_iso']
    },
    {
        'file': f"{filepath}/staging_movie_production_countries.csv",
        'table': 'movie_production_countries',
        'schema': 'staging',
        'insert_query': text("""
            INSERT INTO staging.movie_production_countries (movie_id, country_iso)
            VALUES (:movie_id, :country_iso)
            ON CONFLICT (movie_id, country_iso) DO NOTHING;
        """),
        'params': ['movie_id', 'country_iso']
    }
]


# Main process to load and insert data
for config in data_configs:
    df = load_csv(config['file'])
    insert_data(engine, config['table'], config['schema'], df, config['insert_query'], config['params'])

print("Data insertion process completed successfully.")

   

Data insertion process completed successfully.
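
The nine `INSERT` statements above differ only in table name, column list, and conflict target, so they could also be generated from the configuration. The helper below is a hypothetical sketch of that idea (`build_insert_sql` is not part of the original solution). It interpolates identifiers directly into the SQL string, which is only acceptable because the names come from trusted, hard-coded configuration; the row values still travel as bound parameters.

```python
def build_insert_sql(schema, table, columns, conflict_columns):
    """Return an INSERT ... ON CONFLICT DO NOTHING statement with named parameters."""
    column_list = ", ".join(columns)
    placeholders = ", ".join(f":{col}" for col in columns)
    conflict_list = ", ".join(conflict_columns)
    return (
        f"INSERT INTO {schema}.{table} ({column_list}) "
        f"VALUES ({placeholders}) "
        f"ON CONFLICT ({conflict_list}) DO NOTHING"
    )


sql = build_insert_sql(
    "staging", "movie_genres", ["movie_id", "genre_id"], ["movie_id", "genre_id"]
)
print(sql)
```

The resulting string can be wrapped in `text(...)` and passed to `insert_data` in place of a hand-written query.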


### Exercise 7 (3 points)

It seems that a couple of fields in the `staging.votes` table have `NULL` values.

Use the TMDb API to retrieve the `vote_count` and `vote_average` fields for these movies and update the table.

TMDb documentation is available here: https://developer.themoviedb.org/reference/intro/getting-started

You can use this header for authentication:
```python
headers = {
    "accept": "application/json",
    "Authorization": "Bearer eyJhbGciOiJIUzI1NiJ9.eyJhdWQiOiJjMDdiMGIwMmQwNzM4ZmU3YWVmNzEyMDk3OTlkOWExNCIsInN1YiI6IjY2NGM3YjM3OThmNWZjNWJiNTU1MmNlNiIsInNjb3BlcyI6WyJhcGlfcmVhZCJdLCJ2ZXJzaW9uIjoxfQ.oN61KwLmLTLir5KML1pzZ7raPn4iDZREOI59XT5cyIk"
}
```

In [None]:
import requests
import pandas as pd
from sqlalchemy import create_engine, text

# Replace with your actual connection string
connection_string = 'postgresql+psycopg2://de_homework_user:jokcoG44hoKW@localhost:51409/homework_db'
engine = create_engine(connection_string,pool_pre_ping=True)
conn = engine.connect()

# Define your headers with authentication
headers = {
    "accept": "application/json",
    "Authorization": "Bearer eyJhbGciOiJIUzI1NiJ9.eyJhdWQiOiJjMDdiMGIwMmQwNzM4ZmU3YWVmNzEyMDk3OTlkOWExNCIsInN1YiI6IjY2NGM3YjM3OThmNWZjNWJiNTU1MmNlNiIsInNjb3BlcyI6WyJhcGlfcmVhZCJdLCJ2ZXJzaW9uIjoxfQ.oN61KwLmLTLir5KML1pzZ7raPn4iDZREOI59XT5cyIk"
}

# SQL query to get movie IDs with NULL vote_count or vote_average
query = "SELECT movie_id FROM staging.votes WHERE vote_count IS NULL OR vote_average IS NULL;"

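# Fetch movie IDs from the database
with engine.connect() as conn:
    # scalars() yields the first column of each row; string indexing such as
    # row['movie_id'] is not supported on SQLAlchemy 2.x Row objects
    movie_ids = list(conn.execute(text(query)).scalars())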

# Function to get vote details from TMDb API
def fetch_vote_details(movie_id):
    try:
        url = f"https://api.themoviedb.org/3/movie/{movie_id}"
        # A timeout keeps one unresponsive request from blocking the whole loop
        response = requests.get(url, headers=headers, timeout=10)
        if response.status_code == 200:
            data = response.json()
            return {
                "movie_id": movie_id,
                "vote_count": data.get("vote_count"),
                "vote_average": data.get("vote_average")
            }
        else:
            print(f"Failed to fetch data for movie_id {movie_id}: HTTP {response.status_code}")
            return None
    except (requests.RequestException, ValueError) as e:
        # Network errors and invalid JSON bodies both end up here
        print(f"Request for movie_id {movie_id} failed: {e}")
        return None

# Fetch details for all movies with missing votes
movie_details_list = []
for movie_id in movie_ids:
    details = fetch_vote_details(movie_id)
    if details:
        movie_details_list.append(details)

# Convert to DataFrame
df = pd.DataFrame(movie_details_list)
print(df)


# Function to update vote data in the database
def update_vote_data(movie_id, vote_count, vote_average):
    update_query = text("""
        UPDATE staging.votes
        SET vote_count = :vote_count, vote_average = :vote_average
        WHERE movie_id = :movie_id;
    """)

    # engine.begin() commits the UPDATE when the block exits without an error
    with engine.begin() as conn:
        conn.execute(update_query, {
            'movie_id': movie_id,
            'vote_count': vote_count,
            'vote_average': vote_average
        })

# Update each movie record
for _, row in df.iterrows():
    if pd.notnull(row['vote_count']) and pd.notnull(row['vote_average']):
        # iterrows() upcasts mixed numeric rows to float, so cast back explicitly
        update_vote_data(int(row['movie_id']), int(row['vote_count']), float(row['vote_average']))

print("Database updated successfully.")
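
Calls to an external API can fail transiently, for example when a rate limit is hit. Because `fetch_vote_details` returns `None` on failure, it could be wrapped in a small retry helper such as the one sketched below. `call_with_retry`, its default values, and the fake fetcher in the usage example are assumptions for illustration, not part of the TMDb API or the original solution.

```python
import time


def call_with_retry(fetch, attempts=3, base_delay=0.5, sleep=time.sleep):
    """Call fetch() until it returns a non-None value or the attempts run out.

    The wait doubles after every failed attempt: base_delay, 2 * base_delay, ...
    """
    for attempt in range(attempts):
        result = fetch()
        if result is not None:
            return result
        if attempt < attempts - 1:
            sleep(base_delay * 2 ** attempt)
    return None


# Usage with a fake fetcher that fails twice before succeeding.
# The delays are recorded in a list instead of actually sleeping.
responses = iter([None, None, {"vote_count": 10, "vote_average": 7.5}])
delays = []
result = call_with_retry(lambda: next(responses), sleep=delays.append)
print(result, delays)  # {'vote_count': 10, 'vote_average': 7.5} [0.5, 1.0]
```

In the loop above, this would be used as `call_with_retry(lambda: fetch_vote_details(movie_id))`.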






### Exercise 8 (3 points)

Write an SQL query that returns the title of all of the space colony movies.

We do not have the `keywords` column in tabular format from `raw.movies`. You can query that one or create a new table in the `staging` schema that contains this information.

In [None]:
-- Titles of all movies tagged with the 'space colony' keyword.
-- Assumes raw.movies has a title column and keywords is a jsonb array of {id, name} objects.
SELECT m.title
FROM raw.movies m
CROSS JOIN LATERAL jsonb_array_elements(m.keywords) AS kw
WHERE kw ->> 'name' = 'space colony';
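
The alternative mentioned in the task, a tabular keywords table in the `staging` schema, comes down to emitting one row per movie and keyword. The sketch below shows that flattening step in plain Python. The sample movies and the column names `id`, `title`, and `keywords` are assumptions about the shape of `raw.movies`.

```python
import json


def explode_keywords(movies):
    """Yield one (movie_id, keyword_id, keyword_name) tuple per keyword."""
    for movie in movies:
        keywords = movie["keywords"]
        if isinstance(keywords, str):  # the JSON array may arrive as text
            keywords = json.loads(keywords)
        for keyword in keywords or []:  # None means the movie has no keywords
            yield movie["id"], keyword["id"], keyword["name"]


# Made-up sample rows standing in for raw.movies.
raw_movies = [
    {"id": 1, "title": "Movie A",
     "keywords": '[{"id": 100, "name": "space colony"}, {"id": 101, "name": "future"}]'},
    {"id": 2, "title": "Movie B", "keywords": [{"id": 102, "name": "heist"}]},
    {"id": 3, "title": "Movie C", "keywords": None},
]

keyword_rows = list(explode_keywords(raw_movies))
space_colony_ids = {movie_id for movie_id, _, name in keyword_rows if name == "space colony"}
titles = [movie["title"] for movie in raw_movies if movie["id"] in space_colony_ids]
print(titles)  # ['Movie A']
```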

### Exercise 9 (6 points)

Your task is to design a datamart on top of the staging tables. Your client is building a BI dashboard and is looking to answer the following KPIs:
- correlation between revenue and ratings
- top 10 movies by revenue
- number of movies produced by each production company
- average budget and revenue by genre
- most prevalent jobs in the movie industry

Create an ELT pipeline to load data from the staging tables to the datamart tables you designed. For bonus points, write your functions for Dagster, Airflow or Prefect frameworks.

Your schema should be called `datamart`. You are free to design the datamart however you see fit, the main goal is to be able to answer these questions based on the datamart on a BI dashboard.

❗ *You do not need to implement a full solution, just design your DAGs with these frameworks (using decorators, IO managers, etc) in mind for the bonus points.*

In [2]:

from dagster import job, op, In, Out
from sqlalchemy import create_engine, text
import pandas as pd

# Replace with your actual connection string
connection_string = 'postgresql+psycopg2://de_homework_user:jokcoG44hoKW@localhost:51409/homework_db'
engine = create_engine(connection_string,pool_pre_ping=True)
conn = engine.connect()

# Extract Operations
@op(out={"movies": Out(), "production_companies": Out(), "movie_genres": Out(), "financials": Out(), "votes": Out(),"movie_production_companies": Out(),"genres": Out(),"movie_crew": Out(),"crew_job": Out()})
def extract_data():
    with engine.connect() as conn:
        movies = pd.read_sql("SELECT * FROM staging.movies", conn)
        production_companies = pd.read_sql("SELECT * FROM staging.production_companies", conn)
        movie_genres = pd.read_sql("SELECT * FROM staging.movie_genres", conn)
        financials = pd.read_sql("SELECT * FROM staging.financials", conn)
        votes = pd.read_sql("SELECT * FROM staging.votes", conn)
        movie_production_companies = pd.read_sql("SELECT * FROM staging.movie_production_companies", conn)
        genres = pd.read_sql("SELECT * FROM staging.genres", conn)
        movie_crew = pd.read_sql("SELECT * FROM staging.movie_crew", conn)
        crew_job = pd.read_sql("SELECT * FROM staging.crew_job", conn)


    return movies, production_companies, movie_genres, financials, votes,movie_production_companies,genres,movie_crew,crew_job

# Call the function to extract data
movies, production_companies, movie_genres, financials, votes,movie_production_companies,genres,movie_crew,crew_job = extract_data()
                            

##Transformation of data for Movies metrics,production company summary,genre summary and job frequency

@op(ins={"movies": In(), "movie_genres": In(), "financials": In(), "votes": In()})
def transform_movie_metrics(movies, movie_genres, financials, votes):
    # Use `movie_id` as the common join key so the merges do not create
    # suffixed duplicates (movie_id_x, movie_id_y)
    merged_df = (
        movies.rename(columns={'id': 'movie_id'})
              .merge(movie_genres, on='movie_id', how='left')
              .merge(financials, on='movie_id', how='left')
              .merge(votes, on='movie_id', how='left')
    )

    # One row per movie and genre, with financial and rating metrics attached
    movie_metrics = merged_df[['movie_id', 'title', 'genre_id', 'status', 'revenue', 'budget', 'vote_average', 'vote_count']].drop_duplicates()
    return movie_metrics

# Example: assuming these variables are defined correctly
movie_metrics = transform_movie_metrics(movies, movie_genres, financials, votes)

# Print the result
#print(movie_metrics.head())


@op(ins={"movie_production_companies": In(), "production_companies": In()})
def transform_production_company_summary(production_companies, movie_production_companies):
    # Merge the movie/company link table with the company lookup and the financials.
    # NOTE: `financials` is read from module scope (extracted above); it is not an op input.
    merged_df = (movie_production_companies.merge(production_companies, on='company_id', how='inner')
                                          .merge(financials, on='movie_id', how='inner'))

    # Select relevant columns and drop duplicates
    production_company_summary = merged_df[['company_id', 'company_name', 'movie_id', 'revenue', 'budget']].drop_duplicates()

    # Group by `company_id` and `company_name`, and calculate required metrics
    production_company_summary = (
        production_company_summary.groupby(['company_id', 'company_name'])
        .agg(
            movie_count=('movie_id', 'count'),
            total_revenue=('revenue', 'sum'),  # totals are sums, not means
            total_budget=('budget', 'sum')
        )
        .reset_index()
    )

    return production_company_summary

# Call the function with the appropriate inputs
production_company_summary = transform_production_company_summary(production_companies, movie_production_companies)

# Print or display the result
#print(production_company_summary)


@op(ins={"movie_genres": In(), "financials": In(), "genres": In()})
def transform_production_genre_summary(movie_genres, financials, genres):
    # Merge movie_genres with genres and financials
    merged_df = (
        movie_genres.merge(genres, on='genre_id', how='inner')
                    .merge(financials, on='movie_id', how='inner')
    )
    
    # Select relevant columns and drop duplicates
    genre_summary = merged_df[['genre_id', 'genre_name', 'movie_id', 'revenue', 'budget']].drop_duplicates()

    # Group by `genre_id` and `genre_name`, and calculate required metrics
    genre_summary = (
        genre_summary.groupby(['genre_id', 'genre_name'])
        .agg(
            average_revenue=('revenue', 'mean'), 
            average_budget=('budget', 'mean'),
            movie_count=('movie_id', 'count'),
        )
        .reset_index()
    )

    # Sort by `vol` in descending order
    #genre_summary = genre_summary.sort_values(by='vol', ascending=False)
    
    return genre_summary

# Call the function with the appropriate inputs
genre_summary = transform_production_genre_summary(movie_genres, financials, genres)

# Print or display the result
#print(genre_summary)


@op(ins={"movie_crew": In(), "crew_job": In()})
def transform_job_frequency(movie_crew, crew_job):
    # Merge movie_crew with crew_job on 'job_id'
    merged_df = movie_crew.merge(crew_job, on='job_id', how='inner')
    
    # Select relevant columns and drop duplicates
    job_frequency = merged_df[['job_id', 'job_name', 'movie_id']].drop_duplicates()

    # Group by `job_id` and `job_name`, and calculate the occurrence count
    job_frequency = (
        job_frequency.groupby(['job_id', 'job_name'])
        .agg(
            occurrence_count=('movie_id', 'count')
        )
        .reset_index()
    )

    # Return the resulting DataFrame
    return job_frequency

# Call the function with the appropriate inputs
job_frequency = transform_job_frequency(movie_crew, crew_job)

# Print or display the result
#print(job_frequency)


## Load Operations movie metric

@op(ins={"movie_metrics": In()})
def load_movie_metrics(movie_metrics):
    # engine.begin() opens a transaction and commits it once the write succeeds
    with engine.begin() as connection:
        movie_metrics.to_sql("movie_metrics", connection, schema="datamart", if_exists="replace", index=False)
    return movie_metrics

movie_metrics = load_movie_metrics(movie_metrics)

#print(movie_metrics.head())



## Load Operations production company summary

@op(ins={"production_company_summary": In()})
def load_production_company_summary(production_company_summary):
    # engine.begin() opens a transaction and commits it once the write succeeds
    with engine.begin() as connection:
        production_company_summary.to_sql("production_company_summary", connection, schema="datamart", if_exists="replace", index=False)
    return production_company_summary

production_company_summary = load_production_company_summary(production_company_summary)

#print(production_company_summary.head())


## Load Operations genre summary

@op(ins={"genre_summary": In()})
def load_genre_summary(genre_summary):
    # engine.begin() opens a transaction and commits it once the write succeeds
    with engine.begin() as connection:
        genre_summary.to_sql("genre_summary", connection, schema="datamart", if_exists="replace", index=False)
    return genre_summary

genre_summary = load_genre_summary(genre_summary)

#print(genre_summary.head())

 
## Load Operations job frequency

@op(ins={"job_frequency": In()})
def load_job_frequency(job_frequency):
    # engine.begin() opens a transaction and commits it once the write succeeds
    with engine.begin() as connection:
        job_frequency.to_sql("job_frequency", connection, schema="datamart", if_exists="replace", index=False)
    return job_frequency

job_frequency = load_job_frequency(job_frequency)

#print(job_frequency.head())



## Pipeline to load the data

@job
def datamart_elt_pipeline():
    movies, production_companies, movie_genres, financials, votes, movie_production_companies, genres, movie_crew, crew_job = extract_data()
    movie_metrics = transform_movie_metrics(movies, movie_genres, financials, votes)
    movie_metrics = load_movie_metrics(movie_metrics)

    # The company summary has its own transform op (not the genre one)
    production_company_summary = transform_production_company_summary(production_companies, movie_production_companies)
    production_company_summary = load_production_company_summary(production_company_summary)

    genre_summary = transform_production_genre_summary(movie_genres, financials, genres)
    genre_summary = load_genre_summary(genre_summary)

    job_frequency = transform_job_frequency(movie_crew, crew_job)
    job_frequency = load_job_frequency(job_frequency)
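
The first KPI, the correlation between revenue and ratings, is not materialised by any of the tables above, but it can be derived from `datamart.movie_metrics`. Below is a minimal sketch of the Pearson correlation in plain Python. The sample rows are made up, and because `movie_metrics` holds one row per movie and genre, the real data would need to be reduced to one row per movie before this calculation.

```python
from math import sqrt


def pearson(xs, ys):
    """Pearson correlation coefficient of two equally long numeric sequences."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / sqrt(var_x * var_y)


# Made-up rows, one per movie; rows with a missing value are skipped.
rows = [
    {"revenue": 100, "vote_average": 5.0},
    {"revenue": 200, "vote_average": 6.0},
    {"revenue": None, "vote_average": 9.0},
    {"revenue": 300, "vote_average": 7.0},
    {"revenue": 400, "vote_average": 8.0},
]
pairs = [
    (r["revenue"], r["vote_average"])
    for r in rows
    if r["revenue"] is not None and r["vote_average"] is not None
]
correlation = pearson([p[0] for p in pairs], [p[1] for p in pairs])
print(round(correlation, 3))
```

In a BI tool the same figure would more likely come from a built-in correlation function; the sketch only makes the calculation explicit.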





### Exercise 10 (10 points)

In this assignment we describe an imaginary project about a US-based home insurance company that would like a plan for an independent, centralised reporting platform.
Your task is to evaluate the situation and come up with a plan to deliver it.

#### WBSS Home Insurance Overview

The client is a US-based home insurance company that operates as an insurance platform. Its USP is that it acts as a one-stop shop, providing not just insurance policy handling but also water leakage sensors to its clients.
The platform is a website where clients can log in, view and manage their policy statuses, enroll in the smart protection program, submit claims, and receive alerts when there is a water leak in the house.
The insurance company works with different 3rd parties for:
- policy management
- sensor shipment and sensor data collection
- customer claim handling

The request is to provide a plan to have an independent centralised business reporting platform under their own premises.

#### Source Systems and Architecture

The WBSS platform itself is a website (built on Azure) with Google Analytics event tracking in place to monitor platform performance and customer behaviour.
The 3rd party policy management system can provide daily CSV file extracts covering policy status changes and new policies.
The 3rd party smart home sensor vendor can share smart home data (customer, event and sensor level information) by providing view access on an AWS RDS server.
The claim handling company provides a SOAP API endpoint with access to request data for all needed claims.
The logical relationship between the data points is the following: an insurance holder can have a policy related to a home. Smart home sensors can be installed in the home. These sensors send health and alert data, which can become a claim in case of damage. Insurance holders can also submit individual claims through the WBSS platform, and these appear in the claims management system as well.

#### Your Task
- Create a high level architecture plan in Azure where it is possible to integrate all data sources and have a connected datamart. Describe data ingestion for all relevant sources, plan the ETL/ELT pipelines, describe what components you would use and what tools.
- Create a high level estimate for the main tasks.
- List possible risks, and describe how you would mitigate them.
- How would you start to estimate and optimize cloud costs? Please let us know your thought process.
- You can use any (free) tool for your architecture design (e.g. Draw.io, Eraser) and please include a picture of it below. For the estimations and risks, you can use the markdown cell below.
- Be as thorough as possible, if you feel you need to add additional notes to describe your solution, please do so.
- Where you feel you don't have detailed enough information, make your own assumptions and recommendations.


### Exercise 10 Solution:
#### High-Level Architecture Plan in Azure

To create a centralized business reporting platform, we will leverage the Modern Analytics Architecture on Azure with Azure Databricks as the core engine for data processing and transformation. This architecture will provide a robust, scalable, and efficient system to integrate all data sources and deliver a connected datamart for business intelligence and reporting.

#### Architecture Overview

This architecture uses a Medallion Data Lake Architecture (Bronze, Silver, Gold) to manage data at different stages:
- Bronze Layer: Raw data directly from source systems, stored for archival purposes.
- Silver Layer: Cleaned, deduplicated, and normalized data.
- Gold Layer: Aggregated, analytics-ready data used for business intelligence.

For the architecture design diagram, please refer to the Git repository.

The key components of this architecture include:

1. Azure Data Lake Storage (ADLS) Gen2
•	Purpose: Central repository for all data, with hierarchical structure.
•	Functionality:
o	Bronze Layer: Stores raw data ingested from external sources, including CSV files, sensor data from AWS RDS, and claims data from SOAP API.
o	Silver Layer: Stores cleaned and structured data after transformation.
o	Gold Layer: Stores aggregated and enriched data ready for analytics and reporting.

2. Azure Data Factory (ADF)
•	Purpose: Ingest data from various sources and orchestrate data pipelines.
•	Functionality:
o	CSV Files: Connect to policy management system, import CSV extracts into the Bronze Layer of ADLS.
o	AWS RDS Data: Use an integrated connector to extract sensor data from AWS databases.
o	SOAP API: Interact with claims handling system to request data using REST/SOAP activities.
o	Scheduled Pipelines: Set up pipelines to run at specific intervals (e.g., daily for batch processing or near real-time for sensor alerts).

3. Azure Databricks
•	Purpose: Central engine for data processing, transformation, and advanced analytics.
•	Functionality:
o	ETL/ELT Processes: Transform raw data (from Bronze to Silver) by cleansing, deduplication, and applying business rules.
o	Aggregation: Prepare data (from Silver to Gold) by performing data joins, aggregations, and enrichment to create ready-to-use datasets for reporting.
o	Real-Time Streaming: Ingest real-time sensor data using Structured Streaming to capture alerts and events.

4. Azure Synapse Analytics
•	Purpose: Serve as the central data warehouse, housing the final, processed data from the Gold Layer.
•	Functionality:
o	Fast Querying: Optimized for handling large-scale queries and supporting business intelligence requirements.
o	Data Modeling: Define fact and dimension tables to support relational queries and Power BI reports.
o	Data Exploration: SQL-based exploration for ad-hoc analysis.
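
As a rough illustration of the fact and dimension modeling mentioned above, the sketch below builds a tiny star schema and runs a typical report query. In-memory SQLite stands in for Synapse so the example runs anywhere; the table names, column names, and sample rows are hypothetical.

```python
import sqlite3

# In-memory SQLite stands in for Synapse here; table names, column
# names, and sample rows are hypothetical.
conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE dim_policy (
        policy_key   INTEGER PRIMARY KEY,
        policy_id    TEXT NOT NULL,
        product_type TEXT NOT NULL
    );
    CREATE TABLE fact_claim (
        claim_key    INTEGER PRIMARY KEY,
        policy_key   INTEGER NOT NULL REFERENCES dim_policy (policy_key),
        claim_date   TEXT NOT NULL,
        claim_amount REAL NOT NULL
    );
    """
)
conn.executemany(
    "INSERT INTO dim_policy VALUES (?, ?, ?)",
    [(1, "P-001", "home"), (2, "P-002", "home"), (3, "P-003", "contents")],
)
conn.executemany(
    "INSERT INTO fact_claim VALUES (?, ?, ?, ?)",
    [
        (1, 1, "2024-01-10", 1200.0),
        (2, 1, "2024-02-03", 300.0),
        (3, 3, "2024-02-20", 450.0),
    ],
)

# A typical report query: claim count and total amount per product type.
# The LEFT JOIN keeps policies that have no claims.
rows = conn.execute(
    """
    SELECT d.product_type,
           COUNT(f.claim_key)               AS claims,
           COALESCE(SUM(f.claim_amount), 0) AS total_amount
    FROM dim_policy AS d
    LEFT JOIN fact_claim AS f ON f.policy_key = d.policy_key
    GROUP BY d.product_type
    ORDER BY d.product_type
    """
).fetchall()
print(rows)
```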

5. Power BI
•	Purpose: Visualization and business intelligence tool for creating dashboards and reports.
•	Functionality:
o	Real-Time Dashboards: Create dynamic dashboards that reflect real-time alerts from sensors.
o	Data Insights: Offer insights on policy trends, claim patterns, customer behavior, and platform performance.
o	User Access Control: Configure access permissions for different teams (e.g., claims team, management, operations).

6. Azure Key Vault
•	Purpose: Secure storage for sensitive information like API keys, database connection strings, and other secrets.
•	Functionality:
o	Data Security: Ensure secure data access, enhancing the platform’s security posture.
o	Access Management: Control access to keys, secrets, and certificates to prevent unauthorized data access.

7. Azure Monitor & Log Analytics
•	Purpose: Monitoring, alerting, and diagnostics for the entire data platform.
•	Functionality:
o	Monitoring Pipelines: Track data pipeline execution, detect failures, and alert teams for immediate action.
o	Resource Utilization: Monitor CPU, memory, and storage utilization across Databricks, ADF, and Synapse to ensure optimal performance.
o	Diagnostic Logs: Collect logs and diagnostic data to troubleshoot any issues.

Detailed Data Ingestion and ETL/ELT Pipelines

Data Sources & Ingestion

| Data Source | Ingestion Method | Frequency | Storage Layer |
| --- | --- | --- | --- |
| Policy Management (CSV) | ADF File Ingestion | Daily Batch | ADLS (Bronze) |
| Smart Home Sensors (AWS RDS) | ADF Database Connector | Near Real-Time | ADLS (Bronze) |
| Claims Handling (SOAP API) | Azure Logic Apps + ADF | Daily Batch | ADLS (Bronze) |
| Google Analytics (User Behavior) | Google Analytics API Connector | Hourly | ADLS (Bronze) |


ETL/ELT Pipelines
1.	Bronze to Silver Transformation (Data Cleansing)
o	Task: Deduplicate records, handle missing values, normalize data, validate data types.
o	Tool: Azure Databricks
o	Output: Clean and structured data stored in the Silver Layer of ADLS.
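
The cleansing rules in this step can be sketched in a few lines. On Databricks this would normally be written against Spark DataFrames; plain Python is used here only so the logic is easy to run and read. The field names (`policy_id`, `premium`, `start_date`, `region`), the `UNKNOWN` default, and the "last record wins" deduplication rule are assumptions for illustration.

```python
from datetime import date
from typing import Any, Optional


def clean_record(raw: dict[str, Any]) -> Optional[dict[str, Any]]:
    """Normalize and validate one raw policy row; return None if unusable."""
    policy_id = str(raw.get("policy_id") or "").strip().upper()
    if not policy_id:
        return None  # a row without a key cannot be deduplicated or joined
    try:
        premium = float(raw["premium"])
        start = date.fromisoformat(str(raw["start_date"]).strip())
    except (KeyError, TypeError, ValueError):
        return None  # fails data type validation
    # Handle a missing value with an explicit default instead of a null.
    region = str(raw.get("region") or "").strip().upper() or "UNKNOWN"
    return {
        "policy_id": policy_id,
        "premium": premium,
        "start_date": start,
        "region": region,
    }


def bronze_to_silver(rows: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Clean all rows and keep the last valid occurrence of each policy_id."""
    silver: dict[str, dict[str, Any]] = {}
    for raw in rows:
        record = clean_record(raw)
        if record is not None:
            silver[record["policy_id"]] = record
    return list(silver.values())


if __name__ == "__main__":
    bronze = [
        {"policy_id": " p-001 ", "premium": "100.5", "start_date": "2024-01-01", "region": "north"},
        {"policy_id": "P-001", "premium": "120", "start_date": "2024-01-01", "region": None},
        {"policy_id": "P-002", "premium": "abc", "start_date": "2024-01-01"},
    ]
    print(bronze_to_silver(bronze))
```

In a real pipeline, rejected rows would usually be written to a quarantine location rather than dropped silently, so that data quality issues stay visible.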

2.	Silver to Gold Transformation (Data Aggregation & Enrichment)
o	Task: Join datasets (e.g., linking policy data with sensor data), aggregate information (e.g., total claims per policy), enrich with calculated fields (e.g., risk scores).
o	Tool: Azure Databricks
o	Output: Aggregated datasets stored in the Gold Layer of ADLS and loaded into Azure Synapse Analytics for fast querying.
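
A minimal sketch of this step, again in plain Python rather than Spark so it stays self-contained. The record fields are hypothetical, and the loss ratio (total claims divided by premium) is only a stand-in for whatever risk score the business defines.

```python
from collections import defaultdict
from typing import Any


def silver_to_gold(
    policies: list[dict[str, Any]], claims: list[dict[str, Any]]
) -> list[dict[str, Any]]:
    """Join claims to policies and aggregate one gold row per policy."""
    totals: dict[str, float] = defaultdict(float)
    counts: dict[str, int] = defaultdict(int)
    for claim in claims:
        totals[claim["policy_id"]] += claim["amount"]
        counts[claim["policy_id"]] += 1

    gold = []
    for policy in policies:  # left join: policies without claims are kept
        pid = policy["policy_id"]
        total = totals.get(pid, 0.0)
        # Hypothetical enrichment: loss ratio as a simple risk indicator.
        loss_ratio = total / policy["premium"] if policy["premium"] else None
        gold.append(
            {
                "policy_id": pid,
                "claim_count": counts.get(pid, 0),
                "total_claims": total,
                "loss_ratio": loss_ratio,
            }
        )
    return gold


if __name__ == "__main__":
    policies = [
        {"policy_id": "P-001", "premium": 1000.0},
        {"policy_id": "P-002", "premium": 500.0},
    ]
    claims = [
        {"policy_id": "P-001", "amount": 200.0},
        {"policy_id": "P-001", "amount": 300.0},
    ]
    for row in silver_to_gold(policies, claims):
        print(row)
```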

3.	Real-Time Processing (Sensor Alerts)
o	Task: Stream sensor alerts and events, process them in real-time to detect anomalies (e.g., water leakage alerts).
o	Tool: Azure Databricks Structured Streaming
o	Output: Alert data stored in ADLS (Silver) and pushed to Azure Synapse Analytics (Gold) for live monitoring via Power BI.
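
The anomaly rule itself can be quite small. The sketch below shows one possible rule for a water leakage alert, applied to events as they arrive; in the proposed platform the same logic would run inside a Structured Streaming job. The event fields, the moisture threshold, and the "three consecutive readings" rule are assumptions chosen for illustration.

```python
from collections import defaultdict
from typing import Any, Iterable, Iterator

MOISTURE_THRESHOLD = 0.8   # hypothetical threshold
CONSECUTIVE_READINGS = 3   # hypothetical: readings in a row needed to alert


def detect_leaks(events: Iterable[dict[str, Any]]) -> Iterator[dict[str, Any]]:
    """Yield one alert when a sensor stays above the threshold long enough."""
    streak: dict[str, int] = defaultdict(int)
    for event in events:
        sensor_id = event["sensor_id"]
        if event["moisture"] >= MOISTURE_THRESHOLD:
            streak[sensor_id] += 1
        else:
            streak[sensor_id] = 0  # a normal reading resets the streak
        # Fire exactly once, at the moment the streak reaches the limit.
        if streak[sensor_id] == CONSECUTIVE_READINGS:
            yield {"sensor_id": sensor_id, "ts": event["ts"], "alert": "water_leak"}


if __name__ == "__main__":
    stream = [
        {"sensor_id": "s1", "ts": 1, "moisture": 0.90},
        {"sensor_id": "s1", "ts": 2, "moisture": 0.85},
        {"sensor_id": "s1", "ts": 3, "moisture": 0.95},
    ]
    for alert in detect_leaks(stream):
        print(alert)
```

Requiring several consecutive readings trades a little latency for fewer false alarms from a single noisy measurement.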


3. High-Level Task Estimation
| Task | Description | Estimated Time |
| --- | --- | --- |
| Architecture Design | Define architecture, components, and data flow. | 1 Week |
| Data Ingestion Setup | Configure ADF, Logic Apps, and connectors for data sources. | 2 Weeks |
| Azure Databricks ETL/ELT Pipelines | Develop data transformation and aggregation pipelines. | 5 Weeks |
| Azure Synapse Setup & Data Modeling | Design data schemas, tables, and relationships. | 2 Weeks |
| Power BI Dashboard Design | Develop and deploy reporting dashboards for various stakeholders. | 3 Weeks |
| Testing & Quality Assurance | Validate data flow, data accuracy, and system integration. | 2 Weeks |
| Deployment & Documentation | Deploy production environment, create user manuals and operational guides. | 1 Week |
| Ongoing Project Management | Ongoing tasks: planning, coordination, issue tracking, documentation. | Ongoing |
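
Adding up the fixed estimates gives a quick sanity check on the overall timeline. The snippet below assumes the tasks run strictly one after another, which is an assumption of this sketch and not stated in the estimate; any parallel work would shorten the elapsed time. Project management is left out because it has no fixed duration.

```python
# Estimates in weeks, copied from the task estimation above.
ESTIMATES_WEEKS = {
    "Architecture Design": 1,
    "Data Ingestion Setup": 2,
    "Azure Databricks ETL/ELT Pipelines": 5,
    "Azure Synapse Setup & Data Modeling": 2,
    "Power BI Dashboard Design": 3,
    "Testing & Quality Assurance": 2,
    "Deployment & Documentation": 1,
}

# Upper bound on elapsed time if nothing overlaps.
total_weeks = sum(ESTIMATES_WEEKS.values())
longest_task = max(ESTIMATES_WEEKS, key=ESTIMATES_WEEKS.get)

print(f"Sequential total: {total_weeks} weeks")  # Sequential total: 16 weeks
print(f"Longest task: {longest_task}")
```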

4. Risk Assessment and Mitigation
Please refer to the attached Excel spreadsheet.

5. Cloud Cost Estimation and Optimization

Cost Estimation Approach
1.	Storage:
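o	Estimate the monthly Azure Data Lake Storage cost from the expected daily data volume (GB/TB) multiplied by 30 days. Stored volume accumulates month over month unless data is tiered or deleted, so the estimate should be revisited as the lake grows.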
2.	Compute:
o	Evaluate costs based on Databricks cluster sizes and runtime (per hour). Calculate based on expected daily workload and usage.
3.	Data Orchestration:
o	Calculate Azure Data Factory costs based on the number of pipeline activities and frequency of runs.
4.	Data Warehouse:
o	Estimate Synapse Analytics costs based on provisioned resources and query workload.
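
The four components above can be combined into a simple monthly estimate. This is a sketch of the arithmetic only: every unit price and workload figure in the example is a made-up placeholder, not an Azure list price, and the `CostInputs` fields are hypothetical names. Real figures should come from the Azure pricing calculator.

```python
from dataclasses import dataclass

DAYS_PER_MONTH = 30


@dataclass
class CostInputs:
    """Workload and unit-price assumptions; all values are placeholders."""
    daily_ingest_gb: float
    storage_price_per_gb_month: float
    cluster_hours_per_day: float
    cluster_price_per_hour: float
    pipeline_runs_per_day: int
    activities_per_run: int
    price_per_1000_activity_runs: float
    warehouse_hours_per_day: float
    warehouse_price_per_hour: float


def estimate_monthly_cost(c: CostInputs) -> dict[str, float]:
    """Return the estimated monthly cost per component and in total."""
    activity_runs = c.pipeline_runs_per_day * c.activities_per_run * DAYS_PER_MONTH
    parts = {
        # Covers only the data added during the month, not the backlog.
        "storage": c.daily_ingest_gb * DAYS_PER_MONTH * c.storage_price_per_gb_month,
        "compute": c.cluster_hours_per_day * DAYS_PER_MONTH * c.cluster_price_per_hour,
        "orchestration": activity_runs / 1000 * c.price_per_1000_activity_runs,
        "warehouse": c.warehouse_hours_per_day * DAYS_PER_MONTH * c.warehouse_price_per_hour,
    }
    parts["total"] = sum(parts.values())
    return {name: round(value, 2) for name, value in parts.items()}


if __name__ == "__main__":
    example = CostInputs(
        daily_ingest_gb=10, storage_price_per_gb_month=0.02,
        cluster_hours_per_day=4, cluster_price_per_hour=2.0,
        pipeline_runs_per_day=24, activities_per_run=5,
        price_per_1000_activity_runs=1.0,
        warehouse_hours_per_day=8, warehouse_price_per_hour=1.5,
    )
    print(estimate_monthly_cost(example))
```

Keeping the inputs in one structure makes it easy to compare scenarios, for example a smaller cluster or fewer pipeline runs per day.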
  
Cost Optimization Strategies
•	Auto-Scaling & Right-Sizing: Configure auto-scaling in Azure Databricks clusters and Synapse Analytics to save costs by adjusting resources based on demand.
•	Data Tiering: Move less frequently accessed data to lower-cost storage tiers.
•	Optimize Pipelines: Efficiently design data pipelines to reduce runtime, saving on compute costs.
•	Cost Monitoring: Set up Azure Cost Alerts and budgets to proactively monitor and control spending.

Conclusion
The proposed architecture ensures a scalable, secure


























