# Install needed dependency

In [1]:
%%capture
!pip install numpy pandas sqlalchemy psycopg2

# Importing packages & Reading data

In [2]:
import pandas as pd
import numpy as np
import sqlalchemy
from sqlalchemy import create_engine
import json
from datetime import timedelta
import requests

## Reading Files:
each file contains a different format.

## First file
The first file contains a json like data but is missing the wrapping around.
So we will create a function that will read each line and parse the line as json and add it to array.
at the end return it as DataFrame object.

## Second file
Its a basic json file which can be read using the `read_json` function

## Third file
Its a csv like file using a `-` as the dilimiter.

## Reading the first file:

In [3]:
def read_raw_department_budget():
    with open("raw-department-budget.txt",mode="r") as file:
        lines = [json.loads(line) for line in file]
    return pd.DataFrame(lines)

In [4]:
df_depratment_budget = read_raw_department_budget()
df_depratment_budget

Unnamed: 0,sub_dep_id,sub_dep_name,department_id,budget
0,1,managers,1,3000
1,2,managers2,1,1500
2,1,sales support john,2,2000
3,2,sales support joe,2,1000
4,3,sales support johnson,2,2500
5,4,sales support eduards,2,2500


## Reading the second file:

In [5]:
df_depratment_budget_2 = pd.read_json("raw-department-budget2.txt")
df_depratment_budget_2

Unnamed: 0,sub_dep_id,sub_dep_name,department_id,budget
0,1,IT purchases,3,2000
1,2,IT maintenance,3,1500
2,3,IT other,3,1000


## Reading the third file:

In [6]:
df_depratment = pd.read_csv("raw-department.txt", sep="-")
df_depratment

Unnamed: 0,department_id,department_name
0,1,General
1,2,Sales Support
2,3,IT


## Creating `department_budget`

### Merge And Concat
Concating the first two tables, as they are two parts of the same table.

Then merging into the third table to get the departments.

In [7]:
# Concat the raw-department files because they have the same datafrmae
data_departments_budget = pd.concat([df_depratment_budget,df_depratment_budget_2], axis=0,ignore_index=True)
# Merge the result of the concatination.
data_merge = pd.merge(left=df_depratment,right=data_departments_budget,on="department_id")
data_merge

Unnamed: 0,department_id,department_name,sub_dep_id,sub_dep_name,budget
0,1,General,1,managers,3000
1,1,General,2,managers2,1500
2,2,Sales Support,1,sales support john,2000
3,2,Sales Support,2,sales support joe,1000
4,2,Sales Support,3,sales support johnson,2500
5,2,Sales Support,4,sales support eduards,2500
6,3,IT,1,IT purchases,2000
7,3,IT,2,IT maintenance,1500
8,3,IT,3,IT other,1000


### Grouping columns
We only need the following columns:
- department_id
- department_name
- budget

In [8]:
df_budget = data_merge.groupby(["department_id","department_name"])[["budget"]].sum()
df_budget

Unnamed: 0_level_0,Unnamed: 1_level_0,budget
department_id,department_name,Unnamed: 2_level_1
1,General,4500
2,Sales Support,8000
3,IT,4500


# Storing data in database
Note: this is a localhost database.

Create a connection to the database and create the new table

In [9]:
# Change the information as needed.
user = "postgres"
password = "123123"
db_name = "chinook"
hostname = "localhost"
connection_string = f"postgresql+psycopg2://{user}:{password}@{hostname}/{db_name}"
engine = create_engine(connection_string)

In [10]:
with engine.connect() as conn:
    # if_exists will throw error if the table already exists
    try:
        df_budget.to_sql(name="department_budget",con=conn,if_exists='fail')
    except:
        print("Table already exists.")

Table already exists.


In [11]:
# Check if the database was created successfully.
with engine.connect() as conn:
    budget = pd.read_sql_table('department_budget',conn,schema='stg')

In [12]:
budget

Unnamed: 0,department_id,department_name,budget
0,1,General,4500
1,2,Sales Support,8000
2,3,IT,4500


# API CURRENCIES
The bank of isreal provide an [api](https://www.boi.org.il/media/tzxbuhhj/extracting-representative-exchange-rates-from-the-new-series-database.pdf) to convert from any currency to nis.

> **NOTE:** there is a way to get dates by `from-to` but it doesn't work well.
>
> If the days provided are closed market days then it won't give back any data.
>
> Therefor the api just grabs the entire dataset.

In [13]:
# Proff of api
def get_currency_rate_in_ils(currency = 'USD'):
    """
    currency - 3 letters of the wanted currency
    
    get the currency exchange rates with dates from bank of isreal api
    and returns a filtered DataFrame for the currency provided.
    """
    df = pd.read_csv("https://edge.boi.gov.il/FusionEdgeServer/sdmx/v2/data/dataflow/BOI.STATISTICS/EXR/1.0/?c%5BDATA_TYPE%5D=OF00&format=csv")
    # Filter only the needed currency
    df = df[df['BASE_CURRENCY'] == currency.upper()][['TIME_PERIOD','OBS_VALUE']].reset_index(drop=True)
    df['TIME_PERIOD'] = pd.to_datetime(df['TIME_PERIOD'])
    return df

## Create a currency table

Invoices holds the needed dates to filter from the api.

For this reason we need to read the data from and get only the `invoicedate` column.

In [14]:
with engine.connect() as conn:
    df_invoice = pd.read_sql_table("invoice",conn,schema='stg')
df_invoice.head()

Unnamed: 0,invoiceid,customerid,invoicedate,billingaddress,billingcity,billingstate,billingcountry,billingpostalcode,total,last_update
0,1,2,2018-01-01,Theodor-Heuss-Straße 34,Stuttgart,,Germany,70174,1.98,2023-01-04 12:05:40.330691
1,2,4,2018-01-02,Ullevålsveien 14,Oslo,,Norway,0171,3.96,2023-01-04 12:05:40.332702
2,3,8,2018-01-03,Grétrystraat 63,Brussels,,Belgium,1000,5.94,2023-01-04 12:05:40.334024
3,4,14,2018-01-06,8210 111 ST NW,Edmonton,AB,Canada,T6G 2C7,8.91,2023-01-04 12:05:40.335723
4,5,23,2018-01-11,69 Salem Street,Boston,MA,USA,2113,13.86,2023-01-04 12:05:40.336992


In [15]:
# Check that the date datatype is set currectly.
df_invoice.dtypes

invoiceid                     int64
customerid                    int64
invoicedate          datetime64[ns]
billingaddress               object
billingcity                  object
billingstate                 object
billingcountry               object
billingpostalcode            object
total                       float64
last_update          datetime64[ns]
dtype: object

In [16]:
# Use the api to get the dataframe.
df_usd_ils = get_currency_rate_in_ils()
df_usd_ils.head()

Unnamed: 0,TIME_PERIOD,OBS_VALUE
0,1948-05-15,2.5e-05
1,1949-09-19,3.6e-05
2,1954-01-01,0.00018
3,1962-02-10,0.0003
4,1967-11-19,0.00035


In [17]:
# Get only the relevent dates for the invoice data.
df_usd_ils = df_usd_ils[df_usd_ils['TIME_PERIOD'].isin(df_invoice['invoicedate'])].reset_index(drop=True)
df_usd_ils.head()

Unnamed: 0,TIME_PERIOD,OBS_VALUE
0,2018-01-02,3.457
1,2018-01-03,3.46
2,2018-01-04,3.448
3,2018-01-05,3.446
4,2018-01-09,3.444


In [18]:
# sorting both lists for later usage of `merge_asof` function.
df_invoice_sorted = df_invoice.sort_values("invoicedate")
df_usd_ils_sorted = df_usd_ils.sort_values("TIME_PERIOD")

In [19]:
# merge_asof only works on sorted lists, therefor
# both lists were sorted, so the new `df_unique_dates` has
# left only the unique values sorted.
df_unique_dates = pd.DataFrame(df_invoice_sorted["invoicedate"].unique(),columns=["dates"])
df_unique_dates

Unnamed: 0,dates
0,2018-01-01
1,2018-01-02
2,2018-01-03
3,2018-01-04
4,2018-01-05
...,...
1587,2022-12-18
1588,2022-12-19
1589,2022-12-20
1590,2022-12-21


In [20]:
# merge_asof get etiher the value which is the same or
# the *nearest* value to it. Therefor is a good choice
# for merging based on dates with missing datas from the
# api.
df_dim_currency = pd.merge_asof(df_unique_dates,df_usd_ils_sorted,left_on="dates",right_on="TIME_PERIOD",direction="nearest",allow_exact_matches=True)
# drop unneeded columns.
df_dim_currency = df_dim_currency.drop(columns="TIME_PERIOD")
# change the column name
df_dim_currency = df_dim_currency.rename(columns={"OBS_VALUE":"ils_value"})
# check that everything worked
df_dim_currency.head()

Unnamed: 0,dates,ils_value
0,2018-01-01,3.457
1,2018-01-02,3.457
2,2018-01-03,3.46
3,2018-01-04,3.448
4,2018-01-05,3.446


## Creating the table in the database

In [21]:
# The data from the api will probably be updated every day or week,
# so when the dataframe is converted to sql it will append to the
# table if its already exists.
with engine.connect() as conn:
    df_dim_currency.to_sql(name="dim_currency",con=conn,schema='dwh',index=False,if_exists='append')
    # check if table does exists
    dim_currency = pd.read_sql_table("dim_currency",con=conn,schema='dwh')
dim_currency.head()

Unnamed: 0,dates,ils_value
0,2018-01-01,3.457
1,2018-01-02,3.457
2,2018-01-03,3.46
3,2018-01-04,3.448
4,2018-01-05,3.446
