In [1]:
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession

In [3]:
from pyspark.sql.functions import *

In [4]:
spark = SparkSession.builder.getOrCreate()

In [5]:
spark

In [6]:
# Read Products table
productsDF = spark.read \
    .format("json") \
    .load("../data/products.json")

# Read TicketLines table
ticketLinesDF = spark.read \
    .format("csv") \
    .option("header", "true") \
    .load("../data/ticket_line.csv")

# Read Stores table
storesDF = spark.read \
    .format("csv") \
    .option("header", "true") \
    .load("../data/stores.csv")

In [7]:
productsDF.show()

+--------------------+----------+--------------------+
|          categories|product_id|        product_name|
+--------------------+----------+--------------------+
|[{1023, Dairy pro...|         1|        Milbona milk|
|[{1023, Dairy pro...|         2|      Milbona cheese|
|[{1023, Dairy pro...|         3|       Milbona yogur|
|    [{1025, Cereal}]|         4|    Crownfield black|
|    [{1025, Cereal}]|         5|Crownfield with milk|
|   [{1026, Cookies}]|         6|      Sondey cookeys|
|[{1026, Cookies},...|         7|    Sondey digestive|
|   [{1026, Cookies}]|         8|    Sondey hazelnuts|
|[{1028, Vegetables}]|         9|      Freshona beans|
|[{1028, Vegetables}]|        10|   Freshona potatoes|
+--------------------+----------+--------------------+



In [8]:
ticketLinesDF.show()

+---------+----------+--------+----------+--------+
|ticket_id|product_id|store_id|      date|quantity|
+---------+----------+--------+----------+--------+
|       22|         8|      41|2022-01-02|       3|
|      258|         6|      40|2022-01-02|       2|
|      623|         2|      45|2022-01-02|       2|
|       37|         5|      47|2022-01-02|       2|
|      378|         4|      47|2022-01-02|       2|
|      391|         1|      46|2022-01-02|       2|
|      399|        10|      47|2022-01-02|       3|
|      727|         3|      45|2022-01-02|       3|
|      994|         2|      44|2022-01-02|       2|
|      844|         6|      42|2022-01-02|       3|
|      234|        10|      42|2022-01-02|       1|
|      206|         7|      44|2022-01-02|       2|
|      477|        10|      44|2022-01-02|       3|
|      496|         8|      43|2022-01-02|       3|
|      586|         1|      47|2022-01-02|       3|
|      511|        10|      41|2022-01-02|       3|
...

In [31]:
storesDF.show()

+--------+-------+-------+
|store_id|country|version|
+--------+-------+-------+
|      40|     DE|      1|
|      41|     DE|      1|
|      42|     DE|      1|
|      43|     DE|      1|
|      44|     DE|      1|
|      45|     DE|      1|
+--------+-------+-------+



In [14]:
totalQuantityDF = ticketLinesDF.groupBy("product_id") \
  .agg(sum("quantity").alias("total_quantity"))

totalQuantityDF.show()

+----------+--------------+
|product_id|total_quantity|
+----------+--------------+
|         7|         219.0|
|         3|         193.0|
|         8|         179.0|
|         5|         160.0|
|         6|         240.0|
|         9|         147.0|
|         1|         211.0|
|        10|         206.0|
|         4|         228.0|
|         2|         193.0|
+----------+--------------+
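
Because ticket_line.csv is read with `header` but without `inferSchema`, `quantity` arrives as a string. Spark casts it to double inside `sum`, which is why the totals print as `219.0` rather than `219`. The following plain-Python sketch shows the same group-and-sum logic on the first ten ticket lines shown earlier:

```python
from collections import defaultdict

# (product_id, quantity) pairs from the first ten rows of ticketLinesDF.
# Both values are strings, as they are when the CSV is read without inferSchema.
ticket_lines = [
    ("8", "3"), ("6", "2"), ("2", "2"), ("5", "2"), ("4", "2"),
    ("1", "2"), ("10", "3"), ("3", "3"), ("2", "2"), ("6", "3"),
]

def total_quantity(rows):
    """Group by product_id and sum quantity, casting each string to float first."""
    totals = defaultdict(float)
    for product_id, quantity in rows:
        totals[product_id] += float(quantity)
    return dict(totals)

print(total_quantity(ticket_lines))
# {'8': 3.0, '6': 5.0, '2': 4.0, '5': 2.0, '4': 2.0, '1': 2.0, '10': 3.0, '3': 3.0}
```

To keep integer totals in Spark, either read the file with `.option("inferSchema", "true")` or cast before aggregating, for example `sum(col("quantity").cast("int"))`.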



In [15]:
joinedDF = ticketLinesDF.join(productsDF, ["product_id"]) \
  .join(storesDF, ["store_id"]) \
  .select("ticket_id", "product_name", "country")

joinedDF.show()

+---------+--------------------+-------+
|ticket_id|        product_name|country|
+---------+--------------------+-------+
|       22|    Sondey hazelnuts|     DE|
|      258|      Sondey cookeys|     DE|
|      623|      Milbona cheese|     DE|
|      727|       Milbona yogur|     DE|
|      994|      Milbona cheese|     DE|
|      844|      Sondey cookeys|     DE|
|      234|   Freshona potatoes|     DE|
|      206|    Sondey digestive|     DE|
|      477|   Freshona potatoes|     DE|
|      496|    Sondey hazelnuts|     DE|
|      511|   Freshona potatoes|     DE|
|      507|      Milbona cheese|     DE|
|      271|      Sondey cookeys|     DE|
|      567|   Freshona potatoes|     DE|
|      175|       Milbona yogur|     DE|
|       45|       Milbona yogur|     DE|
|      385|Crownfield with milk|     DE|
|      708|   Freshona potatoes|     DE|
|      552|Crownfield with milk|     DE|
|      889|    Sondey digestive|     DE|
+---------+--------------------+-------+
only showing top 20 rows
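
The inner joins in this cell silently drop ticket lines whose `store_id` has no match in stores.csv. The following plain-Python sketch reproduces that filtering step on the first 16 ticket lines shown earlier. Stores 40 to 45 are the ones listed in stores.csv.

```python
# (ticket_id, store_id) pairs from the first 16 rows of ticketLinesDF
ticket_lines = [
    (22, 41), (258, 40), (623, 45), (37, 47), (378, 47), (391, 46),
    (399, 47), (727, 45), (994, 44), (844, 42), (234, 42), (206, 44),
    (477, 44), (496, 43), (586, 47), (511, 41),
]
# store_id values present in stores.csv
store_ids = {40, 41, 42, 43, 44, 45}

def inner_join_on_store(lines, stores):
    """Keep only ticket lines whose store_id has a match, as an inner join does."""
    return [ticket_id for ticket_id, store_id in lines if store_id in stores]

kept = inner_join_on_store(ticket_lines, store_ids)
print(kept)  # [22, 258, 623, 727, 994, 844, 234, 206, 477, 496, 511]
```

These are the first eleven `ticket_id` values of `joinedDF`. Tickets 37, 378, 391, 399 and 586 disappear because stores 46 and 47 are not in stores.csv.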

### 1. Find in how many different stores each product is sold. Consider only the stores listed in the stores.csv file, as not all stores are included in the Lidl Plus program.

In [16]:
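# First question
# count() here counts matching ticket lines rather than distinct stores, and
# ticketLinesDF is not joined with storesDF, so stores outside stores.csv are included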
distinctStoresDF = productsDF \
    .join(ticketLinesDF, productsDF.product_id == ticketLinesDF.product_id, "left") \
    .groupBy(productsDF.product_id) \
    .agg(count(ticketLinesDF.store_id).alias("num_stores")) \
    .orderBy(productsDF.product_id, ascending = [True])

distinctStoresDF.show()

+----------+----------+
|product_id|num_stores|
+----------+----------+
|         1|       111|
|         2|        96|
|         3|       104|
|         4|       119|
|         5|        86|
|         6|       112|
|         7|       105|
|         8|        89|
|         9|        77|
|        10|       101|
+----------+----------+
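
To count distinct stores, and only those listed in stores.csv, you need an inner join with the stores and a distinct count rather than a row count. The following plain-Python sketch applies that logic to the first 16 ticket lines shown earlier:

```python
from collections import defaultdict

# (product_id, store_id) pairs from the first 16 rows of ticketLinesDF
ticket_lines = [
    (8, 41), (6, 40), (2, 45), (5, 47), (4, 47), (1, 46), (10, 47), (3, 45),
    (2, 44), (6, 42), (10, 42), (7, 44), (10, 44), (8, 43), (1, 47), (10, 41),
]
product_ids = range(1, 11)              # products in products.json
store_ids = {40, 41, 42, 43, 44, 45}    # stores in stores.csv

def distinct_stores_per_product(lines, products, stores):
    """Count distinct stores (from stores.csv only) selling each product; 0 if none."""
    seen = defaultdict(set)
    for product_id, store_id in lines:
        if store_id in stores:          # plays the role of the inner join with storesDF
            seen[product_id].add(store_id)
    # Iterating over every product mimics a left join from productsDF
    return {p: len(seen[p]) for p in products}

print(distinct_stores_per_product(ticket_lines, product_ids, store_ids))
# {1: 0, 2: 2, 3: 1, 4: 0, 5: 0, 6: 2, 7: 1, 8: 2, 9: 0, 10: 3}
```

In PySpark the same approach has three steps. First, inner-join `ticketLinesDF` with `storesDF` on `store_id`. Second, compute `groupBy("product_id").agg(countDistinct("store_id"))`. Third, left-join that result from `productsDF`, filling missing counts with 0 (for example with `fillna(0)`).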



### 2. Find the second best-selling store for each product, as we need a target for advertising. As in the previous question, consider only the stores included in the stores.csv file.

In [17]:
from pyspark.sql.window import Window

# Within each product, order stores by units sold (highest first)
windowSpec = Window.partitionBy("product_id").orderBy(desc("total_quantity"))

# The inner join with storesDF keeps only the stores listed in stores.csv;
# row_number() then gives each store a unique position within its product
rankedStoresDF = ticketLinesDF \
  .join(storesDF, ["store_id"]) \
  .groupBy("product_id", "store_id") \
  .agg(sum("quantity").alias("total_quantity")) \
  .withColumn("rank", row_number().over(windowSpec))

# Keep the second-ranked store of each product
secondMostSellingDF = rankedStoresDF \
  .filter(col("rank") == 2) \
  .select("product_id", "store_id", "total_quantity")

secondMostSellingDF.show()

+----------+--------+--------------+
|product_id|store_id|total_quantity|
+----------+--------+--------------+
|         1|      42|          31.0|
|        10|      42|          30.0|
|         2|      42|          25.0|
|         3|      44|          28.0|
|         4|      42|          35.0|
|         5|      41|          24.0|
|         6|      45|          42.0|
|         7|      40|          36.0|
|         8|      41|          21.0|
|         9|      40|          23.0|
+----------+--------+--------------+



In [21]:
rankedStoresDF.show(50)

+----------+--------+--------------+----+
|product_id|store_id|total_quantity|rank|
+----------+--------+--------------+----+
|         1|      43|          46.0|   1|
|         1|      42|          31.0|   2|
|         1|      41|          29.0|   3|
|         1|      44|          22.0|   4|
|         1|      40|          20.0|   5|
|         1|      45|          18.0|   6|
|        10|      45|          34.0|   1|
|        10|      42|          30.0|   2|
|        10|      43|          28.0|   3|
|        10|      41|          26.0|   4|
|        10|      44|          23.0|   5|
|        10|      40|          19.0|   6|
|         2|      43|          29.0|   1|
|         2|      42|          25.0|   2|
|         2|      41|          25.0|   3|
|         2|      45|          24.0|   4|
|         2|      44|          23.0|   5|
|         2|      40|          16.0|   6|
|         3|      42|          31.0|   1|
|         3|      44|          28.0|   2|
+----------+--------+--------------+----+
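
`row_number()` breaks ties arbitrarily. For product 2, stores 42 and 41 both sold 25.0 units, so which one is reported as "second" depends on row order. If tied stores should share a position, a dense rank keeps both. The following plain-Python sketch applies dense ranking to product 2's totals from `rankedStoresDF`:

```python
# (store_id, total_quantity) for product 2, taken from rankedStoresDF
product_2 = [(43, 29.0), (42, 25.0), (41, 25.0), (45, 24.0), (44, 23.0), (40, 16.0)]

def stores_at_dense_rank(totals, target_rank):
    """Return every store whose dense rank (ties share a rank, no gaps) equals target_rank."""
    distinct_values = sorted({qty for _, qty in totals}, reverse=True)
    if target_rank > len(distinct_values):
        return []
    wanted = distinct_values[target_rank - 1]
    return sorted(store for store, qty in totals if qty == wanted)

print(stores_at_dense_rank(product_2, 2))  # [41, 42]
```

In Spark, you get this behaviour by replacing `row_number()` with `dense_rank()` (or `rank()`) over the same `windowSpec`. Whether a tie should produce two advertising targets is a business decision.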



### 3. The marketing team wants to group all these second stores by product category, so they can target different stores with the same advertising approach. As they don’t care about internal IDs, provide one row per product category_name that includes all the stores for that category.

In [27]:
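# categories is an array of structs, so "categories.category_name" yields an array of
# names: grouping on it keeps multi-category products (e.g. product 7) in a separate
# group, and collect_list keeps duplicate store ids
groupedStoresDF = secondMostSellingDF \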
  .join(productsDF, secondMostSellingDF["product_id"] == productsDF["product_id"]) \
  .select(productsDF["categories.category_name"].alias("category_name"), secondMostSellingDF["store_id"]) \
  .groupBy("category_name") \
  .agg(collect_list("store_id").alias("stores"))

groupedStoresDF.show(truncate=False)

+-----------------+------------+
|category_name    |stores      |
+-----------------+------------+
|[Cereal]         |[42, 41]    |
|[Vegetables]     |[42, 40]    |
|[Cookies, Cereal]|[40]        |
|[Dairy product]  |[42, 42, 44]|
|[Cookies]        |[45, 41]    |
+-----------------+------------+
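
Exploding the categories before grouping, and collecting a set instead of a list, gives one row per category name with each store listed once. The following plain-Python sketch applies that logic to the second-store results above and the category names reported in the `groupedStoresDF` output. Here, product 7 contributes store 40 to both Cookies and Cereal.

```python
from collections import defaultdict

# product_id -> second best-selling store, from secondMostSellingDF
second_store = {1: 42, 10: 42, 2: 42, 3: 44, 4: 42, 5: 41, 6: 45, 7: 40, 8: 41, 9: 40}
# product_id -> category names, as reported in the groupedStoresDF output
categories = {
    1: ["Dairy product"], 2: ["Dairy product"], 3: ["Dairy product"],
    4: ["Cereal"], 5: ["Cereal"],
    6: ["Cookies"], 7: ["Cookies", "Cereal"], 8: ["Cookies"],
    9: ["Vegetables"], 10: ["Vegetables"],
}

def stores_by_category(second_store, categories):
    """Explode each product's categories and collect the distinct stores per category."""
    grouped = defaultdict(set)
    for product_id, store_id in second_store.items():
        for name in categories[product_id]:   # one entry per category, like explode()
            grouped[name].add(store_id)       # a set drops duplicates, like collect_set()
    return {name: sorted(stores) for name, stores in sorted(grouped.items())}

print(stores_by_category(second_store, categories))
# {'Cereal': [40, 41, 42], 'Cookies': [40, 41, 45], 'Dairy product': [42, 44], 'Vegetables': [40, 42]}
```

In PySpark the same steps are `explode("categories")`, then selecting the exploded struct's `category_name` field, grouping by it, and aggregating with `collect_set("store_id")`. Wrap the result in `array_sort` if you need a stable order.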



### 4. Now, let’s imagine that the integration team is developing a new version of the stores model. They will send this new data as “version 2” of the stores, available only for some countries, which means that you will receive both versions simultaneously. They have changed the meaning of store_id and country: the new store_id has the country prepended to it, like “FR99”, and the country field is omitted.

In [36]:
storesV2DF = spark.read.csv("../data/stores_v2.csv", header=True)

# Split the v2 "store_id" (e.g. "FR99") into its numeric part and its country prefix;
# regexp_extract also handles ids with more than two digits, unlike substr(3, 2)
storesV2DF = storesV2DF.select(
    regexp_extract(col("store_id"), r"^([A-Z]{2})(\d+)$", 2).alias("store_id"),
    regexp_extract(col("store_id"), r"^([A-Z]{2})(\d+)$", 1).alias("country"),
    "version")

# Append the v2 rows to storesDF; unionByName matches columns by name, not position
storesDF = storesDF.unionByName(storesV2DF)

# Show the resulting integrated DataFrame
storesDF.show()

+--------+-------+-------+
|store_id|country|version|
+--------+-------+-------+
|      40|     DE|      1|
|      41|     DE|      1|
|      42|     DE|      1|
|      43|     DE|      1|
|      44|     DE|      1|
|      45|     DE|      1|
|      46|     DE|      1|
|      47|     DE|      2|
+--------+-------+-------+
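
Version 2 moves the country into `store_id`. Once the prefix is stripped, a French store "FR40" and the German store 40 would share `store_id` 40 after the union. To avoid that collision when both versions arrive at once, key the stores on `(country, store_id)` and keep the highest `version` per key. The following plain-Python sketch shows the idea. The sample rows, including "FR40" and "DE45", are hypothetical.

```python
import re

# v2 ids are a two-letter country code followed by digits, e.g. "FR99"
V2_STORE_ID = re.compile(r"([A-Z]{2})(\d+)")

def normalize_v1(row):
    """v1 rows carry store_id and country as separate fields."""
    return row["country"], row["store_id"], int(row["version"])

def normalize_v2(row):
    """v2 rows prepend the country to store_id and omit the country field."""
    match = V2_STORE_ID.fullmatch(row["store_id"])
    if match is None:
        raise ValueError(f"unexpected v2 store_id: {row['store_id']!r}")
    country, number = match.groups()
    return country, number, int(row["version"])

def merge_store_versions(v1_rows, v2_rows):
    """Keep one entry per (country, store_id), preferring the highest version."""
    latest = {}
    normalized = [normalize_v1(r) for r in v1_rows] + [normalize_v2(r) for r in v2_rows]
    for country, store_id, version in normalized:
        key = (country, store_id)
        if version > latest.get(key, 0):
            latest[key] = version
    return sorted((country, store_id, version) for (country, store_id), version in latest.items())

# Hypothetical sample: DE45 arrives in both versions, FR40 shares the number 40 with DE 40
v1 = [
    {"store_id": "40", "country": "DE", "version": "1"},
    {"store_id": "45", "country": "DE", "version": "1"},
]
v2 = [
    {"store_id": "DE45", "version": "2"},
    {"store_id": "FR40", "version": "2"},
    {"store_id": "FR99", "version": "2"},
]
print(merge_store_versions(v1, v2))
# [('DE', '40', 1), ('DE', '45', 2), ('FR', '40', 2), ('FR', '99', 2)]
```

In Spark you can get a similar result with a `row_number()` over `Window.partitionBy("country", "store_id").orderBy(desc("version"))`, keeping rank 1. Because the CSV `version` column is a string, cast it to int before ordering. Joins with `ticketLinesDF` would then need the country as well.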



### 5. Considering this ETL complete, we need to productize it: put it under version control, deploy changes automatically, ensure that new releases introduce no software regressions, and check that the process works correctly. Which steps would you take?

To productize the ETL process and ensure smooth deployment and regression testing, here are some steps you can take:

Version Control System: Set up a version control system (e.g., Git) to manage your ETL code and configurations. Commit your code to the repository, and create branches or tags for different releases or versions.

Continuous Integration/Continuous Deployment (CI/CD): Implement a CI/CD pipeline to automate the deployment process. Use tools like Jenkins, GitLab CI/CD, or AWS CodePipeline to automate the build, testing, and deployment stages.

Automated Testing: Develop automated tests for the ETL process: unit tests that validate individual transformations and integration tests that verify the end-to-end flow. Use frameworks such as pytest (with a local SparkSession) or spark-testing-base for Spark-specific testing.

Regression Testing: As part of the CI/CD pipeline, include regression tests to ensure that new releases or changes do not introduce software regressions. These tests should cover critical functionalities and edge cases to identify any issues or data inconsistencies.
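
The following is a minimal sketch of such a regression test. It assumes the transformation logic is factored into plain Python functions that pytest can import. `split_store_id` is a hypothetical helper for the version 2 store ids, and the test cases are illustrative.

```python
import re

def split_store_id(raw_id):
    """Hypothetical helper: split a v2 store_id such as "FR99" into ("FR", "99")."""
    match = re.fullmatch(r"([A-Z]{2})(\d+)", raw_id)
    if match is None:
        raise ValueError(f"invalid v2 store_id: {raw_id!r}")
    return match.group(1), match.group(2)

# pytest collects functions named test_* and reports any failing assert.
def test_two_digit_id():
    assert split_store_id("FR99") == ("FR", "99")

def test_longer_id_is_not_truncated():
    # Regression guard: a fixed-width substr(3, 2) would return "12" here.
    assert split_store_id("DE123") == ("DE", "123")

def test_v1_id_is_rejected():
    # With pytest installed, `with pytest.raises(ValueError):` is the usual form.
    try:
        split_store_id("40")
    except ValueError:
        return
    raise AssertionError("a v1 id without a country prefix should be rejected")
```

Running these tests in the CI/CD pipeline on every merge request turns a past bug, such as truncated ids, into a permanent check.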

Monitoring and Logging: Implement robust monitoring and logging mechanisms to capture and track the ETL process's performance and errors. Use tools like ELK Stack (Elasticsearch, Logstash, Kibana) or Apache Airflow's monitoring capabilities to gain visibility into the process's execution.

Documentation: Document your ETL process, including the overall architecture, data flow, and dependencies. Capture the configuration parameters and any specific deployment requirements. This documentation will help ensure consistency and facilitate future updates or maintenance.

Rollback Strategy: Define a rollback strategy in case any issues arise during the deployment of a new version. This may involve rolling back to the previous version, taking appropriate backups, or implementing a blue-green deployment approach to switch between versions seamlessly.

Continuous Improvement: Continuously monitor the ETL process's performance and reliability. Collect feedback from users, monitor system metrics, and address any identified issues or bottlenecks. Use this feedback to drive continuous improvement of the ETL process.

Following these steps improves the stability, reliability, and scalability of the ETL process while automating deployment and testing. This helps maintain data consistency, reduces manual effort, and enables faster, more confident releases.

## AWS Data Architecture with Cloud-Native Services:

![Pregunta5-2.PNG](attachment:Pregunta5-2.PNG)

# Data Architecture Exercise

### We want you to design a data system. Consider a real use case: a team is developing an app for Lidl’s cycling service, which we are about to launch in Germany.
Consider the following scenarios:

- An external bike provider is responsible for the stock and maintenance of the bikes. They send us their master data (about the bikes) as CSV files over an FTP connection, leaving the files in a cloud bucket once every 24 hours, at night.

- The mobile app tracks the user’s behavior with the bike service. It sends events in compressed format as soon as they are created, at a very high rate (more than 1 million events per second).

The needs of our system:

- We need to handle the app’s events and be able to query them in real time.

- We need to reconcile the bike provider’s data with the availability, status, and ID of each bike before 6:30 a.m., when our bike service opens.

- We need real-time monitoring dashboards.

- The relevant data for every stakeholder (data science team, data analysts, business, developers, etc.) should be available as soon as possible.

- The solution must be cost-effective (favor efficiency; there is no need to justify the cost details).

The challenge deliverable:

- Draw a diagram (flow or persona; it does not need to be low-level) of your data architecture solution.

Bear in mind that you need to be able to defend your choice vigorously. We do not want you to spend too much time on this challenge. It is just a starting point for discussion in the interview, so expect to be asked things like:

- Explain the data architecture you have chosen to solve the problem (kappa, lambda, lakehouse, etc.).
  - Justify your decision.
  - Briefly explain the components and technologies you used.
- How would you test and monitor it?
- How would you govern/audit this architecture?
- How would you make this system resilient and fault-tolerant?
- How would you handle and configure the throughput?

![DataArchitectureFlowDiagram.PNG](attachment:DataArchitectureFlowDiagram.PNG)
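
For the throughput question, a back-of-the-envelope sizing helps. This sketch assumes the app events are ingested through Amazon Kinesis Data Streams; the text does not name the ingestion service. Each provisioned shard accepts up to 1,000 records per second and 1 MiB per second of writes, so the shard count is driven by whichever limit is hit first. Aggregating several events into one Kinesis record, as the Kinesis Producer Library can do, relaxes only the record limit. The 200-byte average compressed event size and the aggregation factor of 10 are hypothetical.

```python
import math

RECORDS_PER_SHARD = 1_000        # assumed write limit per shard, records per second
BYTES_PER_SHARD = 1024 * 1024    # assumed write limit per shard, 1 MiB per second

def shards_needed(events_per_sec, avg_event_bytes, events_per_record=1):
    """Minimum shards for the write load; ignores aggregation overhead and peak headroom."""
    records_per_sec = events_per_sec / events_per_record
    by_records = math.ceil(records_per_sec / RECORDS_PER_SHARD)
    by_bytes = math.ceil(events_per_sec * avg_event_bytes / BYTES_PER_SHARD)
    return max(by_records, by_bytes)

print(shards_needed(1_000_000, 200))                        # 1000 (record limit dominates)
print(shards_needed(1_000_000, 200, events_per_record=10))  # 191 (byte limit dominates)
```

Real deployments add headroom for peaks. On-demand capacity mode avoids manual shard management but can cost more than well-sized provisioned shards under a steady, high load.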