Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ch7 invalid sub 2 new db schema + subscription confirmation #6

Merged
merged 17 commits into from
Mar 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ actix-web = "4.5.1"
chrono = { version = "0.4.35", default-features = false, features = ["clock"] }
config = "0.14.0"
log = "0.4.21"
rand = { version = "0.8.5", features = ["std_rng"] }
reqwest = { version = "0.12.1", default-features = false, features = ["json", "rustls-tls"] }
secrecy = { version = "0.8.0", features = ["serde"] }
serde = { version = "1.0.197", features = ["derive"] }
Expand Down Expand Up @@ -54,3 +55,4 @@ quickcheck = "0.9.2"
quickcheck_macros = "0.9.1"
wiremock = "0.6.0"
serde_json = "1.0.115"
linkify = "0.10.0"
1 change: 1 addition & 0 deletions configuration/local.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
application:
host: 127.0.0.1
base_url: "http://127.0.0.1"
database:
require_ssl: false
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
-- Add migration script here
-- 1. backfill null status to `confirmed` in `subscriptions` table
-- 2. and then mark status column as NOT NULL
-- Make 1 and 2 in a single transaction
BEGIN;
UPDATE subscriptions
SET status = 'confirmed'
WHERE status IS NULL;
ALTER TABLE subscriptions ALTER COLUMN status SET NOT NULL;
COMMIT;
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- Add migration script here
CREATE TABLE subscription_tokens (
subscription_token TEXT NOT NULL,
subscriber_id uuid NOT NULL
REFERENCES subscriptions (id),
PRIMARY KEY (subscription_token)
);
5 changes: 5 additions & 0 deletions spec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ services:
routes:
- path: /
envs:
# We use DO's APP_URL to inject the dynamically
# provisioned base url as an environment variable
- key: APP_APPLICATION__BASE_URL
scope: RUN_TIME
value: ${APP_URL}
- key: APP_DATABASE__USERNAME
scope: RUN_TIME
value: ${newsletter.USERNAME}
Expand Down
1 change: 1 addition & 0 deletions src/configuration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ pub struct DatabaseSettings {
pub struct ApplicationSettings {
pub host: String,
pub port: u16,
pub base_url: String,
}

#[derive(serde::Deserialize, Clone)]
Expand Down
2 changes: 2 additions & 0 deletions src/routes/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
mod health_check;
mod subscriptions;
mod subscriptions_confirm;

pub use health_check::*;
pub use subscriptions::*;
pub use subscriptions_confirm::*;
138 changes: 119 additions & 19 deletions src/routes/subscriptions.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
use actix_web::{web, HttpResponse, Responder};
use chrono::Utc;
use sqlx::PgPool;
use rand::{distributions::Alphanumeric, thread_rng, Rng};
use sqlx::{Executor, PgPool, Postgres, Transaction};
use uuid::Uuid;

use crate::domain::{NewSubscriber, SubscriberEmail, SubscriberName};
use crate::{
domain::{NewSubscriber, SubscriberEmail, SubscriberName},
email_client::EmailClient,
startup::ApplicationBaseUrl,
};

#[derive(serde::Deserialize)]
pub struct FormData {
Expand All @@ -24,49 +29,144 @@ impl TryFrom<FormData> for NewSubscriber {

#[tracing::instrument(
name = "Adding a new subscriber", // span message, fn name by default
skip(form, db_pool), // skip these two fields in the span
skip(form, db_pool, email_client), // skip these two fields in the span
fields(
subscriber_email = %form.email,
subscriber_name = %form.name
)
)]
pub async fn subscribe(form: web::Form<FormData>, db_pool: web::Data<PgPool>) -> impl Responder {
pub async fn subscribe(
form: web::Form<FormData>,
db_pool: web::Data<PgPool>,
email_client: web::Data<EmailClient>,
base_url: web::Data<ApplicationBaseUrl>,
) -> impl Responder {
// `web::Form` is a wrapper around `FormData`
// `form.0` gives us access to the underlying `FormData`
let new_subscriber = match form.0.try_into() {
Ok(sub) => sub,
Err(_) => return HttpResponse::BadRequest().finish(),
};

match insert_subscriber(&new_subscriber, &db_pool).await {
Ok(_) => HttpResponse::Ok().finish(),
Err(_) => HttpResponse::InternalServerError().finish(),
let mut txn = match db_pool.begin().await {
Ok(t) => t,
Err(_) => return HttpResponse::InternalServerError().finish(),
};

let subscriber_id = match insert_subscriber(&new_subscriber, &mut txn).await {
Ok(id) => id,
Err(_) => return HttpResponse::InternalServerError().finish(),
};
let subscription_token = generate_subscription_token();

if store_token(&mut txn, subscriber_id, &subscription_token)
.await
.is_err()
{
return HttpResponse::InternalServerError().finish();
}

if send_confirmation_email(
new_subscriber,
&email_client,
&base_url.0,
&subscription_token,
)
.await
.is_err()
{
return HttpResponse::InternalServerError().finish();
}

if txn.commit().await.is_err() {
return HttpResponse::InternalServerError().finish();
}

HttpResponse::Ok().finish()
}

#[tracing::instrument(
name = "Saving new subscriber details in the database",
skip(form, pool)
skip(form, txn)
)]
async fn insert_subscriber(form: &NewSubscriber, pool: &PgPool) -> Result<(), sqlx::Error> {
sqlx::query!(
r#"
INSERT INTO subscriptions (id, email, name, subscribed_at, status)
VALUES ($1, $2, $3, $4, 'confirmed')
"#,
Uuid::new_v4(),
async fn insert_subscriber(
form: &NewSubscriber,
txn: &mut Transaction<'_, Postgres>,
) -> Result<Uuid, sqlx::Error> {
let uuid = Uuid::new_v4();
let query = sqlx::query!(
r#"INSERT INTO subscriptions (id, email, name, subscribed_at, status)
VALUES ($1, $2, $3, $4, 'pending_confirmation')"#,
uuid,
form.email.as_ref(),
form.name.as_ref(),
Utc::now()
)
.execute(pool)
.await
.map_err(|e| {
);
txn.execute(query).await.map_err(|e| {
tracing::error!("Failed to execute query: {:?}", e);
e
// Using the `?` operator to return early
// if the function failed, returning a sqlx::Error
// We will talk about error handling in depth later!
})?;
Ok(uuid)
}

#[tracing::instrument(
name = "Send a confirmation email to a new subscriber",
skip(email_client, form, base_url, subscription_token)
)]
pub async fn send_confirmation_email(
form: NewSubscriber,
email_client: &EmailClient,
base_url: &str,
subscription_token: &str,
) -> Result<(), reqwest::Error> {
let confirmation_link = format!(
"{}/subscriptions/confirm?subscription_token={}",
base_url, subscription_token
);
let plain_body = format!(
"Welcome to our newsletter!\nVisit {} to confirm your subscription.",
confirmation_link
);
let html_body = format!(
"Welcome to our newsletter!<br />\
Click <a href=\"{}\">here</a> to confirm your subscription.",
confirmation_link
);

email_client
.send_email(form.email, "welcome", &html_body, &plain_body)
.await
}

#[tracing::instrument(
name = "Storing subscription token in the database",
skip(txn, subscriber_token)
)]
pub async fn store_token(
txn: &mut Transaction<'_, Postgres>,
subscriber_id: Uuid,
subscriber_token: &str,
) -> Result<(), sqlx::Error> {
let query = sqlx::query!(
r#"INSERT INTO subscription_tokens (subscription_token, subscriber_id)
VALUES ($1, $2)"#,
subscriber_token,
subscriber_id
);
txn.execute(query).await.map_err(|e| {
tracing::error!("Failed to execute query: {:?}", e);
e
})?;
Ok(())
}

fn generate_subscription_token() -> String {
let mut rng = thread_rng();
std::iter::repeat_with(|| rng.sample(Alphanumeric))
.map(char::from)
.take(25)
.collect()
}
70 changes: 70 additions & 0 deletions src/routes/subscriptions_confirm.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
use actix_web::{web, HttpResponse, Responder};
use sqlx::PgPool;
use uuid::Uuid;

#[derive(serde::Deserialize, Debug)]
pub struct Parameters {
pub subscription_token: String,
}

#[tracing::instrument(name = "Confirm a pending subscriber", skip(params, db_pool))]
pub async fn confirm(params: web::Query<Parameters>, db_pool: web::Data<PgPool>) -> impl Responder {
let id = match get_subscriber_id_from_token(&db_pool, &params.subscription_token).await {
Ok(id) => id,
Err(_) => return HttpResponse::BadRequest().finish(),
};

match id {
None => HttpResponse::Unauthorized().finish(),
Some(subscriber_id) => {
if confirm_subscriber(&db_pool, subscriber_id).await.is_err() {
return HttpResponse::InternalServerError().finish();
}
HttpResponse::Ok().finish()
}
}
}

#[tracing::instrument(name = "Mark subcriber as confirmed", skip(id, pool))]
pub async fn confirm_subscriber(pool: &PgPool, id: Uuid) -> Result<(), sqlx::Error> {
sqlx::query!(
r#"
UPDATE subscriptions
SET status = 'confirmed'
WHERE id = $1
"#,
id
)
.execute(pool)
.await
.map_err(|e| {
tracing::error!(
"Failed to update subscriber status in the database: {:?}",
e
);
e
})?;
Ok(())
}

#[tracing::instrument(name = "Retrieving subscriber ID from the database", skip(token, pool))]
async fn get_subscriber_id_from_token(
pool: &PgPool,
token: &str,
) -> Result<Option<Uuid>, sqlx::Error> {
let result = sqlx::query!(
r#"
SELECT subscriber_id
FROM subscription_tokens
WHERE subscription_token = $1
"#,
token,
)
.fetch_optional(pool)
.await
.map_err(|e| {
tracing::error!("Failed to execute query: {:?}", e);
e
})?;
Ok(result.map(|r| r.subscriber_id))
}
Loading