Data Extraction and Transformation

One challenge we have at Stadium Goods is correctly reporting international orders when sales are made in different currencies. To solve this, we pull reference exchange rate data that can be used to calculate and report international sales in different currencies. The European Central Bank provides an API to collect this data, documentation can be found here - the data tab should provide the information needed for the excercise.

Using python, write a script that will pull exchange rate data that can be used for reporting. For this exercise, we are looking for data for February 9-10, 2023 using the Key Euro Area Indicators dataflow. Pull data for all daily currencies against the Euro.

Using fake ecommerce data using the API documented here - pull data for all products in the ‘Shoes’ category. Assuming the data for these products is in USD, create a new pandas dataframe that includes all of the products, the price in USD, the price in EUR, and the date of the exchange rate that was used. 

In [348]:
import requests
import pandas as pd
import numpy as np
import json
import requests
import urllib.request
import xml.etree.ElementTree as ET
import csv
import io
from datetime import datetime


# set the API endpoint URL to get data from api

#Get all products by category shoes

url = "https://api.escuelajs.co/api/v1/categories/4/products"
url_data = json.load(urllib.request.urlopen(url))
shoes=pd.json_normalize(url_data)
products=shoes[['creationAt','id','category.name','title','price']]

In [350]:
# set the API endpoint URL to get data from api

#Get  data from ECB SDMX 2.1 RESTful web service

eurobank='https://sdw-wsrest.ecb.europa.eu/service/data/EXR/M.USD+GBP+JPY.EUR.SP00.A'


headers = {'Accept':'text/csv'}
params = {'startPeriod': '2023-02-09', 'endPeriod': '2023-02-10'}

# send a GET request to the API endpoint
response = requests.get(eurobank, headers=headers)
response

#check the response status code (200 means success)
if response.status_code == 200:
    data = response.content.decode('utf-8')
    df = pd.read_csv(io.StringIO(data))
    print(df.head())
    # Parse the CSV data
    #reader = csv.DictReader(data.splitlines())
    #rows = [row for row in reader]
    # Process the data
    #for row in rows:
        #print(row)
else:
    print('Error retrieving data:', response.text)

                    KEY FREQ CURRENCY CURRENCY_DENOM EXR_TYPE EXR_SUFFIX  \
0  EXR.M.GBP.EUR.SP00.A    M      GBP            EUR     SP00          A   
1  EXR.M.GBP.EUR.SP00.A    M      GBP            EUR     SP00          A   
2  EXR.M.GBP.EUR.SP00.A    M      GBP            EUR     SP00          A   
3  EXR.M.GBP.EUR.SP00.A    M      GBP            EUR     SP00          A   
4  EXR.M.GBP.EUR.SP00.A    M      GBP            EUR     SP00          A   

  TIME_PERIOD  OBS_VALUE OBS_STATUS OBS_CONF  ...  COMPILATION  COVERAGE  \
0     1999-01   0.702913          A        F  ...          NaN       NaN   
1     1999-02   0.688505          A        F  ...          NaN       NaN   
2     1999-03   0.671270          A        F  ...          NaN       NaN   
3     1999-04   0.665018          A        F  ...          NaN       NaN   
4     1999-05   0.658252          A        F  ...          NaN       NaN   

  DECIMALS  NAT_TITLE SOURCE_AGENCY  SOURCE_PUB                   TITLE  \
0        5 

In [351]:
df1=df[['CURRENCY','OBS_VALUE','TIME_PERIOD']].copy()
df1.sort_values(['TIME_PERIOD'])
products['CURRENCY']='USD'
database=pd.merge(products,df1,how='outer',on='CURRENCY')
database['price_euro']=round(database.price*database.OBS_VALUE,2)
database=database[['category.name','price','price_euro','TIME_PERIOD']]
database.rename(columns={'price':'price_usd', 'TIME_PERIOD':'time_period'},inplace=True)
database['date']=datetime.now()

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  products['CURRENCY']='USD'


In [353]:
database.to_csv('C:/Users\/LUCAS BARBOSA/Desktop/my_project/jupyter/database.csv')

Data Engineering in Production

For these questions please provide a brief (1 paragraph) explanation. Diagrams can be included if it is helpful, but not required.

Using typical sales data as an example, how would you ensure that a data pipeline is kept up to date with accurate data? What tools or process might you use so that sales data is updated daily?

1)First of all apply some data quality process as duplicate entries, missing data. Checking for data consistency between different data sources. 
For instance, if you have sales data that is collected from multiple sources you  want to ensure that the data from each source is consistent and accurate.
Regularly review and optimize the data pipeline to ensure that it is efficient and effective.

2)Our sales and product data is constantly changing - returns can affect previous sales, pricing changes can affect product data tables, etc. - how would you go about building a data pipeline that is able to add new data while also changing or updating existing data that has changed at the source system?

In [None]:
2)#how would you go about building a data pipeline that is able to add new data while also changing or updating
#existing data that has changed at the source system?
existing_data = pd.read_csv('database.csv')



# Filter data that already exists in target system
new_data = pd.DataFrame(data).merge(existing_data, on=['company', 'date'], how='left', indicator=True)
new_data = new_data[new_data['_merge'] == 'left_only']

# Append new data to existing data
all_data = pd.concat([existing_data, new_data])

# Write data to target system
all_data.to_csv('database.csv', index=False)
Implement data versioning: We can use a Python library like git to implement versioning:

import os
import subprocess


# Add data to Git repository
subprocess.run(['git', 'add', 'stocks.csv'])

import logging
import sentry_sdk

# Configure logging
logging.basicConfig(filename='pipeline.log', level=logging.INFO)

# Configure Sentry
sentry_sdk.init(dsn='your-sentry-dsn')

# Log errors and exceptions
try:
    # Run data pipeline code here
except Exception as e:
    logging.exception(e)
    sentry_sdk.capture_exception(e)