In [3]:
import os
import re
import pandas as pd
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
from forex_python.converter import CurrencyRates
from sqlalchemy import create_engine

In [135]:
# Load the raw bookings export; the path is relative to the notebook's working directory.
df = pd.read_csv('bookings.csv')

In [136]:
# Show column dtypes and non-null counts to see which columns still need parsing.
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 19662 entries, 0 to 19661
Data columns (total 9 columns):
 #   Column           Non-Null Count  Dtype 
---  ------           --------------  ----- 
 0   booking_id       19662 non-null  object
 1   restaurant_id    19662 non-null  object
 2   restaurant_name  19662 non-null  object
 3   client_id        19662 non-null  object
 4   client_name      19662 non-null  object
 5   amount           19662 non-null  object
 6   guests           19662 non-null  int64 
 7   date             19662 non-null  object
 8   country          19662 non-null  object
dtypes: int64(1), object(8)
memory usage: 1.4+ MB


In [137]:
# Preview the raw date strings (the output shows both '/' and '-' used as separators).
df["date"]

0        11/01/2017
1        10-08-2016
2        29/01/2015
3        10/11/2013
4        02/11/2013
            ...    
19657    06/03/2018
19658    30/04/2017
19659    20/01/2019
19660    21/10/2001
19661    14/08/2019
Name: date, Length: 19662, dtype: object

In [138]:
# Normalise the separator to '-' and parse as day-month-year in one step;
# values that do not match the format become NaT instead of raising.
df['date'] = pd.to_datetime(
    df['date'].str.replace('/', '-'),
    format='%d-%m-%Y',
    errors='coerce',
)

In [92]:
# List the distinct country values present in the bookings.
df['country'].unique()

array(['Italia', 'France', 'United Kingdom', 'España'], dtype=object)

In [93]:
# Preview the raw amount strings (currency symbol and decimal separator vary by row).
df['amount']

0         90,04 €
1        105,32 €
2          £11.30
3          £86.57
4          £64.28
           ...   
19657     £155.80
19658     81,20 €
19659     £118.01
19660    182,84 €
19661    184,05 €
Name: amount, Length: 19662, dtype: object

In [94]:
# Strip whitespace, digits and separators so only the currency symbols remain,
# then list the distinct symbols that occur.
df['amount'].astype(str).str.replace(r'[\s\d.,]', '', regex=True).unique()

array(['€', '£'], dtype=object)

In [97]:
# Dry run of the numeric clean-up: drop everything that is not a digit or a
# separator, switch decimal commas to dots, then cast to float.
(
    df['amount']
    .replace(r'[^\d.,]+', '', regex=True)
    .replace(',', '.', regex=True)
    .astype(float)
    .unique()
)

array([ 90.04, 105.32,  11.3 , ..., 118.01, 182.84, 184.05])

In [67]:
# Lookup from the currency symbol found in an amount string to its currency code.
currency_symbols = {'£': 'GBP', '€': 'EUR'}

In [68]:
# Every amount is normalised to this currency.
base_currency = 'EUR'

# Ask forex-python for the current rates quoted against the base currency.
c = CurrencyRates()
exchange_rates = c.get_rates(base_currency)

In [69]:
# Inspect the fetched rates: a dict keyed by currency code (the output has no 'EUR' entry).
exchange_rates

{'USD': 1.078,
 'JPY': 150.24,
 'BGN': 1.9558,
 'CZK': 23.666,
 'DKK': 7.4505,
 'GBP': 0.85795,
 'HUF': 368.73,
 'PLN': 4.4605,
 'RON': 4.9566,
 'SEK': 11.673,
 'CHF': 0.9716,
 'ISK': 149.5,
 'NOK': 11.612,
 'TRY': 25.1244,
 'AUD': 1.6023,
 'BRL': 5.2965,
 'CAD': 1.4362,
 'CNY': 7.6839,
 'HKD': 8.4493,
 'IDR': 15996.72,
 'INR': 88.89,
 'KRW': 1391.11,
 'MXN': 18.7356,
 'MYR': 4.9739,
 'NZD': 1.7627,
 'PHP': 60.426,
 'SGD': 1.448,
 'THB': 37.277,
 'ZAR': 20.1806}

In [70]:
def convert_to_euro(amount, currency, rates=None, symbols=None, base=None):
    """Convert ``amount`` from the currency denoted by ``currency`` into the base currency.

    Args:
        amount: Numeric amount expressed in the source currency.
        currency: Currency symbol as extracted from the raw amount string.
        rates: Optional mapping of currency code -> rate quoted against the
            base currency. Defaults to the notebook-level ``exchange_rates``.
        symbols: Optional mapping of currency symbol -> currency code.
            Defaults to the notebook-level ``currency_symbols``.
        base: Optional base currency code used when ``currency`` is not a
            known symbol. Defaults to the notebook-level ``base_currency``.

    Returns:
        ``amount`` divided by the rate of the resolved currency code, or
        ``amount`` unchanged when that code has no entry in ``rates``.
    """
    # Only touch the notebook globals when the caller did not pass explicit
    # lookups, so the function can be used and tested without kernel state.
    if rates is None:
        rates = exchange_rates
    if symbols is None:
        symbols = currency_symbols
    if base is None:
        base = base_currency

    # Unknown symbols are treated as already being in the base currency.
    currency_code = symbols.get(currency, base)
    if currency_code not in rates:
        return amount
    return amount / rates[currency_code]

In [71]:
# Keep only the currency symbol of each raw amount (whitespace, digits and
# separators are removed) and store it in its own column.
df['currency'] = df['amount'].map(
    lambda raw_amount: re.sub(r'[\s\d.,]', '', str(raw_amount))
)

In [76]:
# Replace the raw amount strings with floats: strip symbols and whitespace,
# turn decimal commas into dots, then cast.
df['amount'] = (
    df['amount']
    .replace(r'[^\d.,]+', '', regex=True)
    .replace(',', '.', regex=True)
    .astype(float)
)

In [77]:
# Convert every booking's amount to euros using its own currency symbol.
df['amount_euro'] = df.apply(
    lambda booking: convert_to_euro(booking['amount'], booking['currency']),
    axis='columns',
)

In [78]:
# Compare the parsed amount and its symbol with the converted euro value.
df[["amount", "currency", "amount_euro"]]

Unnamed: 0,amount,currency,amount_euro
0,90.04,€,90.040000
1,105.32,€,105.320000
2,11.30,£,13.170931
3,86.57,£,100.903316
4,64.28,£,74.922781
...,...,...,...
19657,155.80,£,181.595664
19658,81.20,€,81.200000
19659,118.01,£,137.548808
19660,182.84,€,182.840000


In [98]:
def preprocess_and_load_data(db_url=None):
    """Read raw bookings from PostgreSQL, convert amounts to euros, and stage the result.

    Steps:
        1. Read every row of ``bookings_table`` into a DataFrame.
        2. Split the raw ``amount`` string into a ``currency`` symbol column
           and a float ``amount`` column.
        3. Add ``amount_euro`` using rates fetched from forex-python.
        4. Write the frame to ``preprocessed_table``, replacing any existing table.

    Args:
        db_url: Optional SQLAlchemy connection URL. When omitted, the
            ``BOOKINGS_DB_URL`` environment variable is used, falling back to
            the original local Airflow database URL.

    Returns:
        None. The result is written to the database.
    """
    # Function-local imports are kept from the original so the task body is
    # self-contained when it is lifted into a DAG file.
    import re
    from forex_python.converter import CurrencyRates

    if db_url is None:
        # NOTE(review): the literal fallback still embeds a password; it is
        # kept only for backward compatibility. Prefer setting BOOKINGS_DB_URL.
        db_url = os.environ.get(
            'BOOKINGS_DB_URL',
            'postgresql://airflow:admin@localhost/airflow',
        )

    # Create a SQLAlchemy engine to connect to PostgreSQL
    engine = create_engine(db_url)
    try:
        # Read all rows from bookings_table into a DataFrame
        df = pd.read_sql_query("SELECT * FROM bookings_table;", engine)

        # Currency symbols mapped to currency codes
        currency_symbols = {
            '£': 'GBP',
            '€': 'EUR',
        }

        # Fetch the latest exchange rates, quoted against the base currency
        base_currency = 'EUR'
        exchange_rates = CurrencyRates().get_rates(base_currency)

        # Extract the currency symbol into its own column
        df['currency'] = df['amount'].apply(lambda x: re.sub(r'[\s\d.,]', '', str(x)))

        # Strip symbols and whitespace, switch decimal commas to dots, cast to float
        df['amount'] = (
            df['amount']
            .replace(r'[^\d.,]+', '', regex=True)
            .replace(',', '.', regex=True)
            .astype(float)
        )

        # Resolve one rate per row. Unknown symbols fall back to the base
        # currency, and codes with no published rate (including the base
        # itself) use 1.0, which leaves the amount unchanged. Doing this
        # column-wise also works on an empty table, where a row-wise apply
        # does not return a Series.
        rate_per_row = (
            df['currency']
            .map(currency_symbols)
            .fillna(base_currency)
            .map(exchange_rates)
            .fillna(1.0)
            .astype(float)
        )
        df['amount_euro'] = df['amount'] / rate_per_row

        # Load data into a staging preprocessed table
        df.to_sql('preprocessed_table', engine, if_exists='replace', index=False)
    finally:
        # Release pooled connections even if a step above fails.
        engine.dispose()

In [99]:
# Run the pipeline once: reads bookings_table and writes preprocessed_table.
# Needs the PostgreSQL instance to be reachable and an exchange-rate lookup via forex-python.
preprocess_and_load_data()