Skip to content

Commit

Permalink
Merge pull request #9 from Dorianteffo/dev_dorian
Browse files Browse the repository at this point in the history
subs details
  • Loading branch information
Dorianteffo committed Mar 15, 2024
2 parents 900076b + 5f30590 commit 1913efa
Show file tree
Hide file tree
Showing 11 changed files with 42 additions and 14 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/cd_data_generator.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ on:
branches:
- master
paths:
- 'data_generator/**'
- 'data_generator/**/*.py'


jobs:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/ci_data_generator.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ on:
branches:
- master
paths:
- 'data_generator/**'
- 'data_generator/**/*.py'

jobs:
ci:
Expand Down
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
tf-var.txt
*.pem
airflow/
Lambda_ingestion/
Lambda_ingestion/
DBT/
10 changes: 8 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,14 @@
* Before running the terraform apply
export TF_VAR_pwd_db=your_db_password

* Create schema in the RDS database
* Snowflake structure
```
RAW : database to store raw data coming from Airbyte (schemas : postgres_airbyte )
ANALYTICS : the production data (schemas: staging, intermediate, marts(finance))
DBT_DEV: the dev database (the same schemas as the production database)
```

* To ingest data from RDS to Snowflake

* SSH tunneling : ssh -L 8000:private_instance_private_ip:8000 ssh_user@public_server_ip -N

Binary file added assets/oltp_data_model.PNG
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added assets/rds_snowflake_connection.PNG
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added assets/rds_snowflake_sync.PNG
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
8 changes: 3 additions & 5 deletions data_generator/README.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
## Prerequisites

Before pushing the code, we need to create two schema in the RDS database :
* test
To connect to the database, you can use DBEAVER, and the RDS is in the private subnets, you will need to specify the SSH Tunnel (EC2 instance in a public subnet in the same VPC)
Before pushing the code, we need to create this schema in the database:
* app


The python code will run every 5 hours and load 500 new rows to the tables.

Run `make generate-data`in the EC2 instance
The python code will run every 2 hours and load 100 new rows to the tables.
2 changes: 1 addition & 1 deletion data_generator/data/generate_data/bank.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def random_swift_bic():

def generate_bank_data() -> list:
bank_data = []
for i in range(100):
for i in range(500):
common_record = {
"id": random.randint(1000, 9999),
"account_number": random_number(),
Expand Down
21 changes: 20 additions & 1 deletion data_generator/data/generate_data/subscription.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def generate_subscriptions_data(transactions: list) -> list:
subscriptions = []
for record in transactions:
subscription_data = {
"id": record["id"],
"id": random.randint(1000, 9999),
"plan": random_plan(),
"status": random_status(),
"payment_method": random_payment_method(),
Expand All @@ -57,3 +57,22 @@ def generate_subscriptions_data(transactions: list) -> list:
subscriptions.append(subscription_data)

return subscriptions



def generate_random_subscription_details(transactions : list):
subscriptions_details = []

for record in transactions :
data = {
"id" : random.randint(1000, 9999),
"user_id": record["user_id"],
"revenue" : round(random.uniform(10, 1000), 2),
"quantity" : random.randint(1, 10),
"discount_amount" : round(random.uniform(0, 100), 2),
"rating" : random.randint(1, 5)
}

subscriptions_details.append(data)

return subscriptions_details
8 changes: 6 additions & 2 deletions data_generator/data/main_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from generate_data.bank import generate_bank_data
from generate_data.credit_card import generate_credit_card_data
from generate_data.stripe_transactions import generate_transactions_data
from generate_data.subscription import generate_subscriptions_data
from generate_data.subscription import generate_subscriptions_data, generate_random_subscription_details
from generate_data.user import generate_users_data, get_user_ids_from_data
from load_to_postgres import close_conn, create_conn, load_table
from utils.db import WarehouseConnection, get_warehouse_creds
Expand All @@ -25,6 +25,9 @@ def main():
subscriptions_data = generate_subscriptions_data(transactions_data)
subscriptions_df = pd.DataFrame(subscriptions_data)

subscriptions_details_data = generate_random_subscription_details(transactions_data)
subscriptions_details_df = pd.DataFrame(subscriptions_details_data)

users_data = generate_users_data(
get_user_ids_from_data(credit_card_data, subscriptions_data),
credit_card_data,
Expand All @@ -46,14 +49,15 @@ def main():
load_table(
subscriptions_df, engine, 'subscription', schema_name, load_mode
)
load_table(subscriptions_details_df, engine, 'subscription_details', schema_name, load_mode)
load_table(users_df, engine, 'user', schema_name, load_mode)

close_conn(engine)


if __name__ == '__main__':
# generate data every 2 hours
schedule.every(2).hours.do(main)
schedule.every(2).minutes.do(main)

while True:
schedule.run_pending()
Expand Down

0 comments on commit 1913efa

Please sign in to comment.