Skip to content

Commit

Permalink
Merge pull request #26 from PocketRelay/dev
Browse files Browse the repository at this point in the history
Merging improvements and fixes for v0.5.3
  • Loading branch information
jacobtread committed Jul 2, 2023
2 parents 9fd7e8a + 90c0c9b commit a1b4eec
Show file tree
Hide file tree
Showing 12 changed files with 365 additions and 286 deletions.
409 changes: 201 additions & 208 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "pocket-relay"
version = "0.5.2"
version = "0.5.3"
description = "Pocket Relay Server"
readme = "README.md"
keywords = ["EA", "PocketRelay", "MassEffect"]
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile
Expand Up @@ -8,7 +8,7 @@ RUN apk add curl
WORKDIR /app

# Download server executable
RUN curl -LJ -o pocket-relay-linux https://github.com/PocketRelay/Server/releases/download/v0.5.2/pocket-relay-linux
RUN curl -LJ -o pocket-relay-linux https://github.com/PocketRelay/Server/releases/download/v0.5.3/pocket-relay-linux

# Make the server executable
RUN chmod +x ./pocket-relay-linux
Expand Down
6 changes: 3 additions & 3 deletions src/middleware/blaze_upgrade.rs
Expand Up @@ -116,12 +116,12 @@ impl BlazeUpgrade {
}

/// Extracts the host address from the provided headers map
fn extract_host(headers: &HeaderMap) -> Option<String> {
fn extract_host(headers: &HeaderMap) -> Option<Box<str>> {
// Get the port header
let header = headers.get(HEADER_HOST)?;
// Convert the header to a string
let header = header.to_str().ok()?;
Some(header.to_string())
Some(Box::from(header))
}
}

Expand Down Expand Up @@ -170,7 +170,7 @@ where
};

// Get the client host
let host: String = match BlazeUpgrade::extract_host(headers) {
let host: Box<str> = match BlazeUpgrade::extract_host(headers) {
Some(value) => value,
None => return Box::pin(ready(Err(BlazeUpgradeError::CannotUpgrade))),
};
Expand Down
6 changes: 3 additions & 3 deletions src/services/game/mod.rs
Expand Up @@ -83,7 +83,7 @@ pub struct GameSnapshot {
/// The game attributes
pub attributes: AttrMap,
/// Snapshots of the game players
pub players: Vec<GamePlayerSnapshot>,
pub players: Box<[GamePlayerSnapshot]>,
}

/// Attributes map type
Expand All @@ -109,7 +109,7 @@ pub struct GamePlayerSnapshot {
/// The player ID of the snapshot
pub player_id: PlayerID,
/// The player name of the snapshot
pub display_name: String,
pub display_name: Box<str>,
/// The player net data of the snapshot if collected
pub net: Option<NetData>,
}
Expand Down Expand Up @@ -139,7 +139,7 @@ impl GamePlayer {
pub fn snapshot(&self, include_net: bool) -> GamePlayerSnapshot {
GamePlayerSnapshot {
player_id: self.player.id,
display_name: self.player.display_name.clone(),
display_name: Box::from(self.player.display_name.as_ref()),
net: if include_net {
Some(self.net.clone())
} else {
Expand Down
8 changes: 4 additions & 4 deletions src/services/leaderboard/mod.rs
Expand Up @@ -123,7 +123,7 @@ impl Leaderboard {
/// on their value.
///
/// `ty` The leaderboard type
async fn compute(ty: &LeaderboardType) -> Vec<LeaderboardEntry> {
async fn compute(ty: &LeaderboardType) -> Box<[LeaderboardEntry]> {
let start_time = Instant::now();

// The amount of players to process in each database request
Expand Down Expand Up @@ -180,7 +180,7 @@ impl Leaderboard {

debug!("Computed leaderboard took: {:.2?}", start_time.elapsed());

values
values.into_boxed_slice()
}
}

Expand Down Expand Up @@ -228,7 +228,7 @@ fn compute_n7_player(db: &'static DatabaseConnection, player: Player) -> Lf {
let rating: u32 = total_promotions * 30 + total_level;
Ok(LeaderboardEntry {
player_id: player.id,
player_name: player.display_name,
player_name: player.display_name.into_boxed_str(),
// Rank is not computed yet at this stage
rank: 0,
value: rating,
Expand All @@ -248,7 +248,7 @@ fn compute_cp_player(db: &'static DatabaseConnection, player: Player) -> Lf {
.unwrap_or(0);
Ok(LeaderboardEntry {
player_id: player.id,
player_name: player.display_name,
player_name: player.display_name.into_boxed_str(),
// Rank is not computed yet at this stage
rank: 0,
value,
Expand Down
9 changes: 4 additions & 5 deletions src/services/leaderboard/models.rs
Expand Up @@ -11,7 +11,7 @@ pub struct LeaderboardEntry {
/// The ID of the player this entry is for
pub player_id: PlayerID,
/// The name of the player this entry is for
pub player_name: String,
pub player_name: Box<str>,
/// The ranking of this entry (Position in the leaderboard)
pub rank: usize,
/// The value this ranking is based on
Expand All @@ -23,7 +23,7 @@ pub struct LeaderboardEntry {
/// no longer be considered valid
pub struct LeaderboardGroup {
/// The values stored in this entity group
pub values: Vec<LeaderboardEntry>,
pub values: Box<[LeaderboardEntry]>,
/// The time at which this entity group will become expired
pub expires: SystemTime,
}
Expand All @@ -34,7 +34,7 @@ impl LeaderboardGroup {

/// Creates a new leaderboard group which has an expiry time set
/// to the LIFETIME and uses the provided values
pub fn new(values: Vec<LeaderboardEntry>) -> Self {
pub fn new(values: Box<[LeaderboardEntry]>) -> Self {
let expires = SystemTime::now() + Self::LIFETIME;
Self { expires, values }
}
Expand All @@ -43,10 +43,9 @@ impl LeaderboardGroup {
/// is already considered to be expired. Used to hand out
/// a value while computed to prevent mulitple computes happening
pub fn dummy() -> Self {
let values = Vec::with_capacity(0);
Self {
expires: SystemTime::UNIX_EPOCH,
values,
values: Box::new([]),
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/services/mod.rs
Expand Up @@ -17,7 +17,7 @@ pub struct Services {
pub game_manager: Link<GameManager>,
pub matchmaking: Link<Matchmaking>,
pub leaderboard: Link<Leaderboard>,
pub retriever: Option<Retriever>,
pub retriever: Link<Retriever>,
pub sessions: Link<AuthedSessions>,
pub tokens: Tokens,
}
Expand Down
160 changes: 133 additions & 27 deletions src/services/retriever/mod.rs
@@ -1,7 +1,11 @@
//! Retriever system for connecting and retrieving data from the official
//! Mass Effect 3 servers.

use std::fmt::{Debug, Display};
use std::{
fmt::{Debug, Display},
ops::Add,
time::{Duration, SystemTime},
};

use self::origin::OriginFlowService;
use crate::{
Expand All @@ -18,35 +22,101 @@ use blaze_pk::{
};
use blaze_ssl_async::{stream::BlazeStream, BlazeError};
use futures_util::{SinkExt, StreamExt};
use interlink::prelude::*;
use log::{debug, error, log_enabled};
use models::InstanceRequest;
use origin::OriginFlow;
use reqwest;
use serde::Deserialize;
use thiserror::Error;
use tokio::io;
use tokio_util::codec::Framed;

mod models;

pub mod origin;

/// Structure for the retrievier system which contains the host address
/// for the official game server in order to make further connections
#[derive(Service)]
pub struct Retriever {
instance: OfficialInstance,
// Optional official instance if fetching is possible
instance: Option<OfficialInstance>,

/// Optional service for creating origin flows if enabled
pub origin_flow: Option<OriginFlowService>,
origin_flow: Option<OriginFlowService>,
}

#[derive(Message)]
#[msg(rtype = "Result<OriginFlow, GetFlowError>")]
pub struct GetOriginFlow;

#[derive(Debug, Error)]
pub enum GetFlowError {
#[error("Retriever is disabled or unavailable")]
Unavailable,
#[error("Unable to obtain retriever instance")]
Instance,
#[error("Failed to obtain session")]
Session,
#[error("Origin authentication is not enabled")]
OriginDisabled,
}

impl Handler<GetOriginFlow> for Retriever {
type Response = Sfr<Retriever, GetOriginFlow>;

fn handle(
&mut self,
_msg: GetOriginFlow,
_ctx: &mut interlink::service::ServiceContext<Self>,
) -> Self::Response {
Sfr::new(move |act: &mut Retriever, _ctx| {
Box::pin(async move {
let mut instance = act.instance.as_ref().ok_or(GetFlowError::Unavailable)?;

// Obtain a new instance if the current one is expired
if instance.expiry < SystemTime::now() {
debug!("Current official instance is outdated.. retrieving a new instance");

instance = match OfficialInstance::obtain().await {
Ok(value) => act.instance.insert(value),
Err(err) => {
act.instance = None;
error!("Official server instance expired but failed to obtain new instance: {}", err);
return Err(GetFlowError::Instance);
}
};
}

let session = instance.session().await.ok_or(GetFlowError::Session)?;
let flow = act
.origin_flow
.as_ref()
.ok_or(GetFlowError::OriginDisabled)?
.create(session);

Ok(flow)
})
})
}
}

/// Connection details for an official server instance
struct OfficialInstance {
/// The host address of the official server
host: String,
/// The port of the official server.
port: u16,
/// The time the instance should expire at
expiry: SystemTime,
}

/// Errors that could occur while attempting to obtain
/// an official server instance details
#[derive(Debug, Error)]
pub enum InstanceError {
#[error("Failed to request lookup from google: {0}")]
#[error("Failed to request lookup from cloudflare: {0}")]
LookupRequest(#[from] reqwest::Error),
#[error("Failed to lookup server response empty")]
MissingValue,
Expand All @@ -57,7 +127,32 @@ pub enum InstanceError {
}

impl OfficialInstance {
/// Time an official instance should be considered valid for (2 hours)
const LIFETIME: Duration = Duration::from_secs(60 * 60 * 2);

/// The hostname for the redirector server
///
/// If this service goes down the same logic is available
/// from https://winter15.gosredirector.ea.com:42230/redirector/getServerInstance
/// using an XML structure:
///
/// <?xml version="1.0" encoding="UTF-8"?>
/// <serverinstancerequest>
/// <blazesdkversion>3.15.6.0</blazesdkversion>
/// <blazesdkbuilddate>Dec 21 2012 12:47:10</blazesdkbuilddate>
/// <clientname>MassEffect3-pc</clientname>
/// <clienttype>CLIENT_TYPE_GAMEPLAY_USER</clienttype>
/// <clientplatform>pc</clientplatform>
/// <clientskuid>pc</clientskuid>
/// <clientversion>05427.124</clientversion>
/// <dirtysdkversion>8.14.7.1</dirtysdkversion>
/// <environment>prod</environment>
/// <clientlocale>1701729619</clientlocale>
/// <name>masseffect-3-pc</name>
/// <platform>Windows</platform>
/// <connectionprofile>standardSecure_v3</connectionprofile>
/// <istrial>0</istrial>
/// </serverinstancerequest>
const REDIRECTOR_HOST: &str = "gosredirector.ea.com";
/// The port for the redirector server.
const REDIRECT_PORT: Port = 42127;
Expand All @@ -81,8 +176,14 @@ impl OfficialInstance {
let InstanceNet { host, port } = instance.net;
let host: String = host.into();

debug!("Retriever setup complete. (Host: {} Port: {})", &host, port);
Ok(OfficialInstance { host, port })
debug!(
"Retriever instance obtained. (Host: {} Port: {})",
&host, port
);

let expiry = SystemTime::now().add(Self::LIFETIME);

Ok(OfficialInstance { host, port, expiry })
}

/// Attempts to resolve the address of the official gosredirector. First attempts
Expand Down Expand Up @@ -110,9 +211,17 @@ impl OfficialInstance {
}
}

// Attempt to lookup using google HTTP DNS
let url = format!("https://dns.google/resolve?name={host}&type=A");
let mut response: LookupResponse = reqwest::get(url).await?.json().await?;
// Attempt to lookup using cloudflares DNS over HTTP

let client = reqwest::Client::new();
let url = format!("https://cloudflare-dns.com/dns-query?name={host}&type=A");
let mut response: LookupResponse = client
.get(url)
.header("Accept", "application/dns-json")
.send()
.await?
.json()
.await?;

response
.answer
Expand All @@ -134,17 +243,17 @@ impl Retriever {
/// ip address of the gosredirector.ea.com host and then creates a
/// connection to the redirector server and obtains the IP and Port
/// of the Official server.
pub async fn new(config: RetrieverConfig) -> Option<Retriever> {
if !config.enabled {
return None;
}

let instance = match OfficialInstance::obtain().await {
Ok(value) => value,
Err(error) => {
error!("Failed to setup redirector: {}", error);
return None;
pub async fn new(config: RetrieverConfig) -> Link<Retriever> {
let instance = if config.enabled {
match OfficialInstance::obtain().await {
Ok(value) => Some(value),
Err(error) => {
error!("Failed to setup redirector: {}", error);
None
}
}
} else {
None
};

let origin_flow = if config.origin_fetch {
Expand All @@ -155,17 +264,12 @@ impl Retriever {
None
};

Some(Retriever {
let this = Retriever {
instance,
origin_flow,
})
}
};

/// Creates a stream to the main server and wraps it with a
/// session returning that session. Will return None if the
/// stream failed.
pub async fn session(&self) -> Option<OfficialSession> {
self.instance.session().await
this.start()
}
}

Expand Down Expand Up @@ -197,6 +301,8 @@ pub enum RetrieverError {
pub type RetrieverResult<T> = Result<T, RetrieverError>;

impl OfficialSession {
/// Creates a session with an official server at the provided
/// `host` and `port`
async fn connect(host: &str, port: Port) -> Result<OfficialSession, BlazeError> {
let stream = BlazeStream::connect((host, port)).await?;
Ok(Self {
Expand Down

0 comments on commit a1b4eec

Please sign in to comment.