Skip to content

Commit

Permalink
Merge remote-tracking branch 'remotes/lable/master'
Browse files Browse the repository at this point in the history
  • Loading branch information
maikebing committed Nov 30, 2021
2 parents 1570a2c + a4a655c commit d81844e
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 38 deletions.
1 change: 1 addition & 0 deletions IoTSharp/Extensions/DataExtension.cs
Expand Up @@ -66,6 +66,7 @@ internal static Dic PreparingData<L>(this ApplicationDbContext _context, Diction
{
var tx = tl.First();
tx.FillKVToMe(kp);
// TODO:jy 待重新设计主键
tx.DateTime = DateTime.Now;
_context.Set<L>().Update(tx).State = EntityState.Modified;
}
Expand Down
42 changes: 31 additions & 11 deletions IoTSharp/Storage/EFStorage.cs
Expand Up @@ -9,6 +9,8 @@
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Dic = System.Collections.Generic.Dictionary<string, System.Exception>;


namespace IoTSharp.Storage
{
Expand All @@ -17,8 +19,8 @@ public class EFStorage : IStorage
private readonly AppSettings _appSettings;
private readonly ILogger _logger;
private readonly IServiceScopeFactory _scopeFactor;
private readonly IServiceScope scope;
private readonly ApplicationDbContext _context;
//private readonly IServiceScope scope;
//private readonly ApplicationDbContext _context;

public EFStorage(ILogger<EFStorage> logger, IServiceScopeFactory scopeFactor
, IOptions<AppSettings> options
Expand All @@ -27,21 +29,27 @@ public class EFStorage : IStorage
_appSettings = options.Value;
_logger = logger;
_scopeFactor = scopeFactor;
scope = scopeFactor.CreateScope();
_context = scope.ServiceProvider.GetRequiredService<ApplicationDbContext>();
//scope = scopeFactor.CreateScope();
//_context = scope.ServiceProvider.GetRequiredService<ApplicationDbContext>();
}

public Task<List<TelemetryDataDto>> GetTelemetryLatest(Guid deviceId)
{
var devid = from t in _context.TelemetryLatest
using var scope = _scopeFactor.CreateScope();
using var context = scope.ServiceProvider.GetService<ApplicationDbContext>();

var devid = from t in context.TelemetryLatest
where t.DeviceId == deviceId
select new TelemetryDataDto() { DateTime = t.DateTime, KeyName = t.KeyName, Value = t.ToObject() };
return devid.AsNoTracking().ToListAsync();
}

public Task<List<TelemetryDataDto>> GetTelemetryLatest(Guid deviceId, string keys)
{
var devid = from t in _context.TelemetryLatest
using var scope = _scopeFactor.CreateScope();
using var context = scope.ServiceProvider.GetService<ApplicationDbContext>();

var devid = from t in context.TelemetryLatest
where t.DeviceId == deviceId && keys.Split(',', ' ', ';').Contains(t.KeyName)

select new TelemetryDataDto() { DateTime = t.DateTime, KeyName = t.KeyName, Value = t.ToObject() };
Expand All @@ -51,31 +59,43 @@ public Task<List<TelemetryDataDto>> GetTelemetryLatest(Guid deviceId, string key

public Task<List<TelemetryDataDto>> LoadTelemetryAsync(Guid deviceId, string keys, DateTime begin)
{
var kv = from t in _context.TelemetryData
using var scope = _scopeFactor.CreateScope();
using var context = scope.ServiceProvider.GetService<ApplicationDbContext>();

var kv = from t in context.TelemetryData
where t.DeviceId == deviceId && keys.Split(',', ' ', ';').Contains(t.KeyName) && t.DateTime >= begin
select new TelemetryDataDto() { DateTime = t.DateTime, KeyName = t.KeyName, Value = t.ToObject() };
return kv.AsNoTracking().ToListAsync();
}

public Task<List<TelemetryDataDto>> LoadTelemetryAsync(Guid deviceId, string keys, DateTime begin, DateTime end)
{
var kv = from t in _context.TelemetryData
using var scope = _scopeFactor.CreateScope();
using var context = scope.ServiceProvider.GetService<ApplicationDbContext>();

var kv = from t in context.TelemetryData
where t.DeviceId == deviceId && keys.Split(',', ' ', ';').Contains(t.KeyName) && t.DateTime >= begin && t.DateTime < end
select new TelemetryDataDto() { DateTime = t.DateTime, KeyName = t.KeyName, Value = t.ToObject() };
return kv.AsNoTracking().ToListAsync();
}

public Task<List<TelemetryDataDto>> LoadTelemetryAsync(Guid deviceId, DateTime begin)
{
var kv = from t in _context.TelemetryData
using var scope = _scopeFactor.CreateScope();
using var context = scope.ServiceProvider.GetService<ApplicationDbContext>();

var kv = from t in context.TelemetryData
where t.DeviceId == deviceId && t.DateTime >= begin
select new TelemetryDataDto() { DateTime = t.DateTime, KeyName = t.KeyName, Value = t.ToObject() };
return kv.AsNoTracking().ToListAsync();
}

public Task<List<TelemetryDataDto>> LoadTelemetryAsync(Guid deviceId, DateTime begin, DateTime end)
{
var kv = from t in _context.TelemetryData
using var scope = _scopeFactor.CreateScope();
using var context = scope.ServiceProvider.GetService<ApplicationDbContext>();

var kv = from t in context.TelemetryData
where t.DeviceId == deviceId && t.DateTime >= begin && t.DateTime < end
select new TelemetryDataDto() { DateTime = t.DateTime, KeyName = t.KeyName, Value = t.ToObject() };
return kv.AsNoTracking().ToListAsync();
Expand All @@ -87,7 +107,7 @@ public virtual async Task<(bool result, List<TelemetryData> telemetries)> Store
List<TelemetryData> telemetries = new List<TelemetryData>();
try
{
using (var _scope = _scopeFactor.CreateScope())
using (var scope = _scopeFactor.CreateScope())
{
using (var _dbContext = scope.ServiceProvider.GetRequiredService<ApplicationDbContext>())
{
Expand Down
36 changes: 20 additions & 16 deletions IoTSharp/Storage/ShardingStorage.cs
Expand Up @@ -22,7 +22,7 @@ public class ShardingStorage : IStorage
{
private readonly AppSettings _appSettings;
private readonly ILogger _logger;
private readonly IServiceScope scope;
//private readonly IServiceScope scope;
private readonly IServiceScopeFactory _scopeFactor;


Expand All @@ -32,19 +32,19 @@ public class ShardingStorage : IStorage
{
_appSettings = options.Value;
_logger = logger;
scope = scopeFactor.CreateScope();
//scope = scopeFactor.CreateScope();
_scopeFactor = scopeFactor;
}

public Task<List<TelemetryDataDto>> GetTelemetryLatest(Guid deviceId)
{
try
{
using (var _scope = _scopeFactor.CreateScope())
using (var scope = _scopeFactor.CreateScope())
{
using (var _context = _scope.ServiceProvider.GetRequiredService<ApplicationDbContext>())
using (var context = scope.ServiceProvider.GetRequiredService<ApplicationDbContext>())
{
var devid = from t in _context.TelemetryLatest
var devid = from t in context.TelemetryLatest
where t.DeviceId == deviceId
select new TelemetryDataDto() { DateTime = t.DateTime, KeyName = t.KeyName, Value = t.ToObject() };

Expand All @@ -63,11 +63,11 @@ public Task<List<TelemetryDataDto>> GetTelemetryLatest(Guid deviceId, string key
{
try
{
using (var _scope = _scopeFactor.CreateScope())
using (var scope = _scopeFactor.CreateScope())
{
using (var _context = _scope.ServiceProvider.GetRequiredService<ApplicationDbContext>())
using (var context = scope.ServiceProvider.GetRequiredService<ApplicationDbContext>())
{
var devid = from t in _context.TelemetryLatest
var devid = from t in context.TelemetryLatest
where t.DeviceId == deviceId && keys.Split(',', ' ', ';').Contains(t.KeyName)

select new TelemetryDataDto() { DateTime = t.DateTime, KeyName = t.KeyName, Value = t.ToObject() };
Expand All @@ -94,12 +94,12 @@ public Task<List<TelemetryDataDto>> LoadTelemetryAsync(Guid deviceId, string key
{
try
{
using (var _scope = _scopeFactor.CreateScope())
using (var scope = _scopeFactor.CreateScope())
{
using (var _context = _scope.ServiceProvider.GetService<IShardingDbAccessor>())
using (var context = scope.ServiceProvider.GetService<IShardingDbAccessor>())
{
var lst = new List<TelemetryDataDto>();
var kv = _context.GetIShardingQueryable<TelemetryData>()
var kv = context.GetIShardingQueryable<TelemetryData>()
.Where(t => t.DeviceId == deviceId && keys.Split(',', ' ', ';').Contains(t.KeyName) && t.DateTime >= begin && t.DateTime < end)
.ToList().Select(t => new TelemetryDataDto() { DateTime = t.DateTime, KeyName = t.KeyName, Value = t.ToObject() });
return kv.ToList();
Expand All @@ -123,10 +123,12 @@ public Task<List<TelemetryDataDto>> LoadTelemetryAsync(Guid deviceId, DateTime b
{
return Task.Run(() =>
{
using (var _context = scope.ServiceProvider.GetService<IShardingDbAccessor>())
using var scope = _scopeFactor.CreateScope();
using (var context = scope.ServiceProvider.GetService<IShardingDbAccessor>())
{
var lst = new List<TelemetryDataDto>();
var kv = _context.GetIShardingQueryable<TelemetryData>()
var kv = context.GetIShardingQueryable<TelemetryData>()
.Where(t => t.DeviceId == deviceId && t.DateTime >= begin && t.DateTime < end)
.ToList().Select(t => new TelemetryDataDto() { DateTime = t.DateTime, KeyName = t.KeyName, Value = t.ToObject() });
return kv.ToList();
Expand All @@ -141,6 +143,8 @@ public async Task<(bool result, List<TelemetryData> telemetries)> StoreTelemetry

try
{
using var scope = _scopeFactor.CreateScope();

using (var db = scope.ServiceProvider.GetService<IShardingDbAccessor>())
{
var lst = new List<TelemetryData>();
Expand All @@ -166,11 +170,11 @@ public async Task<(bool result, List<TelemetryData> telemetries)> StoreTelemetry

try
{
using (var _scope = _scopeFactor.CreateScope())
using (var scope = _scopeFactor.CreateScope())
{
using (var _dbContext = _scope.ServiceProvider.GetRequiredService<ApplicationDbContext>())
using (var dbContext = scope.ServiceProvider.GetRequiredService<ApplicationDbContext>())
{
var result1 = await _dbContext.SaveAsync<TelemetryLatest>(msg.MsgBody, msg.DeviceId, msg.DataSide);
var result1 = await dbContext.SaveAsync<TelemetryLatest>(msg.MsgBody, msg.DeviceId, msg.DataSide);
result1.exceptions?.ToList().ForEach(ex =>
{
_logger.LogError(ex.Value, $"{ex.Key} {ex.Value.Message} {ex.Value.InnerException?.Message}");
Expand Down
31 changes: 20 additions & 11 deletions IoTSharp/Storage/TimescaleDBStorage.cs
Expand Up @@ -10,36 +10,45 @@ namespace IoTSharp.Storage
{
public class TimescaleDBStorage : EFStorage
{
private readonly ApplicationDbContext _context;
//private readonly ApplicationDbContext _context;

/// <summary>
/// 解决单例注入问题 jy
/// </summary>
readonly IServiceScopeFactory _scopeFactor;

public TimescaleDBStorage(ILogger<TimescaleDBStorage> logger, IServiceScopeFactory scopeFactor
, IOptions<AppSettings> options
) : base(logger, scopeFactor, options)
{
var scope = scopeFactor.CreateScope();
_context = scope.ServiceProvider.GetRequiredService<ApplicationDbContext>();
//var scope = scopeFactor.CreateScope();
//_context = scope.ServiceProvider.GetRequiredService<ApplicationDbContext>();
_scopeFactor = scopeFactor;
}

private bool _needcrtate = false;
private volatile bool _needcrtate = false;

public override async Task<(bool result, List<TelemetryData> telemetries)> StoreTelemetryAsync(RawMsg msg)
public override Task<(bool result, List<TelemetryData> telemetries)> StoreTelemetryAsync(RawMsg msg)
{
if (!_needcrtate)
{
var have = _context.Database.ExecuteScalar<long>("SELECT count(0) FROM _timescaledb_catalog.hypertable where table_name='TelemetryData';");
//解决单例注入问题 jy
using var scope = _scopeFactor.CreateScope();
using var context = scope.ServiceProvider.GetRequiredService<ApplicationDbContext>();
var have = context.Database.ExecuteScalar<long>("SELECT count(0) FROM _timescaledb_catalog.hypertable where table_name='TelemetryData';");
if (have == 0)
{
_context.Database.ExecuteNonQuery("SELECT create_hypertable('\"TelemetryData\"', 'DateTime', 'DeviceId', 2, create_default_indexes=>FALSE);");
_context.Database.ExecuteNonQuery("CREATE INDEX ON \"TelemetryData\" (\"KeyName\", \"DateTime\" DESC);");
_context.Database.ExecuteNonQuery("CREATE INDEX ON \"TelemetryData\" (\"DataSide\", \"DateTime\" DESC);");
_context.Database.ExecuteNonQuery("CREATE INDEX ON \"TelemetryData\" (\"Type\", \"DateTime\" DESC);");
context.Database.ExecuteNonQuery("SELECT create_hypertable('\"TelemetryData\"', 'DateTime', 'DeviceId', 2, create_default_indexes=>FALSE);");
context.Database.ExecuteNonQuery("CREATE INDEX ON \"TelemetryData\" (\"KeyName\", \"DateTime\" DESC);");
context.Database.ExecuteNonQuery("CREATE INDEX ON \"TelemetryData\" (\"DataSide\", \"DateTime\" DESC);");
context.Database.ExecuteNonQuery("CREATE INDEX ON \"TelemetryData\" (\"Type\", \"DateTime\" DESC);");
}
else
{
_needcrtate = true;
}
}
return await base.StoreTelemetryAsync(msg);
return base.StoreTelemetryAsync(msg);
}
}
}

0 comments on commit d81844e

Please sign in to comment.