In [0]:
#!import "../DataModel/DataStructure"

In [0]:
using System.IO;
using System.Text;
using Systemorph.Vertex.DataSetReader.Csv;
using Systemorph.Vertex.Session;
using Systemorph.Vertex.Import.Builders;
using Systemorph.Vertex.Import.Mappings;
using Systemorph.Vertex.DataSetReader;
using Systemorph.Vertex.Export.Excel.Builders;
using Systemorph.Vertex.Export;
using Systemorph.Vertex.FileStorage;

/// <summary>
/// Pairs a stream with a flag telling the reader what to do with it after its content was copied.
/// </summary>
/// <param name="Stream">The stream whose content is read.</param>
/// <param name="WillBeReused">
/// When <c>true</c> the reader rewinds the stream to position 0 after reading;
/// when <c>false</c> the reader closes and disposes the stream.
/// </param>
public record StreamWrapper(Stream Stream, bool WillBeReused);

/// <summary>
/// Extracts the message text from a notification object.
/// </summary>
/// <param name="obj">Any object; only <c>ActivityMessageNotification</c> instances carry a message.</param>
/// <returns>The notification's <c>Message</c>, or an empty string when <paramref name="obj"/> is of any other type (or null).</returns>
public static string ProcessNotification(this object obj)
{
    if (obj is ActivityMessageNotification notification)
    {
        return notification.Message;
    }

    return "";
}

In [0]:
/// <summary>
/// Keyed record describing one import/export run: who ran it, when, with which status,
/// and the distinct error, warning and info messages it produced.
/// </summary>
// The type name now matches its constructors; the previous declaration
// ("ImportExportActivityActivity") did not, so the constructors could not compile.
public record ImportExportActivity : KeyedRecord
{
    /// <summary>Name of the session user that triggered the activity.</summary>
    public string Username {get; init;}

    /// <summary>Start time copied from the activity log.</summary>
    public DateTime StartDateTime {get; init;}

    /// <summary>Finish time copied from the activity log.</summary>
    public DateTime EndDateTime {get; init;}

    /// <summary>Status copied from the activity log.</summary>
    public ActivityLogStatus Status {get; init;}

    /// <summary>Free-text category of the activity; not set by the constructors.</summary>
    public string Category {get; init;}

    /// <summary>Distinct error messages taken from the activity log.</summary>
    [Conversion(typeof(JsonConverter<string[]>))]
    public string[] ErrorMessages {get; init;}

    /// <summary>Distinct warning messages taken from the activity log.</summary>
    [Conversion(typeof(JsonConverter<string[]>))]
    public string[] WarningMessages {get; init;}

    /// <summary>Distinct info messages taken from the activity log.</summary>
    [Conversion(typeof(JsonConverter<string[]>))]
    public string[] InfoMessages {get; init;}

    /// <summary>Id of the record holding the source data of this activity; not set by the constructors.</summary>
    public Guid SourceId {get; init;} // Check if converting to Guid causes issues. If not, leave Guid.

    /// <summary>
    /// Creates an activity with a fresh id from an activity log and the current session.
    /// </summary>
    /// <param name="log">Log supplying start/finish time, status and notifications.</param>
    /// <param name="session">Session supplying the user name.</param>
    public ImportExportActivity(ActivityLog log, ISessionVariable session)
    {
        Id = Guid.NewGuid();
        Username = session.User.Name;
        StartDateTime = log.StartDateTime;
        EndDateTime = log.FinishDateTime;
        Status = log.Status;
        // Notifications are reduced to their message text and de-duplicated per severity.
        ErrorMessages = log.Errors.Select(x => x.ProcessNotification()).Distinct().ToArray();
        WarningMessages = log.Warnings.Select(x => x.ProcessNotification()).Distinct().ToArray();
        InfoMessages = log.Infos.Select(x => x.ProcessNotification()).Distinct().ToArray();
    }

    /// <summary>
    /// Creates an otherwise empty activity with the given id.
    /// </summary>
    /// <param name="id">Id to assign to the record.</param>
    public ImportExportActivity(Guid id)
    {
        Id = id;
    }

}

In [0]:
/// <summary>
/// Base record for a stored copy of the data that was fed into an import.
/// Derived records set <see cref="Options"/> and <see cref="DataSetReader"/>; calling
/// <see cref="InitializeImportDataAsync"/> then reads the source and returns a populated copy.
/// </summary>
public abstract record ImportWithKey : KeyedRecord
{
    /// <summary>UTC time at which the import data was captured.</summary>
    public DateTime CreationTime {get; init;}

    /// <summary>Raw bytes copied from the import source.</summary>
    public byte[] SerializedContent {get; init;}

    /// <summary>Number of bytes in <see cref="SerializedContent"/>.</summary>
    public uint? Length {get; init;}

    /// <summary>Format reported by the data set reader, or the format of the import options when the reader reports none.</summary>
    public string Format {get; init;}

    /// <summary>Import options describing where the data comes from.</summary>
    protected ImportOptions Options {get; set;}

    /// <summary>Reader used to parse the captured content in order to determine its format.</summary>
    protected IDataSetImportVariable DataSetReader{get; set;}

    /// <summary>
    /// Reads the import source described by <see cref="Options"/> and returns a copy of this record
    /// carrying the captured content, its length, its format and the capture time.
    /// </summary>
    /// <param name="session">Session supplying the cancellation token for storage reads.</param>
    /// <returns>A copy of this record (same runtime type) with the import data filled in.</returns>
    /// <exception cref="NotSupportedException">The options are null or of an unsupported type.</exception>
    public async Task<ImportWithKey> InitializeImportDataAsync(ISessionVariable session)
    {
        var stream = await GenerateStreamWrapperAsync(session);
        var (format, content) = await GetInformationFromStreamAsync(stream, session);
        // Length must be derived from the freshly read content: inside a `with` initializer,
        // `SerializedContent` still refers to the value of the ORIGINAL instance.
        return this with{CreationTime = DateTime.UtcNow,
                            Format = format ?? Options.Format,
                            SerializedContent = content,
                            Length = content != null ? (uint)content.Length : null,
                        };
    }

    /// <summary>
    /// Opens the stream matching the concrete type of <see cref="Options"/>.
    /// </summary>
    /// <exception cref="NotSupportedException">The options are null or of an unsupported type.</exception>
    // NOTE(review): the storage/memory streams created here are flagged as reused (rewound, never disposed),
    // while the caller-supplied stream is flagged as not reused (closed after reading) — confirm this is intended.
    private async Task<StreamWrapper> GenerateStreamWrapperAsync(ISessionVariable session)
    {
        StreamWrapper stream = Options switch
        {
            FileImportOptions fio => new StreamWrapper(await fio.Storage.ReadAsync(fio.FileName, session.CancellationToken), true),
            StreamImportOptions streamImportOptions => new StreamWrapper(streamImportOptions.Stream, false),
            // UTF-8 keeps non-ASCII characters intact (ASCII encoding would replace them with '?')
            // and yields the same bytes as ASCII for pure ASCII text.
            StringImportOptions stringImportOptions => new StreamWrapper(new MemoryStream(Encoding.UTF8.GetBytes(stringImportOptions.Content)), true),
            DataSetImportOptions dataSetImportOptions => new StreamWrapper(new MemoryStream(Encoding.UTF8.GetBytes(DataSetCsvSerializer.Serialize(dataSetImportOptions.DataSet))), true),
            // Fail with a descriptive error instead of a NullReferenceException further down.
            _ => throw new NotSupportedException($"Import options of type '{Options?.GetType().Name ?? "null"}' are not supported.")
        };
        return stream;
    }

    /// <summary>
    /// Copies the whole source stream into memory and lets the data set reader parse the copy
    /// to find out its format.
    /// </summary>
    /// <param name="stream">Source stream and its reuse flag.</param>
    /// <param name="session">Current session (not used by this method).</param>
    /// <returns>The format reported by the reader and the bytes that were copied.</returns>
    private async Task<(string Format, byte[] Content)> GetInformationFromStreamAsync(StreamWrapper stream, ISessionVariable session)
    {
        byte[] content;
        string format;
        try
        {
            using (MemoryStream ms = new MemoryStream())
            {
                await stream.Stream.CopyToAsync(ms);
                content = ms.ToArray();
                // Rewind the in-memory copy so the reader parses it from the start.
                ms.Position = 0;
                var dsRes = await DataSetReader.ReadFromStream(ms).ExecuteAsync();
                format = dsRes.Format;
            }
            if (stream.WillBeReused)
                stream.Stream.Position = 0;
        }
        finally
        {
            // A stream that is not reused is released even when copying or parsing fails.
            if (!stream.WillBeReused)
                await stream.Stream.DisposeAsync();
        }
        return (format, content);
    }
}

In [0]:
/// <summary>
/// Stored copy of a file that was used as import source, together with
/// descriptive data derived from the file name and its storage.
/// </summary>
public record KeyedFile : ImportWithKey
{
    /// <summary>File name including extension, without the directory part.</summary>
    public string Name { get; init; }

    /// <summary>Directory part of the imported file name.</summary>
    public string Directory { get; init; }

    /// <summary>Extension of the imported file name.</summary>
    public string ContentType { get; init; }

    /// <summary>Partitions involved in the import; currently always null (see <see cref="GetInvolvedPartitions"/>).</summary>
    [Conversion(typeof(JsonConverter<string[]>))]
    public string[] Partition { get; init; }

    /// <summary>Type name of the storage the file was read from.</summary>
    public string Source { get; init; }

    /// <summary>
    /// Creates a record with a fresh id for the file referenced by the given options.
    /// </summary>
    /// <param name="options">File import options supplying the file name and the storage.</param>
    /// <param name="importVariable">Reader later used to determine the content format.</param>
    public KeyedFile(FileImportOptions options, IDataSetImportVariable importVariable)
    {
        Id = Guid.NewGuid();
        DataSetReader = importVariable;
        Options = options;

        var path = options.FileName;
        Directory = Path.GetDirectoryName(path);
        Name = Path.GetFileName(path);
        ContentType = Path.GetExtension(path);

        Source = options.Storage.GetType().Name;
        // Andrey Katz: Options.TargetDataSource.Partition.GetCurrent(?? What do we put here? Different classes might possess various partitions, e.g. Yield Curve has none ??)
        Partition = GetInvolvedPartitions(options);
    }

    /// <summary>
    /// Creates an empty record with the given id and neither options nor reader.
    /// </summary>
    /// <param name="id">Id to assign to the record.</param>
    public KeyedFile(Guid id)
    {
        DataSetReader = null;
        Options = null;
        Id = id;
    }

    /// <summary>
    /// Placeholder for the partition lookup; returns null until it is implemented.
    /// </summary>
    // TODO
    // Andrey Katz: get all the relevant partitions here.
    private string[] GetInvolvedPartitions(ImportOptions options) => null;
}

In [0]:
/// <summary>
/// Stored copy of a string that was used as import source.
/// </summary>
public record KeyedStringImport : ImportWithKey
{
    /// <summary>The imported text as passed in the import options.</summary>
    public string Content { get; init; }

    /// <summary>
    /// Creates a record with a fresh id holding the text of the given options.
    /// </summary>
    /// <param name="options">String import options supplying the content.</param>
    /// <param name="importVariable">Reader later used to determine the content format.</param>
    public KeyedStringImport(StringImportOptions options, IDataSetImportVariable importVariable)
    {
        Id = Guid.NewGuid();
        Content = options.Content;
        DataSetReader = importVariable;
        Options = options;
    }

    /// <summary>
    /// Creates an empty record with the given id and neither options nor reader.
    /// </summary>
    /// <param name="id">Id to assign to the record.</param>
    public KeyedStringImport(Guid id)
    {
        DataSetReader = null;
        Options = null;
        Id = id;
    }
}

In [0]:
/// <summary>
/// Stored copy of a data set that was used as import source.
/// </summary>
public record KeyedDataSetImport : ImportWithKey
{
    /// <summary>
    /// Creates a record with a fresh id for the given data set import options.
    /// </summary>
    /// <param name="options">Data set import options describing the source.</param>
    /// <param name="importVariable">Reader later used to determine the content format.</param>
    public KeyedDataSetImport(DataSetImportOptions options, IDataSetImportVariable importVariable)
    {
        Id = Guid.NewGuid();
        Options = options;
        DataSetReader = importVariable;
    }

    /// <summary>
    /// Creates an empty record with the given id and neither options nor reader.
    /// </summary>
    /// <param name="id">Id to assign to the record.</param>
    public KeyedDataSetImport(Guid id)
    {
        DataSetReader = null;
        Options = null;
        Id = id;
    }
}

In [0]:
/// <summary>
/// Stored copy of a stream that was used as import source.
/// </summary>
public record KeyedStreamImport : ImportWithKey
{
    /// <summary>
    /// Creates a record with a fresh id for the given stream import options.
    /// </summary>
    /// <param name="options">Stream import options describing the source.</param>
    /// <param name="importVariable">Reader later used to determine the content format.</param>
    public KeyedStreamImport(StreamImportOptions options, IDataSetImportVariable importVariable)
    {
        Id = Guid.NewGuid();
        Options = options;
        DataSetReader = importVariable;
    }

    /// <summary>
    /// Creates an empty record with the given id and neither options nor reader.
    /// </summary>
    /// <param name="id">Id to assign to the record.</param>
    public KeyedStreamImport(Guid id)
    {
        DataSetReader = null;
        Options = null;
        Id = id;
    }
}

In [0]:
/// <summary>
/// Executes the import and stores an activity record for it, plus — when it can be captured —
/// a copy of the imported source data, then commits the data source.
/// </summary>
/// <param name="builder">Configured import to execute.</param>
/// <param name="session">Current session; supplies the user and the cancellation token.</param>
/// <param name="dataSource">Data source receiving the activity and the source-data record.</param>
/// <param name="importVariable">Reader used to determine the format of the captured source data.</param>
/// <returns>The activity log produced by the import itself.</returns>
/// <exception cref="Exception">The import options are of none of the supported types.</exception>
// NOTE(review): KeyedActivity is not declared in the visible part of this file (the activity record
// declared here goes by a different name) — confirm that it resolves to the intended type.
public static async Task<ActivityLog> ExecuteWithStoreActivityAsync(this ImportOptionsBuilder builder, 
                                                                                ISessionVariable session, 
                                                                                IDataSource dataSource, 
                                                                                IDataSetImportVariable importVariable)
{
    var log = await builder.ExecuteAsync();
    var options = builder.GetImportOptions();
    var activity = new KeyedActivity(log, session);
    bool importSucceeded = true;
    // Capturing the source data is best effort: when it fails, the activity is still stored and
    // points at a placeholder id, but no source-data record is written.
    // The direct casts sit inside the try blocks so that an unexpected result type is handled like
    // any other capture failure instead of surfacing later as a NullReferenceException.
    switch(options)
    {
        case FileImportOptions fio:
            var keyedFile = new KeyedFile(Guid.NewGuid());
            try
            {
                keyedFile = (KeyedFile)(await (new KeyedFile(fio, importVariable)).InitializeImportDataAsync(session));
            }
            catch (Exception)
            {
                importSucceeded = false;
            }
            activity = activity with {SourceId = keyedFile.Id, 
                                        Category = "Import from File"};
            if (importSucceeded) await dataSource.UpdateAsync<KeyedFile>(keyedFile.RepeatOnce());
            break;
        case StringImportOptions sgio:
            var keyedStringImport = new KeyedStringImport(Guid.NewGuid());
            try
            {  
                keyedStringImport = (KeyedStringImport)(await (new KeyedStringImport(sgio, importVariable)).InitializeImportDataAsync(session));
            }
            catch (Exception)
            {
                importSucceeded = false;
            }
            activity = activity with {SourceId = keyedStringImport.Id, 
                                    Category = "Import from String"};
            if (importSucceeded) await dataSource.UpdateAsync<KeyedStringImport>(keyedStringImport.RepeatOnce());
            break;
        case StreamImportOptions smio:
            var keyedStreamImport = new KeyedStreamImport(Guid.NewGuid());
            try
            {
                keyedStreamImport = (KeyedStreamImport)(await (new KeyedStreamImport(smio, importVariable)).InitializeImportDataAsync(session));
            }
            catch (Exception)
            {
                importSucceeded = false;
            }
            activity = activity with {SourceId = keyedStreamImport.Id, 
                                    Category = "Import from Stream"};
            if (importSucceeded) await dataSource.UpdateAsync<KeyedStreamImport>(keyedStreamImport.RepeatOnce());
            break;
        case DataSetImportOptions dsio:
            var keyedDataSetImport = new KeyedDataSetImport(Guid.NewGuid());
            try
            {
                keyedDataSetImport = (KeyedDataSetImport)(await (new KeyedDataSetImport(dsio, importVariable)).InitializeImportDataAsync(session));
            }
            catch(Exception)
            {
                importSucceeded = false;
            }
            activity = activity with {SourceId = keyedDataSetImport.Id, 
                                    Category = "Import from Data Set"};
            if (importSucceeded) await dataSource.UpdateAsync<KeyedDataSetImport>(keyedDataSetImport.RepeatOnce());
            break;
        default:
            // No `break` needed: the throw already ends this section.
            throw new Exception("Import Options object is not an instance of an appropriate class.");
    }                              
    await dataSource.UpdateAsync<KeyedActivity>(activity.RepeatOnce());
    await dataSource.CommitAsync(); 
    return log;
}