<a href="https://colab.research.google.com/github/earo12/ETL_PepsiCo/blob/main/ETL_PepsiCo.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# First we need to import some necessary libraries
# Since the PDF doesn't mention what kind of source we're going to need, I'm going to create them with PySpark
# It can be done with other data tools such as Pandas, or a SQL database and even a NoSQL database such as Mongo
# However, I think that the standard way to manipulate data is with Spark so I'm going to use it in this project in particular
# I'm writing this code in Colab, but actually, it can be run in practically every enviroment that incorporates Spark
# If your environment doesn't have installed Spark, please make sure to run this line first:
!pip install pyspark



In [23]:
# Now we can import the necessary libraries:

from pyspark.sql import SparkSession, Row # Session from Spark, it endes up when we close this notebook
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType # Fields recquired for the three tables needed in the challenge

import random # Libraries for insert the data
from datetime import datetime, timedelta
import shutil
from google.colab import files
from pyspark.sql.functions import monotonically_increasing_id


In [24]:
# Create (or reuse, thanks to getOrCreate) the local Spark session used in every later cell.
# It is bound to the short name `sp`. "local[*]" runs Spark in-process with as many worker
# threads as there are logical cores.
sp = (
    SparkSession.builder
    .master("local[*]")
    .appName("PepsiCoCase")
    .getOrCreate()
)




# Extract part

In [25]:
# The brief does not say where the source data comes from, so synthetic data is generated.
# These lists define the dimensions shared by the three tables: three locations and four
# products (any number of locations and products can be used).

locations = ["Mexico", "United States", "Canada"]
products = ["Doritos", "Saladitas", "Sabritas", "Gamesa"]
today = datetime.today()

# Seed the random generator so that Restart & Run All reproduces the same synthetic
# quantities every time. Change SEED to get a different, but still reproducible, dataset.
SEED = 42
random.seed(SEED)

# Record lists for the three tables; the generation cells below populate them.
inventory_data = []
demand_data = []
replenishment_data = []

# Transform part

In [26]:
# Inventory table: one record per location and product for each of the last 365 days
# (today and the 364 days before it).
# The list is built in a single comprehension, instead of appending to a list created in
# another cell, so re-running this cell replaces the rows rather than duplicating them.
inventory_data = [
    Row(
        location=location,
        product=product,
        date=(today - timedelta(days=days_back)).strftime('%Y-%m-%d'),
        inventory_quantity=random.randint(0, 1000),  # inventory between 0 and 1000 (inclusive)
    )
    for days_back in range(365)
    for location in locations
    for product in products
]



In [27]:
# Demand table: one snapshot for each of the last 60 days (the dashboard needs them).
# Each snapshot projects demand for the 14 days starting on the snapshot date itself
# (days_ahead 0..13), for every location and product.
# The list is rebuilt from scratch, so re-running this cell does not duplicate rows.
demand_data = [
    Row(
        location=location,
        product=product,
        date=(snap_dt + timedelta(days=days_ahead)).strftime("%Y-%m-%d"),
        demand_quantity=random.randint(0, 1000),
        snapshot_date=snap_dt.strftime("%Y-%m-%d"),  # day the projection was made
    )
    for snap_dt in (today - timedelta(days=days_back) for days_back in range(60))
    for days_ahead in range(14)
    for location in locations
    for product in products
]


In [28]:
# Replenishment table: one record per location and product for every day of roughly
# the last three years (3 * 365 = 1095 days, counting back from today).
# The list is rebuilt from scratch, so re-running this cell does not duplicate rows.
replenishment_data = [
    Row(
        location=location,
        product=product,
        date=(today - timedelta(days=days_back)).strftime("%Y-%m-%d"),
        replenishment_quantity=random.randint(0, 1000),
    )
    for days_back in range(3 * 365)
    for location in locations
    for product in products
]


In [29]:
# Turn the three Row lists into Spark DataFrames, in the order inventory, demand,
# replenishment. No schema is passed, so Spark infers column names and types from the Rows.
inventory_df, demand_df, replenishment_df = (
    sp.createDataFrame(records)
    for records in (inventory_data, demand_data, replenishment_data)
)

In [32]:
# The Power BI dashboard needs a primary key in every table.
# monotonically_increasing_id() generates ids that are unique but not consecutive: the
# partition index is encoded in the upper bits. Uniqueness is all a key needs.
# Each chain adds the id column and then moves it to the front of the table.
inventory_df = (
    inventory_df
    .withColumn("id_inventory", monotonically_increasing_id())
    .select("id_inventory", "location", "product", "date", "inventory_quantity")
)
demand_df = (
    demand_df
    .withColumn("id_demand", monotonically_increasing_id())
    .select("id_demand", "location", "product", "date", "demand_quantity", "snapshot_date")
)
replenishment_df = (
    replenishment_df
    .withColumn("id_replenishment", monotonically_increasing_id())
    .select("id_replenishment", "location", "product", "date", "replenishment_quantity")
)

In [35]:
# Quick sanity check of the generated data.
# Spark transformations are lazy, but show() is an action: every call runs a Spark job to
# compute the rows it prints. Keep checks like this to a few rows.
# The same call works for inventory_df and demand_df.
replenishment_df.show(10)

+----------------+-------------+---------+----------+----------------------+
|id_replenishment|     location|  product|      date|replenishment_quantity|
+----------------+-------------+---------+----------+----------------------+
|               0|       Mexico|  Doritos|2024-12-20|                   423|
|               1|       Mexico|Saladitas|2024-12-20|                   861|
|               2|       Mexico| Sabritas|2024-12-20|                   281|
|               3|       Mexico|   Gamesa|2024-12-20|                     2|
|               4|United States|  Doritos|2024-12-20|                   959|
|               5|United States|Saladitas|2024-12-20|                   788|
|               6|United States| Sabritas|2024-12-20|                   527|
|               7|United States|   Gamesa|2024-12-20|                    91|
|               8|       Canada|  Doritos|2024-12-20|                   829|
|               9|       Canada|Saladitas|2024-12-20|                   463|

# Load part

In [36]:
# Load: no real destination is specified, so each table is written as CSV. An S3 bucket,
# a Redshift database or Azure Blob Storage would work the same way.
# Spark writes one output file per partition, so coalesce(1) merges each table into a
# single partition and therefore a single CSV. Each table lands in its own folder:
# /content/data/inventory, /content/data/demand and /content/data/replenishment.

output_path = "/content/data/"
tables_to_write = {
    "inventory": inventory_df,
    "demand": demand_df,
    "replenishment": replenishment_df,
}
for table_name, table_df in tables_to_write.items():
    table_df.coalesce(1).write.csv(f"{output_path}{table_name}", header=True, mode="overwrite")


In [37]:
# Spark writes every table as a folder (the part file plus bookkeeping files), so the whole
# output directory is bundled into one .zip that can be downloaded in a single step.
# The directory comes from `output_path` rather than a repeated literal.
# make_archive returns the path of the archive it created.
archive_path = shutil.make_archive("/content/data_files", "zip", output_path)

# Download the archive to your computer. It contains one folder per table, each holding
# that table's CSV file.
# NOTE: google.colab.files exists only in Google Colab. Outside Colab, the
# `from google.colab import files` import itself fails. There, skip this call and use the
# archive at `archive_path` directly.
files.download(archive_path)

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>