Skip to content

Commit

Permalink
Refactor data_storing.rs
Browse files Browse the repository at this point in the history
  • Loading branch information
Seeker14491 committed Aug 3, 2020
1 parent 3094c7f commit 6f97935
Showing 1 changed file with 61 additions and 58 deletions.
119 changes: 61 additions & 58 deletions src/data_storing.rs
@@ -1,43 +1,45 @@
use crate::common::DistanceData;
use anyhow::Error;
use futures::{
prelude::*,
stream::{FuturesOrdered, FuturesUnordered},
FutureExt, TryFutureExt, TryStreamExt,
};
use steamworks::ugc::PublishedFileVisibility;
use tokio_postgres::error::SqlState;

pub async fn run(db: &mut tokio_postgres::Client, data: DistanceData) -> Result<(), Error> {
let mut tr_owned = db.transaction().await?;
let tr = &tr_owned;
let mut transaction_owned = db.transaction().await?;
let transaction = &transaction_owned;

println!("Clearing the database");
tr.batch_execute("TRUNCATE levels, users CASCADE").await?;
transaction
.batch_execute("TRUNCATE levels, users CASCADE")
.await?;

println!("Inserting users into the database");
let stmt = &tr
let stmt = &transaction
.prepare("INSERT INTO users VALUES ($1, $2) ON CONFLICT DO NOTHING")
.await?;
data.users
.iter()
.map(|user| async move {
tr.execute(stmt, &[&(user.steam_id as i64), &user.name])
transaction
.execute(stmt, &[&(user.steam_id as i64), &user.name])
.map_ok(drop)
.await
})
.collect::<FuturesUnordered<_>>()
.try_collect::<Vec<_>>()
.try_for_each(|_| future::ok(()))
.await?;

println!("Inserting levels into the database");
let stmt = &tr
let stmt = &transaction
.prepare("INSERT INTO levels (name, is_sprint, is_challenge, is_stunt) VALUES ($1, $2, $3, $4) RETURNING id")
.await?;
let level_ids: Vec<i32> = data
.levels
.iter()
.map(|level| async move {
let row = &tr
let row = &transaction
.query(
stmt,
&[
Expand All @@ -57,31 +59,29 @@ pub async fn run(db: &mut tokio_postgres::Client, data: DistanceData) -> Result<
.await?;

println!("Inserting the rest of the data into the database");
let wld_stmt = &tr
let wld_stmt = &transaction
.prepare("INSERT INTO workshop_level_details VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)")
.await?;
let sprint_stmt = &tr
let sprint_stmt = &transaction
.prepare("INSERT INTO sprint_leaderboard_entries VALUES ($1, $2, $3, $4, $5)")
.await?;
let challenge_stmt = &tr
let challenge_stmt = &transaction
.prepare("INSERT INTO challenge_leaderboard_entries VALUES ($1, $2, $3, $4, $5)")
.await?;
let stunt_stmt = &tr
let stunt_stmt = &transaction
.prepare("INSERT INTO stunt_leaderboard_entries VALUES ($1, $2, $3, $4, $5)")
.await?;
let futs = FuturesUnordered::new();
level_ids
.iter()
.zip(data.levels.iter())
.map(|(level_id, level)| {
if let Some(details) = &level.workshop_level_details {
let visibility = match details.visibility {
PublishedFileVisibility::Public => "public",
PublishedFileVisibility::FriendsOnly => "friends_only",
PublishedFileVisibility::Private => "private",
};
let fut = async move {
tr.execute(
for (level_id, level) in level_ids.iter().zip(data.levels.iter()) {
if let Some(details) = &level.workshop_level_details {
let visibility = match details.visibility {
PublishedFileVisibility::Public => "public",
PublishedFileVisibility::FriendsOnly => "friends_only",
PublishedFileVisibility::Private => "private",
};
let fut = async move {
transaction
.execute(
wld_stmt,
&[
&level_id,
Expand All @@ -101,13 +101,14 @@ pub async fn run(db: &mut tokio_postgres::Client, data: DistanceData) -> Result<
)
.map_ok(drop)
.await
};
futs.push(fut.boxed());
}
};
futs.push(fut.boxed());
}

for entry in &level.sprint_entries {
let fut = async move {
tr.execute(
for entry in &level.sprint_entries {
let fut = async move {
transaction
.execute(
sprint_stmt,
&[
&level_id,
Expand All @@ -119,13 +120,14 @@ pub async fn run(db: &mut tokio_postgres::Client, data: DistanceData) -> Result<
)
.map_ok(drop)
.await
};
futs.push(fut.boxed());
}
};
futs.push(fut.boxed());
}

for entry in &level.challenge_entries {
let fut = async move {
tr.execute(
for entry in &level.challenge_entries {
let fut = async move {
transaction
.execute(
challenge_stmt,
&[
&level_id,
Expand All @@ -137,13 +139,14 @@ pub async fn run(db: &mut tokio_postgres::Client, data: DistanceData) -> Result<
)
.map_ok(drop)
.await
};
futs.push(fut.boxed());
}
};
futs.push(fut.boxed());
}

for entry in &level.stunt_entries {
let fut = async move {
tr.execute(
for entry in &level.stunt_entries {
let fut = async move {
transaction
.execute(
stunt_stmt,
&[
&level_id,
Expand All @@ -155,30 +158,29 @@ pub async fn run(db: &mut tokio_postgres::Client, data: DistanceData) -> Result<
)
.map_ok(drop)
.await
};
futs.push(fut.boxed());
}
})
.for_each(drop);
};
futs.push(fut.boxed());
}
}

futs.try_collect().await?;
futs.try_for_each(|_| future::ok(())).await?;

println!("Updating 'last_updated' timestamp");
{
let tr = &mut tr_owned;
let tr_2 = tr.transaction().await?;
let result = tr_2
let transaction = &mut transaction_owned;
let nested_transaction = transaction.transaction().await?;
let result = nested_transaction
.batch_execute("INSERT INTO metadata (last_updated) VALUES (now())")
.await;
match result {
// No timestamp existed
Ok(_) => {
tr_2.commit().await?;
nested_transaction.commit().await?;
}
// Timestamp already existed
Err(e) if e.code() == Some(&SqlState::UNIQUE_VIOLATION) => {
tr_2.rollback().await?;
tr.batch_execute("UPDATE metadata SET last_updated = now()")
Err(e) if e.code() == Some(&tokio_postgres::error::SqlState::UNIQUE_VIOLATION) => {
nested_transaction.rollback().await?;
transaction.batch_execute("UPDATE metadata SET last_updated = now()")
.await?;
}
// Some other error
Expand All @@ -188,7 +190,8 @@ pub async fn run(db: &mut tokio_postgres::Client, data: DistanceData) -> Result<
}
}

tr_owned.commit().await?;
println!("Committing changes");
transaction_owned.commit().await?;

Ok(())
}

0 comments on commit 6f97935

Please sign in to comment.