# Notebook Description

The aim of this notebook is to create a fact table in a PostgreSQL database. 
The fact table, *transaction*, will be used by data analysts to create reports and predictions about the company's performance. 
The granularity of the table is the transaction ID: each row represents a unique transaction. 
The table includes keys to the following dimensions:

1. Customer (**customer_id**)
2. Transaction Date (**transaction_date**) 
3. Payment Provider (**provider**)

The measure of the fact table is:

1. Transaction Amount (**amount_eur**) 

This allows the data analysts to create reports and predictions about the company's performance over selected time periods and per customer. The third dimension, payment provider, is needed because the transaction data comes from two different payment providers. 

The notebook should be triggered on a daily basis. The schedule depends on how fresh the payment data is. Since payment data from Provider B is ingested only once a day, which makes it the slower of the two sources, the notebook runs at the same frequency.

# Notebook Skeleton: 


## Extract
1. Extract  <br>

    1.1. Load data from Payment Provider A using REST-API <br>
    
    1.2. Load data from Payment Provider B using CSVs 

## Transform

2. Transform  <br>

    2.1. **Compatibility**  <br>
        2.1.1. Match transaction date format  <br>
        2.1.2. Match the currency in "amounts"  <br>
        2.1.3. Enforce compatibility with data ingestion frequency (daily vs. streaming)  <br>
        
    2.2. **Decryption** - Decrypt customer_id of both datasets <br>
    
    2.3. **Create bigger picture** <br>
        2.3.1. Select common columns between the two datasets - limiting that to what is needed <br>
        2.3.2. Create a new id that is unique across both datasets <br>
        2.3.3. Concatenate the two datasets <br>

## Load
3. Load  <br>

    3.1. Establish connection to Postgres database <br>
    
    3.2. Write fact table to database  <br>
        3.2.1. Write from scratch if the table doesn't already exist <br>
        3.2.2. Append to existing table <br>

# Extract

## Extract payment data through REST-API

In [None]:
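import requests
import pandas as pd

# Specify API URL (placeholder endpoint standing in for Payment Provider A)
api_url = "https://jsonplaceholder.typicode.com/todos/1"

# Send a GET request and fail early on HTTP errors
response = requests.get(api_url, timeout=30)
response.raise_for_status()

# Parse the response body as JSON (a dict for a single record)
json_data = response.json()

# Convert the parsed JSON to a Pandas DataFrame (one row per record)
payment_provider_a_raw = pd.json_normalize(json_data)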

## Extract payment data through CSV
- The CSV files are stored locally 
- Since the CSV files are daily exports, the following directory structure is imposed
- ```/provider_b/year/month/day/payment_data.csv```

In [None]:
import os
import glob
import pandas as pd
import datetime

# The export of the previous day is loaded
now = datetime.datetime.now()
one_day = datetime.timedelta(days=1)
load_time = now - one_day
year = load_time.year
month = load_time.month
day = load_time.day

# Directory containing the daily export, following /provider_b/year/month/day/
# (month and day are not zero-padded here; adjust if the export uses 05 rather than 5)
directory = f'/provider_b/{year}/{month}/{day}'

# Get all CSV files in directory
csv_files = glob.glob(os.path.join(directory, '*.csv'))
if not csv_files:
    raise FileNotFoundError(f'No CSV files found in {directory}')

# Read each CSV file into a Pandas DataFrame and concatenate them into one
payment_provider_b_raw = pd.concat(
    (pd.read_csv(file) for file in csv_files), ignore_index=True
)

# Transform

In [None]:
# Ensure that the two dataframes conform when it comes to the transaction timestamp.
# Provider B is a daily export, so its newest transaction date defines the cutoff.
def select_newest_timestamp(df):
    return pd.to_datetime(df['transaction_date']).max()

newest_timestamp = select_newest_timestamp(payment_provider_b_raw)

# Use this timestamp to filter out transactions from payment provider A that are newer.
# Assumption: Provider A stores the event time in the 'timestamp' column.
def filter_dataframe(df, timestamp):
    df = df.copy()
    df['timestamp'] = pd.to_datetime(df['timestamp'])
    # Keep everything up to the end of the newest day covered by Provider B
    cutoff = timestamp.normalize() + pd.Timedelta(days=1)
    return df[df['timestamp'] < cutoff]

payment_provider_a_transformed = filter_dataframe(payment_provider_a_raw, newest_timestamp)
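
The cutoff idea can be illustrated on toy data without pandas. In this minimal sketch, Provider B's newest daily export covers a hypothetical date, and every Provider A event after the end of that day is held back for the next run. The sample timestamps and the helper name `within_cutoff` are assumptions made for illustration only.

```python
from datetime import date, datetime, timedelta

# Hypothetical newest day covered by Provider B's daily export
newest_b_date = date(2023, 5, 10)

# Hypothetical streamed events from Provider A
a_timestamps = [
    datetime(2023, 5, 10, 9, 30),
    datetime(2023, 5, 10, 23, 59),
    datetime(2023, 5, 11, 0, 5),   # newer than Provider B's data
]

def within_cutoff(timestamp, newest_date):
    """Return True if the event happened on or before the newest covered day."""
    cutoff = datetime.combine(newest_date, datetime.min.time()) + timedelta(days=1)
    return timestamp < cutoff

kept = [ts for ts in a_timestamps if within_cutoff(ts, newest_b_date)]
print(len(kept))  # 2: the event on 2023-05-11 is held back
```

Comparing against the start of the following day, rather than against midnight of the newest day, keeps all events that happened during that day.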

In [None]:
# Create a column in both datasets that includes the name of the payment provider
payment_provider_a_transformed = payment_provider_a_transformed.assign(provider='A')
# Provider B has not been transformed yet, so start from the raw dataframe
payment_provider_b_transformed = payment_provider_b_raw.assign(provider='B')

In [None]:
# Since transaction_id is unique in each respective table, we can use this value hashed with 'provider' value
# to create a new transaction_id that would be unique after the two dfs are concatenated
import hashlib

def generate_unique_id(row):
    # Concatenate values of two columns
    string_to_hash = str(row['id']) + str(row['provider'])
    
    # Apply hash function to generate unique ID
    unique_id = hashlib.sha256(string_to_hash.encode()).hexdigest()
    
    return unique_id

# Apply the generate_unique_id() function to each row of each dataframe
payment_provider_a_transformed['id_hashed'] = payment_provider_a_transformed.apply(generate_unique_id, axis=1)
payment_provider_b_transformed['id_hashed'] = payment_provider_b_transformed.apply(generate_unique_id, axis=1)

# By the same logic, customer_id (present in both dfs) could be regenerated to ensure uniqueness across the two tables.
# Note: this helper is defined but not applied in this notebook, because customer_id is decrypted further below
# so that it can be joined with the customer table.
def generate_unique_customer_id(row):
    # Concatenate the customer ID with the provider name
    string_to_hash = str(row['customer_id']) + str(row['provider'])
    
    # Apply hash function to generate unique ID
    unique_id = hashlib.sha256(string_to_hash.encode()).hexdigest()
    
    return unique_id
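
To see why combining the raw transaction ID with the provider name matters, the following minimal sketch hashes the same raw ID under both providers. It uses only `hashlib`; the helper name `hash_transaction_id` and the sample ID `1001` are made up for illustration.

```python
import hashlib

def hash_transaction_id(raw_id, provider):
    """Hash the raw transaction ID together with the provider name."""
    string_to_hash = str(raw_id) + str(provider)
    return hashlib.sha256(string_to_hash.encode()).hexdigest()

# The same raw ID (hypothetical value) coming from both providers
id_a = hash_transaction_id(1001, "A")
id_b = hash_transaction_id(1001, "B")

print(id_a == id_b)  # False: the provider name makes the hashes differ
print(len(id_a))     # 64: a SHA-256 hex digest is 64 characters long
```

Because the hash is deterministic, re-running the notebook on the same row yields the same ID, which makes duplicates easy to detect after loading.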

When data is not uniform across providers, the same kind of record arrives in different formats, structures, and quality levels. The datasets then cannot be compared or combined directly and must first be preprocessed into a common shape.

For example, when comparing sales data from two retailers, one retailer may report more detailed information than the other. The two datasets cannot be compared directly until they are reduced to the fields they share.

There are several ways to address this. The simplest is to reduce the two datasets to the minimum dimensions and facts required. In this case: transaction_id (the id_hashed column created above), timestamp, customer_id, amount, currency, and provider.

If the two providers report amounts in different currencies, that needs to be resolved. All amounts need to be converted to one currency (e.g. EUR) so that the data analysts can aggregate and sum them. There are different conversion methods, and the choice should be agreed on by the stakeholders involved (data team, data and business analysts). One option is to set up a dataset that ingests daily exchange rates from a central bank API and to convert each transaction at the rate of its day. Another is to use that same dataset to compute an average monthly conversion rate.

For the purpose of this challenge, I'll assume that the data from Provider A contains a currency column and an amount column (i.e. the amount is given in local currency). The transformation converts this into a uniform currency (EUR) based on a conversion rate given in the table (the average conversion rate for the month in which the transaction took place). The dataset from Provider B already contains the amounts in EUR. 
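
As a minimal sketch of the monthly-average approach, the snippet below derives one average rate per currency and month from a handful of daily rates and uses it to convert an amount. The rates, the `daily_rates` structure, and the helper names are invented for illustration; they do not come from either provider or from a real central bank feed.

```python
from collections import defaultdict
from datetime import date
from statistics import mean

# Hypothetical daily rates: (currency, day) -> EUR per one unit of the currency
daily_rates = {
    ("USD", date(2023, 1, 2)): 0.90,
    ("USD", date(2023, 1, 3)): 0.94,
    ("USD", date(2023, 2, 1)): 0.92,
}

def monthly_average_rates(rates):
    """Average the daily rates per (currency, year, month)."""
    buckets = defaultdict(list)
    for (currency, day), rate in rates.items():
        buckets[(currency, day.year, day.month)].append(rate)
    return {key: mean(values) for key, values in buckets.items()}

def to_eur(amount, currency, transaction_day, avg_rates):
    """Convert an amount using the average rate of the transaction's month."""
    if currency == "EUR":
        return amount
    rate = avg_rates[(currency, transaction_day.year, transaction_day.month)]
    return amount * rate

avg_rates = monthly_average_rates(daily_rates)
amount_eur = to_eur(100.0, "USD", date(2023, 1, 15), avg_rates)
print(round(amount_eur, 2))
```

In the notebook, a rate derived this way would play the role of the `avg_conversion_rate` column that is assumed to be present in Provider A's data.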

In [None]:
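def create_amount_eur_column(df):
    # Convert the local-currency amount to EUR using the monthly average rate
    return df.assign(amount_eur=df['amount'] * df['avg_conversion_rate'])

payment_provider_a_transformed = create_amount_eur_column(payment_provider_a_transformed)

# Provider B already reports EUR (assumed to be in its 'amount' column), so the value is copied as is
payment_provider_b_transformed = payment_provider_b_transformed.assign(amount_eur=payment_provider_b_transformed['amount'])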

Another potential discrepancy between the two datasets is the timestamp or date format of the transaction. Provider A may provide a full timestamp for each transaction while Provider B only provides the date on which it took place. Since the purpose of the analysis is to study performance over time, a "day" granularity suffices. This granularity is the "least common denominator" of the two datasets. 

In [None]:
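def timestamp_to_date(df, column_name):
    # Reduce the timestamp to day granularity without modifying the input dataframe
    return df.assign(transaction_date=pd.to_datetime(df[column_name]).dt.date)

payment_provider_a_transformed = timestamp_to_date(payment_provider_a_transformed, "timestamp")
# Provider B already has a transaction_date column; normalise it to the same date type
payment_provider_b_transformed = timestamp_to_date(payment_provider_b_transformed, "transaction_date")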

When ```customer_id``` is encrypted, the actual customer ID values are replaced with values that are not recognizable as customer IDs. This protects sensitive information and prevents unauthorized access to customer data.

Encryption is the process of converting plain text into a coded form that cannot be read by unauthorized users.

Since ```customer_id``` is encrypted in both dataframes, it needs to be decrypted so that it can be joined with the customer table in the internal Postgres database later on. For this example, assume that the ```Crypto.Cipher``` module (for example from the PyCryptodome package) is used to decrypt the values in the specified column with the AES algorithm in EAX mode. The key variable contains the key that was used to encrypt the data. The cipher is created from this key and a nonce stored in the dataframe. We also assume that both dataframes were encrypted with the same algorithm and key. 

In [None]:
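from Crypto.Cipher import AES

# Placeholder key for illustration; a real key should be loaded from a secret store
KEY = b'Sixteen byte key'

def decrypt_value(ciphertext, nonce):
    # EAX ciphers are stateful, so a fresh cipher is created for every value
    cipher = AES.new(KEY, AES.MODE_EAX, nonce=nonce)
    return cipher.decrypt(ciphertext).decode('utf-8')

def decrypt_column(df, column_name):
    # Assumes each row stores the nonce used for its value in the 'nonce' column
    decrypted = [decrypt_value(value, nonce) for value, nonce in zip(df[column_name], df['nonce'])]
    return df.assign(**{column_name: decrypted})

payment_provider_a_transformed = decrypt_column(payment_provider_a_transformed, "customer_id")
payment_provider_b_transformed = decrypt_column(payment_provider_b_transformed, "customer_id")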

In [None]:
# Now the two transformed dataframes can be concatenated to build the bigger picture
def rename_column(df, old_name, new_name):
    return df.rename(columns={old_name: new_name})

def select_and_concatenate(df1, df2):
    # 'id_hashed' is the unique transaction ID created earlier
    selected_cols = ['id_hashed', 'customer_id', 'transaction_date', 'amount_eur', 'provider']
    result = pd.concat([df1[selected_cols], df2[selected_cols]], ignore_index=True)
    return rename_column(result, "id_hashed", "transaction_id")

payment_data_transformed = select_and_concatenate(payment_provider_a_transformed, payment_provider_b_transformed)

# Load

Use SQLAlchemy to create an engine for the Postgres database. The engine uses the ```psycopg2``` module as its database driver.

The ```load_dataframe_to_postgres``` function takes two arguments: the pandas dataframe to load into the database and the name of the table to write to.

The function first creates the engine using the ```create_engine``` method. A separate ```psycopg2.connect``` call is not needed, because the engine opens and manages its own connections.

The function then checks whether the specified table already exists using the inspector's ```has_table``` method (available in SQLAlchemy 1.4 and later). If it exists, the function appends the dataframe to the existing table using the ```df.to_sql``` method with ```if_exists='append'```. If it doesn't exist, the function writes the dataframe to a new table using the ```df.to_sql``` method with ```if_exists='replace'```.

In [None]:
from sqlalchemy import create_engine, inspect

def load_dataframe_to_postgres(df, table_name):
    # Create engine for SQLAlchemy (psycopg2 is the underlying driver).
    # Host, database, and credentials are placeholders.
    engine = create_engine('postgresql+psycopg2://myusername:mypassword@localhost/mydatabase')

    # Check if table exists
    if inspect(engine).has_table(table_name):
        # Append dataframe to existing table
        df.to_sql(table_name, engine, if_exists='append', index=False)
    else:
        # Write dataframe to new table
        df.to_sql(table_name, engine, if_exists='replace', index=False)

    # Release the connections held by the engine
    engine.dispose()
    
# Load fact dataframe into Postgres database
load_dataframe_to_postgres(payment_data_transformed, "transaction")

After the fact table - transaction - is written to the database, a BI tool can be used to create the data model. The data model is what the data analysts would be able to access through the BI tool. 

This is a sample SQL query for illustration purposes: 

```SQL
SELECT *
FROM transaction t
LEFT JOIN customer c
ON t.customer_id = c.customer_id;
```

These are sample SQL queries for analysis:

**Question**: "What is the total amount of transactions month-to-month this year?"

```SQL
SELECT DATE_TRUNC('month', transaction_date) AS month,
       SUM(amount_eur) AS total_amount
FROM transaction
WHERE EXTRACT(YEAR FROM transaction_date) = EXTRACT(YEAR FROM CURRENT_DATE)
GROUP BY month
ORDER BY month;
```

**Question**: "What is the average number of transactions performed by customers?"

```SQL
SELECT AVG(num_transactions) AS avg_transactions
FROM (
  SELECT COUNT(*) AS num_transactions
  FROM transaction
  GROUP BY customer_id
) subquery;
```
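
The last query can be tried without a running Postgres instance. The sketch below uses Python's built-in `sqlite3` module as a stand-in for Postgres, with a few made-up rows. The table name is quoted as `"transaction"` because the word is a keyword in SQLite. The data and the reduced column set are assumptions for illustration only.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    'CREATE TABLE "transaction" ('
    "transaction_id TEXT PRIMARY KEY, customer_id TEXT, amount_eur REAL)"
)

# Made-up rows: customer c1 has three transactions, customer c2 has one
rows = [
    ("t1", "c1", 10.0),
    ("t2", "c1", 20.0),
    ("t3", "c1", 5.0),
    ("t4", "c2", 7.5),
]
conn.executemany('INSERT INTO "transaction" VALUES (?, ?, ?)', rows)

query = """
SELECT AVG(num_transactions) AS avg_transactions
FROM (
  SELECT COUNT(*) AS num_transactions
  FROM "transaction"
  GROUP BY customer_id
) subquery
"""
avg_transactions = conn.execute(query).fetchone()[0]
print(avg_transactions)  # 2.0: (3 + 1) transactions / 2 customers
conn.close()
```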