Skip to content

Commit

Permalink
Fixes Code crash due to multiple requests on reset
Browse files Browse the repository at this point in the history
  • Loading branch information
Carlos Cabanero committed Jan 25, 2022
1 parent d911bfc commit 32a47b7
Show file tree
Hide file tree
Showing 4 changed files with 200 additions and 79 deletions.
12 changes: 12 additions & 0 deletions Blink.xcodeproj/project.pbxproj
Expand Up @@ -106,6 +106,7 @@
BD81522D2739A91D002BB169 /* BlinkLogging.swift in Sources */ = {isa = PBXBuildFile; fileRef = BD9EA20A271F62ED00874007 /* BlinkLogging.swift */; };
BD81522E2739A91D002BB169 /* Publisher.swift in Sources */ = {isa = PBXBuildFile; fileRef = BD9EA20C271F664D00874007 /* Publisher.swift */; };
BD8152542743FF84002BB169 /* skstore.swift in Sources */ = {isa = PBXBuildFile; fileRef = BD8152532743FF84002BB169 /* skstore.swift */; };
BD835DD427A0BD19002C37D7 /* ReplaySubject.swift in Sources */ = {isa = PBXBuildFile; fileRef = BD835DD027A0BD19002C37D7 /* ReplaySubject.swift */; };
BD896F7B26CEAD37004313E6 /* FileTranslatorCache.swift in Sources */ = {isa = PBXBuildFile; fileRef = BD896F7A26CEAD37004313E6 /* FileTranslatorCache.swift */; };
BD896F8926CFDB17004313E6 /* Atomics in Frameworks */ = {isa = PBXBuildFile; productRef = BD896F8826CFDB17004313E6 /* Atomics */; };
BD8BBF5525F829B00084705F /* SEKeyTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = BD8BBF0825F819970084705F /* SEKeyTests.swift */; };
Expand Down Expand Up @@ -761,6 +762,7 @@
BD7810A42640C36100114700 /* NWConnection+WriterTo.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "NWConnection+WriterTo.swift"; sourceTree = "<group>"; };
BD81521C27387D1F002BB169 /* Certificates.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Certificates.swift; sourceTree = "<group>"; };
BD8152532743FF84002BB169 /* skstore.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = skstore.swift; sourceTree = "<group>"; };
BD835DD027A0BD19002C37D7 /* ReplaySubject.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = ReplaySubject.swift; sourceTree = "<group>"; };
BD896F7A26CEAD37004313E6 /* FileTranslatorCache.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = FileTranslatorCache.swift; sourceTree = "<group>"; };
BD8BBF0825F819970084705F /* SEKeyTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = SEKeyTests.swift; sourceTree = "<group>"; };
BD8BBFAF25F947710084705F /* Keys.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = Keys.swift; sourceTree = "<group>"; };
Expand Down Expand Up @@ -1528,6 +1530,14 @@
path = Security;
sourceTree = "<group>";
};
BD835DCF27A0BD19002C37D7 /* Publisher */ = {
isa = PBXGroup;
children = (
BD835DD027A0BD19002C37D7 /* ReplaySubject.swift */,
);
path = Publisher;
sourceTree = "<group>";
};
BD8D898025DC535600E55D9E /* keys */ = {
isa = PBXGroup;
children = (
Expand Down Expand Up @@ -1590,6 +1600,7 @@
BDBFA3062728914F00C77798 /* BlinkCode */ = {
isa = PBXGroup;
children = (
BD835DCF27A0BD19002C37D7 /* Publisher */,
BD8DB640279B2FA200497C88 /* SSH */,
BD81521C27387D1F002BB169 /* Certificates.swift */,
BD8DB645279B512900497C88 /* CodeFileSystem.swift */,
Expand Down Expand Up @@ -2787,6 +2798,7 @@
BD81522D2739A91D002BB169 /* BlinkLogging.swift in Sources */,
BD81522E2739A91D002BB169 /* Publisher.swift in Sources */,
BD8DB648279B512900497C88 /* CodeFileSystemService.swift in Sources */,
BD835DD427A0BD19002C37D7 /* ReplaySubject.swift in Sources */,
BD8DB642279B2FA200497C88 /* SSHClient.swift in Sources */,
BD67FC79272B30F300C1EE75 /* Messages.swift in Sources */,
BDBFA3212728925C00C77798 /* WebSocketServer.swift in Sources */,
Expand Down
76 changes: 46 additions & 30 deletions BlinkCode/CodeFileSystemService.swift
Expand Up @@ -47,24 +47,30 @@ struct MountEntry: Codable {
}

class TranslatorControl {
let translator: Translator
let connectionControl: SSHClientControl
var translator: Translator? = nil
var builder: AnyPublisher<Translator, Error>
var connectionControl: SSHClientControl? = nil

init(_ translator: Translator, connectionControl: SSHClientControl) {
self.translator = translator
self.connectionControl = connectionControl
var isConnected: Bool { translator?.isConnected ?? false }

init(_ builder: AnyPublisher<Translator, Error>) {
self.builder = builder
}

// init(_ translator: Translator, connectionControl: SSHClientControl) {
// self.translator = translator
// self.connectionControl = connectionControl
// }

deinit {
self.connectionControl.cancel()
self.connectionControl?.cancel()
}
}

public class CodeFileSystemService: CodeSocketDelegate {

let server: WebSocketServer
let log: BlinkLogger

public let port: UInt16
var tokens: [Int: MountEntry] = [:]
var tokenIdx = 0;
Expand All @@ -73,11 +79,11 @@ public class CodeFileSystemService: CodeSocketDelegate {

private let finishedCallback: ((Error?) -> ())
func finished(_ error: Error?) { finishedCallback(error) }

public var state: NWListener.State {
server.listener.state
}

public func registerMount(name: String, root: String) -> Int {
tokenIdx += 1
tokens[tokenIdx] = MountEntry(name: name, root: root)
Expand Down Expand Up @@ -114,12 +120,12 @@ public class CodeFileSystemService: CodeSocketDelegate {
self.port = port.rawValue
self.server = try WebSocketServer(listenOn: port, tls: tls)
self.finishedCallback = finished

self.log = CodeFileSystemLogger.log("FileSystem")

self.server.delegate = self
}

func getRoot(token: Int, version: Int) -> WebSocketServer.ResponsePublisher {
if let mount = self.tokens[token] {
return .just((try! JSONEncoder().encode(mount), nil)).eraseToAnyPublisher()
Expand Down Expand Up @@ -174,52 +180,62 @@ public class CodeFileSystemService: CodeSocketDelegate {
let rootPath = uri.rootPath

if let host = rootPath.host,
let tRef = translators[host],
tRef.translator.isConnected {
return CodeFileSystem(.just(tRef.translator), uri: uri)
let tRef = translators[host] {
if tRef.translator != nil && tRef.isConnected {
return CodeFileSystem(tRef.builder, uri: uri)
} else if tRef.translator == nil {
return CodeFileSystem(tRef.builder, uri: uri)
}
}


// If we have a host, check the builders, otherwise it is local
// If there is a builder, check if there is a Translator it is connected

switch(rootPath.protocolIdentifier) {
case "blinksftp":
guard let hostAlias = rootPath.host else {
throw WebSocketError(message: "Missing host on rootpath")
}

let translator = AnyPublisher(SSHClient
.dial(hostAlias, withConfigProvider: SSHClientFileProviderConfig.config)
.flatMap { connControl in
Just(connControl.connection)
.flatMap { $0.requestSFTP() }
.tryMap { try SFTPTranslator(on: $0) }
.receive(on: self.server.queue)
.map { t -> Translator in
self.translators[hostAlias] = TranslatorControl(t, connectionControl: connControl)
self.translators[hostAlias]?.translator = t
self.translators[hostAlias]?.connectionControl = connControl
return t
}
})

}
.shareReplay(maxValues: 1)
)

translators[hostAlias] = TranslatorControl(translator)
return CodeFileSystem(translator, uri: uri)

case "blinkfs":
// The local one does not need to be saved.
return CodeFileSystem(.just(BlinkFiles.Local()), uri: uri)
default:
throw WebSocketError(message: "Unknown protocol - \(rootPath.protocolIdentifier)")
}
}
}
}
}

public struct RootPath {
let url: URL // should be private

//var fullPath: String { url.absoluteString }
var protocolIdentifier: String { url.scheme! }
var host: String? { url.host }
var filesAtPath: String { url.path }

init(_ rootPath: String) {
self.url = URL(string: rootPath)!
}

init(_ url: URL) {
self.url = url
}
Expand All @@ -241,13 +257,13 @@ class SSHClientFileProviderConfig {

let host = try bkConfig.bkSSHHost(title)
let hostName = host.hostName

if let signers = bkConfig.signer(forHost: host) {
signers.forEach { (signer, name) in
_ = agent.loadKey(signer, aka: name, constraints: consts)
}
}

for (signer, name) in bkConfig.defaultSigners() {
_ = agent.loadKey(signer, aka: name, constraints: consts)
}
Expand Down Expand Up @@ -281,7 +297,7 @@ struct CodeFileSystemLogger {
.sinkToOutput()
}
)

let dateFormatter = DateFormatter()
dateFormatter.dateFormat = "MMM dd YYYY, HH:mm:ss"
if let file = try? FileLogging(to: BlinkPaths.blinkCodeErrorLogURL()) {
Expand Down
79 changes: 79 additions & 0 deletions BlinkCode/Publisher/ReplaySubject.swift
@@ -0,0 +1,79 @@
//////////////////////////////////////////////////////////////////////////////////
//
// B L I N K
//
// Copyright (C) 2016-2019 Blink Mobile Shell Project
//
// This file is part of Blink.
//
// Blink is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// Blink is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with Blink. If not, see <http://www.gnu.org/licenses/>.
//
// In addition, Blink is also subject to certain additional terms under
// GNU GPL version 3 section 7.
//
// You should have received a copy of these additional terms immediately
// following the terms and conditions of the GNU General Public License
// which accompanied the Blink Source Code. If not, see
// <http://www.github.com/blinksh/blink>.
//
////////////////////////////////////////////////////////////////////////////////


import Combine
import Foundation


extension Publisher {
func shareReplay(maxValues: Int = 0) -> AnyPublisher<Output, Failure> {
multicast(subject: ReplaySubject(maxValues: maxValues)).autoconnect().eraseToAnyPublisher()
}
}

final class ReplaySubject<Input, Failure: Error>: Subject {
typealias Output = Input
private var recording = Record<Input, Failure>.Recording()
private let stream = PassthroughSubject<Input, Failure>()
private let maxValues: Int
private let lock = NSRecursiveLock()
private var completed = false

init(maxValues: Int = 0) {
self.maxValues = maxValues
}
func send(subscription: Subscription) {
subscription.request(maxValues == 0 ? .unlimited : .max(maxValues))
}
func send(_ value: Input) {
lock.lock(); defer { lock.unlock() }
recording.receive(value)
stream.send(value)
if recording.output.count == maxValues {
send(completion: .finished)
}
}
func send(completion: Subscribers.Completion<Failure>) {
lock.lock(); defer { lock.unlock() }
if !completed {
completed = true
recording.receive(completion: completion)
}
stream.send(completion: completion)
}
func receive<S>(subscriber: S) where S: Subscriber, Failure == S.Failure, Input == S.Input {
lock.lock(); defer { lock.unlock() }
Record(recording: self.recording)
.append(self.stream)
.receive(subscriber: subscriber)
}
}

0 comments on commit 32a47b7

Please sign in to comment.