Skip to content

Commit

Permalink
AsyncFixer
Browse files Browse the repository at this point in the history
  • Loading branch information
sarahelsaig committed Dec 4, 2020
1 parent c5cc830 commit b220e3a
Show file tree
Hide file tree
Showing 9 changed files with 66 additions and 49 deletions.
@@ -1,4 +1,4 @@
using AdvancedDLSupport;
using AdvancedDLSupport;
using Hast.Transformer.Abstractions.SimpleMemory;
using Microsoft.Extensions.Logging;
using System;
Expand Down Expand Up @@ -367,8 +367,12 @@ public void SetSoftRegister(uint address, ulong value)
_currentSlot = (_currentSlot + 1) % BufferCount;

currentSlot = _currentSlot;

// We are inside a lock.
#pragma warning disable AsyncFixer02 // Long-running or blocking operations inside an async method
_slotDispatch[currentSlot] = job = _slotDispatch[currentSlot]
.ContinueWith(_ => RunJob(currentSlot, data, ignoreResponse).Result);
.ThenAsync(() => RunJob(currentSlot, data, ignoreResponse).Result);
#pragma warning restore AsyncFixer02 // Long-running or blocking operations inside an async method
}

var jobResult = await job;
Expand Down
Expand Up @@ -80,7 +80,7 @@ public abstract class OpenClCommunicationService : CommunicationServiceBase
.Frequency;
}

var kernelBinary = File.ReadAllBytes(implementation.BinaryPath);
var kernelBinary = await File.ReadAllBytesAsync(implementation.BinaryPath);
_binaryOpenCl.CreateBinaryKernel(kernelBinary, KernelName);

_devicePoolPopulator.PopulateDevicePoolIfNew(() =>
Expand Down
11 changes: 7 additions & 4 deletions Hast.Communication/Helpers/EthernetCommunicationHelpers.cs
@@ -1,4 +1,4 @@
using System;
using System;
using System.Collections.Generic;
using System.Net;
using System.Net.Sockets;
Expand Down Expand Up @@ -33,9 +33,9 @@ public static async Task<UdpReceiveResult> UdpSendAndReceiveAsync(byte[] datagra
/// <param name="targetEndpoint">Endpoint where the datagram needs to be sent. Possibly it is a broadcast address.</param>
/// <param name="receiveTimeoutMilliseconds">Timout within the answer datagram needs to arrive.</param>
/// <returns>Result objects containing UDP datagram received from the remote host. It is empty if nothing has arrived.</returns>
public static async Task<IEnumerable<UdpReceiveResult>> UdpSendAndReceiveAllAsync(byte[] datagram, IPEndPoint bindingEndpoint, IPEndPoint targetEndpoint, int receiveTimeoutMilliseconds)
public static Task<IEnumerable<UdpReceiveResult>> UdpSendAndReceiveAllAsync(byte[] datagram, IPEndPoint bindingEndpoint, IPEndPoint targetEndpoint, int receiveTimeoutMilliseconds)
{
return await UdpSendAndReceiveAnyAsync(client => client.ReceiveAllAsync(receiveTimeoutMilliseconds),
return UdpSendAndReceiveAnyAsync(client => client.ReceiveAllAsync(receiveTimeoutMilliseconds),
datagram, bindingEndpoint, targetEndpoint, receiveTimeoutMilliseconds);
}

Expand All @@ -47,12 +47,15 @@ private static async Task<T> UdpSendAndReceiveAnyAsync<T>(Func<UdpClient, Task<T
using (var receiverClient = CreateUdpClient(bindingEndpoint))
using (var senderClient = CreateUdpClient(bindingEndpoint))
{
// It's handled by the Task.WhenAll below.
#pragma warning disable AsyncFixer04 // Fire & forget async call inside a using block
var receiveTask = receiverTaskFactory(receiverClient);
var sendTask = senderClient.SendAsync(datagram, datagram.Length, targetEndpoint);
#pragma warning restore AsyncFixer04 // Fire & forget async call inside a using block

await Task.WhenAll(receiveTask, sendTask);

return receiveTask.Result;
return await receiveTask;
}
}

Expand Down
6 changes: 3 additions & 3 deletions Hast.Communication/Services/EthernetCommunicationService.cs
@@ -1,4 +1,4 @@
using Hast.Communication.Constants;
using Hast.Communication.Constants;
using Hast.Communication.Constants.CommunicationConstants;
using Hast.Communication.Exceptions;
using Hast.Communication.Models;
Expand Down Expand Up @@ -89,7 +89,7 @@ public override string ChannelName
{
// We send an execution signal to make the FPGA ready to receive the data stream.
var executionCommandTypeByte = new byte[] { (byte)CommandTypes.Execution };
stream.Write(executionCommandTypeByte, 0, executionCommandTypeByte.Length);
await stream.WriteAsync(executionCommandTypeByte, 0, executionCommandTypeByte.Length);

var executionCommandTypeResponseByte = await GetBytesFromStream(stream, 1);

Expand All @@ -110,7 +110,7 @@ public override string ChannelName

// Sending data to the FPGA board.
var segment = memory.GetUnderlyingArray();
stream.Write(segment.Array, segment.Offset, memory.Length);
await stream.WriteAsync(segment.Array, segment.Offset, memory.Length);


// Read the first batch of the TcpServer response bytes that will represent the execution time.
Expand Down
65 changes: 36 additions & 29 deletions Hast.Communication/Services/SerialPortCommunicationService.cs
Expand Up @@ -254,35 +254,12 @@ private async Task<IEnumerable<string>> GetFpgaPortNames(IHardwareExecutionConte

for (int i = 0; i < ports.Length; i++)
{
serialPortPingingTasks[i] = Task.Factory.StartNew(portNameObject =>
{
using (var serialPort = CreateSerialPort(executionContext))
{
var taskCompletionSource = new TaskCompletionSource<bool>();
serialPort.DataReceived += (sender, e) =>
{
if (serialPort.ReadByte() == Serial.Signals.Ready)
{
fpgaPortNames.Add(serialPort.PortName);
taskCompletionSource.SetResult(true);
}
};
serialPort.PortName = (string)portNameObject;
try
{
serialPort.Open();
serialPort.Write(CommandTypes.WhoIsAvailable);
}
catch (IOException) { }
catch (UnauthorizedAccessException) { } // This happens if the port is used by another app.
// Waiting a maximum of 3s for a response from the port.
taskCompletionSource.Task.Wait(3000);
}
}, ports[i]);
serialPortPingingTasks[i] = Task.Factory.StartNew(
portNameObject => RunPort(portNameObject, executionContext, fpgaPortNames),
ports[i],
default,
TaskCreationOptions.None,
TaskScheduler.Default);
}

await Task.WhenAll(serialPortPingingTasks);
Expand Down Expand Up @@ -310,5 +287,35 @@ private SerialPort CreateSerialPort(IHardwareExecutionContext executionContext)

return serialPort;
}

private void RunPort(object portNameObject, IHardwareExecutionContext executionContext, ConcurrentBag<string> fpgaPortNames)
{
using (var serialPort = CreateSerialPort(executionContext))
{
var taskCompletionSource = new TaskCompletionSource<bool>();

serialPort.DataReceived += (sender, e) =>
{
if (serialPort.ReadByte() == Serial.Signals.Ready)
{
fpgaPortNames.Add(serialPort.PortName);
taskCompletionSource.SetResult(true);
}
};

serialPort.PortName = (string)portNameObject;

try
{
serialPort.Open();
serialPort.Write(CommandTypes.WhoIsAvailable);
}
catch (IOException) { }
catch (UnauthorizedAccessException) { } // This happens if the port is used by another app.

// Waiting a maximum of 3s for a response from the port.
taskCompletionSource.Task.Wait(3000);
}
}
}
}
17 changes: 10 additions & 7 deletions Hast.Remote.Client/RemoteTransformer.cs
@@ -1,4 +1,4 @@
using Hast.Common.Models;
using Hast.Common.Models;
using Hast.Layer;
using Hast.Remote.Bridge.Models;
using Hast.Transformer.Abstractions;
Expand All @@ -21,12 +21,7 @@ public async Task<IHardwareDescription> Transform(IList<string> assemblyPaths, I
{
var apiClient = ApiClientFactory.CreateApiClient(configuration.RemoteClientConfiguration());

var assemblyContainers = assemblyPaths
.Select(path => new AssemblyContainer
{
Name = Path.GetFileNameWithoutExtension(path),
FileContent = File.ReadAllBytes(path)
});
var assemblyContainers = assemblyPaths.Select(GetAssemblyContainers);

var apiConfiguration = new HardwareGenerationConfiguration
{
Expand Down Expand Up @@ -127,5 +122,13 @@ public async Task<IHardwareDescription> Transform(IList<string> assemblyPaths, I
throw new RemoteTransformationException("Remote transformation failed because Hastlayer Remote Services returned an unexpected response. This might be because authorization failed (check if you mistyped your credentials) or because there is some issue with the service. If this error persists please get in touch with us under https://hastlayer.com/contact.", ex);
}
}

private AssemblyContainer GetAssemblyContainers(string path) =>
new AssemblyContainer
{
Name = Path.GetFileNameWithoutExtension(path),
FileContent = File.ReadAllBytes(path),
};

}
}
Expand Up @@ -24,7 +24,7 @@ public async Task Run(IHastlayer hastlayer, IHardwareRepresentation hardwareRepr
foreach (var number in new uint[] { 15, 13, 20 })
{
Console.WriteLine("primeCalculator.IsPrimeNumber: {0}", number);
var result = primeCalculator.IsPrimeNumber(
var result = primeCalculator.IsPrimeNumberSync(
number,
hastlayer,
hardwareRepresentation.HardwareGenerationConfiguration);
Expand Down
2 changes: 1 addition & 1 deletion Samples/Hast.Samples.SampleAssembly/PrimeCalculator.cs
Expand Up @@ -174,7 +174,7 @@ private bool IsPrimeNumberInternal(uint number)
// hardware entry point members, nor are they used by any other transformed member). Thus you can do anything
// in them that is not Hastlayer-compatible.

public bool IsPrimeNumber(uint number, IHastlayer hastlayer = null, IHardwareGenerationConfiguration configuration = null)
public bool IsPrimeNumberSync(uint number, IHastlayer hastlayer = null, IHardwareGenerationConfiguration configuration = null)
{
return RunIsPrimeNumber(number, memory => Task.Run(() => IsPrimeNumber(memory)), hastlayer, configuration).Result;
}
Expand Down

0 comments on commit b220e3a

Please sign in to comment.