## Technical Questions

1. Data Modeling and Warehousing
Question: How do you design a schema for a new data-intensive application?
It depends on the project. First, it is essential to know the end users and the decisions they will make (so we know which decisions the data system should support), and to gather their requirements.
Second, we can start modeling the data: identify all the tables and their relationships, choose a schema design (such as star or snowflake), and, depending on how the tables behave, normalize or denormalize them (in a star schema, the dimension tables are denormalized to reduce joins and improve query time).
Then, we can plan and apply indexing strategies to optimize performance, and decide where the data system will be hosted (Snowflake, another cloud service, etc.), taking into account not only pricing but also reliability, fault tolerance, backup options, etc.
Another important step that is often ignored: once all the tables and schemas are created, we need to control access to secure the new data system.
Lastly, continuous monitoring and well-defined backup plans help us both monitor data quality and mitigate risk in case of data loss.
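As a minimal sketch of the star-schema and indexing ideas above, the block below uses Python's built-in `sqlite3` as a stand-in for a warehouse. All table names, columns, and rows are hypothetical. The dimension tables stay denormalized (`region` sits directly in `dim_country` instead of in a separate region table, as a snowflake schema would have it), the fact table holds the measures, and its foreign keys are indexed.

```python
import sqlite3

# In-memory database standing in for a warehouse; names and rows are illustrative only.
conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    -- Dimension tables: descriptive attributes, denormalized (no further joins needed)
    CREATE TABLE dim_product (
        product_key INTEGER PRIMARY KEY,
        category    TEXT NOT NULL,
        color       TEXT
    );
    CREATE TABLE dim_country (
        country_key INTEGER PRIMARY KEY,
        country     TEXT NOT NULL,
        region      TEXT          -- a snowflake schema would move this to its own table
    );
    -- Fact table: one row per sale, numeric measures plus foreign keys to the dimensions
    CREATE TABLE fact_sales (
        product_key INTEGER NOT NULL REFERENCES dim_product(product_key),
        country_key INTEGER NOT NULL REFERENCES dim_country(country_key),
        quantity    INTEGER NOT NULL,
        amount      REAL NOT NULL
    );
    -- Index the foreign keys that typical queries join and filter on
    CREATE INDEX ix_fact_sales_product ON fact_sales(product_key);
    CREATE INDEX ix_fact_sales_country ON fact_sales(country_key);
    """
)
conn.executemany("INSERT INTO dim_product VALUES (?, ?, ?)", [(1, "laptop", "silver"), (2, "tablet", "black")])
conn.executemany("INSERT INTO dim_country VALUES (?, ?, ?)", [(1, "Germany", "EU"), (2, "Japan", "APAC")])
conn.executemany(
    "INSERT INTO fact_sales VALUES (?, ?, ?, ?)",
    [(1, 1, 1, 2000.0), (2, 2, 5, 4500.0), (2, 1, 1, 900.0)],
)

# A typical star query: one join per dimension, then aggregate the measures
rows = conn.execute(
    """
    SELECT c.region, p.category, SUM(f.amount) AS total
    FROM fact_sales AS f
    JOIN dim_product AS p ON p.product_key = f.product_key
    JOIN dim_country AS c ON c.country_key = f.country_key
    GROUP BY c.region, p.category
    ORDER BY total DESC
    """
).fetchall()
print(rows)
```

Every analytical query follows the same shape (fact table in the middle, one join per dimension), which is what makes star schemas predictable to index and fast to query.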


2. ETL/ELT Design and Implementation
Question: Describe a complex ETL pipeline you've designed. What were some of the challenges, and how did you address them?
In my previous job, I had to design a Risk Score Dashboard to detect fraudulent clients. The project used at least two data systems (a data warehouse in Snowflake and a PostgreSQL database) and many tables with complex queries and transformations.
Challenges:
    1. The project relied on a legacy codebase: a single file with over 1,000 lines of code and no proper documentation.
    2. Our internal client was the data science team, and they needed over 30 metrics, with the possibility of adding more in the future.
    3. The project only had two engineers (myself included).

My first step was to understand not only the codebase but also the DS team's needs. It was difficult to find common ground, since DE and DS didn't communicate much within the company. Once we agreed on a set of features to launch as the first version, the second challenge was to design a new architecture to replace the old codebase, with scalability and software design best practices in mind.

Our first approach was an architecture based on the Template Method pattern, which let us capture the steps the metrics had in common, so adding new metrics required less code. We also combined the Template Method with multithreading (despite Python's limitations, such as the GIL), which let us calculate the metrics faster.
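A minimal sketch of the Template Method plus thread-pool idea described above. The metric classes and the in-memory `TRANSACTIONS` data are hypothetical stand-ins for the real warehouse queries. `compute()` fixes the order of the steps, and each subclass only supplies `extract()` and `calculate()`. Threads mainly pay off when metrics spend their time waiting on database I/O (during which the GIL is typically released); pure-Python CPU-bound work would not get faster.

```python
from abc import ABC, abstractmethod
from concurrent.futures import ThreadPoolExecutor


class Metric(ABC):
    """Template Method: compute() fixes the steps, subclasses fill in the varying ones."""

    name: str = "metric"

    def compute(self, client_id: str) -> tuple[str, float]:
        rows = self.extract(client_id)     # step 1: fetch the rows the metric needs
        value = self.calculate(rows)       # step 2: metric-specific logic
        return self.name, round(value, 4)  # step 3: shared post-processing

    @abstractmethod
    def extract(self, client_id: str) -> list[float]: ...

    @abstractmethod
    def calculate(self, rows: list[float]) -> float: ...


# Hypothetical in-memory data standing in for warehouse queries
TRANSACTIONS = {"c1": [120.0, 80.0, 5000.0], "c2": [30.0]}


class TransactionCount(Metric):
    name = "transaction_count"

    def extract(self, client_id):
        return TRANSACTIONS.get(client_id, [])

    def calculate(self, rows):
        return float(len(rows))


class MaxTransaction(Metric):
    name = "max_transaction"

    def extract(self, client_id):
        return TRANSACTIONS.get(client_id, [])

    def calculate(self, rows):
        return max(rows, default=0.0)


def compute_all(metrics: list[Metric], client_id: str, max_workers: int = 4) -> dict[str, float]:
    """Run every metric in a thread pool and collect {name: value}."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return dict(pool.map(lambda metric: metric.compute(client_id), metrics))


print(compute_all([TransactionCount(), MaxTransaction()], "c1"))
# {'transaction_count': 3.0, 'max_transaction': 5000.0}
```

Adding a new metric means writing one small subclass; the pipeline code in `compute_all` does not change.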

Once the architecture was settled, the next part was to optimize table usage. Since the legacy code relied on outdated tables, our next task was not only to replace them but also to optimize the queries, because performance and query time were among the main problems of the legacy code.

In the end, we had a well-designed architecture with clean code, and faster, more efficient tables.

3. Data Infrastructure and Orchestration
Question: What experience do you have with managing data infrastructure on-premises or in the cloud?
I have worked with AWS services: I have used a PostgreSQL database hosted on an EC2 instance, and Glue and Athena to create tables. In my previous job, my tasks mainly focused on Snowflake and several of its features, such as tasks, streams, views, Snowpipe, and Snowpark.

4. Programming and Software Engineering Practices
Question: What programming languages are you most comfortable with, and what libraries do you frequently use in data engineering projects?
I'm comfortable with Python, and I've mostly worked with pandas, NumPy, boto3 (for AWS), awswrangler, SQLAlchemy (an ORM for working with databases from Python), and some visualization libraries (for example, pandas, Altair, and seaborn). I also like to use Poetry for environment management, pydantic for managing environment variables and credentials, and ruff and black for linting and formatting.

5. Data Security and Compliance
Question: How do you implement security measures in your data engineering projects?
I use environment managers for credentials, enforce the use of repositories (GitLab, GitHub, etc.) with SSH keys, use hooks to test the project before it is launched into production, and control access to data resources.
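The hooks mentioned above can also check for leaked credentials. Below is a minimal, hypothetical pre-commit check in plain Python that flags credential-like names assigned a quoted literal. The regex is a heuristic assumption, not an exhaustive rule; dedicated scanners such as gitleaks or detect-secrets are more thorough.

```python
import re
from pathlib import Path

# Heuristic (an assumption, not exhaustive): a credential-like name assigned a quoted literal,
# e.g. password = "hunter2", DB_PASSWORD="x" or API_TOKEN: 'abc'
SECRET_PATTERN = re.compile(
    r"""\b\w*(?:password|passwd|secret|api_key|token)\w*\s*[:=]\s*["'][^"']+["']""",
    re.IGNORECASE,
)


def find_hardcoded_secrets(text: str) -> list[int]:
    """Return the 1-based numbers of lines that look like hard-coded credentials."""
    return [
        line_no
        for line_no, line in enumerate(text.splitlines(), start=1)
        if SECRET_PATTERN.search(line)
    ]


def check_files(paths: list[str]) -> int:
    """Hook entry point: report suspicious lines; return 1 to block the commit, else 0."""
    status = 0
    for path in paths:
        text = Path(path).read_text(encoding="utf-8", errors="ignore")
        for line_no in find_hardcoded_secrets(text):
            print(f"{path}:{line_no}: possible hard-coded secret")
            status = 1
    return status


# Lines 2 and 4 are flagged; reading the value from os.environ is not
sample = "\n".join([
    'user = "app"',
    'password = "hunter2"',
    'password = os.environ["DB_PASSWORD"]',
    "API_TOKEN: 'abc123'",
])
print(find_hardcoded_secrets(sample))  # [2, 4]
```

A git pre-commit hook (or a local hook in the pre-commit framework) would call `check_files` with the staged file paths and abort the commit when it returns 1.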


**Please explain how you usually work with CI/CD pipelines**
In my previous job, we had a CI/CD pipeline in GitLab that consisted of three phases: linting, testing, and deployment.
The first ensured that Python best practices were followed in every project.
The second ensured that every project had at least 80% test coverage, using unit tests and mocks.
The last was the deployment itself. Every project usually had three environments (develop, stage, and production), and each one ran the same CI/CD pipeline with minor variations.
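A minimal sketch of the kind of unit test the testing phase could run, using only the standard library's `unittest` and `unittest.mock`. `save_rows` and `DatabaseError` are hypothetical stand-ins (for example, for a function that commits SQLAlchemy objects); the session is replaced by a `MagicMock`, so no database is needed.

```python
import unittest
from unittest import mock


class DatabaseError(Exception):
    """Hypothetical stand-in for a driver error such as SQLAlchemyError."""


def save_rows(session, rows) -> bool:
    """Add rows and commit; on a database error, roll back and return False."""
    try:
        session.add_all(rows)
        session.commit()
        return True
    except DatabaseError:
        session.rollback()
        return False


class SaveRowsTest(unittest.TestCase):
    def test_commits_on_success(self):
        session = mock.MagicMock()  # stands in for a real database session
        self.assertTrue(save_rows(session, ["row"]))
        session.add_all.assert_called_once_with(["row"])
        session.commit.assert_called_once()
        session.rollback.assert_not_called()

    def test_rolls_back_on_error(self):
        session = mock.MagicMock()
        session.commit.side_effect = DatabaseError("connection lost")  # simulate a failure
        self.assertFalse(save_rows(session, ["row"]))
        session.rollback.assert_called_once()
```

The tests run with `python -m unittest` or pytest; a coverage gate like the 80% rule can be enforced in the pipeline with pytest-cov, e.g. `pytest --cov=<package> --cov-fail-under=80`.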

## Code Challenge SQL

Code Challenge Description
Title: Building a MySQL Database Interface in Python

Objective:
You are tasked with creating a Python application that interfaces with a MySQL database. The application will manage a dataset representing sales data for a tech company that sells various products across multiple countries. Your goal is to establish a database connection, create a table, and populate this table with sample data.

Tasks:

Create a Database Connection:
Implement a Python function to establish a connection to a MySQL database using provided credentials (host, username, password, and database name).
Define and Create a Table:
Write SQL commands within your Python script to create a table named sales. This table should have columns for id, country, category, price, quantity, and final_sales, with appropriate data types.
Insert Data:
Prepare a series of SQL INSERT statements to populate the sales table with the provided sample data. Ensure each record accurately reflects the sales data format.
Execute Queries:
Write functions to execute SQL queries to create the table and insert data into the table. Include error handling to manage potential SQL execution errors.
Expected Deliverables:

A Python script that can be run to connect to a MySQL database, create the necessary table, and populate it with data.
Your script should handle common errors that might occur during database operations, such as connection failures or SQL syntax errors.
Evaluation Criteria:

Correctness: The script should correctly execute all database operations without errors.

Code Quality: Code should be clear, well-organized, and appropriately commented.

Error Handling: The script should effectively handle and report errors during database operations.

Efficiency: SQL operations should be written efficiently to optimize execution.

Setup Instructions
Use a local MySQL Community Server and a made-up dataset related to device sales at a tech company:
https://dev.mysql.com/downloads/mysql/

Table 1: Sales
    product_id,
    country,
    category,
    price,
    quantity,
    final_sales

Table 2: Product
    id PRIMARY KEY,
    category,
    capacity,
    color,
    screen_size,
    memory,
    other_specs

Ensure you have mysql-connector-python installed in your environment. If not, you can install it with pip install mysql-connector-python.

Tips for Success
Test each part of your script incrementally to ensure that each function behaves as expected.
Consider the edge cases, such as what happens if the table already exists or the database connection cannot be established.

This challenge is designed to test your ability to integrate Python programming with SQL database management, reflecting tasks you may handle as a data engineer in our organization. Good luck!

**Create a MySQL - Python Connection**

For this part, I'll be using SQLAlchemy as an ORM. I usually use ORMs to handle database connections and queries: it is not only simpler but also safer, since queries are parameterized, which protects against SQL injection.

Everything related to the database lives inside the 'database' folder.

First, there is a config.py file with a class called Settings that loads all the environment variables needed for the MySQL connection (username, password, schema name, etc.).

Then, there is a mySQL.py file that creates the engine and session with SQLAlchemy. It reads the environment variables from a .env file; I will provide an env-test file with all the variables needed for this project.

Lastly, we create our data models (class representations of our tables in Python), in this case Sales and Product, defined in the Tables.py file. It is usually good practice to have one file per model, but for simplicity I'll define both classes in a single file.
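The three files are not shown here, so the block below is only a sketch of what they might contain, assuming SQLAlchemy 1.4+ and the mysql-connector-python driver. The environment-variable names (`MYSQL_USER`, etc.), the column types and lengths, and the surrogate keys on `Sales` and `TransformedSales` are assumptions; the table names match the ones the notebook checks with `inspector.has_table`.

```python
import os
from dataclasses import dataclass
from typing import Optional

from sqlalchemy import Column, Float, ForeignKey, Integer, String, create_engine
from sqlalchemy.engine import URL, Engine
from sqlalchemy.orm import Session, declarative_base, sessionmaker


# --- config.py (sketch): connection settings read from the environment ---
@dataclass(frozen=True)
class Settings:
    user: str
    password: str
    host: str
    port: int
    database: str

    @classmethod
    def from_env(cls) -> "Settings":
        # hypothetical variable names, e.g. loaded from the .env file
        return cls(
            user=os.environ["MYSQL_USER"],
            password=os.environ["MYSQL_PASSWORD"],
            host=os.environ.get("MYSQL_HOST", "localhost"),
            port=int(os.environ.get("MYSQL_PORT", "3306")),
            database=os.environ["MYSQL_DATABASE"],
        )


# --- mySQL.py (sketch): engine and session factories ---
def build_url(s: Settings) -> URL:
    # URL.create takes the raw password, so special characters need no manual escaping
    return URL.create(
        "mysql+mysqlconnector",
        username=s.user,
        password=s.password,
        host=s.host,
        port=s.port,
        database=s.database,
    )


def get_engine(settings: Optional[Settings] = None) -> Engine:
    # pool_pre_ping tests pooled connections before handing them out
    return create_engine(build_url(settings or Settings.from_env()), pool_pre_ping=True)


def get_session(engine: Optional[Engine] = None) -> Session:
    return sessionmaker(bind=engine or get_engine())()


# --- models/Tables.py (sketch): one class per table ---
Base = declarative_base()


def get_base():
    return Base


class Product(Base):
    __tablename__ = "Product"
    id = Column(String(36), primary_key=True)  # UUID stored as text
    category = Column(String(50), nullable=False)
    capacity = Column(Integer)  # nullable: not every product has one
    color = Column(String(30))
    screen_size = Column(String(20))
    memory = Column(String(20))
    other_specs = Column(String(255))


class Sales(Base):
    __tablename__ = "Sales"
    id = Column(Integer, primary_key=True, autoincrement=True)  # surrogate key (assumption)
    product_id = Column(String(36), ForeignKey("Product.id"), nullable=False)
    country = Column(String(60), nullable=False)
    price = Column(Float, nullable=False)
    quantity = Column(Float, nullable=False)
    final_sales = Column(Float, nullable=False)


class TransformedSales(Base):
    __tablename__ = "TransformedSales"
    row_id = Column(Integer, primary_key=True, autoincrement=True)  # surrogate key (assumption)
    id = Column(String(36), nullable=False)  # product id
    country = Column(String(60))
    revenue = Column(Float)
    category = Column(String(50))
    sales_volume = Column(String(10))
```

Keeping the settings, engine, and models separate means the models can be tested against an in-memory SQLite engine without touching MySQL.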


In [1]:
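# checking the connection:
from sqlalchemy import text
from sqlalchemy.exc import SQLAlchemyError

from database.config import log
from database.mySQL import get_engine, get_session

try:
    # open a connection, run a trivial query, and close the connection again
    with get_engine().connect() as conn:
        conn.execute(text("SELECT 1"))
    log.info("Success!")
except SQLAlchemyError as e:
    log.error(e)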

2024-06-06 10:19:56 [info     ] Success!


Next, we check whether the tables already exist and create them if they don't.
In my case, the tables didn't exist yet.

In [2]:
from sqlalchemy import inspect
from database.models.Tables import Sales, Product, get_base

engine = get_engine()
inspector = inspect(engine)
base = get_base()

tables = ["Sales", "Product"]

# create_all() skips tables that already exist (checkfirst=True by default),
# so a single call creates whichever tables are missing
if any(not inspector.has_table(table) for table in tables):
    base.metadata.create_all(engine)

Another advantage of using an ORM is that queries look cleaner.
The following cell performs a batch insert.


In [3]:
session = get_session()

products = [
        Product(
            id = "84db9881-904c-4e2e-b3e6-74596b58a528",
            category = "wearable",
            capacity= 4,
            color = "blue",
            screen_size = "1.78 inches",
            memory= "4 gb",
            other_specs= "Cool smart watch"
        ),
        Product(
            id = "c935302a-ff92-40b9-9052-7b814be04ea5",
            category="wearable",
            capacity=3,
            color="red",
            screen_size="1.60 inches",
            memory="1 gb",
            other_specs="Older but cool smart watch"
        ),
        Product(
            id = "99d0c46e-9ae4-4e25-aec0-75bc73f79d06",
            category = "smartphone",
            capacity = 6,
            color = "pink",
            screen_size="6.06 inches",
            memory="4 gb",
            other_specs="The most famous smartphone in the market!"
        ),
        Product(
            id = "2b08cc8d-6b67-4a8c-8147-e363dd86f0f1",
            category = "laptop",
            capacity = 512,
            color = "silver",
            screen_size = "15.6 inches",
            memory = "16 gb",
            other_specs = "High-performance laptop for professionals."
        ),
        Product(
            id = "80a01da4-ae4f-4a2e-bc13-33d605a0a49d",
            category = "tablet",
            capacity = 128,
            color = "black",
            screen_size = "10 inches",
            memory = "8 gb",
            other_specs = "Portable tablet for entertainment and productivity."
        ),
        Product(
            id = "c5b7f5d2-5d07-43f3-9757-c38e5cfad52d",
            category = "headphones",
            capacity = None,
            color = "white",
            screen_size = None,
            memory = "16 gb",
            other_specs = "Wireless headphones with noise-canceling feature."
        ),
        Product(
            id= "f7ac1912-6b17-41de-a3fc-0efc3c4d4df4",
            category = "smartwatch",
            capacity = None,
            color = "blue",
            screen_size = "1.3 inches",
            memory= "2 gb",
            other_specs= "Fitness tracker and smartwatch with heart rate monitor."
        )
]

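try:
    # insert all products in one batch, then make the insert permanent
    session.bulk_save_objects(products)
    session.commit()
except SQLAlchemyError as e:
    # undo the failed transaction so the session stays usable
    session.rollback()
    log.error(e)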

In [3]:
# check if the data was inserted:
session = get_session()
# .all() already returns a list of rows, so no manual loop is needed
results = session.query(Product.id, Product.category).all()
results

[('2b08cc8d-6b67-4a8c-8147-e363dd86f0f1', 'laptop'),
 ('80a01da4-ae4f-4a2e-bc13-33d605a0a49d', 'tablet'),
 ('84db9881-904c-4e2e-b3e6-74596b58a528', 'wearable'),
 ('99d0c46e-9ae4-4e25-aec0-75bc73f79d06', 'smartphone'),
 ('c5b7f5d2-5d07-43f3-9757-c38e5cfad52d', 'headphones'),
 ('c935302a-ff92-40b9-9052-7b814be04ea5', 'wearable'),
 ('f7ac1912-6b17-41de-a3fc-0efc3c4d4df4', 'smartwatch')]

In [6]:
session = get_session()

sales_rows = [
        Sales(
            product_id = "84db9881-904c-4e2e-b3e6-74596b58a528",
            country = "USA",
            price= 4500.0,
            quantity = 2.0,
            final_sales = 9000.0
        ),
        Sales(
            product_id = "c935302a-ff92-40b9-9052-7b814be04ea5",
            country = "USA",
            price= 2500.0,
            quantity = 2.0,
            final_sales = 5000.0
        ),
        Sales(
            product_id = "99d0c46e-9ae4-4e25-aec0-75bc73f79d06",
            country = "Korea",
            price= 1500.0,
            quantity = 2.0,
            final_sales = 3000.0
        ),
        Sales(
            product_id = "2b08cc8d-6b67-4a8c-8147-e363dd86f0f1",
            country = "Germany",
            price = 2000.0,
            quantity = 1.0,
            final_sales = 2000.0
        ), 
        Sales(    
            product_id = "80a01da4-ae4f-4a2e-bc13-33d605a0a49d",
            country = "Japan",
            price = 900.0,
            quantity = 5.0,
            final_sales = 4500.0
        ),
        Sales(
            product_id = "c5b7f5d2-5d07-43f3-9757-c38e5cfad52d",
            country = "UK",
            price = 200.0,
            quantity = 10.0,
            final_sales = 2000.0
        ),
        Sales(
            product_id = "f7ac1912-6b17-41de-a3fc-0efc3c4d4df4",
            country = "Australia",
            price = 300.0,
            quantity = 3.0,
            final_sales = 900.0
        )
]

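try:
    # insert all sales rows in one batch, then make the insert permanent
    session.bulk_save_objects(sales_rows)
    session.commit()
except SQLAlchemyError as e:
    # undo the failed transaction so the session stays usable
    session.rollback()
    log.error(e)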

In [4]:
session = get_session()
# .all() already returns a list of rows, so no manual loop is needed
results = session.query(Sales.product_id, Sales.country).all()
results

[('2b08cc8d-6b67-4a8c-8147-e363dd86f0f1', 'Germany'),
 ('80a01da4-ae4f-4a2e-bc13-33d605a0a49d', 'Japan'),
 ('84db9881-904c-4e2e-b3e6-74596b58a528', 'USA'),
 ('99d0c46e-9ae4-4e25-aec0-75bc73f79d06', 'Korea'),
 ('c5b7f5d2-5d07-43f3-9757-c38e5cfad52d', 'UK'),
 ('c935302a-ff92-40b9-9052-7b814be04ea5', 'USA'),
 ('f7ac1912-6b17-41de-a3fc-0efc3c4d4df4', 'Australia')]

## Use Case: Detailed Sales Analysis
### Objective:

Determine the top-selling product categories in each country.
Retrieve detailed product specifications for these top-selling products.
Provide additional insights like the total number of distinct products sold and the maximum sales recorded for each category.
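Since this is a SQL challenge, it may help to see the first objective answered in SQL as well. The sketch below uses `sqlite3` as a stand-in for MySQL (MySQL 8.0+ supports the same CTE and window-function syntax) with this notebook's sample data, ids shortened to their first block. Unlike the pandas cells, it sums sales per category rather than per product, so the USA's two wearables add up to 14000; `RANK()` keeps ties if two categories have the same total.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE Product (id TEXT PRIMARY KEY, category TEXT NOT NULL);
    CREATE TABLE Sales (product_id TEXT REFERENCES Product(id), country TEXT, final_sales REAL);
    """
)
# Sample rows from this notebook, with ids shortened for readability
conn.executemany(
    "INSERT INTO Product VALUES (?, ?)",
    [("84db9881", "wearable"), ("c935302a", "wearable"), ("99d0c46e", "smartphone"),
     ("2b08cc8d", "laptop"), ("80a01da4", "tablet"), ("c5b7f5d2", "headphones"),
     ("f7ac1912", "smartwatch")],
)
conn.executemany(
    "INSERT INTO Sales VALUES (?, ?, ?)",
    [("84db9881", "USA", 9000.0), ("c935302a", "USA", 5000.0), ("99d0c46e", "Korea", 3000.0),
     ("2b08cc8d", "Germany", 2000.0), ("80a01da4", "Japan", 4500.0), ("c5b7f5d2", "UK", 2000.0),
     ("f7ac1912", "Australia", 900.0)],
)

TOP_CATEGORY_PER_COUNTRY = """
WITH category_sales AS (          -- total sales per country and category
    SELECT s.country, p.category, SUM(s.final_sales) AS total_sales
    FROM Sales AS s
    JOIN Product AS p ON p.id = s.product_id
    GROUP BY s.country, p.category
),
ranked AS (                       -- rank the categories within each country
    SELECT country, category, total_sales,
           RANK() OVER (PARTITION BY country ORDER BY total_sales DESC) AS rnk
    FROM category_sales
)
SELECT country, category, total_sales FROM ranked WHERE rnk = 1 ORDER BY country
"""

for row in conn.execute(TOP_CATEGORY_PER_COUNTRY):
    print(row)
```

Pushing the aggregation into the database means only one row per country travels back to Python, instead of the whole join.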

In [5]:
from sqlalchemy import select
import pandas as pd


def get_joined_tables():
    """Join Product and Sales on the product id and return the result rows."""
    session = get_session()
    return session.execute(select(Product, Sales).join(Sales, Product.id == Sales.product_id)).all()


def get_sales_rows():
    """Select every row of the Sales table."""
    session = get_session()
    return session.execute(select(Sales)).all()


def get_sales_df() -> pd.DataFrame:
    """Turn the rows of the Sales table into a pandas DataFrame."""
    # read Sales directly, so sales without a matching product are not dropped by the join
    results_list = []
    for row in get_sales_rows():
        results_list.append({
            "product_id": row.Sales.product_id,
            "country": row.Sales.country,
            "price": row.Sales.price,
            "quantity": row.Sales.quantity,
            "final_sales": row.Sales.final_sales,
        })
    return pd.DataFrame(results_list)


def get_result_df() -> pd.DataFrame:
    """Turn the rows of the Product-Sales join into a pandas DataFrame."""
    results_list = []
    for row in get_joined_tables():
        results_list.append({
            "id": row.Product.id,
            "category": row.Product.category,
            "capacity": row.Product.capacity,
            "color": row.Product.color,
            "screen_size": row.Product.screen_size,
            "memory": row.Product.memory,
            "other_specs": row.Product.other_specs,
            "product_id": row.Sales.product_id,
            "country": row.Sales.country,
            "price": row.Sales.price,
            "quantity": row.Sales.quantity,
            "final_sales": row.Sales.final_sales,
        })
    return pd.DataFrame(results_list)

In [6]:
result_df = get_result_df()

In [7]:
# Determine the top-selling product categories in each country.

# sum the sales of each product, grouped by country and category
grouped_cat = result_df.groupby(["country", "category", "id"]).agg({"final_sales": "sum"}).reset_index()
grouped_cat

|   | country | category | id | final_sales |
|---|---|---|---|---|
| 0 | Australia | smartwatch | f7ac1912-6b17-41de-a3fc-0efc3c4d4df4 | 900 |
| 1 | Germany | laptop | 2b08cc8d-6b67-4a8c-8147-e363dd86f0f1 | 2000 |
| 2 | Japan | tablet | 80a01da4-ae4f-4a2e-bc13-33d605a0a49d | 4500 |
| 3 | Korea | smartphone | 99d0c46e-9ae4-4e25-aec0-75bc73f79d06 | 3000 |
| 4 | UK | headphones | c5b7f5d2-5d07-43f3-9757-c38e5cfad52d | 2000 |
| 5 | USA | wearable | 84db9881-904c-4e2e-b3e6-74596b58a528 | 9000 |
| 6 | USA | wearable | c935302a-ff92-40b9-9052-7b814be04ea5 | 5000 |


In [8]:
# keep only the top-selling product in each country
top_selling = grouped_cat.loc[grouped_cat.groupby(["country"])['final_sales'].idxmax()]
top_selling

|   | country | category | id | final_sales |
|---|---|---|---|---|
| 0 | Australia | smartwatch | f7ac1912-6b17-41de-a3fc-0efc3c4d4df4 | 900 |
| 1 | Germany | laptop | 2b08cc8d-6b67-4a8c-8147-e363dd86f0f1 | 2000 |
| 2 | Japan | tablet | 80a01da4-ae4f-4a2e-bc13-33d605a0a49d | 4500 |
| 3 | Korea | smartphone | 99d0c46e-9ae4-4e25-aec0-75bc73f79d06 | 3000 |
| 4 | UK | headphones | c5b7f5d2-5d07-43f3-9757-c38e5cfad52d | 2000 |
| 5 | USA | wearable | 84db9881-904c-4e2e-b3e6-74596b58a528 | 9000 |


Retrieve detailed product specifications for these top-selling products.

In [9]:
# get the product id as a list
ids = top_selling["id"].to_list()

In [10]:
result_df[result_df.id.isin(ids)]

|   | id | category | capacity | color | screen_size | memory | other_specs | product_id | country | price | quantity | final_sales |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2b08cc8d-6b67-4a8c-8147-e363dd86f0f1 | laptop | 512.0 | silver | 15.6 inches | 16 gb | High-performance laptop for professionals. | 2b08cc8d-6b67-4a8c-8147-e363dd86f0f1 | Germany | 2000.0 | 1.0 | 2000 |
| 1 | 80a01da4-ae4f-4a2e-bc13-33d605a0a49d | tablet | 128.0 | black | 10 inches | 8 gb | Portable tablet for entertainment and producti... | 80a01da4-ae4f-4a2e-bc13-33d605a0a49d | Japan | 900.0 | 5.0 | 4500 |
| 2 | 84db9881-904c-4e2e-b3e6-74596b58a528 | wearable | 4.0 | blue | 1.78 inches | 4 gb | Cool smart watch | 84db9881-904c-4e2e-b3e6-74596b58a528 | USA | 4500.0 | 2.0 | 9000 |
| 3 | 99d0c46e-9ae4-4e25-aec0-75bc73f79d06 | smartphone | 6.0 | pink | 6.06 inches | 4 gb | The most famous smartphone in the market! | 99d0c46e-9ae4-4e25-aec0-75bc73f79d06 | Korea | 1500.0 | 2.0 | 3000 |
| 4 | c5b7f5d2-5d07-43f3-9757-c38e5cfad52d | headphones |  | white |  | 16 gb | Wireless headphones with noise-canceling feature. | c5b7f5d2-5d07-43f3-9757-c38e5cfad52d | UK | 200.0 | 10.0 | 2000 |
| 6 | f7ac1912-6b17-41de-a3fc-0efc3c4d4df4 | smartwatch |  | blue | 1.3 inches | 2 gb | Fitness tracker and smartwatch with heart rate... | f7ac1912-6b17-41de-a3fc-0efc3c4d4df4 | Australia | 300.0 | 3.0 | 900 |


Provide additional insights like the total number of distinct products sold and the maximum sales recorded for each category.

In [11]:
# total number of distinct products sold
sales = get_sales_df()
sales["product_id"].nunique()

7


In [12]:
# maximum sales recorded for each category
result_df.groupby("category", as_index=False)["final_sales"].max()

|   | category | final_sales |
|---|---|---|
| 0 | headphones | 2000 |
| 1 | laptop | 2000 |
| 2 | smartphone | 3000 |
| 3 | smartwatch | 900 |
| 4 | tablet | 4500 |
| 5 | wearable | 9000 |
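Both insights can also be computed in a single `groupby` with pandas named aggregation (available since pandas 0.25). A minimal sketch on the `category`, `id`, and `final_sales` values from the joined table above, with ids shortened; on the notebook data the same call would run on `result_df`.

```python
import pandas as pd

# category, id and final_sales from the joined table above (ids shortened)
df = pd.DataFrame(
    {
        "category": ["laptop", "tablet", "wearable", "smartphone", "headphones", "wearable", "smartwatch"],
        "id": ["2b08cc8d", "80a01da4", "84db9881", "99d0c46e", "c5b7f5d2", "c935302a", "f7ac1912"],
        "final_sales": [2000, 4500, 9000, 3000, 2000, 5000, 900],
    }
)

# total number of distinct products sold
total_distinct = df["id"].nunique()

# named aggregation: distinct products and maximum sale per category in one pass
summary = df.groupby("category", as_index=False).agg(
    distinct_products=("id", "nunique"),
    max_sales=("final_sales", "max"),
)
print(total_distinct)  # 7
print(summary)
```

Named aggregation also sets meaningful output column names, so no renaming step is needed afterwards.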


## Code Challenge ETL on Python
### ETL Code Challenge Description
#### Title: ETL Process Simulation for Tech Company Sales Data

Background:

A tech company has multiple tables storing sales and product details. The sales table records transactions including the country, product category, and sales details. The product table includes specifications like capacity and color. Your task is to extract data from these tables, apply transformations to derive new insights, and load the results into a new structured format.


Extract:

Write a Python function to retrieve data from the existing sales and product tables. The extracted data should include country, category, product capacity, color, quantity sold, and final sales amount.

Transform:

Implement transformations to calculate the total revenue for each product (defined as quantity * final_sales).
Categorize each transaction based on sales volume into 'High', 'Medium', or 'Low'.

Load:

Design and create a new table called transformed_sales to store the transformed data.
Load the transformed data into this table with appropriate field names and data types.

In [13]:
# extract: get_result_df() defined above already joins both tables
# transform:
def get_revenue(row: pd.Series) -> float:
    """Revenue as defined by the challenge: quantity * final_sales."""
    return row["quantity"] * row["final_sales"]


def categorize(revenue: float) -> str:
    """Bucket a revenue value into a sales-volume label."""
    if revenue <= 10000:
        return "low"
    if revenue <= 18000:
        return "medium"
    return "high"


joined_df = get_result_df()
joined_df["revenue"] = joined_df.apply(get_revenue, axis=1)
joined_df["sales_volume"] = joined_df["revenue"].apply(categorize)
joined_df

|   | id | category | capacity | color | screen_size | memory | other_specs | product_id | country | price | quantity | final_sales | revenue | sales_volume |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2b08cc8d-6b67-4a8c-8147-e363dd86f0f1 | laptop | 512.0 | silver | 15.6 inches | 16 gb | High-performance laptop for professionals. | 2b08cc8d-6b67-4a8c-8147-e363dd86f0f1 | Germany | 2000.0 | 1.0 | 2000 | 2000.0 | low |
| 1 | 80a01da4-ae4f-4a2e-bc13-33d605a0a49d | tablet | 128.0 | black | 10 inches | 8 gb | Portable tablet for entertainment and producti... | 80a01da4-ae4f-4a2e-bc13-33d605a0a49d | Japan | 900.0 | 5.0 | 4500 | 22500.0 | high |
| 2 | 84db9881-904c-4e2e-b3e6-74596b58a528 | wearable | 4.0 | blue | 1.78 inches | 4 gb | Cool smart watch | 84db9881-904c-4e2e-b3e6-74596b58a528 | USA | 4500.0 | 2.0 | 9000 | 18000.0 | medium |
| 3 | 99d0c46e-9ae4-4e25-aec0-75bc73f79d06 | smartphone | 6.0 | pink | 6.06 inches | 4 gb | The most famous smartphone in the market! | 99d0c46e-9ae4-4e25-aec0-75bc73f79d06 | Korea | 1500.0 | 2.0 | 3000 | 6000.0 | low |
| 4 | c5b7f5d2-5d07-43f3-9757-c38e5cfad52d | headphones |  | white |  | 16 gb | Wireless headphones with noise-canceling feature. | c5b7f5d2-5d07-43f3-9757-c38e5cfad52d | UK | 200.0 | 10.0 | 2000 | 20000.0 | high |
| 5 | c935302a-ff92-40b9-9052-7b814be04ea5 | wearable | 3.0 | red | 1.60 inches | 1 gb | Older but cool smart watch | c935302a-ff92-40b9-9052-7b814be04ea5 | USA | 2500.0 | 2.0 | 5000 | 10000.0 | low |
| 6 | f7ac1912-6b17-41de-a3fc-0efc3c4d4df4 | smartwatch |  | blue | 1.3 inches | 2 gb | Fitness tracker and smartwatch with heart rate... | f7ac1912-6b17-41de-a3fc-0efc3c4d4df4 | Australia | 300.0 | 3.0 | 900 | 2700.0 | low |
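Because `categorize()` only compares against two thresholds, `pd.cut` can do the same bucketing in one vectorized call instead of a row-by-row `apply`. A minimal sketch using the `revenue` values from the table above:

```python
import pandas as pd

# revenue values from the transformed table above
revenue = pd.Series([2000.0, 22500.0, 18000.0, 6000.0, 20000.0, 10000.0, 2700.0])

# right-closed bins: (-inf, 10000] -> low, (10000, 18000] -> medium, (18000, inf) -> high,
# the same thresholds categorize() uses
sales_volume = pd.cut(
    revenue,
    bins=[float("-inf"), 10000, 18000, float("inf")],
    labels=["low", "medium", "high"],
)
print(sales_volume.tolist())  # ['low', 'high', 'medium', 'low', 'high', 'low', 'low']
```

`pd.cut` returns a categorical column; if a plain string column is preferred before loading it into the database, it can be converted with `.astype(str)`.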


In [14]:
#load
from sqlalchemy import inspect
from database.models.Tables import TransformedSales, get_base

engine = get_engine()
inspector = inspect(engine)
base = get_base()

if not inspector.has_table("TransformedSales"):
    base.metadata.create_all(engine)

In [None]:
# keep only the columns of the TransformedSales table and append them
load_df = joined_df[["id", "country", "revenue", "category", "sales_volume"]]
load_df.to_sql(name="TransformedSales", con=engine, if_exists="append", index=False)

In [16]:
# query table to test if data loaded correctly
session = get_session()  
query = session.query(TransformedSales.id, TransformedSales.revenue, TransformedSales.sales_volume)
rows = query.all()
results = []
for row in rows:
    results.append(row)
results

[('2b08cc8d-6b67-4a8c-8147-e363dd86f0f1', 2000.0, 'low'),
 ('80a01da4-ae4f-4a2e-bc13-33d605a0a49d', 22500.0, 'high'),
 ('84db9881-904c-4e2e-b3e6-74596b58a528', 18000.0, 'medium'),
 ('99d0c46e-9ae4-4e25-aec0-75bc73f79d06', 6000.0, 'low'),
 ('c5b7f5d2-5d07-43f3-9757-c38e5cfad52d', 20000.0, 'high'),
 ('c935302a-ff92-40b9-9052-7b814be04ea5', 10000.0, 'low'),
 ('f7ac1912-6b17-41de-a3fc-0efc3c4d4df4', 2700.0, 'low')]

## Code Challenge Airflow
Background:

Automating ETL tasks is crucial for ensuring data accuracy and availability in real-time or near-real-time for analysis and decision-making. Airflow is a platform used to programmatically author, schedule, and monitor workflows.

Tasks:

Complete the Python Functions:

Extract Function: 
Implement logic to extract data from a predefined data source. This could be a database, a file, an API, or any simulated data source.

Transform Function: 
Apply necessary data transformations which could include cleaning, aggregating, or any other form of data manipulation.

Load Function: 
Implement the logic to load the transformed data into a specified target, which could be a database or a data warehouse.

Integrate Functions with Airflow:

Use the provided Airflow DAG skeleton to integrate your Python functions.
Configure the DAG to ensure that tasks are executed in the correct order, handling dependencies correctly.

In [52]:
from sqlalchemy import inspect

from database.mySQL import get_engine
from database.models.Tables import get_base


def extract_data() -> pd.DataFrame:
    """Extract: the Product-Sales join (get_result_df is defined above)."""
    return get_result_df()


def get_revenue(row: pd.Series) -> float:
    return row["quantity"] * row["final_sales"]


def categorize(revenue: float) -> str:
    if revenue <= 10000:
        return "low"
    if revenue <= 18000:
        return "medium"
    return "high"


def transform_data(data: pd.DataFrame) -> pd.DataFrame:
    """Transform: add the revenue and its sales-volume label."""
    data = data.copy()  # avoid mutating the caller's DataFrame
    data["revenue"] = data.apply(get_revenue, axis=1)
    data["sales_volume"] = data["revenue"].apply(categorize)
    return data


def load_data(data: pd.DataFrame) -> None:
    """Load: create TransformedSales if needed and append the transformed rows."""
    engine = get_engine()
    if not inspect(engine).has_table("TransformedSales"):
        get_base().metadata.create_all(engine)
    data = data[["id", "country", "revenue", "category", "sales_volume"]]
    data.to_sql(name="TransformedSales", con=engine, if_exists="append", index=False)

In [53]:
import datetime as dt

In [51]:
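from io import StringIO

import pandas as pd
from airflow import DAG
from airflow.operators.python import PythonOperator

default_args = {
    "owner": "alejandra ramirez",
    "start_date": dt.datetime(2024, 5, 6),
    "retries": 2,
    "retry_delay": dt.timedelta(seconds=100),
}


# Tasks may run in separate processes, so DataFrames are passed between them through XCom
# as JSON strings (a callable's return value is pushed to XCom automatically).
# XCom is meant for small payloads; larger data should go through a file or a table instead.
def run_extract() -> str:
    return extract_data().to_json(orient="records")


def run_transform(ti) -> str:
    data = pd.read_json(StringIO(ti.xcom_pull(task_ids="extract")), orient="records")
    return transform_data(data).to_json(orient="records")


def run_load(ti) -> None:
    load_data(pd.read_json(StringIO(ti.xcom_pull(task_ids="transform")), orient="records"))


with DAG(
    "sales_airflow",
    default_args=default_args,
    schedule="0 0 * * *",  # daily at midnight (Airflow 2.4+; older versions use schedule_interval)
    catchup=False,  # don't backfill one run per day since start_date
):
    # pass the function objects themselves; calling them here would run them at parse time
    extract_task = PythonOperator(task_id="extract", python_callable=run_extract)
    transform_task = PythonOperator(task_id="transform", python_callable=run_transform)
    load_task = PythonOperator(task_id="load", python_callable=run_load)

    extract_task >> transform_task >> load_task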

## Code Challenge API

Tasks:

Extract Data:

Write a Python function to fetch data from a given API endpoint. This function should handle network errors, API rate limits, and other common issues that can occur during API interaction.

Transform Data:

Implement logic to transform the raw data fetched from the API. Assume the data includes various product details; extract and format this data into a structured JSON format that focuses on specific fields like product_id, product_name, category, and price.

Output Data:

Instead of loading the data into a database or storage system, output the transformed data to the console or a file in a clean, readable format. This simulates the final step in an ETL process where data is made available for further use.
Expected Deliverables:

A Python script that efficiently and correctly extracts, transforms, and outputs data as described.
Effective use of exception handling to manage potential errors during the API request.
Logging throughout the process to track operations and facilitate debugging and monitoring.
Evaluation Criteria:

Correctness and Completeness: 
The script should correctly fetch and process the API data according to the specifications provided.

Error Handling: 
Robust handling of errors and exceptional conditions in the API interaction.

Code Quality: 
The code should be clean, well-organized, commented, and follow best practices for Python development.

Output Formatting: 
The transformed data should be outputted in a structured and readable format, demonstrating an understanding of data presentation.
Instructions for Setup and Execution:

Ensure the requests and logging libraries are installed in your Python environment.
Use any public API endpoint you prefer; a tech-company-related one would be better.
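The task asks for handling API rate limits. Below is a minimal, stdlib-only sketch of one common approach: retry on HTTP 429 with exponential backoff, honoring a Retry-After value when the server sends one. `RateLimitedError` and `fetch_with_backoff` are hypothetical names. With requests, the fetch callable would raise `RateLimitedError` when `response.status_code == 429`, assuming Retry-After is given in seconds (it may also be an HTTP date). Alternatively, urllib3's `Retry` (with `status_forcelist=[429]`) mounted through `requests.adapters.HTTPAdapter` provides similar behavior.

```python
import time
from typing import Callable, Optional


class RateLimitedError(Exception):
    """Hypothetical error a fetch callable raises when the API answers HTTP 429."""

    def __init__(self, retry_after: Optional[float] = None):
        super().__init__("rate limited (HTTP 429)")
        self.retry_after = retry_after  # seconds, if the server sent Retry-After


def fetch_with_backoff(
    fetch: Callable[[], dict],
    max_attempts: int = 4,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> dict:
    """Call fetch(), retrying on RateLimitedError with exponential backoff."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return fetch()
        except RateLimitedError as err:
            if attempt == max_attempts:
                raise  # give up and let the caller log the error
            # prefer the server's hint; otherwise wait base_delay, 2x, 4x, ...
            if err.retry_after is not None:
                delay = err.retry_after
            else:
                delay = base_delay * 2 ** (attempt - 1)
            sleep(delay)
    raise AssertionError("unreachable")


# Demo with a fake endpoint that is rate limited twice and then succeeds;
# sleep is replaced so the demo records the delays instead of waiting.
calls = {"n": 0}


def fake_fetch() -> dict:
    calls["n"] += 1
    if calls["n"] == 1:
        raise RateLimitedError(retry_after=0.5)
    if calls["n"] == 2:
        raise RateLimitedError()
    return {"data": []}


delays: list = []
print(fetch_with_backoff(fake_fetch, sleep=delays.append), delays)  # {'data': []} [0.5, 2.0]
```

Injecting `sleep` keeps the retry logic testable without real waiting; in production the default `time.sleep` is used.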

In [49]:
import json

import pandas as pd
import requests


def format_json(data: dict) -> pd.DataFrame:
    """Flatten each purchase in data["data"] (product fields plus nested client_info) into one row."""
    products = []
    for item in data["data"]:
        client_info = item["client_info"]
        products.append(
            {
                # product related info
                "purchase_id": item["id"],
                "product_id": item["product_id"],
                "product_name": item["product_name"],
                "product_price": item["product_price"],
                "price_info": item["price_info"],
                "product_category": item["product_category"],
                "quantity": item["quantity"],
                # client related info
                "client_id": client_info["client_id"],
                "client_email": client_info["client_email"],
                "payment_method": client_info["payment_method"],
                "purchased_data": client_info["purchased_date"],
            }
        )
    return pd.DataFrame(products)


def write_csv(data: pd.DataFrame, path: str = "output/test.csv") -> None:
    """Write the formatted data to CSV (the output folder must already exist)."""
    data.to_csv(path)


def get_sales() -> None:
    """Fetch purchases from the API, flatten them and write them to a CSV file."""
    url = "https://non.existent-api.org/imaginary"
    params = {"real_param_1": "1", "real_param_2": "2"}
    try:
        # without a timeout, requests can wait indefinitely for a response
        response = requests.get(url, params=params, timeout=10)
        response.raise_for_status()
        write_csv(format_json(response.json()))
        log.info("Success!")
    except requests.exceptions.Timeout as timeout_err:
        log.error(timeout_err)
    except requests.exceptions.HTTPError as http_err:
        log.error(http_err)
    except requests.exceptions.ConnectionError as conn_err:
        log.error(conn_err)
    except requests.exceptions.RequestException as req_err:
        # base class for requests errors, including JSONDecodeError in requests 2.27+
        log.error(req_err)
    except KeyError as key_err:
        # the JSON did not have the expected structure
        log.error(f"Missing field in API response: {key_err}")
    finally:
        log.info("Done!")
    

In [46]:
get_sales()  # logs an error because the URL doesn't exist, but doesn't crash

2024-06-06 11:48:06 [error    ] HTTPSConnectionPool(host='non.existent-api.org', port=443): Max retries exceeded with url: /imaginary?real_param_1=1&real_param_2=2 (Caused by NameResolutionError("<urllib3.connection.HTTPSConnection object at 0x1482867f0>: Failed to resolve 'non.existent-api.org' ([Errno 8] nodename nor servname provided, or not known)"))
2024-06-06 11:48:06 [info     ] Done!


In [50]:
# testing functions with testing data
# testing the functions with test data
with open("test/success-response.json") as file:
    data = json.load(file)
formated_data = format_json(data)
write_csv(formated_data)
    

In [48]:
formated_data

|   | purchase_id | product_id | product_name | product_price | price_info | product_category | quantity | client_id | client_email | payment_method | purchased_data |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 123455 | 123543 | smartwatch | 5500 | MXN | smart device | 1 | 12345 | email_1@provider.com | credit card | 2023-01-02 |
| 1 | 126435 | 325563 | LV Laptop | 20500 | MXN | laptops & computers | 1 | 12335 | email_2@provider.com | paypal | 2023-01-02 |
| 2 | 223755 | 326573 | Computer | 25500 | MXN | laptops & computers | 1 | 42335 | email_3@provider.com | debit card | 2023-01-02 |
| 3 | 922436 | 346873 | XPhone | 12500 | MXN | smartphone | 1 | 42637 | email_4@provider.com | credit card | 2023-01-02 |
| 4 | 225468 | 566972 | XPhone Pro | 17500 | MXN | smartphone | 1 | 26739 | email_5@provider.com | paypal | 2023-01-02 |
| 5 | 428458 | 346873 | XPhone | 12500 | MXN | smartphone | 1 | 21499 | email_6@provider.com | credit card | 2023-01-02 |
| 6 | 233596 | 464592 | Book Pro | 17500 | MXN | laptops & computers | 1 | 23569 | email_7@provider.com | debit card | 2023-01-02 |
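The task description asks for structured JSON focused on product_id, product_name, category, and price, while the cells above write a CSV. A minimal stdlib sketch of that projection, assuming the response layout handled by `format_json` (items under `"data"` with `product_*` keys) and numeric ids and prices. `FIELD_MAP` and `to_product_records` are hypothetical names, and the single item is abridged from the first row of the table above (`client_info` omitted).

```python
import json

# source key in the API item -> key in the output record (assumed mapping)
FIELD_MAP = {
    "product_id": "product_id",
    "product_name": "product_name",
    "product_category": "category",
    "product_price": "price",
}


def to_product_records(payload: dict) -> list[dict]:
    """Keep only the requested fields of each item in payload["data"], renamed per FIELD_MAP."""
    return [
        {target: item[source] for source, target in FIELD_MAP.items()}
        for item in payload["data"]
    ]


# One item abridged from the first row above (client_info omitted)
payload = {
    "data": [
        {
            "id": 123455,
            "product_id": 123543,
            "product_name": "smartwatch",
            "product_price": 5500,
            "price_info": "MXN",
            "product_category": "smart device",
            "quantity": 1,
        }
    ]
}

print(json.dumps(to_product_records(payload), indent=2))
```

Keeping the mapping in one dictionary makes the output schema explicit and easy to change; writing the same string to a file with `Path(...).write_text(...)` would cover the file-output variant of the task.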
